"""Custom command API: CRUD, statistics and Discord-bot helper endpoints.

All persistence goes through ``db.execute_sql`` using ``?`` placeholders and
``last_insert_rowid()``.
NOTE(review): that combination looks SQLite-specific -- confirm against
``db_engine`` before pointing this module at another backend.
"""
from datetime import datetime, timedelta
import json
import logging
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from pydantic import BaseModel, Field

from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA
from ..db_engine import db

logger = logging.getLogger('database_api')

router = APIRouter(
    prefix='/api/v3/custom_commands',
    tags=['custom_commands']
)


# Pydantic Models for API
class CustomCommandCreatorModel(BaseModel):
    """A user who owns custom commands, with cached command counters."""

    id: int
    discord_id: int
    username: str
    display_name: Optional[str] = None
    created_at: str
    total_commands: int = 0
    active_commands: int = 0


class CustomCommandModel(BaseModel):
    """A custom command, optionally carrying its nested creator record."""

    id: Optional[int] = None
    name: str = Field(..., min_length=2, max_length=32)
    content: str = Field(..., min_length=1, max_length=2000)
    creator_id: int
    creator: Optional[CustomCommandCreatorModel] = None
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    last_used: Optional[str] = None
    use_count: int = 0
    warning_sent: bool = False
    is_active: bool = True
    tags: Optional[List[str]] = None


class CustomCommandListResponse(BaseModel):
    """One page of custom commands plus pagination metadata."""

    custom_commands: List[CustomCommandModel]
    total_count: int
    page: int
    page_size: int
    total_pages: int
    has_more: bool


class CustomCommandStatsResponse(BaseModel):
    """Aggregate statistics returned by the ``/stats`` endpoint."""

    total_commands: int
    active_commands: int
    total_creators: int
    total_uses: int
    most_popular_command: Optional[Dict[str, Any]] = None
    most_active_creator: Optional[Dict[str, Any]] = None
    recent_commands_count: int = 0
    commands_needing_warning: int = 0
    commands_eligible_for_deletion: int = 0


# Shared SQL / constants

# Command row joined with every creator column, aliased so the creator
# fields cannot collide with the command's own ``id`` / ``created_at``.
_COMMAND_WITH_CREATOR_SELECT = """
    SELECT cc.*,
           creator.id as creator_db_id,
           creator.discord_id as creator_discord_id,
           creator.username as creator_username,
           creator.display_name as creator_display_name,
           creator.created_at as creator_created_at,
           creator.total_commands,
           creator.active_commands
    FROM custom_commands cc
    LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
"""

# Columns that update_custom_command() is allowed to write. Column names are
# interpolated into the UPDATE statement, so they must never come from
# unchecked input.
_UPDATABLE_COMMAND_COLUMNS = frozenset({
    'name',
    'content',
    'creator_id',
    'created_at',
    'updated_at',
    'last_used',
    'use_count',
    'warning_sent',
    'is_active',
    'tags',
})

# Flattened creator columns (as aliased by _COMMAND_WITH_CREATOR_SELECT)
# mapped to the key they get inside the nested ``creator`` object.
_CREATOR_FIELD_MAP = (
    ('id', 'creator_db_id'),
    ('discord_id', 'creator_discord_id'),
    ('username', 'creator_username'),
    ('display_name', 'creator_display_name'),
    ('created_at', 'creator_created_at'),
    ('total_commands', 'total_commands'),
    ('active_commands', 'active_commands'),
)


# Helper functions
def _row_to_dict(cursor, row) -> Dict[str, Any]:
    """Zip a result row with the column names from ``cursor.description``."""
    columns = [desc[0] for desc in cursor.description]
    return dict(zip(columns, row))


