Files
media_management_scripts/convert/convert.py

187 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import json
import os
import subprocess
import time
import requests
import sys
from pathlib import Path
PROM_URL = "http://127.0.0.1:9104/metrics"
MIN_SOC = 0.5 # 50%
MIN_SOLAR_POWER = 500 # 500W
def load_config(path):
"""Load JSON configuration file."""
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
import requests
import time
def wait_for_solar_power(prom_url, check_interval=300, max_retries=None):
"""
Wait until:
- fronius_site_power_photovoltaic > 500
- fronius_inverter_soc{inverter="1"} > 0.5
Args:
prom_url (str): Prometheus metrics endpoint.
check_interval (int): Seconds between checks (default: 300).
max_retries (int or None): Optional limit on checks. If None, loop indefinitely.
Returns:
bool: True if conditions met, False if retries exhausted.
"""
attempts = 0
while True:
try:
r = requests.get(prom_url, timeout=5)
r.raise_for_status()
pv_power = None
inverter_soc = None
for line in r.text.splitlines():
if line.startswith("#"): # Skip comments and HELP/TYPE lines
continue
parts = line.split()
if len(parts) != 2:
continue
metric, value_str = parts
try:
value = float(value_str)
except ValueError:
continue
if metric == "fronius_site_power_photovoltaic":
pv_power = value
elif (
metric.startswith("fronius_inverter_soc")
and '{inverter="1"}' in metric
):
inverter_soc = value
if pv_power is not None and inverter_soc is not None:
if pv_power > MIN_SOLAR_POWER and inverter_soc > MIN_SOC:
print(
f"[INFO] Solar power available: PV={pv_power:.2f} W, SOC={inverter_soc:.2f} starting conversion."
)
return True
print(
f"[INFO] Conditions not met (PV={pv_power}, SOC={inverter_soc}). Waiting {check_interval} seconds..."
)
except requests.RequestException as e:
print(f"[WARN] Could not reach Prometheus: {e}")
attempts += 1
if max_retries is not None and attempts >= max_retries:
print("[ERROR] Max retries reached. Exiting without solar power.")
return False
time.sleep(check_interval)
def analyze_codecs(oldfile, newfile, dst_folder):
"""
Run codecVis to compare old and new files.
Then rename output.png to newfile.png.
"""
cmd = ["codecVis", oldfile, newfile]
print(f"[CMD] {' '.join(cmd)}")
subprocess.run(cmd, cwd=dst_folder, check=True)
# Rename output.png to match the new file
output_png = Path(dst_folder) / "output.png"
new_png = Path(dst_folder) / (Path(newfile).name + ".png")
if output_png.exists():
output_png.rename(new_png)
print(f"[INFO] Analysis image saved as {new_png}")
else:
print("[WARN] output.png not found!")
def main():
if len(sys.argv) != 2:
print("Give path to config file as argument.")
sys.exit(1)
cfg = load_config(sys.argv[1])
dst_folder = Path(cfg["dst_folder"])
src_folder = Path(cfg["src_folder"])
dst_folder.mkdir(parents=True, exist_ok=True)
for job in cfg["jobs"]:
src_file = src_folder / job
tmp_movie_name = "tmp_" + Path(job).stem + ".mkv"
movie_name = Path(job).stem + ".mkv"
tmp_dst_file = dst_folder / tmp_movie_name
dst_file = dst_folder / movie_name
print(f"Source: {src_file}")
print(f"Temp name: {tmp_movie_name}")
print(f"Final name: {movie_name}")
# Remove leftover temporary file
if tmp_dst_file.exists():
print(f"[INFO] File {tmp_dst_file} already exists. --> Delete!")
tmp_dst_file.unlink()
# Skip if final file already exists
if dst_file.exists():
print(f"[INFO] Skip {dst_file}, already exists. --> Convert already done!")
continue
wait_for_solar_power(PROM_URL)
try:
cmd = [
"taskset",
"-c",
"0,1,2,3", # limit to first 4 CPU cores
"ffmpeg",
"-i",
str(src_file),
"-c:v",
"libaom-av1",
"-c:a",
"libopus",
"-mapping_family",
"1",
"-af",
"aformat=channel_layouts=5.1",
"-c:s",
"copy",
"-map",
"0",
"-crf",
"24",
"-b:v",
"0",
"-b:a",
"128k",
"-cpu-used",
"4",
"-row-mt",
"1",
"-tiles",
"2x2",
str(tmp_dst_file),
]
print(f"[CMD] {' '.join(cmd)}")
subprocess.run(cmd, check=True)
# Rename temp file to final name
tmp_dst_file.rename(dst_file)
# Run codec analysis
analyze_codecs(str(src_file), str(dst_file), dst_folder)
except subprocess.CalledProcessError as e:
print(f"[ERROR] Processing failed for {job}: {e}")
if __name__ == "__main__":
main()