All checks were successful
Build Docker Image / build (pull_request) Successful in 3m38s
aiohttp follows 307 redirects but converts POST to GET, silently
dropping the request body. Standardize all @router.post('') to
@router.post('/') so the canonical URL always has a trailing slash,
preventing 307 redirects when clients POST with trailing slashes.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1083 lines
37 KiB
Python
1083 lines
37 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query, Response
|
|
from typing import List, Optional, Dict, Any
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from pydantic import BaseModel, Field
|
|
import json
|
|
from playhouse.shortcuts import model_to_dict
|
|
from peewee import fn
|
|
|
|
from ..dependencies import (
|
|
oauth2_scheme,
|
|
valid_token,
|
|
PRIVATE_IN_SCHEMA,
|
|
handle_db_errors,
|
|
)
|
|
from ..db_engine import db, CustomCommand, CustomCommandCreator
|
|
|
|
# Logger used by every function in this module; registered under the name "database_api".
logger = logging.getLogger("database_api")
|
|
|
|
# Router that owns all endpoints below; every route path is relative to /api/v3/custom_commands.
router = APIRouter(prefix="/api/v3/custom_commands", tags=["custom_commands"])
|
|
|
|
|
|
# Pydantic Models for API
|
|
class CustomCommandCreatorModel(BaseModel):
    """Request/response schema for a custom-command creator.

    ``discord_id``, ``username`` and ``created_at`` are required; the
    remaining fields fall back to the defaults declared below.
    """

    # Optional for POST (auto-generated), required on response
    id: Optional[int] = None

    # Identity of the creator
    discord_id: int
    username: str
    display_name: Optional[str] = None

    # Creation timestamp, carried as a string
    created_at: str

    # Command counters stored on the creator record
    total_commands: int = 0
    active_commands: int = 0
|
|
|
|
|
|
class CustomCommandModel(BaseModel):
    """Request/response schema for a single custom command.

    ``name`` (2-32 characters), ``content`` (1-2000 characters) and
    ``creator_id`` are required; everything else is optional or defaulted.
    """

    id: Optional[int] = None

    # Validated text fields
    name: str = Field(..., min_length=2, max_length=32)
    content: str = Field(..., min_length=1, max_length=2000)

    # Owning creator: the foreign key plus an optional embedded creator object
    creator_id: int
    creator: Optional[CustomCommandCreatorModel] = None

    # Timestamps, carried as strings
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    last_used: Optional[str] = None

    # Usage and lifecycle state
    use_count: int = 0
    warning_sent: bool = False
    is_active: bool = True

    tags: Optional[List[str]] = None
|
|
|
|
|
|
class CustomCommandListResponse(BaseModel):
    """Paginated envelope returned by the command listing endpoint."""

    # The commands on the requested page
    custom_commands: List[CustomCommandModel]

    # Pagination bookkeeping
    total_count: int
    page: int
    page_size: int
    total_pages: int
    has_more: bool
|
|
|
|
|
|
class CustomCommandStatsResponse(BaseModel):
    """Aggregate statistics returned by the ``/stats`` endpoint."""

    # Headline counters
    total_commands: int
    active_commands: int
    total_creators: int
    total_uses: int

    # Free-form row dictionaries; None when there is no matching row
    most_popular_command: Optional[Dict[str, Any]] = None
    most_active_creator: Optional[Dict[str, Any]] = None

    # Activity and cleanup counters
    recent_commands_count: int = 0
    commands_needing_warning: int = 0
    commands_eligible_for_deletion: int = 0
|
|
|
|
|
|
# Helper functions
|
|
def convert_datetime_to_iso(data_dict, fields=None):
    """Replace datetime-like values in ``data_dict`` with ISO-8601 strings.

    The dictionary is modified in place and also returned.

    Args:
        data_dict: Mapping whose values may expose an ``isoformat()`` method.
        fields: Keys to inspect. Defaults to ``created_at``, ``updated_at``
            and ``last_used`` when omitted.

    Returns:
        The same ``data_dict`` object that was passed in.
    """
    targets = ["created_at", "updated_at", "last_used"] if fields is None else fields

    for key in targets:
        value = data_dict.get(key)
        # Falsy values (missing, None, "") and plain strings are left untouched.
        if value and hasattr(value, "isoformat"):
            data_dict[key] = value.isoformat()

    return data_dict
