major-domo-database/app/routers_v3/custom_commands.py
Cal Corum 79a559088a CLAUDE: Phase 1 PostgreSQL migration fixes complete
- Fixed 4 critical schema issues blocking migration
- Resolved integer overflow by converting Discord IDs to strings
- Fixed VARCHAR length limits for Google Photos URLs
- Made injury_count field nullable for NULL values
- Successfully migrating 7/30 tables (5,432+ records)

Issues resolved:
- CONSTRAINT-CURRENT-INJURY_COUNT-001: Made nullable
- DATA_QUALITY-PLAYER-NAME-001: Increased VARCHAR limits to 1000
- MIGRATION_LOGIC-TEAM-INTEGER-001: Discord IDs now strings
- MIGRATION_LOGIC-DRAFTDATA-INTEGER-001: Channel IDs now strings

New issues discovered for Phase 2:
- CONSTRAINT-CURRENT-BSTATCOUNT-001: NULL stats count
- CONSTRAINT-TEAM-AUTO_DRAFT-001: NULL auto draft flag

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-18 18:09:45 -05:00

985 lines
35 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional, Dict, Any
import logging
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
import json
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
from ..db_engine import db
logger = logging.getLogger('database_api')
router = APIRouter(
prefix='/api/v3/custom_commands',
tags=['custom_commands']
)
# Pydantic Models for API
class CustomCommandCreatorModel(BaseModel):
id: int
discord_id: int
username: str
display_name: Optional[str] = None
created_at: str
total_commands: int = 0
active_commands: int = 0
class CustomCommandModel(BaseModel):
id: Optional[int] = None
name: str = Field(..., min_length=2, max_length=32)
content: str = Field(..., min_length=1, max_length=2000)
creator_id: int
creator: Optional[CustomCommandCreatorModel] = None
created_at: Optional[str] = None
updated_at: Optional[str] = None
last_used: Optional[str] = None
use_count: int = 0
warning_sent: bool = False
is_active: bool = True
tags: Optional[List[str]] = None
class CustomCommandListResponse(BaseModel):
custom_commands: List[CustomCommandModel]
total_count: int
page: int
page_size: int
total_pages: int
has_more: bool
class CustomCommandStatsResponse(BaseModel):
total_commands: int
active_commands: int
total_creators: int
total_uses: int
most_popular_command: Optional[Dict[str, Any]] = None
most_active_creator: Optional[Dict[str, Any]] = None
recent_commands_count: int = 0
commands_needing_warning: int = 0
commands_eligible_for_deletion: int = 0
# Helper functions
def get_custom_commands_table():
"""Get custom commands from database with basic filtering"""
cursor = db.execute_sql("""
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE 1=1
""")
results = cursor.fetchall()
if results:
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in results]
return []
def get_custom_command_by_id(command_id: int):
"""Get a single custom command by ID"""
cursor = db.execute_sql("""
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE cc.id = ?
""", (command_id,))
result = cursor.fetchone()
if result:
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, result))
return None
def get_custom_command_by_name(name: str):
"""Get a single custom command by name"""
cursor = db.execute_sql("""
SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id,
creator.username as creator_username, creator.display_name as creator_display_name,
creator.created_at as creator_created_at, creator.total_commands, creator.active_commands
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE LOWER(cc.name) = LOWER(?)
""", (name,))
result = cursor.fetchone()
if result:
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, result))
return None
def create_custom_command(command_data: Dict[str, Any]) -> int:
"""Create a new custom command"""
now = datetime.now().isoformat()
result = db.execute_sql("""
INSERT INTO custom_commands
(name, content, creator_id, created_at, last_used, use_count, warning_sent, is_active, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
command_data['name'],
command_data['content'],
command_data['creator_id'],
now,
now,
0,
False,
True,
json.dumps(command_data.get('tags', []))
))
return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0]
def update_custom_command(command_id: int, update_data: Dict[str, Any]) -> bool:
"""Update an existing custom command"""
set_clauses = []
params = []
for field, value in update_data.items():
if field == 'tags':
set_clauses.append(f"{field} = ?")
params.append(json.dumps(value))
else:
set_clauses.append(f"{field} = ?")
params.append(value)
if not set_clauses:
return False
params.append(command_id)
sql = f"UPDATE custom_commands SET {', '.join(set_clauses)} WHERE id = ?"
db.execute_sql(sql, params)
return True
def delete_custom_command(command_id: int) -> bool:
"""Delete a custom command"""
db.execute_sql("DELETE FROM custom_commands WHERE id = ?", (command_id,))
return True
def get_creator_by_discord_id(discord_id: int):
"""Get creator by Discord ID"""
result = db.execute_sql("""
SELECT * FROM custom_command_creators WHERE discord_id = ?
""", (discord_id,)).fetchone()
return result
def create_creator(creator_data: Dict[str, Any]) -> int:
"""Create a new command creator"""
now = datetime.now().isoformat()
result = db.execute_sql("""
INSERT INTO custom_command_creators
(discord_id, username, display_name, created_at, total_commands, active_commands)
VALUES (?, ?, ?, ?, ?, ?)
""", (
creator_data['discord_id'],
creator_data['username'],
creator_data.get('display_name'),
now,
0,
0
))
return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0]
def update_creator_stats(creator_id: int):
"""Update creator command counts"""
total = db.execute_sql("""
SELECT COUNT(*) FROM custom_commands WHERE creator_id = ?
""", (creator_id,)).fetchone()[0]
active = db.execute_sql("""
SELECT COUNT(*) FROM custom_commands WHERE creator_id = ? AND is_active = 1
""", (creator_id,)).fetchone()[0]
db.execute_sql("""
UPDATE custom_command_creators
SET total_commands = ?, active_commands = ?
WHERE id = ?
""", (total, active, creator_id))
# API Endpoints
@router.get('')
async def get_custom_commands(
name: Optional[str] = None,
creator_discord_id: Optional[int] = None,
min_uses: Optional[int] = None,
max_days_unused: Optional[int] = None,
is_active: Optional[bool] = True,
sort: Optional[str] = 'name',
page: int = Query(1, ge=1),
page_size: int = Query(25, ge=1, le=100)
):
"""Get custom commands with filtering and pagination"""
try:
# Build WHERE clause
where_conditions = []
params = []
if name is not None:
where_conditions.append("LOWER(cc.name) LIKE LOWER(?)" if db.database == 'sqlite' else "cc.name ILIKE ?")
params.append(f"%{name}%")
if creator_discord_id is not None:
where_conditions.append("creator.discord_id = ?")
params.append(creator_discord_id)
if min_uses is not None:
where_conditions.append("cc.use_count >= ?")
params.append(min_uses)
if max_days_unused is not None:
cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
where_conditions.append("cc.last_used >= ?")
params.append(cutoff_date)
if is_active is not None:
where_conditions.append("cc.is_active = ?")
params.append(1 if is_active else 0)
where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
# Build ORDER BY clause
sort_mapping = {
'name': 'cc.name',
'created_at': 'cc.created_at',
'last_used': 'cc.last_used',
'use_count': 'cc.use_count',
'creator': 'creator.username'
}
if sort.startswith('-'):
order_direction = 'DESC'
sort_field = sort[1:]
else:
order_direction = 'ASC'
sort_field = sort
order_by = sort_mapping.get(sort_field, 'cc.name')
order_clause = f"ORDER BY {order_by} {order_direction}"
# Get total count
count_sql = f"""
SELECT COUNT(*)
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
{where_clause}
"""
total_count = db.execute_sql(count_sql, params).fetchone()[0]
# Calculate pagination
offset = (page - 1) * page_size
total_pages = (total_count + page_size - 1) // page_size
# Get commands
sql = f"""
SELECT cc.*, creator.discord_id as creator_discord_id,
creator.username as creator_username,
creator.display_name as creator_display_name
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
{where_clause}
{order_clause}
LIMIT ? OFFSET ?
"""
params.extend([page_size, offset])
cursor3 = db.execute_sql(sql, params)
results = cursor3.fetchall()
# Convert to CustomCommandModel objects with creator info
commands = []
if results:
columns3 = [desc[0] for desc in cursor3.description]
for row in results:
command_dict = dict(zip(columns3, row))
# Parse tags if they exist
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
# Get full creator information
creator_id = command_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
# Create complete creator object
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
try:
creator_model = CustomCommandCreatorModel(**creator_dict)
command_dict['creator'] = creator_model
except Exception as e:
logger.error(f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}")
command_dict['creator'] = None
else:
# No creator found, set to None
command_dict['creator'] = None
# Remove the individual creator fields now that we have the creator object
command_dict.pop('creator_discord_id', None)
command_dict.pop('creator_username', None)
command_dict.pop('creator_display_name', None)
# Create CustomCommandModel instance
try:
command_model = CustomCommandModel(**command_dict)
commands.append(command_model)
except Exception as e:
logger.error(f"Error creating CustomCommandModel: {e}, data: {command_dict}")
# Skip invalid commands rather than failing the entire request
continue
return CustomCommandListResponse(
custom_commands=commands,
total_count=total_count,
page=page,
page_size=page_size,
total_pages=total_pages,
has_more=page < total_pages
)
except Exception as e:
logger.error(f"Error getting custom commands: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
# Move this route to after the specific string routes
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_custom_command_endpoint(
command: CustomCommandModel,
token: str = Depends(oauth2_scheme)
):
"""Create a new custom command"""
if not valid_token(token):
logger.warning(f'create_custom_command - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
# Check if command name already exists
existing = get_custom_command_by_name(command.name)
if existing:
raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")
# Create the command
command_data = command.model_dump(exclude={'id'})
command_id = create_custom_command(command_data)
# Update creator stats
update_creator_stats(command.creator_id)
# Return the created command
result = get_custom_command_by_id(command_id)
command_dict = dict(result)
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating custom command: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def update_custom_command_endpoint(
command_id: int,
command: CustomCommandModel,
token: str = Depends(oauth2_scheme)
):
"""Update an existing custom command"""
if not valid_token(token):
logger.warning(f'update_custom_command - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
# Check if command exists
existing = get_custom_command_by_id(command_id)
if not existing:
raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
# Update the command
update_data = command.model_dump(exclude={'id', 'creator'})
update_data['updated_at'] = datetime.now().isoformat()
update_custom_command(command_id, update_data)
# Return updated command
result = get_custom_command_by_id(command_id)
command_dict = dict(result)
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating custom command {command_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.patch('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def patch_custom_command(
command_id: int,
token: str = Depends(oauth2_scheme),
content: Optional[str] = None,
tags: Optional[List[str]] = None,
use_count: Optional[int] = None,
last_used: Optional[str] = None,
warning_sent: Optional[bool] = None,
is_active: Optional[bool] = None
):
"""Partially update a custom command"""
if not valid_token(token):
logger.warning(f'patch_custom_command - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
# Check if command exists
existing = get_custom_command_by_id(command_id)
if not existing:
raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
# Build update data
update_data = {}
if content is not None:
update_data['content'] = content
update_data['updated_at'] = datetime.now().isoformat()
if tags is not None:
update_data['tags'] = tags
if use_count is not None:
update_data['use_count'] = use_count
if last_used is not None:
update_data['last_used'] = last_used
if warning_sent is not None:
update_data['warning_sent'] = warning_sent
if is_active is not None:
update_data['is_active'] = is_active
if not update_data:
raise HTTPException(status_code=400, detail="No fields to update")
# Update the command
update_custom_command(command_id, update_data)
# Return updated command
result = get_custom_command_by_id(command_id)
command_dict = dict(result)
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error patching custom command {command_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_custom_command_endpoint(
command_id: int,
token: str = Depends(oauth2_scheme)
):
"""Delete a custom command"""
if not valid_token(token):
logger.warning(f'delete_custom_command - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
# Check if command exists
existing = get_custom_command_by_id(command_id)
if not existing:
raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
creator_id = existing['creator_id']
# Delete the command
delete_custom_command(command_id)
# Update creator stats
update_creator_stats(creator_id)
return {"message": f"Custom command {command_id} deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting custom command {command_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
# Creator endpoints
@router.get('/creators')
async def get_creators(
discord_id: Optional[int] = None,
page: int = Query(1, ge=1),
page_size: int = Query(25, ge=1, le=100)
):
"""Get custom command creators with optional filtering"""
try:
# Build WHERE clause
where_conditions = []
params = []
if discord_id is not None:
where_conditions.append("discord_id = ?")
params.append(discord_id)
where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
# Get total count
count_sql = f"SELECT COUNT(*) FROM custom_command_creators {where_clause}"
total_count = db.execute_sql(count_sql, params).fetchone()[0]
# Calculate pagination
offset = (page - 1) * page_size
total_pages = (total_count + page_size - 1) // page_size
# Get creators
sql = f"""
SELECT * FROM custom_command_creators
{where_clause}
ORDER BY username
LIMIT ? OFFSET ?
"""
params.extend([page_size, offset])
cursor = db.execute_sql(sql, params)
results = cursor.fetchall()
# Convert to dict format
creators = []
if results:
columns = [desc[0] for desc in cursor.description]
for row in results:
creator_dict = dict(zip(columns, row))
creators.append(creator_dict)
return {
'creators': creators,
'total_count': total_count,
'page': page,
'page_size': page_size,
'total_pages': total_pages,
'has_more': page < total_pages
}
except Exception as e:
logger.error(f"Error getting creators: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_creator_endpoint(
creator: CustomCommandCreatorModel,
token: str = Depends(oauth2_scheme)
):
"""Create a new command creator"""
if not valid_token(token):
logger.warning(f'create_creator - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
# Check if creator already exists
existing = get_creator_by_discord_id(creator.discord_id)
if existing:
raise HTTPException(status_code=409, detail=f"Creator with Discord ID {creator.discord_id} already exists")
# Create the creator
creator_data = creator.model_dump(exclude={'id'})
creator_id = create_creator(creator_data)
# Return the created creator
cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
result = cursor.fetchone()
if result:
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, result))
else:
raise HTTPException(status_code=500, detail="Failed to retrieve created creator")
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating creator: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.get('/stats')
async def get_custom_command_stats():
"""Get comprehensive statistics about custom commands"""
try:
# Get basic counts
total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0]
active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = 1").fetchone()[0]
total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0]
# Get total uses
total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = 1").fetchone()
total_uses = total_uses_result[0] if total_uses_result[0] else 0
# Get most popular command
cursor = db.execute_sql("""
SELECT cc.*, creator.discord_id as creator_discord_id,
creator.username as creator_username,
creator.display_name as creator_display_name
FROM custom_commands cc
LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
WHERE cc.is_active = 1
ORDER BY cc.use_count DESC
LIMIT 1
""")
most_popular_result = cursor.fetchone()
most_popular_command = None
if most_popular_result:
columns = [desc[0] for desc in cursor.description]
command_dict = dict(zip(columns, most_popular_result))
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
command_dict['creator'] = {
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name')
}
most_popular_command = command_dict
# Get most active creator
cursor2 = db.execute_sql("""
SELECT * FROM custom_command_creators
ORDER BY active_commands DESC
LIMIT 1
""")
most_active_creator_result = cursor2.fetchone()
most_active_creator = None
if most_active_creator_result:
columns2 = [desc[0] for desc in cursor2.description]
most_active_creator = dict(zip(columns2, most_active_creator_result))
# Get recent commands count (last 7 days)
week_ago = (datetime.now() - timedelta(days=7)).isoformat()
recent_count = db.execute_sql("""
SELECT COUNT(*) FROM custom_commands
WHERE created_at >= ? AND is_active = 1
""", (week_ago,)).fetchone()[0]
# Get cleanup stats
sixty_days_ago = (datetime.now() - timedelta(days=60)).isoformat()
ninety_days_ago = (datetime.now() - timedelta(days=90)).isoformat()
warning_count = db.execute_sql("""
SELECT COUNT(*) FROM custom_commands
WHERE last_used < ? AND warning_sent = 0 AND is_active = 1
""", (sixty_days_ago,)).fetchone()[0]
deletion_count = db.execute_sql("""
SELECT COUNT(*) FROM custom_commands
WHERE last_used < ? AND is_active = 1
""", (ninety_days_ago,)).fetchone()[0]
return CustomCommandStatsResponse(
total_commands=total_commands,
active_commands=active_commands,
total_creators=total_creators,
total_uses=total_uses,
most_popular_command=most_popular_command,
most_active_creator=most_active_creator,
recent_commands_count=recent_count,
commands_needing_warning=warning_count,
commands_eligible_for_deletion=deletion_count
)
except Exception as e:
logger.error(f"Error getting custom command stats: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
# Special endpoints for Discord bot integration
@router.get('/by_name/{command_name}')
async def get_custom_command_by_name_endpoint(command_name: str):
"""Get a custom command by name (for Discord bot execution)"""
try:
result = get_custom_command_by_name(command_name)
if not result:
raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
command_dict = dict(result)
# Parse tags
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
# Add creator info - get full creator record
creator_id = command_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
command_dict['creator'] = creator_dict
else:
# Fallback to basic info if full creator not found
command_dict['creator'] = {
'id': creator_id,
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict['created_at'], # Use command creation as fallback
'total_commands': 0,
'active_commands': 0
}
# Remove the duplicate fields
command_dict.pop('creator_discord_id', None)
command_dict.pop('creator_username', None)
command_dict.pop('creator_display_name', None)
return command_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting custom command by name '{command_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.patch('/by_name/{command_name}/execute', include_in_schema=PRIVATE_IN_SCHEMA)
async def execute_custom_command(
command_name: str,
token: str = Depends(oauth2_scheme)
):
"""Execute a custom command and update usage statistics"""
if not valid_token(token):
logger.warning(f'execute_custom_command - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
try:
result = get_custom_command_by_name(command_name)
if not result:
raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
command_dict = dict(result)
command_id = command_dict['id']
# Update usage statistics
update_data = {
'last_used': datetime.now().isoformat(),
'use_count': command_dict['use_count'] + 1,
'warning_sent': False # Reset warning on use
}
update_custom_command(command_id, update_data)
# Return updated command
updated_result = get_custom_command_by_id(command_id)
updated_dict = dict(updated_result)
if updated_dict.get('tags'):
try:
updated_dict['tags'] = json.loads(updated_dict['tags'])
except:
updated_dict['tags'] = []
# Add creator info - get full creator record
creator_id = updated_dict['creator_id']
creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,))
creator_result = creator_cursor.fetchone()
if creator_result:
creator_columns = [desc[0] for desc in creator_cursor.description]
creator_dict = dict(zip(creator_columns, creator_result))
updated_dict['creator'] = creator_dict
else:
# Fallback to basic info if full creator not found
updated_dict['creator'] = {
'id': creator_id,
'discord_id': updated_dict.pop('creator_discord_id'),
'username': updated_dict.pop('creator_username'),
'display_name': updated_dict.pop('creator_display_name'),
'created_at': updated_dict['created_at'], # Use command creation as fallback
'total_commands': 0,
'active_commands': 0
}
# Remove the duplicate fields
updated_dict.pop('creator_discord_id', None)
updated_dict.pop('creator_username', None)
updated_dict.pop('creator_display_name', None)
return updated_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error executing custom command '{command_name}': {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.get('/autocomplete')
async def get_command_names_for_autocomplete(
partial_name: str = "",
limit: int = Query(25, ge=1, le=100)
):
"""Get command names for Discord autocomplete"""
try:
if partial_name:
like_clause = "LOWER(name) LIKE LOWER(?)" if db.database == 'sqlite' else "name ILIKE ?"
results = db.execute_sql(f"""
SELECT name FROM custom_commands
WHERE is_active = 1 AND {like_clause}
ORDER BY name
LIMIT ?
""", (f"%{partial_name}%", limit)).fetchall()
else:
results = db.execute_sql("""
SELECT name FROM custom_commands
WHERE is_active = 1
ORDER BY name
LIMIT ?
""", (limit,)).fetchall()
return [row[0] for row in results]
except Exception as e:
logger.error(f"Error getting command names for autocomplete: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@router.get('/{command_id}')
async def get_custom_command(command_id: int):
"""Get a single custom command by ID"""
try:
result = get_custom_command_by_id(command_id)
if not result:
raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
command_dict = dict(result)
# Parse tags
if command_dict.get('tags'):
try:
command_dict['tags'] = json.loads(command_dict['tags'])
except:
command_dict['tags'] = []
# Add creator info
command_dict['creator'] = {
'id': command_dict.pop('creator_db_id'),
'discord_id': command_dict.pop('creator_discord_id'),
'username': command_dict.pop('creator_username'),
'display_name': command_dict.pop('creator_display_name'),
'created_at': command_dict.pop('creator_created_at'),
'total_commands': command_dict.pop('total_commands'),
'active_commands': command_dict.pop('active_commands')
}
return command_dict
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting custom command {command_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()