major-domo-database/app/routers_v3/custom_commands.py
Cal Corum 99f501e748 Fix custom command creator POST validation (v2.3.1)
Changed CustomCommandCreatorModel.id from required `int` to `Optional[int] = None`
to allow POST requests to create new creators without specifying an ID (database
auto-generates it).

Bug: Users couldn't create custom commands with /new-cc - API returned 422 error
"Field required" for missing id field.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-13 16:31:47 -06:00

1010 lines
36 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from typing import List, Optional, Dict, Any
import logging
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
import json
from playhouse.shortcuts import model_to_dict
from peewee import fn
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors
from ..db_engine import db, CustomCommand, CustomCommandCreator
# Module logger, obtained under the 'database_api' logger name.
logger = logging.getLogger('database_api')

# Router for the custom-command endpoints; every route path declared in this
# module is relative to the prefix below.
router = APIRouter(
    prefix='/api/v3/custom_commands',
    tags=['custom_commands']
)
# Pydantic Models for API
class CustomCommandCreatorModel(BaseModel):
    """Schema for a custom command creator.

    Used as the request body of the creator POST endpoint and as the nested
    ``creator`` object inside ``CustomCommandModel``.
    """
    id: Optional[int] = None  # Optional for POST (auto-generated), required on response
    discord_id: int
    username: str
    display_name: Optional[str] = None
    # Declared as str: the listing endpoint converts DB datetimes to ISO
    # strings before validating rows against this model.
    created_at: str
    # Mirror the per-creator counters that update_creator_stats() recomputes.
    total_commands: int = 0
    active_commands: int = 0
class CustomCommandModel(BaseModel):
    """Schema for a custom command.

    Used as the request body for the POST/PUT command endpoints and as the
    item type of ``CustomCommandListResponse``.
    """
    id: Optional[int] = None  # excluded from the data written by POST/PUT
    name: str = Field(..., min_length=2, max_length=32)  # 2-32 characters
    content: str = Field(..., min_length=1, max_length=2000)  # 1-2000 characters
    creator_id: int  # database id of the creator row (not the Discord id)
    creator: Optional[CustomCommandCreatorModel] = None  # excluded from the data written by POST/PUT
    # Timestamps travel as strings; endpoints convert datetimes with isoformat().
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    last_used: Optional[str] = None
    use_count: int = 0
    warning_sent: bool = False  # reset to False by the execute endpoint
    is_active: bool = True
    # The helpers store a list of tags as a JSON string and endpoints decode
    # it again with json.loads when reading.
    tags: Optional[List[str]] = None
class CustomCommandListResponse(BaseModel):
    """Paginated response returned by the command listing endpoint."""
    custom_commands: List[CustomCommandModel]  # rows of the requested page that validated successfully
    total_count: int  # number of rows matching the filters, across all pages
    page: int  # 1-based page number that was requested
    page_size: int
    total_pages: int  # ceil(total_count / page_size)
    has_more: bool  # True when page < total_pages
class CustomCommandStatsResponse(BaseModel):
    """Aggregate statistics returned by the ``/stats`` endpoint."""
    total_commands: int  # all rows, active or not
    active_commands: int
    total_creators: int
    total_uses: int  # sum of use_count over active commands (0 when there are none)
    most_popular_command: Optional[Dict[str, Any]] = None  # active command with the highest use_count
    most_active_creator: Optional[Dict[str, Any]] = None  # creator row with the highest active_commands
    recent_commands_count: int = 0  # active commands created in the last 7 days
    commands_needing_warning: int = 0  # active, not yet warned, last used more than 60 days ago
    commands_eligible_for_deletion: int = 0  # active, last used more than 90 days ago
# Helper functions
def convert_datetime_to_iso(data_dict, fields=None):
    """Replace datetime-like values in ``data_dict`` with ISO strings, in place.

    Args:
        data_dict: Mapping to modify.
        fields: Keys to inspect; defaults to ``created_at``, ``updated_at``
            and ``last_used`` when ``None``.

    Returns:
        The same ``data_dict`` object, for convenience.

    Keys that are missing, hold a falsy value, or hold a value without an
    ``isoformat`` method are left untouched.
    """
    target_keys = ['created_at', 'updated_at', 'last_used'] if fields is None else fields
    for key in target_keys:
        value = data_dict.get(key)
        if not value:
            continue
        if hasattr(value, 'isoformat'):
            data_dict[key] = value.isoformat()
    return data_dict
