fielding and batting functional

Still need pitching and setting player positions
This commit is contained in:
Cal Corum 2023-09-24 18:58:44 -05:00
parent 443eaa3a41
commit 4bde7f60ea
5 changed files with 569 additions and 184 deletions

View File

@ -1,6 +1,8 @@
import decimal
import logging
import math
import random
import pydantic
from creation_helpers import mround
@ -10,9 +12,10 @@ from decimal import Decimal
class BattingCardRatingsModel(pydantic.BaseModel):
battingcard_id: int
bat_hand: Literal['R', 'L', 'S']
vs_hand: Literal['R', 'L']
all_hits: Decimal = Decimal(0.0)
other_ob: Decimal = Decimal(0.0)
all_other_ob: Decimal = Decimal(0.0)
all_outs: Decimal = Decimal(0.0)
rem_singles: Decimal = Decimal(0.0)
rem_xbh: Decimal = Decimal(0.0)
@ -21,6 +24,9 @@ class BattingCardRatingsModel(pydantic.BaseModel):
hard_rate: Decimal
med_rate: Decimal
soft_rate: Decimal
pull_rate: Decimal
center_rate: Decimal
slap_rate: Decimal
homerun: Decimal = Decimal(0.0)
bp_homerun: Decimal = Decimal(0.0)
triple: Decimal = Decimal(0.0)
@ -50,12 +56,18 @@ class BattingCardRatingsModel(pydantic.BaseModel):
slg: Decimal = 0.0
def total_chances(self):
    """Return the sum of every result column on this card as a Decimal.

    Covers all hit, on-base and out results. The caller
    (get_batter_ratings) expects a finished card to total 108.
    """
    return Decimal(sum([
        self.homerun, self.bp_homerun, self.triple, self.double_three, self.double_two, self.double_pull,
        self.single_two, self.single_one, self.single_center, self.bp_single, self.hbp, self.walk, self.strikeout,
        self.lineout, self.popout, self.flyout_a, self.flyout_bq, self.flyout_lf_b, self.flyout_rf_b,
        self.groundout_a, self.groundout_b, self.groundout_c
    ]))
def total_hits(self):
    """Return the combined chances of every hit result (HR through ballpark single)."""
    hit_results = (
        self.homerun, self.bp_homerun, self.triple,
        self.double_three, self.double_two, self.double_pull,
        self.single_two, self.single_one, self.single_center,
        self.bp_single,
    )
    return Decimal(sum(hit_results))
def rem_hits(self):
return (self.all_hits -
@ -65,14 +77,14 @@ class BattingCardRatingsModel(pydantic.BaseModel):
]))
def rem_outs(self):
    """Return the out chances not yet assigned to a specific out result.

    Computed as ``all_outs`` minus every out column currently set, so the
    value shrinks as the calculate_* methods fill those columns in.
    """
    return Decimal(self.all_outs -
                   sum([
                       self.strikeout, self.lineout, self.popout, self.flyout_a, self.flyout_bq, self.flyout_lf_b,
                       self.flyout_rf_b, self.groundout_a, self.groundout_b, self.groundout_c
                   ]))
def rem_other_ob(self):
    """Return the non-hit on-base chances not yet assigned to HBP or walk."""
    return self.all_other_ob - self.hbp - self.walk
def calculate_singles(self, szn_singles, szn_hits, ifh_rate: Decimal):
tot = sanitize_chance_output(self.all_hits * Decimal((szn_singles * .8) / szn_hits))
@ -110,10 +122,86 @@ class BattingCardRatingsModel(pydantic.BaseModel):
if szn_triples > 0 and self.rem_xbh > 0:
self.triple = sanitize_chance_output(self.rem_xbh, min_chances=0.5)
if self.rem_xbh > 0:
logging.error(f'Adding {self.rem_xbh} results to all outs')
print(self)
self.all_outs += self.rem_xbh
logging.error(f'Adding {self.rem_xbh} results to all other ob')
# print(self)
self.all_other_ob += self.rem_xbh
def calculate_other_ob(self, szn_bb, szn_hbp):
    """Split the non-hit on-base chances between hit-by-pitch and walk.

    szn_bb / szn_hbp are the season walk and hit-by-pitch counts. Any
    chances left over after sanitizing the walk total are moved into
    ``all_outs`` so nothing is lost from the card.
    """
    self.hbp = hit_by_pitch(self.all_other_ob, szn_hbp, szn_bb)
    self.walk = sanitize_chance_output(self.all_other_ob - self.hbp)

    # Sanitizing can leave a shortfall; hand it to the out pool.
    rem = self.all_other_ob - self.walk - self.hbp
    if rem > 0:
        logging.error(f'Adding {rem} chances to all_outs')
        self.all_outs += Decimal(rem)
def calculate_strikeouts(self, szn_so, szn_ab, szn_hits):
    """Set ``self.strikeout`` from the season strikeout rate per at-bat out.

    szn_so, szn_ab and szn_hits are the season strikeout, at-bat and hit
    counts. The rate is strikeouts divided by (at-bats - hits).
    """
    ab_outs = szn_ab - szn_hits
    if ab_outs <= 0:
        # No outs were recorded in at-bats, so there can be no strikeouts;
        # this also avoids a ZeroDivisionError on tiny samples.
        self.strikeout = Decimal(0)
        return
    self.strikeout = strikeouts(self.all_outs, (szn_so / ab_outs))
