from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame

from ..db_engine import (
    db,
    PitchingStat,
    model_to_dict,
    Card,
    Player,
    Current,
    DoesNotExist,
)
from ..dependencies import oauth2_scheme, valid_token

router = APIRouter(prefix="/api/v2/pitstats", tags=["pitstats"])


def _now_ms() -> int:
    """Return the current local time as a millisecond Unix timestamp."""
    return int(datetime.timestamp(datetime.now()) * 1000)


class PitStat(pydantic.BaseModel):
    """Request schema for a single pitching line submitted to the API."""

    card_id: int
    team_id: int
    vs_team_id: int
    roster_num: int
    ip: float
    hit: Optional[int] = 0
    run: Optional[int] = 0
    erun: Optional[int] = 0
    so: Optional[int] = 0
    bb: Optional[int] = 0
    hbp: Optional[int] = 0
    wp: Optional[int] = 0
    balk: Optional[int] = 0
    hr: Optional[int] = 0
    ir: Optional[int] = 0
    irs: Optional[int] = 0
    gs: Optional[int] = 0
    win: Optional[int] = 0
    loss: Optional[int] = 0
    hold: Optional[int] = 0
    sv: Optional[int] = 0
    bsv: Optional[int] = 0
    week: int
    season: int
    # Millisecond timestamp. A default_factory is used so the value is computed
    # per model instance; a plain default expression would be evaluated once at
    # import time and stamp every later request with the process start time.
    created: Optional[int] = pydantic.Field(default_factory=_now_ms)
    game_id: int


class PitchingStatModel(pydantic.BaseModel):
    """Request body wrapper holding the list of pitching lines to insert."""

    stats: List[PitStat]


@router.get("")
async def get_pit_stats(
    card_id: int = None,
    player_id: int = None,
    team_id: int = None,
    vs_team_id: int = None,
    week: int = None,
    season: int = None,
    week_start: int = None,
    week_end: int = None,
    created: int = None,
    gs: bool = None,
    csv: bool = None,
    limit: Optional[int] = 100,
):
    """Query pitching stats with optional filters.

    Every filter is applied only when its argument is not None, except the
    season filter: when ``season`` is omitted the season of ``Current.latest()``
    is used.

    Args:
        card_id: Restrict to this card id.
        player_id: Restrict to cards belonging to this player id.
        team_id: Restrict to this team id.
        vs_team_id: Restrict to this opposing team id.
        week: Restrict to exactly this week.
        season: Restrict to this season (defaults to the current season).
        week_start: Lower bound (inclusive) on week.
        week_end: Upper bound (inclusive) on week.
        created: Millisecond timestamp; only rows created at or before it.
        gs: True selects rows with ``gs == 1``, False rows with ``gs == 0``.
        csv: When truthy, respond with ``text/csv`` instead of JSON.
        limit: Maximum rows returned; clamped to the range 0..500.

    Returns:
        A ``text/csv`` ``Response`` when ``csv`` is truthy, otherwise a dict
        with ``count`` (matching rows before the limit is applied) and
        ``stats`` (non-recursive dicts of the limited rows).
    """
    all_stats = PitchingStat.select().join(Card).join(Player).order_by(PitchingStat.id)
    # Logged before any filter is applied, so this shows the base query only.
    logging.debug(f"pit query:\n\n{all_stats}")

    if season is not None:
        all_stats = all_stats.where(PitchingStat.season == season)
    else:
        curr = Current.latest()
        all_stats = all_stats.where(PitchingStat.season == curr.season)

    if card_id is not None:
        all_stats = all_stats.where(PitchingStat.card_id == card_id)
    if player_id is not None:
        all_stats = all_stats.where(PitchingStat.card.player.player_id == player_id)
    if team_id is not None:
        all_stats = all_stats.where(PitchingStat.team_id == team_id)
    if vs_team_id is not None:
        all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id)
    if week is not None:
        all_stats = all_stats.where(PitchingStat.week == week)
    if week_start is not None:
        all_stats = all_stats.where(PitchingStat.week >= week_start)
    if week_end is not None:
        all_stats = all_stats.where(PitchingStat.week <= week_end)
    if created is not None:
        # Convert milliseconds timestamp to datetime for PostgreSQL comparison
        created_dt = datetime.fromtimestamp(created / 1000)
        all_stats = all_stats.where(PitchingStat.created <= created_dt)
    if gs is not None:
        # Parentheses are required: without them the conditional expression
        # binds looser than ``==`` and gs=False would pass the literal 0 to
        # ``where`` instead of comparing the column against 0.
        all_stats = all_stats.where(PitchingStat.gs == (1 if gs else 0))

    # The count is taken before the limit is applied; it is skipped for CSV
    # output, which does not report a count.
    total_count = all_stats.count() if not csv else 0
    all_stats = all_stats.limit(max(0, min(limit, 500)))

    # if all_stats.count() == 0:
    #     db.close()
    #     raise HTTPException(status_code=404, detail=f'No pitching stats found')

    if csv:
        data_list = [
            [
                "id",
                "card_id",
                "player_id",
                "cardset",
                "team",
                "vs_team",
                "ip",
                "hit",
                "run",
                "erun",
                "so",
                "bb",
                "hbp",
                "wp",
                "balk",
                "hr",
                "ir",
                "irs",
                "gs",
                "win",
                "loss",
                "hold",
                "sv",
                "bsv",
                "week",
                "season",
                "created",
                "game_id",
                "roster_num",
            ]
        ]
        for line in all_stats:
            data_list.append(
                [
                    line.id,
                    line.card.id,
                    line.card.player.player_id,
                    line.card.player.cardset.name,
                    line.team.abbrev,
                    line.vs_team.abbrev,
                    line.ip,
                    line.hit,
                    line.run,
                    line.erun,
                    line.so,
                    line.bb,
                    line.hbp,
                    line.wp,
                    line.balk,
                    line.hr,
                    line.ir,
                    line.irs,
                    line.gs,
                    line.win,
                    line.loss,
                    line.hold,
                    line.sv,
                    line.bsv,
                    line.week,
                    line.season,
                    line.created,
                    line.game_id,
                    line.roster_num,
                ]
            )
        # The header row is already the first entry of data_list, so pandas is
        # told not to emit its own header or index column.
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        return Response(content=return_val, media_type="text/csv")

    return {
        "count": total_count,
        "stats": [model_to_dict(x, recurse=False) for x in all_stats],
    }


