major-domo-database/app/routers_v3/custom_commands.py
Cal Corum 57c943e340 CLAUDE: Add custom commands system with migration from legacy database
- Add CustomCommandCreator and CustomCommand models to db_engine.py
- Add comprehensive custom commands API router with full CRUD operations
- Include migration script for transferring 140 commands from sba_is_fun.db
- Add FastAPI integration for /api/v3/custom_commands endpoints
- Implement usage tracking, search, autocomplete, and statistics features
- Add grace period handling for unused commands to prevent deletion
- Include comprehensive documentation for migration process

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 16:31:39 -05:00

842 lines
28 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional, Dict, Any
import logging
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
import json
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
from ..db_engine import db
# Module logger, obtained under the name 'database_api'.
logger = logging.getLogger('database_api')

# Every route declared on this router is served under
# /api/v3/custom_commands and grouped under the 'custom_commands' tag.
router = APIRouter(prefix='/api/v3/custom_commands', tags=['custom_commands'])
# Pydantic Models for API
class CustomCommandCreatorModel(BaseModel):
    # Request body schema for a command creator (see the '/creators' POST
    # endpoint in this module).
    #
    # NOTE: on creation only discord_id, username and display_name are read;
    # created_at is generated server-side and both counters are stored as 0
    # regardless of the values supplied here.
    id: Optional[int] = None  # excluded from the data passed to the insert
    discord_id: int  # used as the uniqueness check when creating a creator
    username: str
    display_name: Optional[str] = None
    created_at: Optional[str] = None  # server-generated values are ISO-8601 strings
    total_commands: int = 0  # recomputed from custom_commands by update_creator_stats
    active_commands: int = 0  # recomputed from custom_commands by update_creator_stats
class CustomCommandModel(BaseModel):
    # Request body schema for creating (POST) and replacing (PUT) a custom
    # command.
    #
    # NOTE: on creation only name, content, creator_id and tags are read; the
    # timestamps, use_count, warning_sent and is_active are set server-side.
    id: Optional[int] = None  # excluded from the data written to the database
    name: str = Field(..., min_length=2, max_length=32)  # required, 2-32 characters
    content: str = Field(..., min_length=1, max_length=2000)  # required, 1-2000 characters
    creator_id: int  # matched against custom_command_creators.id in the helper queries
    created_at: Optional[str] = None  # server-generated values are ISO-8601 strings
    updated_at: Optional[str] = None
    last_used: Optional[str] = None
    use_count: int = 0
    warning_sent: bool = False
    is_active: bool = True
    tags: Optional[List[str]] = None  # persisted as a JSON-encoded string
class CustomCommandListResponse(BaseModel):
    # Paginated envelope returned by the command listing endpoint.
    commands: List[Dict[str, Any]]  # one dict per command row, with a nested 'creator' dict
    total_count: int  # number of rows matching the filters, across all pages
    page: int  # 1-based page number that was requested
    page_size: int  # maximum number of commands per page
    total_pages: int  # ceil(total_count / page_size)
    has_more: bool  # True when page < total_pages
class CustomCommandStatsResponse(BaseModel):
    # Aggregate statistics returned by the '/stats' endpoint.
    total_commands: int  # all rows in custom_commands
    active_commands: int  # rows with is_active = 1
    total_creators: int  # all rows in custom_command_creators
    total_uses: int  # sum of use_count over active commands (0 when there are none)
    most_popular_command: Optional[Dict[str, Any]] = None  # active command with the highest use_count
    most_active_creator: Optional[Dict[str, Any]] = None  # creator row with the highest active_commands
    recent_commands_count: int = 0  # active commands created in the last 7 days
    commands_needing_warning: int = 0  # active, unused for over 60 days, warning not yet sent
    commands_eligible_for_deletion: int = 0  # active, unused for over 90 days
# Helper functions
def get_custom_commands_table():
    """Return every custom command joined with its creator's details.

    The ``WHERE 1=1`` clause matches all rows, so no filtering happens here.

    Returns:
        A list of dicts keyed by column name: all ``custom_commands``
        columns plus ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name`` (from a LEFT JOIN, so they may be NULL).
        An empty list when the query yields no rows.
    """
    cur = db.execute_sql("""
        SELECT cc.*, creator.discord_id as creator_discord_id,
        creator.username as creator_username,
        creator.display_name as creator_display_name
        FROM custom_commands cc
        LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
        WHERE 1=1
    """)
    rows = cur.fetchall()
    if not rows:
        return []
    # cursor.description holds one entry per selected column; item 0 is its name.
    names = [col[0] for col in cur.description]
    return [dict(zip(names, row)) for row in rows]
def get_custom_command_by_id(command_id: int):
    """Fetch one custom command, joined with its creator, by primary key.

    Args:
        command_id: Value matched against ``custom_commands.id``.

    Returns:
        A dict keyed by column name (all ``custom_commands`` columns plus
        ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name``), or ``None`` when no row matches.
    """
    cur = db.execute_sql("""
        SELECT cc.*, creator.discord_id as creator_discord_id,
        creator.username as creator_username,
        creator.display_name as creator_display_name
        FROM custom_commands cc
        LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
        WHERE cc.id = ?
    """, (command_id,))
    row = cur.fetchone()
    if not row:
        return None
    names = [col[0] for col in cur.description]
    return dict(zip(names, row))
def get_custom_command_by_name(name: str):
    """Fetch one custom command, joined with its creator, by name.

    The comparison lower-cases both sides, so the lookup is
    case-insensitive.

    Args:
        name: Command name to look up.

    Returns:
        A dict keyed by column name (all ``custom_commands`` columns plus
        ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name``), or ``None`` when no row matches.
    """
    cur = db.execute_sql("""
        SELECT cc.*, creator.discord_id as creator_discord_id,
        creator.username as creator_username,
        creator.display_name as creator_display_name
        FROM custom_commands cc
        LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
        WHERE LOWER(cc.name) = LOWER(?)
    """, (name,))
    row = cur.fetchone()
    if not row:
        return None
    names = [col[0] for col in cur.description]
    return dict(zip(names, row))
def create_custom_command(command_data: Dict[str, Any]) -> int:
    """Insert a new custom command row and return its row id.

    Only ``name``, ``content``, ``creator_id`` and ``tags`` are read from
    *command_data*. ``created_at`` and ``last_used`` are both set to the
    current local time, ``use_count`` to 0, ``warning_sent`` to False and
    ``is_active`` to True, whatever the caller supplied for them.

    Args:
        command_data: Mapping with the required keys ``name``, ``content``
            and ``creator_id``. ``tags`` is optional; a missing or ``None``
            value is stored as an empty JSON list.

    Returns:
        The value of ``last_insert_rowid()`` read right after the insert.

    Raises:
        KeyError: If a required key is missing from *command_data*.
    """
    now = datetime.now().isoformat()
    # ``.get('tags', [])`` would keep an explicit None (which is what an
    # omitted ``tags`` field looks like after ``model_dump()``) and persist
    # the JSON text "null"; ``or []`` normalises that to an empty list.
    tags_json = json.dumps(command_data.get('tags') or [])
    db.execute_sql("""
        INSERT INTO custom_commands
        (name, content, creator_id, created_at, last_used, use_count, warning_sent, is_active, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        command_data['name'],
        command_data['content'],
        command_data['creator_id'],
        now,
        now,  # last_used starts at creation time
        0,
        False,
        True,
        tags_json,
    ))
    return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0]
def update_custom_command(command_id: int, update_data: Dict[str, Any]) -> bool:
    """Update selected columns of an existing custom command.

    Args:
        command_id: Value matched against ``custom_commands.id``.
        update_data: Mapping of column name to new value. The value stored
            under ``tags`` is JSON-encoded before being written; all other
            values are bound as-is.

    Returns:
        ``False`` when *update_data* is empty (nothing is executed),
        ``True`` once the UPDATE statement has been issued.

    Raises:
        ValueError: If a key of *update_data* is not a plain identifier.
    """
    if not update_data:
        return False

    set_clauses = []
    params = []
    for field, value in update_data.items():
        # Column names cannot be bound as parameters and are interpolated
        # into the statement text, so refuse anything that is not a bare
        # identifier before it can reach the SQL string.
        if not isinstance(field, str) or not field.isidentifier():
            raise ValueError(f"Invalid column name: {field!r}")
        set_clauses.append(f"{field} = ?")
        params.append(json.dumps(value) if field == 'tags' else value)

    params.append(command_id)
    sql = f"UPDATE custom_commands SET {', '.join(set_clauses)} WHERE id = ?"
    db.execute_sql(sql, params)
    return True
def delete_custom_command(command_id: int) -> bool:
    """Remove the custom command row whose id equals *command_id*.

    Returns:
        Always ``True``; the function does not check whether a row was
        actually deleted.
    """
    statement = "DELETE FROM custom_commands WHERE id = ?"
    db.execute_sql(statement, (command_id,))
    return True
def get_creator_by_discord_id(discord_id: int):
    """Look up a command creator by Discord ID.

    Args:
        discord_id: Value matched against ``custom_command_creators.discord_id``.

    Returns:
        The raw row as returned by ``fetchone()`` (not converted to a
        dict), or ``None`` when no creator matches.
    """
    cur = db.execute_sql("""
        SELECT * FROM custom_command_creators WHERE discord_id = ?
    """, (discord_id,))
    return cur.fetchone()
def create_creator(creator_data: Dict[str, Any]) -> int:
    """Insert a new command creator row and return its row id.

    ``created_at`` is set to the current local time and both command
    counters start at 0, whatever *creator_data* contains for them.

    Args:
        creator_data: Mapping with the required keys ``discord_id`` and
            ``username``; ``display_name`` is optional.

    Returns:
        The value of ``last_insert_rowid()`` read right after the insert.
    """
    created_at = datetime.now().isoformat()
    values = (
        creator_data['discord_id'],
        creator_data['username'],
        creator_data.get('display_name'),
        created_at,
        0,  # total_commands
        0,  # active_commands
    )
    db.execute_sql("""
        INSERT INTO custom_command_creators
        (discord_id, username, display_name, created_at, total_commands, active_commands)
        VALUES (?, ?, ?, ?, ?, ?)
    """, values)
    new_id_row = db.execute_sql("SELECT last_insert_rowid()").fetchone()
    return new_id_row[0]
def update_creator_stats(creator_id: int):
    """Recompute and store a creator's command counters.

    Counts the creator's rows in ``custom_commands`` (all of them, and
    those with ``is_active = 1``) and writes the results to
    ``total_commands`` / ``active_commands`` on the creator row.

    Args:
        creator_id: Value matched against ``custom_commands.creator_id``
            and ``custom_command_creators.id``.
    """
    owner = (creator_id,)
    total_row = db.execute_sql("""
        SELECT COUNT(*) FROM custom_commands WHERE creator_id = ?
    """, owner).fetchone()
    active_row = db.execute_sql("""
        SELECT COUNT(*) FROM custom_commands WHERE creator_id = ? AND is_active = 1
    """, owner).fetchone()
    db.execute_sql("""
        UPDATE custom_command_creators
        SET total_commands = ?, active_commands = ?
        WHERE id = ?
    """, (total_row[0], active_row[0], creator_id))
# API Endpoints
@router.get('')
async def get_custom_commands(
    name: Optional[str] = None,
    creator_discord_id: Optional[int] = None,
    min_uses: Optional[int] = None,
    max_days_unused: Optional[int] = None,
    is_active: Optional[bool] = True,
    sort: Optional[str] = 'name',
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """List custom commands with optional filters, sorting and pagination.

    Args:
        name: Case-insensitive substring matched against the command name.
        creator_discord_id: Only commands whose creator has this Discord ID.
        min_uses: Only commands with ``use_count`` >= this value.
        max_days_unused: Only commands whose ``last_used`` falls within the
            last N days.
        is_active: Filter on the active flag (defaults to active only); the
            filter is skipped when the value is ``None``.
        sort: One of ``name``, ``created_at``, ``last_used``, ``use_count``
            or ``creator``; prefix with ``-`` for descending order. Unknown
            or empty values fall back to sorting by name.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A ``CustomCommandListResponse`` whose ``commands`` entries carry the
        row columns, decoded ``tags`` and a nested ``creator`` dict.

    Raises:
        HTTPException: 500 when any step of the query fails.
    """
    try:
        # Build the WHERE clause; values are always bound, never interpolated.
        where_conditions = []
        params = []
        if name is not None:
            where_conditions.append("LOWER(cc.name) LIKE LOWER(?)")
            params.append(f"%{name}%")
        if creator_discord_id is not None:
            where_conditions.append("creator.discord_id = ?")
            params.append(creator_discord_id)
        if min_uses is not None:
            where_conditions.append("cc.use_count >= ?")
            params.append(min_uses)
        if max_days_unused is not None:
            cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
            where_conditions.append("cc.last_used >= ?")
            params.append(cutoff_date)
        if is_active is not None:
            where_conditions.append("cc.is_active = ?")
            params.append(1 if is_active else 0)
        where_clause = ("WHERE " + " AND ".join(where_conditions)) if where_conditions else ""

        # Build the ORDER BY clause. Only expressions from this mapping are
        # ever placed in the SQL text, so the client-supplied sort key cannot
        # inject anything.
        sort_mapping = {
            'name': 'cc.name',
            'created_at': 'cc.created_at',
            'last_used': 'cc.last_used',
            'use_count': 'cc.use_count',
            'creator': 'creator.username'
        }
        sort = sort or 'name'  # tolerate None / empty string
        descending = sort.startswith('-')
        sort_field = sort[1:] if descending else sort
        order_direction = 'DESC' if descending else 'ASC'
        order_by = sort_mapping.get(sort_field, 'cc.name')
        order_clause = f"ORDER BY {order_by} {order_direction}"

        # Total number of matching rows (before pagination).
        count_sql = f"""
            SELECT COUNT(*)
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
        """
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Pagination arithmetic; total_pages is a ceiling division.
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Fetch the requested page.
        sql = f"""
            SELECT cc.*, creator.discord_id as creator_discord_id,
            creator.username as creator_username,
            creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
            {order_clause}
            LIMIT ? OFFSET ?
        """
        params.extend([page_size, offset])
        cursor = db.execute_sql(sql, params)
        results = cursor.fetchall()

        commands = []
        if results:
            columns = [desc[0] for desc in cursor.description]
            for row in results:
                command_dict = dict(zip(columns, row))
                # Tags are stored as JSON text; undecodable values become [].
                if command_dict.get('tags'):
                    try:
                        command_dict['tags'] = json.loads(command_dict['tags'])
                    except (TypeError, ValueError):
                        command_dict['tags'] = []
                # Fold the joined creator columns into a nested dict.
                command_dict['creator'] = {
                    'discord_id': command_dict.pop('creator_discord_id'),
                    'username': command_dict.pop('creator_username'),
                    'display_name': command_dict.pop('creator_display_name')
                }
                commands.append(command_dict)

        return CustomCommandListResponse(
            commands=commands,
            total_count=total_count,
            page=page,
            page_size=page_size,
            total_pages=total_pages,
            has_more=page < total_pages
        )
    except Exception as e:
        logger.error(f"Error getting custom commands: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
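# NOTE: routes are matched in registration order, so the parameterised GET '/{command_id}' route is declared after the fixed-path routes ('/stats', '/autocomplete', '/by_name/...').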
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_custom_command_endpoint(
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Create a new custom command.

    Args:
        command: Command payload; ``id`` is ignored.
        token: Bearer token checked with ``valid_token``.

    Returns:
        The stored command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 401 for a bad token, 409 when a command with the
            same (case-insensitive) name exists, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'create_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Reject duplicate names up front.
        existing = get_custom_command_by_name(command.name)
        if existing:
            raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")

        command_data = command.model_dump(exclude={'id'})
        command_id = create_custom_command(command_data)

        # Keep the creator's command counters in step with the new row.
        update_creator_stats(command.creator_id)

        # Re-read the row so the response reflects what was stored.
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)
        # Tags are stored as JSON text; undecodable values become [].
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                command_dict['tags'] = []
        command_dict['creator'] = {
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name')
        }
        return command_dict
    except HTTPException:
        # Deliberate HTTP errors (409) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error creating custom command: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def update_custom_command_endpoint(
    command_id: int,
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Replace the fields of an existing custom command.

    Every model field except ``id`` is written. ``updated_at`` is always
    set to the current local time. ``created_at`` and ``last_used`` are
    only written when the payload supplies a value, so a payload that
    omits them does not erase the stored timestamps.

    Args:
        command_id: Id of the command to update.
        command: New field values.
        token: Bearer token checked with ``valid_token``.

    Returns:
        The updated command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'update_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        update_data = command.model_dump(exclude={'id'})
        # Omitted timestamps arrive as None; writing them would overwrite
        # the stored creation / last-use times with NULL.
        for timestamp_field in ('created_at', 'last_used'):
            if update_data.get(timestamp_field) is None:
                update_data.pop(timestamp_field, None)
        update_data['updated_at'] = datetime.now().isoformat()
        update_custom_command(command_id, update_data)

        # is_active or creator_id may have changed, so refresh the counters
        # of the previous owner and, if different, of the new one.
        previous_creator_id = existing['creator_id']
        update_creator_stats(previous_creator_id)
        if command.creator_id != previous_creator_id:
            update_creator_stats(command.creator_id)

        # Re-read the row so the response reflects what was stored.
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)
        # Tags are stored as JSON text; undecodable values become [].
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                command_dict['tags'] = []
        command_dict['creator'] = {
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name')
        }
        return command_dict
    except HTTPException:
        # Deliberate HTTP errors (404) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error updating custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.patch('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def patch_custom_command(
    command_id: int,
    token: str = Depends(oauth2_scheme),
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    use_count: Optional[int] = None,
    last_used: Optional[str] = None,
    warning_sent: Optional[bool] = None,
    is_active: Optional[bool] = None
):
    """Partially update a custom command.

    Only arguments that are not ``None`` are written. Changing ``content``
    also sets ``updated_at`` to the current local time.

    Args:
        command_id: Id of the command to update.
        token: Bearer token checked with ``valid_token``.
        content: New command content.
        tags: New tag list (stored JSON-encoded).
        use_count: New usage counter.
        last_used: New last-used timestamp string.
        warning_sent: New warning flag.
        is_active: New active flag.

    Returns:
        The updated command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 400 when no field was supplied, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'patch_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Collect only the fields the caller actually supplied.
        update_data = {}
        if content is not None:
            update_data['content'] = content
            update_data['updated_at'] = datetime.now().isoformat()
        if tags is not None:
            update_data['tags'] = tags
        if use_count is not None:
            update_data['use_count'] = use_count
        if last_used is not None:
            update_data['last_used'] = last_used
        if warning_sent is not None:
            update_data['warning_sent'] = warning_sent
        if is_active is not None:
            update_data['is_active'] = is_active
        if not update_data:
            raise HTTPException(status_code=400, detail="No fields to update")

        update_custom_command(command_id, update_data)

        # Toggling is_active changes the creator's active_commands count,
        # so recompute it instead of leaving the stored counter stale.
        if is_active is not None:
            update_creator_stats(existing['creator_id'])

        # Re-read the row so the response reflects what was stored.
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)
        # Tags are stored as JSON text; undecodable values become [].
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                command_dict['tags'] = []
        command_dict['creator'] = {
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name')
        }
        return command_dict
    except HTTPException:
        # Deliberate HTTP errors (400/404) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error patching custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_custom_command_endpoint(
    command_id: int,
    token: str = Depends(oauth2_scheme)
):
    """Delete a custom command and refresh its creator's counters.

    Args:
        command_id: Id of the command to delete.
        token: Bearer token checked with ``valid_token``.

    Returns:
        A dict with a single ``message`` key confirming the deletion.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'delete_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        command_row = get_custom_command_by_id(command_id)
        if not command_row:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Remember the owner before the row disappears, then recount.
        owner_id = command_row['creator_id']
        delete_custom_command(command_id)
        update_creator_stats(owner_id)

        confirmation = f"Custom command {command_id} deleted successfully"
        return {"message": confirmation}
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error deleting custom command {command_id}: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        db.close()
# Creator endpoints
@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_creator_endpoint(
    creator: CustomCommandCreatorModel,
    token: str = Depends(oauth2_scheme)
):
    """Register a new command creator.

    Args:
        creator: Creator payload; ``id`` is ignored.
        token: Bearer token checked with ``valid_token``.

    Returns:
        The stored creator row as a dict keyed by column name.

    Raises:
        HTTPException: 401 for a bad token, 409 when a creator with the
            same Discord ID exists, 500 when the new row cannot be read
            back or on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'create_creator - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # A Discord ID may only be registered once.
        if get_creator_by_discord_id(creator.discord_id):
            raise HTTPException(status_code=409, detail=f"Creator with Discord ID {creator.discord_id} already exists")

        payload = creator.model_dump(exclude={'id'})
        new_id = create_creator(payload)

        # Read the row back so the response reflects what was stored.
        cur = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (new_id,))
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=500, detail="Failed to retrieve created creator")
        names = [col[0] for col in cur.description]
        return dict(zip(names, row))
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error creating creator: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        db.close()
@router.get('/stats')
async def get_custom_command_stats():
    """Return aggregate statistics about custom commands.

    Returns:
        A ``CustomCommandStatsResponse`` holding overall counts, the summed
        ``use_count`` of active commands, the most-used active command, the
        creator with the highest ``active_commands``, the number of active
        commands created in the last 7 days, and the numbers of active
        commands unused for more than 60 days (warning not yet sent) and
        for more than 90 days.

    Raises:
        HTTPException: 500 when any query fails.
    """
    try:
        # Basic counts.
        total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0]
        active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = 1").fetchone()[0]
        total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0]

        # SUM() yields NULL when no rows match; report that as 0.
        total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = 1").fetchone()
        total_uses = total_uses_result[0] or 0

        # Most popular active command.
        cursor = db.execute_sql("""
            SELECT cc.*, creator.discord_id as creator_discord_id,
            creator.username as creator_username,
            creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            WHERE cc.is_active = 1
            ORDER BY cc.use_count DESC
            LIMIT 1
        """)
        most_popular_result = cursor.fetchone()
        most_popular_command = None
        if most_popular_result:
            columns = [desc[0] for desc in cursor.description]
            command_dict = dict(zip(columns, most_popular_result))
            # Tags are stored as JSON text; undecodable values become [].
            if command_dict.get('tags'):
                try:
                    command_dict['tags'] = json.loads(command_dict['tags'])
                except (TypeError, ValueError):
                    command_dict['tags'] = []
            command_dict['creator'] = {
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name')
            }
            most_popular_command = command_dict

        # Creator with the most active commands.
        cursor2 = db.execute_sql("""
            SELECT * FROM custom_command_creators
            ORDER BY active_commands DESC
            LIMIT 1
        """)
        most_active_creator_result = cursor2.fetchone()
        most_active_creator = None
        if most_active_creator_result:
            columns2 = [desc[0] for desc in cursor2.description]
            most_active_creator = dict(zip(columns2, most_active_creator_result))

        # Derive every cutoff from one reference time so the three windows
        # are consistent with each other.
        now = datetime.now()
        week_ago = (now - timedelta(days=7)).isoformat()
        sixty_days_ago = (now - timedelta(days=60)).isoformat()
        ninety_days_ago = (now - timedelta(days=90)).isoformat()

        # Active commands created in the last 7 days.
        recent_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE created_at >= ? AND is_active = 1
        """, (week_ago,)).fetchone()[0]

        # Cleanup candidates.
        warning_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < ? AND warning_sent = 0 AND is_active = 1
        """, (sixty_days_ago,)).fetchone()[0]
        deletion_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < ? AND is_active = 1
        """, (ninety_days_ago,)).fetchone()[0]

        return CustomCommandStatsResponse(
            total_commands=total_commands,
            active_commands=active_commands,
            total_creators=total_creators,
            total_uses=total_uses,
            most_popular_command=most_popular_command,
            most_active_creator=most_active_creator,
            recent_commands_count=recent_count,
            commands_needing_warning=warning_count,
            commands_eligible_for_deletion=deletion_count
        )
    except Exception as e:
        logger.error(f"Error getting custom command stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
# Special endpoints for Discord bot integration
@router.get('/by_name/{command_name}')
async def get_custom_command_by_name_endpoint(command_name: str):
    """Return a custom command looked up by (case-insensitive) name.

    Args:
        command_name: Name of the command to fetch.

    Returns:
        The command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 404 when no command has that name, 500 on any
            other failure.
    """
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
        command_dict = dict(result)
        # Tags are stored as JSON text; undecodable values become [].
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                command_dict['tags'] = []
        # Fold the joined creator columns into a nested dict.
        command_dict['creator'] = {
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name')
        }
        return command_dict
    except HTTPException:
        # Deliberate HTTP errors (404) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting custom command by name '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.patch('/by_name/{command_name}/execute', include_in_schema=PRIVATE_IN_SCHEMA)
async def execute_custom_command(
    command_name: str,
    token: str = Depends(oauth2_scheme)
):
    """Record one execution of a custom command and return it.

    Sets ``last_used`` to the current local time, increments ``use_count``
    by one and clears ``warning_sent``.

    Args:
        command_name: Name of the command (case-insensitive lookup).
        token: Bearer token checked with ``valid_token``.

    Returns:
        The updated command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 401 for a bad token, 404 when no command has that
            name, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'execute_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
        command_id = result['id']

        # Increment inside the UPDATE itself. Computing "fetched value + 1"
        # in Python loses counts when two executions overlap, because both
        # would write the same number.
        db.execute_sql("""
            UPDATE custom_commands
            SET last_used = ?,
                use_count = COALESCE(use_count, 0) + 1,
                warning_sent = ?
            WHERE id = ?
        """, (
            datetime.now().isoformat(),
            False,  # a use resets any pending warning
            command_id,
        ))

        # Re-read the row so the response reflects what was stored.
        updated_result = get_custom_command_by_id(command_id)
        updated_dict = dict(updated_result)
        # Tags are stored as JSON text; undecodable values become [].
        if updated_dict.get('tags'):
            try:
                updated_dict['tags'] = json.loads(updated_dict['tags'])
            except (TypeError, ValueError):
                updated_dict['tags'] = []
        updated_dict['creator'] = {
            'discord_id': updated_dict.pop('creator_discord_id'),
            'username': updated_dict.pop('creator_username'),
            'display_name': updated_dict.pop('creator_display_name')
        }
        return updated_dict
    except HTTPException:
        # Deliberate HTTP errors (404) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error executing custom command '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.get('/autocomplete')
async def get_command_names_for_autocomplete(
    partial_name: str = "",
    limit: int = Query(25, ge=1, le=100)
):
    """Return names of active commands for autocomplete.

    Args:
        partial_name: Optional case-insensitive substring the name must
            contain; when empty, no name filter is applied.
        limit: Maximum number of names returned (1-100).

    Returns:
        A list of command names ordered by name.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        if partial_name:
            query = """
                SELECT name FROM custom_commands
                WHERE is_active = 1 AND LOWER(name) LIKE LOWER(?)
                ORDER BY name
                LIMIT ?
            """
            bindings = (f"%{partial_name}%", limit)
        else:
            query = """
                SELECT name FROM custom_commands
                WHERE is_active = 1
                ORDER BY name
                LIMIT ?
            """
            bindings = (limit,)
        rows = db.execute_sql(query, bindings).fetchall()
        # Each row holds a single selected column: the command name.
        return [record[0] for record in rows]
    except Exception as exc:
        logger.error(f"Error getting command names for autocomplete: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        db.close()
@router.get('/{command_id}')
async def get_custom_command(command_id: int):
    """Return a single custom command by id.

    Args:
        command_id: Id of the command to fetch.

    Returns:
        The command as a dict with decoded ``tags`` and a nested
        ``creator`` dict.

    Raises:
        HTTPException: 404 when the command does not exist, 500 on any
            other failure.
    """
    try:
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
        command_dict = dict(result)
        # Tags are stored as JSON text; undecodable values become [].
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                command_dict['tags'] = []
        # Fold the joined creator columns into a nested dict.
        command_dict['creator'] = {
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name')
        }
        return command_dict
    except HTTPException:
        # Deliberate HTTP errors (404) must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()