from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Response from typing import Optional, List import logging import pydantic from pandas import DataFrame from ..db_engine import db, PitchingStat, model_to_dict, Card, Player, Current from ..dependencies import oauth2_scheme, valid_token, LOG_DATA logging.basicConfig( filename=LOG_DATA['filename'], format=LOG_DATA['format'], level=LOG_DATA['log_level'] ) router = APIRouter( prefix='/api/v2/pitstats', tags=['pitstats'] ) class PitStat(pydantic.BaseModel): card_id: int team_id: int vs_team_id: int roster_num: int ip: float hit: Optional[int] = 0 run: Optional[int] = 0 erun: Optional[int] = 0 so: Optional[int] = 0 bb: Optional[int] = 0 hbp: Optional[int] = 0 wp: Optional[int] = 0 balk: Optional[int] = 0 hr: Optional[int] = 0 ir: Optional[int] = 0 irs: Optional[int] = 0 gs: Optional[int] = 0 win: Optional[int] = 0 loss: Optional[int] = 0 hold: Optional[int] = 0 sv: Optional[int] = 0 bsv: Optional[int] = 0 week: int season: int created: Optional[int] = int(datetime.timestamp(datetime.now())*100000) game_id: int class PitchingStatModel(pydantic.BaseModel): stats: List[PitStat] @router.get('') async def get_pit_stats( card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None, season: int = None, week_start: int = None, week_end: int = None, created: int = None, gs: bool = None, csv: bool = None): all_stats = PitchingStat.select().join(Card).join(Player) logging.debug(f'pit query:\n\n{all_stats}') if season is not None: all_stats = all_stats.where(PitchingStat.season == season) else: curr = Current.latest() all_stats = all_stats.where(PitchingStat.season == curr.season) if card_id is not None: all_stats = all_stats.where(PitchingStat.card_id == card_id) if player_id is not None: all_stats = all_stats.where(PitchingStat.card.player.player_id == player_id) if team_id is not None: all_stats = all_stats.where(PitchingStat.team_id == team_id) if vs_team_id is not None: all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id) if week is not None: all_stats = all_stats.where(PitchingStat.week == week) if week_start is not None: all_stats = all_stats.where(PitchingStat.week >= week_start) if week_end is not None: all_stats = all_stats.where(PitchingStat.week <= week_end) if created is not None: # Convert milliseconds timestamp to datetime for PostgreSQL comparison created_dt = datetime.fromtimestamp(created / 1000) all_stats = all_stats.where(PitchingStat.created <= created_dt) if gs is not None: all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0) # if all_stats.count() == 0: # db.close() # raise HTTPException(status_code=404, detail=f'No pitching stats found') if csv: data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'ip', 'hit', 'run', 'erun', 'so', 'bb', 'hbp', 'wp', 'balk', 'hr', 'ir', 'irs', 'gs', 'win', 'loss', 'hold', 'sv', 'bsv', 'week', 'season', 'created', 'game_id', 'roster_num']] for line in all_stats: data_list.append( [ line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev, line.ip, line.hit, line.run, line.erun, line.so, line.bb, line.hbp, line.wp, line.balk, line.hr, line.ir, line.irs, line.gs, line.win, line.loss, line.hold, line.sv, line.bsv, line.week, line.season, line.created, line.game_id, line.roster_num ] ) return_val = DataFrame(data_list).to_csv(header=False, index=False) db.close() return Response(content=return_val, media_type='text/csv') else: return_val = {'count': all_stats.count(), 'stats': []} for x in all_stats: return_val['stats'].append(model_to_dict(x, recurse=False)) db.close() return return_val @router.post('') async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to post stats. This event has been logged.' ) new_stats = [] for x in stats.stats: this_stat = PitchingStat( card_id=x.card_id, team_id=x.team_id, vs_team_id=x.vs_team_id, roster_num=x.roster_num, ip=x.ip, hit=x.hit, run=x.run, erun=x.erun, so=x.so, bb=x.bb, hbp=x.hbp, wp=x.wp, balk=x.balk, hr=x.hr, ir=x.ir, irs=x.irs, gs=x.gs, win=x.win, loss=x.loss, hold=x.hold, sv=x.sv, bsv=x.bsv, week=x.week, season=x.season, created=x.created, game_id=x.game_id ) new_stats.append(this_stat) with db.atomic(): PitchingStat.bulk_create(new_stats, batch_size=15) db.close() raise HTTPException(status_code=200, detail=f'{len(new_stats)} pitching lines have been added') @router.delete('/{stat_id}') async def delete_pitstat(stat_id, token: str = Depends(oauth2_scheme)): if not valid_token(token): logging.warning(f'Bad Token: {token}') db.close() raise HTTPException( status_code=401, detail='You are not authorized to delete stats. This event has been logged.' ) try: this_stat = PitchingStat.get_by_id(stat_id) except Exception: db.close() raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}') count = this_stat.delete_instance() db.close() if count == 1: raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted') else: raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')