paper-dynasty-database/app/routers_v2/battingcardratings.py
2023-10-27 20:05:12 -05:00

546 lines
21 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Response
from scipy import stats
from typing import Literal, Optional, List
import logging
import pandas as pd
import pydantic
from pydantic import validator, root_validator
from ..db_engine import db, BattingCardRatings, model_to_dict, chunked, BattingCard, Player, query_to_csv, Team, \
CardPosition
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure logging from the shared LOG_DATA settings (file name, record format, level).
logging.basicConfig(filename=LOG_DATA['filename'], format=LOG_DATA['format'], level=LOG_DATA['log_level'])

# All endpoints in this module are registered on this router under a common prefix and tag.
router = APIRouter(prefix='/api/v2/battingcardratings', tags=['battingcardratings'])
# (field, weight) pairs used to derive `avg`: every hit result counts once,
# while bp_homerun and bp_single count as half a hit.
_AVG_WEIGHTS = (
    ('homerun', 1), ('bp_homerun', 0.5), ('triple', 1), ('double_three', 1), ('double_two', 1),
    ('double_pull', 1), ('single_two', 1), ('single_one', 1), ('single_center', 1), ('bp_single', 0.5),
)

# (field, weight) pairs used to derive `slg`.
_SLG_WEIGHTS = (
    ('homerun', 4), ('bp_homerun', 2), ('triple', 3), ('double_three', 2), ('double_two', 2),
    ('double_pull', 2), ('single_two', 1), ('single_one', 1), ('single_center', 1), ('bp_single', 0.5),
)

# Every result column that counts toward the card's chance total, each with weight 1.
_ALL_CHANCE_WEIGHTS = tuple((name, 1) for name in (
    'homerun', 'bp_homerun', 'triple', 'double_three', 'double_two', 'double_pull', 'single_two',
    'single_one', 'single_center', 'bp_single', 'hbp', 'walk', 'strikeout', 'lineout', 'popout',
    'flyout_a', 'flyout_bq', 'flyout_lf_b', 'flyout_rf_b', 'groundout_a', 'groundout_b', 'groundout_c',
))


def _weighted_chances(values, weights):
    """Return the sum of values[name] * weight over `weights`, in the given order.

    Fields missing from `values` count as 0.0. Pydantic leaves a field out of
    `values` when that field failed its own validation, so a plain
    `values[name]` lookup would raise an uncaught KeyError instead of letting
    the original validation error reach the client.
    """
    total = 0.0
    for name, weight in weights:
        total += values.get(name, 0.0) * weight
    return total


class BattingCardRatingsModel(pydantic.BaseModel):
    """Request model for one batting card's ratings against one pitcher hand.

    The result columns are chance counts that must total 108 after rounding.
    `avg`, `obp` and `slg` are always recomputed from those columns; any value
    supplied by the caller for them is ignored.
    """
    battingcard_id: int
    vs_hand: Literal['R', 'L', 'vR', 'vL']
    homerun: float = 0.0
    bp_homerun: float = 0.0
    triple: float = 0.0
    double_three: float = 0.0
    double_two: float = 0.0
    double_pull: float = 0.0
    single_two: float = 0.0
    single_one: float = 0.0
    single_center: float = 0.0
    bp_single: float = 0.0
    hbp: float = 0.0
    walk: float = 0.0
    strikeout: float = 0.0
    lineout: float = 0.0
    popout: float = 0.0
    flyout_a: float = 0.0
    flyout_bq: float = 0.0
    flyout_lf_b: float = 0.0
    flyout_rf_b: float = 0.0
    groundout_a: float = 0.0
    groundout_b: float = 0.0
    groundout_c: float = 0.0
    # Derived fields: declared after the result columns so those are already
    # present in `values` when the validators below run.
    avg: float = 0.0
    obp: float = 0.0
    slg: float = 0.0
    pull_rate: float = 0.0
    center_rate: float = 0.0
    slap_rate: float = 0.0

    @validator("avg", always=True)
    def avg_validator(cls, v, values, **kwargs):
        """Derive avg as weighted hit chances out of 108; the incoming value is discarded."""
        return _weighted_chances(values, _AVG_WEIGHTS) / 108

    @validator("obp", always=True)
    def obp_validator(cls, v, values, **kwargs):
        """Derive obp as (hbp + walk) out of 108 plus the already-derived avg."""
        return ((values.get('hbp', 0.0) + values.get('walk', 0.0)) / 108) + values.get('avg', 0.0)

    @validator("slg", always=True)
    def slg_validator(cls, v, values, **kwargs):
        """Derive slg as weighted hit chances out of 108; the incoming value is discarded."""
        return _weighted_chances(values, _SLG_WEIGHTS) / 108

    # skip_on_failure: when a field already failed validation it is absent from
    # `values`, and the total would be meaningless; let the field error stand.
    @root_validator(skip_on_failure=True)
    def validate_chance_total(cls, values):
        """Reject the card unless its result chances round to exactly 108.

        Raises:
            ValueError: if the rounded total of all result columns is not 108.
        """
        total_chances = _weighted_chances(values, _ALL_CHANCE_WEIGHTS)
        if round(total_chances) != 108:
            raise ValueError("Must have exactly 108 chances on the card")
        return values
