218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| """ Author:                     Hendrik Schutter, localhorst@mosad.xyz
 | |
|     Date of creation:           2022/11/23
 | |
|     Date of last modification:  2025/06/15
 | |
| """
 | |
| 
 | |
| import ctypes
 | |
| import os
 | |
| import time
 | |
| import signal
 | |
| import argparse
 | |
| import warnings
 | |
| import logging
 | |
| from PIL import Image
 | |
| from brother_ql.brother_ql_create import create_label
 | |
| from brother_ql.raster import BrotherQLRaster
 | |
| import layouter
 | |
| 
 | |
# Hide DeprecationWarning output and keep the brother_ql logger at ERROR level.
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)

# Constants
# Size in bytes of every character buffer declared in TDriveData.
STR_BUFFER_SIZE = 64
# Key handed to msgget() to look up the message queue.
# NOTE(review): the literal is wider than 32 bits; presumably it mirrors the
# sender's definition and relies on integer truncation - confirm.
MSG_QUEUE_KEY = 0x1B11193C0
# NOTE(review): presumably the IPC_CREAT flag from <sys/ipc.h>; it is not
# referenced by the code visible in this file - confirm before removing.
IPC_CREAT = 0o1000

# Image file produced by layouter.generate_image() and read back for printing.
file_name = "output.png"
# Device node the raster data is written to.
printer_path = "/dev/usb/lp0"

# Set to True by signal_handler(); polled by the worker loop.
terminate = False
 | |
| 
 | |
| 
 | |
class TDriveData(ctypes.Structure):
    """Drive record carried inside a queue message.

    Every member is a character buffer of STR_BUFFER_SIZE bytes. The order of
    the names below is the declaration order and therefore the memory layout.
    """

    _fields_ = [
        (member, ctypes.c_char * STR_BUFFER_SIZE)
        for member in (
            "caDriveIndex",
            "caDriveHours",
            "caDriveCycles",
            "caDriveErrors",
            "caDriveShredTimestamp",
            "caDriveShredDuration",
            "caDriveCapacity",
            "caDriveState",
            "caDriveModelFamily",
            "caDriveModelName",
            "caDriveSerialnumber",
            "caDriveReHddVersion",
        )
    ]
 | |
| 
 | |
| 
 | |
class TMsgQueueData(ctypes.Structure):
    """Buffer layout handed to msgrcv(): a long type field, then the payload."""

    _fields_ = [
        # Leading long member; its size is subtracted when the payload size
        # is computed for msgrcv().
        ("msg_queue_type", ctypes.c_long),
        # The drive record itself.
        ("driveData", TDriveData),
    ]
 | |
| 
 | |
| 
 | |
# IPC bindings
# use_errno=True makes ctypes capture the C errno after every foreign call so
# that ctypes.get_errno() reports the real failure reason. Without it the
# ctypes-private errno copy is never updated and error messages are wrong.
libc = ctypes.CDLL("libc.so.6", use_errno=True)
msgget = libc.msgget
msgrcv = libc.msgrcv

# msgget(key, msgflg) -> queue id, or -1 on failure
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int

