major-domo-database/migrate_custom_commands.py
Cal Corum 57c943e340 CLAUDE: Add custom commands system with migration from legacy database
- Add CustomCommandCreator and CustomCommand models to db_engine.py
- Add comprehensive custom commands API router with full CRUD operations
- Include migration script for transferring 140 commands from sba_is_fun.db
- Add FastAPI integration for /api/v3/custom_commands endpoints
- Implement usage tracking, search, autocomplete, and statistics features
- Add grace period handling for unused commands to prevent deletion
- Include comprehensive documentation for migration process

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 16:31:39 -05:00

458 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Migration script to transfer custom commands from old database to new schema.
This script:
1. Reads existing commands and creators from sba_is_fun.db
2. Maps the old schema to the new custom_commands schema
3. Migrates all data preserving relationships and metadata
4. Provides detailed logging and validation
Usage:
python migrate_custom_commands.py --source /path/to/sba_is_fun.db --target /path/to/sba_master.db [--dry-run]
"""
import sqlite3
import json
import logging
import argparse
from datetime import datetime
from typing import Dict, List, Tuple, Optional
class CustomCommandMigrator:
def __init__(self, source_db: str, target_db: str, dry_run: bool = False):
self.source_db = source_db
self.target_db = target_db
self.dry_run = dry_run
self.setup_logging()
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('migrate_custom_commands.CustomCommandMigrator')
def validate_source_database(self) -> bool:
"""Validate that source database has expected tables and structure"""
try:
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('command', 'creator');")
tables = [row[0] for row in cursor.fetchall()]
if 'command' not in tables or 'creator' not in tables:
self.logger.error(f"Required tables missing. Found: {tables}")
return False
# Check command table structure
cursor.execute("PRAGMA table_info(command);")
command_cols = [row[1] for row in cursor.fetchall()]
required_command_cols = ['id', 'name', 'message', 'creator_id', 'createtime']
for col in required_command_cols:
if col not in command_cols:
self.logger.error(f"Required column '{col}' missing from command table")
return False
# Check creator table structure
cursor.execute("PRAGMA table_info(creator);")
creator_cols = [row[1] for row in cursor.fetchall()]
required_creator_cols = ['id', 'name', 'discordid']
for col in required_creator_cols:
if col not in creator_cols:
self.logger.error(f"Required column '{col}' missing from creator table")
return False
conn.close()
self.logger.info("Source database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating source database: {e}")
return False
def validate_target_database(self) -> bool:
"""Validate that target database has the new custom_commands tables"""
try:
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('custom_commands', 'custom_command_creators');")
tables = [row[0] for row in cursor.fetchall()]
if 'custom_commands' not in tables or 'custom_command_creators' not in tables:
self.logger.error(f"Target tables missing. Found: {tables}")
self.logger.error("Please ensure the new custom_commands schema has been created first")
return False
conn.close()
self.logger.info("Target database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating target database: {e}")
return False
def load_source_data(self) -> Tuple[List[Dict], List[Dict]]:
"""Load creators and commands from source database"""
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Load creators
self.logger.info("Loading creators from source database...")
cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id;")
creators_raw = cursor.fetchall()
creators = []
for row in creators_raw:
creators.append({
'old_id': row[0],
'name': row[1],
'discord_id': row[2]
})
self.logger.info(f"Loaded {len(creators)} creators")
# Load commands
self.logger.info("Loading commands from source database...")
cursor.execute("""
SELECT c.id, c.name, c.message, c.creator_id, c.createtime, c.last_used, c.sent_warns,
cr.name as creator_name, cr.discordid as creator_discord_id
FROM command c
LEFT JOIN creator cr ON c.creator_id = cr.id
ORDER BY c.id;
""")
commands_raw = cursor.fetchall()
commands = []
for row in commands_raw:
# Parse last_used datetime
last_used = None
if row[5]: # last_used
try:
last_used = datetime.fromisoformat(row[5]).isoformat()
except:
last_used = row[5] # Keep original if parsing fails
# Parse createtime
created_at = None
if row[4]: # createtime
try:
created_at = datetime.fromisoformat(row[4]).isoformat()
except:
created_at = row[4] # Keep original if parsing fails
commands.append({
'old_id': row[0],
'name': row[1],
'content': row[2], # message -> content
'old_creator_id': row[3],
'created_at': created_at,
'last_used': last_used,
'sent_warns': row[6],
'creator_name': row[7],
'creator_discord_id': row[8]
})
self.logger.info(f"Loaded {len(commands)} commands")
conn.close()
return creators, commands
def migrate_creators(self, creators: List[Dict]) -> Dict[int, int]:
"""Migrate creators and return mapping of old_id -> new_id"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(creators)} creators")
return {creator['old_id']: creator['old_id'] for creator in creators} # Mock mapping
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
creator_id_mapping = {}
now = datetime.now().isoformat()
for creator in creators:
try:
# Check if creator already exists by discord_id
cursor.execute("SELECT id FROM custom_command_creators WHERE discord_id = ?", (creator['discord_id'],))
existing = cursor.fetchone()
if existing:
creator_id_mapping[creator['old_id']] = existing[0]
self.logger.info(f"Creator '{creator['name']}' (Discord: {creator['discord_id']}) already exists with ID {existing[0]}")
continue
# Insert new creator
cursor.execute("""
INSERT INTO custom_command_creators
(discord_id, username, display_name, created_at, total_commands, active_commands)
VALUES (?, ?, ?, ?, 0, 0)
""", (creator['discord_id'], creator['name'], creator['name'], now))
new_id = cursor.lastrowid
creator_id_mapping[creator['old_id']] = new_id
self.logger.info(f"Migrated creator '{creator['name']}': {creator['old_id']} -> {new_id}")
except Exception as e:
self.logger.error(f"Error migrating creator {creator}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {len(creator_id_mapping)} creators")
return creator_id_mapping
def migrate_commands(self, commands: List[Dict], creator_id_mapping: Dict[int, int]) -> None:
"""Migrate commands using the creator ID mapping"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(commands)} commands")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
migrated_count = 0
skipped_count = 0
for command in commands:
try:
# Map old creator_id to new creator_id
if command['old_creator_id'] not in creator_id_mapping:
self.logger.warning(f"Skipping command '{command['name']}' - creator ID {command['old_creator_id']} not found in mapping")
skipped_count += 1
continue
new_creator_id = creator_id_mapping[command['old_creator_id']]
# Check if command already exists by name
cursor.execute("SELECT id FROM custom_commands WHERE name = ?", (command['name'],))
existing = cursor.fetchone()
if existing:
self.logger.warning(f"Command '{command['name']}' already exists with ID {existing[0]} - skipping")
skipped_count += 1
continue
# Determine if command was warned/inactive based on sent_warns
warning_sent = bool(command['sent_warns'] and command['sent_warns'] != 0)
# For migrated commands, ensure last_used is at least the migration date
# to prevent immediate deletion eligibility
migration_date = datetime.now().isoformat()
last_used = command['last_used']
# If command hasn't been used recently, set last_used to migration date
# to give it a grace period
if last_used:
try:
last_used_dt = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
# If last used more than 60 days ago, update to migration date
if (datetime.now() - last_used_dt).days > 60:
last_used = migration_date
self.logger.info(f"Updated last_used for command '{command['name']}' to migration date")
except:
# If we can't parse the date, use migration date
last_used = migration_date
else:
# If no last_used date, use migration date
last_used = migration_date
# Add migration tag to indicate this is a migrated command
tags = '["migrated"]'
# Insert command
cursor.execute("""
INSERT INTO custom_commands
(name, content, creator_id, created_at, updated_at, last_used, use_count, warning_sent, is_active, tags)
VALUES (?, ?, ?, ?, ?, ?, 0, ?, 1, ?)
""", (
command['name'],
command['content'],
new_creator_id,
command['created_at'],
None, # updated_at
last_used,
warning_sent,
tags
))
migrated_count += 1
if migrated_count % 10 == 0:
self.logger.info(f"Migrated {migrated_count} commands...")
except Exception as e:
self.logger.error(f"Error migrating command {command}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {migrated_count} commands, skipped {skipped_count}")
def update_creator_stats(self) -> None:
"""Update creator statistics after migration"""
if self.dry_run:
self.logger.info("[DRY RUN] Would update creator statistics")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Update creator stats
cursor.execute("""
UPDATE custom_command_creators SET
total_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id
),
active_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id AND is_active = 1
)
""")
conn.commit()
conn.close()
self.logger.info("Updated creator statistics")
def generate_migration_report(self) -> None:
"""Generate a detailed migration report"""
if self.dry_run:
conn_source = sqlite3.connect(self.source_db)
cursor_source = conn_source.cursor()
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
conn_source.close()
self.logger.info(f"""
=== DRY RUN MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Would migrate:
- {source_creators} creators
- {source_commands} commands
No actual changes made (dry run mode).
""".strip())
return
# Real migration report
conn_source = sqlite3.connect(self.source_db)
conn_target = sqlite3.connect(self.target_db)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
# Source counts
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
# Target counts
cursor_target.execute("SELECT COUNT(*) FROM custom_command_creators")
target_creators = cursor_target.fetchone()[0]
cursor_target.execute("SELECT COUNT(*) FROM custom_commands")
target_commands = cursor_target.fetchone()[0]
# Get sample of migrated data
cursor_target.execute("""
SELECT cc.name, cc.content, ccc.username
FROM custom_commands cc
JOIN custom_command_creators ccc ON cc.creator_id = ccc.id
LIMIT 5
""")
sample_commands = cursor_target.fetchall()
conn_source.close()
conn_target.close()
self.logger.info(f"""
=== MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Migration Results:
- Source creators: {source_creators} -> Target creators: {target_creators}
- Source commands: {source_commands} -> Target commands: {target_commands}
Sample migrated commands:
""".strip())
for cmd in sample_commands:
self.logger.info(f" '{cmd[0]}' by {cmd[2]}: {cmd[1][:50]}...")
def run_migration(self) -> bool:
"""Execute the full migration process"""
self.logger.info(f"Starting custom commands migration {'(DRY RUN)' if self.dry_run else ''}")
self.logger.info(f"Source: {self.source_db}")
self.logger.info(f"Target: {self.target_db}")
try:
# Validate databases
if not self.validate_source_database():
return False
if not self.validate_target_database():
return False
# Load source data
creators, commands = self.load_source_data()
# Migrate creators first
creator_id_mapping = self.migrate_creators(creators)
# Migrate commands
self.migrate_commands(commands, creator_id_mapping)
# Update statistics
self.update_creator_stats()
# Generate report
self.generate_migration_report()
self.logger.info("Migration completed successfully!")
return True
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Migrate custom commands from old database to new schema')
parser.add_argument('--source', required=True, help='Path to source database (sba_is_fun.db)')
parser.add_argument('--target', required=True, help='Path to target database (sba_master.db)')
parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode (no actual changes)')
args = parser.parse_args()
migrator = CustomCommandMigrator(args.source, args.target, args.dry_run)
success = migrator.run_migration()
exit(0 if success else 1)
if __name__ == '__main__':
main()