class RatingsList(pydantic.BaseModel):
    """Request body wrapping a list of batting card rating rows."""
    ratings: List[BattingCardRatingsModel]
@router.get('')
async def get_card_ratings(
        team_id: int, ts: str, battingcard_id: list = Query(default=None), cardset_id: list = Query(default=None),
        vs_hand: Literal['R', 'L', 'vR', 'vL'] = None, short_output: bool = False, csv: bool = False):
    """Return batting card ratings for a team that has the ratings guide enabled.

    Access is granted only when the team exists, `ts` equals the team's
    `team_hash()`, and the team's `has_guide` flag is 1.

    Args:
        team_id: id of the requesting team.
        ts: hash the caller presents; compared against `this_team.team_hash()`.
        battingcard_id: optional list of batting card ids to restrict to.
        cardset_id: optional list of cardset ids; restricts to cards of players in those sets.
        vs_hand: optional hand filter; only its last character ('R' or 'L') is matched.
        short_output: when True, related models are not expanded in the JSON output.
        csv: when True, respond with a flattened CSV instead of JSON.

    Returns:
        A `text/csv` Response when `csv` is True, otherwise a dict with
        `count` and `ratings` keys.

    Raises:
        HTTPException: 401 when the team is unknown, the hash mismatches, or
            the guide is not enabled.
    """
    this_team = Team.get_or_none(Team.id == team_id)
    # get_or_none() returns None for an unknown team_id, so read has_guide
    # defensively; dereferencing None here would turn the intended 401 into a 500.
    has_guide = None if this_team is None else this_team.has_guide
    logging.debug(f'Team: {this_team} / has_guide: {has_guide}')
    if this_team is None or ts != this_team.team_hash() or has_guide != 1:
        logging.warning(f'Team_id {team_id} attempted to pull ratings')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to pull card ratings.'
        )

    all_ratings = BattingCardRatings.select()
    if battingcard_id is not None:
        all_ratings = all_ratings.where(BattingCardRatings.battingcard_id << battingcard_id)
    if vs_hand is not None:
        # 'vR' / 'vL' are accepted as aliases: only the trailing hand letter is compared.
        all_ratings = all_ratings.where(BattingCardRatings.vs_hand == vs_hand[-1])
    if cardset_id is not None:
        set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
        set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
        all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)

    if csv:
        return_vals = [model_to_dict(x) for x in all_ratings]
        for x in return_vals:
            # Flatten the nested battingcard dict into the row, keep only the
            # player's id, then drop the nested structures.
            x.update(x['battingcard'])
            x['player_id'] = x['battingcard']['player']['player_id']
            del x['battingcard'], x['player']
        db.close()
        return Response(content=pd.DataFrame(return_vals).to_csv(index=False), media_type='text/csv')

    return_val = {'count': all_ratings.count(), 'ratings': [
        model_to_dict(x, recurse=not short_output) for x in all_ratings
    ]}
    db.close()
    return return_val
