Compare commits

...

12 Commits

4 changed files with 495 additions and 205 deletions

View File

@ -8,7 +8,7 @@
## Install ## ## Install ##
`pip install qrcode sysv-ipc pycstruct brother-ql` `pip install qrcode brother-ql`
``` ```
cd /root/ cd /root/

View File

@ -1,215 +1,365 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, localhorst@mosad.xyz """
Date of creation: 2022/11/16 Author: Hendrik Schutter, localhorst@mosad.xyz
Date of last modification: 2022/11/23 Date of creation: 2022/11/16
Date of last modification: 2025/06/08
""" """
import re import re
import dataclasses import dataclasses
import glob import glob
import datetime import datetime
import json import json
import logging
import qrcode import qrcode
from PIL import Image from PIL import Image, ImageFont, ImageDraw
from PIL import ImageFont
from PIL import ImageDraw # Constants
FONT_PATH = "/usr/share/fonts"
DEFAULT_FONT_REGULAR = "DejaVuSans.ttf"
DEFAULT_FONT_BOLD = "DejaVuSans-Bold.ttf"
OUTPUT_WIDTH = 696 # px
OUTPUT_HEIGHT = 190 # px
TEXT_X_OFFSET = 190 # px
QR_CODE_SIZE = 179 # px
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
@dataclasses.dataclass @dataclasses.dataclass
class DriveData: class DriveData:
drive_index: int drive_index: int
drive_state: str #none, deleted, shredded drive_state: str
modelfamiliy: str modelfamily: str
modelname: str modelname: str
capacity: int #in bytes capacity: int
serialnumber: str serialnumber: str
power_on_hours: int #in hours power_on_hours: int
power_cycle: int power_cycle: int
smart_error_count: int smart_error_count: int
shred_timestamp: int #unix timestamp shred_timestamp: int
shred_duration: int #in seconds shred_duration: int
@dataclasses.dataclass
class DriveDataJson:
state: str
fam: str
name: str
cap: int
sn: str
poh: int
pc: int
err: int
time: int
dur: int
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataPrintable: class DriveDataPrintable:
modelfamiliy: str #max lenght 25 modelfamily: str
modelname: str #max lenght 25 modelname: str
capacity: str #max lenght 25, in human-readable format with unit (GB/TB) capacity: str
serialnumber: str #max lenght 25 serialnumber: str
power_on_hours: str #max lenght 25, in hours and days and years power_on_hours: str
power_cycle: str #max lenght 25 power_cycle: str
smart_error_count: str #max lenght 25 smart_error_count: str
shred_timestamp: str #max lenght 25, human-readable shred_timestamp: str
shred_duration: str #max lenght 25, human-readable shred_duration: str
@dataclasses.dataclass @dataclasses.dataclass
class ReHddInfo: class ReHddInfo:
link: str link: str
version: str version: str
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataJson: class QrDataJson:
drive: DriveData drive: DriveDataJson
rehdd: ReHddInfo rehdd: ReHddInfo
def get_font_path_regular():
path = "/usr/share/fonts"
files = glob.glob(path + "/**/DejaVuSans.ttf", recursive = True)
return files[0]
def get_font_path_bold(): def find_font_path(font_name):
path = "/usr/share/fonts" """Finds the full path of the specified font."""
files = glob.glob(path + "/**/DejaVuSans-Bold.ttf", recursive = True) try:
return files[0] files = glob.glob(f"{FONT_PATH}/**/{font_name}", recursive=True)
return files[0] if files else None
except Exception as e:
logging.error(f"Error locating font {font_name}: {e}")
return None
def human_readable_capacity_1024(size, decimal_places=0):
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
if size < 1024.0 or unit == 'PiB':
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
def human_readable_capacity_1000(size, decimal_places=0): def human_readable_capacity(size, decimal_places=0, base=1000):
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: """Converts bytes to a human-readable string."""
if size < 1000.0 or unit == 'PB': units = (
break ["B", "KB", "MB", "GB", "TB", "PB"]
size /= 1000.0 if base == 1000
return f"{size:.{decimal_places}f} {unit}" else ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
)
for unit in units:
if size < base or unit == units[-1]:
return f"{size:.{decimal_places}f} {unit}"
size /= base
def human_readable_power_on_hours(hours):
"""Converts hours to human-readable string in hours, days, and years."""
return (
str(hours)
+ "h or "
+ str(int(hours / 24))
+ "d or "
+ str("{:.2f}".format(float(hours / 24 / 365)))
+ "y"
)
def cut_string(max_length, data, direction="end"):
"""Trims a string to the maximum length, adding ellipses if necessary."""
if len(data) > max_length:
return (
f"{data[:max_length-4]} ..."
if direction == "end"
else f"... {data[-(max_length-4):]}"
)
return data
def human_readable_power_on_hours(hours, decimal_places=2):
return str(hours) + "h or " + str(int(hours/24)) + "d or " + str("{:.2f}".format(float(hours/24/365))) + "y"
def cut_string(max_lenght, data, direction):
if (len(data) > max_lenght):
if (direction == "end"):
return data[0:(max_lenght-4)] + " ..."
elif (direction == "start"):
return "... " + data[(len(data)-max_lenght+4):]
else:
return cut_string(max_lenght, data, "end")
else:
return data
def format_to_printable(drive): def format_to_printable(drive):
return DriveDataPrintable( return DriveDataPrintable(
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamiliy), "end"),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamily), "end"),
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),
cut_string(20, human_readable_capacity_1000(drive.capacity), "end"),\ cut_string(20, human_readable_capacity(drive.capacity), "end"),
cut_string(16, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),\ cut_string(20, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),\ cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),
cut_string(10, str(drive.power_cycle), "end"),\ cut_string(10, str(drive.power_cycle), "end"),
cut_string(10, str(drive.smart_error_count), "end"),\ cut_string(10, str(drive.smart_error_count), "end"),
cut_string(30, datetime.datetime.utcfromtimestamp(drive.shred_timestamp).strftime('%Y-%m-%d %H:%M:%S'), "end"),\ cut_string(
cut_string(30, str(datetime.timedelta(seconds = drive.shred_duration)), "end")) 30,
datetime.datetime.fromtimestamp(
drive.shred_timestamp, datetime.UTC
).strftime("%Y-%m-%d %H:%M:%S"),
"end",
),
cut_string(30, str(datetime.timedelta(seconds=drive.shred_duration)), "end"),
)
def draw_text(drawable, printable_data, text_x_offset):
try:
font_file_regular = get_font_path_regular()
font_file_bold = get_font_path_bold()
except Exception as ex:
print("unable to find font: " + str(ex))
return
font_size = 20
text_y_offset = 10
text_y_offset_increment = 25
value_colum_x_offset = 120
drawable.text((text_x_offset, text_y_offset), printable_data.serialnumber,(0),font=ImageFont.truetype(font_file_bold, 30)) def draw_text(drawable, printable_data, font_regular, font_bold, font_bold_bigger):
text_y_offset += 40 """Draws formatted text with Cycles and Errors on one row."""
drawable.text((text_x_offset, text_y_offset), "Familiy: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) line_height = 26
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelfamiliy,(0),font=ImageFont.truetype(font_file_regular, font_size)) label_x = TEXT_X_OFFSET
text_y_offset += text_y_offset_increment value_offset = 115
drawable.text((text_x_offset, text_y_offset), "Model: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) right_field_spacing = 200 # Horizontal gap between Cycles and Errors
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelname,(0),font=ImageFont.truetype(font_file_regular, font_size)) x_capacity = 520
text_y_offset += text_y_offset_increment y_capacity = 142
drawable.text((text_x_offset, text_y_offset), "Hours: " ,(0),font=ImageFont.truetype(font_file_bold, font_size)) y_start = 4
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_on_hours,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Cycles: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_cycle,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Errors: ", (0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.smart_error_count,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Shred on: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_timestamp,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Duration: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_duration,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), printable_data.capacity,(0),font=ImageFont.truetype(font_file_bold, font_size*3)) # Serial Number
drawable.text((label_x, y_start), "Serial:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y_start),
printable_data.serialnumber,
fill=0,
font=font_bold,
)
y1 = y_start + line_height
y2 = y1 + line_height
y3 = y2 + line_height
y4 = y3 + line_height
y5 = y4 + line_height
y6 = y5 + line_height
# Left-Aligned Fields (One per row)
drawable.text((label_x, y1), "Family:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y1),
printable_data.modelfamily,
fill=0,
font=font_regular,
)
drawable.text((label_x, y2), "Model:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y2),
printable_data.modelname,
fill=0,
font=font_regular,
)
drawable.text((label_x, y3), "Hours:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y3),
printable_data.power_on_hours,
fill=0,
font=font_regular,
)
# Cycles and Errors on the same line
drawable.text((label_x, y4), "Cycles:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y4),
printable_data.power_cycle,
fill=0,
font=font_regular,
)
drawable.text(
(label_x + right_field_spacing, y4), "Errors:", fill=0, font=font_bold
)
drawable.text(
(label_x + right_field_spacing + value_offset, y4),
printable_data.smart_error_count,
fill=0,
font=font_regular,
)
# Continue remaining fields
drawable.text((label_x, y5), "Shred on:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y5),
printable_data.shred_timestamp,
fill=0,
font=font_regular,
)
drawable.text((label_x, y6), "Duration:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y6),
printable_data.shred_duration,
fill=0,
font=font_regular,
)
# Capacity at the bottom
drawable.text(
(x_capacity, y_capacity),
printable_data.capacity,
fill=0,
font=font_bold_bigger,
)
def draw_outline(drawable, margin, width, output_width, output_height):
#upper
drawable.line([(margin,margin), (output_width -margin ,margin)], fill=None, width=width, joint=None)
#left
drawable.line([(margin,margin), (margin ,output_height-margin)], fill=None, width=width, joint=None)
#right
drawable.line([(output_width-margin,margin), (output_width-margin ,output_height-margin)], fill=None, width=width, joint=None)
#lower
drawable.line([(margin,output_height - margin), (output_width-margin,output_height -margin)], fill=None, width=width, joint=None)
def draw_qr_code(image, data): def draw_qr_code(image, data):
"""
Draws a QR code on the provided image at a specific location.
Parameters:
image (Image.Image): The target image.
data (str): The data to encode in the QR code.
"""
# Generate the QR code
qr_img = qrcode.make(data) qr_img = qrcode.make(data)
qr_img.thumbnail((291, 291), Image.Resampling.LANCZOS) qr_img = qr_img.convert("1") # Ensure QR code is in binary (black/white)
image.paste(qr_img, (7, 7))
# Remove white border
bbox = qr_img.getbbox() # Get the bounding box of the non-white area
qr_img = qr_img.crop(bbox)
# Resize to desired size
qr_img = qr_img.resize((QR_CODE_SIZE, QR_CODE_SIZE), Image.Resampling.LANCZOS)
# Paste the QR code onto the image
region = (5, 5, 5 + QR_CODE_SIZE, 5 + QR_CODE_SIZE)
image.paste(qr_img, box=region)
def draw_outline(drawable, margin, width, output_width, output_height):
"""
Draws a rectangular outline on the drawable canvas.
Parameters:
drawable (ImageDraw.Draw): The drawable canvas.
margin (int): The margin from the edges of the image.
width (int): The width of the outline.
output_width (int): The total width of the image.
output_height (int): The total height of the image.
"""
# Define the four corners of the rectangle, adjusting for line width
top_left = (margin, margin)
top_right = (output_width - margin - width, margin)
bottom_left = (margin, output_height - margin - width)
bottom_right = (output_width - margin - width, output_height - margin - width)
# Draw the outline lines with adjusted coordinates
lines = [
(top_left, top_right), # Top edge
(top_left, bottom_left), # Left edge
(top_right, bottom_right), # Right edge
(bottom_left, bottom_right), # Bottom edge
]
for line in lines:
drawable.line(line, fill=0, width=width)
def generate_image(drive, rehdd_info, output_file): def generate_image(drive, rehdd_info, output_file):
output_width = 696 #in px set by used paper """Generates an image containing drive data and a QR code."""
output_height = 300 #in px
text_x_offset= 300 #in px
qr_data = DriveDataJson(drive, rehdd_info)
try: try:
json_qr_daten = json.dumps(dataclasses.asdict(qr_data))
except Exception as ex:
print("unable to generate json: " + str(ex))
return
try: drive_json = DriveDataJson(
state=drive.drive_state,
fam=drive.modelfamily,
name=drive.modelname,
cap=drive.capacity,
sn=drive.serialnumber,
poh=drive.power_on_hours,
pc=drive.power_cycle,
err=drive.smart_error_count,
time=int(drive.shred_timestamp),
dur=drive.shred_duration,
)
qr_data = json.dumps(dataclasses.asdict(QrDataJson(drive_json, rehdd_info)))
printable_data = format_to_printable(drive) printable_data = format_to_printable(drive)
except Exception as ex: except Exception as e:
print("unable to format data: " + str(ex)) logging.error(f"Error preparing data: {e}")
return return
#print(printable_data.serialnumber)
#create black and white (binary) image with white background output_image = Image.new("1", (OUTPUT_WIDTH, OUTPUT_HEIGHT), "white")
output_image = Image.new('1', (output_width, output_height), "white")
#create draw pane
draw = ImageDraw.Draw(output_image) draw = ImageDraw.Draw(output_image)
draw_outline(draw, 1, 4, output_width, output_height) font_regular = ImageFont.truetype(find_font_path(DEFAULT_FONT_REGULAR), 20)
draw_text(draw, printable_data, text_x_offset) font_bold = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 20)
draw_qr_code(output_image, str(json_qr_daten).replace(" ", "")) font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 42)
output_image.save(output_file) draw_outline(draw, 0, 3, OUTPUT_WIDTH + 1, OUTPUT_HEIGHT + 1)
draw_text(draw, printable_data, font_regular, font_bold, font_bold_bigger)
draw_qr_code(output_image, qr_data)
try:
output_image.save(output_file)
logging.info(f"Image saved to {output_file}")
except Exception as e:
logging.error(f"Error saving image: {e}")
def main(): def main():
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2")
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2") # read this from rehdd process
temp_drive = DriveData( temp_drive = DriveData(
drive_index=0,\ drive_index=0,
drive_state="shredded",\ drive_state="shredded",
modelfamiliy="Toshiba 2.5\\ HDD MK..65GSSX",\ modelfamily='Toshiba 2.5" HDD MK..65GSSX',
modelname="TOSHIBA MK3265GSDX",\ modelname="TOSHIBA MK3265GSDX",
capacity=343597383680,\ capacity=343597383000,
serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",\ serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",
power_on_hours=7074,\ power_on_hours=7074,
power_cycle=4792,\ power_cycle=4792,
smart_error_count=1,\ smart_error_count=1,
shred_timestamp=1647937421,\ shred_timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
shred_duration=81718) shred_duration=81718,
)
generate_image(temp_drive, rehdd_info, "output.png") generate_image(temp_drive, rehdd_info, "output.png")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.3 KiB

After

Width:  |  Height:  |  Size: 4.4 KiB

View File

@ -3,75 +3,215 @@
""" Author: Hendrik Schutter, localhorst@mosad.xyz """ Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23 Date of creation: 2022/11/23
Date of last modification: 2022/11/23 Date of last modification: 2025/06/15
""" """
import sysv_ipc import ctypes
import pycstruct
import os import os
import time import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster from brother_ql.raster import BrotherQLRaster
import layouter import layouter
str_buffer_size = 64 #keep this synchronous to reHDD # Suppress deprecation and printer warnings
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
file_name = "output.png" file_name = "output.png"
printer_path = "/dev/usb/lp0" printer_path = "/dev/usb/lp0"
def get_struct_format(): terminate = False
#keep this synchronous to struct in reHDD
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size) class TDriveData(ctypes.Structure):
driveData.add('utf-8', 'driveHours', length=str_buffer_size) _fields_ = [
driveData.add('utf-8', 'driveCycles', length=str_buffer_size) ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveErrors', length=str_buffer_size) ("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size) ("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size) ("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size) ("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveState', length=str_buffer_size) ("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size) ("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveModelName', length=str_buffer_size) ("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size) ("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size) ("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
return driveData ("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
# IPC bindings
libc = ctypes.CDLL("libc.so.6")
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def signal_handler(signum, frame):
global terminate
print(f"Signal {signum} received, terminating...")
terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def wait_for_printer():
while not os.path.exists(printer_path):
print("Printer not found, waiting ...")
time.sleep(30)
return True
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
err = ctypes.get_errno()
print(f"Error reading from message queue: {os.strerror(err)}")
break
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
time.sleep(3)
print(f"Received Drive Data: {drive_info}")
drive_obj, rehdd_info = create_drive_objects(drive_info)
layouter.generate_image(drive_obj, rehdd_info, file_name)
if wait_for_printer():
qlr = BrotherQLRaster("QL-570")
image = Image.open(file_name)
create_label(qlr, image, "62")
with open(printer_path, "wb") as file:
file.write(qlr.data)
os.remove(file_name)
else:
print("Skipping printing due to printer unavailability.")
if test_mode:
break
except Exception as e:
print(f"Worker encountered an error: {e}")
def main(): def main():
try: parser = argparse.ArgumentParser()
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT) parser.add_argument(
"--test", action="store_true", help="Run in test mode with fake data"
)
args = parser.parse_args()
while True: if args.test:
message, mtype = mq.receive() print("Running in test mode.")
driveData = get_struct_format().deserialize(message) worker(None, test_mode=True)
return
while True:
try:
queue_id = msgget(MSG_QUEUE_KEY, 0)
if queue_id == -1:
raise RuntimeError("Failed to connect to the existing message queue.")
worker(queue_id)
except Exception as e:
print(f"Main process encountered an error: {e}")
time.sleep(30)
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
drive = layouter.DriveData(
drive_index=int(driveData['driveIndex']),\
drive_state=str(driveData['driveState']),\
modelfamiliy=str(driveData['driveModelFamiliy']),\
modelname=str(driveData['driveModelName']),\
capacity=int(driveData['driveCapacity']),\
serialnumber=str(driveData['driveSerialnumber']),\
power_on_hours=int(driveData['driveHours']),\
power_cycle=int(driveData['driveCycles']),\
smart_error_count=int(driveData['driveErrors']),\
shred_timestamp=int(driveData['driveShredTimestamp']),\
shred_duration=int(driveData['driveShredDuration']))
while(not os.path.exists(printer_path)):
print("Printer not found, waiting ...")
time.sleep(30) #sleep 30
layouter.generate_image(drive, rehdd_info, file_name)
qlr = BrotherQLRaster("QL-570")
create_label(qlr, file_name, '62')
with open(printer_path, 'wb') as file:
file.write(qlr.data)
os.remove(file_name)
except sysv_ipc.ExistentialError:
print("ERROR: message queue creation failed")
if __name__ == "__main__": if __name__ == "__main__":
main() main()