def calculate_other_outs(self, fb_rate, ld_rate, gb_rate, szn_gidp, szn_ab):
    """Distribute the remaining out chances across fly, line, pop and ground outs.

    Statement order matters: ``self.rem_outs()`` is recomputed from the out
    columns each time it is called, so every assignment below changes what
    the later steps see.
    """
    # --- Fly balls: carve the fly-ball share out of the unassigned outs.
    flyball_share = self.rem_outs() * Decimal(fb_rate)
    self.rem_flyballs = sanitize_chance_output(flyball_share)

    self.flyout_a = flyout_a(self.rem_flyballs, self.hard_rate)
    self.rem_flyballs -= self.flyout_a

    self.flyout_bq = flyout_bq(self.rem_flyballs, self.soft_rate)
    self.rem_flyballs -= self.flyout_bq

    # Right-handed batters use their pull rate for the LF column; everyone
    # else uses the opposite-field (slap) rate.
    # NOTE(review): switch hitters ('S') always take the slap branch here,
    # regardless of vs_hand - confirm that is intended.
    if self.bat_hand == 'R':
        lf_rate = self.pull_rate
    else:
        lf_rate = self.slap_rate
    self.flyout_lf_b = flyout_b(self.rem_flyballs, pull_rate=lf_rate, cent_rate=self.center_rate)
    self.rem_flyballs -= self.flyout_lf_b

    self.flyout_rf_b = sanitize_chance_output(self.rem_flyballs)
    self.rem_flyballs -= self.flyout_rf_b

    if self.rem_flyballs > 0:
        # Only logged; the leftover stays in rem_outs() and is picked up below.
        logging.debug(f'Adding {self.rem_flyballs} chances to lineouts')

    # --- One-out results: line-drive share of what is left, split at random
    # between lineouts and popouts.
    line_share = Decimal(ld_rate / (ld_rate + gb_rate))
    tot_oneouts = sanitize_chance_output(self.rem_outs() * line_share)
    self.lineout = sanitize_chance_output(Decimal(random.random()) * tot_oneouts)
    self.popout = sanitize_chance_output(tot_oneouts - self.lineout)

    # --- Ground balls: A, then C, then B absorbs whatever is still unassigned.
    self.groundout_a = groundball_a(self.rem_outs(), szn_gidp, szn_ab)
    self.groundout_c = groundball_c(self.rem_outs(), self.med_rate)
    self.groundout_b = self.rem_outs()
def calculate_rate_stats(self):
    """Populate ``avg``, ``obp`` and ``slg`` from the card's result columns.

    Each stat is expressed per 108 chances and rounded to three places.
    Ballpark home runs are weighted as 2 bases and ballpark singles as
    half a base, matching the weights the original expression used.
    """
    self.avg = Decimal(round(self.total_hits() / 108, 3))
    self.obp = Decimal(round((self.total_hits() + self.hbp + self.walk) / 108, 3))

    # Every hit column appears exactly once; the previous expression counted
    # single_two and double_two twice and skipped single_one and double_pull.
    total_bases = (
        self.homerun * 4
        + self.triple * 3
        + (self.double_three + self.double_two + self.double_pull + self.bp_homerun) * 2
        + self.single_two + self.single_one + self.single_center
        + self.bp_single / 2
    )
    # Divide by the 108 chances so SLG is a rate on the same scale as AVG/OBP.
    self.slg = Decimal(round(total_bases / 108, 3))
def custom_to_dict(self):
    """Return the card as a plain dict with every chance column cast to float.

    Only the identifying fields and the 22 result columns are included;
    ``battingcard_id`` and ``vs_hand`` are passed through unchanged.
    """
    chance_columns = (
        'homerun', 'bp_homerun', 'triple',
        'double_three', 'double_two', 'double_pull',
        'single_two', 'single_one', 'single_center', 'bp_single',
        'hbp', 'walk', 'strikeout',
        'lineout', 'popout',
        'flyout_a', 'flyout_bq', 'flyout_lf_b', 'flyout_rf_b',
        'groundout_a', 'groundout_b', 'groundout_c',
    )
    payload = {
        'battingcard_id': self.battingcard_id,
        'vs_hand': self.vs_hand,
    }
    for column in chance_columns:
        payload[column] = float(getattr(self, column))
    return payload
# def total_chances(chance_data):
# sum_chances = 0
@ -128,7 +216,7 @@ def sanitize_chance_output(total_chances, min_chances=1.0, rounding=0.05):
# r_val = mround(total_chances) if total_chances >= min_chances else 0
r_val = Decimal(total_chances) if total_chances >= min_chances else Decimal(0)
logging.debug(f'r_val: {r_val}')
return Decimal(float(round(total_chances / Decimal(rounding)) * Decimal(rounding))).quantize(Decimal("0.05"))
return Decimal(float(round(r_val / Decimal(rounding)) * Decimal(rounding))).quantize(Decimal("0.05"))
# return r_val.quantize(Decimal(rounding))
@ -203,38 +291,38 @@ def two_doubles(all_doubles, soft_rate):
def hit_by_pitch(other_ob, hbps, walks):
    """Return the hit-by-pitch chances out of the non-hit on-base chances.

    other_ob is the Decimal pool of non-hit on-base chances; hbps and walks
    are season counts. The HBP share is other_ob * hbps / (hbps + walks),
    rounded to whole chances. Shares below one full chance yield zero.
    """
    if hbps == 0:
        return Decimal(0)

    hbp_chances = other_ob * Decimal(hbps / (hbps + walks))
    if hbp_chances < 1:
        # Return a Decimal (not int 0) so callers always get one type back.
        return Decimal(0)
    return sanitize_chance_output(hbp_chances, rounding=1.0)
def strikeouts(all_outs, k_rate):
    """Return the strikeout chances: all_outs scaled by k_rate, sanitized.

    all_outs is a Decimal; k_rate is a plain number and is converted to
    Decimal before multiplying. Zero outs or a zero rate yield Decimal(0).
    """
    if all_outs == 0 or k_rate == 0:
        return Decimal(0)
    return sanitize_chance_output(all_outs * Decimal(k_rate))
def flyout_a(all_flyouts, hard_rate):
    """Return one flyout-A chance for hard hitters, otherwise zero.

    A batter earns the single chance when there are fly outs to give and
    hard_rate is at least 0.4.
    """
    # Compare against an exact Decimal: the float literal .4 is slightly
    # above 0.4, so Decimal('0.4') < .4 would wrongly evaluate True.
    if all_flyouts == 0 or hard_rate < Decimal('0.4'):
        return Decimal(0)
    return Decimal(1)
def flyout_bq(rem_flyouts, soft_rate):
    """Return the flyout-B? chances taken from the remaining fly outs.

    The share is three times soft_rate, capped at 75% of rem_flyouts.
    Batters with soft_rate below 0.1 get none.
    """
    # Exact Decimal threshold: the float literal .1 is slightly above 0.1,
    # so Decimal('0.1') < .1 would wrongly evaluate True.
    if rem_flyouts == 0 or soft_rate < Decimal('0.1'):
        return Decimal(0)
    share = min(soft_rate * 3, Decimal(.75))
    return sanitize_chance_output(rem_flyouts * share)