|
|
|
|
|
|
def update_creator_stats(creator_id: int):
    """Recount a creator's commands and store the totals on the creator row.

    Does nothing when no creator with ``creator_id`` exists.
    """
    owner = CustomCommandCreator.get_or_none(CustomCommandCreator.id == creator_id)
    if not owner:
        return

    owned_by_creator = CustomCommand.creator == owner

    total_count = CustomCommand.select().where(owned_by_creator).count()
    # peewee builds SQL from the overloaded ``==``; ``is True`` would not work here.
    active_count = (
        CustomCommand.select()
        .where(owned_by_creator & (CustomCommand.is_active == True))  # noqa: E712
        .count()
    )

    owner.total_commands = total_count
    owner.active_commands = active_count
    owner.save()
|
|
|
|
|
|
def get_custom_command_by_name(name: str):
    """Look up a command by case-insensitive name, joined with its creator.

    Returns:
        A dict of the command's own fields plus ``creator_id``,
        ``creator_discord_id``, ``creator_username`` and
        ``creator_display_name``; ``None`` when no command matches.
    """
    lookup = (
        CustomCommand.select(CustomCommand, CustomCommandCreator)
        .join(
            CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id)
        )
        .where(fn.LOWER(CustomCommand.name) == name.lower())
    )
    row = lookup.first()
    if not row:
        return None

    payload = model_to_dict(row, recurse=False)
    owner = row.creator
    # creator_id is set explicitly so callers can rely on the key being present.
    payload.update(
        creator_id=owner.id,
        creator_discord_id=owner.discord_id,
        creator_username=owner.username,
        creator_display_name=owner.display_name,
    )
    return payload
|
|
|
|
|
|
def get_custom_command_by_id(command_id: int):
    """Look up a command by primary key, joined with its creator.

    Returns:
        A dict of the command's own fields plus flattened creator fields
        (``creator_db_id``, ``creator_discord_id``, ``creator_username``,
        ``creator_display_name``, ``creator_created_at``, ``total_commands``,
        ``active_commands``); ``None`` when no command matches.
    """
    row = (
        CustomCommand.select(CustomCommand, CustomCommandCreator)
        .join(
            CustomCommandCreator, on=(CustomCommand.creator == CustomCommandCreator.id)
        )
        .where(CustomCommand.id == command_id)
        .first()
    )
    if not row:
        return None

    payload = model_to_dict(row, recurse=False)
    owner = row.creator
    payload.update(
        creator_db_id=owner.id,
        creator_discord_id=owner.discord_id,
        creator_username=owner.username,
        creator_display_name=owner.display_name,
        creator_created_at=owner.created_at,
        total_commands=owner.total_commands,
        active_commands=owner.active_commands,
    )
    return payload
|
|
|
|
|
|
def create_custom_command(command_data: dict) -> int:
    """Insert a new custom command and return its primary key.

    Args:
        command_data: Field values for the new row. A ``tags`` list is stored
            as a JSON string. A missing *or* ``None`` ``created_at`` is
            replaced with the current time.

    Returns:
        The id of the newly created command.
    """
    # Work on a copy so the caller's dictionary is not modified.
    row = dict(command_data)

    if isinstance(row.get("tags"), list):
        row["tags"] = json.dumps(row["tags"])

    # The POST endpoint passes model_dump() output, which always contains the
    # "created_at" key (value None when the client omitted it). Testing only
    # for key presence therefore never applied the default timestamp.
    if row.get("created_at") is None:
        row["created_at"] = datetime.now()

    command = CustomCommand.create(**row)
    return command.id
|
|
|
|
|
|
def update_custom_command(command_id: int, update_data: dict):
    """Apply ``update_data`` to the command row whose id is ``command_id``.

    A ``tags`` list in ``update_data`` is replaced, in place, by its JSON
    string before the UPDATE statement is issued.
    """
    if isinstance(update_data.get("tags"), list):
        update_data["tags"] = json.dumps(update_data["tags"])

    CustomCommand.update(**update_data).where(
        CustomCommand.id == command_id
    ).execute()
|
|
|
|
|
|
def delete_custom_command(command_id: int):
    """Remove the command row whose id is ``command_id``."""
    CustomCommand.delete().where(CustomCommand.id == command_id).execute()
|
|
|
|
|
|
def get_creator_by_discord_id(discord_id: int):
    """Return the creator with the given Discord ID as a dict, or ``None``.

    The ID is compared in its string form.
    """
    match = CustomCommandCreator.get_or_none(
        CustomCommandCreator.discord_id == str(discord_id)
    )
    return model_to_dict(match) if match else None
