Files
jFxKasse/tools/merge_db.py
2026-04-21 21:41:17 +02:00

282 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
merge_db.py — Merge multiple SQLite databases into one output file.
Rules
-----
* Tables `category` and `positionen` are deduplicated by ALL columns
(content-based identity, ignoring the local rowid/id column).
* Table `jobs` rows from later files are appended with a re-sequenced
`jobid` so IDs never clash across files.
* Rows are inserted in file order: the first file wins on deduplication
for reference tables; job rows from later files come after earlier ones.
Usage
-----
python merge_db.py -o merged.db 02.db 03.db [04.db ...]
"""
import argparse
import shutil
import sqlite3
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def get_table_names(con: sqlite3.Connection) -> list[str]:
    """Return the names of all tables in *con*, in alphabetical order."""
    rows = con.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]
def get_columns(con: sqlite3.Connection, table: str) -> list[str]:
    """Return the column names of *table* in declaration order.

    An unknown table yields an empty list (PRAGMA table_info returns no rows).
    """
    info = con.execute(f'PRAGMA table_info("{table}")').fetchall()
    # table_info rows are (cid, name, type, notnull, dflt_value, pk).
    return [entry[1] for entry in info]
def get_create_statement(con: sqlite3.Connection, table: str) -> str:
cur = con.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,)
)
row = cur.fetchone()
return row[0] if row else None
# ---------------------------------------------------------------------------
# Merge logic per table type
# ---------------------------------------------------------------------------
def merge_dedup_table(
    src: sqlite3.Connection,
    dst: sqlite3.Connection,
    table: str,
    id_col: str,
) -> tuple[int, int]:
    """
    Merge a reference table using content-based deduplication.

    Row identity is the tuple of every column *except* `id_col`.  A source
    row whose content already exists in `dst` is skipped; everything else
    is inserted and receives a fresh auto-incremented id.

    Returns (rows_read, rows_inserted).
    """
    content_cols = [c for c in get_columns(src, table) if c != id_col]
    if not content_cols:
        # Only the id column exists — there is no content to compare.
        return 0, 0
    col_list = ", ".join(f'"{c}"' for c in content_cols)
    qmarks = ", ".join("?" * len(content_cols))
    insert_sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({qmarks})'
    # Fingerprints of every row already present in the destination.
    seen = {tuple(r) for r in dst.execute(f'SELECT {col_list} FROM "{table}"')}
    source_rows = src.execute(f'SELECT {col_list} FROM "{table}"').fetchall()
    added = 0
    for source_row in source_rows:
        fingerprint = tuple(source_row)
        if fingerprint in seen:
            continue  # identical content already merged — first file wins
        dst.execute(insert_sql, source_row)
        seen.add(fingerprint)
        added += 1
    return len(source_rows), added
def merge_jobs_table(
    src: sqlite3.Connection,
    dst: sqlite3.Connection,
    table: str = "jobs",
    id_col: str = "jobid",
) -> tuple[int, int]:
    """
    Append job rows from `src` into `dst`, shifting `id_col` past the
    current maximum in `dst` so merged IDs never collide.

    Rows whose content (all columns except `id_col`) already exists in
    `dst` are skipped.

    Returns (rows_read, rows_inserted).
    """
    cols = get_columns(src, table)
    content_cols = [c for c in cols if c != id_col]
    quoted_all = ", ".join(f'"{c}"' for c in cols)
    quoted_content = ", ".join(f'"{c}"' for c in content_cols)
    qmarks = ", ".join("?" * len(cols))
    insert_sql = f'INSERT INTO "{table}" ({quoted_all}) VALUES ({qmarks})'
    # New ids start right after the destination's current maximum
    # (MAX is NULL on an empty table, hence the `or 0`).
    (current_max,) = dst.execute(f'SELECT MAX("{id_col}") FROM "{table}"').fetchone()
    offset = current_max or 0
    # Content fingerprints already present in the destination.
    seen = {
        tuple(r) for r in dst.execute(f'SELECT {quoted_content} FROM "{table}"')
    }
    source_rows = src.execute(f'SELECT {quoted_all} FROM "{table}"').fetchall()
    position = {name: i for i, name in enumerate(cols)}
    id_pos = position[id_col]
    added = 0
    for source_row in source_rows:
        fingerprint = tuple(source_row[position[c]] for c in content_cols)
        if fingerprint in seen:
            continue  # duplicate content — skip
        shifted = list(source_row)
        shifted[id_pos] = source_row[id_pos] + offset
        dst.execute(insert_sql, shifted)
        seen.add(fingerprint)
        added += 1
    return len(source_rows), added
# ---------------------------------------------------------------------------
# Schema helpers
# ---------------------------------------------------------------------------
def ensure_table(dst: sqlite3.Connection, src: sqlite3.Connection, table: str) -> None:
    """Create *table* in `dst` from `src`'s DDL unless it already exists."""
    already_there = dst.execute(
        "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (table,)
    ).fetchone()
    if already_there:
        return
    ddl = get_create_statement(src, table)
    # A table that vanished from src (or has no DDL) is silently skipped.
    if ddl:
        dst.execute(ddl)
