fix: validate sort_by parameter with Literal type in views.py (#36) #44

Merged
cal merged 1 commits from ai/major-domo-database-36 into next-release 2026-03-10 14:41:31 +00:00

View File

@ -3,239 +3,331 @@ from typing import List, Literal, Optional
import logging import logging
import pydantic import pydantic
from ..db_engine import SeasonBattingStats, SeasonPitchingStats, db, Manager, Team, Current, model_to_dict, fn, query_to_csv, StratPlay, StratGame from ..db_engine import (
from ..dependencies import add_cache_headers, cache_result, oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors, update_season_batting_stats, update_season_pitching_stats, get_cache_stats SeasonBattingStats,
SeasonPitchingStats,
logger = logging.getLogger('discord_app') db,
Manager,
router = APIRouter( Team,
prefix='/api/v3/views', Current,
tags=['views'] model_to_dict,
fn,
query_to_csv,
StratPlay,
StratGame,
)
from ..dependencies import (
add_cache_headers,
cache_result,
oauth2_scheme,
valid_token,
PRIVATE_IN_SCHEMA,
handle_db_errors,
update_season_batting_stats,
update_season_pitching_stats,
get_cache_stats,
) )
@router.get('/season-stats/batting') logger = logging.getLogger("discord_app")
router = APIRouter(prefix="/api/v3/views", tags=["views"])
@router.get("/season-stats/batting")
@handle_db_errors @handle_db_errors
@add_cache_headers(max_age=10*60) @add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5*60, key_prefix='season-batting') @cache_result(ttl=5 * 60, key_prefix="season-batting")
async def get_season_batting_stats( async def get_season_batting_stats(
season: Optional[int] = None, season: Optional[int] = None,
team_id: Optional[int] = None, team_id: Optional[int] = None,
player_id: Optional[int] = None, player_id: Optional[int] = None,
sbaplayer_id: Optional[int] = None, sbaplayer_id: Optional[int] = None,
min_pa: Optional[int] = None, # Minimum plate appearances min_pa: Optional[int] = None, # Minimum plate appearances
sort_by: str = "woba", # Default sort field sort_by: Literal[
sort_order: Literal['asc', 'desc'] = 'desc', # asc or desc "pa",
"ab",
"run",
"hit",
"double",
"triple",
"homerun",
"rbi",
"bb",
"so",
"bphr",
"bpfo",
"bp1b",
"bplo",
"gidp",
"hbp",
"sac",
"ibb",
"avg",
"obp",
"slg",
"ops",
"woba",
"k_pct",
"sb",
"cs",
] = "woba", # Sort field
sort_order: Literal["asc", "desc"] = "desc", # asc or desc
limit: Optional[int] = 200, limit: Optional[int] = 200,
offset: int = 0, offset: int = 0,
csv: Optional[bool] = False csv: Optional[bool] = False,
): ):
logger.info(f'Getting season {season} batting stats - team_id: {team_id}, player_id: {player_id}, min_pa: {min_pa}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}') logger.info(
f"Getting season {season} batting stats - team_id: {team_id}, player_id: {player_id}, min_pa: {min_pa}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}"
)
# Use the enhanced get_top_hitters method # Use the enhanced get_top_hitters method
query = SeasonBattingStats.get_top_hitters( query = SeasonBattingStats.get_top_hitters(
season=season, season=season,
stat=sort_by, stat=sort_by,
limit=limit if limit != 0 else None, limit=limit if limit != 0 else None,
desc=(sort_order.lower() == 'desc'), desc=(sort_order.lower() == "desc"),
team_id=team_id, team_id=team_id,
player_id=player_id, player_id=player_id,
sbaplayer_id=sbaplayer_id, sbaplayer_id=sbaplayer_id,
min_pa=min_pa, min_pa=min_pa,
offset=offset offset=offset,
) )
# Build applied filters for response # Build applied filters for response
applied_filters = {} applied_filters = {}
if season is not None: if season is not None:
applied_filters['season'] = season applied_filters["season"] = season
if team_id is not None: if team_id is not None:
applied_filters['team_id'] = team_id applied_filters["team_id"] = team_id
if player_id is not None: if player_id is not None:
applied_filters['player_id'] = player_id applied_filters["player_id"] = player_id
if min_pa is not None: if min_pa is not None:
applied_filters['min_pa'] = min_pa applied_filters["min_pa"] = min_pa
if csv: if csv:
return_val = query_to_csv(query) return_val = query_to_csv(query)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
stat_list = [model_to_dict(stat) for stat in query] stat_list = [model_to_dict(stat) for stat in query]
return { return {"count": len(stat_list), "filters": applied_filters, "stats": stat_list}
'count': len(stat_list),
'filters': applied_filters,
'stats': stat_list
}
@router.post('/season-stats/batting/refresh', include_in_schema=PRIVATE_IN_SCHEMA) @router.post("/season-stats/batting/refresh", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors @handle_db_errors
async def refresh_season_batting_stats( async def refresh_season_batting_stats(
season: int, season: int, token: str = Depends(oauth2_scheme)
token: str = Depends(oauth2_scheme)
) -> dict: ) -> dict:
""" """
Refresh batting stats for all players in a specific season. Refresh batting stats for all players in a specific season.
Useful for full season updates. Useful for full season updates.
""" """
if not valid_token(token): if not valid_token(token):
logger.warning(f'refresh_season_batting_stats - Bad Token: {token}') logger.warning(f"refresh_season_batting_stats - Bad Token: {token}")
raise HTTPException(status_code=401, detail='Unauthorized') raise HTTPException(status_code=401, detail="Unauthorized")
logger.info(f"Refreshing all batting stats for season {season}")
logger.info(f'Refreshing all batting stats for season {season}')
try: try:
# Get all player IDs who have stratplay records in this season # Get all player IDs who have stratplay records in this season
batter_ids = [row.batter_id for row in batter_ids = [
StratPlay.select(StratPlay.batter_id.distinct()) row.batter_id
.join(StratGame).where(StratGame.season == season)] for row in StratPlay.select(StratPlay.batter_id.distinct())
.join(StratGame)
.where(StratGame.season == season)
]
if batter_ids: if batter_ids:
update_season_batting_stats(batter_ids, season, db) update_season_batting_stats(batter_ids, season, db)
logger.info(f'Successfully refreshed {len(batter_ids)} players for season {season}') logger.info(
f"Successfully refreshed {len(batter_ids)} players for season {season}"
)
return { return {
'message': f'Season {season} batting stats refreshed', "message": f"Season {season} batting stats refreshed",
'players_updated': len(batter_ids) "players_updated": len(batter_ids),
} }
else: else:
logger.warning(f'No batting data found for season {season}') logger.warning(f"No batting data found for season {season}")
return { return {
'message': f'No batting data found for season {season}', "message": f"No batting data found for season {season}",
'players_updated': 0 "players_updated": 0,
} }
except Exception as e: except Exception as e:
logger.error(f'Error refreshing season {season}: {e}') logger.error(f"Error refreshing season {season}: {e}")
raise HTTPException(status_code=500, detail=f'Refresh failed: {str(e)}') raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
@router.get('/season-stats/pitching') @router.get("/season-stats/pitching")
@handle_db_errors @handle_db_errors
@add_cache_headers(max_age=10*60) @add_cache_headers(max_age=10 * 60)
@cache_result(ttl=5*60, key_prefix='season-pitching') @cache_result(ttl=5 * 60, key_prefix="season-pitching")
async def get_season_pitching_stats( async def get_season_pitching_stats(
season: Optional[int] = None, season: Optional[int] = None,
team_id: Optional[int] = None, team_id: Optional[int] = None,
player_id: Optional[int] = None, player_id: Optional[int] = None,
sbaplayer_id: Optional[int] = None, sbaplayer_id: Optional[int] = None,
min_outs: Optional[int] = None, # Minimum outs pitched min_outs: Optional[int] = None, # Minimum outs pitched
sort_by: str = "era", # Default sort field sort_by: Literal[
sort_order: Literal['asc', 'desc'] = 'asc', # asc or desc (asc default for ERA) "tbf",
"outs",
"games",
"gs",
"win",
"loss",
"hold",
"saves",
"bsave",
"ir",
"irs",
"ab",
"run",
"e_run",
"hits",
"double",
"triple",
"homerun",
"bb",
"so",
"hbp",
"sac",
"ibb",
"gidp",
"sb",
"cs",
"bphr",
"bpfo",
"bp1b",
"bplo",
"wp",
"balk",
"wpa",
"era",
"whip",
"avg",
"obp",
"slg",
"ops",
"woba",
"hper9",
"kper9",
"bbper9",
"kperbb",
"lob_2outs",
"rbipercent",
"re24",
] = "era", # Sort field
sort_order: Literal["asc", "desc"] = "asc", # asc or desc (asc default for ERA)
limit: Optional[int] = 200, limit: Optional[int] = 200,
offset: int = 0, offset: int = 0,
csv: Optional[bool] = False csv: Optional[bool] = False,
): ):
logger.info(f'Getting season {season} pitching stats - team_id: {team_id}, player_id: {player_id}, min_outs: {min_outs}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}') logger.info(
f"Getting season {season} pitching stats - team_id: {team_id}, player_id: {player_id}, min_outs: {min_outs}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}"
)
# Use the get_top_pitchers method # Use the get_top_pitchers method
query = SeasonPitchingStats.get_top_pitchers( query = SeasonPitchingStats.get_top_pitchers(
season=season, season=season,
stat=sort_by, stat=sort_by,
limit=limit if limit != 0 else None, limit=limit if limit != 0 else None,
desc=(sort_order.lower() == 'desc'), desc=(sort_order.lower() == "desc"),
team_id=team_id, team_id=team_id,
player_id=player_id, player_id=player_id,
sbaplayer_id=sbaplayer_id, sbaplayer_id=sbaplayer_id,
min_outs=min_outs, min_outs=min_outs,
offset=offset offset=offset,
) )
# Build applied filters for response # Build applied filters for response
applied_filters = {} applied_filters = {}
if season is not None: if season is not None:
applied_filters['season'] = season applied_filters["season"] = season
if team_id is not None: if team_id is not None:
applied_filters['team_id'] = team_id applied_filters["team_id"] = team_id
if player_id is not None: if player_id is not None:
applied_filters['player_id'] = player_id applied_filters["player_id"] = player_id
if min_outs is not None: if min_outs is not None:
applied_filters['min_outs'] = min_outs applied_filters["min_outs"] = min_outs
if csv: if csv:
return_val = query_to_csv(query) return_val = query_to_csv(query)
return Response(content=return_val, media_type='text/csv') return Response(content=return_val, media_type="text/csv")
else: else:
stat_list = [model_to_dict(stat) for stat in query] stat_list = [model_to_dict(stat) for stat in query]
return { return {"count": len(stat_list), "filters": applied_filters, "stats": stat_list}
'count': len(stat_list),
'filters': applied_filters,
'stats': stat_list
}
@router.post('/season-stats/pitching/refresh', include_in_schema=PRIVATE_IN_SCHEMA) @router.post("/season-stats/pitching/refresh", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors @handle_db_errors
async def refresh_season_pitching_stats( async def refresh_season_pitching_stats(
season: int, season: int, token: str = Depends(oauth2_scheme)
token: str = Depends(oauth2_scheme)
) -> dict: ) -> dict:
""" """
Refresh pitching statistics for a specific season by aggregating from individual games. Refresh pitching statistics for a specific season by aggregating from individual games.
Private endpoint - not included in public API documentation. Private endpoint - not included in public API documentation.
""" """
if not valid_token(token): if not valid_token(token):
logger.warning(f'refresh_season_batting_stats - Bad Token: {token}') logger.warning(f"refresh_season_batting_stats - Bad Token: {token}")
raise HTTPException(status_code=401, detail='Unauthorized') raise HTTPException(status_code=401, detail="Unauthorized")
logger.info(f"Refreshing season {season} pitching stats")
logger.info(f'Refreshing season {season} pitching stats')
try: try:
# Get all pitcher IDs for this season # Get all pitcher IDs for this season
pitcher_query = ( pitcher_query = (
StratPlay StratPlay.select(StratPlay.pitcher_id)
.select(StratPlay.pitcher_id)
.join(StratGame, on=(StratPlay.game_id == StratGame.id)) .join(StratGame, on=(StratPlay.game_id == StratGame.id))
.where((StratGame.season == season) & (StratPlay.pitcher_id.is_null(False))) .where((StratGame.season == season) & (StratPlay.pitcher_id.is_null(False)))
.distinct() .distinct()
) )
pitcher_ids = [row.pitcher_id for row in pitcher_query] pitcher_ids = [row.pitcher_id for row in pitcher_query]
if not pitcher_ids: if not pitcher_ids:
logger.warning(f'No pitchers found for season {season}') logger.warning(f"No pitchers found for season {season}")
return { return {
'status': 'success', "status": "success",
'message': f'No pitchers found for season {season}', "message": f"No pitchers found for season {season}",
'players_updated': 0 "players_updated": 0,
} }
# Use the dependency function to update pitching stats # Use the dependency function to update pitching stats
update_season_pitching_stats(pitcher_ids, season, db) update_season_pitching_stats(pitcher_ids, season, db)
logger.info(f'Season {season} pitching stats refreshed successfully - {len(pitcher_ids)} players updated') logger.info(
f"Season {season} pitching stats refreshed successfully - {len(pitcher_ids)} players updated"
)
return { return {
'status': 'success', "status": "success",
'message': f'Season {season} pitching stats refreshed', "message": f"Season {season} pitching stats refreshed",
'players_updated': len(pitcher_ids) "players_updated": len(pitcher_ids),
} }
except Exception as e: except Exception as e:
logger.error(f'Error refreshing season {season} pitching stats: {e}') logger.error(f"Error refreshing season {season} pitching stats: {e}")
raise HTTPException(status_code=500, detail=f'Refresh failed: {str(e)}') raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
@router.get('/admin/cache', include_in_schema=PRIVATE_IN_SCHEMA) @router.get("/admin/cache", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors @handle_db_errors
async def get_admin_cache_stats( async def get_admin_cache_stats(token: str = Depends(oauth2_scheme)) -> dict:
token: str = Depends(oauth2_scheme)
) -> dict:
""" """
Get Redis cache statistics and status. Get Redis cache statistics and status.
Private endpoint - requires authentication. Private endpoint - requires authentication.
""" """
if not valid_token(token): if not valid_token(token):
logger.warning(f'get_admin_cache_stats - Bad Token: {token}') logger.warning(f"get_admin_cache_stats - Bad Token: {token}")
raise HTTPException(status_code=401, detail='Unauthorized') raise HTTPException(status_code=401, detail="Unauthorized")
logger.info("Getting cache statistics")
logger.info('Getting cache statistics')
try: try:
cache_stats = get_cache_stats() cache_stats = get_cache_stats()
logger.info(f'Cache stats retrieved: {cache_stats}') logger.info(f"Cache stats retrieved: {cache_stats}")
return { return {"status": "success", "cache_info": cache_stats}
'status': 'success',
'cache_info': cache_stats
}
except Exception as e: except Exception as e:
logger.error(f'Error getting cache stats: {e}') logger.error(f"Error getting cache stats: {e}")
raise HTTPException(status_code=500, detail=f'Failed to get cache stats: {str(e)}') raise HTTPException(
status_code=500, detail=f"Failed to get cache stats: {str(e)}"
)