paper-dynasty-database/app/routers_v2/pitstats.py
Cal Corum 54dccd1981 feat: capture total_count before limit across all paginated endpoints
Ensures the `count` field in JSON responses reflects total matching
records rather than the page size, consistent with the notifications
endpoint pattern from PR #150.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 07:37:26 -05:00

258 lines
7.3 KiB
Python

from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import (
db,
PitchingStat,
model_to_dict,
Card,
Player,
Current,
DoesNotExist,
)
from ..dependencies import oauth2_scheme, valid_token
# Router for the pitching-stat endpoints defined in this module; every route
# registered on it is served under the /api/v2/pitstats prefix and tagged
# "pitstats".
router = APIRouter(prefix="/api/v2/pitstats", tags=["pitstats"])
class PitStat(pydantic.BaseModel):
    """Request schema for one pitching line accepted by the POST endpoint.

    ``card_id``, ``team_id``, ``vs_team_id``, ``roster_num``, ``ip``, ``week``,
    ``season`` and ``game_id`` are required. The integer stat counters are
    optional and default to 0. ``created`` is an epoch timestamp in
    milliseconds and defaults to the moment the model instance is built.
    """

    card_id: int
    team_id: int
    vs_team_id: int
    roster_num: int
    ip: float
    hit: Optional[int] = 0
    run: Optional[int] = 0
    erun: Optional[int] = 0
    so: Optional[int] = 0
    bb: Optional[int] = 0
    hbp: Optional[int] = 0
    wp: Optional[int] = 0
    balk: Optional[int] = 0
    hr: Optional[int] = 0
    ir: Optional[int] = 0
    irs: Optional[int] = 0
    gs: Optional[int] = 0
    win: Optional[int] = 0
    loss: Optional[int] = 0
    hold: Optional[int] = 0
    sv: Optional[int] = 0
    bsv: Optional[int] = 0
    week: int
    season: int
    # A plain default expression would be evaluated once, when this module is
    # imported, stamping every later request with the process start time.
    # default_factory runs per instance instead.
    created: Optional[int] = pydantic.Field(
        default_factory=lambda: int(datetime.now().timestamp() * 1000)
    )
    game_id: int
class PitchingStatModel(pydantic.BaseModel):
    # Request body accepted by post_pitstat: a batch of pitching lines carried
    # under the "stats" key, each validated as a PitStat.
    stats: List[PitStat]
# CSV columns whose value is read straight off the PitchingStat row using the
# column name as the attribute name.
_PITSTAT_CSV_DIRECT_COLUMNS = (
    "ip",
    "hit",
    "run",
    "erun",
    "so",
    "bb",
    "hbp",
    "wp",
    "balk",
    "hr",
    "ir",
    "irs",
    "gs",
    "win",
    "loss",
    "hold",
    "sv",
    "bsv",
    "week",
    "season",
    "created",
    "game_id",
    "roster_num",
)

# Full CSV header: identifying / related-model columns first, then the direct
# attribute columns in the same order the rows emit them.
_PITSTAT_CSV_HEADER = [
    "id",
    "card_id",
    "player_id",
    "cardset",
    "team",
    "vs_team",
    *_PITSTAT_CSV_DIRECT_COLUMNS,
]


def _pitstat_csv_row(line):
    """Return one CSV row for a PitchingStat, ordered like _PITSTAT_CSV_HEADER."""
    return [
        line.id,
        line.card.id,
        line.card.player.player_id,
        line.card.player.cardset.name,
        line.team.abbrev,
        line.vs_team.abbrev,
        *(getattr(line, column) for column in _PITSTAT_CSV_DIRECT_COLUMNS),
    ]