def _parse_tags(command_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Decode the JSON-encoded ``tags`` value of *command_dict* in place.

    A falsy value (missing, ``None``, empty string) is left untouched; a
    value that cannot be decoded is replaced by an empty list.
    """
    raw_tags = command_dict.get('tags')
    if raw_tags:
        try:
            command_dict['tags'] = json.loads(raw_tags)
        except (TypeError, ValueError):
            # json.JSONDecodeError subclasses ValueError; TypeError covers a
            # value that is not str/bytes.
            command_dict['tags'] = []
    return command_dict


def _format_command(row: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a joined command row into the API shape with a nested creator.

    Expects the flattened creator columns produced by
    ``_COMMAND_WITH_CREATOR_SELECT``; a missing column raises ``KeyError``.
    """
    command_dict = _parse_tags(dict(row))
    command_dict['creator'] = {
        target: command_dict.pop(source)
        for target, source in _CREATOR_FIELD_MAP
    }
    return command_dict


def _fetch_creator_by_id(creator_id) -> Optional[Dict[str, Any]]:
    """Return the full creator row for *creator_id* as a dict, or ``None``."""
    cursor = db.execute_sql(
        "SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,)
    )
    row = cursor.fetchone()
    if row:
        return _row_to_dict(cursor, row)
    return None


def _attach_full_creator(command_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Attach the creator record to a joined command row, in place.

    Uses the complete creator row when it exists; otherwise builds a
    placeholder from the joined columns. Only the three
    discord_id/username/display_name join columns are removed from the top
    level -- the remaining joined creator columns are deliberately left in
    place so the response shape stays what existing clients receive.
    """
    creator_id = command_dict['creator_id']
    joined_discord_id = command_dict.pop('creator_discord_id', None)
    joined_username = command_dict.pop('creator_username', None)
    joined_display_name = command_dict.pop('creator_display_name', None)

    creator = _fetch_creator_by_id(creator_id)
    if creator is None:
        # Fallback to basic info if full creator not found
        creator = {
            'id': creator_id,
            'discord_id': joined_discord_id,
            'username': joined_username,
            'display_name': joined_display_name,
            'created_at': command_dict['created_at'],  # Use command creation as fallback
            'total_commands': 0,
            'active_commands': 0
        }
    command_dict['creator'] = creator
    return command_dict


def get_custom_commands_table():
    """Return every custom command joined with its creator as a list of dicts."""
    cursor = db.execute_sql(_COMMAND_WITH_CREATOR_SELECT + " WHERE 1=1 ")
    rows = cursor.fetchall()
    if rows:
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in rows]
    return []


def get_custom_command_by_id(command_id: int):
    """Return the joined command row with this ID as a dict, or ``None``."""
    cursor = db.execute_sql(
        _COMMAND_WITH_CREATOR_SELECT + " WHERE cc.id = ? ", (command_id,)
    )
    row = cursor.fetchone()
    if row:
        return _row_to_dict(cursor, row)
    return None


def get_custom_command_by_name(name: str):
    """Return the joined command row matching *name* case-insensitively, or ``None``."""
    cursor = db.execute_sql(
        _COMMAND_WITH_CREATOR_SELECT + " WHERE LOWER(cc.name) = LOWER(?) ",
        (name,)
    )
    row = cursor.fetchone()
    if row:
        return _row_to_dict(cursor, row)
    return None


def create_custom_command(command_data: Dict[str, Any]) -> int:
    """Insert a new custom command and return its row ID.

    Reads ``name``, ``content``, ``creator_id`` and optional ``tags`` from
    *command_data*. ``created_at`` and ``last_used`` are both set to the
    current time; counters and flags start at their defaults.
    """
    now = datetime.now().isoformat()
    # ``or []`` rather than a .get() default: a model dump always contains
    # the key, with None when no tags were supplied, and that would
    # otherwise be stored as the JSON string 'null'.
    tags_json = json.dumps(command_data.get('tags') or [])

    db.execute_sql("""
        INSERT INTO custom_commands
        (name, content, creator_id, created_at, last_used, use_count, warning_sent, is_active, tags)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        command_data['name'],
        command_data['content'],
        command_data['creator_id'],
        now,
        now,
        0,
        False,
        True,
        tags_json
    ))

    return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0]


def update_custom_command(command_id: int, update_data: Dict[str, Any]) -> bool:
    """Update the given columns of an existing custom command.

    Args:
        command_id: ID of the row to update.
        update_data: Mapping of column name to new value. ``tags`` is
            JSON-encoded before being stored.

    Returns:
        ``False`` when *update_data* is empty (nothing executed), else ``True``.

    Raises:
        ValueError: If *update_data* names a column outside the allow-list.
    """
    unknown = set(update_data) - _UPDATABLE_COMMAND_COLUMNS
    if unknown:
        raise ValueError(
            f"Cannot update unknown custom command column(s): {sorted(unknown)}"
        )

    if not update_data:
        return False

    set_clauses = [f"{field} = ?" for field in update_data]
    params = [
        json.dumps(value) if field == 'tags' else value
        for field, value in update_data.items()
    ]
    params.append(command_id)

    sql = f"UPDATE custom_commands SET {', '.join(set_clauses)} WHERE id = ?"
    db.execute_sql(sql, params)
    return True


