diff --git a/README.md b/README.md
index b25b795..00cbcdf 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,19 @@ Or use the preconfigured VSCode tasks (`Ctrl+Shift+B`) and launch configurations
 
 Tested with: [Epson TM T20II](https://www.epson.de/products/sd/pos-printer/epson-tm-t20ii).
 Other receipt printers should work as well.
 
+## Tools
+### merge_db
+
+Merges multiple SQLite databases into one file with automatic deduplication.
+Reference tables (`category`, `positionen`) are deduplicated by content; `jobs` rows are appended in order with re-sequenced IDs to avoid conflicts.
+
+```bash
+python tools/merge_db.py -o OUTPUT_DB INPUT_DB [INPUT_DB ...]
+
+# Example
+python tools/merge_db.py -o merged.db 02.db 03.db 04.db
+```
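+
+After merging, the result can be spot-checked with the `sqlite3` command-line shell (if it is installed), for example by comparing row counts against the inputs:
+
+```bash
+sqlite3 merged.db "SELECT COUNT(*) FROM jobs;"
+```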
+
 ## License
 GPL-3.0
\ No newline at end of file
diff --git a/tools/merge_db.py b/tools/merge_db.py
new file mode 100644
index 0000000..02e2959
--- /dev/null
+++ b/tools/merge_db.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python3
+"""
+merge_db.py – Merge multiple SQLite databases into one output file.
+
+Rules
+-----
+* Tables `category` and `positionen` are deduplicated by ALL columns
+  (content-based identity, ignoring the local rowid/id column).
+* Table `jobs` rows from later files are appended with a re-sequenced
+  `jobid` so IDs never clash across files.
+* Rows are inserted in file order: the first file wins on deduplication
+  for reference tables; job rows from later files come after earlier ones.
+
+Usage
+-----
+    python merge_db.py -o merged.db 02.db 03.db [04.db ...]
+"""
+
+import argparse
+import shutil
+import sqlite3
+import sys
+from pathlib import Path
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def get_table_names(con: sqlite3.Connection) -> list[str]:
+    cur = con.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
+    return [row[0] for row in cur.fetchall()]
+
+
+def get_columns(con: sqlite3.Connection, table: str) -> list[str]:
+    cur = con.execute(f'PRAGMA table_info("{table}")')
+    return [row[1] for row in cur.fetchall()]
+
+
+def get_create_statement(con: sqlite3.Connection, table: str) -> str:
+    cur = con.execute(
+        "SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,)
+    )
+    row = cur.fetchone()
+    return row[0] if row else None
+
+
+# ---------------------------------------------------------------------------
+# Merge logic per table type
+# ---------------------------------------------------------------------------
+
+def merge_dedup_table(
+    src: sqlite3.Connection,
+    dst: sqlite3.Connection,
+    table: str,
+    id_col: str,
+) -> tuple[int, int]:
+    """
+    Merge a reference table using content-based deduplication.
+
+    Rows are compared on every column *except* `id_col`. If an identical
+    row already exists in `dst`, it is skipped; otherwise it is inserted
+    with a new auto-incremented id.
+
+    Returns (rows_read, rows_inserted).
+    """
+    cols = get_columns(src, table)
+    content_cols = [c for c in cols if c != id_col]
+
+    if not content_cols:
+        return 0, 0
+
+    placeholders = ", ".join("?" * len(content_cols))
+    select_content = ", ".join(f'"{c}"' for c in content_cols)
+
+    # Build a set of existing content tuples for fast dedup lookup
+    existing: set[tuple] = set()
+    for row in dst.execute(f'SELECT {select_content} FROM "{table}"'):
+        existing.add(row)
+
+    src_rows = src.execute(f'SELECT {select_content} FROM "{table}"').fetchall()
+    inserted = 0
+    for row in src_rows:
+        key = tuple(row)
+        if key not in existing:
+            dst.execute(
+                f'INSERT INTO "{table}" ({select_content}) VALUES ({placeholders})',
+                row,
+            )
+            existing.add(key)
+            inserted += 1
+
+    return len(src_rows), inserted
+
+
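+# Worked example of the re-sequencing performed by merge_jobs_table() below:
+# if the destination's highest jobid is 240 and the next source file holds
+# jobids 1..37, its rows are inserted as jobids 241..277 (offset = 240), so
+# their relative order is preserved and IDs never collide across files.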
+def merge_jobs_table(
+    src: sqlite3.Connection,
+    dst: sqlite3.Connection,
+    table: str = "jobs",
+    id_col: str = "jobid",
+) -> tuple[int, int]:
+    """
+    Append job rows from `src` into `dst`, re-sequencing `id_col` so that
+    new IDs start after the current maximum in `dst`.
+
+    Duplicate detection is done on all columns except `id_col`; rows with
+    identical content are skipped.
+
+    Returns (rows_read, rows_inserted).
+    """
+    cols = get_columns(src, table)
+    content_cols = [c for c in cols if c != id_col]
+    all_col_str = ", ".join(f'"{c}"' for c in cols)
+    content_str = ", ".join(f'"{c}"' for c in content_cols)
+    placeholders = ", ".join("?" * len(cols))
+
+    # Current max id in destination
+    row = dst.execute(f'SELECT MAX("{id_col}") FROM "{table}"').fetchone()
+    offset = (row[0] or 0)
+
+    # Existing content fingerprints for dedup
+    existing: set[tuple] = set()
+    for row in dst.execute(f'SELECT {content_str} FROM "{table}"'):
+        existing.add(tuple(row))
+
+    src_rows = src.execute(f'SELECT {all_col_str} FROM "{table}"').fetchall()
+    col_index = {c: i for i, c in enumerate(cols)}
+    id_idx = col_index[id_col]
+
+    inserted = 0
+    for row in src_rows:
+        content_key = tuple(row[col_index[c]] for c in content_cols)
+        if content_key in existing:
+            continue  # skip duplicate content
+        new_id = row[id_idx] + offset
+        new_row = list(row)
+        new_row[id_idx] = new_id
+        dst.execute(
+            f'INSERT INTO "{table}" ({all_col_str}) VALUES ({placeholders})',
+            new_row,
+        )
+        existing.add(content_key)
+        inserted += 1
+
+    return len(src_rows), inserted
+
+
+# ---------------------------------------------------------------------------
+# Schema helpers
+# ---------------------------------------------------------------------------
+
+def ensure_table(dst: sqlite3.Connection, src: sqlite3.Connection, table: str) -> None:
+    """Create `table` in `dst` if it doesn't exist yet, using src's DDL."""
+    exists = dst.execute(
+        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (table,)
+    ).fetchone()
+    if not exists:
+        ddl = get_create_statement(src, table)
+        if ddl:
+            dst.execute(ddl)
+
+
+# ---------------------------------------------------------------------------
+# Main merge routine
+# ---------------------------------------------------------------------------
+
+# Map table name → (id_column, merge_strategy)
+# strategy: "dedup" → content-based dedup, new ids assigned
+#           "jobs"  → append with id re-sequencing + content dedup
+TABLE_CONFIG: dict[str, tuple[str, str]] = {
+    "category": ("catid", "dedup"),
+    "positionen": ("posid", "dedup"),
+    "jobs": ("jobid", "jobs"),
+}
+
+
+def merge_databases(input_files: list[Path], output_file: Path) -> None:
+    # Bootstrap: copy first file as the base for the output
+    print(f"[init] Bootstrapping output from '{input_files[0].name}' …")
+    shutil.copy2(input_files[0], output_file)
+
+    dst = sqlite3.connect(output_file)
+    dst.execute("PRAGMA journal_mode=WAL")
+    dst.execute("PRAGMA foreign_keys=OFF")
+
+    try:
+        for src_path in input_files[1:]:
+            print(f"\n[merge] Processing '{src_path.name}' …")
+            src = sqlite3.connect(src_path)
+
+            src_tables = get_table_names(src)
+            for table in src_tables:
+                ensure_table(dst, src, table)
+                config = TABLE_CONFIG.get(table)
+
+                if config is None:
+                    # Unknown table: fall back to content-based dedup without
+                    # special id handling – treat first column as the id.
+                    cols = get_columns(src, table)
+                    id_col = cols[0] if cols else None
+                    if id_col:
+                        read, ins = merge_dedup_table(src, dst, table, id_col)
+                        print(f"  [{table}] (fallback dedup on '{id_col}') "
+                              f"read={read} inserted={ins}")
+                else:
+                    id_col, strategy = config
+                    if strategy == "dedup":
+                        read, ins = merge_dedup_table(src, dst, table, id_col)
+                    else:
+                        read, ins = merge_jobs_table(src, dst, table, id_col)
+                    print(f"  [{table}] strategy={strategy} "
+                          f"read={read} inserted={ins}")
+
+            dst.commit()
+            src.close()
+
+    finally:
+        dst.close()
+
+    print(f"\n[done] Output written to '{output_file}'")
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        prog="merge_db",
+        description=(
+            "Merge multiple SQLite databases into one output file.\n"
+            "Files are merged in the order they are provided.\n"
+            "Duplicate rows are detected by content and inserted only once."
+        ),
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=(
+            "Examples:\n"
+            "  python merge_db.py -o merged.db 02.db 03.db\n"
+            "  python merge_db.py -o merged.db 02.db 03.db 04.db 05.db\n"
+        ),
+    )
+    parser.add_argument(
+        "inputs",
+        metavar="INPUT_DB",
+        nargs="+",
+        help="Input SQLite database files (in desired merge order)",
+    )
+    parser.add_argument(
+        "-o", "--output",
+        metavar="OUTPUT_DB",
+        required=True,
+        help="Path for the merged output database (will be overwritten if it exists)",
+    )
+    return parser
+
+
+def main() -> None:
+    parser = build_parser()
+    args = parser.parse_args()
+
+    input_paths = [Path(p) for p in args.inputs]
+    output_path = Path(args.output)
+
+    # Validate inputs
+    for p in input_paths:
+        if not p.exists():
+            parser.error(f"Input file not found: {p}")
+        if not p.is_file():
+            parser.error(f"Not a file: {p}")
+
+    if len(input_paths) < 2:
+        parser.error("At least two input files are required.")
+
+    if output_path.exists():
+        print(f"[warn] Output file '{output_path}' already exists and will be overwritten.")
+
+    merge_databases(input_paths, output_path)
+
+
+if __name__ == "__main__":
+    main()