from fastapi import APIRouter, Depends, HTTPException, Query from typing import List, Optional, Dict, Any import logging from datetime import datetime, timedelta from pydantic import BaseModel, Field from playhouse.shortcuts import model_to_dict from peewee import fn from ..dependencies import ( oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors, ) from ..db_engine import db, HelpCommand logger = logging.getLogger("database_api") router = APIRouter(prefix="/api/v3/help_commands", tags=["help_commands"]) # Pydantic Models for API class HelpCommandModel(BaseModel): id: Optional[int] = None name: str = Field(..., min_length=2, max_length=32) title: str = Field(..., min_length=1, max_length=200) content: str = Field(..., min_length=1, max_length=4000) category: Optional[str] = Field(None, max_length=50) created_by_discord_id: str # Text to safely store Discord snowflake IDs created_at: Optional[str] = None updated_at: Optional[str] = None last_modified_by: Optional[str] = None # Text to safely store Discord snowflake IDs is_active: bool = True view_count: int = 0 display_order: int = 0 class HelpCommandListResponse(BaseModel): help_commands: List[HelpCommandModel] total_count: int page: int page_size: int total_pages: int has_more: bool class HelpCommandStatsResponse(BaseModel): total_commands: int active_commands: int total_views: int most_viewed_command: Optional[Dict[str, Any]] = None recent_commands_count: int = 0 # API Endpoints @router.get("") @handle_db_errors async def get_help_commands( name: Optional[str] = None, category: Optional[str] = None, is_active: Optional[bool] = True, sort: Optional[str] = "name", page: int = Query(1, ge=1), page_size: int = Query(25, ge=1, le=100), ): """Get help commands with filtering and pagination""" try: # Build query query = HelpCommand.select() # Apply filters if name is not None: query = query.where(fn.Lower(HelpCommand.name).contains(name.lower())) if category is not None: query = query.where(fn.Lower(HelpCommand.category) == category.lower()) if is_active is not None: query = query.where(HelpCommand.is_active == is_active) # Get total count before pagination total_count = query.count() # Apply sorting sort_mapping = { "name": HelpCommand.name, "title": HelpCommand.title, "category": HelpCommand.category, "created_at": HelpCommand.created_at, "updated_at": HelpCommand.updated_at, "view_count": HelpCommand.view_count, "display_order": HelpCommand.display_order, } if sort.startswith("-"): sort_field = sort[1:] order_by = sort_mapping.get(sort_field, HelpCommand.name).desc() else: order_by = sort_mapping.get(sort, HelpCommand.name).asc() query = query.order_by(order_by) # Calculate pagination offset = (page - 1) * page_size total_pages = (total_count + page_size - 1) // page_size # Apply pagination query = query.limit(page_size).offset(offset) # Convert to dictionaries commands = [] for help_cmd in query: cmd_dict = model_to_dict(help_cmd) # Convert datetime objects to ISO strings if cmd_dict.get("created_at"): cmd_dict["created_at"] = cmd_dict["created_at"].isoformat() if cmd_dict.get("updated_at"): cmd_dict["updated_at"] = cmd_dict["updated_at"].isoformat() try: command_model = HelpCommandModel(**cmd_dict) commands.append(command_model) except Exception as e: logger.error(f"Error creating HelpCommandModel: {e}, data: {cmd_dict}") continue return HelpCommandListResponse( help_commands=commands, total_count=total_count, page=page, page_size=page_size, total_pages=total_pages, has_more=page < total_pages, ) except Exception as e: logger.error(f"Error getting help commands: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.post("/", include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def create_help_command_endpoint( command: HelpCommandModel, token: str = Depends(oauth2_scheme) ): """Create a new help command""" if not valid_token(token): logger.warning(f"create_help_command - Bad Token: {token}") raise HTTPException(status_code=401, detail="Unauthorized") try: # Check if command name already exists existing = HelpCommand.get_by_name(command.name) if existing: raise HTTPException( status_code=409, detail=f"Help topic '{command.name}' already exists" ) # Create the command help_cmd = HelpCommand.create( name=command.name.lower(), title=command.title, content=command.content, category=command.category.lower() if command.category else None, created_by_discord_id=command.created_by_discord_id, created_at=datetime.now(), is_active=True, view_count=0, display_order=command.display_order, ) # Return the created command result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error creating help command: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.put("/{command_id}", include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def update_help_command_endpoint( command_id: int, command: HelpCommandModel, token: str = Depends(oauth2_scheme) ): """Update an existing help command""" if not valid_token(token): logger.warning(f"update_help_command - Bad Token: {token}") raise HTTPException(status_code=401, detail="Unauthorized") try: # Get the command help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help command {command_id} not found" ) # Update fields if command.title: help_cmd.title = command.title if command.content: help_cmd.content = command.content if command.category is not None: help_cmd.category = command.category.lower() if command.category else None if command.last_modified_by: help_cmd.last_modified_by = command.last_modified_by if command.display_order is not None: help_cmd.display_order = command.display_order help_cmd.updated_at = datetime.now() help_cmd.save() # Return updated command result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error updating help command {command_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.patch("/{command_id}/restore", include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def restore_help_command_endpoint( command_id: int, token: str = Depends(oauth2_scheme) ): """Restore a soft-deleted help command""" if not valid_token(token): logger.warning(f"restore_help_command - Bad Token: {token}") raise HTTPException(status_code=401, detail="Unauthorized") try: # Get the command help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help command {command_id} not found" ) # Restore the command help_cmd.restore() # Return restored command result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error restoring help command {command_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.delete("/{command_id}", include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def delete_help_command_endpoint( command_id: int, token: str = Depends(oauth2_scheme) ): """Soft delete a help command""" if not valid_token(token): logger.warning(f"delete_help_command - Bad Token: {token}") raise HTTPException(status_code=401, detail="Unauthorized") try: # Get the command help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help command {command_id} not found" ) # Soft delete the command help_cmd.soft_delete() return {"message": f"Help command {command_id} deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Error deleting help command {command_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.get("/stats") @handle_db_errors async def get_help_command_stats(): """Get comprehensive statistics about help commands""" try: # Get basic counts total_commands = HelpCommand.select().count() active_commands = ( HelpCommand.select().where(HelpCommand.is_active == True).count() ) # Get total views total_views = ( HelpCommand.select(fn.SUM(HelpCommand.view_count)) .where(HelpCommand.is_active == True) .scalar() or 0 ) # Get most viewed command most_viewed = HelpCommand.get_most_viewed(limit=1).first() most_viewed_command = None if most_viewed: most_viewed_dict = model_to_dict(most_viewed) if most_viewed_dict.get("created_at"): most_viewed_dict["created_at"] = most_viewed_dict[ "created_at" ].isoformat() if most_viewed_dict.get("updated_at"): most_viewed_dict["updated_at"] = most_viewed_dict[ "updated_at" ].isoformat() most_viewed_command = most_viewed_dict # Get recent commands count (last 7 days) week_ago = datetime.now() - timedelta(days=7) recent_count = ( HelpCommand.select() .where( (HelpCommand.created_at >= week_ago) & (HelpCommand.is_active == True) ) .count() ) return HelpCommandStatsResponse( total_commands=total_commands, active_commands=active_commands, total_views=int(total_views), most_viewed_command=most_viewed_command, recent_commands_count=recent_count, ) except Exception as e: logger.error(f"Error getting help command stats: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() # Special endpoints for Discord bot integration @router.get("/by_name/{command_name}") @handle_db_errors async def get_help_command_by_name_endpoint( command_name: str, include_inactive: bool = Query(False) ): """Get a help command by name (for Discord bot)""" try: help_cmd = HelpCommand.get_by_name( command_name, include_inactive=include_inactive ) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help topic '{command_name}' not found" ) result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error getting help command by name '{command_name}': {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.patch("/by_name/{command_name}/view", include_in_schema=PRIVATE_IN_SCHEMA) @handle_db_errors async def increment_view_count(command_name: str, token: str = Depends(oauth2_scheme)): """Increment view count for a help command""" if not valid_token(token): logger.warning(f"increment_view_count - Bad Token: {token}") raise HTTPException(status_code=401, detail="Unauthorized") try: help_cmd = HelpCommand.get_by_name(command_name) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help topic '{command_name}' not found" ) # Increment view count help_cmd.increment_view_count() # Return updated command result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error incrementing view count for '{command_name}': {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.get("/autocomplete") @handle_db_errors async def get_help_names_for_autocomplete( q: str = Query(""), limit: int = Query(25, ge=1, le=100) ): """Get help command names for Discord autocomplete""" try: if q: results = HelpCommand.search_by_name(q, limit=limit) else: results = HelpCommand.get_all_active().limit(limit) # Return list of dictionaries with name, title, category return { "results": [ { "name": help_cmd.name, "title": help_cmd.title, "category": help_cmd.category, } for help_cmd in results ] } except Exception as e: logger.error(f"Error getting help names for autocomplete: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close() @router.get("/{command_id}") @handle_db_errors async def get_help_command(command_id: int): """Get a single help command by ID""" try: help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) if not help_cmd: raise HTTPException( status_code=404, detail=f"Help command {command_id} not found" ) result = model_to_dict(help_cmd) if result.get("created_at"): result["created_at"] = result["created_at"].isoformat() if result.get("updated_at"): result["updated_at"] = result["updated_at"].isoformat() return result except HTTPException: raise except Exception as e: logger.error(f"Error getting help command {command_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: db.close()