msv-webcam-frontend/msv_webcam_frontend.py
2023-07-05 22:42:55 +02:00

82 lines
2.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
"""
import errno
import os
import shutil
import subprocess
import sys
import tempfile
import time
# Path of the named pipe (FIFO) that main() creates if missing and reads signals from.
signal_pipe_path = "/tmp/msv-webcam-signal-pipe"
# Message expected on the pipe to trigger processing of the new images.
signal_pipe_msg = "msv-webcam-new-images"
def copy_image(src, destination_dir_path):
    """Copy the file at ``src`` to ``destination_dir_path``.

    Despite its name, ``destination_dir_path`` is passed to
    ``shutil.copyfile`` as the copy target, so it has to be the path of the
    file to write rather than the path of a directory.
    """
    target_file_path = destination_dir_path
    shutil.copyfile(src, target_file_path)
def convert_to_avif(input, output):
    """Encode the image ``input`` as AVIF at ``output`` using ``avifenc``.

    The call blocks until the encoder has exited, so ``output`` is completely
    written when this function returns. Afterwards ``sync`` is run.

    Args:
        input: Path of the image file to encode. (The name shadows the
            builtin, but it is kept because it is part of the signature.)
        output: Path of the AVIF file to create.

    Raises:
        FileNotFoundError: If the ``avifenc`` or ``sync`` executable cannot
            be found.
        subprocess.CalledProcessError: If ``avifenc`` exits with a non-zero
            status.
    """
    # Wait for the encoder instead of starting it in the background: callers
    # read ``output`` right after this function returns. stdout is discarded
    # rather than piped, because nothing ever reads from the pipe.
    subprocess.run(
        ["avifenc", "--jobs", "all", input, output],
        stdout=subprocess.DEVNULL,
        check=True,
    )
    # Flushing is best effort, so a failing ``sync`` is not treated as an error.
    subprocess.run(["sync"], stdout=subprocess.DEVNULL, check=False)
def process_new_images(source_dir, destination_dir_path):
    """Convert every non-AVIF file in ``source_dir`` and copy it to the destination.

    Each regular file whose extension is not ``.avif`` (compared without
    regard to case) is encoded with ``convert_to_avif`` into a scratch
    directory and then copied with ``copy_image`` to
    ``destination_dir_path`` under the same base name with an ``.avif``
    extension. Sub-directories and files that are already AVIF are skipped.

    Args:
        source_dir: Directory that is scanned for images.
        destination_dir_path: Directory that receives the converted files.

    Raises:
        OSError: If ``source_dir`` cannot be listed or a file cannot be copied.
        Exception: Whatever ``convert_to_avif`` raises for a failed conversion.
    """
    # A private scratch directory avoids clashes on predictable file names in
    # the shared temp location and is removed, together with all intermediate
    # files, when the ``with`` block exits (also on errors).
    with tempfile.TemporaryDirectory(prefix="msv-webcam-") as scratch_dir:
        for image in os.listdir(source_dir):
            source_path = os.path.join(source_dir, image)
            stem, extension = os.path.splitext(image)
            if extension.lower() == ".avif":
                continue
            # Only regular files can be encoded; skip directories and the like.
            if not os.path.isfile(source_path):
                continue
            converted_file_name = stem + ".avif"
            converted_file_path = os.path.join(scratch_dir, converted_file_name)
            convert_to_avif(source_path, converted_file_path)
            copy_image(
                converted_file_path,
                os.path.join(destination_dir_path, converted_file_name),
            )
def _parse_directories(arguments):
    """Return ``(source_dir, destination_dir_path)`` parsed from ``key=value`` arguments.

    A value that is missing or empty is returned as ``None``.
    """
    source_dir = None
    destination_dir_path = None
    for argument in arguments:
        # Split on the first "=" only: a path may itself contain "=", and an
        # argument without "=" yields an empty value instead of an IndexError.
        _, _, value = argument.partition("=")
        if argument.startswith("destination_dir"):
            destination_dir_path = value or None
        if argument.startswith("source_dir"):
            source_dir = value or None
    return source_dir, destination_dir_path


def main():
    """Wait for signals on the named pipe and process the images on each one.

    The directories are read from the command line arguments
    ``source_dir=<path>`` and ``destination_dir=<path>``. If either is
    missing, a usage hint is printed and the process exits with ``-1``.

    Otherwise the function loops forever: it creates the FIFO at
    ``signal_pipe_path`` if necessary, polls it every 0.1 seconds and calls
    ``process_new_images`` whenever the text read contains
    ``signal_pipe_msg``. Any exception is printed and the pipe is reopened
    after a short pause.
    """
    print("starting ...")

    # sys.argv[0] is the script name, not a configuration argument.
    source_dir, destination_dir_path = _parse_directories(sys.argv[1:])
    if source_dir is None or destination_dir_path is None:
        print("Unable to parse config!")
        print("Example usage:")
        print(" python msv_webcam_frontend.py source_dir=/tmp/msv_webcam_current destination_dir=www/ ")
        sys.exit(-1)

    while True:
        try:
            # Create the FIFO without a separate existence check, which would
            # race with another process creating it at the same time.
            try:
                os.mkfifo(signal_pipe_path)
            except FileExistsError:
                pass

            # Non-blocking, so opening does not wait for a writer to show up.
            pipe_fd = os.open(signal_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
            with os.fdopen(pipe_fd) as pipe:
                while True:
                    message = pipe.read()
                    # Substring match: several signals written back to back
                    # (or a trailing newline) still trigger one processing run.
                    if message and signal_pipe_msg in message:
                        print("received signal about new images")
                        process_new_images(source_dir, destination_dir_path)
                    time.sleep(0.1)
        except Exception as e:
            # Top-level boundary: report the error and start over, but pause
            # first so a persistent failure does not turn into a busy loop.
            print(e)
            time.sleep(1)
if __name__ == "__main__":
    # Start the watcher only when executed as a script, not when imported.
    main()