def get_scouting_dfs(cardset_id: list = None):
    """Build one DataFrame row per player combining vs-L ratings, vs-R ratings and fielding data.

    Args:
        cardset_id: optional list of cardset ids; when given, only players in
            those cardsets are included.

    Returns:
        A DataFrame indexed by `player_id`. Rating columns present in both
        hands carry `_vl` / `_vr` suffixes; card and player descriptors come
        from the vs-L rows. Fielding columns are named `Range <pos>`,
        `Error <pos>`, `Arm OF`, `Arm C`, `PB C` and `Throw C`, and are NaN
        for players without that position.
    """
    def position_series(rows, attr, name):
        # Map player_id -> the requested CardPosition attribute for these rows.
        return pd.Series({row.player.player_id: getattr(row, attr) for row in rows}, name=name)

    try:
        all_ratings = BattingCardRatings.select()
        positions = CardPosition.select()
        if cardset_id is not None:
            set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
            set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
            all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
            positions = positions.where(CardPosition.player << set_players)

        vl_vals = [model_to_dict(x) for x in all_ratings.where(BattingCardRatings.vs_hand == 'L')]
        for x in vl_vals:
            # The vs-L rows carry the card/player descriptors for the merged frame.
            x.update(x['battingcard'])
            x['player_id'] = x['battingcard']['player']['player_id']
            x['player_name'] = x['battingcard']['player']['p_name']
            x['rarity'] = x['battingcard']['player']['rarity']['name']
            x['cardset_id'] = x['battingcard']['player']['cardset']['id']
            x['cardset_name'] = x['battingcard']['player']['cardset']['name']
            del x['battingcard']
            del x['player']

        vr_vals = [model_to_dict(x) for x in all_ratings.where(BattingCardRatings.vs_hand == 'R')]
        for x in vr_vals:
            # The vs-R rows only need the join key; descriptors come from vs-L.
            x['player_id'] = x['battingcard']['player']['player_id']
            del x['battingcard']

        series_list = []
        for pos_code in ['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']:
            # Run each position query once and reuse the rows for both columns.
            pos_rows = list(positions.where(CardPosition.position == pos_code))
            series_list.append(position_series(pos_rows, 'range', f'Range {pos_code}'))
            series_list.append(position_series(pos_rows, 'error', f'Error {pos_code}'))

        # A player listed at several outfield spots ends up with the arm of
        # whichever of those rows is read last.
        outfield_rows = list(positions.where(CardPosition.position << ['LF', 'CF', 'RF']))
        series_list.append(position_series(outfield_rows, 'arm', 'Arm OF'))

        catcher_rows = list(positions.where(CardPosition.position == 'C'))
        series_list.append(position_series(catcher_rows, 'arm', 'Arm C'))
        series_list.append(position_series(catcher_rows, 'pb', 'PB C'))
        series_list.append(position_series(catcher_rows, 'overthrow', 'Throw C'))
    finally:
        # Close only after every query has been materialised; closing earlier
        # lets the later queries reopen a connection that is never closed.
        db.close()

    vl = pd.DataFrame(vl_vals)
    vr = pd.DataFrame(vr_vals)
    bat_df = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')).set_index('player_id', drop=False)
    logging.debug(f'bat_df: {bat_df}')
    logging.debug(f'series_list: {series_list}')

    return bat_df.join(series_list)
