507 lines
20 KiB
Python
507 lines
20 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query, Response
|
|
from scipy import stats
|
|
from typing import Literal, Optional, List
|
|
import logging
|
|
import pandas as pd
|
|
import pydantic
|
|
from pydantic import validator, root_validator
|
|
|
|
from ..db_engine import db, BattingCardRatings, model_to_dict, chunked, BattingCard, Player, query_to_csv, Team, \
|
|
CardPosition
|
|
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
|
|
|
|
logging.basicConfig(
|
|
filename=LOG_DATA['filename'],
|
|
format=LOG_DATA['format'],
|
|
level=LOG_DATA['log_level']
|
|
)
|
|
|
|
router = APIRouter(
|
|
prefix='/api/v2/battingcardratings',
|
|
tags=['battingcardratings']
|
|
)
|
|
|
|
|
|
class BattingCardRatingsModel(pydantic.BaseModel):
|
|
battingcard_id: int
|
|
vs_hand: Literal['R', 'L', 'vR', 'vL']
|
|
homerun: float = 0.0
|
|
bp_homerun: float = 0.0
|
|
triple: float = 0.0
|
|
double_three: float = 0.0
|
|
double_two: float = 0.0
|
|
double_pull: float = 0.0
|
|
single_two: float = 0.0
|
|
single_one: float = 0.0
|
|
single_center: float = 0.0
|
|
bp_single: float = 0.0
|
|
hbp: float = 0.0
|
|
walk: float = 0.0
|
|
strikeout: float = 0.0
|
|
lineout: float = 0.0
|
|
popout: float = 0.0
|
|
flyout_a: float = 0.0
|
|
flyout_bq: float = 0.0
|
|
flyout_lf_b: float = 0.0
|
|
flyout_rf_b: float = 0.0
|
|
groundout_a: float = 0.0
|
|
groundout_b: float = 0.0
|
|
groundout_c: float = 0.0
|
|
avg: float = 0.0
|
|
obp: float = 0.0
|
|
slg: float = 0.0
|
|
pull_rate: float = 0.0
|
|
center_rate: float = 0.0
|
|
slap_rate: float = 0.0
|
|
|
|
@validator("avg", always=True)
|
|
def avg_validator(cls, v, values, **kwargs):
|
|
return (values['homerun'] + values['bp_homerun'] / 2 + values['triple'] + values['double_three'] +
|
|
values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] / 2) / 108
|
|
|
|
@validator("obp", always=True)
|
|
def obp_validator(cls, v, values, **kwargs):
|
|
return ((values['hbp'] + values['walk']) / 108) + values['avg']
|
|
|
|
@validator("slg", always=True)
|
|
def slg_validator(cls, v, values, **kwargs):
|
|
return (values['homerun'] * 4 + values['bp_homerun'] * 2 + values['triple'] * 3 + values['double_three'] * 2 +
|
|
values['double_two'] * 2 + values['double_pull'] * 2 + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] / 2) / 108
|
|
|
|
@root_validator
|
|
def validate_chance_total(cls, values):
|
|
total_chances = (
|
|
values['homerun'] + values['bp_homerun'] + values['triple'] + values['double_three'] +
|
|
values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] + values['hbp'] + values['walk'] +
|
|
values['strikeout'] + values['lineout'] + values['popout'] + values['flyout_a'] +
|
|
values['flyout_bq'] + values['flyout_lf_b'] + values['flyout_rf_b'] + values['groundout_a'] +
|
|
values['groundout_b'] + values['groundout_c'])
|
|
|
|
if round(total_chances) != 108:
|
|
raise ValueError("Must have exactly 108 chances on the card")
|
|
return values
|
|
|
|
|
|
class RatingsList(pydantic.BaseModel):
|
|
ratings: List[BattingCardRatingsModel]
|
|
|
|
|
|
@router.get('')
|
|
async def get_card_ratings(
|
|
team_id: int, ts: str, battingcard_id: list = Query(default=None), cardset_id: list = Query(default=None),
|
|
vs_hand: Literal['R', 'L', 'vR', 'vL'] = None, short_output: bool = False, csv: bool = False):
|
|
this_team = Team.get_or_none(Team.id == team_id)
|
|
logging.debug(f'Team: {this_team} / has_guide: {this_team.has_guide}')
|
|
if this_team is None or ts != this_team.team_hash() or this_team.has_guide != 1:
|
|
logging.warning(f'Team_id {team_id} attempted to pull ratings')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
# elif not valid_token(token):
|
|
# logging.warning(f'Bad Token: {token}')
|
|
# db.close()
|
|
# raise HTTPException(
|
|
# status_code=401,
|
|
# detail='You are not authorized to pull card ratings.'
|
|
# )
|
|
|
|
all_ratings = BattingCardRatings.select()
|
|
|
|
if battingcard_id is not None:
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard_id << battingcard_id)
|
|
if vs_hand is not None:
|
|
all_ratings = all_ratings.where(BattingCardRatings.vs_hand == vs_hand[-1])
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
|
|
|
|
if csv:
|
|
# return_val = query_to_csv(all_ratings)
|
|
return_vals = [model_to_dict(x) for x in all_ratings]
|
|
for x in return_vals:
|
|
x.update(x['battingcard'])
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
del x['battingcard'], x['player']
|
|
|
|
db.close()
|
|
return Response(content=pd.DataFrame(return_vals).to_csv(index=False), media_type='text/csv')
|
|
|
|
else:
|
|
return_val = {'count': all_ratings.count(), 'ratings': [
|
|
model_to_dict(x, recurse=not short_output) for x in all_ratings
|
|
]}
|
|
db.close()
|
|
return return_val
|
|
|
|
|
|
def get_scouting_dfs(cardset_id: list = None):
|
|
all_ratings = BattingCardRatings.select()
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
|
|
|
|
vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L')
|
|
vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R')
|
|
|
|
vl_vals = [model_to_dict(x) for x in vl_query]
|
|
for x in vl_vals:
|
|
x.update(x['battingcard'])
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
x['player_name'] = x['battingcard']['player']['p_name']
|
|
x['rarity'] = x['battingcard']['player']['rarity']['name']
|
|
x['cardset_id'] = x['battingcard']['player']['cardset']['id']
|
|
x['cardset_name'] = x['battingcard']['player']['cardset']['name']
|
|
del x['battingcard']
|
|
del x['player']
|
|
|
|
vr_vals = [model_to_dict(x) for x in vr_query]
|
|
for x in vr_vals:
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
del x['battingcard']
|
|
|
|
vl = pd.DataFrame(vl_vals)
|
|
vr = pd.DataFrame(vr_vals)
|
|
db.close()
|
|
|
|
bat_df = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')).set_index('player_id', drop=False)
|
|
# output['Range P'] = ''
|
|
# output['Range C'] = ''
|
|
# output['Range 1B'] = ''
|
|
# output['Range 2B'] = ''
|
|
# output['Range 3B'] = ''
|
|
# output['Range SS'] = ''
|
|
# output['Range LF'] = ''
|
|
# output['Range CF'] = ''
|
|
# output['Range RF'] = ''
|
|
# output['Error P'] = ''
|
|
# output['Error C'] = ''
|
|
# output['Error 1B'] = ''
|
|
# output['Error 2B'] = ''
|
|
# output['Error 3B'] = ''
|
|
# output['Error SS'] = ''
|
|
# output['Error LF'] = ''
|
|
# output['Error CF'] = ''
|
|
# output['Error RF'] = ''
|
|
# output['Arm C'] = ''
|
|
# output['Throw C'] = ''
|
|
# output['PB C'] = ''
|
|
# output['Arm OF'] = ''
|
|
|
|
logging.info(f'bat_df: {bat_df}')
|
|
|
|
positions = CardPosition.select()
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
positions = positions.where(CardPosition.player << set_players)
|
|
|
|
series_list = []
|
|
for pos_code in ['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']:
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.range) for x in positions.where(CardPosition.position == pos_code)]),
|
|
name=f'Range {pos_code}'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.error) for x in positions.where(CardPosition.position == pos_code)]),
|
|
name=f'Error {pos_code}'
|
|
))
|
|
# bat_df.join(pd.Series(
|
|
# dict([(x.player.player_id, x.range) for x in positions.where(CardPosition.position == pos_code)]),
|
|
# name=f'Range {pos_code}'
|
|
# ))
|
|
# bat_df.join(pd.Series(
|
|
# dict([(x.player.player_id, x.error) for x in positions.where(CardPosition.position == pos_code)]),
|
|
# name=f'Error {pos_code}'
|
|
# ))
|
|
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.arm) for x in positions.where(CardPosition.position << ['LF', 'CF', 'RF'])]),
|
|
name=f'Arm OF'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.arm) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'Arm C'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.pb) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'PB C'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.overthrow) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'Throw C'
|
|
))
|
|
|
|
logging.info(f'series_list: {series_list}')
|
|
|
|
# for x in positions:
|
|
# if x.position != 'DH':
|
|
# output.at[f'{x.player.player_id}', f'Range {x.position}'] = x.range # vals in .at list are reversed
|
|
# output.at[f'{x.player.player_id}', f'Error {x.position}'] = x.error
|
|
# if x.position in ['LF', 'CF', 'RF']:
|
|
# output.at[f'{x.player.player_id}', f'Arm OF'] = x.arm
|
|
# if x.position == 'C':
|
|
# output.at[f'{x.player.player_id}', f'Arm C'] = x.arm
|
|
# output.at[f'{x.player.player_id}', f'PB C'] = x.pb
|
|
# output.at[f'{x.player.player_id}', f'Throw C'] = x.overthrow
|
|
|
|
return bat_df.join(series_list)
|
|
|
|
|
|
@router.get('/scouting')
|
|
async def get_card_scouting(team_id: int, ts: str, cardset_id: list = Query(default=None)):
|
|
this_team = Team.get_or_none(Team.id == team_id)
|
|
logging.debug(f'Team: {this_team} / has_guide: {this_team.has_guide}')
|
|
if this_team is None or ts != this_team.team_hash() or this_team.has_guide != 1:
|
|
logging.warning(f'Team_id {team_id} attempted to pull ratings')
|
|
db.close()
|
|
return 'Your team does not have the ratings guide enabled. If you have purchased a copy ping Cal to ' \
|
|
'make sure it is enabled on your team. If you are interested you can pick it up here (thank you!): ' \
|
|
'https://ko-fi.com/manticorum/shop'
|
|
|
|
# all_ratings = BattingCardRatings.select()
|
|
# if cardset_id is not None:
|
|
# set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
# set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
|
|
# all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
|
|
#
|
|
# vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L')
|
|
# vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R')
|
|
#
|
|
# vl_vals = [model_to_dict(x) for x in vl_query]
|
|
# for x in vl_vals:
|
|
# x.update(x['battingcard'])
|
|
# x['player_id'] = x['battingcard']['player']['player_id']
|
|
# x['player_name'] = x['battingcard']['player']['p_name']
|
|
# x['rarity'] = x['battingcard']['player']['rarity']['name']
|
|
# x['cardset_id'] = x['battingcard']['player']['cardset']['id']
|
|
# x['cardset_name'] = x['battingcard']['player']['cardset']['name']
|
|
# del x['battingcard']
|
|
# del x['player']
|
|
#
|
|
# vr_vals = [model_to_dict(x) for x in vr_query]
|
|
# for x in vr_vals:
|
|
# x['player_id'] = x['battingcard']['player']['player_id']
|
|
# del x['battingcard']
|
|
#
|
|
# vl = pd.DataFrame(vl_vals)
|
|
# vr = pd.DataFrame(vr_vals)
|
|
# db.close()
|
|
|
|
output = get_scouting_dfs(cardset_id)
|
|
first = ['player_id', 'player_name', 'cardset_name', 'rarity', 'hand', 'variant']
|
|
exclude = first + ['id_vl', 'id_vr', 'vs_hand_vl', 'vs_hand_vr']
|
|
output = output[first + [col for col in output.columns if col not in exclude]]
|
|
# output = output.sort_values(by=['player_id'])
|
|
return Response(content=pd.DataFrame(output).to_csv(index=False), media_type='text/csv')
|
|
|
|
|
|
@router.get('/basic')
|
|
async def get_basic_scouting(cardset_id: list = Query(default=None)):
|
|
raw_data = get_scouting_dfs(cardset_id)
|
|
logging.info(f'output: {raw_data}')
|
|
|
|
def get_raw_speed(df_data):
|
|
speed_raw = df_data['running'] / 20 + df_data['steal_jump']
|
|
if df_data['steal_auto']:
|
|
speed_raw += 0.5
|
|
return speed_raw
|
|
|
|
raw_data['speed_raw'] = raw_data.apply(get_raw_speed, axis=1)
|
|
raw_data['speed_rank'] = raw_data['speed_raw'].rank(pct=True)
|
|
raw_data['Speed'] = round(raw_data['speed_rank'] * 100)
|
|
|
|
def get_raw_steal(df_data):
|
|
return (
|
|
((df_data['steal_high'] / 20) + (df_data['steal_low'] / 20)) * df_data['steal_jump']
|
|
)
|
|
|
|
raw_data['steal_raw'] = raw_data.apply(get_raw_steal, axis=1)
|
|
raw_data['steal_rank'] = raw_data['steal_raw'].rank(pct=True)
|
|
raw_data['Steal'] = round(raw_data['steal_rank'] * 100)
|
|
|
|
def get_raw_reaction(df_data):
|
|
raw_total = 0
|
|
for pos_range in [df_data['Range C'], df_data['Range 1B'], df_data['Range 2B'], df_data['Range 3B'],
|
|
df_data['Range SS'], df_data['Range LF'], df_data['Range CF'], df_data['Range RF']]:
|
|
if pd.notna(pos_range):
|
|
raw_total += 10 ** (5 - pos_range)
|
|
return raw_total
|
|
|
|
raw_data['reaction_raw'] = raw_data.apply(get_raw_reaction, axis=1)
|
|
raw_data['reaction_rank'] = raw_data['reaction_raw'].rank(pct=True)
|
|
raw_data['Reaction'] = round(raw_data['reaction_rank'] * 100)
|
|
|
|
def get_raw_arm(df_data):
|
|
of_arm = None
|
|
of_pos = None
|
|
if pd.notna(df_data['Range RF']):
|
|
of_pos = 'RF'
|
|
elif pd.notna(df_data['Range CF']):
|
|
of_pos = 'CF'
|
|
elif pd.notna(df_data['Range LF']):
|
|
of_pos = 'LF'
|
|
|
|
if of_pos is not None:
|
|
if df_data['Arm OF'] < 0:
|
|
of_raw = df_data['Arm OF'] * -5
|
|
else:
|
|
of_raw = (5 - df_data['Arm OF'])
|
|
|
|
if of_pos == 'RF':
|
|
of_raw = of_raw * 1.5
|
|
of_raw += ((6 - df_data['Range RF']) * 4)
|
|
elif of_pos == 'CF':
|
|
of_raw += ((6 - df_data['Range CF']) * 3)
|
|
elif of_pos == 'LF':
|
|
of_raw = of_raw / 2
|
|
of_raw += ((6 - df_data['Range LF']) * 2)
|
|
|
|
of_arm = of_raw
|
|
|
|
if_arm = None
|
|
if pd.notna(df_data['Range 3B']) or pd.notna(df_data['Range 2B']) or pd.notna(df_data['Range 1B']) or \
|
|
pd.notna(df_data['Range SS']):
|
|
range_totals = 0
|
|
if pd.notna(df_data['Range 3B']):
|
|
range_totals += ((6 - df_data['Range 3B']) * 5)
|
|
if pd.notna(df_data['Range SS']):
|
|
range_totals += ((6 - df_data['Range SS']) * 4)
|
|
if pd.notna(df_data['Range 2B']):
|
|
range_totals += ((6 - df_data['Range 2B']) * 3)
|
|
if pd.notna(df_data['Range 1B']):
|
|
range_totals += (6 - df_data['Range 1B'])
|
|
if_arm = 100 - (50 - range_totals)
|
|
|
|
c_arm = None
|
|
if pd.notna(df_data['Arm C']):
|
|
if df_data['Arm C'] == -5:
|
|
c_arm = 100
|
|
else:
|
|
temp_arm = 20 + ((10 - df_data['Arm C']) * 3) + (20 - df_data['PB C']) + (20 - df_data['Throw C']) - \
|
|
df_data['Error C']
|
|
c_arm = min(100, temp_arm)
|
|
|
|
if c_arm is not None:
|
|
return c_arm
|
|
elif of_arm is not None:
|
|
return of_arm
|
|
elif if_arm is not None:
|
|
return if_arm
|
|
|
|
raw_data['arm_raw'] = raw_data.apply(get_raw_arm, axis=1)
|
|
raw_data['arm_rank'] = raw_data['arm_raw'].rank(pct=True)
|
|
raw_data['Arm'] = round(raw_data['arm_rank'] * 100)
|
|
|
|
output = raw_data[['player_name', 'cardset_name', 'Speed', 'Steal', 'Reaction', 'Arm']]
|
|
# raw_data.sort_values(by=['player_id'], inplace=True)
|
|
return Response(content=output.to_csv(index=True), media_type='text/csv')
|
|
|
|
|
|
@router.get('/{ratings_id}')
|
|
async def get_one_rating(ratings_id: int, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
|
|
this_rating = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
|
|
if this_rating is None:
|
|
db.close()
|
|
raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')
|
|
|
|
r_data = model_to_dict(this_rating)
|
|
db.close()
|
|
return r_data
|
|
|
|
|
|
@router.get('/player/{player_id}')
|
|
async def get_player_ratings(
|
|
player_id: int, variant: list = Query(default=None), short_output: bool = False,
|
|
token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
|
|
all_cards = BattingCard.select().where(BattingCard.player_id == player_id).order_by(BattingCard.variant)
|
|
if variant is not None:
|
|
all_cards = all_cards.where(BattingCard.variant << variant)
|
|
|
|
all_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << all_cards)
|
|
|
|
return_val = {'count': all_ratings.count(), 'ratings': [
|
|
model_to_dict(x, recurse=not short_output) for x in all_ratings
|
|
]}
|
|
db.close()
|
|
return return_val
|
|
|
|
|
|
@router.put('')
|
|
async def put_ratings(ratings: RatingsList, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to post card ratings.'
|
|
)
|
|
|
|
new_ratings = []
|
|
updates = 0
|
|
for x in ratings.ratings:
|
|
try:
|
|
BattingCardRatings.get(
|
|
(BattingCardRatings.battingcard_id == x.battingcard_id) & (BattingCardRatings.vs_hand == x.vs_hand)
|
|
)
|
|
updates += BattingCardRatings.update(x.dict()).where(
|
|
(BattingCardRatings.battingcard_id == x.battingcard_id) & (BattingCardRatings.vs_hand == x.vs_hand)
|
|
).execute()
|
|
except BattingCardRatings.DoesNotExist:
|
|
new_ratings.append(x.dict())
|
|
|
|
with db.atomic():
|
|
for batch in chunked(new_ratings, 30):
|
|
BattingCardRatings.insert_many(batch).on_conflict_replace().execute()
|
|
|
|
db.close()
|
|
return f'Updated ratings: {updates}; new ratings: {len(new_ratings)}'
|
|
|
|
|
|
@router.delete('/{ratings_id}')
|
|
async def delete_rating(
|
|
ratings_id: int, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to post card ratings.'
|
|
)
|
|
|
|
this_rating = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
|
|
if this_rating is None:
|
|
db.close()
|
|
raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')
|
|
|
|
count = this_rating.delete_instance()
|
|
db.close()
|
|
|
|
if count == 1:
|
|
return f'Rating {this_rating} has been deleted'
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f'Rating {this_rating} could not be deleted')
|
|
|
|
|