#!/usr/bin/env python3
"""
Migration script to transfer custom commands from old SQLite schema to new PostgreSQL schema.

This script:
1. Reads data from the old 'creator' and 'command' tables in test-storage/sba_is_fun.db
2. Transforms the data to match the new schema (custom_command_creators and custom_commands)
3. Inserts the data into the target database (dev or production PostgreSQL)

Usage:
    # Migrate to dev database
    python migrate_custom_commands_v2.py --source test-storage/sba_is_fun.db --target dev

    # Migrate to production database
    python migrate_custom_commands_v2.py --source test-storage/sba_is_fun.db --target prod
"""

import argparse
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime

try:
    import psycopg2
    from psycopg2.extras import execute_values  # noqa: F401  (kept for compatibility)
except ImportError:
    # The driver is only needed when actually connecting to PostgreSQL.
    # Reading and transforming the SQLite data (e.g. --dry-run) works without
    # it; get_postgres_connection() reports the missing driver explicitly.
    psycopg2 = None
    execute_values = None

# Commands whose name is longer than this are skipped during transformation.
MAX_COMMAND_NAME_LENGTH = 32

# target -> (environment variable prefix, default host)
_TARGET_SETTINGS = {
    'dev': ('POSTGRES_', '10.10.0.42'),
    'prod': ('PROD_POSTGRES_', 'api.sba.manticorum.com'),
}


def get_postgres_connection(target):
    """Get PostgreSQL connection based on target environment.

    Connection parameters are read from environment variables
    (``POSTGRES_*`` for dev, ``PROD_POSTGRES_*`` for prod), falling back to
    built-in defaults when a variable is unset.

    Args:
        target: Either ``'dev'`` or ``'prod'``.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        ValueError: If ``target`` is not ``'dev'`` or ``'prod'``.
        ImportError: If psycopg2 is not installed.
    """
    try:
        env_prefix, default_host = _TARGET_SETTINGS[target]
    except (KeyError, TypeError):
        raise ValueError(
            f"Invalid target: {target}. Must be 'dev' or 'prod'"
        ) from None

    if psycopg2 is None:
        raise ImportError("psycopg2 is required to connect to PostgreSQL")

    # NOTE(review): the password fallback is a placeholder, not a usable
    # credential; real credentials should be supplied via the environment.
    return psycopg2.connect(
        host=os.environ.get(f'{env_prefix}HOST', default_host),
        database=os.environ.get(f'{env_prefix}DB', 'sba_master'),
        user=os.environ.get(f'{env_prefix}USER', 'sba_admin'),
        password=os.environ.get(f'{env_prefix}PASSWORD', 'your_production_password'),
        port=int(os.environ.get(f'{env_prefix}PORT', '5432')),
    )


