#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Receive drive data over a System V IPC message queue and print it.

Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23
Date of last modification: 2025/06/08
"""

import ctypes
import errno
import os
import time

from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster

import layouter

# Constants
STR_BUFFER_SIZE = 64  # Buffer size for each field in t_driveData

# Message queue key from C code.
# NOTE(review): this literal needs more than 32 bits, while msgget() below is
# declared with a c_int key argument. Presumably the value is truncated the
# same way on the C side - confirm against the sender.
MSG_QUEUE_KEY = 0x1B11193C0

# IPC Constants
# Required for msgget when creating a queue; not used in this file, because
# main() only attaches to an existing queue (flags = 0).
IPC_CREAT = 0o1000

# Only referenced by the (currently disabled) label printing code in worker().
file_name = "output.png"
printer_path = "/dev/usb/lp0"


# Define ctypes structure for t_driveData
class TDriveData(ctypes.Structure):
    # Every field is a fixed-size character buffer of STR_BUFFER_SIZE bytes;
    # numeric values are transported as text and converted in
    # _parse_drive_data().
    _fields_ = [
        ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
    ]


# Define ctypes structure for t_msgQueueData
class TMsgQueueData(ctypes.Structure):
    _fields_ = [
        ("msg_queue_type", ctypes.c_long),  # Message type
        ("driveData", TDriveData),  # Drive data
    ]


# Define ctypes bindings for Linux IPC.
# use_errno=True makes ctypes keep a copy of errno for each foreign call so
# that ctypes.get_errno() can report why msgget()/msgrcv() failed.
libc = ctypes.CDLL("libc.so.6", use_errno=True)
msgget = libc.msgget
msgrcv = libc.msgrcv

msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int

msgrcv.argtypes = [
    ctypes.c_int,
    ctypes.POINTER(TMsgQueueData),
    ctypes.c_size_t,
    ctypes.c_long,
    ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t


def _field_text(raw: bytes) -> str:
    """Decode one character buffer of TDriveData and drop NUL padding."""
    return raw.decode().strip("\x00")


def _field_number(raw: bytes) -> int:
    """Decode one character buffer of TDriveData and convert it to an int."""
    return int(_field_text(raw))


def _parse_drive_data(drive_data: TDriveData) -> dict:
    """
    Convert a received TDriveData structure into a plain dictionary.

    Raises:
        ValueError: if a numeric field does not contain an integer, or if a
            field is not valid UTF-8 (UnicodeDecodeError is a subclass of
            ValueError).
    """
    return {
        "driveIndex": _field_text(drive_data.caDriveIndex),
        "driveHours": _field_number(drive_data.caDriveHours),
        "driveCycles": _field_number(drive_data.caDriveCycles),
        "driveErrors": _field_number(drive_data.caDriveErrors),
        "driveShredTimestamp": _field_number(drive_data.caDriveShredTimestamp),
        "driveShredDuration": _field_number(drive_data.caDriveShredDuration),
        "driveCapacity": _field_number(drive_data.caDriveCapacity),
        "driveState": _field_text(drive_data.caDriveState),
        "driveModelFamily": _field_text(drive_data.caDriveModelFamily),
        "driveModelName": _field_text(drive_data.caDriveModelName),
        "driveSerialnumber": _field_text(drive_data.caDriveSerialnumber),
        "driveReHddVersion": _field_text(drive_data.caDriveReHddVersion),
    }


def worker(queue_id: int) -> None:
    """
    Read messages from the IPC queue in an endless loop and print them.

    Runs in the calling process and blocks in msgrcv() until a message
    arrives. Malformed messages are reported and skipped. The loop ends when
    msgrcv() fails with anything other than EINTR, or on an unexpected
    exception.

    Args:
        queue_id: Identifier of the message queue, as returned by msgget().
    """
    # msgrcv() expects the payload size without the leading message type.
    payload_size = ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long)

    try:
        while True:
            # Prepare to receive a message
            msg = TMsgQueueData()
            # msgtyp = 0: take the first message of any type;
            # msgflg = 0: block until one is available.
            result = msgrcv(queue_id, ctypes.byref(msg), payload_size, 0, 0)

            if result == -1:
                err = ctypes.get_errno()
                if err == errno.EINTR:
                    # Interrupted by a signal before a message arrived; retry.
                    continue
                print(
                    "Error reading from message queue: "
                    f"{os.strerror(err)} (errno {err})"
                )
                break

            # Convert drive data fields from bytes to strings / integers.
            # One bad message must not stop the receiver for all later ones.
            try:
                drive_info = _parse_drive_data(msg.driveData)
            except ValueError as e:
                print(f"Skipping malformed drive data message: {e}")
                continue

            # NOTE: label printing is currently disabled. When re-enabling it,
            # index the drive_info dictionary (the ctypes structure is not
            # subscriptable).
            # rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", drive_info["driveReHddVersion"])

            # Print the drive information
            print(f"Received Drive Data: {drive_info}")

            # while(not os.path.exists(printer_path)):
            #     print("Printer not found, waiting ...")
            #     time.sleep(30) #sleep 30
            # layouter.generate_image(drive_info, rehdd_info, file_name)
            # qlr = BrotherQLRaster("QL-570")
            # create_label(qlr, file_name, '62')
            # with open(printer_path, 'wb') as file:
            #     file.write(qlr.data)
            # os.remove(file_name)
    except Exception as e:
        print(f"Worker encountered an error: {e}")


def main() -> None:
    """
    Connect to the existing IPC message queue and run the worker loop.

    Errors are reported on stdout; nothing is raised to the caller.
    """
    try:
        # Connect to the existing message queue (flags = 0: do not create it).
        queue_id = msgget(MSG_QUEUE_KEY, 0)
        if queue_id == -1:
            err = ctypes.get_errno()
            raise RuntimeError(
                "Failed to connect to the existing message queue: "
                f"{os.strerror(err)} (errno {err})"
            )

        # Start the worker loop
        worker(queue_id)
    except Exception as e:
        print(f"Main process encountered an error: {e}")


if __name__ == "__main__":
    main()