Changed CustomCommandCreatorModel.id from required `int` to `Optional[int] = None` to allow POST requests to create new creators without specifying an ID (database auto-generates it). Bug: Users couldn't create custom commands with /new-cc - API returned 422 error "Field required" for missing id field. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1010 lines
36 KiB
Python
1010 lines
36 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query, Response
|
|
from typing import List, Optional, Dict, Any
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from pydantic import BaseModel, Field
|
|
import json
|
|
from playhouse.shortcuts import model_to_dict
|
|
from peewee import fn
|
|
|
|
from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors
|
|
from ..db_engine import db, CustomCommand, CustomCommandCreator
|
|
|
|
# Module-level logger; every endpoint below reports through it.
logger = logging.getLogger('database_api')

# Router for the custom-command API. Every route declared in this module is
# registered under the /api/v3/custom_commands prefix.
router = APIRouter(
    prefix='/api/v3/custom_commands',
    tags=['custom_commands']
)
|
|
|
|
|
|
# Pydantic Models for API
|
|
class CustomCommandCreatorModel(BaseModel):
    # Schema for a command creator. Used as the POST /creators request body
    # and as the nested `creator` object inside CustomCommandModel.
    id: Optional[int] = None  # Optional for POST (auto-generated), required on response
    discord_id: int
    username: str
    display_name: Optional[str] = None
    # ISO-formatted timestamp string; endpoints convert datetimes with
    # .isoformat() before building this model.
    created_at: str
    # Counters maintained by update_creator_stats().
    total_commands: int = 0
    active_commands: int = 0
|
|
|
|
|
|
class CustomCommandModel(BaseModel):
    # Schema for a custom command. Used as the POST/PUT request body and as
    # the item type of CustomCommandListResponse.
    id: Optional[int] = None
    name: str = Field(..., min_length=2, max_length=32)
    content: str = Field(..., min_length=1, max_length=2000)
    creator_id: int
    # Nested creator record; the write endpoints exclude it from the data
    # they persist (model_dump(exclude={'id', 'creator'})).
    creator: Optional[CustomCommandCreatorModel] = None
    # Timestamps travel as strings (ISO format in the responses built here).
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    last_used: Optional[str] = None
    use_count: int = 0
    warning_sent: bool = False
    is_active: bool = True
    # A list here; the persistence helpers store it as a JSON string.
    tags: Optional[List[str]] = None
|
|
|
|
|
|
class CustomCommandListResponse(BaseModel):
    # Paginated envelope returned by the command listing endpoint.
    custom_commands: List[CustomCommandModel]
    total_count: int   # number of rows matching the filters, across all pages
    page: int          # 1-based page number that was requested
    page_size: int
    total_pages: int
    has_more: bool     # set by the endpoint to `page < total_pages`
|
|
|
|
|
|
class CustomCommandStatsResponse(BaseModel):
    # Aggregate statistics returned by the /stats endpoint.
    total_commands: int
    active_commands: int
    total_creators: int
    total_uses: int
    # Free-form dicts built from raw SQL rows by the stats endpoint.
    most_popular_command: Optional[Dict[str, Any]] = None
    most_active_creator: Optional[Dict[str, Any]] = None
    recent_commands_count: int = 0
    commands_needing_warning: int = 0
    commands_eligible_for_deletion: int = 0
|
|
|
|
|
|
# Helper functions
|
|
def convert_datetime_to_iso(data_dict, fields=None):
    """Replace datetime-like values in ``data_dict`` with ISO strings, in place.

    Args:
        data_dict: Mapping to modify.
        fields: Keys to inspect. When ``None`` the keys ``created_at``,
            ``updated_at`` and ``last_used`` are used.

    Returns:
        The same ``data_dict`` object, to allow call chaining.
    """
    keys_to_check = ['created_at', 'updated_at', 'last_used'] if fields is None else fields
    for key in keys_to_check:
        value = data_dict.get(key)
        # Missing keys and falsy values (None, '') are left untouched.
        if not value:
            continue
        # Anything exposing .isoformat() qualifies (datetime, date, ...);
        # strings that are already formatted simply fall through.
        if hasattr(value, 'isoformat'):
            data_dict[key] = value.isoformat()
    return data_dict
|
|
|
|
|
|
def update_creator_stats(creator_id: int):
    """Recount a creator's commands and store the totals on the creator row.

    Does nothing when no creator with ``creator_id`` exists.
    """
    owner = CustomCommandCreator.get_or_none(CustomCommandCreator.id == creator_id)
    if owner:
        owned_by_creator = CustomCommand.creator == owner
        # `== True` is deliberate: it builds a peewee SQL expression rather
        # than evaluating a Python boolean.
        currently_active = CustomCommand.is_active == True
        command_total = CustomCommand.select().where(owned_by_creator).count()
        active_total = CustomCommand.select().where(
            owned_by_creator & currently_active
        ).count()

        owner.total_commands = command_total
        owner.active_commands = active_total
        owner.save()
|
|
|
|
|
|
def get_custom_command_by_name(name: str):
    """Fetch one custom command by name, ignoring case, joined with its creator.

    Returns:
        ``None`` when nothing matches. Otherwise a flat dict holding the
        command's own fields (``model_to_dict`` with ``recurse=False``) plus
        ``creator_id``, ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name`` copied from the joined creator row.
    """
    lookup = (
        CustomCommand
        .select(CustomCommand, CustomCommandCreator)
        .join(CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id))
        .where(fn.LOWER(CustomCommand.name) == name.lower())
    )
    found = lookup.first()
    if not found:
        return None

    flat = model_to_dict(found, recurse=False)
    owner = found.creator
    # Ensure creator_id is in the result (it should be from model_to_dict, but make sure)
    flat.update(
        creator_id=owner.id,
        creator_discord_id=owner.discord_id,
        creator_username=owner.username,
        creator_display_name=owner.display_name,
    )
    return flat
|
|
|
|
|
|
def get_custom_command_by_id(command_id: int):
    """Fetch one custom command by primary key, joined with its creator.

    Returns:
        ``None`` when no command has this ID (or its creator row is missing,
        since the join is an inner join). Otherwise a flat dict with the
        command's own fields (``model_to_dict`` with ``recurse=False``) plus
        these creator-derived keys: ``creator_id``, ``creator_db_id``,
        ``creator_discord_id``, ``creator_username``,
        ``creator_display_name``, ``creator_created_at``, ``total_commands``
        and ``active_commands``.
    """
    command = (
        CustomCommand
        .select(CustomCommand, CustomCommandCreator)
        .join(CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id))
        .where(CustomCommand.id == command_id)
        .first()
    )
    if not command:
        return None

    creator = command.creator
    result = model_to_dict(command, recurse=False)
    # model_to_dict keys a foreign key by its field name ('creator'), so set
    # 'creator_id' explicitly, exactly as get_custom_command_by_name does.
    # The delete endpoint reads existing['creator_id'] from this dict.
    result['creator_id'] = creator.id
    result['creator_db_id'] = creator.id
    result['creator_discord_id'] = creator.discord_id
    result['creator_username'] = creator.username
    result['creator_display_name'] = creator.display_name
    result['creator_created_at'] = creator.created_at
    result['total_commands'] = creator.total_commands
    result['active_commands'] = creator.active_commands
    return result
|
|
|
|
|
|
def create_custom_command(command_data: dict) -> int:
    """Insert a new custom command row and return its primary key.

    ``command_data`` is modified in place: a ``tags`` list is serialised to a
    JSON string, and ``created_at`` is filled with the current time when it
    is absent or ``None``.

    Args:
        command_data: Field values passed to ``CustomCommand.create``.

    Returns:
        The ID of the newly created command.
    """
    # Convert tags list to JSON if present
    if isinstance(command_data.get('tags'), list):
        command_data['tags'] = json.dumps(command_data['tags'])

    # Default the creation time. The endpoint builds this dict with
    # model_dump(), which always includes created_at (as None when the
    # client omitted it), so a plain "key not in dict" test never fires and
    # None would be written to the database.
    if command_data.get('created_at') is None:
        command_data['created_at'] = datetime.now()

    command = CustomCommand.create(**command_data)
    return command.id
|
|
|
|
|
|
def update_custom_command(command_id: int, update_data: dict):
    """Apply ``update_data`` to the custom command with the given ID.

    A ``tags`` list in ``update_data`` is replaced, in place, by its JSON
    string form before the UPDATE statement is executed.
    """
    tag_value = update_data.get('tags')
    if isinstance(tag_value, list):
        # Tags are stored as a JSON-encoded string.
        update_data['tags'] = json.dumps(tag_value)

    CustomCommand.update(**update_data).where(CustomCommand.id == command_id).execute()
|
|
|
|
|
|
def delete_custom_command(command_id: int):
    """Remove the custom command row whose primary key is ``command_id``."""
    CustomCommand.delete().where(CustomCommand.id == command_id).execute()
|
|
|
|
|
|
def get_creator_by_discord_id(discord_id: int):
    """Return the creator with this Discord ID as a dict, or ``None`` if absent.

    The ID is compared in its string form.
    """
    match = CustomCommandCreator.get_or_none(
        CustomCommandCreator.discord_id == str(discord_id)
    )
    return model_to_dict(match) if match else None
|
|
|
|
|
|
def create_creator(creator_data: dict) -> int:
    """Insert a new creator row and return its primary key.

    ``creator_data`` gains a ``created_at`` entry (current time) when the key
    is not already present.
    """
    # Only fills the key when it is missing; an existing value is kept.
    creator_data.setdefault('created_at', datetime.now())
    return CustomCommandCreator.create(**creator_data).id
|
|
|
|
|
|
# API Endpoints
|
|
|
|
@router.get('')
@handle_db_errors
async def get_custom_commands(
    name: Optional[str] = None,
    creator_discord_id: Optional[int] = None,
    min_uses: Optional[int] = None,
    max_days_unused: Optional[int] = None,
    is_active: Optional[bool] = True,
    sort: Optional[str] = 'name',
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """Get custom commands with filtering and pagination.

    Args:
        name: Case-insensitive substring to match against the command name.
        creator_discord_id: Only commands whose creator has this Discord ID.
        min_uses: Only commands with at least this many uses.
        max_days_unused: Only commands last used within this many days.
        is_active: Filter on the active flag (defaults to active only).
        sort: One of ``name``, ``created_at``, ``last_used``, ``use_count``
            or ``creator``; prefix with ``-`` for descending order. Unknown
            values fall back to ``name``.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A ``CustomCommandListResponse``. Rows that fail model validation are
        logged and skipped rather than failing the whole request.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Build WHERE clause; values are always bound as parameters.
        where_conditions = []
        params = []

        if name is not None:
            where_conditions.append("cc.name ILIKE %s")
            params.append(f"%{name}%")

        if creator_discord_id is not None:
            where_conditions.append("creator.discord_id = %s")
            params.append(creator_discord_id)

        if min_uses is not None:
            where_conditions.append("cc.use_count >= %s")
            params.append(min_uses)

        if max_days_unused is not None:
            cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
            where_conditions.append("cc.last_used >= %s")
            params.append(cutoff_date)

        if is_active is not None:
            where_conditions.append("cc.is_active = %s")
            params.append(is_active)

        where_clause = ("WHERE " + " AND ".join(where_conditions)) if where_conditions else ""

        # Build ORDER BY clause. Only whitelisted column names are ever
        # interpolated into the SQL text.
        sort_mapping = {
            'name': 'cc.name',
            'created_at': 'cc.created_at',
            'last_used': 'cc.last_used',
            'use_count': 'cc.use_count',
            'creator': 'creator.username'
        }

        # The parameter is declared Optional, so guard against None before
        # calling string methods on it.
        sort_key = sort or 'name'
        if sort_key.startswith('-'):
            order_direction = 'DESC'
            sort_field = sort_key[1:]
        else:
            order_direction = 'ASC'
            sort_field = sort_key

        order_by = sort_mapping.get(sort_field, 'cc.name')
        order_clause = f"ORDER BY {order_by} {order_direction}"

        # Get total count
        count_sql = f"""
            SELECT COUNT(*)
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
        """
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Calculate pagination
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Get commands
        sql = f"""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
            {order_clause}
            LIMIT %s OFFSET %s
        """
        cursor = db.execute_sql(sql, params + [page_size, offset])
        rows = cursor.fetchall()

        # Convert to CustomCommandModel objects with creator info
        commands = []
        if rows:
            columns = [desc[0] for desc in cursor.description]
            # One creator lookup per distinct creator on the page instead of
            # one per row.
            creator_cache = {}
            for row in rows:
                command_dict = dict(zip(columns, row))

                # Parse tags if they exist
                if command_dict.get('tags'):
                    try:
                        command_dict['tags'] = json.loads(command_dict['tags'])
                    except (TypeError, ValueError):
                        # Malformed or non-string tag data degrades to "no tags".
                        command_dict['tags'] = []

                # Attach the full creator record (None when it cannot be loaded).
                command_dict['creator'] = _creator_model_for_listing(
                    command_dict['creator_id'], creator_cache
                )

                # Remove the individual creator fields now that we have the creator object
                command_dict.pop('creator_discord_id', None)
                command_dict.pop('creator_username', None)
                command_dict.pop('creator_display_name', None)

                # Convert datetime fields to ISO strings
                convert_datetime_to_iso(command_dict)

                # Create CustomCommandModel instance
                try:
                    commands.append(CustomCommandModel(**command_dict))
                except Exception as e:
                    # Skip invalid commands rather than failing the entire request
                    logger.error(f"Error creating CustomCommandModel: {e}, data: {command_dict}")

        return CustomCommandListResponse(
            custom_commands=commands,
            total_count=total_count,
            page=page,
            page_size=page_size,
            total_pages=total_pages,
            has_more=page < total_pages
        )

    except Exception as e:
        logger.error(f"Error getting custom commands: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


def _creator_model_for_listing(creator_id, cache):
    """Load a creator as a CustomCommandCreatorModel, memoised in ``cache``.

    Returns ``None`` when the creator row does not exist or fails model
    validation (the failure is logged).
    """
    if creator_id in cache:
        return cache[creator_id]

    creator_cursor = db.execute_sql(
        "SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)
    )
    creator_row = creator_cursor.fetchone()

    creator_model = None
    if creator_row:
        creator_columns = [desc[0] for desc in creator_cursor.description]
        creator_dict = dict(zip(creator_columns, creator_row))
        # Convert datetime to ISO string
        if creator_dict.get('created_at') and hasattr(creator_dict['created_at'], 'isoformat'):
            creator_dict['created_at'] = creator_dict['created_at'].isoformat()
        try:
            creator_model = CustomCommandCreatorModel(**creator_dict)
        except Exception as e:
            logger.error(f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}")

    cache[creator_id] = creator_model
    return creator_model
|
|
|
|
|
|
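# NOTE: the GET '/{command_id}' route is declared at the end of this module, after the fixed-path GET routes ('/creators', '/stats', '/by_name/...', '/autocomplete'), so that those paths are not captured by the '{command_id}' path parameter.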
|
|
|
|
|
|
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_custom_command_endpoint(
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Create a new custom command.

    Args:
        command: Command to create; ``id`` and ``creator`` are ignored.
        token: Bearer token, checked with ``valid_token``.

    Returns:
        The stored command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 409 when a command with the same
            name (case-insensitive) exists, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'create_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command name already exists
        existing = get_custom_command_by_name(command.name)
        if existing:
            raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")

        # Create the command
        command_data = command.model_dump(exclude={'id', 'creator'})
        command_id = create_custom_command(command_data)

        # Update creator stats
        update_creator_stats(command.creator_id)

        # Return the created command
        result = get_custom_command_by_id(command_id)
        if result is None:
            # The lookup joins on the creator, so it can come back empty.
            # Report that explicitly instead of failing on dict(None).
            raise HTTPException(status_code=500, detail="Failed to retrieve created custom command")
        command_dict = dict(result)

        # Convert datetime fields
        convert_datetime_to_iso(command_dict)

        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                command_dict['tags'] = []

        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flat creator_* keys into a nested creator object.
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating custom command: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def update_custom_command_endpoint(
    command_id: int,
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Update an existing custom command from a full command body.

    Args:
        command_id: ID of the command to update.
        command: New field values; ``id`` and ``creator`` are ignored.
        token: Bearer token, checked with ``valid_token``.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'update_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
        previous_creator_id = existing['creator_db_id']

        # Update the command
        update_data = command.model_dump(exclude={'id', 'creator'})
        # The request model defaults created_at to None; writing that back
        # would erase the stored creation time, so only overwrite it when the
        # client actually supplied a value.
        if update_data.get('created_at') is None:
            update_data.pop('created_at', None)
        update_data['updated_at'] = datetime.now().isoformat()
        update_custom_command(command_id, update_data)

        # is_active and creator_id may both have changed, so refresh the
        # cached counters of the old and the new creator (as create/delete do).
        for affected_creator_id in {previous_creator_id, command.creator_id}:
            update_creator_stats(affected_creator_id)

        # Return updated command
        result = get_custom_command_by_id(command_id)
        if result is None:
            # The lookup joins on the creator, so it can come back empty.
            raise HTTPException(status_code=500, detail="Failed to retrieve updated custom command")
        command_dict = dict(result)

        # Convert datetime fields
        convert_datetime_to_iso(command_dict)

        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                command_dict['tags'] = []

        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flat creator_* keys into a nested creator object.
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.patch('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def patch_custom_command(
    command_id: int,
    token: str = Depends(oauth2_scheme),
    content: Optional[str] = None,
    # NOTE(review): FastAPI normally reads a bare List parameter from the
    # request body rather than the query string; confirm this is intended.
    tags: Optional[List[str]] = None,
    use_count: Optional[int] = None,
    last_used: Optional[str] = None,
    warning_sent: Optional[bool] = None,
    is_active: Optional[bool] = None
):
    """Partially update a custom command.

    Only parameters that are not ``None`` are written. Changing ``content``
    also stamps ``updated_at`` with the current time.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 400 when no field was supplied, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'patch_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Build update data
        update_data = {}
        if content is not None:
            update_data['content'] = content
            update_data['updated_at'] = datetime.now().isoformat()
        if tags is not None:
            update_data['tags'] = tags
        if use_count is not None:
            update_data['use_count'] = use_count
        if last_used is not None:
            update_data['last_used'] = last_used
        if warning_sent is not None:
            update_data['warning_sent'] = warning_sent
        if is_active is not None:
            update_data['is_active'] = is_active

        if not update_data:
            raise HTTPException(status_code=400, detail="No fields to update")

        # Update the command
        update_custom_command(command_id, update_data)

        # Toggling is_active changes the creator's active_commands count, so
        # refresh the cached counters the same way create/delete do.
        if is_active is not None:
            update_creator_stats(existing['creator_db_id'])

        # Return updated command
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)

        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                command_dict['tags'] = []

        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flat creator_* keys into a nested creator object.
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error patching custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_custom_command_endpoint(
    command_id: int,
    token: str = Depends(oauth2_scheme)
):
    """Delete a custom command and refresh its creator's command counters.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 for a bad token, 404 when the command does not
            exist, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'delete_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # get_custom_command_by_id exposes the creator's primary key as
        # 'creator_db_id'; it does not set 'creator_id' itself, so reading
        # that key here could raise KeyError.
        creator_id = existing['creator_db_id']

        # Delete the command
        delete_custom_command(command_id)

        # Update creator stats
        update_creator_stats(creator_id)

        return {"message": f"Custom command {command_id} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
# Creator endpoints
|
|
@router.get('/creators')
@handle_db_errors
async def get_creators(
    discord_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """List command creators, ordered by username, one page at a time.

    Args:
        discord_id: When given, only the creator with this Discord ID.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A dict with ``creators`` (list of row dicts) and the pagination
        fields ``total_count``, ``page``, ``page_size``, ``total_pages`` and
        ``has_more``.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Assemble the optional filter; values are bound as parameters.
        filters = []
        filter_args = []
        if discord_id is not None:
            filters.append("discord_id = %s")
            filter_args.append(discord_id)
        where_clause = ("WHERE " + " AND ".join(filters)) if filters else ""

        # Total number of matching creators, across all pages.
        count_sql = f"SELECT COUNT(*) FROM custom_command_creators {where_clause}"
        total_count = db.execute_sql(count_sql, filter_args).fetchone()[0]

        # Pagination arithmetic (ceiling division for the page count).
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        page_sql = f"""
            SELECT * FROM custom_command_creators
            {where_clause}
            ORDER BY username
            LIMIT %s OFFSET %s
        """
        cursor = db.execute_sql(page_sql, filter_args + [page_size, offset])
        rows = cursor.fetchall()

        # Turn each row into a dict keyed by column name, with created_at
        # rendered as an ISO string.
        creators = []
        if rows:
            column_names = [desc[0] for desc in cursor.description]
            creators = [
                convert_datetime_to_iso(dict(zip(column_names, row)), fields=['created_at'])
                for row in rows
            ]

        return {
            'creators': creators,
            'total_count': total_count,
            'page': page,
            'page_size': page_size,
            'total_pages': total_pages,
            'has_more': page < total_pages
        }

    except Exception as e:
        logger.error(f"Error getting creators: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_creator_endpoint(
    creator: CustomCommandCreatorModel,
    token: str = Depends(oauth2_scheme)
):
    """Register a new command creator and return the stored row.

    Args:
        creator: Creator to insert; its ``id`` is ignored.
        token: Bearer token, checked with ``valid_token``.

    Returns:
        The inserted row as a dict keyed by column name.

    Raises:
        HTTPException: 401 for a bad token, 409 when a creator with the same
            Discord ID exists, 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'create_creator - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Reject duplicates by Discord ID.
        if get_creator_by_discord_id(creator.discord_id):
            raise HTTPException(status_code=409, detail=f"Creator with Discord ID {creator.discord_id} already exists")

        # Insert the row.
        new_creator_id = create_creator(creator.model_dump(exclude={'id'}))

        # Read the row back so the response reflects what was stored.
        cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (new_creator_id,))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(status_code=500, detail="Failed to retrieve created creator")

        column_names = [desc[0] for desc in cursor.description]
        return dict(zip(column_names, row))

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating creator: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get('/stats')
@handle_db_errors
async def get_custom_command_stats():
    """Get comprehensive statistics about custom commands.

    Returns:
        A ``CustomCommandStatsResponse`` with overall counts, the most used
        active command, the creator with the most active commands, the number
        of active commands created in the last 7 days, and the number of
        active commands last used more than 60 days ago without a warning
        sent / more than 90 days ago.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Get basic counts
        total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0]
        active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = TRUE").fetchone()[0]
        total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0]

        # Get total uses; a NULL/zero sum is reported as 0.
        total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = TRUE").fetchone()
        total_uses = total_uses_result[0] or 0

        # Get most popular command
        cursor = db.execute_sql("""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            WHERE cc.is_active = TRUE
            ORDER BY cc.use_count DESC
            LIMIT 1
        """)

        most_popular_result = cursor.fetchone()
        most_popular_command = None
        if most_popular_result:
            columns = [desc[0] for desc in cursor.description]
            command_dict = dict(zip(columns, most_popular_result))
            # Convert datetime fields to ISO strings
            convert_datetime_to_iso(command_dict)
            if command_dict.get('tags'):
                try:
                    command_dict['tags'] = json.loads(command_dict['tags'])
                except (TypeError, ValueError):
                    # Malformed or non-string tag data degrades to "no tags".
                    command_dict['tags'] = []
            # Fold the flat creator_* columns into a nested creator object.
            command_dict['creator'] = {
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name')
            }
            most_popular_command = command_dict

        # Get most active creator
        cursor2 = db.execute_sql("""
            SELECT * FROM custom_command_creators
            ORDER BY active_commands DESC
            LIMIT 1
        """)

        most_active_creator_result = cursor2.fetchone()
        most_active_creator = None
        if most_active_creator_result:
            columns2 = [desc[0] for desc in cursor2.description]
            most_active_creator = dict(zip(columns2, most_active_creator_result))
            # Convert datetime to ISO string
            convert_datetime_to_iso(most_active_creator, fields=['created_at'])

        # Get recent commands count (last 7 days)
        now = datetime.now()
        week_ago = (now - timedelta(days=7)).isoformat()
        recent_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE created_at >= %s AND is_active = TRUE
        """, (week_ago,)).fetchone()[0]

        # Get cleanup stats
        sixty_days_ago = (now - timedelta(days=60)).isoformat()
        ninety_days_ago = (now - timedelta(days=90)).isoformat()

        warning_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND warning_sent = FALSE AND is_active = TRUE
        """, (sixty_days_ago,)).fetchone()[0]

        deletion_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND is_active = TRUE
        """, (ninety_days_ago,)).fetchone()[0]

        return CustomCommandStatsResponse(
            total_commands=total_commands,
            active_commands=active_commands,
            total_creators=total_creators,
            total_uses=total_uses,
            most_popular_command=most_popular_command,
            most_active_creator=most_active_creator,
            recent_commands_count=recent_count,
            commands_needing_warning=warning_count,
            commands_eligible_for_deletion=deletion_count
        )

    except Exception as e:
        logger.error(f"Error getting custom command stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
# Special endpoints for Discord bot integration
|
|
@router.get('/by_name/{command_name}')
@handle_db_errors
async def get_custom_command_by_name_endpoint(command_name: str):
    """Get a custom command by name (for Discord bot execution).

    Returns:
        The command as a dict with a nested ``creator`` object: the full
        creator row when it can be loaded, otherwise a fallback built from
        the joined creator fields.

    Raises:
        HTTPException: 404 when no command has this name (case-insensitive),
            500 on any other failure.
    """
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")

        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)

        # Parse tags
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                command_dict['tags'] = []

        # Add creator info - get full creator record
        creator_id = command_dict['creator_id']
        creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,))
        creator_result = creator_cursor.fetchone()
        if creator_result:
            creator_columns = [desc[0] for desc in creator_cursor.description]
            creator_dict = dict(zip(creator_columns, creator_result))
            # Convert creator datetime to ISO string
            convert_datetime_to_iso(creator_dict, fields=['created_at'])
            command_dict['creator'] = creator_dict
        else:
            # Fallback to basic info if full creator not found
            command_dict['creator'] = {
                'id': creator_id,
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name'),
                'created_at': command_dict['created_at'],  # Use command creation as fallback
                'total_commands': 0,
                'active_commands': 0
            }

        # Remove the duplicate fields (no-ops when the fallback already popped them)
        command_dict.pop('creator_discord_id', None)
        command_dict.pop('creator_username', None)
        command_dict.pop('creator_display_name', None)

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command by name '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.patch('/by_name/{command_name}/execute', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def execute_custom_command(
    command_name: str,
    token: str = Depends(oauth2_scheme)
):
    """Record one execution of a custom command and return the updated command.

    Sets ``last_used`` to now, increments ``use_count`` by one and resets
    ``warning_sent`` to False.

    Returns:
        The updated command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 401 for a bad token, 404 when no command has this
            name (case-insensitive), 500 on any other failure.
    """
    if not valid_token(token):
        logger.warning(f'execute_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")

        command_id = result['id']

        # Update usage statistics. The increment is expressed as a SQL
        # expression so the database applies it atomically; computing
        # "value read earlier + 1" in Python loses counts when two
        # executions overlap. COALESCE treats a NULL counter as 0.
        update_data = {
            'last_used': datetime.now().isoformat(),
            'use_count': fn.COALESCE(CustomCommand.use_count, 0) + 1,
            'warning_sent': False  # Reset warning on use
        }
        update_custom_command(command_id, update_data)

        # Return updated command - get_custom_command_by_id already has all creator info
        updated_result = get_custom_command_by_id(command_id)
        updated_dict = dict(updated_result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(updated_dict)

        if updated_dict.get('tags'):
            try:
                updated_dict['tags'] = json.loads(updated_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                updated_dict['tags'] = []

        # Build creator object from the fields returned by get_custom_command_by_id
        creator_created_at = updated_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        updated_dict['creator'] = {
            'id': updated_dict.pop('creator_db_id'),
            'discord_id': updated_dict.pop('creator_discord_id'),
            'username': updated_dict.pop('creator_username'),
            'display_name': updated_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': updated_dict.pop('total_commands'),
            'active_commands': updated_dict.pop('active_commands')
        }

        return updated_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error executing custom command '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get('/autocomplete')
@handle_db_errors
async def get_command_names_for_autocomplete(
    partial_name: str = "",
    limit: int = Query(25, ge=1, le=100)
):
    """Return names of active commands, alphabetically, for autocomplete.

    Args:
        partial_name: Case-insensitive substring to match; when empty, no
            name filter is applied.
        limit: Maximum number of names to return (1-100).

    Returns:
        A list of command name strings.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        if not partial_name:
            # No search text: just the first `limit` active names.
            cursor = db.execute_sql("""
                SELECT name FROM custom_commands
                WHERE is_active = TRUE
                ORDER BY name
                LIMIT %s
            """, (limit,))
        else:
            # Substring match, bound as a parameter.
            pattern = f"%{partial_name}%"
            cursor = db.execute_sql("""
                SELECT name FROM custom_commands
                WHERE is_active = TRUE AND name ILIKE %s
                ORDER BY name
                LIMIT %s
            """, (pattern, limit))

        names = []
        for record in cursor.fetchall():
            names.append(record[0])
        return names

    except Exception as e:
        logger.error(f"Error getting command names for autocomplete: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get('/{command_id}')
@handle_db_errors
async def get_custom_command(command_id: int):
    """Get a single custom command by ID.

    Returns:
        The command as a dict with a nested ``creator`` object.

    Raises:
        HTTPException: 404 when the command does not exist, 500 on any
            other failure.
    """
    try:
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        command_dict = dict(result)

        # Convert datetime fields to ISO strings
        convert_datetime_to_iso(command_dict)

        # Parse tags
        if command_dict.get('tags'):
            try:
                command_dict['tags'] = json.loads(command_dict['tags'])
            except (TypeError, ValueError):
                # Malformed or non-string tag data degrades to "no tags".
                command_dict['tags'] = []

        creator_created_at = command_dict.pop('creator_created_at')
        if hasattr(creator_created_at, 'isoformat'):
            creator_created_at = creator_created_at.isoformat()

        # Add creator info: fold the flat creator_* keys into a nested object.
        command_dict['creator'] = {
            'id': command_dict.pop('creator_db_id'),
            'discord_id': command_dict.pop('creator_discord_id'),
            'username': command_dict.pop('creator_username'),
            'display_name': command_dict.pop('creator_display_name'),
            'created_at': creator_created_at,
            'total_commands': command_dict.pop('total_commands'),
            'active_commands': command_dict.pop('active_commands')
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()