Compare commits
14 Commits
718f81b8e7
...
main
Author | SHA1 | Date | |
---|---|---|---|
d6c4b24149 | |||
c10b1cd342 | |||
e06d10caee | |||
880ff8c6e3 | |||
6ec782609c | |||
cbf4c3c273 | |||
954bed9e56 | |||
ad2da922db | |||
6b3dee9864 | |||
211bf80b91 | |||
0b94ac2a3b | |||
7b4dfebbdc | |||
63ba1e8d1d | |||
5bf2ab8b2e |
@ -8,7 +8,7 @@
|
||||
|
||||
## Install ##
|
||||
|
||||
`pip install qrcode sysv-ipc pycstruct brother-ql`
|
||||
`pip install qrcode brother-ql`
|
||||
|
||||
```
|
||||
cd /root/
|
||||
|
429
layouter.py
429
layouter.py
@ -1,9 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" Author: Hendrik Schutter, localhorst@mosad.xyz
|
||||
Date of creation: 2022/11/16
|
||||
Date of last modification: 2022/11/23
|
||||
"""
|
||||
Author: Hendrik Schutter, localhorst@mosad.xyz
|
||||
Date of creation: 2022/11/16
|
||||
Date of last modification: 2025/06/08
|
||||
"""
|
||||
|
||||
import re
|
||||
@ -11,197 +12,351 @@ import dataclasses
|
||||
import glob
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import qrcode
|
||||
from PIL import Image
|
||||
from PIL import ImageFont
|
||||
from PIL import ImageDraw
|
||||
from PIL import Image, ImageFont, ImageDraw
|
||||
|
||||
# Constants
|
||||
FONT_PATH = "/usr/share/fonts"
|
||||
DEFAULT_FONT_REGULAR = "DejaVuSans.ttf"
|
||||
DEFAULT_FONT_BOLD = "DejaVuSans-Bold.ttf"
|
||||
OUTPUT_WIDTH = 696 # px
|
||||
OUTPUT_HEIGHT = 190 # px
|
||||
TEXT_X_OFFSET = 190 # px
|
||||
QR_CODE_SIZE = 179 # px
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DriveData:
|
||||
drive_index: int
|
||||
drive_state: str #none, deleted, shredded
|
||||
modelfamiliy: str
|
||||
drive_state: str
|
||||
modelfamily: str
|
||||
modelname: str
|
||||
capacity: int #in bytes
|
||||
capacity: int
|
||||
serialnumber: str
|
||||
power_on_hours: int #in hours
|
||||
power_on_hours: int
|
||||
power_cycle: int
|
||||
smart_error_count: int
|
||||
shred_timestamp: int #unix timestamp
|
||||
shred_duration: int #in seconds
|
||||
shred_timestamp: int
|
||||
shred_duration: int
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DriveDataJson:
|
||||
state: str
|
||||
fam: str
|
||||
name: str
|
||||
cap: int
|
||||
sn: str
|
||||
poh: int
|
||||
pc: int
|
||||
err: int
|
||||
time: int
|
||||
dur: int
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DriveDataPrintable:
|
||||
modelfamiliy: str #max lenght 25
|
||||
modelname: str #max lenght 25
|
||||
capacity: str #max lenght 25, in human-readable format with unit (GB/TB)
|
||||
serialnumber: str #max lenght 25
|
||||
power_on_hours: str #max lenght 25, in hours and days and years
|
||||
power_cycle: str #max lenght 25
|
||||
smart_error_count: str #max lenght 25
|
||||
shred_timestamp: str #max lenght 25, human-readable
|
||||
shred_duration: str #max lenght 25, human-readable
|
||||
modelfamily: str
|
||||
modelname: str
|
||||
capacity: str
|
||||
serialnumber: str
|
||||
power_on_hours: str
|
||||
power_cycle: str
|
||||
smart_error_count: str
|
||||
shred_timestamp: str
|
||||
shred_duration: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class ReHddInfo:
|
||||
link: str
|
||||
version: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DriveDataJson:
|
||||
drive: DriveData
|
||||
class QrDataJson:
|
||||
drive: DriveDataJson
|
||||
rehdd: ReHddInfo
|
||||
|
||||
def get_font_path_regular():
|
||||
path = "/usr/share/fonts"
|
||||
files = glob.glob(path + "/**/DejaVuSans.ttf", recursive = True)
|
||||
return files[0]
|
||||
|
||||
def get_font_path_bold():
|
||||
path = "/usr/share/fonts"
|
||||
files = glob.glob(path + "/**/DejaVuSans-Bold.ttf", recursive = True)
|
||||
return files[0]
|
||||
def find_font_path(font_name):
|
||||
"""Finds the full path of the specified font."""
|
||||
try:
|
||||
files = glob.glob(f"{FONT_PATH}/**/{font_name}", recursive=True)
|
||||
return files[0] if files else None
|
||||
except Exception as e:
|
||||
logging.error(f"Error locating font {font_name}: {e}")
|
||||
return None
|
||||
|
||||
def human_readable_capacity_1024(size, decimal_places=0):
|
||||
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']:
|
||||
if size < 1024.0 or unit == 'PiB':
|
||||
break
|
||||
size /= 1024.0
|
||||
|
||||
def human_readable_capacity(size, decimal_places=0, base=1000):
|
||||
"""Converts bytes to a human-readable string."""
|
||||
units = (
|
||||
["B", "KB", "MB", "GB", "TB", "PB"]
|
||||
if base == 1000
|
||||
else ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
|
||||
)
|
||||
for unit in units:
|
||||
if size < base or unit == units[-1]:
|
||||
return f"{size:.{decimal_places}f} {unit}"
|
||||
size /= base
|
||||
|
||||
def human_readable_capacity_1000(size, decimal_places=0):
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
|
||||
if size < 1000.0 or unit == 'PB':
|
||||
break
|
||||
size /= 1000.0
|
||||
return f"{size:.{decimal_places}f} {unit}"
|
||||
|
||||
def human_readable_power_on_hours(hours, decimal_places=2):
|
||||
return str(hours) + "h or " + str(int(hours/24)) + "d or " + str("{:.2f}".format(float(hours/24/365))) + "y"
|
||||
def human_readable_power_on_hours(hours):
|
||||
"""Converts hours to human-readable string in hours, days, and years."""
|
||||
return (
|
||||
str(hours)
|
||||
+ "h or "
|
||||
+ str(int(hours / 24))
|
||||
+ "d or "
|
||||
+ str("{:.2f}".format(float(hours / 24 / 365)))
|
||||
+ "y"
|
||||
)
|
||||
|
||||
def cut_string(max_lenght, data):
|
||||
if (len(data) > max_lenght):
|
||||
return data[0:(max_lenght-4)] + " ..."
|
||||
else:
|
||||
|
||||
def cut_string(max_length, data, direction="end"):
|
||||
"""Trims a string to the maximum length, adding ellipses if necessary."""
|
||||
if len(data) > max_length:
|
||||
return (
|
||||
f"{data[:max_length-4]} ..."
|
||||
if direction == "end"
|
||||
else f"... {data[-(max_length-4):]}"
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
def format_to_printable(drive):
|
||||
return DriveDataPrintable(
|
||||
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamiliy)),\
|
||||
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname)),\
|
||||
cut_string(20, human_readable_capacity_1000(drive.capacity)),\
|
||||
cut_string(16, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber)),\
|
||||
cut_string(30, human_readable_power_on_hours(drive.power_on_hours)),\
|
||||
cut_string(10, str(drive.power_cycle)),\
|
||||
cut_string(10, str(drive.smart_error_count)),\
|
||||
cut_string(30, datetime.datetime.utcfromtimestamp(drive.shred_timestamp).strftime('%Y-%m-%d %H:%M:%S')),\
|
||||
cut_string(30, str(datetime.timedelta(seconds = drive.shred_duration))))
|
||||
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamily), "end"),
|
||||
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),
|
||||
cut_string(20, human_readable_capacity(drive.capacity), "end"),
|
||||
cut_string(20, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
|
||||
cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),
|
||||
cut_string(10, str(drive.power_cycle), "end"),
|
||||
cut_string(10, str(drive.smart_error_count), "end"),
|
||||
cut_string(
|
||||
30,
|
||||
datetime.datetime.fromtimestamp(
|
||||
drive.shred_timestamp, datetime.UTC
|
||||
).strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"end",
|
||||
),
|
||||
cut_string(30, str(datetime.timedelta(seconds=drive.shred_duration)), "end"),
|
||||
)
|
||||
|
||||
def draw_text(drawable, printable_data, text_x_offset):
|
||||
try:
|
||||
font_file_regular = get_font_path_regular()
|
||||
font_file_bold = get_font_path_bold()
|
||||
except Exception as ex:
|
||||
print("unable to find font: " + str(ex))
|
||||
return
|
||||
|
||||
font_size = 20
|
||||
text_y_offset = 10
|
||||
text_y_offset_increment = 25
|
||||
value_colum_x_offset = 120
|
||||
def draw_text(drawable, printable_data, font_regular, font_bold, font_bold_bigger):
|
||||
"""Draws formatted text with Cycles and Errors on one row."""
|
||||
|
||||
drawable.text((text_x_offset, text_y_offset), printable_data.serialnumber,(0),font=ImageFont.truetype(font_file_bold, 30))
|
||||
text_y_offset += 40
|
||||
line_height = 26
|
||||
label_x = TEXT_X_OFFSET
|
||||
value_offset = 115
|
||||
right_field_spacing = 200 # Horizontal gap between Cycles and Errors
|
||||
x_capacity = 520
|
||||
y_capacity = 142
|
||||
y_start = 4
|
||||
|
||||
drawable.text((text_x_offset, text_y_offset), "Familiy: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelfamiliy,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Model: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelname,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Hours: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_on_hours,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Cycles: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_cycle,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Errors: ", (0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.smart_error_count,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Shred on: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_timestamp,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
drawable.text((text_x_offset, text_y_offset), "Duration: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
|
||||
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_duration,(0),font=ImageFont.truetype(font_file_regular, font_size))
|
||||
text_y_offset += text_y_offset_increment
|
||||
# Serial Number
|
||||
drawable.text((label_x, y_start), "Serial:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y_start),
|
||||
printable_data.serialnumber,
|
||||
fill=0,
|
||||
font=font_bold,
|
||||
)
|
||||
|
||||
drawable.text((text_x_offset, text_y_offset), printable_data.capacity,(0),font=ImageFont.truetype(font_file_bold, font_size*3))
|
||||
y1 = y_start + line_height
|
||||
y2 = y1 + line_height
|
||||
y3 = y2 + line_height
|
||||
y4 = y3 + line_height
|
||||
y5 = y4 + line_height
|
||||
y6 = y5 + line_height
|
||||
|
||||
# Left-Aligned Fields (One per row)
|
||||
drawable.text((label_x, y1), "Family:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y1),
|
||||
printable_data.modelfamily,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
drawable.text((label_x, y2), "Model:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y2),
|
||||
printable_data.modelname,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
drawable.text((label_x, y3), "Hours:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y3),
|
||||
printable_data.power_on_hours,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
# Cycles and Errors on the same line
|
||||
drawable.text((label_x, y4), "Cycles:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y4),
|
||||
printable_data.power_cycle,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
drawable.text(
|
||||
(label_x + right_field_spacing, y4), "Errors:", fill=0, font=font_bold
|
||||
)
|
||||
drawable.text(
|
||||
(label_x + right_field_spacing + value_offset, y4),
|
||||
printable_data.smart_error_count,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
# Continue remaining fields
|
||||
drawable.text((label_x, y5), "Shred on:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y5),
|
||||
printable_data.shred_timestamp,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
drawable.text((label_x, y6), "Duration:", fill=0, font=font_bold)
|
||||
drawable.text(
|
||||
(label_x + value_offset, y6),
|
||||
printable_data.shred_duration,
|
||||
fill=0,
|
||||
font=font_regular,
|
||||
)
|
||||
|
||||
# Capacity at the bottom
|
||||
drawable.text(
|
||||
(x_capacity, y_capacity),
|
||||
printable_data.capacity,
|
||||
fill=0,
|
||||
font=font_bold_bigger,
|
||||
)
|
||||
|
||||
def draw_outline(drawable, margin, width, output_width, output_height):
|
||||
#upper
|
||||
drawable.line([(margin,margin), (output_width -margin ,margin)], fill=None, width=width, joint=None)
|
||||
#left
|
||||
drawable.line([(margin,margin), (margin ,output_height-margin)], fill=None, width=width, joint=None)
|
||||
#right
|
||||
drawable.line([(output_width-margin,margin), (output_width-margin ,output_height-margin)], fill=None, width=width, joint=None)
|
||||
#lower
|
||||
drawable.line([(margin,output_height - margin), (output_width-margin,output_height -margin)], fill=None, width=width, joint=None)
|
||||
|
||||
def draw_qr_code(image, data):
|
||||
"""
|
||||
Draws a QR code on the provided image at a specific location.
|
||||
|
||||
Parameters:
|
||||
image (Image.Image): The target image.
|
||||
data (str): The data to encode in the QR code.
|
||||
"""
|
||||
# Generate the QR code
|
||||
qr_img = qrcode.make(data)
|
||||
qr_img.thumbnail((291, 291), Image.Resampling.LANCZOS)
|
||||
image.paste(qr_img, (7, 7))
|
||||
qr_img = qr_img.convert("1") # Ensure QR code is in binary (black/white)
|
||||
|
||||
# Remove white border
|
||||
bbox = qr_img.getbbox() # Get the bounding box of the non-white area
|
||||
qr_img = qr_img.crop(bbox)
|
||||
|
||||
# Resize to desired size
|
||||
qr_img = qr_img.resize((QR_CODE_SIZE, QR_CODE_SIZE), Image.Resampling.LANCZOS)
|
||||
|
||||
# Paste the QR code onto the image
|
||||
region = (5, 5, 5 + QR_CODE_SIZE, 5 + QR_CODE_SIZE)
|
||||
image.paste(qr_img, box=region)
|
||||
|
||||
|
||||
def draw_outline(drawable, margin, width, output_width, output_height):
|
||||
"""
|
||||
Draws a rectangular outline on the drawable canvas.
|
||||
|
||||
Parameters:
|
||||
drawable (ImageDraw.Draw): The drawable canvas.
|
||||
margin (int): The margin from the edges of the image.
|
||||
width (int): The width of the outline.
|
||||
output_width (int): The total width of the image.
|
||||
output_height (int): The total height of the image.
|
||||
"""
|
||||
# Define the four corners of the rectangle, adjusting for line width
|
||||
top_left = (margin, margin)
|
||||
top_right = (output_width - margin - width, margin)
|
||||
bottom_left = (margin, output_height - margin - width)
|
||||
bottom_right = (output_width - margin - width, output_height - margin - width)
|
||||
|
||||
# Draw the outline lines with adjusted coordinates
|
||||
lines = [
|
||||
(top_left, top_right), # Top edge
|
||||
(top_left, bottom_left), # Left edge
|
||||
(top_right, bottom_right), # Right edge
|
||||
(bottom_left, bottom_right), # Bottom edge
|
||||
]
|
||||
|
||||
for line in lines:
|
||||
drawable.line(line, fill=0, width=width)
|
||||
|
||||
|
||||
def generate_image(drive, rehdd_info, output_file):
|
||||
output_width = 696 #in px set by used paper
|
||||
output_height = 300 #in px
|
||||
text_x_offset= 300 #in px
|
||||
|
||||
qr_data = DriveDataJson(drive, rehdd_info)
|
||||
"""Generates an image containing drive data and a QR code."""
|
||||
try:
|
||||
json_qr_daten = json.dumps(dataclasses.asdict(qr_data))
|
||||
except Exception as ex:
|
||||
print("unable to generate json: " + str(ex))
|
||||
return
|
||||
|
||||
try:
|
||||
drive_json = DriveDataJson(
|
||||
state=drive.drive_state,
|
||||
fam=drive.modelfamily,
|
||||
name=drive.modelname,
|
||||
cap=drive.capacity,
|
||||
sn=drive.serialnumber,
|
||||
poh=drive.power_on_hours,
|
||||
pc=drive.power_cycle,
|
||||
err=drive.smart_error_count,
|
||||
time=int(drive.shred_timestamp),
|
||||
dur=drive.shred_duration,
|
||||
)
|
||||
|
||||
qr_data = json.dumps(dataclasses.asdict(QrDataJson(drive_json, rehdd_info)))
|
||||
printable_data = format_to_printable(drive)
|
||||
except Exception as ex:
|
||||
print("unable to format data: " + str(ex))
|
||||
except Exception as e:
|
||||
logging.error(f"Error preparing data: {e}")
|
||||
return
|
||||
|
||||
#print(printable_data)
|
||||
|
||||
#create black and white (binary) image with white background
|
||||
output_image = Image.new('1', (output_width, output_height), "white")
|
||||
|
||||
#create draw pane
|
||||
output_image = Image.new("1", (OUTPUT_WIDTH, OUTPUT_HEIGHT), "white")
|
||||
draw = ImageDraw.Draw(output_image)
|
||||
|
||||
draw_outline(draw, 1, 4, output_width, output_height)
|
||||
draw_text(draw, printable_data, text_x_offset)
|
||||
draw_qr_code(output_image, str(json_qr_daten).replace(" ", ""))
|
||||
font_regular = ImageFont.truetype(find_font_path(DEFAULT_FONT_REGULAR), 20)
|
||||
font_bold = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 20)
|
||||
font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 42)
|
||||
|
||||
draw_outline(draw, 0, 3, OUTPUT_WIDTH + 1, OUTPUT_HEIGHT + 1)
|
||||
draw_text(draw, printable_data, font_regular, font_bold, font_bold_bigger)
|
||||
draw_qr_code(output_image, qr_data)
|
||||
|
||||
try:
|
||||
output_image.save(output_file)
|
||||
logging.info(f"Image saved to {output_file}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error saving image: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2") # read this from rehdd process
|
||||
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2")
|
||||
|
||||
temp_drive = DriveData(
|
||||
drive_index=0,\
|
||||
drive_state="shredded",\
|
||||
modelfamiliy="Toshiba 2.5\\ HDD MK..65GSSX",\
|
||||
modelname="TOSHIBA MK3265GSDX",\
|
||||
capacity=343597383680,\
|
||||
serialnumber="YG6742U56UDRL123",\
|
||||
power_on_hours=7074,\
|
||||
power_cycle=4792,\
|
||||
smart_error_count=1,\
|
||||
shred_timestamp=1647937421,\
|
||||
shred_duration=81718)
|
||||
drive_index=0,
|
||||
drive_state="shredded",
|
||||
modelfamily='Toshiba 2.5" HDD MK..65GSSX',
|
||||
modelname="TOSHIBA MK3265GSDX",
|
||||
capacity=343597383000,
|
||||
serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",
|
||||
power_on_hours=7074,
|
||||
power_cycle=4792,
|
||||
smart_error_count=1,
|
||||
shred_timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
|
||||
shred_duration=81718,
|
||||
)
|
||||
|
||||
generate_image(temp_drive, rehdd_info, "output.png")
|
||||
|
||||
|
BIN
output.png
BIN
output.png
Binary file not shown.
Before Width: | Height: | Size: 5.4 KiB After Width: | Height: | Size: 4.4 KiB |
236
reHDDPrinter.py
236
reHDDPrinter.py
@ -3,75 +3,215 @@
|
||||
|
||||
""" Author: Hendrik Schutter, localhorst@mosad.xyz
|
||||
Date of creation: 2022/11/23
|
||||
Date of last modification: 2022/11/23
|
||||
Date of last modification: 2025/06/15
|
||||
"""
|
||||
|
||||
import sysv_ipc
|
||||
import pycstruct
|
||||
import ctypes
|
||||
import os
|
||||
import time
|
||||
import signal
|
||||
import argparse
|
||||
import warnings
|
||||
import logging
|
||||
from PIL import Image
|
||||
from brother_ql.brother_ql_create import create_label
|
||||
from brother_ql.raster import BrotherQLRaster
|
||||
import layouter
|
||||
|
||||
str_buffer_size = 64 #keep this synchronous to reHDD
|
||||
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD
|
||||
# Suppress deprecation and printer warnings
|
||||
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
||||
logging.getLogger("brother_ql").setLevel(logging.ERROR)
|
||||
|
||||
# Constants
|
||||
STR_BUFFER_SIZE = 64
|
||||
MSG_QUEUE_KEY = 0x1B11193C0
|
||||
IPC_CREAT = 0o1000
|
||||
|
||||
file_name = "output.png"
|
||||
printer_path = "/dev/usb/lp0"
|
||||
|
||||
def get_struct_format():
|
||||
#keep this synchronous to struct in reHDD
|
||||
driveData = pycstruct.StructDef()
|
||||
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveState', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveModelName', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
|
||||
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
|
||||
return driveData
|
||||
terminate = False
|
||||
|
||||
def main():
|
||||
try:
|
||||
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
|
||||
|
||||
while True:
|
||||
message, mtype = mq.receive()
|
||||
driveData = get_struct_format().deserialize(message)
|
||||
class TDriveData(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
|
||||
]
|
||||
|
||||
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
|
||||
drive = layouter.DriveData(
|
||||
drive_index=int(driveData['driveIndex']),\
|
||||
drive_state=str(driveData['driveState']),\
|
||||
modelfamiliy=str(driveData['driveModelFamiliy']),\
|
||||
modelname=str(driveData['driveModelName']),\
|
||||
capacity=int(driveData['driveCapacity']),\
|
||||
serialnumber=str(driveData['driveSerialnumber']),\
|
||||
power_on_hours=int(driveData['driveHours']),\
|
||||
power_cycle=int(driveData['driveCycles']),\
|
||||
smart_error_count=int(driveData['driveErrors']),\
|
||||
shred_timestamp=int(driveData['driveShredTimestamp']),\
|
||||
shred_duration=int(driveData['driveShredDuration']))
|
||||
|
||||
while(not os.path.exists(printer_path)):
|
||||
class TMsgQueueData(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("msg_queue_type", ctypes.c_long),
|
||||
("driveData", TDriveData),
|
||||
]
|
||||
|
||||
|
||||
# IPC bindings
|
||||
libc = ctypes.CDLL("libc.so.6")
|
||||
msgget = libc.msgget
|
||||
msgrcv = libc.msgrcv
|
||||
|
||||
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
|
||||
msgget.restype = ctypes.c_int
|
||||
|
||||
msgrcv.argtypes = [
|
||||
ctypes.c_int,
|
||||
ctypes.POINTER(TMsgQueueData),
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_long,
|
||||
ctypes.c_int,
|
||||
]
|
||||
msgrcv.restype = ctypes.c_ssize_t
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
global terminate
|
||||
print(f"Signal {signum} received, terminating...")
|
||||
terminate = True
|
||||
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
|
||||
def wait_for_printer():
|
||||
while not os.path.exists(printer_path):
|
||||
print("Printer not found, waiting ...")
|
||||
time.sleep(30) #sleep 30
|
||||
time.sleep(30)
|
||||
return True
|
||||
|
||||
layouter.generate_image(drive, rehdd_info, file_name)
|
||||
|
||||
def create_drive_objects(drive_info):
|
||||
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
|
||||
drive = layouter.DriveData(
|
||||
drive_index=int(drive_info["driveIndex"]),
|
||||
drive_state=drive_info["driveState"],
|
||||
modelfamily=drive_info["driveModelFamily"],
|
||||
modelname=drive_info["driveModelName"],
|
||||
capacity=int(drive_info["driveCapacity"]),
|
||||
serialnumber=drive_info["driveSerialnumber"],
|
||||
power_on_hours=int(drive_info["driveHours"]),
|
||||
power_cycle=int(drive_info["driveCycles"]),
|
||||
smart_error_count=int(drive_info["driveErrors"]),
|
||||
shred_timestamp=int(drive_info["driveShredTimestamp"]),
|
||||
shred_duration=int(drive_info["driveShredDuration"]),
|
||||
)
|
||||
|
||||
rehdd_info = layouter.ReHddInfo(
|
||||
link="https://git.mosad.xyz/localhorst/reHDD",
|
||||
version=drive_info["driveReHddVersion"],
|
||||
)
|
||||
|
||||
return drive, rehdd_info
|
||||
|
||||
|
||||
def worker(queue_id, test_mode=False):
|
||||
try:
|
||||
while not terminate:
|
||||
if test_mode:
|
||||
drive_info = {
|
||||
"driveIndex": "42",
|
||||
"driveHours": 44,
|
||||
"driveCycles": 45,
|
||||
"driveErrors": 43,
|
||||
"driveShredTimestamp": int(time.time()),
|
||||
"driveShredDuration": 0,
|
||||
"driveCapacity": 42,
|
||||
"driveState": "shredded",
|
||||
"driveModelFamily": "modelFamily",
|
||||
"driveModelName": "modelName",
|
||||
"driveSerialnumber": "serial",
|
||||
"driveReHddVersion": "V1.1.2",
|
||||
}
|
||||
else:
|
||||
msg = TMsgQueueData()
|
||||
result = msgrcv(
|
||||
queue_id,
|
||||
ctypes.byref(msg),
|
||||
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
|
||||
0,
|
||||
0,
|
||||
)
|
||||
if result == -1:
|
||||
err = ctypes.get_errno()
|
||||
print(f"Error reading from message queue: {os.strerror(err)}")
|
||||
break
|
||||
|
||||
d = msg.driveData
|
||||
drive_info = {
|
||||
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
|
||||
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
|
||||
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
|
||||
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
|
||||
"driveShredTimestamp": int(
|
||||
d.caDriveShredTimestamp.decode().strip("\x00")
|
||||
),
|
||||
"driveShredDuration": int(
|
||||
d.caDriveShredDuration.decode().strip("\x00")
|
||||
),
|
||||
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
|
||||
"driveState": d.caDriveState.decode().strip("\x00"),
|
||||
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
|
||||
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
|
||||
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
|
||||
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
|
||||
}
|
||||
time.sleep(3)
|
||||
|
||||
print(f"Received Drive Data: {drive_info}")
|
||||
|
||||
drive_obj, rehdd_info = create_drive_objects(drive_info)
|
||||
layouter.generate_image(drive_obj, rehdd_info, file_name)
|
||||
|
||||
if wait_for_printer():
|
||||
qlr = BrotherQLRaster("QL-570")
|
||||
create_label(qlr, file_name, '62')
|
||||
|
||||
with open(printer_path, 'wb') as file:
|
||||
image = Image.open(file_name)
|
||||
create_label(qlr, image, "62")
|
||||
with open(printer_path, "wb") as file:
|
||||
file.write(qlr.data)
|
||||
os.remove(file_name)
|
||||
except sysv_ipc.ExistentialError:
|
||||
print("ERROR: message queue creation failed")
|
||||
else:
|
||||
print("Skipping printing due to printer unavailability.")
|
||||
|
||||
if test_mode:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
print(f"Worker encountered an error: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--test", action="store_true", help="Run in test mode with fake data"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
print("Running in test mode.")
|
||||
worker(None, test_mode=True)
|
||||
return
|
||||
while True:
|
||||
try:
|
||||
queue_id = msgget(MSG_QUEUE_KEY, 0)
|
||||
if queue_id == -1:
|
||||
raise RuntimeError("Failed to connect to the existing message queue.")
|
||||
worker(queue_id)
|
||||
except Exception as e:
|
||||
print(f"Main process encountered an error: {e}")
|
||||
time.sleep(30)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Reference in New Issue
Block a user