Added results, rewards, batstats, and pitstats

This commit is contained in:
Cal Corum 2023-09-14 15:48:41 -05:00
parent 7f008f9d98
commit 177ca2c585
6 changed files with 1425 additions and 1 deletions

View File

@ -4,7 +4,8 @@ import os
from fastapi import FastAPI
from.routers_v2 import current, teams, rarity, cardsets, players, packtypes, packs, cards, events
from.routers_v2 import current, teams, rarity, cardsets, players, packtypes, packs, cards, events, results, rewards, \
batstats, pitstats
app = FastAPI(
responses={404: {'description': 'Not found'}}
@ -19,3 +20,7 @@ app.include_router(packtypes.router)
app.include_router(packs.router)
app.include_router(cards.router)
app.include_router(events.router)
app.include_router(results.router)
app.include_router(rewards.router)
app.include_router(batstats.router)
app.include_router(pitstats.router)

155
app/routers_v2/awards.py Normal file
View File

@ -0,0 +1,155 @@
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Award, model_to_dict, fn
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/awards',
tags=['awards']
)
class AwardModel(pydantic.BaseModel):
name: str
season: int
timing: str = 'In-Season'
card_id: Optional[int] = None
team_id: Optional[int] = None
image: Optional[str] = None
@app.get('/api/v1/awards')
async def get_awards(
name: Optional[str] = None, season: Optional[int] = None, timing: Optional[str] = None,
card_id: Optional[int] = None, team_id: Optional[int] = None, image: Optional[str] = None,
csv: Optional[bool] = None):
all_awards = Award.select()
if all_awards.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no awards to filter')
if name is not None:
all_awards = all_awards.where(Award.name == name)
if season is not None:
all_awards = all_awards.where(Award.season == season)
if timing is not None:
all_awards = all_awards.where(Award.timing == timing)
if card_id is not None:
all_awards = all_awards.where(Award.card_id == card_id)
if team_id is not None:
all_awards = all_awards.where(Award.team_id == team_id)
if image is not None:
all_awards = all_awards.where(Award.image == image)
if csv:
data_list = [['id', 'name', 'season', 'timing', 'card', 'team', 'image']]
for line in all_awards:
data_list.append([
line.id, line.name, line.season, line.timing, line.card, line.team, line.image
])
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_awards.count(), 'awards': []}
for x in all_awards:
return_val['awards'].append(model_to_dict(x))
db.close()
return return_val
@app.get('/api/v1/awards/{award_id}')
async def get_one_award(award_id, csv: Optional[bool] = None):
try:
this_award = Award.get_by_id(award_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')
if csv:
data_list = [
['id', 'name', 'season', 'timing', 'card', 'team', 'image'],
[this_award.id, this_award.name, this_award.season, this_award.timing, this_award.card,
this_award.team, this_award.image]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_award)
db.close()
return return_val
@app.post('/api/v1/awards')
async def post_awards(award: AwardModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post awards. This event has been logged.'
)
this_award = Award(
name=award.name,
season=award.season,
timing=award.season,
card_id=award.card_id,
team_id=award.team_id,
image=award.image
)
saved = this_award.save()
if saved == 1:
return_val = model_to_dict(this_award)
db.close()
return return_val
else:
db.close()
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that roster'
)
@app.delete('/api/v1/awards/{award_id}')
async def delete_award(award_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete awards. This event has been logged.'
)
try:
this_award = Award.get_by_id(award_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')
count = this_award.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Award {award_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Award {award_id} was not deleted')

455
app/routers_v2/batstats.py Normal file
View File

@ -0,0 +1,455 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, BattingStat, model_to_dict, fn, Card
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/batstats',
tags=['batstats']
)
class BatStat(pydantic.BaseModel):
card_id: int
team_id: int
roster_num: int
vs_team_id: int
pos: str
pa: Optional[int] = 0
ab: Optional[int] = 0
run: Optional[int] = 0
hit: Optional[int] = 0
rbi: Optional[int] = 0
double: Optional[int] = 0
triple: Optional[int] = 0
hr: Optional[int] = 0
bb: Optional[int] = 0
so: Optional[int] = 0
hbp: Optional[int] = 0
sac: Optional[int] = 0
ibb: Optional[int] = 0
gidp: Optional[int] = 0
sb: Optional[int] = 0
cs: Optional[int] = 0
bphr: Optional[int] = 0
bpfo: Optional[int] = 0
bp1b: Optional[int] = 0
bplo: Optional[int] = 0
xch: Optional[int] = 0
xhit: Optional[int] = 0
error: Optional[int] = 0
pb: Optional[int] = 0
sbc: Optional[int] = 0
csc: Optional[int] = 0
week: int
season: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*100000)
game_id: int
class BattingStatModel(pydantic.BaseModel):
stats: List[BatStat]
@router.get('')
async def get_batstats(
card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
season: int = None, week_start: int = None, week_end: int = None, created: int = None, csv: bool = None):
all_stats = BattingStat.select().join(Card).join(Player)
if card_id is not None:
all_stats = all_stats.where(BattingStat.card_id == card_id)
if player_id is not None:
all_stats = all_stats.where(BattingStat.card.player.player_id == player_id)
if team_id is not None:
all_stats = all_stats.where(BattingStat.team_id == team_id)
if vs_team_id is not None:
all_stats = all_stats.where(BattingStat.vs_team_id == vs_team_id)
if week is not None:
all_stats = all_stats.where(BattingStat.week == week)
if season is not None:
all_stats = all_stats.where(BattingStat.season == season)
if week_start is not None:
all_stats = all_stats.where(BattingStat.week >= week_start)
if week_end is not None:
all_stats = all_stats.where(BattingStat.week <= week_end)
if created is not None:
all_stats = all_stats.where(BattingStat.created == created)
# if all_stats.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No batting stats found')
if csv:
data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'pos', 'pa', 'ab', 'run', 'hit', 'rbi', 'double',
'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b',
'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc', 'week', 'season', 'created', 'game_id', 'roster_num']]
for line in all_stats:
data_list.append(
[
line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev, line.vs_team.abbrev,
line.pos, line.pa, line.ab, line.run, line.hit, line.rbi, line.double, line.triple, line.hr,
line.bb, line.so, line.hbp, line.sac, line.ibb, line.gidp, line.sb, line.cs, line.bphr, line.bpfo,
line.bp1b, line.bplo, line.xch, line.xhit, line.error, line.pb, line.sbc, line.csc, line.week,
line.season, line.created, line.game_id, line.roster_num
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_stats.count(), 'stats': []}
for x in all_stats:
return_val['stats'].append(model_to_dict(x, recurse=False))
db.close()
return return_val
@router.get('/player/{player_id}')
async def get_player_stats(
player_id: int, team_id: int = None, vs_team_id: int = None, week_start: int = None, week_end: int = None,
csv: bool = None):
all_stats = (BattingStat
.select(fn.COUNT(BattingStat.created).alias('game_count'))
.join(Card)
.group_by(BattingStat.card)
.where(BattingStat.card.player == player_id)).scalar()
if team_id is not None:
all_stats = all_stats.where(BattingStat.team_id == team_id)
if vs_team_id is not None:
all_stats = all_stats.where(BattingStat.vs_team_id == vs_team_id)
if week_start is not None:
all_stats = all_stats.where(BattingStat.week >= week_start)
if week_end is not None:
all_stats = all_stats.where(BattingStat.week <= week_end)
if csv:
data_list = [
[
'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp',
'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc',
],[
all_stats.pa_sum, all_stats.ab_sum, all_stats.run, all_stats.hit_sum, all_stats.rbi_sum,
all_stats.double_sum, all_stats.triple_sum, all_stats.hr_sum, all_stats.bb_sum, all_stats.so_sum,
all_stats.hbp_sum, all_stats.sac, all_stats.ibb_sum, all_stats.gidp_sum, all_stats.sb_sum,
all_stats.cs_sum, all_stats.bphr_sum, all_stats.bpfo_sum, all_stats.bp1b_sum, all_stats.bplo_sum,
all_stats.xch, all_stats.xhit_sum, all_stats.error_sum, all_stats.pb_sum, all_stats.sbc_sum,
all_stats.csc_sum
]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
logging.debug(f'stat pull query: {all_stats}\n')
# logging.debug(f'result 0: {all_stats[0]}\n')
for x in all_stats:
logging.debug(f'this_line: {model_to_dict(x)}')
return_val = model_to_dict(all_stats[0])
db.close()
return return_val
@router.post('')
async def post_batstats(stats: BattingStatModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post stats. This event has been logged.'
)
new_stats = []
for x in stats.stats:
this_stat = BattingStat(
card_id=x.card_id,
team_id=x.team_id,
roster_num=x.roster_num,
vs_team_id=x.vs_team_id,
pos=x.pos,
pa=x.pa,
ab=x.ab,
run=x.run,
hit=x.hit,
rbi=x.rbi,
double=x.double,
triple=x.triple,
hr=x.hr,
bb=x.bb,
so=x.so,
hbp=x.hbp,
sac=x.sac,
ibb=x.ibb,
gidp=x.gidp,
sb=x.sb,
cs=x.cs,
bphr=x.bphr,
bpfo=x.bpfo,
bp1b=x.bp1b,
bplo=x.bplo,
xch=x.xch,
xhit=x.xhit,
error=x.error,
pb=x.pb,
sbc=x.sbc,
csc=x.csc,
week=x.week,
season=x.season,
created=x.created,
game_id=x.game_id
)
new_stats.append(this_stat)
with db.atomic():
BattingStat.bulk_create(new_stats, batch_size=15)
db.close()
raise HTTPException(status_code=200, detail=f'{len(new_stats)} batting lines have been added')
@router.delete('/{stat_id}')
async def delete_batstat(stat_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete stats. This event has been logged.'
)
try:
this_stat = BattingStat.get_by_id(stat_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')
count = this_stat.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')
# @app.get('/api/v1/plays/batting')
# async def get_batting_totals(
# player_id: list = Query(default=None), team_id: list = Query(default=None), min_pa: Optional[int] = 1,
# season: list = Query(default=None), position: list = Query(default=None),
# group_by: Literal['team', 'player', 'playerteam', 'playergame', 'teamgame', 'league'] = 'player',
# sort: Optional[str] = None, limit: Optional[int] = None, short_output: Optional[bool] = False):
# all_stats = BattingStat.select(
# BattingStat.card, BattingStat.game_id, BattingStat.team, BattingStat.vs_team, BattingStat.pos,
# BattingStat.card.player.alias('player'),
# fn.SUM(BattingStat.pa).alias('sum_pa'), fn.SUM(BattingStat.ab).alias('sum_ab'),
# fn.SUM(BattingStat.run).alias('sum_run'), fn.SUM(BattingStat.so).alias('sum_so'),
# fn.SUM(BattingStat.hit).alias('sum_hit'), fn.SUM(BattingStat.rbi).alias('sum_rbi'),
# fn.SUM(BattingStat.double).alias('sum_double'), fn.SUM(BattingStat.triple).alias('sum_triple'),
# fn.SUM(BattingStat.hr).alias('sum_hr'), fn.SUM(BattingStat.bb).alias('sum_bb'),
# fn.SUM(BattingStat.hbp).alias('sum_hbp'), fn.SUM(BattingStat.sac).alias('sum_sac'),
# fn.SUM(BattingStat.ibb).alias('sum_ibb'), fn.SUM(BattingStat.gidp).alias('sum_gidp'),
# fn.SUM(BattingStat.sb).alias('sum_sb'), fn.SUM(BattingStat.cs).alias('sum_cs'),
# fn.SUM(BattingStat.bphr).alias('sum_bphr'), fn.SUM(BattingStat.bpfo).alias('sum_bpfo'),
# fn.SUM(BattingStat.bp1b).alias('sum_bp1b'), fn.SUM(BattingStat.bplo).alias('sum_bplo')
# ).having(
# fn.SUM(BattingStat.pa) >= min_pa
# ).join(Card)
#
# if player_id is not None:
# # all_players = Player.select().where(Player.id << player_id)
# all_cards = Card.select().where(Card.player_id << player_id)
# all_stats = all_stats.where(BattingStat.card << all_cards)
# if team_id is not None:
# all_teams = Team.select().where(Team.id << team_id)
# all_stats = all_stats.where(BattingStat.team << all_teams)
# if season is not None:
# all_stats = all_stats.where(BattingStat.season << season)
# if position is not None:
# all_stats = all_stats.where(BattingStat.pos << position)
#
# if group_by == 'player':
# all_stats = all_stats.group_by(SQL('player'))
# elif group_by == 'playerteam':
# all_stats = all_stats.group_by(SQL('player'), BattingStat.team)
# elif group_by == 'playergame':
# all_stats = all_stats.group_by(SQL('player'), BattingStat.game_id)
# elif group_by == 'team':
# all_stats = all_stats.group_by(BattingStat.team)
# elif group_by == 'teamgame':
# all_stats = all_stats.group_by(BattingStat.team, BattingStat.game_id)
# elif group_by == 'league':
# all_stats = all_stats.group_by(BattingStat.season)
#
# if sort == 'pa-desc':
# all_stats = all_stats.order_by(SQL('sum_pa').desc())
# elif sort == 'newest':
# all_stats = all_stats.order_by(-BattingStat.game_id)
# elif sort == 'oldest':
# all_stats = all_stats.order_by(BattingStat.game_id)
#
# if limit is not None:
# if limit < 1:
# limit = 1
# all_stats = all_stats.limit(limit)
#
# logging.info(f'bat_plays query: {all_stats}')
#
# return_stats = {
# 'count': all_stats.count(),
# 'stats': [{
# 'player': x.card.player_id if short_output else model_to_dict(x.card.player, recurse=False),
# 'team': x.team_id if short_output else model_to_dict(x.team, recurse=False),
# 'pa': x.sum_pa,
# 'ab': x.sum_ab,
# 'run': x.sum_run,
# 'hit': x.sum_hit,
# 'rbi': x.sum_rbi,
# 'double': x.sum_double,
# 'triple': x.sum_triple,
# 'hr': x.sum_hr,
# 'bb': x.sum_bb,
# 'so': x.sum_so,
# 'hbp': x.sum_hbp,
# 'sac': x.sum_sac,
# 'ibb': x.sum_ibb,
# 'gidp': x.sum_gidp,
# 'sb': x.sum_sb,
# 'cs': x.sum_cs,
# 'bphr': x.sum_bphr,
# 'bpfo': x.sum_bpfo,
# 'bp1b': x.sum_bp1b,
# 'bplo': x.sum_bplo,
# 'avg': x.sum_hit / max(x.sum_ab, 1),
# 'obp': (x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / max(x.sum_pa, 1),
# 'slg': (x.sum_hr * 4 + x.sum_triple * 3 + x.sum_double * 2 +
# (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)) / max(x.sum_ab, 1),
# 'ops': ((x.sum_hit + x.sum_bb + x.sum_hbp + x.sum_ibb) / max(x.sum_pa, 1)) +
# ((x.sum_hr * 4 + x.sum_triple * 3 + x.sum_double * 2 +
# (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr)) / max(x.sum_ab, 1)),
# 'woba': (.69 * x.sum_bb + .72 * x.sum_hbp + .89 * (x.sum_hit - x.sum_double - x.sum_triple - x.sum_hr) +
# 1.27 * x.sum_double + 1.62 * x.sum_triple + 2.1 * x.sum_hr) / max(x.sum_pa - x.sum_ibb, 1),
# 'game': x.game_id
# } for x in all_stats]
# }
#
# db.close()
# return return_stats
#
#
# @app.get('/api/v1/plays/pitching')
# async def get_pitching_totals(
# player_id: list = Query(default=None), team_id: list = Query(default=None), season: list = Query(default=None),
# group_by: Literal['team', 'player', 'playerteam', 'playergame', 'teamgame', 'league'] = 'player',
# min_pa: Optional[int] = 1,
# sort: Optional[str] = None, limit: Optional[int] = None, short_output: Optional[bool] = False):
# all_stats = PitchingStat.select(
# PitchingStat.card, PitchingStat.team, PitchingStat.game_id, PitchingStat.vs_team,
# PitchingStat.card.player.alias('player'), fn.SUM(PitchingStat.ip).alias('sum_ip'),
# fn.SUM(PitchingStat.hit).alias('sum_hit'), fn.SUM(PitchingStat.run).alias('sum_run'),
# fn.SUM(PitchingStat.erun).alias('sum_erun'), fn.SUM(PitchingStat.so).alias('sum_so'),
# fn.SUM(PitchingStat.bb).alias('sum_bb'), fn.SUM(PitchingStat.hbp).alias('sum_hbp'),
# fn.SUM(PitchingStat.wp).alias('sum_wp'), fn.SUM(PitchingStat.balk).alias('sum_balk'),
# fn.SUM(PitchingStat.hr).alias('sum_hr'), fn.SUM(PitchingStat.ir).alias('sum_ir'),
# fn.SUM(PitchingStat.irs).alias('sum_irs'), fn.SUM(PitchingStat.gs).alias('sum_gs'),
# fn.SUM(PitchingStat.win).alias('sum_win'), fn.SUM(PitchingStat.loss).alias('sum_loss'),
# fn.SUM(PitchingStat.hold).alias('sum_hold'), fn.SUM(PitchingStat.sv).alias('sum_sv'),
# fn.SUM(PitchingStat.bsv).alias('sum_bsv'), fn.COUNT(PitchingStat.game_id).alias('sum_games')
# ).having(
# fn.SUM(PitchingStat.ip) >= max(min_pa / 3, 1)
# ).join(Card)
#
# if player_id is not None:
# all_cards = Card.select().where(Card.player_id << player_id)
# all_stats = all_stats.where(PitchingStat.card << all_cards)
# if team_id is not None:
# all_teams = Team.select().where(Team.id << team_id)
# all_stats = all_stats.where(PitchingStat.team << all_teams)
# if season is not None:
# all_stats = all_stats.where(PitchingStat.season << season)
#
# if group_by == 'player':
# all_stats = all_stats.group_by(SQL('player'))
# elif group_by == 'playerteam':
# all_stats = all_stats.group_by(SQL('player'), PitchingStat.team)
# elif group_by == 'playergame':
# all_stats = all_stats.group_by(SQL('player'), PitchingStat.game_id)
# elif group_by == 'team':
# all_stats = all_stats.group_by(PitchingStat.team)
# elif group_by == 'teamgame':
# all_stats = all_stats.group_by(PitchingStat.team, PitchingStat.game_id)
# elif group_by == 'league':
# all_stats = all_stats.group_by(PitchingStat.season)
#
# if sort == 'pa-desc':
# all_stats = all_stats.order_by(SQL('sum_pa').desc())
# elif sort == 'newest':
# all_stats = all_stats.order_by(-PitchingStat.game_id)
# elif sort == 'oldest':
# all_stats = all_stats.order_by(PitchingStat.game_id)
#
# if limit is not None:
# if limit < 1:
# limit = 1
# all_stats = all_stats.limit(limit)
#
# logging.info(f'bat_plays query: {all_stats}')
#
# return_stats = {
# 'count': all_stats.count(),
# 'stats': [{
# 'player': x.card.player_id if short_output else model_to_dict(x.card.player, recurse=False),
# 'team': x.team_id if short_output else model_to_dict(x.team, recurse=False),
# 'tbf': None,
# 'outs': round(x.sum_ip * 3),
# 'games': x.sum_games,
# 'gs': x.sum_gs,
# 'win': x.sum_win,
# 'loss': x.sum_loss,
# 'hold': x.sum_hold,
# 'save': x.sum_sv,
# 'bsave': x.sum_bsv,
# 'ir': x.sum_ir,
# 'ir_sc': x.sum_irs,
# 'runs': x.sum_run,
# 'e_runs': x.sum_erun,
# 'hits': x.sum_hit,
# 'hr': x.sum_hr,
# 'bb': x.sum_bb,
# 'so': x.sum_so,
# 'hbp': x.sum_hbp,
# 'wp': x.sum_wp,
# 'balk': x.sum_balk,
# 'era': (x.sum_erun * 27) / round(x.sum_ip * 3),
# 'whip': (x.sum_bb + x.sum_hit) / x.sum_ip,
# 'avg': None,
# 'obp': None,
# 'woba': None,
# 'k/9': x.sum_so * 9 / x.sum_ip,
# 'bb/9': x.sum_bb * 9 / x.sum_ip,
# 'k/bb': x.sum_so / max(x.sum_bb, .1),
# 'game': None,
# 'lob_2outs': None,
# 'rbi%': None
# } for x in all_stats]
# }
# db.close()
# return return_stats

188
app/routers_v2/pitstats.py Normal file
View File

@ -0,0 +1,188 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, PitchingStat, model_to_dict, fn, Card, Player
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/pitstats',
tags=['pitstats']
)
class PitStat(pydantic.BaseModel):
card_id: int
team_id: int
vs_team_id: int
roster_num: int
ip: float
hit: Optional[int] = 0
run: Optional[int] = 0
erun: Optional[int] = 0
so: Optional[int] = 0
bb: Optional[int] = 0
hbp: Optional[int] = 0
wp: Optional[int] = 0
balk: Optional[int] = 0
hr: Optional[int] = 0
ir: Optional[int] = 0
irs: Optional[int] = 0
gs: Optional[int] = 0
win: Optional[int] = 0
loss: Optional[int] = 0
hold: Optional[int] = 0
sv: Optional[int] = 0
bsv: Optional[int] = 0
week: int
season: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*100000)
game_id: int
class PitchingStatModel(pydantic.BaseModel):
stats: List[PitStat]
@router.get('')
async def get_pit_stats(
card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
season: int = None, week_start: int = None, week_end: int = None, created: int = None, gs: bool = None,
csv: bool = None):
all_stats = PitchingStat.select().join(Card).join(Player)
logging.debug(f'pit query:\n\n{all_stats}')
if card_id is not None:
all_stats = all_stats.where(PitchingStat.card_id == card_id)
if player_id is not None:
all_stats = all_stats.where(PitchingStat.card.player.player_id == player_id)
if team_id is not None:
all_stats = all_stats.where(PitchingStat.team_id == team_id)
if vs_team_id is not None:
all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id)
if week is not None:
all_stats = all_stats.where(PitchingStat.week == week)
if season is not None:
all_stats = all_stats.where(PitchingStat.season == season)
if week_start is not None:
all_stats = all_stats.where(PitchingStat.week >= week_start)
if week_end is not None:
all_stats = all_stats.where(PitchingStat.week <= week_end)
if created is not None:
all_stats = all_stats.where(PitchingStat.created <= created)
if gs is not None:
all_stats = all_stats.where(PitchingStat.gs == 1 if gs else 0)
# if all_stats.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No pitching stats found')
if csv:
data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'ip', 'hit', 'run', 'erun', 'so', 'bb', 'hbp',
'wp', 'balk', 'hr', 'ir', 'irs', 'gs', 'win', 'loss', 'hold', 'sv', 'bsv', 'week', 'season',
'created', 'game_id', 'roster_num']]
for line in all_stats:
data_list.append(
[
line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name, line.team.abbrev,
line.vs_team.abbrev, line.ip, line.hit,
line.run, line.erun, line.so, line.bb, line.hbp, line.wp, line.balk, line.hr, line.ir, line.irs,
line.gs, line.win, line.loss, line.hold, line.sv, line.bsv, line.week, line.season, line.created,
line.game_id, line.roster_num
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_stats.count(), 'stats': []}
for x in all_stats:
return_val['stats'].append(model_to_dict(x, recurse=False))
db.close()
return return_val
@router.post('')
async def post_pitstat(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post stats. This event has been logged.'
)
new_stats = []
for x in stats.stats:
this_stat = PitchingStat(
card_id=x.card_id,
team_id=x.team_id,
vs_team_id=x.vs_team_id,
roster_num=x.roster_num,
ip=x.ip,
hit=x.hit,
run=x.run,
erun=x.erun,
so=x.so,
bb=x.bb,
hbp=x.hbp,
wp=x.wp,
balk=x.balk,
hr=x.hr,
ir=x.ir,
irs=x.irs,
gs=x.gs,
win=x.win,
loss=x.loss,
hold=x.hold,
sv=x.sv,
bsv=x.bsv,
week=x.week,
season=x.season,
created=x.created,
game_id=x.game_id
)
new_stats.append(this_stat)
with db.atomic():
PitchingStat.bulk_create(new_stats, batch_size=15)
db.close()
raise HTTPException(status_code=200, detail=f'{len(new_stats)} pitching lines have been added')
@router.delete('/{stat_id}')
async def delete_pitstat(stat_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete stats. This event has been logged.'
)
try:
this_stat = PitchingStat.get_by_id(stat_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')
count = this_stat.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')

433
app/routers_v2/results.py Normal file
View File

@ -0,0 +1,433 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Result, model_to_dict, fn, Team, DataError
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/results',
tags=['results']
)
class ResultModel(pydantic.BaseModel):
away_team_id: int
home_team_id: int
away_score: int
home_score: int
away_team_value: Optional[int] = None
home_team_value: Optional[int] = None
away_team_ranking: Optional[int] = None
home_team_ranking: Optional[int] = None
scorecard: str
week: int
season: int
ranked: bool
short_game: bool
game_type: str
@router.get('')
async def get_results(
away_team_id: Optional[int] = None, home_team_id: Optional[int] = None, team_one_id: Optional[int] = None,
team_two_id: Optional[int] = None, away_score_min: Optional[int] = None, away_score_max: Optional[int] = None,
home_score_min: Optional[int] = None, home_score_max: Optional[int] = None, bothscore_min: Optional[int] = None,
bothscore_max: Optional[int] = None, season: Optional[int] = None, week: Optional[int] = None,
week_start: Optional[int] = None, week_end: Optional[int] = None, ranked: Optional[bool] = None,
short_game: Optional[bool] = None, game_type: Optional[str] = None, vs_ai: Optional[bool] = None,
csv: Optional[bool] = None):
all_results = Result.select()
# if all_results.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'There are no results to filter')
if away_team_id is not None:
try:
this_team = Team.get_by_id(away_team_id)
all_results = all_results.where(Result.away_team == this_team)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {away_team_id}')
if home_team_id is not None:
try:
this_team = Team.get_by_id(home_team_id)
all_results = all_results.where(Result.home_team == this_team)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {home_team_id}')
if team_one_id is not None:
try:
this_team = Team.get_by_id(team_one_id)
all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team))
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_one_id}')
if team_two_id is not None:
try:
this_team = Team.get_by_id(team_two_id)
all_results = all_results.where((Result.home_team == this_team) | (Result.away_team == this_team))
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_two_id}')
if away_score_min is not None:
all_results = all_results.where(Result.away_score >= away_score_min)
if away_score_max is not None:
all_results = all_results.where(Result.away_score <= away_score_max)
if home_score_min is not None:
all_results = all_results.where(Result.home_score >= home_score_min)
if home_score_max is not None:
all_results = all_results.where(Result.home_score <= home_score_max)
if bothscore_min is not None:
all_results = all_results.where((Result.home_score >= bothscore_min) & (Result.away_score >= bothscore_min))
if bothscore_max is not None:
all_results = all_results.where((Result.home_score <= bothscore_max) & (Result.away_score <= bothscore_max))
if season is not None:
all_results = all_results.where(Result.season == season)
if week is not None:
all_results = all_results.where(Result.week == week)
if ranked is not None:
all_results = all_results.where(Result.ranked == ranked)
if short_game is not None:
all_results = all_results.where(Result.short_game == short_game)
if week_start is not None:
all_results = all_results.where(Result.week >= week_start)
if week_end is not None:
all_results = all_results.where(Result.week <= week_end)
if game_type is not None:
all_results = all_results.where(Result.game_type == game_type)
all_results = all_results.order_by(Result.id)
# Not functional
# if vs_ai is not None:
# AwayTeam = Team.alias()
# all_results = all_results.join(
# Team, on=Result.home_team
# ).switch(Result).join(
# Team, on=(AwayTeam.id == Result.away_team).alias('a_team')
# )
#
# if vs_ai:
# all_results = all_results.where(
# (Result.home_team.is_ai == 1) | (Result.a_team.is_ai == 1)
# )
# else:
# all_results = all_results.where(
# (Result.home_team.is_ai == 0) & (Result.a_team.is_ai == 0)
# )
# logging.info(f'Result Query:\n\n{all_results}')
if csv:
data_list = [['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv',
'game_type', 'season', 'week', 'short_game', 'ranked']]
for line in all_results:
data_list.append([
line.id, line.away_team.abbrev, line.home_team.abbrev, line.away_score, line.home_score,
line.away_team_value, line.home_team_value, line.game_type if line.game_type else 'minor-league',
line.season, line.week, line.short_game, line.ranked
])
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_results.count(), 'results': []}
for x in all_results:
return_val['results'].append(model_to_dict(x))
db.close()
return return_val
@router.get('/{result_id}')
async def get_one_results(result_id, csv: Optional[bool] = None):
try:
this_result = Result.get_by_id(result_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
if csv:
data_list = [
['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv', 'game_type',
'season', 'week', 'game_type'],
[this_result.id, this_result.away_team.abbrev, this_result.away_team.abbrev, this_result.away_score,
this_result.home_score, this_result.away_team_value, this_result.home_team_value,
this_result.game_type if this_result.game_type else 'minor-league',
this_result.season, this_result.week, this_result.game_type]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_result)
db.close()
return return_val
@router.get('/team/{team_id}')
async def get_team_results(
team_id: int, season: Optional[int] = None, week: Optional[int] = None, csv: Optional[bool] = False):
all_results = Result.select().where((Result.away_team_id == team_id) | (Result.home_team_id == team_id))
try:
this_team = Team.get_by_id(team_id)
except Exception as e:
logging.error(f'Unknown team id {team_id} trying to pull team results')
raise HTTPException(404, f'Team id {team_id} not found')
if season is not None:
all_results = all_results.where(Result.season == season)
else:
all_results = all_results.where(Result.season == this_team.season)
if week is not None:
all_results = all_results.where(Result.week == week)
r_wins, r_loss, c_wins, c_loss = 0, 0, 0, 0
for x in all_results:
if x.away_team_id == team_id:
if x.away_score > x.home_score:
if x.ranked:
r_wins += 1
else:
c_wins += 1
else:
if x.ranked:
r_loss += 1
else:
c_loss += 1
elif x.home_team_id == team_id:
if x.away_score > x.home_score:
if x.ranked:
r_loss += 1
else:
c_loss += 1
else:
if x.ranked:
r_wins += 1
else:
c_wins += 1
if csv:
data_list = [
['team_id', 'ranked_wins', 'ranked_losses', 'casual_wins', 'casual_losses', 'team_ranking'],
[team_id, r_wins, r_loss, c_wins, c_loss, this_team.ranking]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {
'team': model_to_dict(this_team),
'ranked_wins': r_wins,
'ranked_losses': r_loss,
'casual_wins': c_wins,
'casual_losses': c_loss,
}
db.close()
return return_val
@router.post('')
async def post_result(result: ResultModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post results. This event has been logged.'
)
this_result = Result(**result.__dict__)
saved = this_result.save()
if result.ranked:
if not result.away_team_ranking:
db.close()
error = f'Ranked game did not include away team ({result.away_team_id}) ranking.'
logging.error(error)
raise DataError(error)
if not result.home_team_ranking:
db.close()
error = f'Ranked game did not include home team ({result.home_team_id}) ranking.'
logging.error(error)
raise DataError(error)
k_value = 20 if result.short_game else 60
ratio = (result.home_team_ranking - result.away_team_ranking) / 400
exp_score = 1 / (1 + (10 ** ratio))
away_win = True if result.away_score > result.home_score else False
total_delta = k_value * exp_score
high_delta = total_delta * exp_score if exp_score > .5 else total_delta * (1 - exp_score)
low_delta = total_delta - high_delta
# exp_score > .5 means away team is favorite
if exp_score > .5 and away_win:
final_delta = low_delta
away_delta = low_delta * 3
home_delta = -low_delta
elif away_win:
final_delta = high_delta
away_delta = high_delta * 3
home_delta = -high_delta
elif exp_score <= .5 and not away_win:
final_delta = low_delta
away_delta = -low_delta
home_delta = low_delta * 3
elif not away_win:
final_delta = high_delta
away_delta = -high_delta
home_delta = high_delta * 3
else:
final_delta = 0
away_delta = 0
home_delta = 0
logging.debug(f'/results ranking deltas\n\nk_value: {k_value} / ratio: {ratio} / '
f'exp_score: {exp_score} / away_win: {away_win} / total_delta: {total_delta} / '
f'high_delta: {high_delta} / low_delta: {low_delta} / final_delta: {final_delta} / ')
away_team = Team.get_by_id(result.away_team_id)
away_team.ranking += away_delta
away_team.save()
logging.info(f'Just updated {away_team.abbrev} ranking to {away_team.ranking}')
home_team = Team.get_by_id(result.home_team_id)
home_team.ranking += home_delta
home_team.save()
logging.info(f'Just updated {home_team.abbrev} ranking to {home_team.ranking}')
if saved == 1:
return_val = model_to_dict(this_result)
db.close()
return return_val
else:
db.close()
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that roster'
)
@app.patch('/api/v1/results/{result_id}')
async def patch_result(
result_id, away_team_id: Optional[int] = None, home_team_id: Optional[int] = None,
away_score: Optional[int] = None, home_score: Optional[int] = None, away_team_value: Optional[int] = None,
home_team_value: Optional[int] = None, scorecard: Optional[str] = None, week: Optional[int] = None,
season: Optional[int] = None, short_game: Optional[bool] = None, game_type: Optional[str] = None,
token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch results. This event has been logged.'
)
try:
this_result = Result.get_by_id(result_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
if away_team_id is not None:
this_result.away_team_id = away_team_id
if home_team_id is not None:
this_result.home_team_id = home_team_id
if away_score is not None:
this_result.away_score = away_score
if home_score is not None:
this_result.home_score = home_score
if away_team_value is not None:
this_result.away_team_value = away_team_value
if home_team_value is not None:
this_result.home_team_value = home_team_value
if scorecard is not None:
this_result.scorecard = scorecard
if week is not None:
this_result.week = week
if season is not None:
this_result.season = season
if game_type is not None:
this_result.game_type = game_type
if short_game is not None:
if not short_game:
this_result.short_game = None
else:
this_result.short_game = short_game
if this_result.save() == 1:
return_val = model_to_dict(this_result)
db.close()
return return_val
else:
db.close()
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that event'
)
@app.delete('/api/v1/results/{result_id}')
async def delete_result(result_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post results. This event has been logged.'
)
try:
this_result = Result.get_by_id(result_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')
count = this_result.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Result {result_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Result {result_id} was not deleted')

188
app/routers_v2/rewards.py Normal file
View File

@ -0,0 +1,188 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Reward, model_to_dict, fn
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/rewards',
tags=['rewards']
)
class RewardModel(pydantic.BaseModel):
name: str
season: int
week: int
team_id: int
created: Optional[int] = int(datetime.timestamp(datetime.now())*1000)
@router.get('')
async def get_rewards(
name: Optional[str] = None, in_name: Optional[str] = None, team_id: Optional[int] = None,
season: Optional[int] = None, week: Optional[int] = None, created_after: Optional[int] = None,
flat: Optional[bool] = False, csv: Optional[bool] = None):
all_rewards = Reward.select()
if all_rewards.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no rewards to filter')
if name is not None:
all_rewards = all_rewards.where(fn.Lower(Reward.name) == name.lower())
if team_id is not None:
all_rewards = all_rewards.where(Reward.team_id == team_id)
if created_after is not None:
all_rewards = all_rewards.where(Reward.created >= created_after)
if in_name is not None:
all_rewards = all_rewards.where(fn.Lower(Reward.name).contains(in_name.lower()))
if season is not None:
all_rewards = all_rewards.where(Reward.season == season)
if week is not None:
all_rewards = all_rewards.where(Reward.week == week)
if all_rewards.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'No rewards found')
if csv:
data_list = [['id', 'name', 'team', 'daily', 'created']]
for line in all_rewards:
data_list.append(
[
line.id, line.name, line.team.id, line.daily, line.created
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_rewards.count(), 'rewards': []}
for x in all_rewards:
return_val['rewards'].append(model_to_dict(x, recurse=not flat))
db.close()
return return_val
@router.get('/{reward_id}')
async def get_one_reward(reward_id, csv: Optional[bool] = False):
try:
this_reward = Reward.get_by_id(reward_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
if csv:
data_list = [
['id', 'name', 'card_count', 'description'],
[this_reward.id, this_reward.name, this_reward.team.id, this_reward.daily, this_reward.created]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_reward)
db.close()
return return_val
@router.post('')
async def post_rewards(reward: RewardModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post rewards. This event has been logged.'
)
this_reward = Reward(**reward.dict())
saved = this_reward.save()
if saved == 1:
return_val = model_to_dict(this_reward)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
)
@router.patch('/{reward_id}')
async def patch_reward(
reward_id, name: Optional[str] = None, team_id: Optional[int] = None, created: Optional[int] = None,
token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch rewards. This event has been logged.'
)
try:
this_reward = Reward.get_by_id(reward_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
if name is not None:
this_reward.name = name
if team_id is not None:
this_reward.team_id = team_id
if created is not None:
this_reward.created = created
if this_reward.save() == 1:
return_val = model_to_dict(this_reward)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
)
@router.delete('/{reward_id}')
async def delete_reward(reward_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete rewards. This event has been logged.'
)
try:
this_reward = Reward.get_by_id(reward_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')
count = this_reward.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Reward {reward_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Reward {reward_id} was not deleted')