#!/usr/bin/env python3
# -*- coding: utf-8 -*-

""" Author: Hendrik Schutter, localhorst@mosad.xyz
    Date of creation: 2022/11/23
    Date of last modification: 2025/06/15
"""

import ctypes
import os
import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster
import layouter

# Suppress deprecation and printer warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)

# Constants
STR_BUFFER_SIZE = 64  # size in bytes of every fixed-width text field in TDriveData
# NOTE(review): this key does not fit into 32 bits while msgget() is bound
# with a c_int key below. ctypes performs no overflow checking on c_int, so
# the value is presumably truncated the same way the sending side truncates
# it -- confirm against the sender.
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000

file_name = "output.png"  # label image path handed to the layouter
printer_path = "/dev/usb/lp0"  # device node the raster data is written to
terminate = False  # set to True by signal_handler to request shutdown


class TDriveData(ctypes.Structure):
    """Drive report payload: twelve fixed-width character buffers.

    Every field is a ``STR_BUFFER_SIZE``-byte ``c_char`` array.
    Presumably mirrors the struct used by the sending process -- field
    order and sizes must match it exactly; verify against the sender.
    """

    _fields_ = [
        ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
        ("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
    ]


class TMsgQueueData(ctypes.Structure):
    """Message buffer passed to msgrcv(): a long message type, then the payload."""

    _fields_ = [
        ("msg_queue_type", ctypes.c_long),
        ("driveData", TDriveData),
    ]


# IPC bindings
# use_errno=True makes ctypes capture errno after each foreign call; without
# it ctypes.get_errno() never reflects the failure reason of msgget()/msgrcv().
libc = ctypes.CDLL("libc.so.6", use_errno=True)
msgget = libc.msgget
msgrcv = libc.msgrcv

msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int

msgrcv.argtypes = [
    ctypes.c_int,
    ctypes.POINTER(TMsgQueueData),
    ctypes.c_size_t,
    ctypes.c_long,
    ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t


def signal_handler(signum, frame):
    """Request shutdown by setting the module-level ``terminate`` flag.

    Args:
        signum: Number of the received signal (echoed in the log line).
        frame: Interrupted stack frame; unused.
    """
    global terminate
    print(f"Signal {signum} received, terminating...")
    terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def _sleep_unless_terminated(seconds):
    """Sleep for up to ``seconds``, returning early once ``terminate`` is set.

    A single long ``time.sleep`` is resumed after the signal handler has run,
    so the wait is split into steps of at most one second to keep shutdown
    responsive.
    """
    deadline = time.monotonic() + seconds
    while not terminate:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(1.0, remaining))


def wait_for_printer():
    """Block until the printer device node exists.

    Polls ``printer_path`` every 30 seconds.

    Returns:
        True once the device node is present, False if shutdown was
        requested while still waiting.
    """
    while not os.path.exists(printer_path):
        if terminate:
            return False
        print("Printer not found, waiting ...")
        _sleep_unless_terminated(30)
    return True


def create_drive_objects(drive_info):
    """Convert dictionary to layouter-compatible DriveData and ReHddInfo objects.

    Args:
        drive_info: Mapping with the ``drive*`` keys built by ``worker``;
            numeric entries may be ints or numeric strings, they are passed
            through ``int()`` here.

    Returns:
        Tuple ``(drive, rehdd_info)``.

    Raises:
        KeyError: If an expected key is missing.
        ValueError: If a numeric entry cannot be converted by ``int()``.
    """
    drive = layouter.DriveData(
        drive_index=int(drive_info["driveIndex"]),
        drive_state=drive_info["driveState"],
        modelfamily=drive_info["driveModelFamily"],
        modelname=drive_info["driveModelName"],
        capacity=int(drive_info["driveCapacity"]),
        serialnumber=drive_info["driveSerialnumber"],
        power_on_hours=int(drive_info["driveHours"]),
        power_cycle=int(drive_info["driveCycles"]),
        smart_error_count=int(drive_info["driveErrors"]),
        shred_timestamp=int(drive_info["driveShredTimestamp"]),
        shred_duration=int(drive_info["driveShredDuration"]),
    )
    rehdd_info = layouter.ReHddInfo(
        link="https://git.mosad.xyz/localhorst/reHDD",
        version=drive_info["driveReHddVersion"],
    )
    return drive, rehdd_info


def _field_text(raw):
    """Decode one fixed-width character buffer and drop NUL padding."""
    return raw.decode().strip("\x00")


def _fake_drive_info():
    """Return the canned drive record used by ``--test`` mode."""
    return {
        "driveIndex": "42",
        "driveHours": 44,
        "driveCycles": 45,
        "driveErrors": 43,
        "driveShredTimestamp": int(time.time()),
        "driveShredDuration": 0,
        "driveCapacity": 42,
        "driveState": "shredded",
        "driveModelFamily": "modelFamily",
        "driveModelName": "modelName",
        "driveSerialnumber": "serial",
        "driveReHddVersion": "V1.1.2",
    }


def _receive_drive_info(queue_id):
    """Block on the message queue and return the next drive record as a dict.

    Returns:
        The decoded record, or None when msgrcv() failed (the failure is
        logged unless it was caused by a requested shutdown).

    Raises:
        ValueError, UnicodeDecodeError: If a field cannot be decoded or
            converted to int.
    """
    msg = TMsgQueueData()
    # msgsz counts only the payload, not the leading long message type.
    payload_size = ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long)
    result = msgrcv(queue_id, ctypes.byref(msg), payload_size, 0, 0)
    if result == -1:
        if terminate:
            # The blocking call was cut short by the shutdown signal; this is
            # the expected way out, not a queue error.
            return None
        err = ctypes.get_errno()
        print(f"Error reading from message queue: {os.strerror(err)}")
        return None

    d = msg.driveData
    return {
        "driveIndex": _field_text(d.caDriveIndex),
        "driveHours": int(_field_text(d.caDriveHours)),
        "driveCycles": int(_field_text(d.caDriveCycles)),
        "driveErrors": int(_field_text(d.caDriveErrors)),
        "driveShredTimestamp": int(_field_text(d.caDriveShredTimestamp)),
        "driveShredDuration": int(_field_text(d.caDriveShredDuration)),
        "driveCapacity": int(_field_text(d.caDriveCapacity)),
        "driveState": _field_text(d.caDriveState),
        "driveModelFamily": _field_text(d.caDriveModelFamily),
        "driveModelName": _field_text(d.caDriveModelName),
        "driveSerialnumber": _field_text(d.caDriveSerialnumber),
        "driveReHddVersion": _field_text(d.caDriveReHddVersion),
    }


