major-domo-database/app/routers_v3/views.py
2025-08-27 22:49:37 -05:00

242 lines
8.6 KiB
Python

from fastapi import APIRouter, Response, HTTPException, Query, Depends
from typing import List, Literal, Optional
import logging
import pydantic
from ..db_engine import SeasonBattingStats, SeasonPitchingStats, db, Manager, Team, Current, model_to_dict, fn, query_to_csv, StratPlay, StratGame
from ..dependencies import add_cache_headers, cache_result, oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors, update_season_batting_stats, update_season_pitching_stats, get_cache_stats
logger = logging.getLogger('discord_app')
router = APIRouter(
prefix='/api/v3/views',
tags=['views']
)
@router.get('/season-stats/batting')
@handle_db_errors
@add_cache_headers(max_age=10*60)
@cache_result(ttl=5*60, key_prefix='season-batting')
async def get_season_batting_stats(
season: Optional[int] = None,
team_id: Optional[int] = None,
player_id: Optional[int] = None,
sbaplayer_id: Optional[int] = None,
min_pa: Optional[int] = None, # Minimum plate appearances
sort_by: str = "woba", # Default sort field
sort_order: Literal['asc', 'desc'] = 'desc', # asc or desc
limit: Optional[int] = 200,
offset: int = 0,
csv: Optional[bool] = False
):
logger.info(f'Getting season {season} batting stats - team_id: {team_id}, player_id: {player_id}, min_pa: {min_pa}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}')
# Use the enhanced get_top_hitters method
query = SeasonBattingStats.get_top_hitters(
season=season,
stat=sort_by,
limit=limit if limit != 0 else None,
desc=(sort_order.lower() == 'desc'),
team_id=team_id,
player_id=player_id,
sbaplayer_id=sbaplayer_id,
min_pa=min_pa,
offset=offset
)
# Build applied filters for response
applied_filters = {}
if season is not None:
applied_filters['season'] = season
if team_id is not None:
applied_filters['team_id'] = team_id
if player_id is not None:
applied_filters['player_id'] = player_id
if min_pa is not None:
applied_filters['min_pa'] = min_pa
if csv:
return_val = query_to_csv(query)
return Response(content=return_val, media_type='text/csv')
else:
stat_list = [model_to_dict(stat) for stat in query]
return {
'count': len(stat_list),
'filters': applied_filters,
'stats': stat_list
}
@router.post('/season-stats/batting/refresh', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def refresh_season_batting_stats(
season: int,
token: str = Depends(oauth2_scheme)
) -> dict:
"""
Refresh batting stats for all players in a specific season.
Useful for full season updates.
"""
if not valid_token(token):
logger.warning(f'refresh_season_batting_stats - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
logger.info(f'Refreshing all batting stats for season {season}')
try:
# Get all player IDs who have stratplay records in this season
batter_ids = [row.batter_id for row in
StratPlay.select(StratPlay.batter_id.distinct())
.join(StratGame).where(StratGame.season == season)]
if batter_ids:
update_season_batting_stats(batter_ids, season, db)
logger.info(f'Successfully refreshed {len(batter_ids)} players for season {season}')
return {
'message': f'Season {season} batting stats refreshed',
'players_updated': len(batter_ids)
}
else:
logger.warning(f'No batting data found for season {season}')
return {
'message': f'No batting data found for season {season}',
'players_updated': 0
}
except Exception as e:
logger.error(f'Error refreshing season {season}: {e}')
raise HTTPException(status_code=500, detail=f'Refresh failed: {str(e)}')
@router.get('/season-stats/pitching')
@handle_db_errors
@add_cache_headers(max_age=10*60)
@cache_result(ttl=5*60, key_prefix='season-pitching')
async def get_season_pitching_stats(
season: Optional[int] = None,
team_id: Optional[int] = None,
player_id: Optional[int] = None,
sbaplayer_id: Optional[int] = None,
min_outs: Optional[int] = None, # Minimum outs pitched
sort_by: str = "era", # Default sort field
sort_order: Literal['asc', 'desc'] = 'asc', # asc or desc (asc default for ERA)
limit: Optional[int] = 200,
offset: int = 0,
csv: Optional[bool] = False
):
logger.info(f'Getting season {season} pitching stats - team_id: {team_id}, player_id: {player_id}, min_outs: {min_outs}, sort_by: {sort_by}, sort_order: {sort_order}, limit: {limit}, offset: {offset}')
# Use the get_top_pitchers method
query = SeasonPitchingStats.get_top_pitchers(
season=season,
stat=sort_by,
limit=limit if limit != 0 else None,
desc=(sort_order.lower() == 'desc'),
team_id=team_id,
player_id=player_id,
sbaplayer_id=sbaplayer_id,
min_outs=min_outs,
offset=offset
)
# Build applied filters for response
applied_filters = {}
if season is not None:
applied_filters['season'] = season
if team_id is not None:
applied_filters['team_id'] = team_id
if player_id is not None:
applied_filters['player_id'] = player_id
if min_outs is not None:
applied_filters['min_outs'] = min_outs
if csv:
return_val = query_to_csv(query)
return Response(content=return_val, media_type='text/csv')
else:
stat_list = [model_to_dict(stat) for stat in query]
return {
'count': len(stat_list),
'filters': applied_filters,
'stats': stat_list
}
@router.post('/season-stats/pitching/refresh', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def refresh_season_pitching_stats(
season: int,
token: str = Depends(oauth2_scheme)
) -> dict:
"""
Refresh pitching statistics for a specific season by aggregating from individual games.
Private endpoint - not included in public API documentation.
"""
if not valid_token(token):
logger.warning(f'refresh_season_batting_stats - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
logger.info(f'Refreshing season {season} pitching stats')
try:
# Get all pitcher IDs for this season
pitcher_query = (
StratPlay
.select(StratPlay.pitcher_id)
.join(StratGame, on=(StratPlay.game_id == StratGame.id))
.where((StratGame.season == season) & (StratPlay.pitcher_id.is_null(False)))
.distinct()
)
pitcher_ids = [row.pitcher_id for row in pitcher_query]
if not pitcher_ids:
logger.warning(f'No pitchers found for season {season}')
return {
'status': 'success',
'message': f'No pitchers found for season {season}',
'players_updated': 0
}
# Use the dependency function to update pitching stats
update_season_pitching_stats(pitcher_ids, season, db)
logger.info(f'Season {season} pitching stats refreshed successfully - {len(pitcher_ids)} players updated')
return {
'status': 'success',
'message': f'Season {season} pitching stats refreshed',
'players_updated': len(pitcher_ids)
}
except Exception as e:
logger.error(f'Error refreshing season {season} pitching stats: {e}')
raise HTTPException(status_code=500, detail=f'Refresh failed: {str(e)}')
@router.get('/admin/cache', include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def get_admin_cache_stats(
token: str = Depends(oauth2_scheme)
) -> dict:
"""
Get Redis cache statistics and status.
Private endpoint - requires authentication.
"""
if not valid_token(token):
logger.warning(f'get_admin_cache_stats - Bad Token: {token}')
raise HTTPException(status_code=401, detail='Unauthorized')
logger.info('Getting cache statistics')
try:
cache_stats = get_cache_stats()
logger.info(f'Cache stats retrieved: {cache_stats}')
return {
'status': 'success',
'cache_info': cache_stats
}
except Exception as e:
logger.error(f'Error getting cache stats: {e}')
raise HTTPException(status_code=500, detail=f'Failed to get cache stats: {str(e)}')