def update_creator_stats(creator_id: int):
    """Recount a creator's commands and persist the totals on the creator row.

    Does nothing when no creator has the given database id.
    """
    owner = CustomCommandCreator.get_or_none(CustomCommandCreator.id == creator_id)
    if not owner:
        return

    owned_by = CustomCommand.creator == owner
    total_owned = CustomCommand.select().where(owned_by).count()
    # "== True" is intentional: it builds a SQL expression, not a Python bool.
    active_owned = (
        CustomCommand.select()
        .where(owned_by & (CustomCommand.is_active == True))
        .count()
    )

    owner.total_commands = total_owned
    owner.active_commands = active_owned
    owner.save()
def get_custom_command_by_name(name: str):
    """Look up a command by name (case-insensitive) together with its creator.

    Returns:
        A dict of the command's columns plus ``creator_id``,
        ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name``, or ``None`` when no command matches.
    """
    lookup = (
        CustomCommand
        .select(CustomCommand, CustomCommandCreator)
        .join(CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id))
        .where(fn.LOWER(CustomCommand.name) == name.lower())
    )
    match = lookup.first()
    if not match:
        return None

    payload = model_to_dict(match, recurse=False)
    owner = match.creator
    # Set creator_id explicitly so callers can rely on the key being present.
    payload.update(
        creator_id=owner.id,
        creator_discord_id=owner.discord_id,
        creator_username=owner.username,
        creator_display_name=owner.display_name,
    )
    return payload
def get_custom_command_by_id(command_id: int):
    """Look up a command by primary key together with its creator.

    Returns:
        A dict of the command's columns plus the flattened creator fields
        ``creator_db_id``, ``creator_discord_id``, ``creator_username``,
        ``creator_display_name``, ``creator_created_at``, ``total_commands``
        and ``active_commands``; ``None`` when no command matches.
    """
    joined = CustomCommand.select(CustomCommand, CustomCommandCreator).join(
        CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id)
    )
    match = joined.where(CustomCommand.id == command_id).first()
    if not match:
        return None

    payload = model_to_dict(match, recurse=False)
    owner = match.creator
    # Flatten the creator row into the result; endpoints pop these keys again
    # to build the nested "creator" object.
    payload.update(
        creator_db_id=owner.id,
        creator_discord_id=owner.discord_id,
        creator_username=owner.username,
        creator_display_name=owner.display_name,
        creator_created_at=owner.created_at,
        total_commands=owner.total_commands,
        active_commands=owner.active_commands,
    )
    return payload
def create_custom_command(command_data: dict) -> int:
    """Insert a new custom command row and return its primary key.

    Args:
        command_data: Column values for the new row. A ``tags`` list is stored
            as a JSON string. A missing *or None* ``created_at`` is replaced
            with the current time.

    Returns:
        The id of the created row.

    The caller's dict is not modified.
    """
    # Work on a copy so the caller's dict is left as it was passed in.
    row = dict(command_data)

    # Convert tags list to JSON if present
    if isinstance(row.get('tags'), list):
        row['tags'] = json.dumps(row['tags'])

    # The POST endpoint passes model_dump() output, which always contains the
    # 'created_at' key (as None when the client omitted it); a plain
    # "key not in dict" check would therefore never apply the default.
    if row.get('created_at') is None:
        row['created_at'] = datetime.now()

    command = CustomCommand.create(**row)
    return command.id
def update_custom_command(command_id: int, update_data: dict):
    """Apply ``update_data`` to the command row with the given id.

    A ``tags`` list is replaced (inside ``update_data`` itself) by its JSON
    string before the UPDATE is executed.
    """
    tag_value = update_data.get('tags')
    if isinstance(tag_value, list):
        update_data['tags'] = json.dumps(tag_value)

    CustomCommand.update(**update_data).where(CustomCommand.id == command_id).execute()
def delete_custom_command(command_id: int):
    """Remove the command row whose primary key is ``command_id``."""
    CustomCommand.delete().where(CustomCommand.id == command_id).execute()
