#!/usr/bin/env python3
"""
Migration script to transfer custom commands from the old database to the new schema.

This script:
1. Reads existing commands and creators from sba_is_fun.db
2. Maps the old schema to the new custom_commands schema
3. Migrates all data, preserving relationships and metadata
4. Provides detailed logging and validation

Usage:
    python migrate_custom_commands.py --source /path/to/sba_is_fun.db --target /path/to/sba_master.db [--dry-run]
"""

import sqlite3
import json
import logging
import argparse
from datetime import datetime
from typing import Dict, List, Tuple


class CustomCommandMigrator:
    def __init__(self, source_db: str, target_db: str, dry_run: bool = False):
        self.source_db = source_db
        self.target_db = target_db
        self.dry_run = dry_run
        self.setup_logging()

    def setup_logging(self):
        """Set up logging to both a timestamped file and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('migrate_custom_commands.CustomCommandMigrator')

    def validate_source_database(self) -> bool:
        """Validate that the source database has the expected tables and structure."""
        try:
            conn = sqlite3.connect(self.source_db)
            cursor = conn.cursor()

            # Check for required tables
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('command', 'creator');")
            tables = [row[0] for row in cursor.fetchall()]

            if 'command' not in tables or 'creator' not in tables:
                self.logger.error(f"Required tables missing. Found: {tables}")
                return False

            # Check command table structure (all columns read by load_source_data)
            cursor.execute("PRAGMA table_info(command);")
            command_cols = [row[1] for row in cursor.fetchall()]
            required_command_cols = ['id', 'name', 'message', 'creator_id', 'createtime', 'last_used', 'sent_warns']

            for col in required_command_cols:
                if col not in command_cols:
                    self.logger.error(f"Required column '{col}' missing from command table")
                    return False

            # Check creator table structure
            cursor.execute("PRAGMA table_info(creator);")
            creator_cols = [row[1] for row in cursor.fetchall()]
            required_creator_cols = ['id', 'name', 'discordid']

            for col in required_creator_cols:
                if col not in creator_cols:
                    self.logger.error(f"Required column '{col}' missing from creator table")
                    return False

            conn.close()
            self.logger.info("Source database validation passed")
            return True

        except Exception as e:
            self.logger.error(f"Error validating source database: {e}")
            return False

    def validate_target_database(self) -> bool:
        """Validate that the target database has the new custom_commands tables."""
        try:
            conn = sqlite3.connect(self.target_db)
            cursor = conn.cursor()

            # Check for required tables
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('custom_commands', 'custom_command_creators');")
            tables = [row[0] for row in cursor.fetchall()]

            if 'custom_commands' not in tables or 'custom_command_creators' not in tables:
                self.logger.error(f"Target tables missing. Found: {tables}")
                self.logger.error("Please ensure the new custom_commands schema has been created first")
                return False

            conn.close()
            self.logger.info("Target database validation passed")
            return True

        except Exception as e:
            self.logger.error(f"Error validating target database: {e}")
            return False
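
    # NOTE: Rough sketch of the target schema this script assumes, reconstructed
    # from the INSERT/UPDATE statements below. Column names come from this file;
    # the types and constraints are guesses for illustration only, not the
    # authoritative DDL. Create the real schema before running the migration.
    #
    #   CREATE TABLE custom_command_creators (
    #       id              INTEGER PRIMARY KEY AUTOINCREMENT,
    #       discord_id      TEXT,
    #       username        TEXT,
    #       display_name    TEXT,
    #       created_at      TEXT,
    #       total_commands  INTEGER DEFAULT 0,
    #       active_commands INTEGER DEFAULT 0
    #   );
    #
    #   CREATE TABLE custom_commands (
    #       id           INTEGER PRIMARY KEY AUTOINCREMENT,
    #       name         TEXT,
    #       content      TEXT,
    #       creator_id   INTEGER REFERENCES custom_command_creators(id),
    #       created_at   TEXT,
    #       updated_at   TEXT,
    #       last_used    TEXT,
    #       use_count    INTEGER DEFAULT 0,
    #       warning_sent INTEGER DEFAULT 0,
    #       is_active    INTEGER DEFAULT 1,
    #       tags         TEXT
    #   );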
Found: {tables}") self.logger.error("Please ensure the new custom_commands schema has been created first") return False conn.close() self.logger.info("Target database validation passed") return True except Exception as e: self.logger.error(f"Error validating target database: {e}") return False def load_source_data(self) -> Tuple[List[Dict], List[Dict]]: """Load creators and commands from source database""" conn = sqlite3.connect(self.source_db) cursor = conn.cursor() # Load creators self.logger.info("Loading creators from source database...") cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id;") creators_raw = cursor.fetchall() creators = [] for row in creators_raw: creators.append({ 'old_id': row[0], 'name': row[1], 'discord_id': row[2] }) self.logger.info(f"Loaded {len(creators)} creators") # Load commands self.logger.info("Loading commands from source database...") cursor.execute(""" SELECT c.id, c.name, c.message, c.creator_id, c.createtime, c.last_used, c.sent_warns, cr.name as creator_name, cr.discordid as creator_discord_id FROM command c LEFT JOIN creator cr ON c.creator_id = cr.id ORDER BY c.id; """) commands_raw = cursor.fetchall() commands = [] for row in commands_raw: # Parse last_used datetime last_used = None if row[5]: # last_used try: last_used = datetime.fromisoformat(row[5]).isoformat() except: last_used = row[5] # Keep original if parsing fails # Parse createtime created_at = None if row[4]: # createtime try: created_at = datetime.fromisoformat(row[4]).isoformat() except: created_at = row[4] # Keep original if parsing fails commands.append({ 'old_id': row[0], 'name': row[1], 'content': row[2], # message -> content 'old_creator_id': row[3], 'created_at': created_at, 'last_used': last_used, 'sent_warns': row[6], 'creator_name': row[7], 'creator_discord_id': row[8] }) self.logger.info(f"Loaded {len(commands)} commands") conn.close() return creators, commands def migrate_creators(self, creators: List[Dict]) -> Dict[int, int]: """Migrate creators and return mapping of old_id -> new_id""" if self.dry_run: self.logger.info(f"[DRY RUN] Would migrate {len(creators)} creators") return {creator['old_id']: creator['old_id'] for creator in creators} # Mock mapping conn = sqlite3.connect(self.target_db) cursor = conn.cursor() creator_id_mapping = {} now = datetime.now().isoformat() for creator in creators: try: # Check if creator already exists by discord_id cursor.execute("SELECT id FROM custom_command_creators WHERE discord_id = ?", (creator['discord_id'],)) existing = cursor.fetchone() if existing: creator_id_mapping[creator['old_id']] = existing[0] self.logger.info(f"Creator '{creator['name']}' (Discord: {creator['discord_id']}) already exists with ID {existing[0]}") continue # Insert new creator cursor.execute(""" INSERT INTO custom_command_creators (discord_id, username, display_name, created_at, total_commands, active_commands) VALUES (?, ?, ?, ?, 0, 0) """, (creator['discord_id'], creator['name'], creator['name'], now)) new_id = cursor.lastrowid creator_id_mapping[creator['old_id']] = new_id self.logger.info(f"Migrated creator '{creator['name']}': {creator['old_id']} -> {new_id}") except Exception as e: self.logger.error(f"Error migrating creator {creator}: {e}") raise conn.commit() conn.close() self.logger.info(f"Successfully migrated {len(creator_id_mapping)} creators") return creator_id_mapping def migrate_commands(self, commands: List[Dict], creator_id_mapping: Dict[int, int]) -> None: """Migrate commands using the creator ID mapping""" if self.dry_run: 
self.logger.info(f"[DRY RUN] Would migrate {len(commands)} commands") return conn = sqlite3.connect(self.target_db) cursor = conn.cursor() migrated_count = 0 skipped_count = 0 for command in commands: try: # Map old creator_id to new creator_id if command['old_creator_id'] not in creator_id_mapping: self.logger.warning(f"Skipping command '{command['name']}' - creator ID {command['old_creator_id']} not found in mapping") skipped_count += 1 continue new_creator_id = creator_id_mapping[command['old_creator_id']] # Check if command already exists by name cursor.execute("SELECT id FROM custom_commands WHERE name = ?", (command['name'],)) existing = cursor.fetchone() if existing: self.logger.warning(f"Command '{command['name']}' already exists with ID {existing[0]} - skipping") skipped_count += 1 continue # Determine if command was warned/inactive based on sent_warns warning_sent = bool(command['sent_warns'] and command['sent_warns'] != 0) # For migrated commands, ensure last_used is at least the migration date # to prevent immediate deletion eligibility migration_date = datetime.now().isoformat() last_used = command['last_used'] # If command hasn't been used recently, set last_used to migration date # to give it a grace period if last_used: try: last_used_dt = datetime.fromisoformat(last_used.replace('Z', '+00:00')) # If last used more than 60 days ago, update to migration date if (datetime.now() - last_used_dt).days > 60: last_used = migration_date self.logger.info(f"Updated last_used for command '{command['name']}' to migration date") except: # If we can't parse the date, use migration date last_used = migration_date else: # If no last_used date, use migration date last_used = migration_date # Add migration tag to indicate this is a migrated command tags = '["migrated"]' # Insert command cursor.execute(""" INSERT INTO custom_commands (name, content, creator_id, created_at, updated_at, last_used, use_count, warning_sent, is_active, tags) VALUES (?, ?, ?, ?, ?, ?, 0, ?, 1, ?) 
""", ( command['name'], command['content'], new_creator_id, command['created_at'], None, # updated_at last_used, warning_sent, tags )) migrated_count += 1 if migrated_count % 10 == 0: self.logger.info(f"Migrated {migrated_count} commands...") except Exception as e: self.logger.error(f"Error migrating command {command}: {e}") raise conn.commit() conn.close() self.logger.info(f"Successfully migrated {migrated_count} commands, skipped {skipped_count}") def update_creator_stats(self) -> None: """Update creator statistics after migration""" if self.dry_run: self.logger.info("[DRY RUN] Would update creator statistics") return conn = sqlite3.connect(self.target_db) cursor = conn.cursor() # Update creator stats cursor.execute(""" UPDATE custom_command_creators SET total_commands = ( SELECT COUNT(*) FROM custom_commands WHERE creator_id = custom_command_creators.id ), active_commands = ( SELECT COUNT(*) FROM custom_commands WHERE creator_id = custom_command_creators.id AND is_active = 1 ) """) conn.commit() conn.close() self.logger.info("Updated creator statistics") def generate_migration_report(self) -> None: """Generate a detailed migration report""" if self.dry_run: conn_source = sqlite3.connect(self.source_db) cursor_source = conn_source.cursor() cursor_source.execute("SELECT COUNT(*) FROM creator") source_creators = cursor_source.fetchone()[0] cursor_source.execute("SELECT COUNT(*) FROM command") source_commands = cursor_source.fetchone()[0] conn_source.close() self.logger.info(f""" === DRY RUN MIGRATION REPORT === Source Database: {self.source_db} Target Database: {self.target_db} Would migrate: - {source_creators} creators - {source_commands} commands No actual changes made (dry run mode). """.strip()) return # Real migration report conn_source = sqlite3.connect(self.source_db) conn_target = sqlite3.connect(self.target_db) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() # Source counts cursor_source.execute("SELECT COUNT(*) FROM creator") source_creators = cursor_source.fetchone()[0] cursor_source.execute("SELECT COUNT(*) FROM command") source_commands = cursor_source.fetchone()[0] # Target counts cursor_target.execute("SELECT COUNT(*) FROM custom_command_creators") target_creators = cursor_target.fetchone()[0] cursor_target.execute("SELECT COUNT(*) FROM custom_commands") target_commands = cursor_target.fetchone()[0] # Get sample of migrated data cursor_target.execute(""" SELECT cc.name, cc.content, ccc.username FROM custom_commands cc JOIN custom_command_creators ccc ON cc.creator_id = ccc.id LIMIT 5 """) sample_commands = cursor_target.fetchall() conn_source.close() conn_target.close() self.logger.info(f""" === MIGRATION REPORT === Source Database: {self.source_db} Target Database: {self.target_db} Migration Results: - Source creators: {source_creators} -> Target creators: {target_creators} - Source commands: {source_commands} -> Target commands: {target_commands} Sample migrated commands: """.strip()) for cmd in sample_commands: self.logger.info(f" '{cmd[0]}' by {cmd[2]}: {cmd[1][:50]}...") def run_migration(self) -> bool: """Execute the full migration process""" self.logger.info(f"Starting custom commands migration {'(DRY RUN)' if self.dry_run else ''}") self.logger.info(f"Source: {self.source_db}") self.logger.info(f"Target: {self.target_db}") try: # Validate databases if not self.validate_source_database(): return False if not self.validate_target_database(): return False # Load source data creators, commands = self.load_source_data() # Migrate creators first 
            creator_id_mapping = self.migrate_creators(creators)

            # Migrate commands
            self.migrate_commands(commands, creator_id_mapping)

            # Update statistics
            self.update_creator_stats()

            # Generate report
            self.generate_migration_report()

            self.logger.info("Migration completed successfully!")
            return True

        except Exception as e:
            self.logger.error(f"Migration failed: {e}")
            return False


def main():
    parser = argparse.ArgumentParser(description='Migrate custom commands from the old database to the new schema')
    parser.add_argument('--source', required=True, help='Path to source database (sba_is_fun.db)')
    parser.add_argument('--target', required=True, help='Path to target database (sba_master.db)')
    parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode (no actual changes)')

    args = parser.parse_args()

    migrator = CustomCommandMigrator(args.source, args.target, args.dry_run)
    success = migrator.run_migration()

    raise SystemExit(0 if success else 1)


if __name__ == '__main__':
    main()
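
# Example workflow (paths below are placeholders; adjust them to your setup):
#
#   python migrate_custom_commands.py --source ./sba_is_fun.db --target ./sba_master.db --dry-run
#   # review the generated migration_YYYYMMDD_HHMMSS.log, then run again without --dry-run
#   python migrate_custom_commands.py --source ./sba_is_fun.db --target ./sba_master.db
#
# Optional spot check afterwards (assumes the sqlite3 CLI is installed):
#
#   sqlite3 ./sba_master.db "SELECT COUNT(*) FROM custom_commands;"
#   sqlite3 ./sba_is_fun.db "SELECT COUNT(*) FROM command;"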