@router.get('/scouting')
async def get_card_scouting(team_id: int, ts: str, cardset_id: list = Query(default=None)):
    """Return the full scouting sheet as CSV for a team with the ratings guide enabled.

    Args:
        team_id: id of the requesting team.
        ts: hash the caller presents; compared against `this_team.team_hash()`.
        cardset_id: optional list of cardset ids passed through to `get_scouting_dfs`.

    Returns:
        A `text/csv` Response with the identifying columns first, or a plain
        explanatory string when the team is unknown, the hash mismatches, or
        the guide is not enabled.
    """
    this_team = Team.get_or_none(Team.id == team_id)
    # get_or_none() returns None for an unknown team_id, so read has_guide
    # defensively; dereferencing None here would raise instead of returning
    # the explanatory message below.
    has_guide = None if this_team is None else this_team.has_guide
    logging.debug(f'Team: {this_team} / has_guide: {has_guide}')
    if this_team is None or ts != this_team.team_hash() or has_guide != 1:
        logging.warning(f'Team_id {team_id} attempted to pull ratings')
        db.close()
        return (
            'Your team does not have the ratings guide enabled. If you have purchased a copy ping Cal to '
            'make sure it is enabled on your team. If you are interested you can pick it up here (thank you!): '
            'https://ko-fi.com/manticorum/shop'
        )

    output = get_scouting_dfs(cardset_id)

    # Lead with the identifying columns and drop per-hand row ids and hand markers.
    first = ['player_id', 'player_name', 'cardset_name', 'rarity', 'hand', 'variant']
    exclude = first + ['id_vl', 'id_vr', 'vs_hand_vl', 'vs_hand_vr']
    output = output[first + [col for col in output.columns if col not in exclude]]

    return Response(content=pd.DataFrame(output).to_csv(index=False), media_type='text/csv')
def _raw_speed(card):
    """Raw speed: running / 20 plus steal_jump, with a 0.5 bonus when steal_auto is truthy."""
    base = card['running'] / 20 + card['steal_jump']
    return base + 0.5 if card['steal_auto'] else base


def _raw_steal(card):
    """Raw steal: (steal_high / 20 + steal_low / 20) scaled by steal_jump."""
    return ((card['steal_high'] / 20) + (card['steal_low'] / 20)) * card['steal_jump']


def _raw_reaction(card):
    """Raw reaction: sum of 10 ** (5 - range) over every non-pitcher position the card has."""
    total = 0
    for spot in ('C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF'):
        spot_range = card[f'Range {spot}']
        if pd.notna(spot_range):
            total += 10 ** (5 - spot_range)
    return total


def _outfield_arm(card):
    """Outfield arm score from `Arm OF` and the first listed spot among RF, CF, LF; None if no OF range."""
    for spot in ('RF', 'CF', 'LF'):
        if pd.notna(card[f'Range {spot}']):
            break
    else:
        return None

    if card['Arm OF'] < 0:
        score = card['Arm OF'] * -5
    else:
        score = (5 - card['Arm OF'])

    if spot == 'RF':
        score = score * 1.5
        score += ((6 - card['Range RF']) * 4)
    elif spot == 'CF':
        score += ((6 - card['Range CF']) * 3)
    else:
        score = score / 2
        score += ((6 - card['Range LF']) * 2)
    return score


def _infield_arm(card):
    """Infield arm score from weighted (6 - range) at 3B, SS, 2B, 1B; None if the card has no IF range."""
    weighted_range = 0
    plays_infield = False
    for spot, weight in (('3B', 5), ('SS', 4), ('2B', 3), ('1B', 1)):
        spot_range = card[f'Range {spot}']
        if pd.notna(spot_range):
            plays_infield = True
            weighted_range += ((6 - spot_range) * weight)
    if not plays_infield:
        return None
    return 100 - (50 - weighted_range)


def _catcher_arm(card):
    """Catcher arm score capped at 100 (an `Arm C` of -5 is an automatic 100); None if no `Arm C`."""
    if pd.isna(card['Arm C']):
        return None
    if card['Arm C'] == -5:
        return 100
    uncapped = 20 + ((10 - card['Arm C']) * 3) + (20 - card['PB C']) + (20 - card['Throw C']) - \
        card['Error C']
    return min(100, uncapped)


def _raw_arm(card):
    """Raw arm: catcher score if present, else outfield, else infield, else 1."""
    # All three are computed up front, in this order, before one is chosen.
    of_arm = _outfield_arm(card)
    if_arm = _infield_arm(card)
    c_arm = _catcher_arm(card)
    for candidate in (c_arm, of_arm, if_arm):
        if candidate is not None:
            return candidate
    return 1