def read_source_data(source_db_path):
    """Read creators and commands from source SQLite database.

    Args:
        source_db_path: Path to the SQLite file containing the ``creator``
            and ``command`` tables.

    Returns:
        A ``(creators, commands)`` tuple; each element is a list of dicts
        keyed by the selected column names, ordered by ``id``.

    Raises:
        FileNotFoundError: If ``source_db_path`` does not exist.
    """
    if not os.path.exists(source_db_path):
        raise FileNotFoundError(f"Source database not found: {source_db_path}")

    # closing() guarantees the connection is released even if a query fails
    # (sqlite3's own context manager only manages transactions).
    with closing(sqlite3.connect(source_db_path)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Read creators
        cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id")
        creators = [dict(row) for row in cursor.fetchall()]

        # Read commands
        cursor.execute("""
            SELECT id, name, message, creator_id, createtime, last_used, sent_warns
            FROM command
            ORDER BY id
        """)
        commands = [dict(row) for row in cursor.fetchall()]

    print(f"✓ Read {len(creators)} creators and {len(commands)} commands from source")
    return creators, commands


def transform_data(creators, commands):
    """Transform old schema data to new schema format.

    Args:
        creators: Dicts with ``id``, ``name`` and ``discordid`` keys.
        commands: Dicts with ``name``, ``message``, ``creator_id``,
            ``createtime``, ``last_used`` and ``sent_warns`` keys.

    Returns:
        A ``(transformed_creators, transformed_commands)`` tuple of lists of
        dicts. Commands whose name exceeds ``MAX_COMMAND_NAME_LENGTH``
        characters are omitted (and reported on stdout). A command whose
        ``creator_id`` matches no creator gets ``creator_discord_id`` of
        ``None``.
    """
    # One timestamp for the whole run so every defaulted row agrees.
    migration_time = datetime.now().isoformat()

    # Mapping of old creator IDs to their Discord IDs
    creator_map = {}
    transformed_creators = []
    for creator in creators:
        transformed_creators.append({
            'discord_id': creator['discordid'],
            'username': creator['name'],
            'display_name': None,  # Not in old schema
            'created_at': migration_time,
            'total_commands': 0,
            'active_commands': 0,
        })
        creator_map[creator['id']] = creator['discordid']

    transformed_commands = []
    skipped_names = []
    for cmd in commands:
        # Skip commands with names that are too long
        if len(cmd['name']) > MAX_COMMAND_NAME_LENGTH:
            skipped_names.append(cmd['name'])
            continue

        transformed_commands.append({
            'name': cmd['name'],
            'content': cmd['message'],
            'creator_discord_id': creator_map.get(cmd['creator_id']),
            'created_at': cmd['createtime'] or migration_time,
            'updated_at': None,
            'last_used': cmd['last_used'] or cmd['createtime'] or migration_time,
            # The old schema has no counter: if used at all, assume at least 1
            'use_count': 1 if cmd['last_used'] else 0,
            # warning_sent is True if sent_warns has a value
            'warning_sent': bool(cmd['sent_warns']),
            'is_active': True,
            'tags': '[]',  # Empty JSON array
        })

    if skipped_names:
        print(f"⚠ Skipped {len(skipped_names)} commands with names > 32 characters:")
        for name in skipped_names:
            print(f" - {name} ({len(name)} chars)")

    print(
        f"✓ Transformed data for {len(transformed_creators)} creators "
        f"and {len(transformed_commands)} commands"
    )
    return transformed_creators, transformed_commands


def _print_database_summary(cursor):
    """Print row counts of the target tables using ``cursor``."""
    cursor.execute("SELECT COUNT(*) FROM custom_command_creators")
    total_creators = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM custom_commands")
    total_commands = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM custom_commands WHERE is_active = TRUE")
    active_commands = cursor.fetchone()[0]

    print(f"\n{'='*60}")
    print("Migration Summary:")
    print(f" Total creators in database: {total_creators}")
    print(f" Total commands in database: {total_commands}")
    print(f" Active commands: {active_commands}")
    print(f"{'='*60}")


def migrate_to_postgres(pg_conn, creators, commands):
    """Insert transformed data into PostgreSQL database.

    Creators are upserted first, then commands; a command that fails to
    insert is rolled back to a savepoint and counted as skipped without
    aborting the run. Creator statistics are then recomputed and the whole
    transaction is committed.

    Args:
        pg_conn: Open database connection; it is not closed here.
        creators: Creator dicts as produced by ``transform_data``.
        commands: Command dicts as produced by ``transform_data``.

    Raises:
        Exception: Any error outside the per-command savepoint is re-raised
            after the transaction has been rolled back.
    """
    cursor = pg_conn.cursor()
    try:
        try:
            # Start transaction
            pg_conn.autocommit = False

            # Mapping of discord_id to new database ID
            creator_id_map = {}

            # Insert creators
            print(f"\nMigrating {len(creators)} creators...")
            for i, creator in enumerate(creators, 1):
                cursor.execute("""
                    INSERT INTO custom_command_creators
                        (discord_id, username, display_name, created_at,
                         total_commands, active_commands)
                    VALUES
                        (%(discord_id)s, %(username)s, %(display_name)s, %(created_at)s,
                         %(total_commands)s, %(active_commands)s)
                    ON CONFLICT (discord_id) DO UPDATE
                        SET username = EXCLUDED.username
                    RETURNING id, discord_id
                """, creator)
                db_id, discord_id = cursor.fetchone()
                creator_id_map[discord_id] = db_id

                if i % 10 == 0:
                    print(f" ✓ Migrated {i}/{len(creators)} creators")

            print(f"✓ All {len(creators)} creators migrated")

            # Insert commands
            print(f"\nMigrating {len(commands)} commands...")
            migrated = 0
            skipped = 0
            for i, cmd in enumerate(commands, 1):
                # Get the new creator_id from the mapping
                creator_id = creator_id_map.get(cmd['creator_discord_id'])
                if creator_id is None:
                    print(f" ⚠ Skipping command '{cmd['name']}' - creator not found")
                    skipped += 1
                    continue

                try:
                    # Use savepoint to allow individual command failure
                    # without rolling back entire transaction
                    cursor.execute("SAVEPOINT cmd_insert")
                    cursor.execute("""
                        INSERT INTO custom_commands
                            (name, content, creator_id, created_at, updated_at,
                             last_used, use_count, warning_sent, is_active, tags)
                        VALUES
                            (%(name)s, %(content)s, %(creator_id)s, %(created_at)s,
                             %(updated_at)s, %(last_used)s, %(use_count)s,
                             %(warning_sent)s, %(is_active)s, %(tags)s)
                        ON CONFLICT (name) DO UPDATE SET
                            content = EXCLUDED.content,
                            last_used = EXCLUDED.last_used,
                            use_count = EXCLUDED.use_count
                    """, {**cmd, 'creator_id': creator_id})
                    cursor.execute("RELEASE SAVEPOINT cmd_insert")
                    migrated += 1

                    if i % 25 == 0:
                        print(f" ✓ Migrated {migrated}/{len(commands)} commands")
                except Exception as e:
                    # Deliberately broad: one bad row must not abort the run.
                    cursor.execute("ROLLBACK TO SAVEPOINT cmd_insert")
                    print(f" ⚠ Error migrating command '{cmd['name']}': {e}")
                    skipped += 1

            print(f"✓ Migrated {migrated} commands ({skipped} skipped)")

            # Update creator stats
            print("\nUpdating creator statistics...")
            cursor.execute("""
                UPDATE custom_command_creators cc
                SET total_commands = (
                        SELECT COUNT(*) FROM custom_commands
                        WHERE creator_id = cc.id
                    ),
                    active_commands = (
                        SELECT COUNT(*) FROM custom_commands
                        WHERE creator_id = cc.id AND is_active = TRUE
                    )
            """)

            # Commit transaction
            pg_conn.commit()
            print("✓ Transaction committed successfully")
        except Exception as e:
            pg_conn.rollback()
            print(f"\n✗ Migration failed: {e}")
            raise

        # The data is already committed at this point, so a failure while
        # reading the summary must not be reported as a failed migration.
        try:
            _print_database_summary(cursor)
        except Exception as e:
            pg_conn.rollback()
            print(f"\n⚠ Migration committed, but the summary could not be read: {e}")
    finally:
        cursor.close()


def _print_dry_run_preview(transformed_creators, transformed_commands, limit=5):
    """Print the first ``limit`` creators and commands that would be migrated."""
    print("\n=== DRY RUN MODE ===")

    print(f"\nWould migrate {len(transformed_creators)} creators:")
    for creator in transformed_creators[:limit]:
        print(f" - {creator['username']} (discord_id: {creator['discord_id']})")
    if len(transformed_creators) > limit:
        print(f" ... and {len(transformed_creators) - limit} more")

    print(f"\nWould migrate {len(transformed_commands)} commands:")
    for cmd in transformed_commands[:limit]:
        print(f" - {cmd['name']} (by discord_id: {cmd['creator_discord_id']})")
    if len(transformed_commands) > limit:
        print(f" ... and {len(transformed_commands) - limit} more")

    print("\n=== No changes made (dry run) ===")


def main():
    """Parse command-line arguments and run the migration.

    Exits with status 1 if connecting to PostgreSQL or migrating fails.
    """
    parser = argparse.ArgumentParser(
        description='Migrate custom commands from old SQLite schema to new PostgreSQL schema'
    )
    parser.add_argument(
        '--source',
        default='test-storage/sba_is_fun.db',
        help='Path to source SQLite database (default: test-storage/sba_is_fun.db)'
    )
    parser.add_argument(
        '--target',
        choices=['dev', 'prod'],
        required=True,
        help='Target database environment (dev or prod)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be migrated without actually doing it'
    )
    parser.add_argument(
        '--yes',
        action='store_true',
        help='Skip confirmation prompt for production'
    )
    args = parser.parse_args()

    print(f"{'='*60}")
    print("Custom Commands Migration Script")
    print(f"{'='*60}")
    print(f"Source: {args.source}")
    print(f"Target: {args.target.upper()} PostgreSQL database")
    print(f"Dry run: {'Yes' if args.dry_run else 'No'}")
    print(f"{'='*60}\n")

    # Read source data
    creators, commands = read_source_data(args.source)

    # Transform data
    transformed_creators, transformed_commands = transform_data(creators, commands)

    if args.dry_run:
        _print_dry_run_preview(transformed_creators, transformed_commands)
        return

    # Confirm before proceeding with production
    if args.target == 'prod' and not args.yes:
        print("\n⚠️ WARNING: You are about to migrate to PRODUCTION!")
        try:
            response = input("Type 'YES' to continue: ")
        except EOFError:
            # No interactive stdin: treat as "not confirmed" rather than crash.
            response = ''
        if response != 'YES':
            print("Migration cancelled.")
            return

    # Connect to PostgreSQL and migrate
    print(f"\nConnecting to {args.target.upper()} PostgreSQL database...")
    try:
        # closing() releases the connection on failure as well as success.
        with closing(get_postgres_connection(args.target)) as pg_conn:
            print("✓ Connected successfully")
            migrate_to_postgres(pg_conn, transformed_creators, transformed_commands)
        print("\n✓ Migration completed successfully!")
    except Exception as e:
        print(f"\n✗ Migration failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()