Merge pull request 'feature/native-ipc' (#17) from feature/native-ipc into main
Reviewed-on: #17
This commit is contained in:
commit
6b3dee9864
@ -8,7 +8,7 @@
|
||||
|
||||
## Install ##
|
||||
|
||||
`pip install qrcode sysv-ipc pycstruct brother-ql`
|
||||
`pip install qrcode brother-ql`
|
||||
|
||||
```
|
||||
cd /root/
|
||||
|
BIN
output.png
BIN
output.png
Binary file not shown.
Before Width: | Height: | Size: 5.5 KiB After Width: | Height: | Size: 5.5 KiB |
243
reHDDPrinter.py
243
reHDDPrinter.py
@ -3,75 +3,214 @@
|
||||
|
||||
""" Author: Hendrik Schutter, localhorst@mosad.xyz
|
||||
Date of creation: 2022/11/23
|
||||
Date of last modification: 2022/11/23
|
||||
Date of last modification: 2025/06/15
|
||||
"""
|
||||
|
||||
import sysv_ipc
|
||||
import pycstruct
|
||||
import ctypes
|
||||
import os
|
||||
import time
|
||||
import signal
|
||||
import argparse
|
||||
import warnings
|
||||
import logging
|
||||
from PIL import Image
|
||||
from brother_ql.brother_ql_create import create_label
|
||||
from brother_ql.raster import BrotherQLRaster
|
||||
import layouter
|
||||
|
||||
str_buffer_size = 64 #keep this synchronous to reHDD
|
||||
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD
|
||||
# Suppress deprecation and printer warnings
|
||||
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
||||
logging.getLogger("brother_ql").setLevel(logging.ERROR)
|
||||
|
||||
# Constants
|
||||
STR_BUFFER_SIZE = 64
|
||||
MSG_QUEUE_KEY = 0x1B11193C0
|
||||
IPC_CREAT = 0o1000
|
||||
|
||||
file_name = "output.png"
|
||||
printer_path = "/dev/usb/lp0"
|
||||
|
||||
def get_struct_format():
|
||||
#keep this synchronous to struct in reHDD
|
||||
driveData = pycstruct.StructDef()
|
||||
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveState', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveModelName', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
|
||||
return driveData
|
||||
terminate = False
|
||||
|
||||
|
||||
class TDriveData(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
]
|
||||
|
||||
|
||||
class TMsgQueueData(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("msg_queue_type", ctypes.c_long),
|
||||
("driveData", TDriveData),
|
||||
]
|
||||
|
||||
|
||||
# IPC bindings
|
||||
libc = ctypes.CDLL("libc.so.6")
|
||||
msgget = libc.msgget
|
||||
msgrcv = libc.msgrcv
|
||||
|
||||
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
|
||||
msgget.restype = ctypes.c_int
|
||||
|
||||
msgrcv.argtypes = [
|
||||
ctypes.c_int,
|
||||
ctypes.POINTER(TMsgQueueData),
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_long,
|
||||
ctypes.c_int,
|
||||
]
|
||||
msgrcv.restype = ctypes.c_ssize_t
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
global terminate
|
||||
print(f"Signal {signum} received, terminating...")
|
||||
terminate = True
|
||||
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
|
||||
def wait_for_printer():
|
||||
while not os.path.exists(printer_path):
|
||||
print("Printer not found, waiting ...")
|
||||
time.sleep(30)
|
||||
return True
|
||||
|
||||
|
||||
def create_drive_objects(drive_info):
|
||||
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
|
||||
drive = layouter.DriveData(
|
||||
drive_index=int(drive_info["driveIndex"]),
|
||||
drive_state=drive_info["driveState"],
|
||||
modelfamily=drive_info["driveModelFamily"],
|
||||
modelname=drive_info["driveModelName"],
|
||||
capacity=int(drive_info["driveCapacity"]),
|
||||
serialnumber=drive_info["driveSerialnumber"],
|
||||
power_on_hours=int(drive_info["driveHours"]),
|
||||
power_cycle=int(drive_info["driveCycles"]),
|
||||
smart_error_count=int(drive_info["driveErrors"]),
|
||||
shred_timestamp=int(drive_info["driveShredTimestamp"]),
|
||||
shred_duration=int(drive_info["driveShredDuration"]),
|
||||
)
|
||||
|
||||
rehdd_info = layouter.ReHddInfo(
|
||||
link="https://git.mosad.xyz/localhorst/reHDD",
|
||||
version=drive_info["driveReHddVersion"],
|
||||
)
|
||||
|
||||
return drive, rehdd_info
|
||||
|
||||
|
||||
def worker(queue_id, test_mode=False):
|
||||
try:
|
||||
while not terminate:
|
||||
if test_mode:
|
||||
drive_info = {
|
||||
"driveIndex": "42",
|
||||
"driveHours": 44,
|
||||
"driveCycles": 45,
|
||||
"driveErrors": 43,
|
||||
"driveShredTimestamp": int(time.time()),
|
||||
"driveShredDuration": 0,
|
||||
"driveCapacity": 42,
|
||||
"driveState": "shredded",
|
||||
"driveModelFamily": "modelFamily",
|
||||
"driveModelName": "modelName",
|
||||
"driveSerialnumber": "serial",
|
||||
"driveReHddVersion": "V1.1.2",
|
||||
}
|
||||
else:
|
||||
msg = TMsgQueueData()
|
||||
result = msgrcv(
|
||||
queue_id,
|
||||
ctypes.byref(msg),
|
||||
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
|
||||
0,
|
||||
0,
|
||||
)
|
||||
if result == -1:
|
||||
err = ctypes.get_errno()
|
||||
print(f"Error reading from message queue: {os.strerror(err)}")
|
||||
break
|
||||
|
||||
d = msg.driveData
|
||||
drive_info = {
|
||||
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
|
||||
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
|
||||
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
|
||||
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
|
||||
"driveShredTimestamp": int(
|
||||
d.caDriveShredTimestamp.decode().strip("\x00")
|
||||
),
|
||||
"driveShredDuration": int(
|
||||
d.caDriveShredDuration.decode().strip("\x00")
|
||||
),
|
||||
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
|
||||
"driveState": d.caDriveState.decode().strip("\x00"),
|
||||
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
|
||||
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
|
||||
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
|
||||
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
|
||||
}
|
||||
|
||||
print(f"Received Drive Data: {drive_info}")
|
||||
|
||||
drive_obj, rehdd_info = create_drive_objects(drive_info)
|
||||
layouter.generate_image(drive_obj, rehdd_info, file_name)
|
||||
|
||||
if wait_for_printer():
|
||||
qlr = BrotherQLRaster("QL-570")
|
||||
image = Image.open(file_name)
|
||||
create_label(qlr, image, "62")
|
||||
with open(printer_path, "wb") as file:
|
||||
file.write(qlr.data)
|
||||
os.remove(file_name)
|
||||
else:
|
||||
print("Skipping printing due to printer unavailability.")
|
||||
|
||||
if test_mode:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
print(f"Worker encountered an error: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--test", action="store_true", help="Run in test mode with fake data"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
print("Running in test mode.")
|
||||
worker(None, test_mode=True)
|
||||
return
|
||||
|
||||
try:
|
||||
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
|
||||
queue_id = msgget(MSG_QUEUE_KEY, 0)
|
||||
if queue_id == -1:
|
||||
raise RuntimeError("Failed to connect to the existing message queue.")
|
||||
|
||||
while True:
|
||||
message, mtype = mq.receive()
|
||||
driveData = get_struct_format().deserialize(message)
|
||||
worker(queue_id)
|
||||
except Exception as e:
|
||||
print(f"Main process encountered an error: {e}")
|
||||
|
||||
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
|
||||
drive = layouter.DriveData(
|
||||
drive_index=int(driveData['driveIndex']),\
|
||||
drive_state=str(driveData['driveState']),\
|
||||
modelfamiliy=str(driveData['driveModelFamiliy']),\
|
||||
modelname=str(driveData['driveModelName']),\
|
||||
capacity=int(driveData['driveCapacity']),\
|
||||
serialnumber=str(driveData['driveSerialnumber']),\
|
||||
power_on_hours=int(driveData['driveHours']),\
|
||||
power_cycle=int(driveData['driveCycles']),\
|
||||
smart_error_count=int(driveData['driveErrors']),\
|
||||
shred_timestamp=int(driveData['driveShredTimestamp']),\
|
||||
shred_duration=int(driveData['driveShredDuration']))
|
||||
|
||||
while(not os.path.exists(printer_path)):
|
||||
print("Printer not found, waiting ...")
|
||||
time.sleep(30) #sleep 30
|
||||
|
||||
layouter.generate_image(drive, rehdd_info, file_name)
|
||||
qlr = BrotherQLRaster("QL-570")
|
||||
create_label(qlr, file_name, '62')
|
||||
|
||||
with open(printer_path, 'wb') as file:
|
||||
file.write(qlr.data)
|
||||
os.remove(file_name)
|
||||
except sysv_ipc.ExistentialError:
|
||||
print("ERROR: message queue creation failed")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Loading…
Reference in New Issue
Block a user