# msgrcv(msqid, msgp, msgsz, msgtyp, msgflg) -> bytes received, or -1 on failure
msgrcv.argtypes = [
    ctypes.c_int,
    ctypes.POINTER(TMsgQueueData),
    ctypes.c_size_t,
    ctypes.c_long,
    ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
 | |
| 
 | |
| 
 | |
def signal_handler(signum, frame):
    """Announce the received signal and raise the module-level terminate flag.

    Args:
        signum: Number of the signal that was delivered.
        frame: Stack frame passed by the signal machinery (unused).
    """
    global terminate
    notice = f"Signal {signum} received, terminating..."
    print(notice)
    terminate = True
 | |
| 
 | |
| 
 | |
# Route both SIGINT and SIGTERM to the shared handler.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, signal_handler)
 | |
| 
 | |
| 
 | |
def wait_for_printer():
    """Wait until the printer device node exists.

    Checks for printer_path and, while it is missing, prints a notice and
    sleeps 30 seconds before checking again. The wait is abandoned as soon as
    the terminate flag is seen set, so a shutdown request is not blocked
    forever by an unplugged printer.

    Returns:
        True once printer_path exists, False if termination was requested
        while the printer was still missing.
    """
    while not os.path.exists(printer_path):
        if terminate:
            return False
        print("Printer not found, waiting ...")
        time.sleep(30)
    return True
 | |
| 
 | |
| 
 | |
def create_drive_objects(drive_info):
    """Build the layouter inputs from a drive-info dictionary.

    Args:
        drive_info: Mapping holding the "drive*" keys read below. Numeric
            entries are passed through int(), so ints and numeric strings
            are both accepted.

    Returns:
        A tuple (layouter.DriveData, layouter.ReHddInfo).
    """

    def as_int(key):
        return int(drive_info[key])

    # Keys are listed in the same order the values are looked up and converted.
    drive_fields = {
        "drive_index": as_int("driveIndex"),
        "drive_state": drive_info["driveState"],
        "modelfamily": drive_info["driveModelFamily"],
        "modelname": drive_info["driveModelName"],
        "capacity": as_int("driveCapacity"),
        "serialnumber": drive_info["driveSerialnumber"],
        "power_on_hours": as_int("driveHours"),
        "power_cycle": as_int("driveCycles"),
        "smart_error_count": as_int("driveErrors"),
        "shred_timestamp": as_int("driveShredTimestamp"),
        "shred_duration": as_int("driveShredDuration"),
    }
    drive = layouter.DriveData(**drive_fields)

    version = drive_info["driveReHddVersion"]
    rehdd_info = layouter.ReHddInfo(
        link="https://git.mosad.xyz/localhorst/reHDD",
        version=version,
    )

    return drive, rehdd_info
 | |
| 
 | |
| 
 | |
def _decode_field(raw):
    """Decode one character buffer to str and strip NUL padding."""
    return raw.decode().strip("\x00")


def _fake_drive_info():
    """Return the fixed sample record used in test mode."""
    return {
        "driveIndex": "42",
        "driveHours": 44,
        "driveCycles": 45,
        "driveErrors": 43,
        "driveShredTimestamp": int(time.time()),
        "driveShredDuration": 0,
        "driveCapacity": 42,
        "driveState": "shredded",
        "driveModelFamily": "modelFamily",
        "driveModelName": "modelName",
        "driveSerialnumber": "serial",
        "driveReHddVersion": "V1.1.2",
    }


def _receive_drive_info(queue_id):
    """Block on msgrcv() and return the next drive record as a dict.

    Returns None (after printing the error) when msgrcv() returns -1.
    """
    msg = TMsgQueueData()
    # msgsz excludes the leading long type field.
    payload_size = ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long)
    result = msgrcv(queue_id, ctypes.byref(msg), payload_size, 0, 0)
    if result == -1:
        err = ctypes.get_errno()
        print(f"Error reading from message queue: {os.strerror(err)}")
        return None

    d = msg.driveData
    return {
        "driveIndex": _decode_field(d.caDriveIndex),
        "driveHours": int(_decode_field(d.caDriveHours)),
        "driveCycles": int(_decode_field(d.caDriveCycles)),
        "driveErrors": int(_decode_field(d.caDriveErrors)),
        "driveShredTimestamp": int(_decode_field(d.caDriveShredTimestamp)),
        "driveShredDuration": int(_decode_field(d.caDriveShredDuration)),
        "driveCapacity": int(_decode_field(d.caDriveCapacity)),
        "driveState": _decode_field(d.caDriveState),
        "driveModelFamily": _decode_field(d.caDriveModelFamily),
        "driveModelName": _decode_field(d.caDriveModelName),
        "driveSerialnumber": _decode_field(d.caDriveSerialnumber),
        "driveReHddVersion": _decode_field(d.caDriveReHddVersion),
    }


def _print_label():
    """Rasterize the generated image, send it to the printer, delete the file."""
    qlr = BrotherQLRaster("QL-570")
    # Close the image handle before the file is removed; the original left it
    # open, leaking one file descriptor per printed label.
    with Image.open(file_name) as image:
        create_label(qlr, image, "62")
    with open(printer_path, "wb") as file:
        file.write(qlr.data)
    os.remove(file_name)


def worker(queue_id, test_mode=False):
    """Receive drive records and print one label per record.

    Loops until the terminate flag is set, msgrcv() fails, or (in test mode)
    a single label has been processed. Any exception is printed and ends the
    loop; nothing is re-raised to the caller.

    Args:
        queue_id: Message queue id passed to msgrcv(); unused in test mode.
        test_mode: When True, process one built-in sample record instead of
            reading from the queue.
    """
    try:
        while not terminate:
            if test_mode:
                drive_info = _fake_drive_info()
            else:
                drive_info = _receive_drive_info(queue_id)
                if drive_info is None:
                    break
                time.sleep(3)

            print(f"Received Drive Data: {drive_info}")

            drive_obj, rehdd_info = create_drive_objects(drive_info)
            layouter.generate_image(drive_obj, rehdd_info, file_name)

            if wait_for_printer():
                _print_label()
            else:
                print("Skipping printing due to printer unavailability.")

            if test_mode:
                break

    except Exception as e:
        print(f"Worker encountered an error: {e}")
 | |
| 
 | |
| 
 | |
def main():
    """Parse the command line and run the label-printing service.

    With --test, a single label is produced from built-in sample data and the
    function returns. Otherwise it repeatedly looks up the message queue and
    hands it to worker(), waiting 30 seconds after a failure, until the
    terminate flag is set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--test", action="store_true", help="Run in test mode with fake data"
    )
    args = parser.parse_args()

    if args.test:
        print("Running in test mode.")
        worker(None, test_mode=True)
        return

    # Stop reconnecting once termination was requested. With an unconditional
    # loop, worker() returns immediately after a signal and this loop would
    # spin forever instead of letting the process exit.
    while not terminate:
        try:
            queue_id = msgget(MSG_QUEUE_KEY, 0)
            if queue_id == -1:
                raise RuntimeError("Failed to connect to the existing message queue.")
            worker(queue_id)
        except Exception as e:
            print(f"Main process encountered an error: {e}")
            time.sleep(30)


if __name__ == "__main__":
    main()
 |