From 211bf80b910e5dcf1efd7934ba5f8e2586bbe6ae Mon Sep 17 00:00:00 2001 From: localhorst Date: Sun, 15 Jun 2025 18:03:51 +0200 Subject: [PATCH] use libc IPC --- README.md | 2 +- reHDDPrinter.py | 203 +++++++++++++++++++++++++++++++----------------- 2 files changed, 133 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index a31523c..7e6c02d 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ## Install ## -`pip install qrcode sysv-ipc pycstruct brother-ql` +`pip install qrcode brother-ql` ``` cd /root/ diff --git a/reHDDPrinter.py b/reHDDPrinter.py index d4a610e..899c0e6 100644 --- a/reHDDPrinter.py +++ b/reHDDPrinter.py @@ -3,28 +3,36 @@ """ Author: Hendrik Schutter, localhorst@mosad.xyz Date of creation: 2022/11/23 - Date of last modification: 2025/06/08 + Date of last modification: 2025/06/15 """ import ctypes import os import time +import signal +import argparse +import warnings +import logging +from PIL import Image from brother_ql.brother_ql_create import create_label from brother_ql.raster import BrotherQLRaster import layouter -# Constants -STR_BUFFER_SIZE = 64 # Buffer size for each field in t_driveData -MSG_QUEUE_KEY = 0x1B11193C0 # Message queue key from C code +# Suppress deprecation and printer warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) +logging.getLogger("brother_ql").setLevel(logging.ERROR) -# IPC Constants -IPC_CREAT = 0o1000 # Required for msgget +# Constants +STR_BUFFER_SIZE = 64 +MSG_QUEUE_KEY = 0x1B11193C0 +IPC_CREAT = 0o1000 file_name = "output.png" printer_path = "/dev/usb/lp0" +terminate = False + -# Define ctypes structure for t_driveData class TDriveData(ctypes.Structure): _fields_ = [ ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE), @@ -42,15 +50,14 @@ class TDriveData(ctypes.Structure): ] -# Define ctypes structure for t_msgQueueData class TMsgQueueData(ctypes.Structure): _fields_ = [ - ("msg_queue_type", ctypes.c_long), # Message type - ("driveData", TDriveData), # Drive data + ("msg_queue_type", ctypes.c_long), + ("driveData", TDriveData), ] -# Define ctypes bindings for Linux IPC +# IPC bindings libc = ctypes.CDLL("libc.so.6") msgget = libc.msgget msgrcv = libc.msgrcv @@ -68,84 +75,138 @@ msgrcv.argtypes = [ msgrcv.restype = ctypes.c_ssize_t -def worker(queue_id): - """ - Worker process to read messages from the IPC queue and print them. - """ +def signal_handler(signum, frame): + global terminate + print(f"Signal {signum} received, terminating...") + terminate = True + + +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + + +def wait_for_printer(): + while not os.path.exists(printer_path): + print("Printer not found, waiting ...") + time.sleep(30) + return True + + +def create_drive_objects(drive_info): + """Convert dictionary to layouter-compatible DriveData and ReHddInfo objects""" + drive = layouter.DriveData( + drive_index=int(drive_info["driveIndex"]), + drive_state=drive_info["driveState"], + modelfamily=drive_info["driveModelFamily"], + modelname=drive_info["driveModelName"], + capacity=int(drive_info["driveCapacity"]), + serialnumber=drive_info["driveSerialnumber"], + power_on_hours=int(drive_info["driveHours"]), + power_cycle=int(drive_info["driveCycles"]), + smart_error_count=int(drive_info["driveErrors"]), + shred_timestamp=int(drive_info["driveShredTimestamp"]), + shred_duration=int(drive_info["driveShredDuration"]), + ) + + rehdd_info = layouter.ReHddInfo( + link="https://git.mosad.xyz/localhorst/reHDD", + version=drive_info["driveReHddVersion"], + ) + + return drive, rehdd_info + + +def worker(queue_id, test_mode=False): try: - while True: - # Prepare to receive a message - msg = TMsgQueueData() - result = msgrcv( - queue_id, - ctypes.byref(msg), - ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long), - 0, - 0, - ) - if result == -1: - print("Error reading from message queue.") - break + while not terminate: + if test_mode: + drive_info = { + "driveIndex": "42", + "driveHours": 44, + "driveCycles": 45, + "driveErrors": 43, + "driveShredTimestamp": int(time.time()), + "driveShredDuration": 0, + "driveCapacity": 42, + "driveState": "shredded", + "driveModelFamily": "modelFamily", + "driveModelName": "modelName", + "driveSerialnumber": "serial", + "driveReHddVersion": "V1.1.2", + } + else: + msg = TMsgQueueData() + result = msgrcv( + queue_id, + ctypes.byref(msg), + ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long), + 0, + 0, + ) + if result == -1: + err = ctypes.get_errno() + print(f"Error reading from message queue: {os.strerror(err)}") + break - # Extract drive data - driveData = msg.driveData + d = msg.driveData + drive_info = { + "driveIndex": d.caDriveIndex.decode().strip("\x00"), + "driveHours": int(d.caDriveHours.decode().strip("\x00")), + "driveCycles": int(d.caDriveCycles.decode().strip("\x00")), + "driveErrors": int(d.caDriveErrors.decode().strip("\x00")), + "driveShredTimestamp": int( + d.caDriveShredTimestamp.decode().strip("\x00") + ), + "driveShredDuration": int( + d.caDriveShredDuration.decode().strip("\x00") + ), + "driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")), + "driveState": d.caDriveState.decode().strip("\x00"), + "driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"), + "driveModelName": d.caDriveModelName.decode().strip("\x00"), + "driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"), + "driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"), + } - # Convert drive data fields from bytes to strings - drive_info = { - "driveIndex": driveData.caDriveIndex.decode().strip("\x00"), - "driveHours": int(driveData.caDriveHours.decode().strip("\x00")), - "driveCycles": int(driveData.caDriveCycles.decode().strip("\x00")), - "driveErrors": int(driveData.caDriveErrors.decode().strip("\x00")), - "driveShredTimestamp": int( - driveData.caDriveShredTimestamp.decode().strip("\x00") - ), - "driveShredDuration": int( - driveData.caDriveShredDuration.decode().strip("\x00") - ), - "driveCapacity": int(driveData.caDriveCapacity.decode().strip("\x00")), - "driveState": driveData.caDriveState.decode().strip("\x00"), - "driveModelFamily": driveData.caDriveModelFamily.decode().strip("\x00"), - "driveModelName": driveData.caDriveModelName.decode().strip("\x00"), - "driveSerialnumber": driveData.caDriveSerialnumber.decode().strip( - "\x00" - ), - "driveReHddVersion": driveData.caDriveReHddVersion.decode().strip( - "\x00" - ), - } - - #rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData["driveReHddVersion"]) - - # Print the drive information print(f"Received Drive Data: {drive_info}") - # while(not os.path.exists(printer_path)): - # print("Printer not found, waiting ...") - # time.sleep(30) #sleep 30 + drive_obj, rehdd_info = create_drive_objects(drive_info) + layouter.generate_image(drive_obj, rehdd_info, file_name) - # layouter.generate_image(drive_info, rehdd_info, file_name) - # qlr = BrotherQLRaster("QL-570") - # create_label(qlr, file_name, '62') + if wait_for_printer(): + qlr = BrotherQLRaster("QL-570") + image = Image.open(file_name) + create_label(qlr, image, "62") + with open(printer_path, "wb") as file: + file.write(qlr.data) + os.remove(file_name) + else: + print("Skipping printing due to printer unavailability.") - # with open(printer_path, 'wb') as file: - # file.write(qlr.data) - # os.remove(file_name) + if test_mode: + break except Exception as e: print(f"Worker encountered an error: {e}") def main(): - """ - Main function to connect to the existing IPC queue and start the worker process. - """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--test", action="store_true", help="Run in test mode with fake data" + ) + args = parser.parse_args() + + if args.test: + print("Running in test mode.") + worker(None, test_mode=True) + return + try: - # Connect to the existing message queue queue_id = msgget(MSG_QUEUE_KEY, 0) if queue_id == -1: raise RuntimeError("Failed to connect to the existing message queue.") - # Start the worker process worker(queue_id) except Exception as e: print(f"Main process encountered an error: {e}")