def get_creator_by_discord_id(discord_id: int):
    """Return the creator with this Discord id as a dict, or ``None``.

    The id is compared in its string form.
    """
    match = CustomCommandCreator.get_or_none(
        CustomCommandCreator.discord_id == str(discord_id)
    )
    return model_to_dict(match) if match else None
def create_creator(creator_data: dict) -> int:
    """Insert a new creator row and return its primary key.

    When ``creator_data`` has no ``created_at`` key, the current time is
    written into it before the insert.
    """
    creator_data.setdefault('created_at', datetime.now())
    return CustomCommandCreator.create(**creator_data).id
# API Endpoints
def _load_creator_model(creator_id):
    """Fetch one creator row and validate it as a ``CustomCommandCreatorModel``.

    Returns ``None`` when no row has that id or when the row fails validation
    (validation failures are logged).
    """
    creator_cursor = db.execute_sql(
        "SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)
    )
    creator_result = creator_cursor.fetchone()
    if not creator_result:
        return None

    creator_columns = [desc[0] for desc in creator_cursor.description]
    creator_dict = dict(zip(creator_columns, creator_result))
    # The model declares created_at as str, so datetimes are converted first.
    convert_datetime_to_iso(creator_dict, fields=['created_at'])
    try:
        return CustomCommandCreatorModel(**creator_dict)
    except Exception as e:
        logger.error(f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}")
        return None


