#!/usr/bin/env python3
"""
Migration script to transfer custom commands from old database to new schema.

This script:
1. Reads existing commands and creators from sba_is_fun.db
2. Maps the old schema to the new custom_commands schema
3. Migrates all data preserving relationships and metadata
4. Provides detailed logging and validation

Usage:
    python migrate_custom_commands.py --source /path/to/sba_is_fun.db --target /path/to/sba_master.db [--dry-run]
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import logging
|
|
import argparse
|
|
from datetime import datetime
|
|
from typing import Dict, List, Tuple, Optional
|
|
|
|
|
|
class CustomCommandMigrator:
    """Copy creators and commands from the legacy database into the new schema.

    The source database is expected to expose ``creator`` and ``command``
    tables; the target must already contain ``custom_command_creators`` and
    ``custom_commands``.  With ``dry_run=True`` the target is only read
    (for validation) and never written.
    """

    # A command whose last use is older than this many days has its
    # ``last_used`` reset to the migration time, giving it a grace period
    # instead of making it immediately eligible for deletion.
    STALE_AFTER_DAYS = 60

    # Columns that the migration reads from each source table.
    _REQUIRED_SOURCE_COLUMNS = (
        ('command', ('id', 'name', 'message', 'creator_id', 'createtime', 'last_used', 'sent_warns')),
        ('creator', ('id', 'name', 'discordid')),
    )

    def __init__(self, source_db: str, target_db: str, dry_run: bool = False):
        """Store the database paths and the dry-run flag, then set up logging.

        Args:
            source_db: Path to the legacy SQLite database.
            target_db: Path to the SQLite database holding the new schema.
            dry_run: When true, no changes are written to ``target_db``.
        """
        self.source_db = source_db
        self.target_db = target_db
        self.dry_run = dry_run
        self.setup_logging()

    def setup_logging(self):
        """Setup logging configuration"""
        # basicConfig() ignores its arguments once the root logger has
        # handlers; building the FileHandler anyway would create (and leak)
        # an empty log file on every additional instantiation.
        if not logging.getLogger().handlers:
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s',
                handlers=[
                    logging.FileHandler(f'migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
                    logging.StreamHandler(),
                ],
            )
        self.logger = logging.getLogger('migrate_custom_commands.CustomCommandMigrator')

    def validate_source_database(self) -> bool:
        """Validate that source database has expected tables and structure.

        Returns:
            True when both tables exist with every column the migration
            reads; False otherwise (the reason is logged).
        """
        try:
            conn = sqlite3.connect(self.source_db)
            try:
                cursor = conn.cursor()

                # Check for required tables
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('command', 'creator');")
                tables = [row[0] for row in cursor.fetchall()]
                if 'command' not in tables or 'creator' not in tables:
                    self.logger.error(f"Required tables missing. Found: {tables}")
                    return False

                # Check the structure of each table
                for table, required_cols in self._REQUIRED_SOURCE_COLUMNS:
                    cursor.execute(f"PRAGMA table_info({table});")
                    present = {row[1] for row in cursor.fetchall()}
                    for col in required_cols:
                        if col not in present:
                            self.logger.error(f"Required column '{col}' missing from {table} table")
                            return False
            finally:
                # Close on every path, including the early ``return False``s.
                conn.close()

            self.logger.info("Source database validation passed")
            return True

        except Exception as e:
            self.logger.error(f"Error validating source database: {e}")
            return False

    def validate_target_database(self) -> bool:
        """Validate that target database has the new custom_commands tables.

        Returns:
            True when both target tables exist; False otherwise (logged).
        """
        try:
            conn = sqlite3.connect(self.target_db)
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('custom_commands', 'custom_command_creators');")
                tables = [row[0] for row in cursor.fetchall()]
            finally:
                conn.close()

            if 'custom_commands' not in tables or 'custom_command_creators' not in tables:
                self.logger.error(f"Target tables missing. Found: {tables}")
                self.logger.error("Please ensure the new custom_commands schema has been created first")
                return False

            self.logger.info("Target database validation passed")
            return True

        except Exception as e:
            self.logger.error(f"Error validating target database: {e}")
            return False

    @staticmethod
    def _normalize_timestamp(value):
        """Return ``value`` re-serialized as ISO 8601, or unchanged if unparseable.

        Falsy values (NULL, empty string) become ``None``.
        """
        if not value:
            return None
        try:
            return datetime.fromisoformat(value).isoformat()
        except (TypeError, ValueError):
            # Keep original if parsing fails (non-string or non-ISO text).
            return value

    def load_source_data(self) -> Tuple[List[Dict], List[Dict]]:
        """Load creators and commands from source database.

        Returns:
            ``(creators, commands)``.  Creator dicts carry ``old_id``,
            ``name`` and ``discord_id``; command dicts carry ``old_id``,
            ``name``, ``content``, ``old_creator_id``, ``created_at``,
            ``last_used``, ``sent_warns``, ``creator_name`` and
            ``creator_discord_id``.

        Raises:
            sqlite3.Error: If the source database cannot be queried.
        """
        conn = sqlite3.connect(self.source_db)
        try:
            cursor = conn.cursor()

            # Load creators
            self.logger.info("Loading creators from source database...")
            cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id;")
            creators = [
                {'old_id': row[0], 'name': row[1], 'discord_id': row[2]}
                for row in cursor.fetchall()
            ]
            self.logger.info(f"Loaded {len(creators)} creators")

            # Load commands; LEFT JOIN keeps commands whose creator row is gone.
            self.logger.info("Loading commands from source database...")
            cursor.execute("""
                SELECT c.id, c.name, c.message, c.creator_id, c.createtime, c.last_used, c.sent_warns,
                       cr.name as creator_name, cr.discordid as creator_discord_id
                FROM command c
                LEFT JOIN creator cr ON c.creator_id = cr.id
                ORDER BY c.id;
            """)
            commands = [
                {
                    'old_id': row[0],
                    'name': row[1],
                    'content': row[2],  # message -> content
                    'old_creator_id': row[3],
                    'created_at': self._normalize_timestamp(row[4]),
                    'last_used': self._normalize_timestamp(row[5]),
                    'sent_warns': row[6],
                    'creator_name': row[7],
                    'creator_discord_id': row[8],
                }
                for row in cursor.fetchall()
            ]
            self.logger.info(f"Loaded {len(commands)} commands")
        finally:
            conn.close()

        return creators, commands

    def migrate_creators(self, creators: List[Dict]) -> Dict[int, int]:
        """Migrate creators and return mapping of old_id -> new_id.

        A creator whose ``discord_id`` already exists in the target is not
        re-inserted; its existing row id is used in the mapping.  In dry-run
        mode nothing is written and an identity mapping is returned.

        Raises:
            Exception: Re-raised after logging if any creator fails; nothing
                from this call is committed in that case.
        """
        if self.dry_run:
            self.logger.info(f"[DRY RUN] Would migrate {len(creators)} creators")
            return {creator['old_id']: creator['old_id'] for creator in creators}  # Mock mapping

        creator_id_mapping = {}
        now = datetime.now().isoformat()

        conn = sqlite3.connect(self.target_db)
        try:
            cursor = conn.cursor()
            for creator in creators:
                try:
                    # Check if creator already exists by discord_id
                    cursor.execute("SELECT id FROM custom_command_creators WHERE discord_id = ?", (creator['discord_id'],))
                    existing = cursor.fetchone()
                    if existing:
                        creator_id_mapping[creator['old_id']] = existing[0]
                        self.logger.info(f"Creator '{creator['name']}' (Discord: {creator['discord_id']}) already exists with ID {existing[0]}")
                        continue

                    # Insert new creator
                    cursor.execute("""
                        INSERT INTO custom_command_creators
                        (discord_id, username, display_name, created_at, total_commands, active_commands)
                        VALUES (?, ?, ?, ?, 0, 0)
                    """, (creator['discord_id'], creator['name'], creator['name'], now))

                    new_id = cursor.lastrowid
                    creator_id_mapping[creator['old_id']] = new_id
                    self.logger.info(f"Migrated creator '{creator['name']}': {creator['old_id']} -> {new_id}")

                except Exception as e:
                    self.logger.error(f"Error migrating creator {creator}: {e}")
                    raise

            conn.commit()
        finally:
            # Closing without commit discards the pending inserts on failure.
            conn.close()

        self.logger.info(f"Successfully migrated {len(creator_id_mapping)} creators")
        return creator_id_mapping

    def _grace_period_last_used(self, name, last_used, migration_date):
        """Pick the ``last_used`` value to store for a migrated command.

        Returns ``last_used`` unchanged when it parses and is at most
        ``STALE_AFTER_DAYS`` days old; otherwise returns ``migration_date``
        (missing, unparseable, or stale timestamps).
        """
        if not last_used:
            # If no last_used date, use migration date
            return migration_date
        try:
            parsed = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
            # Compare like with like: an aware timestamp needs an aware "now",
            # otherwise the subtraction raises TypeError and a recently used
            # command would wrongly be treated as unparseable.
            age = datetime.now(parsed.tzinfo) - parsed
        except (AttributeError, TypeError, ValueError):
            # If we can't parse the date, use migration date
            return migration_date
        if age.days > self.STALE_AFTER_DAYS:
            self.logger.info(f"Updated last_used for command '{name}' to migration date")
            return migration_date
        return last_used

    def migrate_commands(self, commands: List[Dict], creator_id_mapping: Dict[int, int]) -> None:
        """Migrate commands using the creator ID mapping.

        Commands are skipped (with a warning) when their creator is absent
        from ``creator_id_mapping`` or when a command with the same name
        already exists in the target.  Every inserted command is tagged
        ``["migrated"]``, starts with ``use_count`` 0 and is active.

        Raises:
            Exception: Re-raised after logging if any command fails; nothing
                from this call is committed in that case.
        """
        if self.dry_run:
            self.logger.info(f"[DRY RUN] Would migrate {len(commands)} commands")
            return

        migrated_count = 0
        skipped_count = 0
        # Add migration tag to indicate this is a migrated command
        tags = '["migrated"]'

        conn = sqlite3.connect(self.target_db)
        try:
            cursor = conn.cursor()
            for command in commands:
                try:
                    # Map old creator_id to new creator_id
                    if command['old_creator_id'] not in creator_id_mapping:
                        self.logger.warning(f"Skipping command '{command['name']}' - creator ID {command['old_creator_id']} not found in mapping")
                        skipped_count += 1
                        continue
                    new_creator_id = creator_id_mapping[command['old_creator_id']]

                    # Check if command already exists by name
                    cursor.execute("SELECT id FROM custom_commands WHERE name = ?", (command['name'],))
                    existing = cursor.fetchone()
                    if existing:
                        self.logger.warning(f"Command '{command['name']}' already exists with ID {existing[0]} - skipping")
                        skipped_count += 1
                        continue

                    # Determine if command was warned/inactive based on sent_warns
                    warning_sent = bool(command['sent_warns'] and command['sent_warns'] != 0)

                    # Ensure last_used is recent enough to prevent immediate
                    # deletion eligibility after the migration.
                    migration_date = datetime.now().isoformat()
                    last_used = self._grace_period_last_used(
                        command['name'], command['last_used'], migration_date
                    )

                    # Insert command
                    cursor.execute("""
                        INSERT INTO custom_commands
                        (name, content, creator_id, created_at, updated_at, last_used, use_count, warning_sent, is_active, tags)
                        VALUES (?, ?, ?, ?, ?, ?, 0, ?, 1, ?)
                    """, (
                        command['name'],
                        command['content'],
                        new_creator_id,
                        command['created_at'],
                        None,  # updated_at
                        last_used,
                        warning_sent,
                        tags,
                    ))

                    migrated_count += 1
                    if migrated_count % 10 == 0:
                        self.logger.info(f"Migrated {migrated_count} commands...")

                except Exception as e:
                    self.logger.error(f"Error migrating command {command}: {e}")
                    raise

            conn.commit()
        finally:
            # Closing without commit discards the pending inserts on failure.
            conn.close()

        self.logger.info(f"Successfully migrated {migrated_count} commands, skipped {skipped_count}")

    def update_creator_stats(self) -> None:
        """Update creator statistics after migration.

        Recomputes ``total_commands`` and ``active_commands`` for every row
        of ``custom_command_creators``.  Does nothing in dry-run mode.
        """
        if self.dry_run:
            self.logger.info("[DRY RUN] Would update creator statistics")
            return

        conn = sqlite3.connect(self.target_db)
        try:
            conn.execute("""
                UPDATE custom_command_creators SET
                    total_commands = (
                        SELECT COUNT(*) FROM custom_commands
                        WHERE creator_id = custom_command_creators.id
                    ),
                    active_commands = (
                        SELECT COUNT(*) FROM custom_commands
                        WHERE creator_id = custom_command_creators.id AND is_active = 1
                    )
            """)
            conn.commit()
        finally:
            conn.close()
        self.logger.info("Updated creator statistics")

    def _source_counts(self):
        """Return ``(creator_count, command_count)`` from the source database."""
        conn = sqlite3.connect(self.source_db)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM creator")
            creator_count = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM command")
            command_count = cursor.fetchone()[0]
        finally:
            conn.close()
        return creator_count, command_count

    def generate_migration_report(self) -> None:
        """Generate a detailed migration report.

        In dry-run mode only the source row counts are logged.  Otherwise
        the source and target row counts are logged, followed by up to five
        sample commands from the target.
        """
        source_creators, source_commands = self._source_counts()

        if self.dry_run:
            self.logger.info("\n".join([
                "=== DRY RUN MIGRATION REPORT ===",
                f"Source Database: {self.source_db}",
                f"Target Database: {self.target_db}",
                "",
                "Would migrate:",
                f"- {source_creators} creators",
                f"- {source_commands} commands",
                "",
                "No actual changes made (dry run mode).",
            ]))
            return

        # Real migration report
        conn_target = sqlite3.connect(self.target_db)
        try:
            cursor_target = conn_target.cursor()

            # Target counts
            cursor_target.execute("SELECT COUNT(*) FROM custom_command_creators")
            target_creators = cursor_target.fetchone()[0]
            cursor_target.execute("SELECT COUNT(*) FROM custom_commands")
            target_commands = cursor_target.fetchone()[0]

            # Get sample of migrated data
            cursor_target.execute("""
                SELECT cc.name, cc.content, ccc.username
                FROM custom_commands cc
                JOIN custom_command_creators ccc ON cc.creator_id = ccc.id
                LIMIT 5
            """)
            sample_commands = cursor_target.fetchall()
        finally:
            conn_target.close()

        self.logger.info("\n".join([
            "=== MIGRATION REPORT ===",
            f"Source Database: {self.source_db}",
            f"Target Database: {self.target_db}",
            "",
            "Migration Results:",
            f"- Source creators: {source_creators} -> Target creators: {target_creators}",
            f"- Source commands: {source_commands} -> Target commands: {target_commands}",
            "",
            "Sample migrated commands:",
        ]))

        for cmd_name, content, username in sample_commands:
            # Content may be NULL in the database; slicing None would raise.
            preview = '' if content is None else str(content)[:50]
            self.logger.info(f" '{cmd_name}' by {username}: {preview}...")

    def run_migration(self) -> bool:
        """Execute the full migration process.

        Steps: validate both databases, load the source data, migrate
        creators, migrate commands, refresh creator statistics, and log a
        report.

        Returns:
            True on success; False if validation fails or any step raises
            (the error is logged rather than propagated).
        """
        self.logger.info(f"Starting custom commands migration {'(DRY RUN)' if self.dry_run else ''}")
        self.logger.info(f"Source: {self.source_db}")
        self.logger.info(f"Target: {self.target_db}")

        try:
            # Validate databases
            if not self.validate_source_database():
                return False
            if not self.validate_target_database():
                return False

            # Load source data
            creators, commands = self.load_source_data()

            # Creators go first so commands can reference their new ids.
            creator_id_mapping = self.migrate_creators(creators)
            self.migrate_commands(commands, creator_id_mapping)

            # Update statistics
            self.update_creator_stats()

            # Generate report
            self.generate_migration_report()

            self.logger.info("Migration completed successfully!")
            return True

        except Exception as e:
            self.logger.error(f"Migration failed: {e}")
            return False
|
|
|
|
|
|
def main():
    """Parse command-line arguments, run the migration, and exit with its status.

    Exits with status 0 when ``run_migration`` returns true and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description='Migrate custom commands from old database to new schema')
    parser.add_argument('--source', required=True, help='Path to source database (sba_is_fun.db)')
    parser.add_argument('--target', required=True, help='Path to target database (sba_master.db)')
    parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode (no actual changes)')

    args = parser.parse_args()

    migrator = CustomCommandMigrator(args.source, args.target, args.dry_run)
    success = migrator.run_migration()

    # The bare ``exit`` builtin is injected by the ``site`` module for
    # interactive use and is missing under ``python -S``; raising SystemExit
    # has the same effect and is always available.
    raise SystemExit(0 if success else 1)
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()