feature/native-ipc #17

Merged
localhorst merged 2 commits from feature/native-ipc into main 2025-06-15 18:05:02 +02:00
2 changed files with 133 additions and 72 deletions
Showing only changes of commit 211bf80b91 - Show all commits

View File

@ -8,7 +8,7 @@
## Install ##
`pip install qrcode sysv-ipc pycstruct brother-ql`
`pip install qrcode brother-ql`
```
cd /root/

View File

@ -3,28 +3,36 @@
""" Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23
Date of last modification: 2025/06/08
Date of last modification: 2025/06/15
"""
import ctypes
import os
import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster
import layouter
# Constants
STR_BUFFER_SIZE = 64 # Buffer size for each field in t_driveData
MSG_QUEUE_KEY = 0x1B11193C0 # Message queue key from C code
# Suppress deprecation and printer warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)
# IPC Constants
IPC_CREAT = 0o1000 # Required for msgget
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
file_name = "output.png"
printer_path = "/dev/usb/lp0"
terminate = False
# Define ctypes structure for t_driveData
class TDriveData(ctypes.Structure):
_fields_ = [
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
@ -42,15 +50,14 @@ class TDriveData(ctypes.Structure):
]
# Define ctypes structure for t_msgQueueData
class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long), # Message type
("driveData", TDriveData), # Drive data
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
# Define ctypes bindings for Linux IPC
# IPC bindings
libc = ctypes.CDLL("libc.so.6")
msgget = libc.msgget
msgrcv = libc.msgrcv
@ -68,84 +75,138 @@ msgrcv.argtypes = [
msgrcv.restype = ctypes.c_ssize_t
def worker(queue_id):
"""
Worker process to read messages from the IPC queue and print them.
"""
def signal_handler(signum, frame):
global terminate
print(f"Signal {signum} received, terminating...")
terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def wait_for_printer():
while not os.path.exists(printer_path):
print("Printer not found, waiting ...")
time.sleep(30)
return True
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while True:
# Prepare to receive a message
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
print("Error reading from message queue.")
break
while not terminate:
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
err = ctypes.get_errno()
print(f"Error reading from message queue: {os.strerror(err)}")
break
# Extract drive data
driveData = msg.driveData
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
# Convert drive data fields from bytes to strings
drive_info = {
"driveIndex": driveData.caDriveIndex.decode().strip("\x00"),
"driveHours": int(driveData.caDriveHours.decode().strip("\x00")),
"driveCycles": int(driveData.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(driveData.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
driveData.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
driveData.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(driveData.caDriveCapacity.decode().strip("\x00")),
"driveState": driveData.caDriveState.decode().strip("\x00"),
"driveModelFamily": driveData.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": driveData.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": driveData.caDriveSerialnumber.decode().strip(
"\x00"
),
"driveReHddVersion": driveData.caDriveReHddVersion.decode().strip(
"\x00"
),
}
#rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData["driveReHddVersion"])
# Print the drive information
print(f"Received Drive Data: {drive_info}")
# while(not os.path.exists(printer_path)):
# print("Printer not found, waiting ...")
# time.sleep(30) #sleep 30
drive_obj, rehdd_info = create_drive_objects(drive_info)
layouter.generate_image(drive_obj, rehdd_info, file_name)
# layouter.generate_image(drive_info, rehdd_info, file_name)
# qlr = BrotherQLRaster("QL-570")
# create_label(qlr, file_name, '62')
if wait_for_printer():
qlr = BrotherQLRaster("QL-570")
image = Image.open(file_name)
create_label(qlr, image, "62")
with open(printer_path, "wb") as file:
file.write(qlr.data)
os.remove(file_name)
else:
print("Skipping printing due to printer unavailability.")
# with open(printer_path, 'wb') as file:
# file.write(qlr.data)
# os.remove(file_name)
if test_mode:
break
except Exception as e:
print(f"Worker encountered an error: {e}")
def main():
"""
Main function to connect to the existing IPC queue and start the worker process.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--test", action="store_true", help="Run in test mode with fake data"
)
args = parser.parse_args()
if args.test:
print("Running in test mode.")
worker(None, test_mode=True)
return
try:
# Connect to the existing message queue
queue_id = msgget(MSG_QUEUE_KEY, 0)
if queue_id == -1:
raise RuntimeError("Failed to connect to the existing message queue.")
# Start the worker process
worker(queue_id)
except Exception as e:
print(f"Main process encountered an error: {e}")