diff --git a/README.md b/README.md index a31523c..7e6c02d 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ## Install ## -`pip install qrcode sysv-ipc pycstruct brother-ql` +`pip install qrcode brother-ql` ``` cd /root/ diff --git a/output.png b/output.png index b210d77..59eb1bc 100644 Binary files a/output.png and b/output.png differ diff --git a/reHDDPrinter.py b/reHDDPrinter.py index 8bab4bf..899c0e6 100644 --- a/reHDDPrinter.py +++ b/reHDDPrinter.py @@ -3,75 +3,214 @@ """ Author: Hendrik Schutter, localhorst@mosad.xyz Date of creation: 2022/11/23 - Date of last modification: 2022/11/23 + Date of last modification: 2025/06/15 """ -import sysv_ipc -import pycstruct +import ctypes import os import time +import signal +import argparse +import warnings +import logging +from PIL import Image from brother_ql.brother_ql_create import create_label from brother_ql.raster import BrotherQLRaster import layouter -str_buffer_size = 64 #keep this synchronous to reHDD -msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD +# Suppress deprecation and printer warnings +warnings.filterwarnings("ignore", category=DeprecationWarning) +logging.getLogger("brother_ql").setLevel(logging.ERROR) + +# Constants +STR_BUFFER_SIZE = 64 +MSG_QUEUE_KEY = 0x1B11193C0 +IPC_CREAT = 0o1000 file_name = "output.png" printer_path = "/dev/usb/lp0" -def get_struct_format(): - #keep this synchronous to struct in reHDD - driveData = pycstruct.StructDef() - driveData.add('utf-8', 'driveIndex', length=str_buffer_size) - driveData.add('utf-8', 'driveHours', length=str_buffer_size) - driveData.add('utf-8', 'driveCycles', length=str_buffer_size) - driveData.add('utf-8', 'driveErrors', length=str_buffer_size) - driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size) - driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size) - driveData.add('utf-8', 'driveCapacity', length=str_buffer_size) - driveData.add('utf-8', 'driveState', length=str_buffer_size) - driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size) - driveData.add('utf-8', 'driveModelName', length=str_buffer_size) - driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size) - driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size) - return driveData +terminate = False + + +class TDriveData(ctypes.Structure): + _fields_ = [ + ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveState", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE), + ("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE), + ] + + +class TMsgQueueData(ctypes.Structure): + _fields_ = [ + ("msg_queue_type", ctypes.c_long), + ("driveData", TDriveData), + ] + + +# IPC bindings +libc = ctypes.CDLL("libc.so.6") +msgget = libc.msgget +msgrcv = libc.msgrcv + +msgget.argtypes = [ctypes.c_int, ctypes.c_int] +msgget.restype = ctypes.c_int + +msgrcv.argtypes = [ + ctypes.c_int, + ctypes.POINTER(TMsgQueueData), + ctypes.c_size_t, + ctypes.c_long, + ctypes.c_int, +] +msgrcv.restype = ctypes.c_ssize_t + + +def signal_handler(signum, frame): + global terminate + print(f"Signal {signum} received, terminating...") + terminate = True + + +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + + +def wait_for_printer(): + while not os.path.exists(printer_path): + print("Printer not found, waiting ...") + time.sleep(30) + return True + + +def create_drive_objects(drive_info): + """Convert dictionary to layouter-compatible DriveData and ReHddInfo objects""" + drive = layouter.DriveData( + drive_index=int(drive_info["driveIndex"]), + drive_state=drive_info["driveState"], + modelfamily=drive_info["driveModelFamily"], + modelname=drive_info["driveModelName"], + capacity=int(drive_info["driveCapacity"]), + serialnumber=drive_info["driveSerialnumber"], + power_on_hours=int(drive_info["driveHours"]), + power_cycle=int(drive_info["driveCycles"]), + smart_error_count=int(drive_info["driveErrors"]), + shred_timestamp=int(drive_info["driveShredTimestamp"]), + shred_duration=int(drive_info["driveShredDuration"]), + ) + + rehdd_info = layouter.ReHddInfo( + link="https://git.mosad.xyz/localhorst/reHDD", + version=drive_info["driveReHddVersion"], + ) + + return drive, rehdd_info + + +def worker(queue_id, test_mode=False): + try: + while not terminate: + if test_mode: + drive_info = { + "driveIndex": "42", + "driveHours": 44, + "driveCycles": 45, + "driveErrors": 43, + "driveShredTimestamp": int(time.time()), + "driveShredDuration": 0, + "driveCapacity": 42, + "driveState": "shredded", + "driveModelFamily": "modelFamily", + "driveModelName": "modelName", + "driveSerialnumber": "serial", + "driveReHddVersion": "V1.1.2", + } + else: + msg = TMsgQueueData() + result = msgrcv( + queue_id, + ctypes.byref(msg), + ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long), + 0, + 0, + ) + if result == -1: + err = ctypes.get_errno() + print(f"Error reading from message queue: {os.strerror(err)}") + break + + d = msg.driveData + drive_info = { + "driveIndex": d.caDriveIndex.decode().strip("\x00"), + "driveHours": int(d.caDriveHours.decode().strip("\x00")), + "driveCycles": int(d.caDriveCycles.decode().strip("\x00")), + "driveErrors": int(d.caDriveErrors.decode().strip("\x00")), + "driveShredTimestamp": int( + d.caDriveShredTimestamp.decode().strip("\x00") + ), + "driveShredDuration": int( + d.caDriveShredDuration.decode().strip("\x00") + ), + "driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")), + "driveState": d.caDriveState.decode().strip("\x00"), + "driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"), + "driveModelName": d.caDriveModelName.decode().strip("\x00"), + "driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"), + "driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"), + } + + print(f"Received Drive Data: {drive_info}") + + drive_obj, rehdd_info = create_drive_objects(drive_info) + layouter.generate_image(drive_obj, rehdd_info, file_name) + + if wait_for_printer(): + qlr = BrotherQLRaster("QL-570") + image = Image.open(file_name) + create_label(qlr, image, "62") + with open(printer_path, "wb") as file: + file.write(qlr.data) + os.remove(file_name) + else: + print("Skipping printing due to printer unavailability.") + + if test_mode: + break + + except Exception as e: + print(f"Worker encountered an error: {e}") + def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--test", action="store_true", help="Run in test mode with fake data" + ) + args = parser.parse_args() + + if args.test: + print("Running in test mode.") + worker(None, test_mode=True) + return + try: - mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT) + queue_id = msgget(MSG_QUEUE_KEY, 0) + if queue_id == -1: + raise RuntimeError("Failed to connect to the existing message queue.") - while True: - message, mtype = mq.receive() - driveData = get_struct_format().deserialize(message) + worker(queue_id) + except Exception as e: + print(f"Main process encountered an error: {e}") - rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion']) - drive = layouter.DriveData( - drive_index=int(driveData['driveIndex']),\ - drive_state=str(driveData['driveState']),\ - modelfamiliy=str(driveData['driveModelFamiliy']),\ - modelname=str(driveData['driveModelName']),\ - capacity=int(driveData['driveCapacity']),\ - serialnumber=str(driveData['driveSerialnumber']),\ - power_on_hours=int(driveData['driveHours']),\ - power_cycle=int(driveData['driveCycles']),\ - smart_error_count=int(driveData['driveErrors']),\ - shred_timestamp=int(driveData['driveShredTimestamp']),\ - shred_duration=int(driveData['driveShredDuration'])) - - while(not os.path.exists(printer_path)): - print("Printer not found, waiting ...") - time.sleep(30) #sleep 30 - - layouter.generate_image(drive, rehdd_info, file_name) - qlr = BrotherQLRaster("QL-570") - create_label(qlr, file_name, '62') - - with open(printer_path, 'wb') as file: - file.write(qlr.data) - os.remove(file_name) - except sysv_ipc.ExistentialError: - print("ERROR: message queue creation failed") if __name__ == "__main__": main()