Added custom_commands endpoint

This commit is contained in:
Cal Corum 2025-08-18 16:27:39 -05:00
parent 57c943e340
commit 27369a92fb
2 changed files with 637 additions and 37 deletions

View File

@ -18,11 +18,11 @@ router = APIRouter(
# Pydantic Models for API
class CustomCommandCreatorModel(BaseModel):
id: Optional[int] = None
id: int
discord_id: int
username: str
display_name: Optional[str] = None
created_at: Optional[str] = None
created_at: str
total_commands: int = 0
active_commands: int = 0
@ -32,6 +32,7 @@ class CustomCommandModel(BaseModel):
name: str = Field(..., min_length=2, max_length=32)
content: str = Field(..., min_length=1, max_length=2000)
creator_id: int
creator: Optional[CustomCommandCreatorModel] = None
created_at: Optional[str] = None
updated_at: Optional[str] = None
last_used: Optional[str] = None
@ -42,7 +43,7 @@ class CustomCommandModel(BaseModel):
class CustomCommandListResponse(BaseModel):
commands: List[Dict[str, Any]]
custom_commands: List[CustomCommandModel]
total_count: int
page: int
page_size: int
@ -66,9 +67,9 @@ class CustomCommandStatsResponse(BaseModel):
def get_custom_commands_table():
"""Get custom commands from database with basic filtering"""
cursor = db.execute_sql("""
SELECT cc.*, creator.discord_id as creator_discord_id,
creator.username as creator_username,
creator.display_name as creator_display_name
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE 1=1
@ -84,9 +85,9 @@ def get_custom_commands_table():
def get_custom_command_by_id(command_id: int):
"""Get a single custom command by ID"""
cursor = db.execute_sql("""
SELECT cc.*, creator.discord_id as creator_discord_id,
creator.username as creator_username,
creator.display_name as creator_display_name
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE cc.id = ?
@ -102,9 +103,9 @@ def get_custom_command_by_id(command_id: int):
def get_custom_command_by_name(name: str):
"""Get a single custom command by name"""
cursor = db.execute_sql("""
SELECT cc.*, creator.discord_id as creator_discord_id,
creator.username as creator_username,
creator.display_name as creator_display_name
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE LOWER(cc.name) = LOWER(?)
@ -304,12 +305,13 @@ async def get_custom_commands(
cursor3 = db.execute_sql(sql, params)
results = cursor3.fetchall()
# Convert to dict format
# Convert to CustomCommandModel objects with creator info
commands = []
if results:
columns3 = [desc[0] for desc in cursor3.description]
for row in results:
command_dict = dict(zip(columns3, row))
# Parse tags if they exist
if command_dict.get('tags'):
try:
@ -317,16 +319,41 @@ async def get_custom_commands(
except:
command_dict['tags'] = []
# Add creator info
command_dict['creator'] = {
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
}
commands.append(command_dict)
# Get full creator information
creator_id = command_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
# Create complete creator object
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
try:
creator_model = CustomCommandCreatorModel(**creator_dict)
command_dict['creator'] = creator_model
except Exception as e:
logger.error(f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}")
command_dict['creator'] = None
else:
# No creator found, set to None
command_dict['creator'] = None
# Remove the individual creator fields now that we have the creator object
command_dict.pop('creator_discord_id', None)
command_dict.pop('creator_username', None)
command_dict.pop('creator_display_name', None)
# Create CustomCommandModel instance
try:
command_model = CustomCommandModel(**command_dict)
commands.append(command_model)
except Exception as e:
logger.error(f"Error creating CustomCommandModel: {e}, data: {command_dict}")
# Skip invalid commands rather than failing the entire request
continue
return CustomCommandListResponse(
commands=commands,
custom_commands=commands,
total_count=total_count,
page=page,
page_size=page_size,
@ -378,9 +405,13 @@ async def create_custom_command_endpoint(
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
@ -412,7 +443,7 @@ async def update_custom_command_endpoint(
raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
# Update the command
update_data = command.model_dump(exclude={'id'})
update_data = command.model_dump(exclude={'id', 'creator'})
update_data['updated_at'] = datetime.now().isoformat()
update_custom_command(command_id, update_data)
@ -427,9 +458,13 @@ async def update_custom_command_endpoint(
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
@ -498,9 +533,13 @@ async def patch_custom_command(
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
@ -550,6 +589,68 @@ async def delete_custom_command_endpoint(
# Creator endpoints
@router.get('/creators')
async def get_creators(
discord_id: Optional[int] = None,
page: int = Query(1, ge=1),
page_size: int = Query(25, ge=1, le=100)
):
"""Get custom command creators with optional filtering"""
try:
# Build WHERE clause
where_conditions = []
params = []
if discord_id is not None:
where_conditions.append("discord_id = ?")
params.append(discord_id)
where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
# Get total count
count_sql = f"SELECT COUNT(*) FROM custom_command_creators {where_clause}"
total_count = db.execute_sql(count_sql, params).fetchone()[0]
# Calculate pagination
offset = (page - 1) * page_size
total_pages = (total_count + page_size - 1) // page_size
# Get creators
sql = f"""
SELECT * FROM custom_command_creators
{where_clause}
ORDER BY username
LIMIT ? OFFSET ?
"""
params.extend([page_size, offset])
cursor = db.execute_sql(sql, params)
results = cursor.fetchall()
# Convert to dict format
creators = []
if results:
columns = [desc[0] for desc in cursor.description]
for row in results:
creator_dict = dict(zip(columns, row))
creators.append(creator_dict)
return {
'creators': creators,
'total_count': total_count,
'page': page,
'page_size': page_size,
'total_pages': total_pages,
'has_more': page < total_pages
}
except Exception as e:
logger.error(f"Error getting creators: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_creator_endpoint(
creator: CustomCommandCreatorModel,
@ -702,12 +803,30 @@ async def get_custom_command_by_name_endpoint(command_name: str):
except:
command_dict['tags'] = []
# Add creator info
command_dict['creator'] = {
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
}
# Add creator info - get full creator record
creator_id = command_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
command_dict['creator'] = creator_dict
else:
# Fallback to basic info if full creator not found
command_dict['creator'] = {
'id': creator_id,
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict['created_at'], # Use command creation as fallback
'total_commands': 0,
'active_commands': 0
}
# Remove the duplicate fields
command_dict.pop('creator_discord_id', None)
command_dict.pop('creator_username', None)
command_dict.pop('creator_display_name', None)
return command_dict
@ -758,11 +877,30 @@ async def execute_custom_command(
except:
updated_dict['tags'] = []
updated_dict['creator'] = {
'discord_id': updated_dict.pop('creator_discord_id'),
'username': updated_dict.pop('creator_username'),
'display_name': updated_dict.pop('creator_display_name')
}
# Add creator info - get full creator record
creator_id = updated_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
updated_dict['creator'] = creator_dict
else:
# Fallback to basic info if full creator not found
updated_dict['creator'] = {
'id': creator_id,
'discord_id': updated_dict.pop('creator_discord_id'),
'username': updated_dict.pop('creator_username'),
'display_name': updated_dict.pop('creator_display_name'),
'created_at': updated_dict['created_at'], # Use command creation as fallback
'total_commands': 0,
'active_commands': 0
}
# Remove the duplicate fields
updated_dict.pop('creator_discord_id', None)
updated_dict.pop('creator_username', None)
updated_dict.pop('creator_display_name', None)
return updated_dict
@ -826,9 +964,13 @@ async def get_custom_command(command_id: int):
# Add creator info
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict

View File

@ -0,0 +1,458 @@
#!/usr/bin/env python3
"""
Migration script to transfer custom commands from old database to new schema.
This script:
1. Reads existing commands and creators from sba_is_fun.db
2. Maps the old schema to the new custom_commands schema
3. Migrates all data preserving relationships and metadata
4. Provides detailed logging and validation
Usage:
python migrate_custom_commands.py --source /path/to/sba_is_fun.db --target /path/to/sba_master.db [--dry-run]
"""
import sqlite3
import json
import logging
import argparse
from datetime import datetime
from typing import Dict, List, Tuple, Optional
class CustomCommandMigrator:
def __init__(self, source_db: str, target_db: str, dry_run: bool = False):
self.source_db = source_db
self.target_db = target_db
self.dry_run = dry_run
self.setup_logging()
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'migration_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('migrate_custom_commands.CustomCommandMigrator')
def validate_source_database(self) -> bool:
"""Validate that source database has expected tables and structure"""
try:
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('command', 'creator');")
tables = [row[0] for row in cursor.fetchall()]
if 'command' not in tables or 'creator' not in tables:
self.logger.error(f"Required tables missing. Found: {tables}")
return False
# Check command table structure
cursor.execute("PRAGMA table_info(command);")
command_cols = [row[1] for row in cursor.fetchall()]
required_command_cols = ['id', 'name', 'message', 'creator_id', 'createtime']
for col in required_command_cols:
if col not in command_cols:
self.logger.error(f"Required column '{col}' missing from command table")
return False
# Check creator table structure
cursor.execute("PRAGMA table_info(creator);")
creator_cols = [row[1] for row in cursor.fetchall()]
required_creator_cols = ['id', 'name', 'discordid']
for col in required_creator_cols:
if col not in creator_cols:
self.logger.error(f"Required column '{col}' missing from creator table")
return False
conn.close()
self.logger.info("Source database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating source database: {e}")
return False
def validate_target_database(self) -> bool:
"""Validate that target database has the new custom_commands tables"""
try:
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Check for required tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name IN ('custom_commands', 'custom_command_creators');")
tables = [row[0] for row in cursor.fetchall()]
if 'custom_commands' not in tables or 'custom_command_creators' not in tables:
self.logger.error(f"Target tables missing. Found: {tables}")
self.logger.error("Please ensure the new custom_commands schema has been created first")
return False
conn.close()
self.logger.info("Target database validation passed")
return True
except Exception as e:
self.logger.error(f"Error validating target database: {e}")
return False
def load_source_data(self) -> Tuple[List[Dict], List[Dict]]:
"""Load creators and commands from source database"""
conn = sqlite3.connect(self.source_db)
cursor = conn.cursor()
# Load creators
self.logger.info("Loading creators from source database...")
cursor.execute("SELECT id, name, discordid FROM creator ORDER BY id;")
creators_raw = cursor.fetchall()
creators = []
for row in creators_raw:
creators.append({
'old_id': row[0],
'name': row[1],
'discord_id': row[2]
})
self.logger.info(f"Loaded {len(creators)} creators")
# Load commands
self.logger.info("Loading commands from source database...")
cursor.execute("""
SELECT c.id, c.name, c.message, c.creator_id, c.createtime, c.last_used, c.sent_warns,
cr.name as creator_name, cr.discordid as creator_discord_id
FROM command c
LEFT JOIN creator cr ON c.creator_id = cr.id
ORDER BY c.id;
""")
commands_raw = cursor.fetchall()
commands = []
for row in commands_raw:
# Parse last_used datetime
last_used = None
if row[5]: # last_used
try:
last_used = datetime.fromisoformat(row[5]).isoformat()
except:
last_used = row[5] # Keep original if parsing fails
# Parse createtime
created_at = None
if row[4]: # createtime
try:
created_at = datetime.fromisoformat(row[4]).isoformat()
except:
created_at = row[4] # Keep original if parsing fails
commands.append({
'old_id': row[0],
'name': row[1],
'content': row[2], # message -> content
'old_creator_id': row[3],
'created_at': created_at,
'last_used': last_used,
'sent_warns': row[6],
'creator_name': row[7],
'creator_discord_id': row[8]
})
self.logger.info(f"Loaded {len(commands)} commands")
conn.close()
return creators, commands
def migrate_creators(self, creators: List[Dict]) -> Dict[int, int]:
"""Migrate creators and return mapping of old_id -> new_id"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(creators)} creators")
return {creator['old_id']: creator['old_id'] for creator in creators} # Mock mapping
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
creator_id_mapping = {}
now = datetime.now().isoformat()
for creator in creators:
try:
# Check if creator already exists by discord_id
cursor.execute("SELECT id FROM custom_command_creators WHERE discord_id = ?", (creator['discord_id'],))
existing = cursor.fetchone()
if existing:
creator_id_mapping[creator['old_id']] = existing[0]
self.logger.info(f"Creator '{creator['name']}' (Discord: {creator['discord_id']}) already exists with ID {existing[0]}")
continue
# Insert new creator
cursor.execute("""
INSERT INTO custom_command_creators
(discord_id, username, display_name, created_at, total_commands, active_commands)
VALUES (?, ?, ?, ?, 0, 0)
""", (creator['discord_id'], creator['name'], creator['name'], now))
new_id = cursor.lastrowid
creator_id_mapping[creator['old_id']] = new_id
self.logger.info(f"Migrated creator '{creator['name']}': {creator['old_id']} -> {new_id}")
except Exception as e:
self.logger.error(f"Error migrating creator {creator}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {len(creator_id_mapping)} creators")
return creator_id_mapping
def migrate_commands(self, commands: List[Dict], creator_id_mapping: Dict[int, int]) -> None:
"""Migrate commands using the creator ID mapping"""
if self.dry_run:
self.logger.info(f"[DRY RUN] Would migrate {len(commands)} commands")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
migrated_count = 0
skipped_count = 0
for command in commands:
try:
# Map old creator_id to new creator_id
if command['old_creator_id'] not in creator_id_mapping:
self.logger.warning(f"Skipping command '{command['name']}' - creator ID {command['old_creator_id']} not found in mapping")
skipped_count += 1
continue
new_creator_id = creator_id_mapping[command['old_creator_id']]
# Check if command already exists by name
cursor.execute("SELECT id FROM custom_commands WHERE name = ?", (command['name'],))
existing = cursor.fetchone()
if existing:
self.logger.warning(f"Command '{command['name']}' already exists with ID {existing[0]} - skipping")
skipped_count += 1
continue
# Determine if command was warned/inactive based on sent_warns
warning_sent = bool(command['sent_warns'] and command['sent_warns'] != 0)
# For migrated commands, ensure last_used is at least the migration date
# to prevent immediate deletion eligibility
migration_date = datetime.now().isoformat()
last_used = command['last_used']
# If command hasn't been used recently, set last_used to migration date
# to give it a grace period
if last_used:
try:
last_used_dt = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
# If last used more than 60 days ago, update to migration date
if (datetime.now() - last_used_dt).days > 60:
last_used = migration_date
self.logger.info(f"Updated last_used for command '{command['name']}' to migration date")
except:
# If we can't parse the date, use migration date
last_used = migration_date
else:
# If no last_used date, use migration date
last_used = migration_date
# Add migration tag to indicate this is a migrated command
tags = '["migrated"]'
# Insert command
cursor.execute("""
INSERT INTO custom_commands
(name, content, creator_id, created_at, updated_at, last_used, use_count, warning_sent, is_active, tags)
VALUES (?, ?, ?, ?, ?, ?, 0, ?, 1, ?)
""", (
command['name'],
command['content'],
new_creator_id,
command['created_at'],
None, # updated_at
last_used,
warning_sent,
tags
))
migrated_count += 1
if migrated_count % 10 == 0:
self.logger.info(f"Migrated {migrated_count} commands...")
except Exception as e:
self.logger.error(f"Error migrating command {command}: {e}")
raise
conn.commit()
conn.close()
self.logger.info(f"Successfully migrated {migrated_count} commands, skipped {skipped_count}")
def update_creator_stats(self) -> None:
"""Update creator statistics after migration"""
if self.dry_run:
self.logger.info("[DRY RUN] Would update creator statistics")
return
conn = sqlite3.connect(self.target_db)
cursor = conn.cursor()
# Update creator stats
cursor.execute("""
UPDATE custom_command_creators SET
total_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id
),
active_commands = (
SELECT COUNT(*) FROM custom_commands
WHERE creator_id = custom_command_creators.id AND is_active = 1
)
""")
conn.commit()
conn.close()
self.logger.info("Updated creator statistics")
def generate_migration_report(self) -> None:
"""Generate a detailed migration report"""
if self.dry_run:
conn_source = sqlite3.connect(self.source_db)
cursor_source = conn_source.cursor()
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
conn_source.close()
self.logger.info(f"""
=== DRY RUN MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Would migrate:
- {source_creators} creators
- {source_commands} commands
No actual changes made (dry run mode).
""".strip())
return
# Real migration report
conn_source = sqlite3.connect(self.source_db)
conn_target = sqlite3.connect(self.target_db)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
# Source counts
cursor_source.execute("SELECT COUNT(*) FROM creator")
source_creators = cursor_source.fetchone()[0]
cursor_source.execute("SELECT COUNT(*) FROM command")
source_commands = cursor_source.fetchone()[0]
# Target counts
cursor_target.execute("SELECT COUNT(*) FROM custom_command_creators")
target_creators = cursor_target.fetchone()[0]
cursor_target.execute("SELECT COUNT(*) FROM custom_commands")
target_commands = cursor_target.fetchone()[0]
# Get sample of migrated data
cursor_target.execute("""
SELECT cc.name, cc.content, ccc.username
FROM custom_commands cc
JOIN custom_command_creators ccc ON cc.creator_id = ccc.id
LIMIT 5
""")
sample_commands = cursor_target.fetchall()
conn_source.close()
conn_target.close()
self.logger.info(f"""
=== MIGRATION REPORT ===
Source Database: {self.source_db}
Target Database: {self.target_db}
Migration Results:
- Source creators: {source_creators} -> Target creators: {target_creators}
- Source commands: {source_commands} -> Target commands: {target_commands}
Sample migrated commands:
""".strip())
for cmd in sample_commands:
self.logger.info(f" '{cmd[0]}' by {cmd[2]}: {cmd[1][:50]}...")
def run_migration(self) -> bool:
"""Execute the full migration process"""
self.logger.info(f"Starting custom commands migration {'(DRY RUN)' if self.dry_run else ''}")
self.logger.info(f"Source: {self.source_db}")
self.logger.info(f"Target: {self.target_db}")
try:
# Validate databases
if not self.validate_source_database():
return False
if not self.validate_target_database():
return False
# Load source data
creators, commands = self.load_source_data()
# Migrate creators first
creator_id_mapping = self.migrate_creators(creators)
# Migrate commands
self.migrate_commands(commands, creator_id_mapping)
# Update statistics
self.update_creator_stats()
# Generate report
self.generate_migration_report()
self.logger.info("Migration completed successfully!")
return True
except Exception as e:
self.logger.error(f"Migration failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Migrate custom commands from old database to new schema')
parser.add_argument('--source', required=True, help='Path to source database (sba_is_fun.db)')
parser.add_argument('--target', required=True, help='Path to target database (sba_master.db)')
parser.add_argument('--dry-run', action='store_true', help='Run in dry-run mode (no actual changes)')
args = parser.parse_args()
migrator = CustomCommandMigrator(args.source, args.target, args.dry_run)
success = migrator.run_migration()
exit(0 if success else 1)
if __name__ == '__main__':
main()