def delete_custom_command(command_id: int) -> bool:
    """Delete the custom command with this ID; always returns ``True``."""
    db.execute_sql("DELETE FROM custom_commands WHERE id = ?", (command_id,))
    return True


def get_creator_by_discord_id(discord_id: int):
    """Return the raw creator row for this Discord ID, or ``None``."""
    return db.execute_sql("""
        SELECT * FROM custom_command_creators WHERE discord_id = ?
    """, (discord_id,)).fetchone()


def create_creator(creator_data: Dict[str, Any]) -> int:
    """Insert a new command creator with zeroed counters and return its row ID."""
    now = datetime.now().isoformat()

    db.execute_sql("""
        INSERT INTO custom_command_creators
        (discord_id, username, display_name, created_at, total_commands, active_commands)
        VALUES (?, ?, ?, ?, ?, ?)
    """, (
        creator_data['discord_id'],
        creator_data['username'],
        creator_data.get('display_name'),
        now,
        0,
        0
    ))

    return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0]


def update_creator_stats(creator_id: int):
    """Recount a creator's total and active commands and store both counters."""
    total = db.execute_sql("""
        SELECT COUNT(*) FROM custom_commands WHERE creator_id = ?
    """, (creator_id,)).fetchone()[0]

    active = db.execute_sql("""
        SELECT COUNT(*) FROM custom_commands WHERE creator_id = ? AND is_active = 1
    """, (creator_id,)).fetchone()[0]

    db.execute_sql("""
        UPDATE custom_command_creators
        SET total_commands = ?, active_commands = ?
        WHERE id = ?
    """, (total, active, creator_id))