def flyout_b(rem_flyouts, pull_rate, cent_rate):
    """Return the flyout-B chances for one corner outfield column.

    The column receives pull_rate plus half of cent_rate of the remaining
    fly outs. Zero fly outs or a zero pull_rate yield Decimal(0).
    """
    if rem_flyouts == 0 or pull_rate == 0:
        return Decimal(0)
    return sanitize_chance_output(rem_flyouts * (pull_rate + cent_rate / 2))
def popouts(rem_outs, iffb_rate):
@ -246,18 +334,18 @@ def popouts(rem_outs, iffb_rate):
def groundball_a(all_groundouts, gidps, abs):
    """Return the groundball-A chances out of the available ground outs.

    gidps is the season double-play count and abs the season at-bats (the
    parameter name shadows the builtin but is kept for keyword callers).
    The share is min(gidps ** 2.5, abs) / abs of all_groundouts.
    """
    if all_groundouts == 0 or gidps == 0:
        return Decimal(0)
    dp_share = Decimal(min(gidps ** 2.5, abs) / abs)
    return sanitize_chance_output(dp_share * all_groundouts)
def groundball_c(rem_groundouts, med_rate):
    """Return the groundball-C chances out of the remaining ground outs.

    med_rate below 0.4 gives none, above 0.6 gives all of rem_groundouts,
    and anything in between gives rem_groundouts * med_rate.
    """
    # Thresholds are exact Decimals: the float literals .4 and .6 are not
    # exactly 0.4 / 0.6, which misclassifies Decimal values on the boundary.
    if rem_groundouts == 0 or med_rate < Decimal('0.4'):
        return Decimal(0)
    if med_rate > Decimal('0.6'):
        return sanitize_chance_output(rem_groundouts)
    return sanitize_chance_output(rem_groundouts * med_rate)