# ---------------------------------------------------------------------------
# Main merge routine
# ---------------------------------------------------------------------------
# Map table name → (id_column, merge_strategy).
# strategy: "dedup" → content-based dedup; surviving rows get new ids.
#           "jobs"  → rows appended with id re-sequencing + content dedup.
# Tables not listed here fall back to dedup on their first column
# (see merge_databases).
TABLE_CONFIG: dict[str, tuple[str, str]] = {
    "category": ("catid", "dedup"),
    "positionen": ("posid", "dedup"),
    "jobs": ("jobid", "jobs"),
}
def merge_databases(input_files: list[Path], output_file: Path) -> None:
    """Merge *input_files* (in the given order) into *output_file*.

    The first input is copied verbatim as the base; each later file is
    merged table-by-table using the strategy from TABLE_CONFIG (unknown
    tables fall back to content dedup on their first column).

    Fixes vs. the previous version:
    * SQLite internal tables (``sqlite_sequence``, ``sqlite_stat1``, ...)
      are skipped — get_table_names returns them too, and merging their
      rows would corrupt autoincrement/statistics bookkeeping.
    * Each source connection is closed in a ``finally`` so an exception
      mid-merge no longer leaks it.
    """
    # Bootstrap: copy first file as the base for the output
    print(f"[init] Bootstrapping output from '{input_files[0].name}'")
    shutil.copy2(input_files[0], output_file)
    dst = sqlite3.connect(output_file)
    dst.execute("PRAGMA journal_mode=WAL")
    # IDs get rewritten during the merge; FK enforcement would misfire.
    dst.execute("PRAGMA foreign_keys=OFF")
    try:
        for src_path in input_files[1:]:
            print(f"\n[merge] Processing '{src_path.name}'")
            src = sqlite3.connect(src_path)
            try:
                for table in get_table_names(src):
                    # Never merge SQLite's internal bookkeeping tables.
                    if table.startswith("sqlite_"):
                        continue
                    ensure_table(dst, src, table)
                    config = TABLE_CONFIG.get(table)
                    if config is None:
                        # Unknown table: fall back to content-based dedup
                        # without special id handling — treat the first
                        # column as the id.
                        cols = get_columns(src, table)
                        id_col = cols[0] if cols else None
                        if id_col:
                            read, ins = merge_dedup_table(src, dst, table, id_col)
                            print(f" [{table}] (fallback dedup on '{id_col}') "
                                  f"read={read} inserted={ins}")
                    else:
                        id_col, strategy = config
                        if strategy == "dedup":
                            read, ins = merge_dedup_table(src, dst, table, id_col)
                        else:
                            read, ins = merge_jobs_table(src, dst, table, id_col)
                        print(f" [{table}] strategy={strategy} "
                              f"read={read} inserted={ins}")
                dst.commit()
            finally:
                src.close()
    finally:
        dst.close()
    print(f"\n[done] Output written to '{output_file}'")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the merge tool."""
    description = (
        "Merge multiple SQLite databases into one output file.\n"
        "Files are merged in the order they are provided.\n"
        "Duplicate rows are detected by content and inserted only once."
    )
    epilog = (
        "Examples:\n"
        " python merge_db.py -o merged.db 02.db 03.db\n"
        " python merge_db.py -o merged.db 02.db 03.db 04.db 05.db\n"
    )
    parser = argparse.ArgumentParser(
        prog="merge_db",
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )
    # One or more input databases, merged in the order given.
    parser.add_argument(
        "inputs",
        metavar="INPUT_DB",
        nargs="+",
        help="Input SQLite database files (in desired merge order)",
    )
    # Destination path; an existing file is overwritten.
    parser.add_argument(
        "-o", "--output",
        metavar="OUTPUT_DB",
        required=True,
        help="Path for the merged output database (will be overwritten if it exists)",
    )
    return parser
def main() -> None:
    """CLI entry point: validate arguments, then run the merge.

    Added check: the output path must not be one of the inputs — the
    bootstrap ``shutil.copy2`` in merge_databases would otherwise clobber
    a source database before it is ever read.
    """
    parser = build_parser()
    args = parser.parse_args()
    input_paths = [Path(p) for p in args.inputs]
    output_path = Path(args.output)
    # Validate inputs
    for p in input_paths:
        if not p.exists():
            parser.error(f"Input file not found: {p}")
        if not p.is_file():
            parser.error(f"Not a file: {p}")
    if len(input_paths) < 2:
        parser.error("At least two input files are required.")
    # Compare resolved paths so "./a.db" and "a.db" are caught as the same file.
    if any(p.resolve() == output_path.resolve() for p in input_paths):
        parser.error("Output file must not be one of the input files.")
    if output_path.exists():
        print(f"[warn] Output file '{output_path}' already exists and will be overwritten.")
    merge_databases(input_paths, output_path)
if __name__ == "__main__":
main()