#!/usr/bin/env python3
"""
Paper Dynasty SQLite to PostgreSQL Migration Script

CRITICAL: This script preserves primary key IDs exactly as they exist in SQLite.
Failing to preserve IDs will cause all foreign key references to break.

Usage:
    # Dry run (validate only, no changes)
    python scripts/migrate_to_postgres.py --dry-run

    # Full migration
    python scripts/migrate_to_postgres.py

    # Migrate specific table only
    python scripts/migrate_to_postgres.py --table player

Environment Variables Required:
    POSTGRES_HOST, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_PORT

Based on lessons learned from Major Domo PostgreSQL migration (August 2025).
"""

import argparse
import logging
import os
import sqlite3
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extras import execute_values

# Ensure the log directory exists before attaching the file handler,
# otherwise logging.FileHandler raises FileNotFoundError on a fresh checkout
os.makedirs("logs", exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(
            f"logs/migration_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
    ],
)
logger = logging.getLogger(__name__)

# Migration order - tables with no FK dependencies first, then dependent tables
# This ensures parent records exist before children are inserted
MIGRATION_ORDER = [
    # Tier 1: No foreign key dependencies
    "current",
    "rarity",
    "event",
    "packtype",
    "notification",
    # Tier 2: Simple FK dependencies (single level)
    "cardset",  # -> event
    "mlbplayer",  # no FKs
    # Tier 3: Core entity tables
    "team",  # -> event
    "player",  # -> cardset, rarity, mlbplayer
    # Tier 3b: Tables dependent on player
    "gamerewards",  # -> packtype, player
    # Tier 4: Dependent on core entities
    "pack",  # -> team, packtype, cardset
    "card",  # -> player, team, pack
    "roster",  # -> team, card (x26)
    "result",  # -> team (x2)
    "stratgame",  # -> team (x2)
    # Tier 5: Statistics and game data
    # NOTE: battingstat and pitchingstat are LEGACY tables - excluded from migration
    # "battingstat",  # -> card, team, result (LEGACY - not used)
    # "pitchingstat",  # -> card, team, result (LEGACY - not used)
    "stratplay",  # -> stratgame, player (many), team (many)
    "decision",  # -> stratgame, player, team
    # Tier 6: Card detail tables
    "battingcard",  # -> player
    "battingcardratings",  # -> battingcard
    "pitchingcard",  # -> player
    "pitchingcardratings",  # -> pitchingcard
    "cardposition",  # -> player
    # Tier 7: Other dependent tables
    "award",  # -> card, team
    "paperdex",  # -> team, player
    "reward",  # -> team
    "gauntletreward",  # -> event, gamerewards
    "gauntletrun",  # -> team, event
]

# Legacy tables to skip during migration (no longer used by the application)
SKIP_TABLES = ["battingstat", "pitchingstat"]

# Tables with explicit primary keys (not auto-increment)
EXPLICIT_PK_TABLES = {
    "player": "player_id",  # Uses player_id as explicit PK
}
# All other tables use 'id' as auto-increment PK

# Columns that need type conversion from SQLite to PostgreSQL
# Boolean columns: SQLite stores as 0/1, PostgreSQL needs True/False
BOOLEAN_COLUMNS = {
    "event": ["active"],
    "cardset": ["for_purchase", "in_packs", "ranked_legal"],
    "team": ["has_guide"],
    "packtype": ["available"],
    "result": ["ranked", "short_game"],
    "stratgame": ["ranked", "short_game", "forfeit"],
    "stratplay": ["is_go_ahead", "is_tied", "is_new_inning"],
    "decision": ["is_start"],
    "battingcard": ["steal_auto"],
    "notification": ["ack"],
}