def _error_group_score(card, penalties):
    """Start at 100, subtract error * penalty per listed position held; floor at 1. None if none held."""
    score = 100
    holds_any = False
    for spot, penalty in penalties:
        spot_error = card[f'Error {spot}']
        if pd.notna(spot_error):
            holds_any = True
            score -= (spot_error * penalty)
    return max(1, score) if holds_any else None


def _raw_fielding(card):
    """Raw fielding: mean of the infield, outfield and catcher error scores the card qualifies for."""
    if_error, of_error, c_error = 0, 0, 0
    groups = 0

    infield = _error_group_score(card, (('3B', 2), ('SS', .75), ('2B', 1.25), ('1B', 2)))
    if infield is not None:
        if_error = infield
        groups += 1

    outfield = _error_group_score(card, (('LF', 2), ('CF', .75), ('RF', 1.25)))
    if outfield is not None:
        of_error = outfield
        groups += 1

    if pd.notna(card['Error C']):
        c_error = max(100 - (card['Error C'] * 5) - card['Throw C'] - card['PB C'], 1)
        groups += 1

    return sum([if_error, of_error, c_error]) / max(groups, 1)


def _raw_vision(card):
    """Raw vision: 5x the hand-weighted (obp - avg) gap minus a hand-weighted strikeout term."""
    # Hand weighting throughout is 0.67 vs R and 0.33 vs L.
    return (
        ((((card['obp_vr'] * 0.67) + (card['obp_vl'] * 0.33)) -
          ((card['avg_vr'] * 0.67) + (card['avg_vl'] * 0.33))) * 5) -
        (((card['strikeout_vl'] * 0.33) + (card['strikeout_vr'] * 0.67)) / 208)
    )


def _raw_rating(card):
    """Raw overall rating: defense x2, plus running, plus hand-weighted batting x6."""
    defense = (card['Reaction'] + card['Arm'] + card['Fielding']) * 2
    running = card['Speed'] + card['Steal']
    batting = (
        ((card['Contact R'] + card['Power R']) * 0.67) +
        ((card['Contact L'] + card['Power L']) * 0.33) + card['Vision']
    ) * 6
    return defense + running + batting


# (source column, rank column, score column, row function) in evaluation order.
# A row function of None means the source column already exists in the frame.
# 'Rating' must stay last because it reads every other score column.
_BASIC_SCORE_PLAN = (
    ('speed_raw', 'speed_rank', 'Speed', _raw_speed),
    ('steal_raw', 'steal_rank', 'Steal', _raw_steal),
    ('reaction_raw', 'reaction_rank', 'Reaction', _raw_reaction),
    ('arm_raw', 'arm_rank', 'Arm', _raw_arm),
    ('fld_raw', 'fld_rank', 'Fielding', _raw_fielding),
    ('avg_vl', 'convl_rank', 'Contact L', None),
    ('avg_vr', 'convr_rank', 'Contact R', None),
    ('slg_vl', 'powvl_rank', 'Power L', None),
    ('slg_vr', 'powvr_rank', 'Power R', None),
    ('vis_raw', 'vis_rank', 'Vision', _raw_vision),
    ('rat_raw', 'rat_rank', 'Rating', _raw_rating),
)


@router.get('/basic')
async def get_basic_scouting(cardset_id: list = Query(default=None)):
    """Return a CSV of 0-100 percentile scouting grades for each player.

    Each grade is the percentile rank (times 100, rounded) of a raw score
    among all rows returned by `get_scouting_dfs(cardset_id)`.

    NOTE(review): unlike the other GET endpoints in this module, this one
    performs no team-hash or token check - confirm that is intended.

    Args:
        cardset_id: optional list of cardset ids passed through to `get_scouting_dfs`.

    Returns:
        A `text/csv` Response (index included) with the player name, the
        eleven grades, `hand` and `cardset_name`.
    """
    raw_data = get_scouting_dfs(cardset_id)
    logging.debug(f'output: {raw_data}')

    for source_col, rank_col, score_col, row_func in _BASIC_SCORE_PLAN:
        if row_func is not None:
            raw_data[source_col] = raw_data.apply(row_func, axis=1)
        raw_data[rank_col] = raw_data[source_col].rank(pct=True)
        raw_data[score_col] = round(raw_data[rank_col] * 100)

    report_columns = [
        'player_name', 'Rating', 'Contact R', 'Contact L', 'Power R', 'Power L', 'Vision', 'Speed',
        'Steal', 'Reaction', 'Arm', 'Fielding', 'hand', 'cardset_name'
    ]
    output = raw_data[report_columns]
    return Response(content=output.to_csv(index=True), media_type='text/csv')
