From d32f9a8239261b0352e97c83d696bae8c7916dc4 Mon Sep 17 00:00:00 2001 From: Cal Corum Date: Fri, 17 Oct 2025 16:36:40 -0500 Subject: [PATCH] Added HelpCommands --- app/db_engine.py | 80 +++++ app/main.py | 3 +- app/routers_v3/custom_commands.py | 359 +++++++++------------- app/routers_v3/help_commands.py | 482 ++++++++++++++++++++++++++++++ 4 files changed, 710 insertions(+), 214 deletions(-) create mode 100644 app/routers_v3/help_commands.py diff --git a/app/db_engine.py b/app/db_engine.py index 5e9a9b4..9ce2670 100644 --- a/app/db_engine.py +++ b/app/db_engine.py @@ -2355,6 +2355,86 @@ class CustomCommand(BaseModel): self.tags = None +class HelpCommand(BaseModel): + """Model for admin-created help topics.""" + name = CharField(max_length=32, unique=True) + title = CharField(max_length=200) + content = TextField() + category = CharField(max_length=50, null=True) + + # Audit fields + created_by_discord_id = CharField(max_length=20) # Discord snowflake ID as string + created_at = DateTimeField() + updated_at = DateTimeField(null=True) + last_modified_by = CharField(max_length=20, null=True) # Discord snowflake ID as string + + # Status and metrics + is_active = BooleanField(default=True) + view_count = IntegerField(default=0) + display_order = IntegerField(default=0) + + class Meta: + table_name = 'help_commands' + + @staticmethod + def get_by_name(name: str, include_inactive: bool = False): + """Get a help command by name (case-insensitive).""" + query = HelpCommand.select().where(fn.Lower(HelpCommand.name) == name.lower()) + if not include_inactive: + query = query.where(HelpCommand.is_active == True) + return query.first() + + @staticmethod + def search_by_name(partial_name: str, limit: int = 25): + """Search help topics by partial name match.""" + return (HelpCommand + .select() + .where((HelpCommand.is_active == True) & + (fn.Lower(HelpCommand.name).contains(partial_name.lower()))) + .order_by(HelpCommand.display_order, HelpCommand.name) + .limit(limit)) + + @staticmethod + def get_by_category(category: str, include_inactive: bool = False): + """Get help commands by category.""" + query = HelpCommand.select().where(fn.Lower(HelpCommand.category) == category.lower()) + if not include_inactive: + query = query.where(HelpCommand.is_active == True) + return query.order_by(HelpCommand.display_order, HelpCommand.name) + + @staticmethod + def get_all_active(): + """Get all active help topics.""" + return (HelpCommand + .select() + .where(HelpCommand.is_active == True) + .order_by(HelpCommand.display_order, HelpCommand.name)) + + @staticmethod + def get_most_viewed(limit: int = 10): + """Get most viewed help topics.""" + return (HelpCommand + .select() + .where(HelpCommand.is_active == True) + .order_by(HelpCommand.view_count.desc()) + .limit(limit)) + + def increment_view_count(self): + """Increment view count for this help topic.""" + self.view_count += 1 + self.save() + + def soft_delete(self): + """Soft delete this help topic.""" + self.is_active = False + self.save() + + def restore(self): + """Restore this soft-deleted help topic.""" + self.is_active = True + self.save() + + class SeasonBattingStatsView(BaseModel): name = CharField() player_id = IntegerField() diff --git a/app/main.py b/app/main.py index af87706..ab04918 100644 --- a/app/main.py +++ b/app/main.py @@ -10,7 +10,7 @@ from fastapi.openapi.utils import get_openapi # from fastapi.openapi.docs import get_swagger_ui_html # from fastapi.openapi.utils import get_openapi -from .routers_v3 import current, players, results, schedules, standings, teams, transactions, battingstats, pitchingstats, fieldingstats, draftpicks, draftlist, managers, awards, draftdata, keepers, stratgame, stratplay, injuries, decisions, divisions, sbaplayers, custom_commands, views +from .routers_v3 import current, players, results, schedules, standings, teams, transactions, battingstats, pitchingstats, fieldingstats, draftpicks, draftlist, managers, awards, draftdata, keepers, stratgame, stratplay, injuries, decisions, divisions, sbaplayers, custom_commands, help_commands, views # date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}' log_level = logging.INFO if os.environ.get('LOG_LEVEL') == 'INFO' else logging.WARNING @@ -67,6 +67,7 @@ app.include_router(decisions.router) app.include_router(divisions.router) app.include_router(sbaplayers.router) app.include_router(custom_commands.router) +app.include_router(help_commands.router) app.include_router(views.router) logger.info(f'Loaded all routers.') diff --git a/app/routers_v3/custom_commands.py b/app/routers_v3/custom_commands.py index e76c13c..21a08d5 100644 --- a/app/routers_v3/custom_commands.py +++ b/app/routers_v3/custom_commands.py @@ -4,9 +4,11 @@ import logging from datetime import datetime, timedelta from pydantic import BaseModel, Field import json +from playhouse.shortcuts import model_to_dict +from peewee import fn from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors -from ..db_engine import db +from ..db_engine import db, CustomCommand, CustomCommandCreator logger = logging.getLogger('database_api') @@ -64,155 +66,30 @@ class CustomCommandStatsResponse(BaseModel): # Helper functions -def get_custom_commands_table(): - """Get custom commands from database with basic filtering""" - cursor = db.execute_sql(""" - SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id, - creator.username as creator_username, creator.display_name as creator_display_name, - creator.created_at as creator_created_at, creator.total_commands, creator.active_commands - FROM custom_commands cc - LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id - WHERE 1=1 - """) - - results = cursor.fetchall() - if results: - columns = [desc[0] for desc in cursor.description] - return [dict(zip(columns, row)) for row in results] - return [] - - -def get_custom_command_by_id(command_id: int): - """Get a single custom command by ID""" - cursor = db.execute_sql(""" - SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id, - creator.username as creator_username, creator.display_name as creator_display_name, - creator.created_at as creator_created_at, creator.total_commands, creator.active_commands - FROM custom_commands cc - LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id - WHERE cc.id = ? - """, (command_id,)) - - result = cursor.fetchone() - if result: - columns = [desc[0] for desc in cursor.description] - return dict(zip(columns, result)) - return None - - -def get_custom_command_by_name(name: str): - """Get a single custom command by name""" - cursor = db.execute_sql(""" - SELECT cc.*, creator.id as creator_db_id, creator.discord_id as creator_discord_id, - creator.username as creator_username, creator.display_name as creator_display_name, - creator.created_at as creator_created_at, creator.total_commands, creator.active_commands - FROM custom_commands cc - LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id - WHERE LOWER(cc.name) = LOWER(?) - """, (name,)) - - result = cursor.fetchone() - if result: - columns = [desc[0] for desc in cursor.description] - return dict(zip(columns, result)) - return None - - -def create_custom_command(command_data: Dict[str, Any]) -> int: - """Create a new custom command""" - now = datetime.now().isoformat() - - result = db.execute_sql(""" - INSERT INTO custom_commands - (name, content, creator_id, created_at, last_used, use_count, warning_sent, is_active, tags) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - command_data['name'], - command_data['content'], - command_data['creator_id'], - now, - now, - 0, - False, - True, - json.dumps(command_data.get('tags', [])) - )) - - return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0] - - -def update_custom_command(command_id: int, update_data: Dict[str, Any]) -> bool: - """Update an existing custom command""" - set_clauses = [] - params = [] - - for field, value in update_data.items(): - if field == 'tags': - set_clauses.append(f"{field} = ?") - params.append(json.dumps(value)) - else: - set_clauses.append(f"{field} = ?") - params.append(value) - - if not set_clauses: - return False - - params.append(command_id) - sql = f"UPDATE custom_commands SET {', '.join(set_clauses)} WHERE id = ?" - - db.execute_sql(sql, params) - return True - - -def delete_custom_command(command_id: int) -> bool: - """Delete a custom command""" - db.execute_sql("DELETE FROM custom_commands WHERE id = ?", (command_id,)) - return True - - -def get_creator_by_discord_id(discord_id: int): - """Get creator by Discord ID""" - result = db.execute_sql(""" - SELECT * FROM custom_command_creators WHERE discord_id = ? - """, (discord_id,)).fetchone() - return result - - -def create_creator(creator_data: Dict[str, Any]) -> int: - """Create a new command creator""" - now = datetime.now().isoformat() - - result = db.execute_sql(""" - INSERT INTO custom_command_creators - (discord_id, username, display_name, created_at, total_commands, active_commands) - VALUES (?, ?, ?, ?, ?, ?) - """, ( - creator_data['discord_id'], - creator_data['username'], - creator_data.get('display_name'), - now, - 0, - 0 - )) - - return db.execute_sql("SELECT last_insert_rowid()").fetchone()[0] +def convert_datetime_to_iso(data_dict, fields=None): + """Convert datetime objects to ISO strings in a dictionary.""" + if fields is None: + fields = ['created_at', 'updated_at', 'last_used'] + for field in fields: + if data_dict.get(field) and hasattr(data_dict[field], 'isoformat'): + data_dict[field] = data_dict[field].isoformat() + return data_dict def update_creator_stats(creator_id: int): """Update creator command counts""" - total = db.execute_sql(""" - SELECT COUNT(*) FROM custom_commands WHERE creator_id = ? - """, (creator_id,)).fetchone()[0] - - active = db.execute_sql(""" - SELECT COUNT(*) FROM custom_commands WHERE creator_id = ? AND is_active = 1 - """, (creator_id,)).fetchone()[0] - - db.execute_sql(""" - UPDATE custom_command_creators - SET total_commands = ?, active_commands = ? - WHERE id = ? - """, (total, active, creator_id)) + creator = CustomCommandCreator.get_or_none(CustomCommandCreator.id == creator_id) + if not creator: + return + + total = CustomCommand.select().where(CustomCommand.creator == creator).count() + active = CustomCommand.select().where( + (CustomCommand.creator == creator) & (CustomCommand.is_active == True) + ).count() + + creator.total_commands = total + creator.active_commands = active + creator.save() # API Endpoints @@ -236,25 +113,25 @@ async def get_custom_commands( params = [] if name is not None: - where_conditions.append("LOWER(cc.name) LIKE LOWER(?)" if db.database == 'sqlite' else "cc.name ILIKE ?") + where_conditions.append("LOWER(cc.name) LIKE LOWER(%s)" if DATABASE_TYPE.lower() == 'sqlite' else "cc.name ILIKE %s") params.append(f"%{name}%") - + if creator_discord_id is not None: - where_conditions.append("creator.discord_id = ?") + where_conditions.append("creator.discord_id = %s") params.append(creator_discord_id) - + if min_uses is not None: - where_conditions.append("cc.use_count >= ?") + where_conditions.append("cc.use_count >= %s") params.append(min_uses) - + if max_days_unused is not None: cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat() - where_conditions.append("cc.last_used >= ?") + where_conditions.append("cc.last_used >= %s") params.append(cutoff_date) - + if is_active is not None: - where_conditions.append("cc.is_active = ?") - params.append(1 if is_active else 0) + where_conditions.append("cc.is_active = %s") + params.append(is_active) where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else "" @@ -292,16 +169,16 @@ async def get_custom_commands( # Get commands sql = f""" - SELECT cc.*, creator.discord_id as creator_discord_id, + SELECT cc.*, creator.discord_id as creator_discord_id, creator.username as creator_username, creator.display_name as creator_display_name FROM custom_commands cc LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id {where_clause} {order_clause} - LIMIT ? OFFSET ? + LIMIT %s OFFSET %s """ - + params.extend([page_size, offset]) cursor3 = db.execute_sql(sql, params) results = cursor3.fetchall() @@ -322,13 +199,16 @@ async def get_custom_commands( # Get full creator information creator_id = command_dict['creator_id'] - creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,)) + creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)) creator_result = creator_cursor.fetchone() if creator_result: # Create complete creator object creator_columns = [desc[0] for desc in creator_cursor.description] creator_dict = dict(zip(creator_columns, creator_result)) + # Convert datetime to ISO string + if creator_dict.get('created_at') and hasattr(creator_dict['created_at'], 'isoformat'): + creator_dict['created_at'] = creator_dict['created_at'].isoformat() try: creator_model = CustomCommandCreatorModel(**creator_dict) command_dict['creator'] = creator_model @@ -341,9 +221,14 @@ async def get_custom_commands( # Remove the individual creator fields now that we have the creator object command_dict.pop('creator_discord_id', None) - command_dict.pop('creator_username', None) + command_dict.pop('creator_username', None) command_dict.pop('creator_display_name', None) - + + # Convert datetime fields to ISO strings + for field in ['created_at', 'updated_at', 'last_used']: + if command_dict.get(field) and hasattr(command_dict[field], 'isoformat'): + command_dict[field] = command_dict[field].isoformat() + # Create CustomCommandModel instance try: command_model = CustomCommandModel(**command_dict) @@ -399,23 +284,30 @@ async def create_custom_command_endpoint( # Return the created command result = get_custom_command_by_id(command_id) command_dict = dict(result) - + + # Convert datetime fields + convert_datetime_to_iso(command_dict) + if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) except: command_dict['tags'] = [] - + + creator_created_at = command_dict.pop('creator_created_at') + if hasattr(creator_created_at, 'isoformat'): + creator_created_at = creator_created_at.isoformat() + command_dict['creator'] = { 'id': command_dict.pop('creator_db_id'), 'discord_id': command_dict.pop('creator_discord_id'), 'username': command_dict.pop('creator_username'), 'display_name': command_dict.pop('creator_display_name'), - 'created_at': command_dict.pop('creator_created_at'), + 'created_at': creator_created_at, 'total_commands': command_dict.pop('total_commands'), 'active_commands': command_dict.pop('active_commands') } - + return command_dict except HTTPException: @@ -453,25 +345,32 @@ async def update_custom_command_endpoint( # Return updated command result = get_custom_command_by_id(command_id) command_dict = dict(result) - + + # Convert datetime fields + convert_datetime_to_iso(command_dict) + if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) except: command_dict['tags'] = [] - + + creator_created_at = command_dict.pop('creator_created_at') + if hasattr(creator_created_at, 'isoformat'): + creator_created_at = creator_created_at.isoformat() + command_dict['creator'] = { 'id': command_dict.pop('creator_db_id'), 'discord_id': command_dict.pop('creator_discord_id'), 'username': command_dict.pop('creator_username'), 'display_name': command_dict.pop('creator_display_name'), - 'created_at': command_dict.pop('creator_created_at'), + 'created_at': creator_created_at, 'total_commands': command_dict.pop('total_commands'), 'active_commands': command_dict.pop('active_commands') } - + return command_dict - + except HTTPException: raise except Exception as e: @@ -529,19 +428,26 @@ async def patch_custom_command( # Return updated command result = get_custom_command_by_id(command_id) command_dict = dict(result) - + + # Convert datetime fields to ISO strings + convert_datetime_to_iso(command_dict) + if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) except: command_dict['tags'] = [] - + + creator_created_at = command_dict.pop('creator_created_at') + if hasattr(creator_created_at, 'isoformat'): + creator_created_at = creator_created_at.isoformat() + command_dict['creator'] = { 'id': command_dict.pop('creator_db_id'), 'discord_id': command_dict.pop('creator_discord_id'), 'username': command_dict.pop('creator_username'), 'display_name': command_dict.pop('creator_display_name'), - 'created_at': command_dict.pop('creator_created_at'), + 'created_at': creator_created_at, 'total_commands': command_dict.pop('total_commands'), 'active_commands': command_dict.pop('active_commands') } @@ -608,7 +514,7 @@ async def get_creators( params = [] if discord_id is not None: - where_conditions.append("discord_id = ?") + where_conditions.append("discord_id = %s") params.append(discord_id) where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else "" @@ -623,12 +529,12 @@ async def get_creators( # Get creators sql = f""" - SELECT * FROM custom_command_creators + SELECT * FROM custom_command_creators {where_clause} ORDER BY username - LIMIT ? OFFSET ? + LIMIT %s OFFSET %s """ - + params.extend([page_size, offset]) cursor = db.execute_sql(sql, params) results = cursor.fetchall() @@ -639,6 +545,9 @@ async def get_creators( columns = [desc[0] for desc in cursor.description] for row in results: creator_dict = dict(zip(columns, row)) + # Convert datetime to ISO string + if creator_dict.get('created_at') and hasattr(creator_dict['created_at'], 'isoformat'): + creator_dict['created_at'] = creator_dict['created_at'].isoformat() creators.append(creator_dict) return { @@ -679,7 +588,7 @@ async def create_creator_endpoint( creator_id = create_creator(creator_data) # Return the created creator - cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,)) + cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)) result = cursor.fetchone() if result: columns = [desc[0] for desc in cursor.description] @@ -703,21 +612,21 @@ async def get_custom_command_stats(): try: # Get basic counts total_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands").fetchone()[0] - active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = 1").fetchone()[0] + active_commands = db.execute_sql("SELECT COUNT(*) FROM custom_commands WHERE is_active = TRUE").fetchone()[0] total_creators = db.execute_sql("SELECT COUNT(*) FROM custom_command_creators").fetchone()[0] - + # Get total uses - total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = 1").fetchone() + total_uses_result = db.execute_sql("SELECT SUM(use_count) FROM custom_commands WHERE is_active = TRUE").fetchone() total_uses = total_uses_result[0] if total_uses_result[0] else 0 # Get most popular command cursor = db.execute_sql(""" - SELECT cc.*, creator.discord_id as creator_discord_id, + SELECT cc.*, creator.discord_id as creator_discord_id, creator.username as creator_username, creator.display_name as creator_display_name FROM custom_commands cc LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id - WHERE cc.is_active = 1 + WHERE cc.is_active = TRUE ORDER BY cc.use_count DESC LIMIT 1 """) @@ -727,6 +636,10 @@ async def get_custom_command_stats(): if most_popular_result: columns = [desc[0] for desc in cursor.description] command_dict = dict(zip(columns, most_popular_result)) + # Convert datetime fields to ISO strings + for field in ['created_at', 'updated_at', 'last_used']: + if command_dict.get(field) and hasattr(command_dict[field], 'isoformat'): + command_dict[field] = command_dict[field].isoformat() if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) @@ -751,26 +664,29 @@ async def get_custom_command_stats(): if most_active_creator_result: columns2 = [desc[0] for desc in cursor2.description] most_active_creator = dict(zip(columns2, most_active_creator_result)) + # Convert datetime to ISO string + if most_active_creator.get('created_at') and hasattr(most_active_creator['created_at'], 'isoformat'): + most_active_creator['created_at'] = most_active_creator['created_at'].isoformat() # Get recent commands count (last 7 days) week_ago = (datetime.now() - timedelta(days=7)).isoformat() recent_count = db.execute_sql(""" - SELECT COUNT(*) FROM custom_commands - WHERE created_at >= ? AND is_active = 1 + SELECT COUNT(*) FROM custom_commands + WHERE created_at >= %s AND is_active = TRUE """, (week_ago,)).fetchone()[0] - + # Get cleanup stats sixty_days_ago = (datetime.now() - timedelta(days=60)).isoformat() ninety_days_ago = (datetime.now() - timedelta(days=90)).isoformat() - + warning_count = db.execute_sql(""" - SELECT COUNT(*) FROM custom_commands - WHERE last_used < ? AND warning_sent = 0 AND is_active = 1 + SELECT COUNT(*) FROM custom_commands + WHERE last_used < %s AND warning_sent = FALSE AND is_active = TRUE """, (sixty_days_ago,)).fetchone()[0] - + deletion_count = db.execute_sql(""" - SELECT COUNT(*) FROM custom_commands - WHERE last_used < ? AND is_active = 1 + SELECT COUNT(*) FROM custom_commands + WHERE last_used < %s AND is_active = TRUE """, (ninety_days_ago,)).fetchone()[0] return CustomCommandStatsResponse( @@ -799,26 +715,31 @@ async def get_custom_command_by_name_endpoint(command_name: str): """Get a custom command by name (for Discord bot execution)""" try: result = get_custom_command_by_name(command_name) - + if not result: raise HTTPException(status_code=404, detail=f"Custom command '{command_name}' not found") - + command_dict = dict(result) - + + # Convert datetime fields to ISO strings + convert_datetime_to_iso(command_dict) + # Parse tags if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) except: command_dict['tags'] = [] - + # Add creator info - get full creator record creator_id = command_dict['creator_id'] - creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,)) + creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)) creator_result = creator_cursor.fetchone() if creator_result: creator_columns = [desc[0] for desc in creator_cursor.description] creator_dict = dict(zip(creator_columns, creator_result)) + # Convert creator datetime to ISO string + convert_datetime_to_iso(creator_dict, fields=['created_at']) command_dict['creator'] = creator_dict else: # Fallback to basic info if full creator not found @@ -880,20 +801,25 @@ async def execute_custom_command( # Return updated command updated_result = get_custom_command_by_id(command_id) updated_dict = dict(updated_result) - + + # Convert datetime fields to ISO strings + convert_datetime_to_iso(updated_dict) + if updated_dict.get('tags'): try: updated_dict['tags'] = json.loads(updated_dict['tags']) except: updated_dict['tags'] = [] - + # Add creator info - get full creator record creator_id = updated_dict['creator_id'] - creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = ?", (creator_id,)) + creator_cursor = db.execute_sql("SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)) creator_result = creator_cursor.fetchone() if creator_result: creator_columns = [desc[0] for desc in creator_cursor.description] creator_dict = dict(zip(creator_columns, creator_result)) + # Convert creator datetime to ISO string + convert_datetime_to_iso(creator_dict, fields=['created_at']) updated_dict['creator'] = creator_dict else: # Fallback to basic info if full creator not found @@ -932,19 +858,19 @@ async def get_command_names_for_autocomplete( """Get command names for Discord autocomplete""" try: if partial_name: - like_clause = "LOWER(name) LIKE LOWER(?)" if db.database == 'sqlite' else "name ILIKE ?" + like_clause = "LOWER(name) LIKE LOWER(%s)" if DATABASE_TYPE.lower() == 'sqlite' else "name ILIKE %s" results = db.execute_sql(f""" - SELECT name FROM custom_commands - WHERE is_active = 1 AND {like_clause} + SELECT name FROM custom_commands + WHERE is_active = TRUE AND {like_clause} ORDER BY name - LIMIT ? + LIMIT %s """, (f"%{partial_name}%", limit)).fetchall() else: results = db.execute_sql(""" - SELECT name FROM custom_commands - WHERE is_active = 1 + SELECT name FROM custom_commands + WHERE is_active = TRUE ORDER BY name - LIMIT ? + LIMIT %s """, (limit,)).fetchall() return [row[0] for row in results] @@ -962,26 +888,33 @@ async def get_custom_command(command_id: int): """Get a single custom command by ID""" try: result = get_custom_command_by_id(command_id) - + if not result: raise HTTPException(status_code=404, detail=f"Custom command {command_id} not found") - + command_dict = dict(result) - + + # Convert datetime fields to ISO strings + convert_datetime_to_iso(command_dict) + # Parse tags if command_dict.get('tags'): try: command_dict['tags'] = json.loads(command_dict['tags']) except: command_dict['tags'] = [] - + + creator_created_at = command_dict.pop('creator_created_at') + if hasattr(creator_created_at, 'isoformat'): + creator_created_at = creator_created_at.isoformat() + # Add creator info command_dict['creator'] = { 'id': command_dict.pop('creator_db_id'), 'discord_id': command_dict.pop('creator_discord_id'), 'username': command_dict.pop('creator_username'), 'display_name': command_dict.pop('creator_display_name'), - 'created_at': command_dict.pop('creator_created_at'), + 'created_at': creator_created_at, 'total_commands': command_dict.pop('total_commands'), 'active_commands': command_dict.pop('active_commands') } diff --git a/app/routers_v3/help_commands.py b/app/routers_v3/help_commands.py new file mode 100644 index 0000000..5b7126b --- /dev/null +++ b/app/routers_v3/help_commands.py @@ -0,0 +1,482 @@ +from fastapi import APIRouter, Depends, HTTPException, Query +from typing import List, Optional, Dict, Any +import logging +from datetime import datetime, timedelta +from pydantic import BaseModel, Field +from playhouse.shortcuts import model_to_dict +from peewee import fn + +from ..dependencies import oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors +from ..db_engine import db, HelpCommand + +logger = logging.getLogger('database_api') + +router = APIRouter( + prefix='/api/v3/help_commands', + tags=['help_commands'] +) + + +# Pydantic Models for API +class HelpCommandModel(BaseModel): + id: Optional[int] = None + name: str = Field(..., min_length=2, max_length=32) + title: str = Field(..., min_length=1, max_length=200) + content: str = Field(..., min_length=1, max_length=4000) + category: Optional[str] = Field(None, max_length=50) + created_by_discord_id: str # Text to safely store Discord snowflake IDs + created_at: Optional[str] = None + updated_at: Optional[str] = None + last_modified_by: Optional[str] = None # Text to safely store Discord snowflake IDs + is_active: bool = True + view_count: int = 0 + display_order: int = 0 + + +class HelpCommandListResponse(BaseModel): + help_commands: List[HelpCommandModel] + total_count: int + page: int + page_size: int + total_pages: int + has_more: bool + + +class HelpCommandStatsResponse(BaseModel): + total_commands: int + active_commands: int + total_views: int + most_viewed_command: Optional[Dict[str, Any]] = None + recent_commands_count: int = 0 + + +# API Endpoints + +@router.get('') +@handle_db_errors +async def get_help_commands( + name: Optional[str] = None, + category: Optional[str] = None, + is_active: Optional[bool] = True, + sort: Optional[str] = 'name', + page: int = Query(1, ge=1), + page_size: int = Query(25, ge=1, le=100) +): + """Get help commands with filtering and pagination""" + try: + # Build query + query = HelpCommand.select() + + # Apply filters + if name is not None: + query = query.where(fn.Lower(HelpCommand.name).contains(name.lower())) + + if category is not None: + query = query.where(fn.Lower(HelpCommand.category) == category.lower()) + + if is_active is not None: + query = query.where(HelpCommand.is_active == is_active) + + # Get total count before pagination + total_count = query.count() + + # Apply sorting + sort_mapping = { + 'name': HelpCommand.name, + 'title': HelpCommand.title, + 'category': HelpCommand.category, + 'created_at': HelpCommand.created_at, + 'updated_at': HelpCommand.updated_at, + 'view_count': HelpCommand.view_count, + 'display_order': HelpCommand.display_order + } + + if sort.startswith('-'): + sort_field = sort[1:] + order_by = sort_mapping.get(sort_field, HelpCommand.name).desc() + else: + order_by = sort_mapping.get(sort, HelpCommand.name).asc() + + query = query.order_by(order_by) + + # Calculate pagination + offset = (page - 1) * page_size + total_pages = (total_count + page_size - 1) // page_size + + # Apply pagination + query = query.limit(page_size).offset(offset) + + # Convert to dictionaries + commands = [] + for help_cmd in query: + cmd_dict = model_to_dict(help_cmd) + # Convert datetime objects to ISO strings + if cmd_dict.get('created_at'): + cmd_dict['created_at'] = cmd_dict['created_at'].isoformat() + if cmd_dict.get('updated_at'): + cmd_dict['updated_at'] = cmd_dict['updated_at'].isoformat() + + try: + command_model = HelpCommandModel(**cmd_dict) + commands.append(command_model) + except Exception as e: + logger.error(f"Error creating HelpCommandModel: {e}, data: {cmd_dict}") + continue + + return HelpCommandListResponse( + help_commands=commands, + total_count=total_count, + page=page, + page_size=page_size, + total_pages=total_pages, + has_more=page < total_pages + ) + + except Exception as e: + logger.error(f"Error getting help commands: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.post('', include_in_schema=PRIVATE_IN_SCHEMA) +@handle_db_errors +async def create_help_command_endpoint( + command: HelpCommandModel, + token: str = Depends(oauth2_scheme) +): + """Create a new help command""" + if not valid_token(token): + logger.warning(f'create_help_command - Bad Token: {token}') + raise HTTPException(status_code=401, detail='Unauthorized') + + try: + # Check if command name already exists + existing = HelpCommand.get_by_name(command.name) + if existing: + raise HTTPException(status_code=409, detail=f"Help topic '{command.name}' already exists") + + # Create the command + help_cmd = HelpCommand.create( + name=command.name.lower(), + title=command.title, + content=command.content, + category=command.category.lower() if command.category else None, + created_by_discord_id=command.created_by_discord_id, + created_at=datetime.now(), + is_active=True, + view_count=0, + display_order=command.display_order + ) + + # Return the created command + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating help command: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.put('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA) +@handle_db_errors +async def update_help_command_endpoint( + command_id: int, + command: HelpCommandModel, + token: str = Depends(oauth2_scheme) +): + """Update an existing help command""" + if not valid_token(token): + logger.warning(f'update_help_command - Bad Token: {token}') + raise HTTPException(status_code=401, detail='Unauthorized') + + try: + # Get the command + help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help command {command_id} not found") + + # Update fields + if command.title: + help_cmd.title = command.title + if command.content: + help_cmd.content = command.content + if command.category is not None: + help_cmd.category = command.category.lower() if command.category else None + if command.last_modified_by: + help_cmd.last_modified_by = command.last_modified_by + if command.display_order is not None: + help_cmd.display_order = command.display_order + + help_cmd.updated_at = datetime.now() + help_cmd.save() + + # Return updated command + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating help command {command_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.patch('/{command_id}/restore', include_in_schema=PRIVATE_IN_SCHEMA) +@handle_db_errors +async def restore_help_command_endpoint( + command_id: int, + token: str = Depends(oauth2_scheme) +): + """Restore a soft-deleted help command""" + if not valid_token(token): + logger.warning(f'restore_help_command - Bad Token: {token}') + raise HTTPException(status_code=401, detail='Unauthorized') + + try: + # Get the command + help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help command {command_id} not found") + + # Restore the command + help_cmd.restore() + + # Return restored command + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error restoring help command {command_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.delete('/{command_id}', include_in_schema=PRIVATE_IN_SCHEMA) +@handle_db_errors +async def delete_help_command_endpoint( + command_id: int, + token: str = Depends(oauth2_scheme) +): + """Soft delete a help command""" + if not valid_token(token): + logger.warning(f'delete_help_command - Bad Token: {token}') + raise HTTPException(status_code=401, detail='Unauthorized') + + try: + # Get the command + help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help command {command_id} not found") + + # Soft delete the command + help_cmd.soft_delete() + + return {"message": f"Help command {command_id} deleted successfully"} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting help command {command_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.get('/stats') +@handle_db_errors +async def get_help_command_stats(): + """Get comprehensive statistics about help commands""" + try: + # Get basic counts + total_commands = HelpCommand.select().count() + active_commands = HelpCommand.select().where(HelpCommand.is_active == True).count() + + # Get total views + total_views = HelpCommand.select(fn.SUM(HelpCommand.view_count)).where( + HelpCommand.is_active == True + ).scalar() or 0 + + # Get most viewed command + most_viewed = HelpCommand.get_most_viewed(limit=1).first() + most_viewed_command = None + if most_viewed: + most_viewed_dict = model_to_dict(most_viewed) + if most_viewed_dict.get('created_at'): + most_viewed_dict['created_at'] = most_viewed_dict['created_at'].isoformat() + if most_viewed_dict.get('updated_at'): + most_viewed_dict['updated_at'] = most_viewed_dict['updated_at'].isoformat() + most_viewed_command = most_viewed_dict + + # Get recent commands count (last 7 days) + week_ago = datetime.now() - timedelta(days=7) + recent_count = HelpCommand.select().where( + (HelpCommand.created_at >= week_ago) & + (HelpCommand.is_active == True) + ).count() + + return HelpCommandStatsResponse( + total_commands=total_commands, + active_commands=active_commands, + total_views=int(total_views), + most_viewed_command=most_viewed_command, + recent_commands_count=recent_count + ) + + except Exception as e: + logger.error(f"Error getting help command stats: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +# Special endpoints for Discord bot integration +@router.get('/by_name/{command_name}') +@handle_db_errors +async def get_help_command_by_name_endpoint( + command_name: str, + include_inactive: bool = Query(False) +): + """Get a help command by name (for Discord bot)""" + try: + help_cmd = HelpCommand.get_by_name(command_name, include_inactive=include_inactive) + + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help topic '{command_name}' not found") + + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting help command by name '{command_name}': {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.patch('/by_name/{command_name}/view', include_in_schema=PRIVATE_IN_SCHEMA) +@handle_db_errors +async def increment_view_count( + command_name: str, + token: str = Depends(oauth2_scheme) +): + """Increment view count for a help command""" + if not valid_token(token): + logger.warning(f'increment_view_count - Bad Token: {token}') + raise HTTPException(status_code=401, detail='Unauthorized') + + try: + help_cmd = HelpCommand.get_by_name(command_name) + + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help topic '{command_name}' not found") + + # Increment view count + help_cmd.increment_view_count() + + # Return updated command + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error incrementing view count for '{command_name}': {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.get('/autocomplete') +@handle_db_errors +async def get_help_names_for_autocomplete( + q: str = Query(""), + limit: int = Query(25, ge=1, le=100) +): + """Get help command names for Discord autocomplete""" + try: + if q: + results = HelpCommand.search_by_name(q, limit=limit) + else: + results = HelpCommand.get_all_active().limit(limit) + + # Return list of dictionaries with name, title, category + return { + 'results': [ + { + 'name': help_cmd.name, + 'title': help_cmd.title, + 'category': help_cmd.category + } + for help_cmd in results + ] + } + + except Exception as e: + logger.error(f"Error getting help names for autocomplete: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close() + + +@router.get('/{command_id}') +@handle_db_errors +async def get_help_command(command_id: int): + """Get a single help command by ID""" + try: + help_cmd = HelpCommand.get_or_none(HelpCommand.id == command_id) + + if not help_cmd: + raise HTTPException(status_code=404, detail=f"Help command {command_id} not found") + + result = model_to_dict(help_cmd) + if result.get('created_at'): + result['created_at'] = result['created_at'].isoformat() + if result.get('updated_at'): + result['updated_at'] = result['updated_at'].isoformat() + + return result + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting help command {command_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + finally: + db.close()