# DateTime/Timestamp columns: SQLite may store as Unix ms, PostgreSQL needs datetime
# Format: table -> {column: "ms" | "sec" | "iso"} to indicate conversion type
DATETIME_COLUMNS = {
    "pack": {"open_time": "ms"},  # Unix timestamp in milliseconds
    "battingstat": {"created": "ms"},
    "pitchingstat": {"created": "ms"},
    "result": {"created": "ms"},
    "stratgame": {"created": "ms", "ended": "ms"},
    "notification": {"created": "ms"},
    "gauntletrun": {"created": "ms", "ended": "ms"},
    "paperdex": {"created": "ms"},
    "reward": {"created": "ms"},
    "award": {"created": "ms"},
}

# Columns that are PostgreSQL reserved words and need quoting
RESERVED_WORD_COLUMNS = {
    "notification": ["desc"],  # 'desc' is reserved in PostgreSQL
}


def convert_value(table_name: str, column_name: str, value: Any) -> Any:
    """
    Convert a SQLite value to PostgreSQL-compatible format.

    Handles:
    - Boolean: 0/1 -> True/False
    - DateTime: Unix timestamps (ms) -> datetime objects
    - NULL values: Pass through unchanged
    """
    # NULL values pass through unchanged
    if value is None:
        return None

    # Boolean conversion
    if table_name in BOOLEAN_COLUMNS and column_name in BOOLEAN_COLUMNS[table_name]:
        return bool(value)

    # DateTime conversion
    if table_name in DATETIME_COLUMNS and column_name in DATETIME_COLUMNS[table_name]:
        fmt = DATETIME_COLUMNS[table_name][column_name]
        if value == 0:
            return None  # 0 typically means "not set" for timestamps
        try:
            if fmt == "ms":
                # Unix timestamp in milliseconds
                return datetime.fromtimestamp(value / 1000)
            elif fmt == "sec":
                # Unix timestamp in seconds
                return datetime.fromtimestamp(value)
            elif fmt == "iso":
                # ISO format string
                return datetime.fromisoformat(value)
        except (ValueError, TypeError, OSError) as e:
            logger.warning(f"Could not convert datetime {column_name}={value}: {e}")
            return None

    return value


def quote_column(table_name: str, column_name: str) -> str:
    """Quote column name if it's a PostgreSQL reserved word."""
    if (
        table_name in RESERVED_WORD_COLUMNS
        and column_name in RESERVED_WORD_COLUMNS[table_name]
    ):
        return f'"{column_name}"'
    return column_name


def get_sqlite_connection(db_path: str) -> sqlite3.Connection:
    """Connect to SQLite database."""
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"SQLite database not found: {db_path}")
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn


def get_postgres_connection() -> psycopg2.extensions.connection:
    """Connect to PostgreSQL database using environment variables."""
    required_vars = [
        "POSTGRES_HOST",
        "POSTGRES_DB",
        "POSTGRES_USER",
        "POSTGRES_PASSWORD",
    ]
    missing = [v for v in required_vars if not os.environ.get(v)]
    if missing:
        raise EnvironmentError(f"Missing required environment variables: {missing}")

    return psycopg2.connect(
        host=os.environ["POSTGRES_HOST"],
        database=os.environ["POSTGRES_DB"],
        user=os.environ["POSTGRES_USER"],
        password=os.environ["POSTGRES_PASSWORD"],
        port=int(os.environ.get("POSTGRES_PORT", "5432")),
    )


def get_table_columns(sqlite_conn: sqlite3.Connection, table_name: str) -> List[str]:
    """Get column names for a table from SQLite."""
    cursor = sqlite_conn.execute(f"PRAGMA table_info({table_name})")
    return [row["name"] for row in cursor.fetchall()]


def get_primary_key_column(table_name: str) -> str:
    """Get the primary key column name for a table."""
    return EXPLICIT_PK_TABLES.get(table_name, "id")


def get_sequence_name(table_name: str, pk_column: str) -> str:
    """Get the PostgreSQL sequence name for a table's primary key."""
    return f"{table_name}_{pk_column}_seq"


def get_record_count(conn, table_name: str, is_sqlite: bool = True) -> int:
    """Get record count for a table."""
    if is_sqlite:
        cursor = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]
    else:
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]