def stealing(chances: int, sb2s: int, cs2s: int, sb3s: int, cs3s: int, season_pct: float):
@ -409,45 +497,111 @@ def hit_and_run(ab_vl: int, ab_vr: int, hits_vl: int, hits_vr: int, hr_vl: int,
return 'D'
def get_batter_ratings(df_data) -> List[dict]:
    """Build the vs-LHP and vs-RHP card ratings for one batter.

    df_data is one row of the merged batting-stats frame; it must carry
    ``battingcard_id``, ``bat_hand`` and the ``*_vL`` / ``*_vR`` columns read
    below. Returns ``[vl_dict, vr_dict]`` as produced by ``custom_to_dict``.
    Each card is built up to 108 total chances; mismatches are corrected
    where possible and logged otherwise.
    """
    # Consider a sliding offense_mod based on OPS; floor of 1x and ceiling of 1.5x ?
    offense_mod = 1.2
    vl = BattingCardRatingsModel(
        battingcard_id=df_data.battingcard_id,
        bat_hand=df_data['bat_hand'],
        vs_hand='L',
        all_hits=mround(108 * offense_mod * df_data['AVG_vL']),
        all_other_ob=mround(108 * offense_mod * ((df_data['BB_vL'] + df_data['HBP_vL']) / df_data['PA_vL'])),
        hard_rate=df_data['Hard%_vL'],
        med_rate=df_data['Med%_vL'],
        soft_rate=df_data['Soft%_vL'],
        pull_rate=df_data['Pull%_vL'],
        center_rate=df_data['Cent%_vL'],
        slap_rate=df_data['Oppo%_vL']
    )
    vr = BattingCardRatingsModel(
        battingcard_id=df_data.battingcard_id,
        bat_hand=df_data['bat_hand'],
        vs_hand='R',
        all_hits=mround(108 * offense_mod * df_data['AVG_vR']),
        all_other_ob=mround(108 * offense_mod * ((df_data['BB_vR'] + df_data['HBP_vR']) / df_data['PA_vR'])),
        hard_rate=df_data['Hard%_vR'],
        med_rate=df_data['Med%_vR'],
        soft_rate=df_data['Soft%_vR'],
        pull_rate=df_data['Pull%_vR'],
        center_rate=df_data['Cent%_vR'],
        slap_rate=df_data['Oppo%_vR']
    )
    # Whatever is not a hit or other on-base result is an out.
    vl.all_outs = Decimal(108 - vl.all_hits - vl.all_other_ob).quantize(Decimal("0.05"))
    vr.all_outs = Decimal(108 - vr.all_hits - vr.all_other_ob).quantize(Decimal("0.05"))

    vl.calculate_singles(df_data['1B_vL'], df_data['H_vL'], Decimal(df_data['IFH%_vL']))
    vr.calculate_singles(df_data['1B_vR'], df_data['H_vR'], Decimal(df_data['IFH%_vR']))

    logging.debug(
        f'vL - All Hits: {vl.all_hits} / Other OB: {vl.all_other_ob} / All Outs: {vl.all_outs} '
        f'/ Total: {vl.all_hits + vl.all_other_ob + vl.all_outs}'
    )
    logging.debug(
        f'vR - All Hits: {vr.all_hits} / Other OB: {vr.all_other_ob} / All Outs: {vr.all_outs} '
        f'/ Total: {vr.all_hits + vr.all_other_ob + vr.all_outs}'
    )

    vl.calculate_xbh(df_data['3B_vL'], df_data['2B_vL'], df_data['HR_vL'], df_data['HR/FB_vL'])
    vr.calculate_xbh(df_data['3B_vR'], df_data['2B_vR'], df_data['HR_vR'], df_data['HR/FB_vR'])

    # Only hit columns are populated at this point, so total_chances() is the hit total.
    logging.debug(f'all_hits: {vl.all_hits} / sum of hits: {vl.total_chances()}')
    logging.debug(f'all_hits: {vr.all_hits} / sum of hits: {vr.total_chances()}')

    vl.calculate_other_ob(df_data['BB_vL'], df_data['HBP_vL'])
    vr.calculate_other_ob(df_data['BB_vR'], df_data['HBP_vR'])

    logging.debug(f'all on base: {vl.hbp + vl.walk + vl.total_hits()} / all chances: {vl.total_chances()}'
                  f'{"*******ERROR ABOVE*******" if vl.hbp + vl.walk + vl.total_hits() != vl.total_chances() else ""}')
    logging.debug(f'all on base: {vr.hbp + vr.walk + vr.total_hits()} / all chances: {vr.total_chances()}'
                  f'{"*******ERROR ABOVE*******" if vr.hbp + vr.walk + vr.total_hits() != vr.total_chances() else ""}')

    vl.calculate_strikeouts(df_data['SO_vL'], df_data['AB_vL'], df_data['H_vL'])
    vr.calculate_strikeouts(df_data['SO_vR'], df_data['AB_vR'], df_data['H_vR'])

    logging.debug(f'K rate vL: {round(vl.strikeout / vl.all_outs, 2)} / '
                  f'K rate vR: {round(vr.strikeout / vr.all_outs, 2)}')

    vl.calculate_other_outs(
        df_data['FB%_vL'], df_data['LD%_vL'], df_data['GB%_vL'], df_data['GDP_vL'], df_data['AB_vL']
    )
    vr.calculate_other_outs(
        df_data['FB%_vR'], df_data['LD%_vR'], df_data['GB%_vR'], df_data['GDP_vR'], df_data['AB_vR']
    )

    # Correct total chance errors
    for x in [vl, vr]:
        if x.total_chances() < 108:
            diff = Decimal(108) - x.total_chances()
            logging.error(f'Adding {diff} strikeouts to close gap')
            x.strikeout += diff
        elif x.total_chances() > 108:
            diff = x.total_chances() - Decimal(108)
            logging.error(f'Have surplus of {diff} chances')
            # Take the surplus from the first out column that can absorb it.
            # NOTE(review): the "+ 1" allows a column to dip below zero by up
            # to one chance, and nothing is subtracted if no column qualifies.
            if x.strikeout + 1 > diff:
                logging.error(f'Subtracting {diff} strikeouts to close gap')
                x.strikeout -= diff
            elif x.lineout + 1 > diff:
                logging.error(f'Subtracting {diff} lineouts to close gap')
                x.lineout -= diff
            elif x.groundout_a + 1 > diff:
                logging.error(f'Subtracting {diff} gbA to close gap')
                x.groundout_a -= diff
            elif x.groundout_b + 1 > diff:
                logging.error(f'Subtracting {diff} gbB to close gap')
                x.groundout_b -= diff
            elif x.groundout_c + 1 > diff:
                logging.error(f'Subtracting {diff} gbC to close gap')
                x.groundout_c -= diff

    vl_total_chances = vl.total_chances()
    vr_total_chances = vr.total_chances()
    if vl_total_chances != 108:
        logging.error(f'total chances for {df_data.name} come to {vl_total_chances}')
    else:
        logging.debug(f'total chances: {vl_total_chances}')
    if vr_total_chances != 108:
        logging.error(f'total chances for {df_data.name} come to {vr_total_chances}')
    else:
        logging.debug(f'total chances: {vr_total_chances}')

    return [vl.custom_to_dict(), vr.custom_to_dict()]

View File

@ -1,6 +1,11 @@
import csv
import datetime
import pandas as pd
import random
import logging
from db_calls import db_get
from db_calls_card_creation import *
D20_CHANCES = {
@ -407,6 +412,93 @@ TESTING = False
YES = ['y', 'yes', 'yeet', 'please', 'yeah']
async def pd_players_df(cardset_id: int):
    """Fetch the players of one cardset from the API as a DataFrame.

    Raises ValueError when the API reports zero players.
    """
    query_params = [
        ('inc_dex', False),
        ('cardset_id', cardset_id),
        ('short_output', True),
    ]
    response = await db_get('players', params=query_params)
    if response['count'] == 0:
        raise ValueError('No players returned from Paper Dynasty API')
    return pd.DataFrame(response['players'])
async def pd_battingcards_df(cardset_id: int):
    """Fetch the batting cards of one cardset from the API as a DataFrame.

    The ``id`` and ``player`` columns are renamed to ``battingcard_id`` and
    ``player_id``. Raises ValueError when the API reports zero cards.
    """
    query_params = [('cardset_id', cardset_id), ('short_output', True)]
    response = await db_get('battingcards', params=query_params)
    if response['count'] == 0:
        raise ValueError('No batting cards returned from Paper Dynasty API')

    cards = pd.DataFrame(response['cards'])
    return cards.rename(columns={'id': 'battingcard_id', 'player': 'player_id'})
async def pd_battingcardratings_df(cardset_id: int):
    """Fetch both hands' batting card ratings and derive total OPS and rarity.

    Ratings are pulled one hand at a time (vs L, then vs R), merged on
    ``battingcard`` with ``_vL`` / ``_vR`` suffixes, and extended with two
    columns: ``total_OPS`` and ``new_rarity_id``. Raises ValueError when
    either hand returns zero ratings.
    """
    by_hand = {}
    for hand in ('L', 'R'):
        by_hand[hand] = await db_get(
            'battingcardratings',
            params=[('cardset_id', cardset_id), ('vs_hand', hand), ('short_output', True)]
        )
    if 0 in [by_hand['L']['count'], by_hand['R']['count']]:
        raise ValueError('No batting card ratings returned from Paper Dynasty API')

    vl = pd.DataFrame(by_hand['L']['ratings'])
    vr = pd.DataFrame(by_hand['R']['ratings'])
    merged = pd.merge(vl, vr, on='battingcard', suffixes=('_vL', '_vR'))
    ratings = merged.rename(columns={'battingcard': 'battingcard_id'})

    def get_total_ops(df_data):
        # Average of both splits with the weaker split counted twice.
        ops_vl = df_data['obp_vL'] + df_data['slg_vL']
        ops_vr = df_data['obp_vR'] + df_data['slg_vR']
        return (ops_vr + ops_vl + min(ops_vl, ops_vr)) / 3
    ratings['total_OPS'] = ratings.apply(get_total_ops, axis=1)

    # (minimum total OPS, rarity id), checked from the highest floor down.
    rarity_floors = ((1.2, 99), (1, 1), (.9, 2), (.8, 3), (.7, 4))

    def new_rarity_id(df_data):
        for floor, rarity in rarity_floors:
            if df_data['total_OPS'] >= floor:
                return rarity
        return 5
    ratings['new_rarity_id'] = ratings.apply(new_rarity_id, axis=1)

    return ratings
# return pd.DataFrame(bcr_query['ratings']).rename(columns={'battingcard': 'battingcard_id'})
def get_batting_stats(file_path: str = None, start_date: datetime.datetime = None, end_date: datetime.datetime = None):
    """Load and merge the four batting split CSVs found under file_path.

    file_path is used as a raw prefix for the file names, so it must end
    with a path separator. Rows need PA >= 20 vs LHP and PA >= 40 vs RHP.
    start_date / end_date are accepted but unused; without file_path a
    LookupError is raised.
    """
    if file_path is None:
        raise LookupError('Date-based stat pulls not implemented, yet. Please provide batting csv files.')

    def load_splits(kind):
        # Read the vs-LHP and vs-RHP files for one stat kind and join them per player.
        versus_left = pd.read_csv(f'{file_path}vlhp-{kind}.csv').query('PA >= 20')
        versus_right = pd.read_csv(f'{file_path}vrhp-{kind}.csv').query('PA >= 40')
        return pd.merge(versus_left, versus_right, on="playerId", suffixes=('_vL', '_vR'))

    total_basic = load_splits('basic')
    total_rate = load_splits('rate')
    return pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))