@router.get('/{ratings_id}')
async def get_one_rating(ratings_id: int, token: str = Depends(oauth2_scheme)):
    """Return a single ratings row, with related models expanded, by its id.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when no row has
            the given id.
    """
    token_ok = valid_token(token)
    if not token_ok:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(status_code=401, detail='You are not authorized to pull card ratings.')

    match = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
    if match is not None:
        # Serialise before closing the connection.
        payload = model_to_dict(match)
        db.close()
        return payload

    db.close()
    raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')
@router.get('/player/{player_id}')
async def get_player_ratings(
        player_id: int, variant: list = Query(default=None), short_output: bool = False,
        token: str = Depends(oauth2_scheme)):
    """Return every ratings row belonging to one player's batting cards.

    Args:
        player_id: player whose batting cards are looked up.
        variant: optional list of card variants to restrict to.
        short_output: when True, related models are not expanded in the output.
        token: bearer token checked with `valid_token`.

    Returns:
        A dict with `count` and `ratings` keys.

    Raises:
        HTTPException: 401 when the token is rejected.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(status_code=401, detail='You are not authorized to pull card ratings.')

    player_cards = BattingCard.select().where(BattingCard.player_id == player_id)
    player_cards = player_cards.order_by(BattingCard.variant)
    if variant is not None:
        player_cards = player_cards.where(BattingCard.variant << variant)

    card_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << player_cards)

    # Count first, then serialise, matching the key order of the response.
    total = card_ratings.count()
    expand_related = not short_output
    rows = [model_to_dict(rating, recurse=expand_related) for rating in card_ratings]
    db.close()
    return {'count': total, 'ratings': rows}
@router.put('')
async def put_ratings(ratings: RatingsList, token: str = Depends(oauth2_scheme)):
    """Upsert a batch of ratings rows keyed by (battingcard_id, vs_hand).

    Rows whose key already exists are updated one at a time; the rest are
    inserted afterwards in chunks of 30 inside a single transaction.

    Returns:
        A summary string with the number of updated and newly inserted rows.

    Raises:
        HTTPException: 401 when the token is rejected.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(status_code=401, detail='You are not authorized to post card ratings.')

    updates = 0
    new_ratings = []
    for incoming in ratings.ratings:
        same_card_and_hand = (
            (BattingCardRatings.battingcard_id == incoming.battingcard_id) &
            (BattingCardRatings.vs_hand == incoming.vs_hand)
        )
        if BattingCardRatings.get_or_none(same_card_and_hand) is None:
            # No existing row for this card/hand: queue it for the bulk insert.
            new_ratings.append(incoming.dict())
            continue
        updates += BattingCardRatings.update(incoming.dict()).where(same_card_and_hand).execute()

    with db.atomic():
        for batch in chunked(new_ratings, 30):
            BattingCardRatings.insert_many(batch).on_conflict_replace().execute()

    db.close()
    return f'Updated ratings: {updates}; new ratings: {len(new_ratings)}'
@router.delete('/{ratings_id}')
async def delete_rating(
        ratings_id: int, token: str = Depends(oauth2_scheme)):
    """Delete one ratings row by id.

    Returns:
        A confirmation string when exactly one row was deleted.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when no row has
            the given id, 500 when the delete did not report exactly one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(status_code=401, detail='You are not authorized to post card ratings.')

    this_rating = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
    if this_rating is None:
        db.close()
        raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')

    rows_deleted = this_rating.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Rating {this_rating} could not be deleted')
    return f'Rating {this_rating} has been deleted'