@router.get('')
@handle_db_errors
async def get_custom_commands(
    name: Optional[str] = None,
    creator_discord_id: Optional[int] = None,
    min_uses: Optional[int] = None,
    max_days_unused: Optional[int] = None,
    is_active: Optional[bool] = True,
    sort: Optional[str] = 'name',
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """Get custom commands with filtering and pagination.

    Args:
        name: Case-insensitive substring to match against the command name.
        creator_discord_id: Only commands whose creator has this Discord id.
        min_uses: Only commands with at least this many uses.
        max_days_unused: Only commands last used within this many days.
        is_active: Filter on the active flag; ``None`` disables the filter.
        sort: One of ``name``, ``created_at``, ``last_used``, ``use_count`` or
            ``creator``; prefix with ``-`` for descending order. Unknown keys
            (and a missing value) fall back to sorting by name.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A ``CustomCommandListResponse``. Rows that fail model validation are
        logged and skipped, so a page may hold fewer items than ``page_size``.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Build WHERE clause
        where_conditions = []
        params = []
        if name is not None:
            where_conditions.append("cc.name ILIKE %s")
            params.append(f"%{name}%")
        if creator_discord_id is not None:
            where_conditions.append("creator.discord_id = %s")
            params.append(creator_discord_id)
        if min_uses is not None:
            where_conditions.append("cc.use_count >= %s")
            params.append(min_uses)
        if max_days_unused is not None:
            cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
            where_conditions.append("cc.last_used >= %s")
            params.append(cutoff_date)
        if is_active is not None:
            where_conditions.append("cc.is_active = %s")
            params.append(is_active)
        where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""

        # Build ORDER BY clause. Only whitelisted column expressions are ever
        # interpolated into the SQL text; the user-supplied key is just a lookup.
        sort_mapping = {
            'name': 'cc.name',
            'created_at': 'cc.created_at',
            'last_used': 'cc.last_used',
            'use_count': 'cc.use_count',
            'creator': 'creator.username'
        }
        sort_key = sort or 'name'  # the parameter is Optional; treat None/'' as the default
        if sort_key.startswith('-'):
            order_direction = 'DESC'
            sort_field = sort_key[1:]
        else:
            order_direction = 'ASC'
            sort_field = sort_key
        order_by = sort_mapping.get(sort_field, 'cc.name')
        order_clause = f"ORDER BY {order_by} {order_direction}"

        # Get total count
        count_sql = f"""
            SELECT COUNT(*)
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
        """
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Calculate pagination
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Get commands
        sql = f"""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
            {order_clause}
            LIMIT %s OFFSET %s
        """
        params.extend([page_size, offset])
        cursor3 = db.execute_sql(sql, params)
        results = cursor3.fetchall()

        # Convert to CustomCommandModel objects with creator info
        commands = []
        # Commands on one page often share a creator; remember each creator so
        # it is fetched once per request instead of once per row.
        creator_cache = {}
        if results:
            columns3 = [desc[0] for desc in cursor3.description]
            for row in results:
                command_dict = dict(zip(columns3, row))

                # Parse tags if they exist
                if command_dict.get('tags'):
                    try:
                        command_dict['tags'] = json.loads(command_dict['tags'])
                    except (TypeError, ValueError):
                        # Stored value is not valid JSON text; degrade to no tags
                        command_dict['tags'] = []

                # Get full creator information (None when the creator is missing or invalid)
                creator_id = command_dict['creator_id']
                if creator_id not in creator_cache:
                    creator_cache[creator_id] = _load_creator_model(creator_id)
                command_dict['creator'] = creator_cache[creator_id]

                # Remove the individual creator fields now that we have the creator object
                command_dict.pop('creator_discord_id', None)
                command_dict.pop('creator_username', None)
                command_dict.pop('creator_display_name', None)

                # Convert datetime fields to ISO strings
                convert_datetime_to_iso(command_dict)

                # Create CustomCommandModel instance
                try:
                    command_model = CustomCommandModel(**command_dict)
                    commands.append(command_model)
                except Exception as e:
                    logger.error(f"Error creating CustomCommandModel: {e}, data: {command_dict}")
                    # Skip invalid commands rather than failing the entire request
                    continue

        return CustomCommandListResponse(
            custom_commands=commands,
            total_count=total_count,
            page=page,
            page_size=page_size,
            total_pages=total_pages,
            has_more=page < total_pages
        )
    except Exception as e:
        logger.error(f"Error getting custom commands: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
# NOTE: routes with a path parameter must come after the specific string routes;
# GET '/{command_id}' is therefore defined at the end of this module.
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_custom_command_endpoint(
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Create a new custom command.

    Args:
        command: The command to create; ``id`` and ``creator`` are ignored.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The stored command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 409 when a command with the same
            name (case-insensitive) already exists, 500 when the new row
            cannot be read back or on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'create_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Check if command name already exists
        existing = get_custom_command_by_name(command.name)
        if existing:
            raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")

        # Create the command
        command_data = command.model_dump(exclude={'id', 'creator'})
        command_id = create_custom_command(command_data)

        # Update creator stats before re-reading so the response carries fresh counts
        update_creator_stats(command.creator_id)

        # Return the created command
        result = get_custom_command_by_id(command_id)
        if not result:
            # The lookup joins on the creator, so it can come back empty; fail
            # with a clear message instead of a TypeError from dict(None).
            raise HTTPException(status_code=500, detail="Failed to retrieve created command")
        command_dict = dict(result)

        # Convert datetime fields
        convert_datetime_to_iso(command_dict)
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                command_dict['tags'] = []

        # Fold the flattened creator columns into a nested creator object
        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }
        return command_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating custom command: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def update_custom_command_endpoint(
    command_id: int,
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Replace an existing custom command with the supplied data.

    Args:
        command_id: Primary key of the command to update.
        command: New field values; ``id`` and ``creator`` are ignored.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 when the row cannot be read back or on any unexpected
            error.
    """
    if not valid_token(token):
        logger.warning(f'update_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Update the command
        update_data = command.model_dump(exclude={'id', 'creator'})
        # model_dump() emits created_at=None when the client omitted it;
        # writing that would erase the stored creation timestamp.
        if update_data.get('created_at') is None:
            update_data.pop('created_at', None)
        update_data['updated_at'] = datetime.now().isoformat()
        update_custom_command(command_id, update_data)

        # creator_id and is_active may both have changed, so refresh the
        # derived counters of the previous and the new creator.
        for affected_creator_id in {existing['creator_db_id'], command.creator_id}:
            update_creator_stats(affected_creator_id)

        # Return updated command
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=500, detail="Failed to retrieve updated command")
        command_dict = dict(result)

        # Convert datetime fields
        convert_datetime_to_iso(command_dict)
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                command_dict['tags'] = []

        # Fold the flattened creator columns into a nested creator object
        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }
        return command_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.patch('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def patch_custom_command(
    command_id: int,
    token: str = Depends(oauth2_scheme),
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    use_count: Optional[int] = None,
    last_used: Optional[str] = None,
    warning_sent: Optional[bool] = None,
    is_active: Optional[bool] = None
):
    """Partially update a custom command.

    Only arguments that are not ``None`` are written. Changing ``content``
    also stamps ``updated_at`` with the current time.

    NOTE(review): ``tags`` is a bare ``List[str]`` parameter without
    ``Query(...)``; FastAPI may read it from the request body rather than the
    query string - confirm against the client.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 400 when no field was supplied, 500 when the row cannot be
            read back or on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'patch_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Build update data
        update_data = {}
        if content is not None:
            update_data['content'] = content
            update_data['updated_at'] = datetime.now().isoformat()
        if tags is not None:
            update_data['tags'] = tags
        if use_count is not None:
            update_data['use_count'] = use_count
        if last_used is not None:
            update_data['last_used'] = last_used
        if warning_sent is not None:
            update_data['warning_sent'] = warning_sent
        if is_active is not None:
            update_data['is_active'] = is_active
        if not update_data:
            raise HTTPException(status_code=400, detail="No fields to update")

        # Update the command
        update_custom_command(command_id, update_data)

        # Toggling is_active changes the creator's active_commands count, so
        # refresh the derived counters before building the response.
        if is_active is not None:
            update_creator_stats(existing['creator_db_id'])

        # Return updated command
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=500, detail="Failed to retrieve updated command")
        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                command_dict['tags'] = []

        # Fold the flattened creator columns into a nested creator object
        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }
        return command_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error patching custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_custom_command_endpoint(
    command_id: int,
    token: str = Depends(oauth2_scheme)
):
    """Delete a custom command and refresh its creator's counters.

    Args:
        command_id: Primary key of the command to delete.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'delete_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # get_custom_command_by_id() exposes the creator's database id as
        # 'creator_db_id' (it never sets 'creator_id'), so indexing
        # existing['creator_id'] would raise KeyError before anything is deleted.
        creator_id = existing.get('creator_db_id', existing.get('creator_id'))

        # Delete the command
        delete_custom_command(command_id)

        # Update creator stats
        update_creator_stats(creator_id)

        return {"message": f"Custom command {command_id} deleted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
# Creator endpoints
@router.get('/creators')
@handle_db_errors
async def get_creators(
    discord_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """List custom command creators, ordered by username, one page at a time.

    Args:
        discord_id: When given, only the creator with this Discord id.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A dict with ``creators`` (list of row dicts) and the pagination fields
        ``total_count``, ``page``, ``page_size``, ``total_pages``, ``has_more``.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Optional filter on the Discord id
        filters = []
        sql_args = []
        if discord_id is not None:
            filters.append("discord_id = %s")
            sql_args.append(discord_id)
        where_clause = ("WHERE " + " AND ".join(filters)) if filters else ""

        # Number of matching rows across all pages
        total_count = db.execute_sql(
            f"SELECT COUNT(*) FROM custom_command_creators {where_clause}",
            sql_args
        ).fetchone()[0]

        # Pagination arithmetic (ceiling division for the page count)
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Fetch the requested page
        page_sql = f"""
            SELECT * FROM custom_command_creators
            {where_clause}
            ORDER BY username
            LIMIT %s OFFSET %s
        """
        cursor = db.execute_sql(page_sql, sql_args + [page_size, offset])
        rows = cursor.fetchall()

        # Turn each row tuple into a dict keyed by column name
        creators = []
        if rows:
            columns = [desc[0] for desc in cursor.description]
            for row in rows:
                record = dict(zip(columns, row))
                joined_at = record.get('created_at')
                if joined_at and hasattr(joined_at, 'isoformat'):
                    record['created_at'] = joined_at.isoformat()
                creators.append(record)

        return {
            'creators': creators,
            'total_count': total_count,
            'page': page,
            'page_size': page_size,
            'total_pages': total_pages,
            'has_more': page < total_pages
        }
    except Exception as e:
        logger.error(f"Error getting creators: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_creator_endpoint(
    creator: CustomCommandCreatorModel,
    token: str = Depends(oauth2_scheme)
):
    """Register a new command creator and return the stored row.

    Raises:
        HTTPException: 401 for a bad token, 409 when a creator with the same
            Discord id already exists, 500 when the new row cannot be read
            back or on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'create_creator - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        # Reject duplicates by Discord id
        if get_creator_by_discord_id(creator.discord_id):
            raise HTTPException(status_code=409, detail=f"Creator with Discord ID {creator.discord_id} already exists")

        # Insert; the id is left out so the database assigns it
        new_id = create_creator(creator.model_dump(exclude={'id'}))

        # Read the row back so the response reflects what was stored
        cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (new_id,))
        stored_row = cursor.fetchone()
        if not stored_row:
            raise HTTPException(status_code=500, detail="Failed to retrieve created creator")
        column_names = [desc[0] for desc in cursor.description]
        return dict(zip(column_names, stored_row))
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating creator: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.get('/stats')
@handle_db_errors
async def get_custom_command_stats():
    """Get comprehensive statistics about custom commands.

    Returns:
        A ``CustomCommandStatsResponse`` with overall counts, the most used
        active command, the creator with the most active commands, the number
        of active commands created in the last 7 days, and the cleanup
        counters (60-day warning / 90-day deletion thresholds).

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Get basic counts
        total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0]
        active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = TRUE").fetchone()[0]
        total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0]

        # Get total uses (SUM yields NULL when there are no rows)
        total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = TRUE").fetchone()
        total_uses = total_uses_result[0] if total_uses_result[0] else 0

        # Get most popular command
        cursor = db.execute_sql("""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            WHERE cc.is_active = TRUE
            ORDER BY cc.use_count DESC
            LIMIT 1
        """)
        most_popular_result = cursor.fetchone()
        most_popular_command = None
        if most_popular_result:
            columns = [desc[0] for desc in cursor.description]
            command_dict = dict(zip(columns, most_popular_result))
            # Convert datetime fields to ISO strings
            convert_datetime_to_iso(command_dict)
            if command_dict.get('tags'):
                try:
                    command_dict['tags'] = json.loads(command_dict['tags'])
                except (TypeError, ValueError):
                    # Stored value is not valid JSON text; degrade to no tags
                    command_dict['tags'] = []
            command_dict['creator'] = {
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name')
            }
            most_popular_command = command_dict

        # Get most active creator
        cursor2 = db.execute_sql("""
            SELECT * FROM custom_command_creators
            ORDER BY active_commands DESC
            LIMIT 1
        """)
        most_active_creator_result = cursor2.fetchone()
        most_active_creator = None
        if most_active_creator_result:
            columns2 = [desc[0] for desc in cursor2.description]
            most_active_creator = dict(zip(columns2, most_active_creator_result))
            # Convert datetime to ISO string
            convert_datetime_to_iso(most_active_creator, fields=['created_at'])

        # One reference instant so all three cutoffs are measured from the same moment
        now = datetime.now()

        # Get recent commands count (last 7 days)
        week_ago = (now - timedelta(days=7)).isoformat()
        recent_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE created_at >= %s AND is_active = TRUE
        """, (week_ago,)).fetchone()[0]

        # Get cleanup stats
        sixty_days_ago = (now - timedelta(days=60)).isoformat()
        ninety_days_ago = (now - timedelta(days=90)).isoformat()
        warning_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND warning_sent = FALSE AND is_active = TRUE
        """, (sixty_days_ago,)).fetchone()[0]
        deletion_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND is_active = TRUE
        """, (ninety_days_ago,)).fetchone()[0]

        return CustomCommandStatsResponse(
            total_commands=total_commands,
            active_commands=active_commands,
            total_creators=total_creators,
            total_uses=total_uses,
            most_popular_command=most_popular_command,
            most_active_creator=most_active_creator,
            recent_commands_count=recent_count,
            commands_needing_warning=warning_count,
            commands_eligible_for_deletion=deletion_count
        )
    except Exception as e:
        logger.error(f"Error getting custom command stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
# Special endpoints for Discord bot integration
@router.get('/by_name/{command_name}')
@handle_db_errors
async def get_custom_command_by_name_endpoint(command_name: str):
    """Get a custom command by name (for Discord bot execution).

    The name is matched case-insensitively.

    Returns:
        The command as a dict with a nested ``creator`` object holding the
        full creator row.

    Raises:
        HTTPException: 404 when no command has that name, 500 on any
            unexpected error.
    """
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)

        # Parse tags
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                command_dict['tags'] = []

        # Add creator info - get full creator record
        creator_id = command_dict['creator_id']
        creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,))
        creator_result = creator_cursor.fetchone()
        if creator_result:
            creator_columns = [desc[0] for desc in creator_cursor.description]
            creator_dict = dict(zip(creator_columns, creator_result))
            # Convert creator datetime to ISO string
            convert_datetime_to_iso(creator_dict, fields=['created_at'])
            command_dict['creator'] = creator_dict
        else:
            # Fallback to basic info if full creator not found
            command_dict['creator'] = {
                'id': creator_id,
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name'),
                'created_at': command_dict['created_at'],  # Use command creation as fallback
                'total_commands': 0,
                'active_commands': 0
            }

        # Remove the duplicate fields (already gone when the fallback branch ran)
        command_dict.pop('creator_discord_id', None)
        command_dict.pop('creator_username', None)
        command_dict.pop('creator_display_name', None)
        return command_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command by name '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.patch('/by_name/{command_name}/execute', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def execute_custom_command(
    command_name: str,
    token: str = Depends(oauth2_scheme)
):
    """Execute a custom command and update usage statistics.

    Stamps ``last_used`` with the current time, increments ``use_count`` and
    resets ``warning_sent``.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when no command has that
            name, 500 when the row cannot be read back or on any unexpected
            error.
    """
    if not valid_token(token):
        logger.warning(f'execute_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")
        command_dict = dict(result)
        command_id = command_dict['id']

        # Update usage statistics
        update_data = {
            'last_used': datetime.now().isoformat(),
            # Increment inside the UPDATE statement itself: adding 1 to the
            # value read above would lose counts when two executions of the
            # same command overlap.
            'use_count': CustomCommand.use_count + 1,
            'warning_sent': False  # Reset warning on use
        }
        update_custom_command(command_id, update_data)

        # Return updated command - get_custom_command_by_id already has all creator info
        updated_result = get_custom_command_by_id(command_id)
        if not updated_result:
            raise HTTPException(status_code=500, detail="Failed to retrieve updated command")
        updated_dict = dict(updated_result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(updated_dict)
        if updated_dict.get('tags'):
            try:
                updated_dict['tags'] = json.loads(updated_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                updated_dict['tags'] = []

        # Build creator object from the fields returned by get_custom_command_by_id
        creator_created_at = updated_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()
        updated_dict['creator'] = {
            'id': updated_dict.pop('creator_db_id'),
            'discord_id': updated_dict.pop('creator_discord_id'),
            'username': updated_dict.pop('creator_username'),
            'display_name': updated_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': updated_dict.pop('total_commands'),
            'active_commands': updated_dict.pop('active_commands')
        }
        return updated_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error executing custom command '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.get('/autocomplete')
@handle_db_errors
async def get_command_names_for_autocomplete(
    partial_name: str = "",
    limit: int = Query(25, ge=1, le=100)
):
    """Return names of active commands for Discord autocomplete.

    Args:
        partial_name: Case-insensitive substring to match; an empty string
            returns the first ``limit`` names alphabetically.
        limit: Maximum number of names (1-100).

    Returns:
        A list of command names ordered by name.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Choose the statement and its arguments first, then run it once
        if partial_name:
            statement = """
                SELECT name FROM custom_commands
                WHERE is_active = TRUE AND name ILIKE %s
                ORDER BY name
                LIMIT %s
            """
            arguments = (f"%{partial_name}%", limit)
        else:
            statement = """
                SELECT name FROM custom_commands
                WHERE is_active = TRUE
                ORDER BY name
                LIMIT %s
            """
            arguments = (limit,)
        matches = db.execute_sql(statement, arguments).fetchall()
        return [record[0] for record in matches]
    except Exception as e:
        logger.error(f"Error getting command names for autocomplete: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
@router.get('/{command_id}')
@handle_db_errors
async def get_custom_command(command_id: int):
    """Get a single custom command by ID.

    Returns:
        The command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 404 when the command does not exist, 500 on any
            unexpected error.
    """
    try:
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)

        # Parse tags
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Stored value is not valid JSON text; degrade to no tags
                command_dict['tags'] = []

        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        # Add creator info (folds the flattened creator columns into one object)
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }
        return command_dict
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()