def get_pitching_stats(file_path: str = None, start_date: datetime.datetime = None, end_date: datetime.datetime = None):
    """Load and merge the four pitching split CSVs found under file_path.

    file_path is used as a raw prefix for the file names, so it must end
    with a path separator. Rows need TBF >= 20 vs LHH and TBF >= 40 vs RHH.
    start_date / end_date are accepted but unused; without file_path a
    LookupError is raised.
    """
    if file_path is None:
        # The message names pitching files; it previously said "batting".
        raise LookupError('Date-based stat pulls not implemented, yet. Please provide pitching csv files.')

    vl_basic = pd.read_csv(f'{file_path}vlhh-basic.csv').query('TBF >= 20')
    vr_basic = pd.read_csv(f'{file_path}vrhh-basic.csv').query('TBF >= 40')
    total_basic = pd.merge(vl_basic, vr_basic, on="playerId", suffixes=('_vL', '_vR'))

    vl_rate = pd.read_csv(f'{file_path}vlhh-rate.csv').query('TBF >= 20')
    vr_rate = pd.read_csv(f'{file_path}vrhh-rate.csv').query('TBF >= 40')
    total_rate = pd.merge(vl_rate, vr_rate, on="playerId", suffixes=('_vL', '_vR'))

    return pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))
def mround(x, prec=2, base=.05):
    """Round x to the nearest multiple of base, then to prec decimal places."""
    steps = round(float(x) / base)
    return round(base * steps, prec)

View File

