major-domo-database/app/routers_v3/fieldingstats.py
Cal Corum a68e4216d6
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m57s
fix: preserve total_count in get_totalstats instead of overwriting with page length (#101)
Closes #101

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 18:01:38 -05:00

286 lines
9.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional, Literal
import logging
import pydantic
from ..db_engine import (
db,
BattingStat,
Team,
Player,
Current,
model_to_dict,
chunked,
fn,
per_season_weeks,
)
from ..dependencies import (
oauth2_scheme,
valid_token,
handle_db_errors,
MAX_LIMIT,
DEFAULT_LIMIT,
)
# Module logger; it uses the fixed "discord_app" logger name rather than __name__.
logger = logging.getLogger("discord_app")
# Every route declared on this router is served under /api/v3/fieldingstats
# and tagged "fieldingstats".
router = APIRouter(prefix="/api/v3/fieldingstats", tags=["fieldingstats"])
@router.get("")
@handle_db_errors
async def get_fieldingstats(
    season: int,
    s_type: Optional[str] = "regular",
    team_abbrev: list = Query(default=None),
    player_name: list = Query(default=None),
    player_id: list = Query(default=None),
    week_start: Optional[int] = None,
    week_end: Optional[int] = None,
    game_num: list = Query(default=None),
    position: list = Query(default=None),
    limit: int = Query(default=DEFAULT_LIMIT, ge=1, le=MAX_LIMIT),
    sort: Optional[str] = None,
    short_output: Optional[bool] = True,
):
    """Return individual fielding stat lines for one season.

    Rows are read from ``BattingStat`` and restricted to those where ``xch``,
    ``pb`` or ``sbc`` is positive.

    Args:
        season: Season to query.
        s_type: Case-insensitive season type. Any value containing "post"
            uses ``BattingStat.post_season``; "combined", "total" or "all"
            uses ``BattingStat.combined_season``; anything else (including
            the default) uses ``BattingStat.regular_season``.
        team_abbrev: Team abbreviations; upper-cased before matching.
        player_name: Player names, matched case-insensitively against the
            players of ``season``. Ignored when ``player_id`` is supplied.
        player_id: Player ids matched against ``BattingStat.player_id``.
        week_start: First week to include; week 1 when omitted.
        week_end: Last week to include; never later than the week stored on
            the ``Current`` row for ``season``.
        game_num: Values matched against ``BattingStat.game``.
        position: Position codes; upper-cased and matched against
            ``BattingStat.pos``.
        limit: Maximum number of rows returned.
        sort: ``"newest"`` orders by week, then game, both descending. Any
            other value leaves the query unordered.
        short_output: When true, ``player`` and ``team`` are returned as ids;
            otherwise as non-recursive model dicts.

    Returns:
        A dict with ``count`` (matching rows before ``limit`` is applied) and
        ``stats`` (the list of stat lines).

    Raises:
        HTTPException: 404 when the resolved start week is after the end week.
    """
    # Normalise once; a missing value is treated like the declared default.
    season_type = (s_type or "regular").lower()
    if "post" in season_type:
        all_stats = BattingStat.post_season(season)
    elif season_type in ("combined", "total", "all"):
        all_stats = BattingStat.combined_season(season)
    else:
        all_stats = BattingStat.regular_season(season)

    # Nothing recorded for this season/type: answer early with an empty page.
    if all_stats.count() == 0:
        db.close()
        return {"count": 0, "stats": []}

    all_stats = all_stats.where(
        (BattingStat.xch > 0) | (BattingStat.pb > 0) | (BattingStat.sbc > 0)
    )

    if position is not None:
        all_stats = all_stats.where(BattingStat.pos << [x.upper() for x in position])
    if team_abbrev is not None:
        t_query = Team.select().where(Team.abbrev << [x.upper() for x in team_abbrev])
        all_stats = all_stats.where(BattingStat.team << t_query)

    # Ids take precedence over names when both are supplied.
    if player_id:
        all_stats = all_stats.where(BattingStat.player_id << player_id)
    elif player_name is not None:
        p_query = Player.select_season(season).where(
            fn.Lower(Player.name) << [x.lower() for x in player_name]
        )
        all_stats = all_stats.where(BattingStat.player << p_query)

    if game_num:
        # game_num is a list, so it needs an IN test rather than equality.
        all_stats = all_stats.where(BattingStat.game << game_num)

    # The requested window can never extend past the season's current week.
    start = 1 if week_start is None else week_start
    end = Current.get(Current.season == season).week
    if week_end is not None:
        end = min(week_end, end)
    if start > end:
        db.close()
        raise HTTPException(
            status_code=404,
            detail=f"Start week {start} is after end week {end} - cannot pull stats",
        )
    all_stats = all_stats.where((BattingStat.week >= start) & (BattingStat.week <= end))

    # Count before limiting so "count" reports every match, not the page size.
    total_count = all_stats.count()
    if sort == "newest":
        all_stats = all_stats.order_by(-BattingStat.week, -BattingStat.game)
    all_stats = all_stats.limit(limit)

    return_stats = {
        "count": total_count,
        "stats": [
            {
                "player": x.player_id
                if short_output
                else model_to_dict(x.player, recurse=False),
                "team": x.team_id
                if short_output
                else model_to_dict(x.team, recurse=False),
                "pos": x.pos,
                "xch": x.xch,
                "xhit": x.xhit,
                "error": x.error,
                "pb": x.pb,
                "sbc": x.sbc,
                "csc": x.csc,
                "week": x.week,
                "game": x.game,
                "season": x.season,
            }
            for x in all_stats
        ],
    }
    db.close()
    return return_stats
@router.get("/totals")
@handle_db_errors
async def get_totalstats(
    season: int,
    s_type: Literal["regular", "post", "total", None] = None,
    team_abbrev: list = Query(default=None),
    team_id: list = Query(default=None),
    player_name: list = Query(default=None),
    week_start: Optional[int] = None,
    week_end: Optional[int] = None,
    game_num: list = Query(default=None),
    position: list = Query(default=None),
    sort: Optional[str] = None,
    player_id: list = Query(default=None),
    group_by: Literal["team", "player", "playerteam"] = "player",
    short_output: Optional[bool] = False,
    min_ch: Optional[int] = 1,
    week: list = Query(default=None),
    limit: int = Query(default=DEFAULT_LIMIT, ge=1, le=MAX_LIMIT),
    offset: int = Query(default=0, ge=0),
):
    """Return fielding totals summed per player, team, or player/team pair.

    Every grouping also groups by ``BattingStat.pos``, so each result row is
    one position for one player and/or team.

    Args:
        season: Season to aggregate.
        s_type: When given, the week window comes from
            ``per_season_weeks(season, s_type)`` and ``week_start`` /
            ``week_end`` / ``week`` are not consulted.
        team_abbrev: Team abbreviations, matched case-insensitively. Ignored
            when ``team_id`` is supplied.
        team_id: Team ids to include.
        player_name: Player names, matched case-insensitively.
        week_start: First week of an explicit window; requires ``week_end``.
        week_end: Last week of an explicit window; requires ``week_start``.
        game_num: Values matched against ``BattingStat.game``.
        position: Position codes (upper-cased). Keeps stats of players who
            list any of them in ``pos_1`` .. ``pos_8``.
        sort: ``"player"`` or ``"team"``; applied only when that column is
            part of the grouping. Other values are ignored.
        player_id: Player ids to include. Ignored when ``player_name`` is
            supplied.
        group_by: Aggregation key: "player", "team" or "playerteam".
        short_output: When true, ``player`` and ``team`` are returned as ids;
            otherwise as non-recursive model dicts.
        min_ch: Minimum summed ``xch`` a group needs to be returned.
        week: Explicit weeks to include; used only when ``s_type``,
            ``week_start`` and ``week_end`` are all absent.
        limit: Maximum number of groups fetched.
        offset: Number of groups skipped before the page starts.

    Returns:
        A dict with ``count`` (number of groups before ``offset``/``limit``)
        and ``stats``. Groups whose summed ``xch`` plus ``sbc`` is not
        positive are left out of ``stats`` but are still part of ``count``.
        A dimension that is not grouped on is reported as ``"TOT"``.

    Raises:
        HTTPException: 400 when only one of ``week_start``/``week_end`` is
            given, or when ``week_end`` is before ``week_start``.
    """
    # SELECT and GROUP BY must list exactly the same columns, so the list is
    # built once and reused for both.
    group_fields = {
        "player": [BattingStat.player, BattingStat.pos],
        "team": [BattingStat.team, BattingStat.pos],
        "playerteam": [BattingStat.player, BattingStat.team, BattingStat.pos],
    }
    select_fields = group_fields.get(group_by, group_fields["player"])

    all_stats = (
        BattingStat.select(
            *select_fields,
            fn.SUM(BattingStat.xch).alias("sum_xch"),
            fn.SUM(BattingStat.xhit).alias("sum_xhit"),
            fn.SUM(BattingStat.error).alias("sum_error"),
            fn.SUM(BattingStat.pb).alias("sum_pb"),
            fn.SUM(BattingStat.sbc).alias("sum_sbc"),
            fn.SUM(BattingStat.csc).alias("sum_csc"),
        )
        .where(BattingStat.season == season)
        .having(fn.SUM(BattingStat.xch) >= min_ch)
    )

    # Week window precedence: season type, then explicit range, then week list.
    if s_type is not None:
        # per_season_weeks is expected to return a mapping with "start"/"end".
        weeks = per_season_weeks(season, s_type)
        all_stats = all_stats.where(
            (BattingStat.week >= weeks["start"]) & (BattingStat.week <= weeks["end"])
        )
    elif week_start is not None or week_end is not None:
        if week_start is None or week_end is None:
            raise HTTPException(
                status_code=400,
                detail="Both week_start and week_end must be included if either is used.",
            )
        if week_end < week_start:
            raise HTTPException(
                status_code=400,
                detail="week_end must be greater than or equal to week_start",
            )
        all_stats = all_stats.where(
            (BattingStat.week >= week_start) & (BattingStat.week <= week_end)
        )
    elif week is not None:
        all_stats = all_stats.where(BattingStat.week << week)

    if game_num is not None:
        all_stats = all_stats.where(BattingStat.game << game_num)

    if position is not None:
        p_list = [x.upper() for x in position]
        all_players = Player.select().where(
            (Player.pos_1 << p_list)
            | (Player.pos_2 << p_list)
            | (Player.pos_3 << p_list)
            | (Player.pos_4 << p_list)
            | (Player.pos_5 << p_list)
            | (Player.pos_6 << p_list)
            | (Player.pos_7 << p_list)
            | (Player.pos_8 << p_list)
        )
        all_stats = all_stats.where(BattingStat.player << all_players)

    # Only order by a column that is also grouped on: ordering an aggregate
    # query by an ungrouped column is rejected by strict SQL engines and has
    # no defined meaning on lenient ones.
    if sort == "player" and "player" in group_by:
        all_stats = all_stats.order_by(BattingStat.player)
    elif sort == "team" and "team" in group_by:
        all_stats = all_stats.order_by(BattingStat.team)

    all_stats = all_stats.group_by(*select_fields)

    if team_id is not None:
        all_teams = Team.select().where(Team.id << team_id)
        all_stats = all_stats.where(BattingStat.team << all_teams)
    elif team_abbrev is not None:
        all_teams = Team.select().where(
            fn.Lower(Team.abbrev) << [x.lower() for x in team_abbrev]
        )
        all_stats = all_stats.where(BattingStat.team << all_teams)

    if player_name is not None:
        all_players = Player.select().where(
            fn.Lower(Player.name) << [x.lower() for x in player_name]
        )
        all_stats = all_stats.where(BattingStat.player << all_players)
    elif player_id is not None:
        all_players = Player.select().where(Player.id << player_id)
        all_stats = all_stats.where(BattingStat.player << all_players)

    # Count the full result set before paging so "count" is the overall total
    # rather than the length of the current page.
    total_count = all_stats.count()
    all_stats = all_stats.offset(offset).limit(limit)

    return_stats = {"count": total_count, "stats": []}
    for x in all_stats:
        if x.sum_xch + x.sum_sbc <= 0:
            continue

        # "playerteam" contains both substrings, so it fills both fields.
        this_player = "TOT"
        if "player" in group_by and hasattr(x, "player"):
            this_player = (
                x.player_id if short_output else model_to_dict(x.player, recurse=False)
            )

        this_team = "TOT"
        if "team" in group_by and hasattr(x, "team"):
            this_team = (
                x.team_id if short_output else model_to_dict(x.team, recurse=False)
            )

        return_stats["stats"].append(
            {
                "player": this_player,
                "team": this_team,
                "pos": x.pos,
                "xch": x.sum_xch,
                "xhit": x.sum_xhit,
                "error": x.sum_error,
                "pb": x.sum_pb,
                "sbc": x.sum_sbc,
                "csc": x.sum_csc,
            }
        )
    db.close()
    return return_stats