@router.get("")
async def get_pit_stats(
    card_id: Optional[int] = None,
    player_id: Optional[int] = None,
    team_id: Optional[int] = None,
    vs_team_id: Optional[int] = None,
    week: Optional[int] = None,
    season: Optional[int] = None,
    week_start: Optional[int] = None,
    week_end: Optional[int] = None,
    created: Optional[int] = None,
    gs: Optional[bool] = None,
    csv: Optional[bool] = None,
    limit: Optional[int] = 100,
):
    """List pitching stats, optionally filtered, as JSON or CSV.

    Args:
        card_id: Keep only stats for this card.
        player_id: Keep only stats whose card belongs to this player.
        team_id: Keep only stats for this team.
        vs_team_id: Keep only stats against this team.
        week: Keep only stats from exactly this week.
        season: Season to query; when omitted, the season of
            ``Current.latest()`` is used.
        week_start: Keep only stats with ``week >= week_start``.
        week_end: Keep only stats with ``week <= week_end``.
        created: Epoch milliseconds; keep only stats created at or before it.
        gs: True keeps rows with ``gs == 1``, False keeps rows with ``gs == 0``.
        csv: When truthy, respond with ``text/csv`` instead of JSON.
        limit: Maximum rows returned, clamped to the range 0..500.

    Returns:
        A ``text/csv`` Response (header row followed by one row per stat) when
        ``csv`` is truthy; otherwise a dict with ``count`` (total matching rows
        before ``limit`` is applied) and ``stats`` (the returned page, each
        row converted with ``model_to_dict(..., recurse=False)``).
    """
    all_stats = PitchingStat.select().join(Card).join(Player).order_by(PitchingStat.id)
    logging.debug(f"pit query:\n\n{all_stats}")

    if season is None:
        season = Current.latest().season
    all_stats = all_stats.where(PitchingStat.season == season)

    if card_id is not None:
        all_stats = all_stats.where(PitchingStat.card_id == card_id)
    if player_id is not None:
        # Player is already joined above, so filter on its column directly
        # rather than through the foreign-key attribute chain.
        all_stats = all_stats.where(Player.player_id == player_id)
    if team_id is not None:
        all_stats = all_stats.where(PitchingStat.team_id == team_id)
    if vs_team_id is not None:
        all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id)
    if week is not None:
        all_stats = all_stats.where(PitchingStat.week == week)
    if week_start is not None:
        all_stats = all_stats.where(PitchingStat.week >= week_start)
    if week_end is not None:
        all_stats = all_stats.where(PitchingStat.week <= week_end)
    if created is not None:
        # Convert milliseconds timestamp to datetime for PostgreSQL comparison
        created_dt = datetime.fromtimestamp(created / 1000)
        all_stats = all_stats.where(PitchingStat.created <= created_dt)
    if gs is not None:
        # Parenthesised on purpose: `PitchingStat.gs == 1 if gs else 0` parses
        # as `(PitchingStat.gs == 1) if gs else 0`, which hands a bare 0 to
        # where() whenever gs is False.
        all_stats = all_stats.where(PitchingStat.gs == (1 if gs else 0))

    # A None limit falls back to the declared default of 100 instead of
    # raising TypeError inside min().
    page_size = 100 if limit is None else max(0, min(limit, 500))
    page = all_stats.limit(page_size)

    if csv:
        rows = [_PITSTAT_CSV_HEADER]
        rows.extend(_pitstat_csv_row(line) for line in page)
        csv_text = DataFrame(rows).to_csv(header=False, index=False)
        return Response(content=csv_text, media_type="text/csv")

    # Count the un-limited query so "count" reports every matching row, not
    # just the size of the returned page. Only the JSON response uses it.
    return {
        "count": all_stats.count(),
        "stats": [model_to_dict(stat, recurse=False) for stat in page],
    }
@router.post("")
async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)):
    """Bulk-insert the submitted pitching lines.

    Args:
        stats: Request body whose ``stats`` list holds the lines to insert.
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        ``{"detail": "<n> pitching lines have been added"}`` with HTTP 200.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post stats. This event has been logged.",
        )

    new_stats = [
        PitchingStat(
            card_id=x.card_id,
            team_id=x.team_id,
            vs_team_id=x.vs_team_id,
            roster_num=x.roster_num,
            ip=x.ip,
            hit=x.hit,
            run=x.run,
            erun=x.erun,
            so=x.so,
            bb=x.bb,
            hbp=x.hbp,
            wp=x.wp,
            balk=x.balk,
            hr=x.hr,
            ir=x.ir,
            irs=x.irs,
            gs=x.gs,
            win=x.win,
            loss=x.loss,
            hold=x.hold,
            sv=x.sv,
            bsv=x.bsv,
            week=x.week,
            season=x.season,
            # The payload carries epoch milliseconds; a missing or zero value
            # falls back to the current time.
            created=(
                datetime.fromtimestamp(x.created / 1000)
                if x.created
                else datetime.now()
            ),
            game_id=x.game_id,
        )
        for x in stats.stats
    ]

    with db.atomic():
        PitchingStat.bulk_create(new_stats, batch_size=15)

    # Success is a normal return rather than a raised HTTPException(200): the
    # client sees the same status and {"detail": ...} body, and no exception
    # can propagate through the transaction block on the success path.
    return {"detail": f"{len(new_stats)} pitching lines have been added"}
@router.delete("/{stat_id}")
async def delete_pitstat(stat_id: int, token: str = Depends(oauth2_scheme)):
    """Delete a single pitching stat by primary key.

    Args:
        stat_id: Id of the PitchingStat row to delete. Declared as ``int`` so
            the value is validated before it reaches the database lookup.
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        ``{"detail": "Stat <id> has been deleted"}`` with HTTP 200.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token, 404 when no
            row has that id, 500 when ``delete_instance`` does not report
            exactly one deleted row.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete stats. This event has been logged.",
        )

    try:
        this_stat = PitchingStat.get_by_id(stat_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No stat found with id {stat_id}")

    if this_stat.delete_instance() != 1:
        raise HTTPException(status_code=500, detail=f"Stat {stat_id} was not deleted")

    # Plain return instead of raising HTTPException(200): same status code and
    # {"detail": ...} body, without using an exception for the success path.
    return {"detail": f"Stat {stat_id} has been deleted"}