def migrate_table(
    sqlite_conn: sqlite3.Connection,
    pg_conn: psycopg2.extensions.connection,
    table_name: str,
    batch_size: int = 500,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """
    Migrate a single table from SQLite to PostgreSQL.

    CRITICAL: Preserves primary key IDs exactly.

    Returns:
        Dict with migration statistics
    """
    stats = {
        "table": table_name,
        "sqlite_count": 0,
        "postgres_count": 0,
        "inserted": 0,
        "skipped": 0,
        "errors": [],
        "success": False,
    }

    try:
        # Get column info
        columns = get_table_columns(sqlite_conn, table_name)
        pk_column = get_primary_key_column(table_name)

        # Count source records
        stats["sqlite_count"] = get_record_count(
            sqlite_conn, table_name, is_sqlite=True
        )
        logger.info(f"Table {table_name}: {stats['sqlite_count']} records to migrate")

        if stats["sqlite_count"] == 0:
            logger.info(f"Table {table_name}: No records to migrate")
            stats["success"] = True
            return stats

        if dry_run:
            logger.info(
                f"[DRY RUN] Would migrate {stats['sqlite_count']} records from {table_name}"
            )
            stats["success"] = True
            return stats

        # Read all records from SQLite
        cursor = sqlite_conn.execute(f"SELECT * FROM {table_name}")
        rows = cursor.fetchall()

        # Prepare PostgreSQL insert
        pg_cursor = pg_conn.cursor()

        # Build column list string with proper quoting for reserved words
        quoted_columns = [quote_column(table_name, col) for col in columns]
        columns_str = ", ".join(quoted_columns)
        placeholders = ", ".join(["%s"] * len(columns))

        # Process in batches
        for i in range(0, len(rows), batch_size):
            batch = rows[i : i + batch_size]
            batch_values = []

            for row in batch:
                # Convert sqlite3.Row to tuple with type conversions
                values = tuple(
                    convert_value(table_name, col, row[col]) for col in columns
                )
                batch_values.append(values)

            try:
                # Use execute_values for efficient batch insert
                insert_sql = f"INSERT INTO {table_name} ({columns_str}) VALUES %s"
                execute_values(pg_cursor, insert_sql, batch_values)
                stats["inserted"] += len(batch)
                # Commit each successful batch so a rollback triggered by a
                # later batch cannot discard rows that were already inserted
                pg_conn.commit()
            except (
                psycopg2.errors.ForeignKeyViolation,
                psycopg2.errors.UniqueViolation,
            ) as e:
                # FK or unique constraint error - fall back to individual inserts
                error_type = (
                    "FK violation"
                    if "foreign key" in str(e).lower()
                    else "Unique violation"
                )
                logger.warning(
                    f"{error_type} in batch, falling back to individual inserts: {e}"
                )
                pg_conn.rollback()

                for values in batch_values:
                    try:
                        pg_cursor.execute(
                            f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})",
                            values,
                        )
                        stats["inserted"] += 1
                        pg_conn.commit()  # Commit each successful insert
                    except psycopg2.errors.ForeignKeyViolation as e:
                        stats["skipped"] += 1
                        # Extract ID for logging
                        pk_idx = columns.index(pk_column) if pk_column in columns else 0
                        record_id = values[pk_idx]
                        stats["errors"].append(
                            {
                                "id": record_id,
                                "error": "ForeignKeyViolation",
                                "message": str(e),
                            }
                        )
                        logger.warning(
                            f"Skipped orphaned record {table_name}.{pk_column}={record_id}"
                        )
                        pg_conn.rollback()
                    except psycopg2.errors.UniqueViolation as e:
                        stats["skipped"] += 1
                        pk_idx = columns.index(pk_column) if pk_column in columns else 0
                        record_id = values[pk_idx]
                        logger.warning(
                            f"Skipped duplicate record {table_name}.{pk_column}={record_id}"
                        )
                        pg_conn.rollback()
                    except Exception as e:
                        stats["skipped"] += 1
                        pk_idx = columns.index(pk_column) if pk_column in columns else 0
                        record_id = values[pk_idx]
                        stats["errors"].append(
                            {
                                "id": record_id,
                                "error": type(e).__name__,
                                "message": str(e),
                            }
                        )
                        logger.error(
                            f"Error inserting {table_name}.{pk_column}={record_id}: {e}"
                        )
                        pg_conn.rollback()
            logger.info(
                f"Table {table_name}: Processed {min(i + batch_size, len(rows))}/{len(rows)} records"
            )

        # Final commit (batches commit as they succeed; this flushes any remainder)
        pg_conn.commit()

        # CRITICAL: Reset the PostgreSQL sequence to MAX(id) + 1
        # Without this, new inserts will fail with duplicate key errors
        sequence_name = get_sequence_name(table_name, pk_column)
        try:
            pg_cursor.execute(f"""
                SELECT setval('{sequence_name}',
                              COALESCE((SELECT MAX({pk_column}) FROM {table_name}), 1),
                              true)
            """)
            pg_conn.commit()
            logger.info(f"Table {table_name}: Reset sequence {sequence_name}")
        except psycopg2.errors.UndefinedTable as e:
            # Sequence might not exist for explicit PK tables
            logger.warning(f"Could not reset sequence {sequence_name}: {e}")
            pg_conn.rollback()

        # Verify counts
        stats["postgres_count"] = get_record_count(pg_conn, table_name, is_sqlite=False)

        if stats["postgres_count"] == stats["sqlite_count"]:
            logger.info(
                f"Table {table_name}: SUCCESS - {stats['postgres_count']} records migrated"
            )
            stats["success"] = True
        elif stats["postgres_count"] == stats["inserted"]:
            logger.warning(
                f"Table {table_name}: PARTIAL - {stats['inserted']} inserted, "
                f"{stats['skipped']} skipped (orphaned FK records)"
            )
            stats["success"] = True  # Partial success is acceptable for orphaned records
        else:
            logger.error(
                f"Table {table_name}: MISMATCH - SQLite: {stats['sqlite_count']}, "
                f"PostgreSQL: {stats['postgres_count']}"
            )
            stats["success"] = False

    except Exception as e:
        logger.error(f"Table {table_name}: FAILED - {e}")
        stats["errors"].append({"error": type(e).__name__, "message": str(e)})
        stats["success"] = False
        pg_conn.rollback()

    return stats