@ -43,7 +43,7 @@ async def db_get(
retries = 0
while True:
try:
resp = requests.get(req_url, timeout=timeout)
resp = requests.get(req_url, timeout=timeout, headers=AUTH_TOKEN)
break
except requests.ReadTimeout as e:
logging.error(f'Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}')

View File

View File

@ -15,7 +15,8 @@ import pybaseball as pb
import pydantic
import sys
from db_calls import db_get, db_put, db_post
from creation_helpers import pd_players_df, get_batting_stats, pd_battingcards_df, pd_battingcardratings_df
from db_calls import db_get, db_put, db_post, db_patch
from typing import Literal
from bs4 import BeautifulSoup
@ -26,7 +27,7 @@ logging.basicConfig(
format='%(asctime)s - card-creation - %(levelname)s - %(message)s',
level=log_level
)
CARD_BASE_URL = 'https://sombaseball.ddns.net/cards/pd'
CARD_BASE_URL = 'https://pd.manticorum.com/api/players'
def sanitize_name(start_name: str) -> str:
@ -101,6 +102,7 @@ async def main(args):
print(f'I do not see a cardset named {cardset_name}')
return
cardset = c_query['cardsets'][0]
del c_query
if 'season' in arg_data:
season = arg_data['season']
@ -115,20 +117,11 @@ async def main(args):
print(f'Cardset ID: {cardset["id"]} / Season: {season}\nGame count: {game_count} / Season %: {season_pct}\n')
start_time = datetime.datetime.now()
release_directory = f'{season}-{datetime.datetime.now().month}{datetime.datetime.now().day}'
release_directory = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
input_path = f'data-input/{cardset["name"]} Cardset/'
print('Reading batting stats...')
vl_basic = pd.read_csv(f'{input_path}vlhp-basic.csv').query('PA >= 20')
vr_basic = pd.read_csv(f'{input_path}vrhp-basic.csv').query('PA >= 40')
total_basic = pd.merge(vl_basic, vr_basic, on="playerId", suffixes=('_vL', '_vR'))
vl_rate = pd.read_csv(f'{input_path}vlhp-rate.csv').query('PA >= 20')
vr_rate = pd.read_csv(f'{input_path}vrhp-rate.csv').query('PA >= 40')
total_rate = pd.merge(vl_rate, vr_rate, on="playerId", suffixes=('_vL', '_vR'))
all_batting = pd.merge(total_basic, total_rate, on="playerId", suffixes=('', '_rate'))
del vl_basic, vr_basic, total_basic, vl_rate, vr_rate, total_rate
all_batting = get_batting_stats(file_path=input_path)
print(f'Processed {len(all_batting.values)} batters\n')
def get_pids(df_data):
@ -146,15 +139,13 @@ async def main(args):
return 'R'
print(f'Pulling PD player IDs...')
p_query = await db_get('players', params=[('inc_dex', False), ('cardset_id', cardset['id'])])
if p_query['count'] == 0:
raise ValueError(f'No players returned from Paper Dynasty API')
pd_players = pd.DataFrame(p_query['players']).rename(columns={'bbref_id': 'key_bbref'})
pd_players = await pd_players_df(cardset['id'])
# .set_index('bbref_id', drop=False)
print(f'Now pulling mlbam player IDs...')
ids_and_names = all_batting.apply(get_pids, axis=1)
player_data = (ids_and_names
.merge(pd_players, left_on='key_bbref', right_on='key_bbref')
.merge(pd_players, left_on='key_bbref', right_on='bbref_id')
.query('key_mlbam == key_mlbam')
.set_index('key_bbref', drop=False))
print(f'Matched mlbam to pd players.')
@ -167,7 +158,7 @@ async def main(args):
new_players.append({
'p_name': f'{f_name} {l_name}',
'cost': 99999,
'image': f'{CARD_BASE_URL}/{release_directory}/{f_name.lower()}-{l_name.lower()}.png',
'image': f'{CARD_BASE_URL}/{df_data["player_id"]}/card?d={release_directory}',
'mlbclub': 'None',
'franchise': 'None',
'cardset_id': cardset['id'],
@ -178,6 +169,7 @@ async def main(args):
'bbref_id': df_data.name,
'fangr_id': int(float(df_data['key_fangraphs']))
})
player_data[player_data['player_id'].isnull()].apply(create_players, axis=1)
print(f'Creating {len(new_players)} new players...')
for x in new_players:
@ -190,6 +182,7 @@ async def main(args):
).set_index('key_bbref', drop=False)
del ids_and_names, all_batting, pd_players
print(f'Player IDs linked to batting stats.\n{len(final_batting.values)} players remain\n')
print(f'Reading baserunning stats...')
run_data = (pd.read_csv(f'{input_path}running.csv')
.set_index('Name-additional'))
@ -198,27 +191,28 @@ async def main(args):
del final_batting, run_data
print(f'Stats are tallied\n{len(offense_stats.values)} players remain\n\nCollecting defensive data from bbref...')
# print(f'Pulling pitcher defense...')
# df_p = cde.get_bbref_fielding_df('p', season)
# print(f'Pulling catcher defense...')
# df_c = cde.get_bbref_fielding_df('c', season)
# print(f'Pulling first base defense...')
# df_1b = cde.get_bbref_fielding_df('1b', season)
# print(f'Pulling second base defense...')
# df_2b = cde.get_bbref_fielding_df('2b', season)
# print(f'Pulling third base defense...')
# df_3b = cde.get_bbref_fielding_df('3b', season)
# print(f'Pulling short stop defense...')
# df_ss = cde.get_bbref_fielding_df('ss', season)
# print(f'Pulling left field defense...')
# df_lf = cde.get_bbref_fielding_df('lf', season)
# print(f'Pulling center field defense...')
# df_cf = cde.get_bbref_fielding_df('cf', season)
# print(f'Pulling right field defense...')
# df_rf = cde.get_bbref_fielding_df('rf', season)
# print(f'Pulling outfield defense...')
# df_of = cde.get_bbref_fielding_df('of', season)
print(f'Positions data is retrieved')
if 'pull_fielding' in arg_data and arg_data['pull_fielding'].lower() == 'true':
print(f'Pulling pitcher defense...')
df_p = cde.get_bbref_fielding_df('p', season)
print(f'Pulling catcher defense...')
df_c = cde.get_bbref_fielding_df('c', season)
print(f'Pulling first base defense...')
df_1b = cde.get_bbref_fielding_df('1b', season)
print(f'Pulling second base defense...')
df_2b = cde.get_bbref_fielding_df('2b', season)
print(f'Pulling third base defense...')
df_3b = cde.get_bbref_fielding_df('3b', season)
print(f'Pulling short stop defense...')
df_ss = cde.get_bbref_fielding_df('ss', season)
print(f'Pulling left field defense...')
df_lf = cde.get_bbref_fielding_df('lf', season)
print(f'Pulling center field defense...')
df_cf = cde.get_bbref_fielding_df('cf', season)
print(f'Pulling right field defense...')
df_rf = cde.get_bbref_fielding_df('rf', season)
print(f'Pulling outfield defense...')
df_of = cde.get_bbref_fielding_df('of', season)
print(f'Positions data is retrieved')
batting_cards = []
@ -254,103 +248,108 @@ async def main(args):
print(f'Calculating batting cards...')
offense_stats.apply(create_batting_card, axis=1)
print(f'Cards are complete.\n\nPosting cards now...')
# resp = await db_put('battingcards', payload={'cards': batting_cards}, timeout=30)
# print(f'Response: {resp}\n')
if 'post_updates' not in arg_data or arg_data['post_updates'].lower() == 'true':
resp = await db_put('battingcards', payload={'cards': batting_cards}, timeout=30)
print(f'Response: {resp}\n\nMatching batting card database IDs to player stats...')
offense_stats = pd.merge(
offense_stats, await pd_battingcards_df(cardset['id']), on='player_id')
position_payload = []
# def create_positions(df_data):
# for pos_data in [(df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')]:
# if df_data.name in pos_data[0].index:
# logging.debug(f'Running {pos_data[1]} stats for {player_data.at[df_data.name, "p_name"]}')
# position_payload.append({
# "player_id": int(player_data.at[df_data.name, 'player_id']),
# "position": pos_data[1].upper(),
# "innings": float(pos_data[0].at[df_data.name, 'Inn_def']),
# "range": cde.get_if_range(
# pos_code=pos_data[1],
# tz_runs=int(pos_data[0].at[df_data.name, 'tz_runs_total']),
# r_dp=0,
# season_pct=season_pct
# ),
# "error": cde.get_any_error(
# pos_code=pos_data[1],
# errors=int(pos_data[0].at[df_data.name, 'E_def']),
# chances=int(pos_data[0].at[df_data.name, 'chances']),
# season_pct=season_pct
# )
# })
#
# of_arms = []
# of_payloads = []
# for pos_data in [(df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')]:
# if df_data.name in pos_data[0].index:
# of_payloads.append({
# "player_id": int(player_data.at[df_data.name, 'player_id']),
# "position": pos_data[1].upper(),
# "innings": float(pos_data[0].at[df_data.name, 'Inn_def']),
# "range": cde.get_of_range(
# pos_code=pos_data[1],
# tz_runs=int(pos_data[0].at[df_data.name, 'tz_runs_total']),
# season_pct=season_pct
# )
# })
# of_arms.append(int(pos_data[0].at[df_data.name, 'bis_runs_outfield']))
#
# if df_data.name in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0:
# error_rating = cde.get_any_error(
# pos_code=pos_data[1],
# errors=int(df_of.at[df_data.name, 'E_def']),
# chances=int(df_of.at[df_data.name, 'chances']),
# season_pct=season_pct
# )
# arm_rating = cde.arm_outfield(of_arms)
# for f in of_payloads:
# f['error'] = error_rating
# f['arm'] = arm_rating
# position_payload.append(f)
#
# if df_data.name in df_c.index:
# if df_c.at[df_data.name, 'SB'] + df_c.at[df_data.name, 'CS'] == 0:
# arm_rating = 3
# else:
# arm_rating = cde.arm_catcher(
# cs_pct=df_c.at[df_data.name, 'caught_stealing_perc'],
# raa=int(df_c.at[df_data.name, 'bis_runs_catcher_sb']),
# season_pct=season_pct
# )
# position_payload.append({
# "player_id": int(player_data.at[df_data.name, 'player_id']),
# "position": 'C',
# "innings": float(df_c.at[df_data.name, 'Inn_def']),
# "range": cde.range_catcher(
# rs_value=int(df_c.at[df_data.name, 'tz_runs_catcher']),
# season_pct=season_pct
# ),
# "error": cde.get_any_error(
# pos_code='c',
# errors=int(df_c.at[df_data.name, 'E_def']),
# chances=int(df_c.at[df_data.name, 'chances']),
# season_pct=season_pct
# ),
# "arm": arm_rating,
# "pb": cde.pb_catcher(
# pb=int(df_c.at[df_data.name, 'PB']),
# innings=int(float(df_c.at[df_data.name, 'Inn_def'])),
# season_pct=season_pct
# ),
# "overthrow": cde.ot_catcher(
# errors=int(df_c.at[df_data.name, 'E_def']),
# chances=int(df_c.at[df_data.name, 'chances']),
# season_pct=season_pct
# )
# })
#
# print(f'Calculating fielding lines now...')
# offense_stats.apply(create_positions, axis=1)
# print(f'Fielding is complete.\n\nPosting positions now...')
# resp = await db_put('cardpositions', payload={'positions': position_payload}, timeout=30)
# print(f'Response: {resp}\n')
def create_positions(df_data):
    """Append one fielding payload per position this player appears at.

    df_data is a row whose ``name`` is the key used to look the player up in
    the per-position fielding frames and in ``player_data``. Results are
    appended to the enclosing ``position_payload`` list; nothing is returned.
    """
    key = df_data.name

    def pd_player_id():
        # Looked up lazily so players with no fielding rows never hit player_data.
        return int(player_data.at[key, 'player_id'])

    def error_rating_from(frame, code):
        return cde.get_any_error(
            pos_code=code,
            errors=int(frame.at[key, 'E_def']),
            chances=int(frame.at[key, 'chances']),
            season_pct=season_pct
        )

    # --- Infield: range and error come from the position's own frame.
    for frame, code in ((df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')):
        if key not in frame.index:
            continue
        logging.debug(f'Running {code} stats for {player_data.at[key, "p_name"]}')
        position_payload.append({
            "player_id": pd_player_id(),
            "position": code.upper(),
            "innings": float(frame.at[key, 'Inn_def']),
            "range": cde.get_if_range(
                pos_code=code,
                tz_runs=int(frame.at[key, 'tz_runs_total']),
                r_dp=0,
                season_pct=season_pct
            ),
            "error": error_rating_from(frame, code)
        })

    # --- Outfield: range is per position; error and arm are shared and come
    # from the combined outfield frame.
    of_arms = []
    of_payloads = []
    for frame, code in ((df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')):
        if key in frame.index:
            of_payloads.append({
                "player_id": pd_player_id(),
                "position": code.upper(),
                "innings": float(frame.at[key, 'Inn_def']),
                "range": cde.get_of_range(
                    pos_code=code,
                    tz_runs=int(frame.at[key, 'tz_runs_total']),
                    season_pct=season_pct
                )
            })
            of_arms.append(int(frame.at[key, 'bis_runs_outfield']))

    if key in df_of.index and of_arms and of_payloads:
        # NOTE(review): the original passed the leftover loop variable here,
        # which is always 'rf' once the loop above finishes - confirm that a
        # dedicated outfield code was not intended.
        error_rating = error_rating_from(df_of, 'rf')
        arm_rating = cde.arm_outfield(of_arms)
        for payload in of_payloads:
            payload['error'] = error_rating
            payload['arm'] = arm_rating
            position_payload.append(payload)

    # --- Catcher: adds arm, passed-ball and overthrow ratings.
    if key in df_c.index:
        if df_c.at[key, 'SB'] + df_c.at[key, 'CS'] == 0:
            # No steal attempts against this catcher; use a fixed arm rating.
            arm_rating = 3
        else:
            arm_rating = cde.arm_catcher(
                cs_pct=df_c.at[key, 'caught_stealing_perc'],
                raa=int(df_c.at[key, 'bis_runs_catcher_sb']),
                season_pct=season_pct
            )
        position_payload.append({
            "player_id": pd_player_id(),
            "position": 'C',
            "innings": float(df_c.at[key, 'Inn_def']),
            "range": cde.range_catcher(
                rs_value=int(df_c.at[key, 'tz_runs_catcher']),
                season_pct=season_pct
            ),
            "error": error_rating_from(df_c, 'c'),
            "arm": arm_rating,
            "pb": cde.pb_catcher(
                pb=int(df_c.at[key, 'PB']),
                innings=int(float(df_c.at[key, 'Inn_def'])),
                season_pct=season_pct
            ),
            "overthrow": cde.ot_catcher(
                errors=int(df_c.at[key, 'E_def']),
                chances=int(df_c.at[key, 'chances']),
                season_pct=season_pct
            )
        })
if 'pull_fielding' in arg_data and arg_data['pull_fielding'].lower() == 'true':
print(f'Calculating fielding lines now...')
offense_stats.apply(create_positions, axis=1)
print(f'Fielding is complete.\n\nPosting positions now...')
if 'post_updates' not in arg_data or arg_data['post_updates'].lower() == 'true':
resp = await db_put('cardpositions', payload={'positions': position_payload}, timeout=30)
print(f'Response: {resp}\n')
batting_ratings = []
@ -361,14 +360,154 @@ async def main(args):
print(f'Calculating card ratings...')
offense_stats.apply(create_batting_card_ratings, axis=1)
print(f'Ratings are complete\n\nPosting ratings now...')
# resp = await db_put('battingcardratings', payload={'ratings': batting_ratings}, timeout=30)
if 'post_updates' not in arg_data or arg_data['post_updates'].lower() == 'true':
resp = await db_put('battingcardratings', payload={'ratings': batting_ratings}, timeout=30)
print(f'Response: {resp}\n\nPulling fresh PD player data...')
# Update player record with positions, rarity, cost
# Cost only changes if starting cost is 99999 or calculated rarity is different than current
"""
Pull fresh pd_players and set_index to player_id
Pull fresh battingcards and set_index to player
Pull fresh battingcardratings one hand at a time and join on battingcard (suffixes _vl and vR)
run_time = datetime.datetime.now() - start_time
Join battingcards (left) with battingcardratings (right) as total_ratings on id (left) and battingcard (right)
Join pd_players (left) with total_ratings (right) on indices
Output: PD player list with batting card, ratings vL, and ratings vR
Calculate Total OPS as OPSvL + OPSvR + min(OPSvL, OPSvR) / 3 and assign rarity_id
For players with cost of 99999, set cost to <Rarity Base Cost> * Total OPS / <Rarity Avg OPS>
"""
p_data = await pd_players_df(cardset['id'])
p_data.set_index('player_id', drop=False)
total_ratings = pd.merge(
await pd_battingcards_df(cardset['id']),
await pd_battingcardratings_df(cardset['id']),
on='battingcard_id'
)
player_data = pd.merge(
p_data,
total_ratings,
on='player_id'
).set_index('player_id', drop=False)
del total_ratings, p_data, offense_stats
player_updates = {} # { <player_id> : [ (param pairs) ] }
rarity_group = player_data.query('rarity == new_rarity_id').groupby('rarity')
average_ops = rarity_group['total_OPS'].mean().to_dict()
# cost_groups = rarity_group['cost'].mean()
def get_player_updates(df_data):
    """Work out which fields of one player row need to be patched.

    ``df_data`` is a single row of the merged player frame; the columns read
    here are ``image``, ``player_id``, ``cost``, ``rarity``, ``new_rarity_id``
    and ``total_OPS``. Any resulting ``(field, value)`` pairs are stored in
    the enclosing ``player_updates`` dict under ``df_data.name``. Rows that
    need no change are left out of that dict. Nothing is returned.
    """
    # Base cost per rarity id, used to price cards whose cost is still 99999.
    base_costs = {1: 810, 2: 270, 3: 90, 4: 30, 5: 10, 99: 2400}

    # (old rarity, new rarity) -> (amount subtracted from cost, minimum cost).
    cheaper_moves = {
        (1, 2): (540, 100), (1, 3): (720, 50), (1, 4): (780, 15), (1, 5): (800, 5),
        (2, 3): (180, 50), (2, 4): (240, 15), (2, 5): (260, 5),
        (3, 4): (60, 15), (3, 5): (80, 5),
        (4, 5): (20, 5),
        (99, 1): (1600, 800), (99, 2): (2140, 100), (99, 3): (2320, 50),
        (99, 4): (2380, 15), (99, 5): (2400, 5),
    }
    # (old rarity, new rarity) -> amount added to cost (no cap applied).
    pricier_moves = {
        (1, 99): 1600,
        (2, 1): 540, (2, 99): 2140,
        (3, 1): 720, (3, 2): 180, (3, 99): 2320,
        (4, 1): 780, (4, 2): 240, (4, 3): 60, (4, 99): 2380,
        (5, 1): 800, (5, 2): 260, (5, 3): 80, (5, 4): 20, (5, 99): 2400,
    }

    params = []

    # Re-point the card image when it does not already reference this release.
    if release_directory not in df_data['image']:
        params.append(
            ('image', f'{CARD_BASE_URL}/{df_data["player_id"]}/card?d={release_directory}')
        )

    if df_data['cost'] == 99999:
        # Cost 99999 is handled as "needs an initial price" (presumably a
        # placeholder value): scale the rarity's base cost by this player's
        # total OPS relative to the average OPS recorded for that rarity.
        # NOTE(review): average_ops[...] raises KeyError when the mapping has
        # no entry for this rarity - confirm every rarity is always present.
        priced = round(base_costs[df_data['new_rarity_id']] * df_data['total_OPS'] /
                       average_ops[df_data['new_rarity_id']])
        params.extend([('cost', priced), ('rarity_id', df_data['new_rarity_id'])])
    elif df_data['rarity'] != df_data['new_rarity_id']:
        previous_rarity = df_data['rarity']
        target_rarity = df_data['new_rarity_id']
        current_cost = df_data['cost']

        transition = (previous_rarity, target_rarity)
        adjusted_cost = 0  # stays 0 when the rarity pair is not in either table
        if transition in cheaper_moves:
            reduction, floor = cheaper_moves[transition]
            adjusted_cost = max(current_cost - reduction, floor)
        elif transition in pricier_moves:
            adjusted_cost = current_cost + pricier_moves[transition]

        # A zero result means "no recognised transition": leave cost/rarity alone.
        if adjusted_cost != 0:
            params.extend([('cost', adjusted_cost), ('rarity_id', target_rarity)])

    if params:
        player_updates[df_data.name] = params
player_data.apply(get_player_updates, axis=1)
print(f'Sending {len(player_updates)} player updates to PD database...')
if 'post_updates' not in arg_data or arg_data['post_updates'].lower() == 'true':
for x in player_updates:
await db_patch('players', object_id=x, params=player_updates[x])
print(f'Batter updates are complete')
start_time_two = datetime.datetime.now()
run_time = start_time_two - start_time
print(f'Total batting cards: {len(batting_cards)}\nNew cardset batters: {len(new_players)}\n'
f'Program runtime: {round(run_time.total_seconds())} seconds')
f'Batter runtime: {round(run_time.total_seconds())} seconds')
if __name__ == '__main__':