Compare commits

...

17 Commits

Author SHA1 Message Date
d6c4b24149 fix ipc api 2025-06-22 15:08:54 +02:00
c10b1cd342 Merge pull request 'Shrink label size to allow support for nvme drives' (#19) from feature/nvme-label into main
Reviewed-on: #19
2025-06-22 13:34:16 +02:00
e06d10caee shorten qrcode data 2025-06-22 13:32:56 +02:00
880ff8c6e3 increase font size 2025-06-22 12:56:40 +02:00
6ec782609c shrink more 2025-06-22 12:54:48 +02:00
cbf4c3c273 shrink label 2025-06-22 12:43:13 +02:00
954bed9e56 Merge pull request 'wait for IPC creation' (#18) from fix/ipc-startup into main
Reviewed-on: #18
2025-06-15 18:36:41 +02:00
ad2da922db wait for IPC creation 2025-06-15 18:35:33 +02:00
6b3dee9864 Merge pull request 'feature/native-ipc' (#17) from feature/native-ipc into main
Reviewed-on: #17
2025-06-15 18:05:02 +02:00
211bf80b91 use libc IPC 2025-06-15 18:03:51 +02:00
0b94ac2a3b try native ipc 2025-06-08 22:59:52 +02:00
7b4dfebbdc cleanup 2025-06-08 20:21:42 +02:00
63ba1e8d1d Merge branch 'wait_for_printer' 2023-01-19 19:56:20 +01:00
5bf2ab8b2e cut SN at start 2023-01-19 19:54:34 +01:00
718f81b8e7 Merge pull request 'fix for printer standby' (#13) from wait_for_printer into main
Reviewed-on: #13
2022-12-11 11:19:39 +01:00
94bb43f8cd give hint to disable standby 2022-12-11 11:11:47 +01:00
58c5c98265 wait for printer 2022-12-07 21:25:09 +01:00
4 changed files with 499 additions and 196 deletions

View File

@ -8,7 +8,7 @@
## Install ## ## Install ##
`pip install qrcode sysv-ipc pycstruct brother-ql` `pip install qrcode brother-ql`
``` ```
cd /root/ cd /root/
@ -33,8 +33,9 @@ systemctl enable --now /lib/systemd/system/reHDDPrinter.service
- Brother QL-570 - Brother QL-570
- Paper With: 62mm or 696px - Paper With: 62mm or 696px
**Hint: Some Brother printers have enabled standby mode. This will disconnect the printer. See [Issue #12](https://git.mosad.xyz/localhorst/reHDDPrinter/issues/12) to disable standby.**
see https://github.com/pklaus/brother_ql see https://github.com/pklaus/brother_ql for details for printer access

View File

@ -1,210 +1,365 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, localhorst@mosad.xyz """
Date of creation: 2022/11/16 Author: Hendrik Schutter, localhorst@mosad.xyz
Date of last modification: 2022/11/23 Date of creation: 2022/11/16
Date of last modification: 2025/06/08
""" """
import re import re
import dataclasses import dataclasses
import glob import glob
import datetime import datetime
import json import json
import logging
import qrcode import qrcode
from PIL import Image from PIL import Image, ImageFont, ImageDraw
from PIL import ImageFont
from PIL import ImageDraw # Constants
FONT_PATH = "/usr/share/fonts"
DEFAULT_FONT_REGULAR = "DejaVuSans.ttf"
DEFAULT_FONT_BOLD = "DejaVuSans-Bold.ttf"
OUTPUT_WIDTH = 696 # px
OUTPUT_HEIGHT = 190 # px
TEXT_X_OFFSET = 190 # px
QR_CODE_SIZE = 179 # px
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
@dataclasses.dataclass @dataclasses.dataclass
class DriveData: class DriveData:
drive_index: int drive_index: int
drive_state: str #none, deleted, shredded drive_state: str
modelfamiliy: str modelfamily: str
modelname: str modelname: str
capacity: int #in bytes capacity: int
serialnumber: str serialnumber: str
power_on_hours: int #in hours power_on_hours: int
power_cycle: int power_cycle: int
smart_error_count: int smart_error_count: int
shred_timestamp: int #unix timestamp shred_timestamp: int
shred_duration: int #in seconds shred_duration: int
@dataclasses.dataclass
class DriveDataJson:
state: str
fam: str
name: str
cap: int
sn: str
poh: int
pc: int
err: int
time: int
dur: int
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataPrintable: class DriveDataPrintable:
modelfamiliy: str #max lenght 25 modelfamily: str
modelname: str #max lenght 25 modelname: str
capacity: str #max lenght 25, in human-readable format with unit (GB/TB) capacity: str
serialnumber: str #max lenght 25 serialnumber: str
power_on_hours: str #max lenght 25, in hours and days and years power_on_hours: str
power_cycle: str #max lenght 25 power_cycle: str
smart_error_count: str #max lenght 25 smart_error_count: str
shred_timestamp: str #max lenght 25, human-readable shred_timestamp: str
shred_duration: str #max lenght 25, human-readable shred_duration: str
@dataclasses.dataclass @dataclasses.dataclass
class ReHddInfo: class ReHddInfo:
link: str link: str
version: str version: str
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataJson: class QrDataJson:
drive: DriveData drive: DriveDataJson
rehdd: ReHddInfo rehdd: ReHddInfo
def get_font_path_regular():
path = "/usr/share/fonts"
files = glob.glob(path + "/**/DejaVuSans.ttf", recursive = True)
return files[0]
def get_font_path_bold(): def find_font_path(font_name):
path = "/usr/share/fonts" """Finds the full path of the specified font."""
files = glob.glob(path + "/**/DejaVuSans-Bold.ttf", recursive = True) try:
return files[0] files = glob.glob(f"{FONT_PATH}/**/{font_name}", recursive=True)
return files[0] if files else None
except Exception as e:
logging.error(f"Error locating font {font_name}: {e}")
return None
def human_readable_capacity_1024(size, decimal_places=0):
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
if size < 1024.0 or unit == 'PiB':
break
size /= 1024.0
return f"{size:.{decimal_places}f} {unit}"
def human_readable_capacity_1000(size, decimal_places=0): def human_readable_capacity(size, decimal_places=0, base=1000):
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: """Converts bytes to a human-readable string."""
if size < 1000.0 or unit == 'PB': units = (
break ["B", "KB", "MB", "GB", "TB", "PB"]
size /= 1000.0 if base == 1000
return f"{size:.{decimal_places}f} {unit}" else ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
)
for unit in units:
if size < base or unit == units[-1]:
return f"{size:.{decimal_places}f} {unit}"
size /= base
def human_readable_power_on_hours(hours):
"""Converts hours to human-readable string in hours, days, and years."""
return (
str(hours)
+ "h or "
+ str(int(hours / 24))
+ "d or "
+ str("{:.2f}".format(float(hours / 24 / 365)))
+ "y"
)
def cut_string(max_length, data, direction="end"):
"""Trims a string to the maximum length, adding ellipses if necessary."""
if len(data) > max_length:
return (
f"{data[:max_length-4]} ..."
if direction == "end"
else f"... {data[-(max_length-4):]}"
)
return data
def human_readable_power_on_hours(hours, decimal_places=2):
return str(hours) + "h or " + str(int(hours/24)) + "d or " + str("{:.2f}".format(float(hours/24/365))) + "y"
def cut_string(max_lenght, data):
if (len(data) > max_lenght):
return data[0:(max_lenght-4)] + " ..."
else:
return data
def format_to_printable(drive): def format_to_printable(drive):
return DriveDataPrintable( return DriveDataPrintable(
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamiliy)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamily), "end"),
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),
cut_string(20, human_readable_capacity_1000(drive.capacity)),\ cut_string(20, human_readable_capacity(drive.capacity), "end"),
cut_string(16, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
cut_string(30, human_readable_power_on_hours(drive.power_on_hours)),\ cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),
cut_string(10, str(drive.power_cycle)),\ cut_string(10, str(drive.power_cycle), "end"),
cut_string(10, str(drive.smart_error_count)),\ cut_string(10, str(drive.smart_error_count), "end"),
cut_string(30, datetime.datetime.utcfromtimestamp(drive.shred_timestamp).strftime('%Y-%m-%d %H:%M:%S')),\ cut_string(
cut_string(30, str(datetime.timedelta(seconds = drive.shred_duration)))) 30,
datetime.datetime.fromtimestamp(
drive.shred_timestamp, datetime.UTC
).strftime("%Y-%m-%d %H:%M:%S"),
"end",
),
cut_string(30, str(datetime.timedelta(seconds=drive.shred_duration)), "end"),
)
def draw_text(drawable, printable_data, text_x_offset):
try:
font_file_regular = get_font_path_regular()
font_file_bold = get_font_path_bold()
except Exception as ex:
print("unable to find font: " + str(ex))
return
font_size = 20
text_y_offset = 10
text_y_offset_increment = 25
value_colum_x_offset = 120
drawable.text((text_x_offset, text_y_offset), printable_data.serialnumber,(0),font=ImageFont.truetype(font_file_bold, 30)) def draw_text(drawable, printable_data, font_regular, font_bold, font_bold_bigger):
text_y_offset += 40 """Draws formatted text with Cycles and Errors on one row."""
drawable.text((text_x_offset, text_y_offset), "Familiy: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) line_height = 26
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelfamiliy,(0),font=ImageFont.truetype(font_file_regular, font_size)) label_x = TEXT_X_OFFSET
text_y_offset += text_y_offset_increment value_offset = 115
drawable.text((text_x_offset, text_y_offset), "Model: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) right_field_spacing = 200 # Horizontal gap between Cycles and Errors
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelname,(0),font=ImageFont.truetype(font_file_regular, font_size)) x_capacity = 520
text_y_offset += text_y_offset_increment y_capacity = 142
drawable.text((text_x_offset, text_y_offset), "Hours: " ,(0),font=ImageFont.truetype(font_file_bold, font_size)) y_start = 4
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_on_hours,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Cycles: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_cycle,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Errors: ", (0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.smart_error_count,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Shred on: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_timestamp,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Duration: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_duration,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), printable_data.capacity,(0),font=ImageFont.truetype(font_file_bold, font_size*3)) # Serial Number
drawable.text((label_x, y_start), "Serial:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y_start),
printable_data.serialnumber,
fill=0,
font=font_bold,
)
y1 = y_start + line_height
y2 = y1 + line_height
y3 = y2 + line_height
y4 = y3 + line_height
y5 = y4 + line_height
y6 = y5 + line_height
# Left-Aligned Fields (One per row)
drawable.text((label_x, y1), "Family:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y1),
printable_data.modelfamily,
fill=0,
font=font_regular,
)
drawable.text((label_x, y2), "Model:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y2),
printable_data.modelname,
fill=0,
font=font_regular,
)
drawable.text((label_x, y3), "Hours:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y3),
printable_data.power_on_hours,
fill=0,
font=font_regular,
)
# Cycles and Errors on the same line
drawable.text((label_x, y4), "Cycles:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y4),
printable_data.power_cycle,
fill=0,
font=font_regular,
)
drawable.text(
(label_x + right_field_spacing, y4), "Errors:", fill=0, font=font_bold
)
drawable.text(
(label_x + right_field_spacing + value_offset, y4),
printable_data.smart_error_count,
fill=0,
font=font_regular,
)
# Continue remaining fields
drawable.text((label_x, y5), "Shred on:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y5),
printable_data.shred_timestamp,
fill=0,
font=font_regular,
)
drawable.text((label_x, y6), "Duration:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y6),
printable_data.shred_duration,
fill=0,
font=font_regular,
)
# Capacity at the bottom
drawable.text(
(x_capacity, y_capacity),
printable_data.capacity,
fill=0,
font=font_bold_bigger,
)
def draw_outline(drawable, margin, width, output_width, output_height):
#upper
drawable.line([(margin,margin), (output_width -margin ,margin)], fill=None, width=width, joint=None)
#left
drawable.line([(margin,margin), (margin ,output_height-margin)], fill=None, width=width, joint=None)
#right
drawable.line([(output_width-margin,margin), (output_width-margin ,output_height-margin)], fill=None, width=width, joint=None)
#lower
drawable.line([(margin,output_height - margin), (output_width-margin,output_height -margin)], fill=None, width=width, joint=None)
def draw_qr_code(image, data): def draw_qr_code(image, data):
"""
Draws a QR code on the provided image at a specific location.
Parameters:
image (Image.Image): The target image.
data (str): The data to encode in the QR code.
"""
# Generate the QR code
qr_img = qrcode.make(data) qr_img = qrcode.make(data)
qr_img.thumbnail((291, 291), Image.Resampling.LANCZOS) qr_img = qr_img.convert("1") # Ensure QR code is in binary (black/white)
image.paste(qr_img, (7, 7))
# Remove white border
bbox = qr_img.getbbox() # Get the bounding box of the non-white area
qr_img = qr_img.crop(bbox)
# Resize to desired size
qr_img = qr_img.resize((QR_CODE_SIZE, QR_CODE_SIZE), Image.Resampling.LANCZOS)
# Paste the QR code onto the image
region = (5, 5, 5 + QR_CODE_SIZE, 5 + QR_CODE_SIZE)
image.paste(qr_img, box=region)
def draw_outline(drawable, margin, width, output_width, output_height):
"""
Draws a rectangular outline on the drawable canvas.
Parameters:
drawable (ImageDraw.Draw): The drawable canvas.
margin (int): The margin from the edges of the image.
width (int): The width of the outline.
output_width (int): The total width of the image.
output_height (int): The total height of the image.
"""
# Define the four corners of the rectangle, adjusting for line width
top_left = (margin, margin)
top_right = (output_width - margin - width, margin)
bottom_left = (margin, output_height - margin - width)
bottom_right = (output_width - margin - width, output_height - margin - width)
# Draw the outline lines with adjusted coordinates
lines = [
(top_left, top_right), # Top edge
(top_left, bottom_left), # Left edge
(top_right, bottom_right), # Right edge
(bottom_left, bottom_right), # Bottom edge
]
for line in lines:
drawable.line(line, fill=0, width=width)
def generate_image(drive, rehdd_info, output_file): def generate_image(drive, rehdd_info, output_file):
output_width = 696 #in px set by used paper """Generates an image containing drive data and a QR code."""
output_height = 300 #in px
text_x_offset= 300 #in px
qr_data = DriveDataJson(drive, rehdd_info)
try: try:
json_qr_daten = json.dumps(dataclasses.asdict(qr_data))
except Exception as ex:
print("unable to generate json: " + str(ex))
return
try: drive_json = DriveDataJson(
state=drive.drive_state,
fam=drive.modelfamily,
name=drive.modelname,
cap=drive.capacity,
sn=drive.serialnumber,
poh=drive.power_on_hours,
pc=drive.power_cycle,
err=drive.smart_error_count,
time=int(drive.shred_timestamp),
dur=drive.shred_duration,
)
qr_data = json.dumps(dataclasses.asdict(QrDataJson(drive_json, rehdd_info)))
printable_data = format_to_printable(drive) printable_data = format_to_printable(drive)
except Exception as ex: except Exception as e:
print("unable to format data: " + str(ex)) logging.error(f"Error preparing data: {e}")
return return
#print(printable_data)
#create black and white (binary) image with white background output_image = Image.new("1", (OUTPUT_WIDTH, OUTPUT_HEIGHT), "white")
output_image = Image.new('1', (output_width, output_height), "white")
#create draw pane
draw = ImageDraw.Draw(output_image) draw = ImageDraw.Draw(output_image)
draw_outline(draw, 1, 4, output_width, output_height) font_regular = ImageFont.truetype(find_font_path(DEFAULT_FONT_REGULAR), 20)
draw_text(draw, printable_data, text_x_offset) font_bold = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 20)
draw_qr_code(output_image, str(json_qr_daten).replace(" ", "")) font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 42)
output_image.save(output_file) draw_outline(draw, 0, 3, OUTPUT_WIDTH + 1, OUTPUT_HEIGHT + 1)
draw_text(draw, printable_data, font_regular, font_bold, font_bold_bigger)
draw_qr_code(output_image, qr_data)
try:
output_image.save(output_file)
logging.info(f"Image saved to {output_file}")
except Exception as e:
logging.error(f"Error saving image: {e}")
def main(): def main():
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2")
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2") # read this from rehdd process
temp_drive = DriveData( temp_drive = DriveData(
drive_index=0,\ drive_index=0,
drive_state="shredded",\ drive_state="shredded",
modelfamiliy="Toshiba 2.5\\ HDD MK..65GSSX",\ modelfamily='Toshiba 2.5" HDD MK..65GSSX',
modelname="TOSHIBA MK3265GSDX",\ modelname="TOSHIBA MK3265GSDX",
capacity=343597383680,\ capacity=343597383000,
serialnumber="YG6742U56UDRL123",\ serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",
power_on_hours=7074,\ power_on_hours=7074,
power_cycle=4792,\ power_cycle=4792,
smart_error_count=1,\ smart_error_count=1,
shred_timestamp=1647937421,\ shred_timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
shred_duration=81718) shred_duration=81718,
)
generate_image(temp_drive, rehdd_info, "output.png") generate_image(temp_drive, rehdd_info, "output.png")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 4.4 KiB

View File

@ -3,68 +3,215 @@
""" Author: Hendrik Schutter, localhorst@mosad.xyz """ Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23 Date of creation: 2022/11/23
Date of last modification: 2022/11/23 Date of last modification: 2025/06/15
""" """
import sysv_ipc import ctypes
import pycstruct
import os import os
import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster from brother_ql.raster import BrotherQLRaster
import layouter import layouter
str_buffer_size = 64 #keep this synchronous to reHDD # Suppress deprecation and printer warnings
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
file_name = "output.png" file_name = "output.png"
printer_path = "/dev/usb/lp0"
terminate = False
class TDriveData(ctypes.Structure):
_fields_ = [
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
# IPC bindings
libc = ctypes.CDLL("libc.so.6")
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def signal_handler(signum, frame):
global terminate
print(f"Signal {signum} received, terminating...")
terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def wait_for_printer():
while not os.path.exists(printer_path):
print("Printer not found, waiting ...")
time.sleep(30)
return True
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
err = ctypes.get_errno()
print(f"Error reading from message queue: {os.strerror(err)}")
break
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
time.sleep(3)
print(f"Received Drive Data: {drive_info}")
drive_obj, rehdd_info = create_drive_objects(drive_info)
layouter.generate_image(drive_obj, rehdd_info, file_name)
if wait_for_printer():
qlr = BrotherQLRaster("QL-570")
image = Image.open(file_name)
create_label(qlr, image, "62")
with open(printer_path, "wb") as file:
file.write(qlr.data)
os.remove(file_name)
else:
print("Skipping printing due to printer unavailability.")
if test_mode:
break
except Exception as e:
print(f"Worker encountered an error: {e}")
def get_struct_format():
#keep this synchronous to struct in reHDD
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
driveData.add('utf-8', 'driveState', length=str_buffer_size)
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
driveData.add('utf-8', 'driveModelName', length=str_buffer_size)
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
return driveData
def main(): def main():
try: parser = argparse.ArgumentParser()
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT) parser.add_argument(
"--test", action="store_true", help="Run in test mode with fake data"
)
args = parser.parse_args()
while True: if args.test:
message, mtype = mq.receive() print("Running in test mode.")
driveData = get_struct_format().deserialize(message) worker(None, test_mode=True)
return
while True:
try:
queue_id = msgget(MSG_QUEUE_KEY, 0)
if queue_id == -1:
raise RuntimeError("Failed to connect to the existing message queue.")
worker(queue_id)
except Exception as e:
print(f"Main process encountered an error: {e}")
time.sleep(30)
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
drive = layouter.DriveData(
drive_index=int(driveData['driveIndex']),\
drive_state=str(driveData['driveState']),\
modelfamiliy=str(driveData['driveModelFamiliy']),\
modelname=str(driveData['driveModelName']),\
capacity=int(driveData['driveCapacity']),\
serialnumber=str(driveData['driveSerialnumber']),\
power_on_hours=int(driveData['driveHours']),\
power_cycle=int(driveData['driveCycles']),\
smart_error_count=int(driveData['driveErrors']),\
shred_timestamp=int(driveData['driveShredTimestamp']),\
shred_duration=int(driveData['driveShredDuration']))
layouter.generate_image(drive, rehdd_info, file_name)
qlr = BrotherQLRaster("QL-570")
create_label(qlr, file_name, '62')
with open("/dev/usb/lp0", 'wb') as file:
file.write(qlr.data)
os.remove(file_name)
except sysv_ipc.ExistentialError:
print("ERROR: message queue creation failed")
if __name__ == "__main__": if __name__ == "__main__":
main() main()