def _print_label(image_path):
    """Rasterize the image at ``image_path`` and write it to the printer device."""
    qlr = BrotherQLRaster("QL-570")
    # Close the image file as soon as the raster data has been generated.
    with Image.open(image_path) as image:
        create_label(qlr, image, "62")
    with open(printer_path, "wb") as file:
        file.write(qlr.data)


def worker(queue_id, test_mode=False):
    """Receive drive records and print one label per record.

    Runs until shutdown is requested, reading from the queue fails, or an
    exception occurs; exceptions are logged, not propagated.

    Args:
        queue_id: Message queue identifier returned by msgget(); ignored in
            test mode.
        test_mode: When True, print a single label from canned data and
            return instead of reading from the queue.
    """
    try:
        while not terminate:
            if test_mode:
                drive_info = _fake_drive_info()
            else:
                drive_info = _receive_drive_info(queue_id)
                if drive_info is None:
                    break

            print(f"Received Drive Data: {drive_info}")
            drive_obj, rehdd_info = create_drive_objects(drive_info)
            layouter.generate_image(drive_obj, rehdd_info, file_name)

            if wait_for_printer():
                _print_label(file_name)
                os.remove(file_name)
            else:
                print("Skipping printing due to printer unavailability.")

            if test_mode:
                break
    except Exception as e:
        # Top-level boundary of the worker: log and let main() reconnect.
        print(f"Worker encountered an error: {e}")


def main():
    """Parse the command line and run the label service.

    With ``--test`` a single label is printed from canned data. Otherwise the
    existing message queue is opened and served until SIGINT/SIGTERM sets
    ``terminate``; after every failure or worker exit the connection is
    retried after a 30 second pause.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--test", action="store_true", help="Run in test mode with fake data"
    )
    args = parser.parse_args()

    if args.test:
        print("Running in test mode.")
        worker(None, test_mode=True)
        return

    # Honour the shutdown flag here as well; otherwise the service would keep
    # reconnecting forever after the worker has returned.
    while not terminate:
        try:
            queue_id = msgget(MSG_QUEUE_KEY, 0)
            if queue_id == -1:
                raise RuntimeError("Failed to connect to the existing message queue.")
            worker(queue_id)
        except Exception as e:
            print(f"Main process encountered an error: {e}")
        # Back off before reconnecting so that a persistently failing queue
        # cannot turn this loop into a busy spin.
        _sleep_unless_terminated(30)


if __name__ == "__main__":
    main()