@router.post("")
async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)):
    """Bulk-insert the submitted pitching lines.

    Args:
        stats: Request body containing the pitching lines to create.
        token: Bearer token; must satisfy ``valid_token``.

    Raises:
        HTTPException: 401 when the token is rejected. On success a 200
            ``HTTPException`` carrying the inserted-row message is raised,
            which is how this endpoint delivers its success response.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post stats. This event has been logged.",
        )

    new_stats = []
    for x in stats.stats:
        this_stat = PitchingStat(
            card_id=x.card_id,
            team_id=x.team_id,
            vs_team_id=x.vs_team_id,
            roster_num=x.roster_num,
            ip=x.ip,
            hit=x.hit,
            run=x.run,
            erun=x.erun,
            so=x.so,
            bb=x.bb,
            hbp=x.hbp,
            wp=x.wp,
            balk=x.balk,
            hr=x.hr,
            ir=x.ir,
            irs=x.irs,
            gs=x.gs,
            win=x.win,
            loss=x.loss,
            hold=x.hold,
            sv=x.sv,
            bsv=x.bsv,
            week=x.week,
            season=x.season,
            # The payload carries milliseconds; a missing or zero value falls
            # back to the current time.
            created=datetime.fromtimestamp(x.created / 1000)
            if x.created
            else datetime.now(),
            game_id=x.game_id,
        )
        new_stats.append(this_stat)

    # Insert all rows inside a single transaction, in batches of 15.
    with db.atomic():
        PitchingStat.bulk_create(new_stats, batch_size=15)

    raise HTTPException(
        status_code=200, detail=f"{len(new_stats)} pitching lines have been added"
    )


@router.delete("/{stat_id}")
async def delete_pitstat(stat_id, token: str = Depends(oauth2_scheme)):
    """Delete a single pitching stat row by id.

    Args:
        stat_id: Id of the row to delete, taken from the URL path.
        token: Bearer token; must satisfy ``valid_token``.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when no row has the
            given id, 500 when the delete did not report exactly one row, and
            200 (the success response) when the row was deleted.
    """
    if not valid_token(token):
        logging.warning("Bad Token: [REDACTED]")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete stats. This event has been logged.",
        )

    try:
        this_stat = PitchingStat.get_by_id(stat_id)
    except DoesNotExist:
        raise HTTPException(status_code=404, detail=f"No stat found with id {stat_id}")

    count = this_stat.delete_instance()

    if count == 1:
        raise HTTPException(status_code=200, detail=f"Stat {stat_id} has been deleted")
    else:
        raise HTTPException(status_code=500, detail=f"Stat {stat_id} was not deleted")