|
|
|
|
|
|
def create_creator(creator_data: dict) -> int:
    """Insert a new creator row and return its primary key.

    When ``creator_data`` has no ``created_at`` key, the current time is
    filled in (on the passed dictionary) before the row is created.
    """
    creator_data.setdefault("created_at", datetime.now())
    return CustomCommandCreator.create(**creator_data).id
|
|
|
|
|
|
# API Endpoints
|
|
|
|
|
|
def _load_creator_model(creator_id):
    """Fetch one creator row by primary key as a CustomCommandCreatorModel.

    Returns ``None`` when no row matches or when the row fails model
    validation (the validation error is logged).
    """
    cursor = db.execute_sql(
        "SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)
    )
    row = cursor.fetchone()
    if not row:
        return None

    creator_dict = dict(zip([desc[0] for desc in cursor.description], row))
    convert_datetime_to_iso(creator_dict, fields=["created_at"])
    try:
        return CustomCommandCreatorModel(**creator_dict)
    except Exception as e:
        logger.error(
            f"Error creating CustomCommandCreatorModel: {e}, data: {creator_dict}"
        )
        return None


@router.get("")
@handle_db_errors
async def get_custom_commands(
    name: Optional[str] = None,
    creator_discord_id: Optional[int] = None,
    min_uses: Optional[int] = None,
    max_days_unused: Optional[int] = None,
    is_active: Optional[bool] = True,
    sort: Optional[str] = "name",
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100),
):
    """Get custom commands with filtering and pagination.

    Args:
        name: Case-insensitive substring match on the command name.
        creator_discord_id: Only commands whose creator has this Discord ID.
        min_uses: Only commands with ``use_count`` at or above this value.
        max_days_unused: Only commands whose ``last_used`` falls within the
            last N days.
        is_active: Filter on the ``is_active`` flag.
        sort: One of ``name``, ``created_at``, ``last_used``, ``use_count``,
            ``creator``; prefix with ``-`` for descending order. Unknown keys
            fall back to ``name``.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A CustomCommandListResponse. Rows that fail model validation are
        logged and skipped rather than failing the request.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Build WHERE clause; values always travel as bound parameters.
        where_conditions = []
        params = []

        if name is not None:
            where_conditions.append("cc.name ILIKE %s")
            params.append(f"%{name}%")

        if creator_discord_id is not None:
            where_conditions.append("creator.discord_id = %s")
            params.append(creator_discord_id)

        if min_uses is not None:
            where_conditions.append("cc.use_count >= %s")
            params.append(min_uses)

        if max_days_unused is not None:
            cutoff_date = (datetime.now() - timedelta(days=max_days_unused)).isoformat()
            where_conditions.append("cc.last_used >= %s")
            params.append(cutoff_date)

        if is_active is not None:
            where_conditions.append("cc.is_active = %s")
            params.append(is_active)

        where_clause = (
            "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
        )

        # Build ORDER BY clause. Only whitelisted column names are ever
        # interpolated into the SQL text.
        sort_mapping = {
            "name": "cc.name",
            "created_at": "cc.created_at",
            "last_used": "cc.last_used",
            "use_count": "cc.use_count",
            "creator": "creator.username",
        }

        # ``sort`` is declared Optional; guard against None before using str methods.
        sort_key = sort or "name"
        if sort_key.startswith("-"):
            order_direction = "DESC"
            sort_field = sort_key[1:]
        else:
            order_direction = "ASC"
            sort_field = sort_key

        order_by = sort_mapping.get(sort_field, "cc.name")
        order_clause = f"ORDER BY {order_by} {order_direction}"

        # Get total count
        count_sql = f"""
            SELECT COUNT(*)
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
        """
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        # Calculate pagination (ceiling division for the page count)
        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        # Get commands
        sql = f"""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            {where_clause}
            {order_clause}
            LIMIT %s OFFSET %s
        """

        params.extend([page_size, offset])
        cursor = db.execute_sql(sql, params)
        results = cursor.fetchall()

        commands = []
        if results:
            columns = [desc[0] for desc in cursor.description]
            # Each distinct creator is loaded once per request instead of
            # issuing one extra query for every row on the page.
            creator_cache = {}

            for row in results:
                command_dict = dict(zip(columns, row))

                # Parse tags if they exist; malformed values become [].
                if command_dict.get("tags"):
                    try:
                        command_dict["tags"] = json.loads(command_dict["tags"])
                    except (TypeError, ValueError):
                        command_dict["tags"] = []

                # Attach the full creator object (None when it cannot be loaded).
                creator_id = command_dict["creator_id"]
                if creator_id not in creator_cache:
                    creator_cache[creator_id] = _load_creator_model(creator_id)
                command_dict["creator"] = creator_cache[creator_id]

                # Remove the flattened creator columns now that the object is attached.
                command_dict.pop("creator_discord_id", None)
                command_dict.pop("creator_username", None)
                command_dict.pop("creator_display_name", None)

                convert_datetime_to_iso(command_dict)

                try:
                    commands.append(CustomCommandModel(**command_dict))
                except Exception as e:
                    logger.error(
                        f"Error creating CustomCommandModel: {e}, data: {command_dict}"
                    )
                    # Skip invalid commands rather than failing the entire request
                    continue

        return CustomCommandListResponse(
            custom_commands=commands,
            total_count=total_count,
            page=page,
            page_size=page_size,
            total_pages=total_pages,
            has_more=page < total_pages,
        )

    except Exception as e:
        logger.error(f"Error getting custom commands: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
# Move this route to after the specific string routes
|
|
|
|
|
|
@router.post("/", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_custom_command_endpoint(
    command: CustomCommandModel, token: str = Depends(oauth2_scheme)
):
    """Create a new custom command.

    Returns:
        The stored command as a dict, with the creator nested under
        ``creator``.

    Raises:
        HTTPException: 401 for an invalid token, 409 when a command with the
            same (case-insensitive) name exists, 500 on unexpected errors.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected bearer token to the log.
        logger.warning(f"create_custom_command - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Check if command name already exists
        existing = get_custom_command_by_name(command.name)
        if existing:
            raise HTTPException(
                status_code=409, detail=f"Command '{command.name}' already exists"
            )

        # Create the command
        command_data = command.model_dump(exclude={"id", "creator"})
        command_id = create_custom_command(command_data)

        # Keep the creator's stored counters in sync with the new row
        update_creator_stats(command.creator_id)

        # Return the created command
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)

        convert_datetime_to_iso(command_dict)

        if command_dict.get("tags"):
            try:
                command_dict["tags"] = json.loads(command_dict["tags"])
            except (TypeError, ValueError):
                # Malformed stored tags degrade to an empty list.
                command_dict["tags"] = []

        creator_created_at = command_dict.pop("creator_created_at")
        if hasattr(creator_created_at, "isoformat"):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flattened creator columns into a nested object.
        command_dict["creator"] = {
            "id": command_dict.pop("creator_db_id"),
            "discord_id": command_dict.pop("creator_discord_id"),
            "username": command_dict.pop("creator_username"),
            "display_name": command_dict.pop("creator_display_name"),
            "created_at": creator_created_at,
            "total_commands": command_dict.pop("total_commands"),
            "active_commands": command_dict.pop("active_commands"),
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating custom command: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.put("/{command_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def update_custom_command_endpoint(
    command_id: int, command: CustomCommandModel, token: str = Depends(oauth2_scheme)
):
    """Update an existing custom command from a full CustomCommandModel.

    Returns:
        The updated command as a dict, with the creator nested under
        ``creator``.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 500 on unexpected errors.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected bearer token to the log.
        logger.warning(f"update_custom_command - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(
                status_code=404, detail=f"Custom command {command_id} not found"
            )

        update_data = command.model_dump(exclude={"id", "creator"})
        # model_dump() emits created_at=None when the client omitted it;
        # writing that through would erase the stored creation timestamp.
        if update_data.get("created_at") is None:
            update_data.pop("created_at", None)
        update_data["updated_at"] = datetime.now().isoformat()
        update_custom_command(command_id, update_data)

        # is_active and creator_id may both have changed, so recount for the
        # previous owner and the (possibly different) new owner.
        for affected_creator in {existing["creator_db_id"], command.creator_id}:
            update_creator_stats(affected_creator)

        # Return updated command
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)

        convert_datetime_to_iso(command_dict)

        if command_dict.get("tags"):
            try:
                command_dict["tags"] = json.loads(command_dict["tags"])
            except (TypeError, ValueError):
                # Malformed stored tags degrade to an empty list.
                command_dict["tags"] = []

        creator_created_at = command_dict.pop("creator_created_at")
        if hasattr(creator_created_at, "isoformat"):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flattened creator columns into a nested object.
        command_dict["creator"] = {
            "id": command_dict.pop("creator_db_id"),
            "discord_id": command_dict.pop("creator_discord_id"),
            "username": command_dict.pop("creator_username"),
            "display_name": command_dict.pop("creator_display_name"),
            "created_at": creator_created_at,
            "total_commands": command_dict.pop("total_commands"),
            "active_commands": command_dict.pop("active_commands"),
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.patch("/{command_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def patch_custom_command(
    command_id: int,
    token: str = Depends(oauth2_scheme),
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    use_count: Optional[int] = None,
    last_used: Optional[str] = None,
    warning_sent: Optional[bool] = None,
    is_active: Optional[bool] = None,
):
    """Partially update a custom command.

    Only arguments that are not ``None`` are written. Changing ``content``
    also refreshes ``updated_at``.

    Returns:
        The updated command as a dict, with the creator nested under
        ``creator``.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 400 when no field was supplied, 500 on unexpected
            errors.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected bearer token to the log.
        logger.warning(f"patch_custom_command - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Check if command exists
        existing = get_custom_command_by_id(command_id)
        if not existing:
            raise HTTPException(
                status_code=404, detail=f"Custom command {command_id} not found"
            )

        # Build update data from the supplied arguments only
        update_data = {}
        if content is not None:
            update_data["content"] = content
            update_data["updated_at"] = datetime.now().isoformat()
        if tags is not None:
            update_data["tags"] = tags
        if use_count is not None:
            update_data["use_count"] = use_count
        if last_used is not None:
            update_data["last_used"] = last_used
        if warning_sent is not None:
            update_data["warning_sent"] = warning_sent
        if is_active is not None:
            update_data["is_active"] = is_active

        if not update_data:
            raise HTTPException(status_code=400, detail="No fields to update")

        update_custom_command(command_id, update_data)

        # Toggling is_active changes the creator's active-command count, so
        # the stored counters are recomputed before the response is built.
        if is_active is not None:
            update_creator_stats(existing["creator_db_id"])

        # Return updated command
        result = get_custom_command_by_id(command_id)
        command_dict = dict(result)

        convert_datetime_to_iso(command_dict)

        if command_dict.get("tags"):
            try:
                command_dict["tags"] = json.loads(command_dict["tags"])
            except (TypeError, ValueError):
                # Malformed stored tags degrade to an empty list.
                command_dict["tags"] = []

        creator_created_at = command_dict.pop("creator_created_at")
        if hasattr(creator_created_at, "isoformat"):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flattened creator columns into a nested object.
        command_dict["creator"] = {
            "id": command_dict.pop("creator_db_id"),
            "discord_id": command_dict.pop("creator_discord_id"),
            "username": command_dict.pop("creator_username"),
            "display_name": command_dict.pop("creator_display_name"),
            "created_at": creator_created_at,
            "total_commands": command_dict.pop("total_commands"),
            "active_commands": command_dict.pop("active_commands"),
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error patching custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.delete("/{command_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def delete_custom_command_endpoint(
    command_id: int, token: str = Depends(oauth2_scheme)
):
    """Remove a custom command and recount its creator's commands.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the command does
            not exist, 500 on unexpected errors.
    """
    if not valid_token(token):
        logger.warning(f"delete_custom_command - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        target = get_custom_command_by_id(command_id)
        if not target:
            raise HTTPException(
                status_code=404, detail=f"Custom command {command_id} not found"
            )

        # Remember the owner before the row disappears.
        owner_id = target["creator_db_id"]

        delete_custom_command(command_id)
        update_creator_stats(owner_id)

        return {"message": f"Custom command {command_id} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
# Creator endpoints
|
|
@router.get("/creators")
@handle_db_errors
async def get_creators(
    discord_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100),
):
    """List command creators, ordered by username, one page at a time.

    Args:
        discord_id: When given, only the creator with this Discord ID.
        page: 1-based page number.
        page_size: Rows per page (1-100).

    Returns:
        A dict with ``creators`` plus pagination fields.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        filters = []
        params = []
        if discord_id is not None:
            filters.append("discord_id = %s")
            params.append(discord_id)

        where_clause = ""
        if filters:
            where_clause = "WHERE " + " AND ".join(filters)

        # Total number of matching rows, used for the page count
        count_sql = f"SELECT COUNT(*) FROM custom_command_creators {where_clause}"
        total_count = db.execute_sql(count_sql, params).fetchone()[0]

        offset = (page - 1) * page_size
        total_pages = (total_count + page_size - 1) // page_size

        sql = f"""
            SELECT * FROM custom_command_creators
            {where_clause}
            ORDER BY username
            LIMIT %s OFFSET %s
        """

        params.extend([page_size, offset])
        cursor = db.execute_sql(sql, params)
        results = cursor.fetchall()

        # Turn each row into a dict keyed by column name, with created_at as ISO text
        creators = []
        if results:
            column_names = [col[0] for col in cursor.description]
            creators = [
                convert_datetime_to_iso(
                    dict(zip(column_names, record)), fields=["created_at"]
                )
                for record in results
            ]

        return {
            "creators": creators,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages,
            "has_more": page < total_pages,
        }

    except Exception as e:
        logger.error(f"Error getting creators: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.post("/creators", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def create_creator_endpoint(
    creator: CustomCommandCreatorModel, token: str = Depends(oauth2_scheme)
):
    """Register a new command creator and return the stored row as a dict.

    Raises:
        HTTPException: 401 for an invalid token, 409 when a creator with the
            same Discord ID exists, 500 when the new row cannot be read back
            or on unexpected errors.
    """
    if not valid_token(token):
        logger.warning(f"create_creator - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        # Refuse duplicates on Discord ID
        if get_creator_by_discord_id(creator.discord_id):
            raise HTTPException(
                status_code=409,
                detail=f"Creator with Discord ID {creator.discord_id} already exists",
            )

        new_id = create_creator(creator.model_dump(exclude={"id"}))

        # Read the row back so the response reflects what was stored
        cursor = db.execute_sql(
            "SELECT * FROM custom_command_creators WHERE id = %s", (new_id,)
        )
        record = cursor.fetchone()
        if not record:
            raise HTTPException(
                status_code=500, detail="Failed to retrieve created creator"
            )

        column_names = [col[0] for col in cursor.description]
        return dict(zip(column_names, record))

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating creator: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get("/stats")
@handle_db_errors
async def get_custom_command_stats():
    """Get comprehensive statistics about custom commands.

    Returns:
        A CustomCommandStatsResponse with overall counts, the active command
        with the highest ``use_count``, the creator with the most active
        commands, the number of active commands created in the last 7 days,
        and the number of active commands last used more than 60 days ago
        without a warning sent / more than 90 days ago.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Get basic counts
        total_commands = db.execute_sql(
            "SELECT COUNT(*) FROM custom_commands"
        ).fetchone()[0]
        active_commands = db.execute_sql(
            "SELECT COUNT(*) FROM custom_commands WHERE is_active = TRUE"
        ).fetchone()[0]
        total_creators = db.execute_sql(
            "SELECT COUNT(*) FROM custom_command_creators"
        ).fetchone()[0]

        # Get total uses; SUM over zero rows yields NULL, reported as 0
        total_uses_result = db.execute_sql(
            "SELECT SUM(use_count) FROM custom_commands WHERE is_active = TRUE"
        ).fetchone()
        total_uses = total_uses_result[0] if total_uses_result[0] else 0

        # Get most popular command
        cursor = db.execute_sql("""
            SELECT cc.*, creator.discord_id as creator_discord_id,
                   creator.username as creator_username,
                   creator.display_name as creator_display_name
            FROM custom_commands cc
            LEFT JOIN custom_command_creators creator ON cc.creator_id = creator.id
            WHERE cc.is_active = TRUE
            ORDER BY cc.use_count DESC
            LIMIT 1
        """)

        most_popular_result = cursor.fetchone()
        most_popular_command = None
        if most_popular_result:
            columns = [desc[0] for desc in cursor.description]
            command_dict = dict(zip(columns, most_popular_result))
            convert_datetime_to_iso(command_dict)
            if command_dict.get("tags"):
                try:
                    command_dict["tags"] = json.loads(command_dict["tags"])
                except (TypeError, ValueError):
                    # Malformed stored tags degrade to an empty list.
                    command_dict["tags"] = []
            command_dict["creator"] = {
                "discord_id": command_dict.pop("creator_discord_id"),
                "username": command_dict.pop("creator_username"),
                "display_name": command_dict.pop("creator_display_name"),
            }
            most_popular_command = command_dict

        # Get most active creator
        cursor2 = db.execute_sql("""
            SELECT * FROM custom_command_creators
            ORDER BY active_commands DESC
            LIMIT 1
        """)

        most_active_creator_result = cursor2.fetchone()
        most_active_creator = None
        if most_active_creator_result:
            columns2 = [desc[0] for desc in cursor2.description]
            most_active_creator = dict(zip(columns2, most_active_creator_result))
            convert_datetime_to_iso(most_active_creator, fields=["created_at"])

        # One reference instant so the three cutoffs are mutually consistent
        now = datetime.now()
        week_ago = (now - timedelta(days=7)).isoformat()
        sixty_days_ago = (now - timedelta(days=60)).isoformat()
        ninety_days_ago = (now - timedelta(days=90)).isoformat()

        # Get recent commands count (last 7 days)
        recent_count = db.execute_sql(
            """
            SELECT COUNT(*) FROM custom_commands
            WHERE created_at >= %s AND is_active = TRUE
            """,
            (week_ago,),
        ).fetchone()[0]

        # Get cleanup stats
        warning_count = db.execute_sql(
            """
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND warning_sent = FALSE AND is_active = TRUE
            """,
            (sixty_days_ago,),
        ).fetchone()[0]

        deletion_count = db.execute_sql(
            """
            SELECT COUNT(*) FROM custom_commands
            WHERE last_used < %s AND is_active = TRUE
            """,
            (ninety_days_ago,),
        ).fetchone()[0]

        return CustomCommandStatsResponse(
            total_commands=total_commands,
            active_commands=active_commands,
            total_creators=total_creators,
            total_uses=total_uses,
            most_popular_command=most_popular_command,
            most_active_creator=most_active_creator,
            recent_commands_count=recent_count,
            commands_needing_warning=warning_count,
            commands_eligible_for_deletion=deletion_count,
        )

    except Exception as e:
        logger.error(f"Error getting custom command stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
# Special endpoints for Discord bot integration
|
|
@router.get("/by_name/{command_name}")
@handle_db_errors
async def get_custom_command_by_name_endpoint(command_name: str):
    """Get a custom command by name (for Discord bot execution).

    Returns:
        The command as a dict, with the full creator row nested under
        ``creator`` (or a reduced creator object when the row cannot be
        found).

    Raises:
        HTTPException: 404 when no command has that name, 500 on unexpected
            errors.
    """
    try:
        result = get_custom_command_by_name(command_name)

        if not result:
            raise HTTPException(
                status_code=404, detail=f"Custom command '{command_name}' not found"
            )

        command_dict = dict(result)

        convert_datetime_to_iso(command_dict)

        # Parse tags; malformed stored values degrade to an empty list
        if command_dict.get("tags"):
            try:
                command_dict["tags"] = json.loads(command_dict["tags"])
            except (TypeError, ValueError):
                command_dict["tags"] = []

        # Add creator info - get full creator record
        creator_id = command_dict["creator_id"]
        creator_cursor = db.execute_sql(
            "SELECT * FROM custom_command_creators WHERE id = %s", (creator_id,)
        )
        creator_result = creator_cursor.fetchone()
        if creator_result:
            creator_columns = [desc[0] for desc in creator_cursor.description]
            creator_dict = dict(zip(creator_columns, creator_result))
            convert_datetime_to_iso(creator_dict, fields=["created_at"])
            command_dict["creator"] = creator_dict
        else:
            # Fallback to basic info if full creator not found
            command_dict["creator"] = {
                "id": creator_id,
                "discord_id": command_dict.pop("creator_discord_id"),
                "username": command_dict.pop("creator_username"),
                "display_name": command_dict.pop("creator_display_name"),
                # Use command creation as fallback
                "created_at": command_dict["created_at"],
                "total_commands": 0,
                "active_commands": 0,
            }

        # Remove the flattened creator fields (already popped in the fallback branch)
        command_dict.pop("creator_discord_id", None)
        command_dict.pop("creator_username", None)
        command_dict.pop("creator_display_name", None)

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command by name '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.patch("/by_name/{command_name}/execute", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def execute_custom_command(
    command_name: str, token: str = Depends(oauth2_scheme)
):
    """Execute a custom command and update usage statistics.

    Sets ``last_used`` to now, increments ``use_count`` and clears
    ``warning_sent``.

    Returns:
        The updated command as a dict, with the creator nested under
        ``creator``.

    Raises:
        HTTPException: 401 for an invalid token, 404 when no command has that
            name, 500 on unexpected errors.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected bearer token to the log.
        logger.warning(f"execute_custom_command - Bad Token: {token}")
        raise HTTPException(status_code=401, detail="Unauthorized")

    try:
        result = get_custom_command_by_name(command_name)

        if not result:
            raise HTTPException(
                status_code=404, detail=f"Custom command '{command_name}' not found"
            )

        command_id = result["id"]

        # Update usage statistics. The increment is expressed in SQL
        # (use_count = use_count + 1) so that concurrent executions cannot
        # overwrite each other's count with a stale value read earlier.
        update_data = {
            "last_used": datetime.now().isoformat(),
            "use_count": CustomCommand.use_count + 1,
            "warning_sent": False,  # Reset warning on use
        }

        update_custom_command(command_id, update_data)

        # Return updated command - get_custom_command_by_id already has all creator info
        updated_result = get_custom_command_by_id(command_id)
        updated_dict = dict(updated_result)

        convert_datetime_to_iso(updated_dict)

        if updated_dict.get("tags"):
            try:
                updated_dict["tags"] = json.loads(updated_dict["tags"])
            except (TypeError, ValueError):
                # Malformed stored tags degrade to an empty list.
                updated_dict["tags"] = []

        # Build creator object from the fields returned by get_custom_command_by_id
        creator_created_at = updated_dict.pop("creator_created_at")
        if hasattr(creator_created_at, "isoformat"):
            creator_created_at = creator_created_at.isoformat()

        updated_dict["creator"] = {
            "id": updated_dict.pop("creator_db_id"),
            "discord_id": updated_dict.pop("creator_discord_id"),
            "username": updated_dict.pop("creator_username"),
            "display_name": updated_dict.pop("creator_display_name"),
            "created_at": creator_created_at,
            "total_commands": updated_dict.pop("total_commands"),
            "active_commands": updated_dict.pop("active_commands"),
        }

        return updated_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error executing custom command '{command_name}': {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get("/autocomplete")
@handle_db_errors
async def get_command_names_for_autocomplete(
    partial_name: str = "", limit: int = Query(25, ge=1, le=100)
):
    """Return up to ``limit`` active command names, sorted by name.

    When ``partial_name`` is non-empty, only names containing it
    (case-insensitively) are returned.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # Pick the statement and its bound parameters, then run it once.
        if partial_name:
            statement = """
                SELECT name FROM custom_commands
                WHERE is_active = TRUE AND name ILIKE %s
                ORDER BY name
                LIMIT %s
                """
            bound = (f"%{partial_name}%", limit)
        else:
            statement = """
                SELECT name FROM custom_commands
                WHERE is_active = TRUE
                ORDER BY name
                LIMIT %s
                """
            bound = (limit,)

        names = []
        for record in db.execute_sql(statement, bound).fetchall():
            names.append(record[0])
        return names

    except Exception as e:
        logger.error(f"Error getting command names for autocomplete: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|
|
|
|
|
|
@router.get("/{command_id}")
@handle_db_errors
async def get_custom_command(command_id: int):
    """Get a single custom command by ID.

    Returns:
        The command as a dict, with the creator nested under ``creator``.

    Raises:
        HTTPException: 404 when the command does not exist, 500 on
            unexpected errors.
    """
    try:
        result = get_custom_command_by_id(command_id)

        if not result:
            raise HTTPException(
                status_code=404, detail=f"Custom command {command_id} not found"
            )

        command_dict = dict(result)

        convert_datetime_to_iso(command_dict)

        # Parse tags; malformed stored values degrade to an empty list
        if command_dict.get("tags"):
            try:
                command_dict["tags"] = json.loads(command_dict["tags"])
            except (TypeError, ValueError):
                command_dict["tags"] = []

        creator_created_at = command_dict.pop("creator_created_at")
        if hasattr(creator_created_at, "isoformat"):
            creator_created_at = creator_created_at.isoformat()

        # Fold the flattened creator columns into a nested object.
        command_dict["creator"] = {
            "id": command_dict.pop("creator_db_id"),
            "discord_id": command_dict.pop("creator_discord_id"),
            "username": command_dict.pop("creator_username"),
            "display_name": command_dict.pop("creator_display_name"),
            "created_at": creator_created_at,
            "total_commands": command_dict.pop("total_commands"),
            "active_commands": command_dict.pop("active_commands"),
        }

        return command_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting custom command {command_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        db.close()
|