#!/usr/bin/env python3
"""
merge_db.py – Merge multiple SQLite databases into one output file.

Rules
-----
* Tables `category` and `positionen` are deduplicated by ALL columns
  (content-based identity, ignoring the local rowid/id column).
* Table `jobs` rows from later files are appended with a re-sequenced
  `jobid` so IDs never clash across files.
* Rows are inserted in file order: the first file wins on deduplication
  for reference tables; job rows from later files come after earlier ones.

Usage
-----
    python merge_db.py -o merged.db 02.db 03.db [04.db ...]
"""

import argparse
import shutil
import sqlite3
from pathlib import Path


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def get_table_names(con: sqlite3.Connection) -> list[str]:
    cur = con.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
    return [row[0] for row in cur.fetchall()]


def get_columns(con: sqlite3.Connection, table: str) -> list[str]:
    cur = con.execute(f'PRAGMA table_info("{table}")')
    return [row[1] for row in cur.fetchall()]


def get_create_statement(con: sqlite3.Connection, table: str) -> str | None:
    cur = con.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,)
    )
    row = cur.fetchone()
    return row[0] if row else None


# ---------------------------------------------------------------------------
# Merge logic per table type
# ---------------------------------------------------------------------------

def merge_dedup_table(
    src: sqlite3.Connection,
    dst: sqlite3.Connection,
    table: str,
    id_col: str,
) -> tuple[int, int]:
    """
    Merge a reference table using content-based deduplication.

    Rows are compared on every column *except* `id_col`.  If an identical
    row already exists in `dst`, it is skipped; otherwise it is inserted
    with a new auto-incremented id.

    Returns (rows_read, rows_inserted).
    """
    cols = get_columns(src, table)
    content_cols = [c for c in cols if c != id_col]
    if not content_cols:
        return 0, 0

    placeholders = ", ".join("?" * len(content_cols))
    select_content = ", ".join(f'"{c}"' for c in content_cols)

    # Build a set of existing content tuples for fast dedup lookup.
    existing: set[tuple] = set()
    for row in dst.execute(f'SELECT {select_content} FROM "{table}"'):
        existing.add(row)

    src_rows = src.execute(f'SELECT {select_content} FROM "{table}"').fetchall()

    inserted = 0
    for row in src_rows:
        key = tuple(row)
        if key not in existing:
            dst.execute(
                f'INSERT INTO "{table}" ({select_content}) VALUES ({placeholders})',
                row,
            )
            existing.add(key)
            inserted += 1
    return len(src_rows), inserted
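
# A note on the approach: merge_dedup_table keeps every content fingerprint
# in a Python set, trading memory for simplicity. The sketch below shows the
# same dedup pushed down into SQLite via ATTACH and a compound SELECT; it is
# purely illustrative and not called by this script. One semantic difference
# to be aware of: EXCEPT also collapses duplicate rows *within* the source
# table, which the set-based version preserves.

def merge_dedup_table_sql(
    dst: sqlite3.Connection,
    src_path: str,
    table: str,
    id_col: str,
) -> None:
    """Illustrative alternative: content-based dedup done entirely in SQL."""
    cols = [c for c in get_columns(dst, table) if c != id_col]
    col_list = ", ".join(f'"{c}"' for c in cols)
    dst.execute("ATTACH DATABASE ? AS src", (src_path,))
    try:
        # Insert only those content rows of src's table that dst lacks.
        dst.execute(
            f'INSERT INTO "{table}" ({col_list}) '
            f'SELECT {col_list} FROM src."{table}" '
            f'EXCEPT SELECT {col_list} FROM "{table}"'
        )
    finally:
        dst.execute("DETACH DATABASE src")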

def merge_jobs_table(
    src: sqlite3.Connection,
    dst: sqlite3.Connection,
    table: str = "jobs",
    id_col: str = "jobid",
) -> tuple[int, int]:
    """
    Append job rows from `src` into `dst`, re-sequencing `id_col` so that
    new IDs start after the current maximum in `dst`.

    Duplicate detection is done on all columns except `id_col`; rows with
    identical content are skipped.

    Returns (rows_read, rows_inserted).
    """
    cols = get_columns(src, table)
    content_cols = [c for c in cols if c != id_col]
    all_col_str = ", ".join(f'"{c}"' for c in cols)
    content_str = ", ".join(f'"{c}"' for c in content_cols)
    placeholders = ", ".join("?" * len(cols))

    # Current max id in destination
    row = dst.execute(f'SELECT MAX("{id_col}") FROM "{table}"').fetchone()
    offset = row[0] or 0

    # Existing content fingerprints for dedup
    existing: set[tuple] = set()
    for row in dst.execute(f'SELECT {content_str} FROM "{table}"'):
        existing.add(tuple(row))

    src_rows = src.execute(f'SELECT {all_col_str} FROM "{table}"').fetchall()
    col_index = {c: i for i, c in enumerate(cols)}
    id_idx = col_index[id_col]

    inserted = 0
    for row in src_rows:
        content_key = tuple(row[col_index[c]] for c in content_cols)
        if content_key in existing:
            continue  # skip duplicate content
        new_id = row[id_idx] + offset
        new_row = list(row)
        new_row[id_idx] = new_id
        dst.execute(
            f'INSERT INTO "{table}" ({all_col_str}) VALUES ({placeholders})',
            new_row,
        )
        existing.add(content_key)
        inserted += 1
    return len(src_rows), inserted


# ---------------------------------------------------------------------------
# Schema helpers
# ---------------------------------------------------------------------------

def ensure_table(dst: sqlite3.Connection, src: sqlite3.Connection, table: str) -> None:
    """Create `table` in `dst` if it doesn't exist yet, using src's DDL."""
    exists = dst.execute(
        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (table,)
    ).fetchone()
    if not exists:
        ddl = get_create_statement(src, table)
        if ddl:
            dst.execute(ddl)


# ---------------------------------------------------------------------------
# Main merge routine
# ---------------------------------------------------------------------------

# Map table name → (id_column, merge_strategy)
# strategy: "dedup" → content-based dedup, new ids assigned
#           "jobs"  → append with id re-sequencing + content dedup
TABLE_CONFIG: dict[str, tuple[str, str]] = {
    "category": ("catid", "dedup"),
    "positionen": ("posid", "dedup"),
    "jobs": ("jobid", "jobs"),
}
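
# A pre-flight check can catch configuration drift before any rows move.
# The sketch below is illustrative and is not called by merge_databases;
# it assumes only the TABLE_CONFIG layout defined above.

def check_config(con: sqlite3.Connection) -> list[str]:
    """Report configured tables whose id column is missing in `con`."""
    problems: list[str] = []
    for table, (id_col, _strategy) in TABLE_CONFIG.items():
        cols = get_columns(con, table)
        # An empty column list means the table is absent, which is fine;
        # a present table without its configured id column is not.
        if cols and id_col not in cols:
            problems.append(f"{table}: id column '{id_col}' not found")
    return problems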

def merge_databases(input_files: list[Path], output_file: Path) -> None:
    # Bootstrap: copy the first file as the base for the output.
    print(f"[init] Bootstrapping output from '{input_files[0].name}' …")
    shutil.copy2(input_files[0], output_file)

    dst = sqlite3.connect(output_file)
    dst.execute("PRAGMA journal_mode=WAL")
    dst.execute("PRAGMA foreign_keys=OFF")

    try:
        for src_path in input_files[1:]:
            print(f"\n[merge] Processing '{src_path.name}' …")
            src = sqlite3.connect(src_path)
            src_tables = get_table_names(src)

            for table in src_tables:
                ensure_table(dst, src, table)
                config = TABLE_CONFIG.get(table)
                if config is None:
                    # Unknown table: fall back to content-based dedup without
                    # special id handling – treat the first column as the id.
                    cols = get_columns(src, table)
                    id_col = cols[0] if cols else None
                    if id_col:
                        read, ins = merge_dedup_table(src, dst, table, id_col)
                        print(f"  [{table}] (fallback dedup on '{id_col}') "
                              f"read={read} inserted={ins}")
                else:
                    id_col, strategy = config
                    if strategy == "dedup":
                        read, ins = merge_dedup_table(src, dst, table, id_col)
                    else:
                        read, ins = merge_jobs_table(src, dst, table, id_col)
                    print(f"  [{table}] strategy={strategy} "
                          f"read={read} inserted={ins}")

            dst.commit()
            src.close()
    finally:
        dst.close()

    print(f"\n[done] Output written to '{output_file}'")


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="merge_db",
        description=(
            "Merge multiple SQLite databases into one output file.\n"
            "Files are merged in the order they are provided.\n"
            "Duplicate rows are detected by content and inserted only once."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  python merge_db.py -o merged.db 02.db 03.db\n"
            "  python merge_db.py -o merged.db 02.db 03.db 04.db 05.db\n"
        ),
    )
    parser.add_argument(
        "inputs",
        metavar="INPUT_DB",
        nargs="+",
        help="Input SQLite database files (in desired merge order)",
    )
    parser.add_argument(
        "-o", "--output",
        metavar="OUTPUT_DB",
        required=True,
        help="Path for the merged output database (will be overwritten if it exists)",
    )
    return parser


def main() -> None:
    parser = build_parser()
    args = parser.parse_args()

    input_paths = [Path(p) for p in args.inputs]
    output_path = Path(args.output)

    # Validate inputs
    for p in input_paths:
        if not p.exists():
            parser.error(f"Input file not found: {p}")
        if not p.is_file():
            parser.error(f"Not a file: {p}")
    if len(input_paths) < 2:
        parser.error("At least two input files are required.")

    if output_path.exists():
        print(f"[warn] Output file '{output_path}' already exists and will be overwritten.")

    merge_databases(input_paths, output_path)


if __name__ == "__main__":
    main()
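
# ---------------------------------------------------------------------------
# Smoke test (sketch)
# ---------------------------------------------------------------------------
# A minimal way to try the script end to end. The fixture schema below is an
# assumption that mirrors TABLE_CONFIG; the real databases may carry more
# columns.
#
#   import sqlite3
#   for name in ("a.db", "b.db"):
#       con = sqlite3.connect(name)
#       con.execute("CREATE TABLE jobs (jobid INTEGER PRIMARY KEY, descr TEXT)")
#       con.execute("INSERT INTO jobs (descr) VALUES (?)", (name,))
#       con.commit()
#       con.close()
#
#   $ python merge_db.py -o merged.db a.db b.db
#
# The merged `jobs` table should then hold two rows with jobid 1 and 2: the
# second file's row has distinct content, so it is appended with its id
# offset by the destination's current maximum (1).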