def verify_id_preservation(
    sqlite_conn: sqlite3.Connection,
    pg_conn: psycopg2.extensions.connection,
    sample_tables: Optional[List[str]] = None,
) -> bool:
    """
    Verify that primary key IDs were preserved correctly.

    This is a CRITICAL check - if IDs don't match, the migration has failed.
""" if sample_tables is None: sample_tables = ["player", "team", "card", "stratgame"] all_match = True for table_name in sample_tables: pk_column = get_primary_key_column(table_name) # Get first and last 5 IDs from SQLite sqlite_cursor = sqlite_conn.execute( f"SELECT {pk_column} FROM {table_name} ORDER BY {pk_column} LIMIT 5" ) sqlite_first = [row[0] for row in sqlite_cursor.fetchall()] sqlite_cursor = sqlite_conn.execute( f"SELECT {pk_column} FROM {table_name} ORDER BY {pk_column} DESC LIMIT 5" ) sqlite_last = [row[0] for row in sqlite_cursor.fetchall()] # Get same IDs from PostgreSQL pg_cursor = pg_conn.cursor() pg_cursor.execute( f"SELECT {pk_column} FROM {table_name} ORDER BY {pk_column} LIMIT 5" ) pg_first = [row[0] for row in pg_cursor.fetchall()] pg_cursor.execute( f"SELECT {pk_column} FROM {table_name} ORDER BY {pk_column} DESC LIMIT 5" ) pg_last = [row[0] for row in pg_cursor.fetchall()] if sqlite_first == pg_first and sqlite_last == pg_last: logger.info(f"ID Verification {table_name}: PASS - IDs match") else: logger.error( f"ID Verification {table_name}: FAIL - " f"SQLite first: {sqlite_first}, PG first: {pg_first}, " f"SQLite last: {sqlite_last}, PG last: {pg_last}" ) all_match = False return all_match def main(): parser = argparse.ArgumentParser( description="Migrate Paper Dynasty from SQLite to PostgreSQL" ) parser.add_argument( "--dry-run", action="store_true", help="Validate without making changes" ) parser.add_argument("--table", type=str, help="Migrate only this table") parser.add_argument( "--sqlite-path", type=str, default="storage/pd_master.db", help="Path to SQLite database", ) parser.add_argument( "--batch-size", type=int, default=500, help="Batch size for inserts" ) parser.add_argument( "--skip-verification", action="store_true", help="Skip ID verification" ) args = parser.parse_args() logger.info("=" * 60) logger.info("Paper Dynasty SQLite to PostgreSQL Migration") logger.info("=" * 60) if args.dry_run: logger.info("DRY RUN MODE - No changes will be made") # Connect to databases try: sqlite_conn = get_sqlite_connection(args.sqlite_path) logger.info(f"Connected to SQLite: {args.sqlite_path}") except FileNotFoundError as e: logger.error(str(e)) sys.exit(1) try: pg_conn = get_postgres_connection() logger.info( f"Connected to PostgreSQL: {os.environ['POSTGRES_HOST']}/{os.environ['POSTGRES_DB']}" ) except EnvironmentError as e: logger.error(str(e)) sys.exit(1) except psycopg2.Error as e: logger.error(f"PostgreSQL connection failed: {e}") sys.exit(1) # Determine tables to migrate tables_to_migrate = [args.table] if args.table else MIGRATION_ORDER # Validate tables exist available_tables = set() cursor = sqlite_conn.execute("SELECT name FROM sqlite_master WHERE type='table'") for row in cursor: available_tables.add(row[0]) for table in tables_to_migrate: if table not in available_tables: logger.warning(f"Table {table} not found in SQLite, skipping") tables_to_migrate.remove(table) # Migration summary results = [] start_time = datetime.now() logger.info(f"Migrating {len(tables_to_migrate)} tables...") logger.info("-" * 60) for table_name in tables_to_migrate: stats = migrate_table( sqlite_conn, pg_conn, table_name, batch_size=args.batch_size, dry_run=args.dry_run, ) results.append(stats) logger.info("-" * 60) # Summary elapsed = datetime.now() - start_time successful = sum(1 for r in results if r["success"]) total_records = sum(r["inserted"] for r in results) total_skipped = sum(r["skipped"] for r in results) logger.info("=" * 60) logger.info("MIGRATION SUMMARY") 
logger.info("=" * 60) logger.info(f"Tables: {successful}/{len(results)} successful") logger.info(f"Records: {total_records} inserted, {total_skipped} skipped") logger.info(f"Duration: {elapsed}") # Failed tables failed = [r for r in results if not r["success"]] if failed: logger.error("FAILED TABLES:") for r in failed: logger.error(f" - {r['table']}: {r['errors']}") # ID Verification (CRITICAL) if not args.dry_run and not args.skip_verification: logger.info("-" * 60) logger.info("VERIFYING ID PRESERVATION...") if verify_id_preservation(sqlite_conn, pg_conn): logger.info("ID VERIFICATION: PASS - All IDs preserved correctly") else: logger.error("ID VERIFICATION: FAIL - IDs do not match!") logger.error( "THIS IS A CRITICAL FAILURE - Foreign key references may be broken" ) sys.exit(1) # Close connections sqlite_conn.close() pg_conn.close() if all(r["success"] for r in results): logger.info("MIGRATION COMPLETE - SUCCESS") sys.exit(0) else: logger.warning("MIGRATION COMPLETE - PARTIAL SUCCESS (some tables failed)") sys.exit(1) if __name__ == "__main__": main()