579 lines
22 KiB
Python
579 lines
22 KiB
Python
import os
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response
|
|
from fastapi.responses import FileResponse
|
|
from scipy import stats
|
|
from typing import Literal, Optional, List
|
|
import logging
|
|
import pandas as pd
|
|
import pydantic
|
|
from pydantic import validator, root_validator
|
|
|
|
from ..db_engine import db, BattingCardRatings, model_to_dict, chunked, BattingCard, Player, query_to_csv, Team, \
|
|
CardPosition
|
|
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA, PRIVATE_IN_SCHEMA
|
|
|
|
logging.basicConfig(
|
|
filename=LOG_DATA['filename'],
|
|
format=LOG_DATA['format'],
|
|
level=LOG_DATA['log_level']
|
|
)
|
|
|
|
router = APIRouter(
|
|
prefix='/api/v2/battingcardratings',
|
|
tags=['battingcardratings']
|
|
)
|
|
RATINGS_FILE = 'storage/batting-ratings.csv'
|
|
BASIC_FILE = 'storage/batting-basic.csv'
|
|
|
|
|
|
class BattingCardRatingsModel(pydantic.BaseModel):
|
|
battingcard_id: int
|
|
vs_hand: Literal['R', 'L', 'vR', 'vL']
|
|
homerun: float = 0.0
|
|
bp_homerun: float = 0.0
|
|
triple: float = 0.0
|
|
double_three: float = 0.0
|
|
double_two: float = 0.0
|
|
double_pull: float = 0.0
|
|
single_two: float = 0.0
|
|
single_one: float = 0.0
|
|
single_center: float = 0.0
|
|
bp_single: float = 0.0
|
|
hbp: float = 0.0
|
|
walk: float = 0.0
|
|
strikeout: float = 0.0
|
|
lineout: float = 0.0
|
|
popout: float = 0.0
|
|
flyout_a: float = 0.0
|
|
flyout_bq: float = 0.0
|
|
flyout_lf_b: float = 0.0
|
|
flyout_rf_b: float = 0.0
|
|
groundout_a: float = 0.0
|
|
groundout_b: float = 0.0
|
|
groundout_c: float = 0.0
|
|
avg: float = 0.0
|
|
obp: float = 0.0
|
|
slg: float = 0.0
|
|
pull_rate: float = 0.0
|
|
center_rate: float = 0.0
|
|
slap_rate: float = 0.0
|
|
|
|
@validator("avg", always=True)
|
|
def avg_validator(cls, v, values, **kwargs):
|
|
return (values['homerun'] + values['bp_homerun'] / 2 + values['triple'] + values['double_three'] +
|
|
values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] / 2) / 108
|
|
|
|
@validator("obp", always=True)
|
|
def obp_validator(cls, v, values, **kwargs):
|
|
return ((values['hbp'] + values['walk']) / 108) + values['avg']
|
|
|
|
@validator("slg", always=True)
|
|
def slg_validator(cls, v, values, **kwargs):
|
|
return (values['homerun'] * 4 + values['bp_homerun'] * 2 + values['triple'] * 3 + values['double_three'] * 2 +
|
|
values['double_two'] * 2 + values['double_pull'] * 2 + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] / 2) / 108
|
|
|
|
@root_validator
|
|
def validate_chance_total(cls, values):
|
|
total_chances = (
|
|
values['homerun'] + values['bp_homerun'] + values['triple'] + values['double_three'] +
|
|
values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
|
|
values['single_center'] + values['bp_single'] + values['hbp'] + values['walk'] +
|
|
values['strikeout'] + values['lineout'] + values['popout'] + values['flyout_a'] +
|
|
values['flyout_bq'] + values['flyout_lf_b'] + values['flyout_rf_b'] + values['groundout_a'] +
|
|
values['groundout_b'] + values['groundout_c'])
|
|
|
|
if round(total_chances) != 108:
|
|
raise ValueError("Must have exactly 108 chances on the card")
|
|
return values
|
|
|
|
|
|
class RatingsReturnList(pydantic.BaseModel):
|
|
count: int
|
|
awards: list[BattingCardRatingsModel]
|
|
|
|
|
|
class RatingsList(pydantic.BaseModel):
|
|
ratings: List[BattingCardRatingsModel]
|
|
|
|
|
|
@router.get('', response_model=RatingsReturnList)
|
|
async def get_card_ratings(
|
|
team_id: int, ts: str, battingcard_id: list = Query(default=None), cardset_id: list = Query(default=None),
|
|
vs_hand: Literal['R', 'L', 'vR', 'vL'] = None, short_output: bool = False, csv: bool = False):
|
|
this_team = Team.get_or_none(Team.id == team_id)
|
|
logging.debug(f'Team: {this_team} / has_guide: {this_team.has_guide}')
|
|
if this_team is None or ts != this_team.team_hash() or this_team.has_guide != 1:
|
|
logging.warning(f'Team_id {team_id} attempted to pull ratings')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
# elif not valid_token(token):
|
|
# logging.warning(f'Bad Token: {token}')
|
|
# db.close()
|
|
# raise HTTPException(
|
|
# status_code=401,
|
|
# detail='You are not authorized to pull card ratings.'
|
|
# )
|
|
|
|
all_ratings = BattingCardRatings.select()
|
|
|
|
if battingcard_id is not None:
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard_id << battingcard_id)
|
|
if vs_hand is not None:
|
|
all_ratings = all_ratings.where(BattingCardRatings.vs_hand == vs_hand[-1])
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
|
|
|
|
if csv:
|
|
# return_val = query_to_csv(all_ratings)
|
|
return_vals = [model_to_dict(x) for x in all_ratings]
|
|
for x in return_vals:
|
|
x.update(x['battingcard'])
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
del x['battingcard'], x['player']
|
|
|
|
db.close()
|
|
return Response(content=pd.DataFrame(return_vals).to_csv(index=False), media_type='text/csv')
|
|
|
|
else:
|
|
return_val = {'count': all_ratings.count(), 'ratings': [
|
|
model_to_dict(x, recurse=not short_output) for x in all_ratings
|
|
]}
|
|
db.close()
|
|
return return_val
|
|
|
|
|
|
def get_scouting_dfs(cardset_id: list = None):
|
|
all_ratings = BattingCardRatings.select()
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
set_cards = BattingCard.select(BattingCard.id).where(BattingCard.player << set_players)
|
|
all_ratings = all_ratings.where(BattingCardRatings.battingcard << set_cards)
|
|
|
|
vl_query = all_ratings.where(BattingCardRatings.vs_hand == 'L')
|
|
vr_query = all_ratings.where(BattingCardRatings.vs_hand == 'R')
|
|
|
|
vl_vals = [model_to_dict(x) for x in vl_query]
|
|
for x in vl_vals:
|
|
x.update(x['battingcard'])
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
x['player_name'] = x['battingcard']['player']['p_name']
|
|
x['rarity'] = x['battingcard']['player']['rarity']['name']
|
|
x['cardset_id'] = x['battingcard']['player']['cardset']['id']
|
|
x['cardset_name'] = x['battingcard']['player']['cardset']['name']
|
|
del x['battingcard']
|
|
del x['player']
|
|
|
|
vr_vals = [model_to_dict(x) for x in vr_query]
|
|
for x in vr_vals:
|
|
x['player_id'] = x['battingcard']['player']['player_id']
|
|
del x['battingcard']
|
|
|
|
vl = pd.DataFrame(vl_vals)
|
|
vr = pd.DataFrame(vr_vals)
|
|
|
|
bat_df = pd.merge(vl, vr, on='player_id', suffixes=('_vl', '_vr')).set_index('player_id', drop=False)
|
|
|
|
logging.debug(f'bat_df: {bat_df}')
|
|
|
|
positions = CardPosition.select()
|
|
if cardset_id is not None:
|
|
set_players = Player.select(Player.player_id).where(Player.cardset_id << cardset_id)
|
|
positions = positions.where(CardPosition.player << set_players)
|
|
|
|
series_list = []
|
|
for pos_code in ['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF']:
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.range) for x in positions.where(CardPosition.position == pos_code)]),
|
|
name=f'Range {pos_code}'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.error) for x in positions.where(CardPosition.position == pos_code)]),
|
|
name=f'Error {pos_code}'
|
|
))
|
|
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.arm) for x in positions.where(CardPosition.position << ['LF', 'CF', 'RF'])]),
|
|
name=f'Arm OF'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.arm) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'Arm C'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.pb) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'PB C'
|
|
))
|
|
series_list.append(pd.Series(
|
|
dict([(x.player.player_id, x.overthrow) for x in positions.where(CardPosition.position == 'C')]),
|
|
name=f'Throw C'
|
|
))
|
|
db.close()
|
|
logging.debug(f'series_list: {series_list}')
|
|
|
|
return bat_df.join(series_list)
|
|
|
|
|
|
@router.get('/scouting')
|
|
async def get_card_scouting(team_id: int, ts: str):
|
|
this_team = Team.get_or_none(Team.id == team_id)
|
|
logging.debug(f'Team: {this_team} / has_guide: {this_team.has_guide}')
|
|
if this_team is None or ts != this_team.team_hash() or this_team.has_guide != 1:
|
|
logging.warning(f'Team_id {team_id} attempted to pull ratings')
|
|
db.close()
|
|
return 'Your team does not have the ratings guide enabled. If you have purchased a copy ping Cal to ' \
|
|
'make sure it is enabled on your team. If you are interested you can pick it up here (thank you!): ' \
|
|
'https://ko-fi.com/manticorum/shop'
|
|
|
|
if os.path.isfile(f'storage/batting-ratings.csv'):
|
|
return FileResponse(
|
|
path=f'storage/batting-ratings.csv',
|
|
media_type='text/csv',
|
|
# headers=headers
|
|
)
|
|
|
|
raise HTTPException(status_code=400, detail='Go pester Cal - the scouting file is missing')
|
|
|
|
|
|
@router.post('/calculate/scouting', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def post_calc_scouting(token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to calculate card ratings.'
|
|
)
|
|
|
|
logging.warning(f'Re-calculating batting ratings\n\n')
|
|
|
|
output = get_scouting_dfs()
|
|
first = ['player_id', 'player_name', 'cardset_name', 'rarity', 'hand', 'variant']
|
|
exclude = first + ['id_vl', 'id_vr', 'vs_hand_vl', 'vs_hand_vr']
|
|
output = output[first + [col for col in output.columns if col not in exclude]]
|
|
|
|
csv_file = pd.DataFrame(output).to_csv(index=False)
|
|
with open(RATINGS_FILE, 'w') as file:
|
|
file.write(csv_file)
|
|
|
|
return Response(content=csv_file, media_type='text/csv')
|
|
|
|
|
|
@router.get('/basic')
|
|
async def get_basic_scouting(cardset_id: list = Query(default=None)):
|
|
if os.path.isfile(f'storage/batting-basic.csv'):
|
|
return FileResponse(
|
|
path=f'storage/batting-basic.csv',
|
|
media_type='text/csv',
|
|
# headers=headers
|
|
)
|
|
|
|
raise HTTPException(status_code=400, detail='Go pester Cal - the scouting file is missing')
|
|
|
|
|
|
@router.post('/calculate/basic', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def post_calc_basic(token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to calculate basic ratings.'
|
|
)
|
|
|
|
logging.warning(f'Re-calculating basic batting ratings\n\n')
|
|
|
|
raw_data = get_scouting_dfs()
|
|
logging.debug(f'output: {raw_data}')
|
|
|
|
def get_raw_speed(df_data):
|
|
speed_raw = df_data['running'] / 20 + df_data['steal_jump']
|
|
if df_data['steal_auto']:
|
|
speed_raw += 0.5
|
|
return speed_raw
|
|
|
|
raw_series = raw_data.apply(get_raw_speed, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Speed'] = round(rank_series * 100)
|
|
|
|
def get_raw_steal(df_data):
|
|
return (
|
|
((df_data['steal_high'] / 20) + (df_data['steal_low'] / 20)) * df_data['steal_jump']
|
|
)
|
|
|
|
raw_series = raw_data.apply(get_raw_steal, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Steal'] = round(rank_series * 100)
|
|
|
|
def get_raw_reaction(df_data):
|
|
raw_total = 0
|
|
for pos_range in [df_data['Range C'], df_data['Range 1B'], df_data['Range 2B'], df_data['Range 3B'],
|
|
df_data['Range SS'], df_data['Range LF'], df_data['Range CF'], df_data['Range RF']]:
|
|
if pd.notna(pos_range):
|
|
raw_total += 10 ** (5 - pos_range)
|
|
return raw_total
|
|
|
|
raw_series = raw_data.apply(get_raw_reaction, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Reaction'] = round(rank_series * 100)
|
|
|
|
def get_raw_arm(df_data):
|
|
of_arm = None
|
|
of_pos = None
|
|
if pd.notna(df_data['Range RF']):
|
|
of_pos = 'RF'
|
|
elif pd.notna(df_data['Range CF']):
|
|
of_pos = 'CF'
|
|
elif pd.notna(df_data['Range LF']):
|
|
of_pos = 'LF'
|
|
|
|
if of_pos is not None:
|
|
if df_data['Arm OF'] < 0:
|
|
of_raw = df_data['Arm OF'] * -10
|
|
else:
|
|
of_raw = (5 - df_data['Arm OF'])
|
|
|
|
if of_pos == 'RF':
|
|
of_raw = of_raw * 1.5
|
|
of_raw += ((6 - df_data['Range RF']) * 4)
|
|
elif of_pos == 'CF':
|
|
of_raw += ((6 - df_data['Range CF']) * 3)
|
|
elif of_pos == 'LF':
|
|
of_raw = of_raw / 2
|
|
of_raw += ((6 - df_data['Range LF']) * 2)
|
|
|
|
of_arm = of_raw
|
|
|
|
if_arm = None
|
|
if pd.notna(df_data['Range 3B']) or pd.notna(df_data['Range 2B']) or pd.notna(df_data['Range 1B']) or \
|
|
pd.notna(df_data['Range SS']):
|
|
range_totals = 0
|
|
if pd.notna(df_data['Range 3B']):
|
|
range_totals += ((6 - df_data['Range 3B']) * 5)
|
|
if pd.notna(df_data['Range SS']):
|
|
range_totals += ((6 - df_data['Range SS']) * 4)
|
|
if pd.notna(df_data['Range 2B']):
|
|
range_totals += ((6 - df_data['Range 2B']) * 3)
|
|
if pd.notna(df_data['Range 1B']):
|
|
range_totals += (6 - df_data['Range 1B'])
|
|
if_arm = 100 - (50 - range_totals)
|
|
|
|
c_arm = None
|
|
if pd.notna(df_data['Arm C']):
|
|
if df_data['Arm C'] == -5:
|
|
c_arm = 100
|
|
else:
|
|
temp_arm = 20 + ((10 - df_data['Arm C']) * 3) + (20 - df_data['PB C']) + (20 - df_data['Throw C']) - \
|
|
df_data['Error C']
|
|
c_arm = min(100, temp_arm)
|
|
|
|
if c_arm is not None:
|
|
return c_arm
|
|
elif of_arm is not None:
|
|
return of_arm
|
|
elif if_arm is not None:
|
|
return if_arm
|
|
else:
|
|
return 1
|
|
|
|
raw_series = raw_data.apply(get_raw_arm, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Arm'] = round(rank_series * 100)
|
|
|
|
def get_raw_fielding(df_data):
|
|
if_error, of_error, c_error = 0, 0, 0
|
|
denom = 0
|
|
if pd.notna(df_data['Error 3B']) or pd.notna(df_data['Error 2B']) or pd.notna(df_data['Error 1B']) or \
|
|
pd.notna(df_data['Error SS']):
|
|
raw_if = 100
|
|
if pd.notna(df_data['Error 3B']):
|
|
raw_if -= (df_data['Error 3B'] * 2)
|
|
if pd.notna(df_data['Error SS']):
|
|
raw_if -= (df_data['Error SS'] * .75)
|
|
if pd.notna(df_data['Error 2B']):
|
|
raw_if -= (df_data['Error 2B'] * 1.25)
|
|
if pd.notna(df_data['Error 1B']):
|
|
raw_if -= (df_data['Error 1B'] * 2)
|
|
|
|
if_error = max(1, raw_if)
|
|
denom += 1
|
|
|
|
if pd.notna(df_data['Error LF']) or pd.notna(df_data['Error CF']) or pd.notna(df_data['Error RF']):
|
|
raw_of = 100
|
|
if pd.notna(df_data['Error LF']):
|
|
raw_of -= (df_data['Error LF'] * 2)
|
|
if pd.notna(df_data['Error CF']):
|
|
raw_of -= (df_data['Error CF'] * .75)
|
|
if pd.notna(df_data['Error RF']):
|
|
raw_of -= (df_data['Error RF'] * 1.25)
|
|
|
|
of_error = max(1, raw_of)
|
|
denom += 1
|
|
|
|
if pd.notna(df_data['Error C']):
|
|
c_error = max(100 - (df_data['Error C'] * 5) - df_data['Throw C'] - df_data['PB C'], 1)
|
|
denom += 1
|
|
|
|
return sum([if_error, of_error, c_error]) / max(denom, 1)
|
|
|
|
raw_series = raw_data.apply(get_raw_fielding, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Fielding'] = round(rank_series * 100)
|
|
|
|
rank_series = raw_data['avg_vl'].rank(pct=True)
|
|
raw_data['Contact L'] = round(rank_series * 100)
|
|
|
|
rank_series = raw_data['avg_vr'].rank(pct=True)
|
|
raw_data['Contact R'] = round(rank_series * 100)
|
|
|
|
rank_series = raw_data['slg_vl'].rank(pct=True)
|
|
raw_data['Power L'] = round(rank_series * 100)
|
|
|
|
rank_series = raw_data['slg_vr'].rank(pct=True)
|
|
raw_data['Power R'] = round(rank_series * 100)
|
|
|
|
def get_raw_vision(df_data):
|
|
return (
|
|
((((df_data['obp_vr'] * 0.67) + (df_data['obp_vl'] * 0.33)) -
|
|
((df_data['avg_vr'] * 0.67) + (df_data['avg_vl'] * 0.33))) * 5) -
|
|
(((df_data['strikeout_vl'] * 0.33) + (df_data['strikeout_vr'] * 0.67)) / 208)
|
|
)
|
|
|
|
raw_series = raw_data.apply(get_raw_vision, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Vision'] = round(rank_series * 100)
|
|
|
|
def get_raw_rating(df_data):
|
|
return (
|
|
((df_data['Reaction'] + df_data['Arm'] + df_data['Fielding']) * 2) +
|
|
(df_data['Speed'] + df_data['Steal']) +
|
|
((((df_data['Contact R'] + df_data['Power R']) * 0.67) +
|
|
((df_data['Contact L'] + df_data['Power L']) * 0.33) + df_data['Vision'] ) * 6
|
|
)
|
|
)
|
|
|
|
raw_series = raw_data.apply(get_raw_rating, axis=1)
|
|
rank_series = raw_series.rank(pct=True)
|
|
raw_data['Rating'] = round(rank_series * 100)
|
|
|
|
output = raw_data[[
|
|
'player_id', 'player_name', 'Rating', 'Contact R', 'Contact L', 'Power R', 'Power L', 'Vision', 'Speed',
|
|
'Steal', 'Reaction', 'Arm', 'Fielding', 'hand', 'cardset_name'
|
|
]]
|
|
|
|
csv_file = pd.DataFrame(output).to_csv(index=False)
|
|
with open(BASIC_FILE, 'w') as file:
|
|
file.write(csv_file)
|
|
|
|
return Response(content=csv_file, media_type='text/csv')
|
|
|
|
|
|
@router.get('/{ratings_id}', response_model=BattingCardRatingsModel)
|
|
async def get_one_rating(ratings_id: int, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
|
|
this_rating = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
|
|
if this_rating is None:
|
|
db.close()
|
|
raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')
|
|
|
|
r_data = model_to_dict(this_rating)
|
|
db.close()
|
|
return r_data
|
|
|
|
|
|
@router.get('/player/{player_id}', response_model=BattingCardRatingsModel)
|
|
async def get_player_ratings(
|
|
player_id: int, variant: list = Query(default=None), short_output: bool = False,
|
|
token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to pull card ratings.'
|
|
)
|
|
|
|
all_cards = BattingCard.select().where(BattingCard.player_id == player_id).order_by(BattingCard.variant)
|
|
if variant is not None:
|
|
all_cards = all_cards.where(BattingCard.variant << variant)
|
|
|
|
all_ratings = BattingCardRatings.select().where(BattingCardRatings.battingcard << all_cards)
|
|
|
|
return_val = {'count': all_ratings.count(), 'ratings': [
|
|
model_to_dict(x, recurse=not short_output) for x in all_ratings
|
|
]}
|
|
db.close()
|
|
return return_val
|
|
|
|
|
|
@router.put('', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def put_ratings(ratings: RatingsList, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to post card ratings.'
|
|
)
|
|
|
|
new_ratings = []
|
|
updates = 0
|
|
for x in ratings.ratings:
|
|
try:
|
|
BattingCardRatings.get(
|
|
(BattingCardRatings.battingcard_id == x.battingcard_id) & (BattingCardRatings.vs_hand == x.vs_hand)
|
|
)
|
|
updates += BattingCardRatings.update(x.dict()).where(
|
|
(BattingCardRatings.battingcard_id == x.battingcard_id) & (BattingCardRatings.vs_hand == x.vs_hand)
|
|
).execute()
|
|
except BattingCardRatings.DoesNotExist:
|
|
new_ratings.append(x.dict())
|
|
|
|
with db.atomic():
|
|
for batch in chunked(new_ratings, 30):
|
|
BattingCardRatings.insert_many(batch).on_conflict_replace().execute()
|
|
|
|
db.close()
|
|
return f'Updated ratings: {updates}; new ratings: {len(new_ratings)}'
|
|
|
|
|
|
@router.delete('/{ratings_id}', include_in_schema=PRIVATE_IN_SCHEMA)
|
|
async def delete_rating(
|
|
ratings_id: int, token: str = Depends(oauth2_scheme)):
|
|
if not valid_token(token):
|
|
logging.warning(f'Bad Token: {token}')
|
|
db.close()
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail='You are not authorized to post card ratings.'
|
|
)
|
|
|
|
this_rating = BattingCardRatings.get_or_none(BattingCardRatings.id == ratings_id)
|
|
if this_rating is None:
|
|
db.close()
|
|
raise HTTPException(status_code=404, detail=f'BattingCardRating id {ratings_id} not found')
|
|
|
|
count = this_rating.delete_instance()
|
|
db.close()
|
|
|
|
if count == 1:
|
|
return f'Rating {this_rating} has been deleted'
|
|
else:
|
|
raise HTTPException(status_code=500, detail=f'Rating {this_rating} could not be deleted')
|
|
|
|
|