# API Endpoints
@router.get('')
async def get_custom_commands(
    name: Optional[str] = None,
    creator_discord_id: Optional[int] = None,
    min_uses: Optional[int] = None,
    max_days_unused: Optional[int] = None,
    is_active: Optional[bool] = True,
    sort: Optional[str] = 'name',
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """Get custom commands with filtering and pagination.

    Filters are ANDed together. ``name`` is a case-insensitive substring
    match; ``max_days_unused`` keeps commands whose ``last_used`` is no older
    than that many days. ``sort`` is one of ``name``, ``created_at``,
    ``last_used``, ``use_count`` or ``creator``; a leading ``-`` sorts
    descending and an unknown field falls back to ``name``.

    Rows that fail model validation are logged and skipped, so a page may
    hold fewer items than ``page_size``.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Build WHERE clause
        where_conditions = []
        params = []

        if name is not None:
            # LOWER() on both sides behaves the same on SQLite and
            # PostgreSQL, so no dialect switch is needed.
            where_conditions.append("LOWER(cc.name) LIKE LOWER(?)")
            params.append(f"%{name}%")

        if creator_discord_id is not None:
            where_conditions.append("creator.discord_id = ?")
            params.append(creator_discord_id)

        if min_uses is not None:
            where_conditions.append("cc.use_count >= ?")
            params.append(min_uses)

        if max_days_unused is not None:
            cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
            where_conditions.append("cc.last_used >= ?")
            params.append(cutoff_date)

        if is_active is not None:
            where_conditions.append("cc.is_active = ?")
            params.append(1 if is_active else 0)

        where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""

        # Build ORDER BY clause. Only values from this mapping reach the SQL
        # text, so the user-supplied sort key is never interpolated.
        sort_mapping = {
            'name': 'cc.name',
            'created_at': 'cc.created_at',
            'last_used': 'cc.last_used',
            'use_count': 'cc.use_count',
            'creator': 'creator.username'
        }

        sort_key = sort or 'name'  # the parameter is Optional; guard None
        if sort_key.startswith('-'):
            order_direction = 'DESC'
            sort_field = sort_key[1:]
        else:
            order_direction = 'ASC'
            sort_field = sort_key

        order_by = sort_mapping.get(sort_field, 'cc.name')
        order_clause = f"ORDER BY {order_by} {order_direction}"

        # Get total count
        count_sql = f"""
            SELECT COUNT(*)
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
        """
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Calculate pagination
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Get commands
        sql = f"""
            SELECT cc.*,
                   creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
            {order_clause}
            LIMIT ? OFFSET ?
        """
        params.extend([page_size, offset])
        cursor = db.execute_sql(sql, params)
        rows = cursor.fetchall()

        commands = []
        if rows:
            columns = [desc[0] for desc in cursor.description]
            # One lookup per distinct creator on the page instead of one per row.
            creator_cache: Dict[Any, Optional[CustomCommandCreatorModel]] = {}

            for row in rows:
                command_dict = _parse_tags(dict(zip(columns, row)))

                creator_id = command_dict['creator_id']
                if creator_id not in creator_cache:
                    creator_model = None
                    creator_dict = _fetch_creator_by_id(creator_id)
                    if creator_dict is not None:
                        try:
                            creator_model = CustomCommandCreatorModel(**creator_dict)
                        except Exception as e:
                            logger.error(f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}")
                    creator_cache[creator_id] = creator_model
                command_dict['creator'] = creator_cache[creator_id]

                # Remove the individual creator fields now that we have the creator object
                command_dict.pop('creator_discord_id', None)
                command_dict.pop('creator_username', None)
                command_dict.pop('creator_display_name', None)

                try:
                    commands.append(CustomCommandModel(**command_dict))
                except Exception as e:
                    # Skip invalid commands rather than failing the entire request
                    logger.error(f"Error creating CustomCommandModel: {e}, data: {command_dict}")

        return CustomCommandListResponse(
            custom_commands=commands,
            total_count=total_count,
            page=page,
            page_size=page_size,
            total_pages=total_pages,
            has_more=page < total_pages
        )

    except Exception as e:
        logger.error(f"Error getting custom commands: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.post('', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_custom_command_endpoint(
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Create a new custom command and return it with its nested creator.

    Raises:
        HTTPException: 401 for an invalid token, 409 when the name is already
            taken (case-insensitive), 500 on any unexpected error.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected token to the log; consider
        # redacting it.
        logger.warning(f'create_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command name already exists
        existing = get_custom_command_by_name(command.name)
        if existing:
            raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")

        # Create the command
        command_data = command.model_dump(exclude={'id'})
        command_id = create_custom_command(command_data)

        # Update creator stats
        update_creator_stats(command.creator_id)

        # Return the created command
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=500, detail="Failed to retrieve created custom command")
        return _format_command(result)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating custom command: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def update_custom_command_endpoint(
    command_id: int,
    command: CustomCommandModel,
    token: str = Depends(oauth2_scheme)
):
    """Update an existing custom command and return the stored result.

    Only fields present in the request body are written, so omitting
    ``created_at``, ``last_used`` or ``use_count`` keeps the stored values
    instead of overwriting them with model defaults. ``updated_at`` is
    always set to the current time.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 409 when the new name belongs to another command,
            500 on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'update_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Reject a rename that collides with a different command, as POST does.
        name_owner = get_custom_command_by_name(command.name)
        if name_owner and name_owner['id'] != command_id:
            raise HTTPException(status_code=409, detail=f"Command '{command.name}' already exists")

        # Update the command
        update_data = command.model_dump(exclude={'id', 'creator'}, exclude_unset=True)
        update_data['updated_at'] = datetime.now().isoformat()
        update_custom_command(command_id, update_data)

        # creator_id and is_active may both have changed, so refresh the
        # cached counters of every creator involved.
        update_creator_stats(command.creator_id)
        previous_creator_id = existing['creator_id']
        if previous_creator_id != command.creator_id:
            update_creator_stats(previous_creator_id)

        # Return updated command
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
        return _format_command(result)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


# NOTE(review): FastAPI treats a bare ``List[str]`` parameter as the request
# body, so ``tags`` below is read from the body while the other fields are
# query parameters. Left unchanged to keep the wire format; confirm intent.
@router.patch('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def patch_custom_command(
    command_id: int,
    token: str = Depends(oauth2_scheme),
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    use_count: Optional[int] = None,
    last_used: Optional[str] = None,
    warning_sent: Optional[bool] = None,
    is_active: Optional[bool] = None
):
    """Partially update a custom command and return the stored result.

    Only arguments that are not ``None`` are written. Changing ``content``
    also sets ``updated_at`` to the current time.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 400 when no field was supplied, 500 on any unexpected
            error.
    """
    if not valid_token(token):
        logger.warning(f'patch_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        # Build update data
        update_data = {}
        if content is not None:
            update_data['content'] = content
            update_data['updated_at'] = datetime.now().isoformat()
        if tags is not None:
            update_data['tags'] = tags
        if use_count is not None:
            update_data['use_count'] = use_count
        if last_used is not None:
            update_data['last_used'] = last_used
        if warning_sent is not None:
            update_data['warning_sent'] = warning_sent
        if is_active is not None:
            update_data['is_active'] = is_active

        if not update_data:
            raise HTTPException(status_code=400, detail="No fields to update")

        # Update the command
        update_custom_command(command_id, update_data)

        if is_active is not None:
            # Keep the creator's cached active_commands counter in step.
            update_creator_stats(existing['creator_id'])

        # Return updated command
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")
        return _format_command(result)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error patching custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA)
async def delete_custom_command_endpoint(
    command_id: int,
    token: str = Depends(oauth2_scheme)
):
    """Delete a custom command and refresh its creator's counters.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 500 on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'delete_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        creator_id = existing['creator_id']

        # Delete the command
        delete_custom_command(command_id)

        # Update creator stats
        update_creator_stats(creator_id)

        return {"message": f"Custom command {command_id} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


# Creator endpoints
@router.get('/creators')
async def get_creators(
    discord_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100)
):
    """Get custom command creators, ordered by username and paginated.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Build WHERE clause
        where_conditions = []
        params = []

        if discord_id is not None:
            where_conditions.append("discord_id = ?")
            params.append(discord_id)

        where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""

        # Get total count
        count_sql = f"SELECT COUNT(*) FROM custom_command_creators {where_clause}"
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Calculate pagination
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Get creators
        sql = f"""
            SELECT * FROM custom_command_creators
            {where_clause}
            ORDER BY username
            LIMIT ? OFFSET ?
        """
        params.extend([page_size, offset])
        cursor = db.execute_sql(sql, params)
        rows = cursor.fetchall()

        # Convert to dict format
        creators = []
        if rows:
            columns = [desc[0] for desc in cursor.description]
            creators = [dict(zip(columns, row)) for row in rows]

        return {
            'creators': creators,
            'total_count': total_count,
            'page': page,
            'page_size': page_size,
            'total_pages': total_pages,
            'has_more': page < total_pages
        }

    except Exception as e:
        logger.error(f"Error getting creators: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.post('/creators', include_in_schema=PRIVATE_IN_SCHEMA)
async def create_creator_endpoint(
    creator: CustomCommandCreatorModel,
    token: str = Depends(oauth2_scheme)
):
    """Create a new command creator and return the stored row.

    Raises:
        HTTPException: 401 for an invalid token, 409 when a creator with the
            same Discord ID exists, 500 on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'create_creator - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        # Check if creator already exists
        existing = get_creator_by_discord_id(creator.discord_id)
        if existing:
            raise HTTPException(status_code=409, detail=f"Creator with Discord ID {creator.discord_id} already exists")

        # Create the creator
        creator_data = creator.model_dump(exclude={'id'})
        creator_id = create_creator(creator_data)

        # Return the created creator
        created = _fetch_creator_by_id(creator_id)
        if created is None:
            raise HTTPException(status_code=500, detail="Failed to retrieve created creator")
        return created

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating creator: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.get('/stats')
async def get_custom_command_stats():
    """Get comprehensive statistics about custom commands.

    Besides the basic counts this reports the most-used active command, the
    creator with the most active commands, commands created in the last
    7 days, active commands unused for 60+ days that have not been warned,
    and active commands unused for 90+ days.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Get basic counts
        total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0]
        active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = 1").fetchone()[0]
        total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0]

        # Get total uses; SUM() yields NULL when there are no matching rows
        total_uses = db.execute_sql(
            "SELECT SUM(use_count) FROM custom_commands WHERE is_active = 1"
        ).fetchone()[0] or 0

        # Get most popular command
        popular_cursor = db.execute_sql("""
            SELECT cc.*,
                   creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            WHERE cc.is_active = 1
            ORDER BY cc.use_count DESC
            LIMIT 1
        """)
        popular_row = popular_cursor.fetchone()
        most_popular_command = None
        if popular_row:
            command_dict = _parse_tags(_row_to_dict(popular_cursor, popular_row))
            command_dict['creator'] = {
                'discord_id': command_dict.pop('creator_discord_id'),
                'username': command_dict.pop('creator_username'),
                'display_name': command_dict.pop('creator_display_name')
            }
            most_popular_command = command_dict

        # Get most active creator
        creator_cursor = db.execute_sql("""
            SELECT * FROM custom_command_creators
            ORDER BY active_commands DESC
            LIMIT 1
        """)
        creator_row = creator_cursor.fetchone()
        most_active_creator = None
        if creator_row:
            most_active_creator = _row_to_dict(creator_cursor, creator_row)

        # Get recent commands count (last 7 days)
        now = datetime.now()
        week_ago = (now - timedelta(days=7)).isoformat()
        recent_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE created_at >= ? AND is_active = 1
        """, (week_ago,)).fetchone()[0]

        # Get cleanup stats
        sixty_days_ago = (now - timedelta(days=60)).isoformat()
        ninety_days_ago = (now - timedelta(days=90)).isoformat()

        warning_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < ? AND warning_sent = 0 AND is_active = 1
        """, (sixty_days_ago,)).fetchone()[0]

        deletion_count = db.execute_sql("""
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < ? AND is_active = 1
        """, (ninety_days_ago,)).fetchone()[0]

        return CustomCommandStatsResponse(
            total_commands=total_commands,
            active_commands=active_commands,
            total_creators=total_creators,
            total_uses=total_uses,
            most_popular_command=most_popular_command,
            most_active_creator=most_active_creator,
            recent_commands_count=recent_count,
            commands_needing_warning=warning_count,
            commands_eligible_for_deletion=deletion_count
        )

    except Exception as e:
        logger.error(f"Error getting custom command stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


# Special endpoints for Discord bot integration
@router.get('/by_name/{command_name}')
async def get_custom_command_by_name_endpoint(command_name: str):
    """Get a custom command by name (for Discord bot execution).

    Raises:
        HTTPException: 404 when no command has that name, 500 on any
            unexpected error.
    """
    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")

        command_dict = _parse_tags(dict(result))
        return _attach_full_creator(command_dict)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command by name '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.patch('/by_name/{command_name}/execute', include_in_schema=PRIVATE_IN_SCHEMA)
async def execute_custom_command(
    command_name: str,
    token: str = Depends(oauth2_scheme)
):
    """Record one use of a custom command and return the updated command.

    Sets ``last_used`` to now, increments ``use_count`` and clears
    ``warning_sent``.

    Raises:
        HTTPException: 401 for an invalid token, 404 when no command has
            that name, 500 on any unexpected error.
    """
    if not valid_token(token):
        logger.warning(f'execute_custom_command - Bad Token: {token}')
        raise HTTPException(status_code=401, detail='Unauthorized')

    try:
        result = get_custom_command_by_name(command_name)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")

        command_id = result['id']

        # Increment inside the UPDATE itself so two executions arriving
        # together cannot both write the same "old value + 1".
        db.execute_sql("""
            UPDATE custom_commands
            SET last_used = ?, use_count = use_count + 1, warning_sent = ?
            WHERE id = ?
        """, (datetime.now().isoformat(), False, command_id))

        # Return updated command
        updated_result = get_custom_command_by_id(command_id)
        if not updated_result:
            raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found")

        updated_dict = _parse_tags(dict(updated_result))
        return _attach_full_creator(updated_dict)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error executing custom command '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


@router.get('/autocomplete')
async def get_command_names_for_autocomplete(
    partial_name: str = "",
    limit: int = Query(25, ge=1, le=100)
):
    """Get active command names for Discord autocomplete, sorted by name.

    An empty ``partial_name`` returns the first ``limit`` names; otherwise
    names containing it (case-insensitive) are returned.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        if partial_name:
            rows = db.execute_sql("""
                SELECT name FROM custom_commands
                WHERE is_active = 1 AND LOWER(name) LIKE LOWER(?)
                ORDER BY name
                LIMIT ?
            """, (f"%{partial_name}%", limit)).fetchall()
        else:
            rows = db.execute_sql("""
                SELECT name FROM custom_commands
                WHERE is_active = 1
                ORDER BY name
                LIMIT ?
            """, (limit,)).fetchall()

        return [row[0] for row in rows]

    except Exception as e:
        logger.error(f"Error getting command names for autocomplete: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()


# Declared last so the literal GET paths above (/creators, /stats,
# /autocomplete, /by_name/...) are matched before this catch-all segment.
@router.get('/{command_id}')
async def get_custom_command(command_id: int):
    """Get a single custom command by ID, with its nested creator.

    Raises:
        HTTPException: 404 when the command does not exist, 500 on any
            unexpected error.
    """
    try:
        result = get_custom_command_by_id(command_id)
        if not result:
            raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found")

        return _format_command(result)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()