Batting cards and ratings being calculated; began positions

This commit is contained in:
Cal Corum 2024-10-19 23:02:32 -05:00
parent c2b0d93a02
commit d8e30ec5f9
5 changed files with 502 additions and 132 deletions

View File

@ -12,49 +12,49 @@ class BattingCardRatingsModel(pydantic.BaseModel):
battingcard_id: int
bat_hand: Literal['R', 'L', 'S']
vs_hand: Literal['R', 'L']
all_hits: Decimal = Decimal(0.0)
all_other_ob: Decimal = Decimal(0.0)
all_outs: Decimal = Decimal(0.0)
rem_singles: Decimal = Decimal(0.0)
rem_xbh: Decimal = Decimal(0.0)
rem_hr: Decimal = Decimal(0.0)
rem_doubles: Decimal = Decimal(0.0)
hard_rate: Decimal
med_rate: Decimal
soft_rate: Decimal
pull_rate: Decimal
center_rate: Decimal
slap_rate: Decimal
homerun: Decimal = Decimal(0.0)
bp_homerun: Decimal = Decimal(0.0)
triple: Decimal = Decimal(0.0)
double_three: Decimal = Decimal(0.0)
double_two: Decimal = Decimal(0.0)
double_pull: Decimal = Decimal(0.0)
single_two: Decimal = Decimal(0.0)
single_one: Decimal = Decimal(0.0)
single_center: Decimal = Decimal(0.0)
bp_single: Decimal = Decimal(0.0)
hbp: Decimal = Decimal(0.0)
walk: Decimal = Decimal(0.0)
strikeout: Decimal = Decimal(0.0)
lineout: Decimal = Decimal(0.0)
popout: Decimal = Decimal(0.0)
rem_flyballs: Decimal = Decimal(0.0)
flyout_a: Decimal = Decimal(0.0)
flyout_bq: Decimal = Decimal(0.0)
flyout_lf_b: Decimal = Decimal(0.0)
flyout_rf_b: Decimal = Decimal(0.0)
rem_groundballs: Decimal = Decimal(0.0)
groundout_a: Decimal = Decimal(0.0)
groundout_b: Decimal = Decimal(0.0)
groundout_c: Decimal = Decimal(0.0)
avg: Decimal = 0.0
obp: Decimal = 0.0
slg: Decimal = 0.0
all_hits: float = 0.0
all_other_ob: float = 0.0
all_outs: float = 0.0
rem_singles: float = 0.0
rem_xbh: float = 0.0
rem_hr: float = 0.0
rem_doubles: float = 0.0
hard_rate: float
med_rate: float
soft_rate: float
pull_rate: float
center_rate: float
slap_rate: float
homerun: float = 0.0
bp_homerun: float = 0.0
triple: float = 0.0
double_three: float = 0.0
double_two: float = 0.0
double_pull: float = 0.0
single_two: float = 0.0
single_one: float = 0.0
single_center: float = 0.0
bp_single: float = 0.0
hbp: float = 0.0
walk: float = 0.0
strikeout: float = 0.0
lineout: float = 0.0
popout: float = 0.0
rem_flyballs: float = 0.0
flyout_a: float = 0.0
flyout_bq: float = 0.0
flyout_lf_b: float = 0.0
flyout_rf_b: float = 0.0
rem_groundballs: float = 0.0
groundout_a: float = 0.0
groundout_b: float = 0.0
groundout_c: float = 0.0
avg: float = 0.0
obp: float = 0.0
slg: float = 0.0
def total_chances(self):
return Decimal(sum([
return mround(sum([
self.homerun, self.bp_homerun, self.triple, self.double_three, self.double_two, self.double_pull,
self.single_two, self.single_one, self.single_center, self.bp_single, self.hbp, self.walk, self.strikeout,
self.lineout, self.popout, self.flyout_a, self.flyout_bq, self.flyout_lf_b, self.flyout_rf_b,
@ -62,7 +62,7 @@ class BattingCardRatingsModel(pydantic.BaseModel):
]))
def total_hits(self):
return Decimal(sum([
return mround(sum([
self.homerun, self.bp_homerun, self.triple, self.double_three, self.double_two, self.double_pull,
self.single_two, self.single_one, self.single_center, self.bp_single
]))
@ -75,7 +75,7 @@ class BattingCardRatingsModel(pydantic.BaseModel):
]))
def rem_outs(self):
return Decimal(self.all_outs -
return mround(self.all_outs -
sum([
self.strikeout, self.lineout, self.popout, self.flyout_a, self.flyout_bq, self.flyout_lf_b,
self.flyout_rf_b, self.groundout_a, self.groundout_b, self.groundout_c
@ -85,7 +85,7 @@ class BattingCardRatingsModel(pydantic.BaseModel):
return self.all_other_ob - self.hbp - self.walk
def calculate_singles(self, szn_singles, szn_hits, ifh_rate: Decimal):
tot = sanitize_chance_output(self.all_hits * Decimal((szn_singles * .8) / max(szn_hits, 1)))
tot = sanitize_chance_output(self.all_hits * mround((szn_singles * .8) / max(szn_hits, 1)))
logging.debug(f'tot: {tot}')
self.rem_singles = tot
@ -107,15 +107,15 @@ class BattingCardRatingsModel(pydantic.BaseModel):
self.triple = triples(self.rem_xbh, szn_triples, szn_doubles + szn_hr)
self.rem_xbh -= self.triple
tot_doubles = sanitize_chance_output(self.rem_xbh * Decimal(szn_doubles / max(szn_hr + szn_doubles, 1)))
tot_doubles = sanitize_chance_output(self.rem_xbh * mround(szn_doubles / max(szn_hr + szn_doubles, 1)))
self.double_two = two_doubles(tot_doubles, self.soft_rate)
self.double_pull = sanitize_chance_output(tot_doubles - self.double_two)
self.rem_xbh -= Decimal(self.double_two + self.double_pull)
self.rem_xbh -= mround(self.double_two + self.double_pull)
if (self.rem_xbh > Decimal(0)) and szn_hr > 0:
if (self.rem_xbh > mround(0)) and szn_hr > 0:
self.bp_homerun = bp_homeruns(self.rem_xbh, hr_per_fb)
self.homerun = sanitize_chance_output(self.rem_xbh - self.bp_homerun, min_chances=0.5)
self.rem_xbh -= Decimal(self.bp_homerun + self.homerun)
self.rem_xbh -= mround(self.bp_homerun + self.homerun)
if szn_triples > 0 and self.rem_xbh > 0:
logging.error(f'Adding {self.rem_xbh} results to triples')
@ -133,13 +133,13 @@ class BattingCardRatingsModel(pydantic.BaseModel):
rem = self.all_other_ob - self.walk - self.hbp
logging.error(f'Adding {rem} chances to all_outs')
# print(self)
self.all_outs += Decimal(rem)
self.all_outs += mround(rem)
def calculate_strikeouts(self, szn_so, szn_ab, szn_hits):
self.strikeout = strikeouts(self.all_outs, (szn_so / max(szn_ab - szn_hits, 1)))
def calculate_other_outs(self, fb_rate, ld_rate, gb_rate, szn_gidp, szn_ab):
self.rem_flyballs = sanitize_chance_output(self.rem_outs() * Decimal(fb_rate))
self.rem_flyballs = sanitize_chance_output(self.rem_outs() * mround(fb_rate))
self.flyout_a = flyout_a(self.rem_flyballs, self.hard_rate)
self.rem_flyballs -= self.flyout_a
@ -158,8 +158,8 @@ class BattingCardRatingsModel(pydantic.BaseModel):
if self.rem_flyballs > 0:
logging.debug(f'Adding {self.rem_flyballs} chances to lineouts')
tot_oneouts = sanitize_chance_output(self.rem_outs() * Decimal(ld_rate / max(ld_rate + gb_rate, .01)))
self.lineout = sanitize_chance_output(Decimal(random.random()) * tot_oneouts)
tot_oneouts = sanitize_chance_output(self.rem_outs() * mround(ld_rate / max(ld_rate + gb_rate, .01)))
self.lineout = sanitize_chance_output(mround(random.random()) * tot_oneouts)
self.popout = sanitize_chance_output(tot_oneouts - self.lineout)
self.groundout_a = groundball_a(self.rem_outs(), szn_gidp, szn_ab)
@ -167,9 +167,9 @@ class BattingCardRatingsModel(pydantic.BaseModel):
self.groundout_b = self.rem_outs()
def calculate_rate_stats(self):
self.avg = Decimal(round(self.total_hits() / 108, 3))
self.obp = Decimal(round((self.total_hits() + self.hbp + self.walk) / 108, 3))
self.slg = Decimal(round(
self.avg = mround(round(self.total_hits() / 108, 3))
self.obp = mround(round((self.total_hits() + self.hbp + self.walk) / 108, 3))
self.slg = mround(round(
self.homerun * 4 + self.triple * 3 + self.single_center + self.single_two + self.single_two +
(self.double_two + self.double_three + self.double_two + self.bp_homerun) * 2 + self.bp_single / 2
))
@ -178,31 +178,31 @@ class BattingCardRatingsModel(pydantic.BaseModel):
return {
'battingcard_id': self.battingcard_id,
'vs_hand': self.vs_hand,
'homerun': float(self.homerun),
'bp_homerun': float(self.bp_homerun),
'triple': float(self.triple),
'double_three': float(self.double_three),
'double_two': float(self.double_two),
'double_pull': float(self.double_pull),
'single_two': float(self.single_two),
'single_one': float(self.single_one),
'single_center': float(self.single_center),
'bp_single': float(self.bp_single),
'hbp': float(self.hbp),
'walk': float(self.walk),
'strikeout': float(self.strikeout),
'lineout': float(self.lineout),
'popout': float(self.popout),
'flyout_a': float(self.flyout_a),
'flyout_bq': float(self.flyout_bq),
'flyout_lf_b': float(self.flyout_lf_b),
'flyout_rf_b': float(self.flyout_rf_b),
'groundout_a': float(self.groundout_a),
'groundout_b': float(self.groundout_b),
'groundout_c': float(self.groundout_c),
'pull_rate': float(self.pull_rate),
'center_rate': float(self.center_rate),
'slap_rate': float(self.slap_rate)
'homerun': self.homerun,
'bp_homerun': self.bp_homerun,
'triple': self.triple,
'double_three': self.double_three,
'double_two': self.double_two,
'double_pull': self.double_pull,
'single_two': self.single_two,
'single_one': self.single_one,
'single_center': self.single_center,
'bp_single': self.bp_single,
'hbp': self.hbp,
'walk': self.walk,
'strikeout': mround(self.strikeout),
'lineout': self.lineout,
'popout': self.popout,
'flyout_a': self.flyout_a,
'flyout_bq': self.flyout_bq,
'flyout_lf_b': self.flyout_lf_b,
'flyout_rf_b': self.flyout_rf_b,
'groundout_a': self.groundout_a,
'groundout_b': self.groundout_b,
'groundout_c': self.groundout_c,
'pull_rate': self.pull_rate,
'center_rate': self.center_rate,
'slap_rate': self.slap_rate
}
# def total_chances(chance_data):
@ -220,27 +220,27 @@ def total_singles(all_hits, szn_singles, szn_hits):
def bp_singles(all_singles):
if all_singles < 6:
return Decimal(0)
return mround(0)
else:
return Decimal(5)
return mround(5)
def wh_singles(rem_singles, hard_rate):
if rem_singles == 0 or hard_rate < .2:
return 0
elif hard_rate > .4:
return sanitize_chance_output(rem_singles * Decimal(.666), min_chances=2)
return sanitize_chance_output(rem_singles * 2 / 3, min_chances=2)
else:
return sanitize_chance_output(rem_singles * Decimal(.333), min_chances=2)
return sanitize_chance_output(rem_singles / 3, min_chances=2)
def one_singles(rem_singles, ifh_rate, force_rem=False):
if force_rem:
return mround(rem_singles)
elif rem_singles == 0 or ifh_rate < .05:
return Decimal(0)
return mround(0)
else:
return sanitize_chance_output(rem_singles * ifh_rate * Decimal(3), min_chances=2)
return sanitize_chance_output(rem_singles * ifh_rate * mround(3), min_chances=2)
def all_homeruns(rem_hits, all_hits, hrs, hits, singles):
@ -252,7 +252,7 @@ def all_homeruns(rem_hits, all_hits, hrs, hits, singles):
def nd_homeruns(all_hr, hr_rate):
if all_hr == 0 or hr_rate == 0:
return Decimal(0)
return mround(0)
elif hr_rate > .2:
return sanitize_chance_output(all_hr * .6)
else:
@ -261,23 +261,23 @@ def nd_homeruns(all_hr, hr_rate):
def bp_homeruns(all_hr, hr_rate):
if all_hr == 0 or hr_rate == 0:
return Decimal(0)
return mround(0)
elif hr_rate > .2:
return sanitize_chance_output(all_hr * Decimal(.4), rounding=1.0)
return sanitize_chance_output(all_hr * mround(.4), rounding=1.0)
else:
return sanitize_chance_output(all_hr * Decimal(.8), rounding=1.0)
return sanitize_chance_output(all_hr * mround(.8), rounding=1.0)
def triples(all_xbh, tr_count, do_count):
if all_xbh == Decimal(0) or tr_count == Decimal(0):
return Decimal(0)
if all_xbh == mround(0) or tr_count == mround(0):
return mround(0)
else:
return sanitize_chance_output(all_xbh * Decimal(tr_count / max(tr_count + do_count, 1)), min_chances=1)
return sanitize_chance_output(all_xbh * mround(tr_count / max(tr_count + do_count, 1)), min_chances=1)
def two_doubles(all_doubles, soft_rate):
if all_doubles == 0 or soft_rate == 0:
return Decimal(0)
return mround(0)
elif soft_rate > .2:
return sanitize_chance_output(all_doubles / 2)
else:
@ -285,36 +285,36 @@ def two_doubles(all_doubles, soft_rate):
def hit_by_pitch(other_ob, hbps, walks):
if hbps == 0 or other_ob * Decimal(hbps / max(hbps + walks, 1)) < 1:
if hbps == 0 or other_ob * mround(hbps / max(hbps + walks, 1)) < 1:
return 0
else:
return sanitize_chance_output(other_ob * Decimal(hbps / max(hbps + walks, 1)), rounding=1.0)
return sanitize_chance_output(other_ob * mround(hbps / max(hbps + walks, 1)), rounding=1.0)
def strikeouts(all_outs, k_rate):
if all_outs == 0 or k_rate == 0:
return Decimal(0)
return mround(0)
else:
return sanitize_chance_output(all_outs * Decimal(k_rate))
return sanitize_chance_output(all_outs * k_rate)
def flyout_a(all_flyouts, hard_rate):
if all_flyouts == 0 or hard_rate < .4:
return Decimal(0)
return mround(0)
else:
return Decimal(1.0)
return mround(1.0)
def flyout_bq(rem_flyouts, soft_rate):
if rem_flyouts == 0 or soft_rate < .1:
return Decimal(0)
return mround(0)
else:
return sanitize_chance_output(rem_flyouts * min(soft_rate * 3, Decimal(.75)))
return sanitize_chance_output(rem_flyouts * min(soft_rate * 3, mround(.75)))
def flyout_b(rem_flyouts, pull_rate, cent_rate):
if rem_flyouts == 0 or pull_rate == 0:
return Decimal(0)
return mround(0)
else:
return sanitize_chance_output(rem_flyouts * (pull_rate + cent_rate / 2))
@ -328,14 +328,14 @@ def popouts(rem_outs, iffb_rate):
def groundball_a(all_groundouts, gidps, abs):
if all_groundouts == 0 or gidps == 0:
return Decimal(0)
return mround(0)
else:
return sanitize_chance_output(Decimal(min(gidps ** 2.5, abs) / max(abs, 1)) * all_groundouts)
return sanitize_chance_output(mround(min(gidps ** 2.5, abs) / max(abs, 1)) * all_groundouts)
def groundball_c(rem_groundouts, med_rate):
if rem_groundouts == 0 or med_rate < .4:
return Decimal(0)
return mround(0)
elif med_rate > .6:
return sanitize_chance_output(rem_groundouts)
else:
@ -529,11 +529,11 @@ def get_batter_ratings(df_data) -> List[dict]:
center_rate=df_data['Cent%_vR'],
slap_rate=df_data['Oppo%_vR']
)
vl.all_outs = Decimal(108 - vl.all_hits - vl.all_other_ob).quantize(Decimal("0.05"))
vr.all_outs = Decimal(108 - vr.all_hits - vr.all_other_ob).quantize(Decimal("0.05"))
vl.all_outs = mround(108 - vl.all_hits - vl.all_other_ob) #.quantize(Decimal("0.05"))
vr.all_outs = mround(108 - vr.all_hits - vr.all_other_ob) #.quantize(Decimal("0.05"))
vl.calculate_singles(df_data['1B_vL'], df_data['H_vL'], Decimal(df_data['IFH%_vL']))
vr.calculate_singles(df_data['1B_vR'], df_data['H_vR'], Decimal(df_data['IFH%_vR']))
vl.calculate_singles(df_data['1B_vL'], df_data['H_vL'], mround(df_data['IFH%_vL']))
vr.calculate_singles(df_data['1B_vR'], df_data['H_vR'], mround(df_data['IFH%_vR']))
logging.debug(
f'vL - All Hits: {vl.all_hits} / Other OB: {vl.all_other_ob} / All Outs: {vl.all_outs} '
@ -574,11 +574,11 @@ def get_batter_ratings(df_data) -> List[dict]:
# Correct total chance errors
for x in [vl, vr]:
if x.total_chances() < 108:
diff = Decimal(108) - x.total_chances()
diff = mround(108) - x.total_chances()
logging.error(f'Adding {diff} strikeouts to close gap')
x.strikeout += diff
elif x.total_chances() > 108:
diff = x.total_chances() - Decimal(108)
diff = x.total_chances() - mround(108)
logging.error(f'Have surplus of {diff} chances')
if x.strikeout + 1 > diff:
logging.error(f'Subtracting {diff} strikeouts to close gap')

106
batters/stat_prep.py Normal file
View File

@ -0,0 +1,106 @@
import pandas as pd
import pydantic
from pydantic import root_validator, validator
from typing import Literal, Optional
class DataMismatchError(Exception):
    """Error for stat data that fails to line up (e.g. a merge that changes the row count)."""
class BattingCardModel(pydantic.BaseModel):
    """Card-level batter attributes: stealing, bunting, running and handedness.

    Every field has a default, so the model can be built from a partial record.
    """
    player_id: Optional[int] = None
    variant: int = 0
    steal_low: int = 3
    steal_high: int = 20
    steal_auto: bool = False
    steal_jump: float = 0
    bunting: str = 'C'
    hit_and_run: str = 'C'
    running: int = 10
    # Declared Optional so the None default (and an explicitly passed None)
    # agrees with the annotation instead of relying on implicit-Optional handling.
    offense_col: Optional[int] = None
    hand: Literal['R', 'L', 'S'] = 'R'
class CardPositionModel(pydantic.BaseModel):
    """One defensive position entry for a player's card.

    ``arm`` is required for C, LF, CF and RF; ``pb`` and ``overthrow`` are
    additionally required for C. Both rules are enforced by
    ``position_validator``.
    """
    player_id: int
    variant: int = 0
    position: Literal['P', 'C', '1B', '2B', '3B', 'SS', 'LF', 'CF', 'RF', 'DH']
    innings: int = 1
    range: int = 5
    error: int = 0
    arm: Optional[int] = None
    pb: Optional[int] = None
    overthrow: Optional[int] = None

    # skip_on_failure=True: a field that already failed validation is absent from
    # ``values``, so running this check anyway would raise a bare KeyError instead
    # of pydantic's ValidationError. The flag is also mandatory for post
    # root validators under pydantic v2.
    @root_validator(skip_on_failure=True)
    def position_validator(cls, values):
        """Require the position-specific ratings.

        Raises:
            ValueError: if an arm rating is missing for C/LF/CF/RF, or a catcher
                lacks a pb or overthrow rating.
        """
        if values['position'] in ['C', 'LF', 'CF', 'RF'] and values['arm'] is None:
            raise ValueError(f'{values["position"]} must have an arm rating')

        if values['position'] == 'C' and (values['pb'] is None or values['overthrow'] is None):
            raise ValueError('Catchers must have a pb and overthrow rating')

        return values
class BattingCardRatingsModel(pydantic.BaseModel):
    """Result chances for one batting card against one pitcher hand.

    The 22 result fields (``homerun`` through ``groundout_c``) must total 108
    chances once the sum is rounded to the nearest whole number. ``avg``,
    ``obp`` and ``slg`` are always recomputed from the chance fields; any value
    passed in for them is discarded.
    """
    battingcard_id: int
    vs_hand: Literal['R', 'L', 'vR', 'vL']
    homerun: float = 0.0
    bp_homerun: float = 0.0
    triple: float = 0.0
    double_three: float = 0.0
    double_two: float = 0.0
    double_pull: float = 0.0
    single_two: float = 0.0
    single_one: float = 0.0
    single_center: float = 0.0
    bp_single: float = 0.0
    hbp: float = 0.0
    walk: float = 0.0
    strikeout: float = 0.0
    lineout: float = 0.0
    popout: float = 0.0
    flyout_a: float = 0.0
    flyout_bq: float = 0.0
    flyout_lf_b: float = 0.0
    flyout_rf_b: float = 0.0
    groundout_a: float = 0.0
    groundout_b: float = 0.0
    groundout_c: float = 0.0
    avg: float = 0.0
    obp: float = 0.0
    slg: float = 0.0
    pull_rate: float = 0.0
    center_rate: float = 0.0
    slap_rate: float = 0.0

    @validator("avg", always=True)
    def avg_validator(cls, v, values, **kwargs):
        """Recompute ``avg``; ``bp_homerun`` and ``bp_single`` count as half a hit."""
        try:
            return (values['homerun'] + values['bp_homerun'] / 2 + values['triple'] + values['double_three'] +
                    values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
                    values['single_center'] + values['bp_single'] / 2) / 108
        except KeyError:
            # An input field failed its own validation and is missing from
            # ``values``; let pydantic report that error instead of a raw KeyError.
            return v

    @validator("obp", always=True)
    def obp_validator(cls, v, values, **kwargs):
        """Recompute ``obp`` as the hbp and walk chances over 108, plus ``avg``."""
        try:
            return ((values['hbp'] + values['walk']) / 108) + values['avg']
        except KeyError:
            # Same guard as avg_validator: a prerequisite field failed validation.
            return v

    @validator("slg", always=True)
    def slg_validator(cls, v, values, **kwargs):
        """Recompute ``slg`` from the hit chances weighted by bases (over 108)."""
        try:
            return (values['homerun'] * 4 + values['bp_homerun'] * 2 + values['triple'] * 3 +
                    values['double_three'] * 2 + values['double_two'] * 2 + values['double_pull'] * 2 +
                    values['single_two'] + values['single_one'] + values['single_center'] +
                    values['bp_single'] / 2) / 108
        except KeyError:
            # Same guard as avg_validator: a prerequisite field failed validation.
            return v

    # skip_on_failure=True: with a failed field missing from ``values`` the sum
    # below would raise a bare KeyError; the flag is also mandatory for post
    # root validators under pydantic v2.
    @root_validator(skip_on_failure=True)
    def validate_chance_total(cls, values):
        """Reject cards whose result chances do not round to 108.

        Raises:
            ValueError: if ``round(total)`` of the 22 result fields is not 108.
        """
        total_chances = (
            values['homerun'] + values['bp_homerun'] + values['triple'] + values['double_three'] +
            values['double_two'] + values['double_pull'] + values['single_two'] + values['single_one'] +
            values['single_center'] + values['bp_single'] + values['hbp'] + values['walk'] +
            values['strikeout'] + values['lineout'] + values['popout'] + values['flyout_a'] +
            values['flyout_bq'] + values['flyout_lf_b'] + values['flyout_rf_b'] + values['groundout_a'] +
            values['groundout_b'] + values['groundout_c'])

        # The comparison is on the rounded total, so float noise (and anything
        # within about half a chance of 108) is tolerated.
        if round(total_chances) != 108:
            raise ValueError(f'BC {values["battingcard_id"]} must have exactly 108 chances on the card '
                             f'{values["vs_hand"]}; {round(total_chances)} listed')

        return values

View File

@ -865,7 +865,7 @@ def sanitize_chance_output(total_chances, min_chances=1.0, rounding=0.05):
logging.debug(f'sanitize: {total_chances} is less than min_chances ({min_chances}); returning 0')
return 0
rounded_decimal = round(Decimal(str(total_chances)) / Decimal(str(rounding))) * Decimal(str(rounding))
rounded_decimal = mround(Decimal(str(total_chances)) / Decimal(str(rounding)) * Decimal(str(rounding)))
exact_chances = [
Decimal('1.05'), Decimal('1.1'), Decimal('1.2'), Decimal('1.25'), Decimal('1.3'), Decimal('1.35'),
@ -879,7 +879,7 @@ def sanitize_chance_output(total_chances, min_chances=1.0, rounding=0.05):
]
if rounded_decimal > exact_chances[-1]:
return float(rounded_decimal)
return rounded_decimal
for x in exact_chances:
if rounded_decimal <= x:

View File

@ -530,13 +530,18 @@ def get_bbref_fielding_df(
row_data = []
col_names = []
for cell in row.find_all('td'):
try:
if cell.has_attr('data-append-csv'):
player_id = cell['data-append-csv']
row_data.append(player_id)
if len(headers) == 0:
col_names.append('key_bbref')
except Exception as e:
pass
# try:
# player_id = cell['data-append-csv']
# row_data.append(player_id)
# if len(headers) == 0:
# col_names.append('key_bbref')
# except Exception as e:
# pass
row_data.append(cell.text)
if len(headers) == 0:
col_names.append(cell['data-stat'])

View File

@ -9,6 +9,11 @@ import pandas as pd
import pybaseball as pb
from pybaseball import cache
from creation_helpers import get_args
from batters.stat_prep import DataMismatchError
import batters.calcs_batter as cba
import defenders.calcs_defense as cde
cache.enable()
date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
log_level = logging.INFO
@ -17,9 +22,23 @@ logging.basicConfig(
format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
level=log_level
)
FILE_PATH = 'data-input/retrosheet/'
RETRO_FILE_PATH = 'data-input/retrosheet/'
EVENTS_FILENAME = 'retrosheets_events_1998_short.csv' # Removed last few columns which were throwing dtype errors
PERSONNEL_FILENAME = 'retrosheets_personnel.csv'
DATA_INPUT_FILE_PATH = 'data-input/1998 Season Cardset/'
MIN_PA_VL = 20
MIN_PA_VR = 40
MIN_TBF_VL = MIN_PA_VL
MIN_TBF_VR = MIN_PA_VR
async def store_defense_to_csv(season: int):
    """Save one fielding CSV per position for ``season``.

    Each position's frame comes from ``cde.get_bbref_fielding_df`` and is
    written under ``DATA_INPUT_FILE_PATH`` as ``defense_<position>.csv``. The
    coroutine sleeps 8 seconds after every position, including the last.
    """
    positions = ('c', '1b', '2b', '3b', 'ss', 'lf', 'cf', 'rf', 'of', 'p')
    for position in positions:
        fielding = cde.get_bbref_fielding_df(position, season)
        out_path = f'{DATA_INPUT_FILE_PATH}defense_{position}.csv'
        fielding.to_csv(out_path)
        # presumably a courtesy delay between upstream pulls — TODO confirm
        await asyncio.sleep(8)
def get_events_by_date(file_path: str, start_date: int, end_date: int) -> pd.DataFrame:
@ -34,12 +53,25 @@ def get_result_series(plays: pd.DataFrame, event_type: str, pitcher_hand: Litera
return this_series
# def get_batting_handedness(plays: pd.DataFrame) -> pd.DataFrame:
def get_run_stat_df(input_path: str):
    """Load ``running.csv`` from ``input_path`` as a frame indexed by ``key_bbref``.

    ``input_path`` is used as a raw prefix, so it must already end with a path
    separator. Whichever of ``Player-additional`` / ``Name-additional`` is
    present is renamed to ``key_bbref``; only the columns listed in
    ``keep_cols`` are retained and missing values are filled with 0.
    """
    id_aliases = {'Player-additional': 'key_bbref', 'Name-additional': 'key_bbref'}
    keep_cols = [
        'key_bbref', 'ROE', 'XI', 'RS%', 'SBO', 'SB', 'CS', 'SB%', 'SB2', 'CS2', 'SB3', 'CS3',
        'SBH', 'CSH', 'PO', 'PCS', 'OOB', 'OOB1', 'OOB2', 'OOB3', 'OOBHm', 'BT', 'XBT%',
        '1stS', '1stS2', '1stS3', '1stD', '1stD3', '1stDH', '2ndS', '2ndS3', '2ndSH',
    ]

    raw = pd.read_csv(f'{input_path}running.csv')
    # rename() ignores aliases that are not columns of the frame
    renamed = raw.rename(columns=id_aliases)
    return renamed[keep_cols].fillna(0).set_index('key_bbref')
def get_player_ids(plays: pd.DataFrame, which: Literal['batters', 'pitchers']) -> pd.DataFrame:
RETRO_PLAYERS = pd.read_csv(f'{FILE_PATH}{PERSONNEL_FILENAME}')
RETRO_PLAYERS = pd.read_csv(f'{RETRO_FILE_PATH}{PERSONNEL_FILENAME}')
id_key = 'batter_id' if which == 'batters' else 'pitcher_id'
players = pd.DataFrame()
@ -110,19 +142,18 @@ def get_base_batting_df(all_plays: pd.DataFrame) -> pd.DataFrame:
bs = pd.concat([bs, pal_series], axis=1)
par_series = all_plays[(all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vR')
bs = pd.concat([bs, par_series], axis=1)
# bs = bs.dropna().query('PA_vL >= 20 & PA_vR >= 40')
abl_series = all_plays[(all_plays.ab == 't') & (all_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vL')
bs = pd.concat([bs, abl_series], axis=1)
abr_series = all_plays[(all_plays.ab == 't') & (all_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vR')
bs = pd.concat([bs, abr_series], axis=1)
return bs.dropna().query('PA_vL >= 20 & PA_vR >= 40')
return bs.dropna().query(f'PA_vL >= {MIN_PA_VL} & PA_vR >= {MIN_PA_VR}')
def get_batting_stats_by_date(file_path, start_date: int, end_date: int) -> pd.DataFrame:
def get_batting_stats_by_date(retro_file_path, start_date: int, end_date: int) -> pd.DataFrame:
start = datetime.datetime.now()
all_plays = get_events_by_date(file_path, start_date, end_date)
all_plays = get_events_by_date(retro_file_path, start_date, end_date)
print(f'Pull events: {(datetime.datetime.now() - start).total_seconds():.2f}s')
start = datetime.datetime.now()
@ -291,20 +322,248 @@ def get_batting_stats_by_date(file_path, start_date: int, end_date: int) -> pd.D
return batting_stats
async def main(args):
def calc_batting_cards(bs: pd.DataFrame) -> pd.DataFrame:
    """Build one batting-card record per row of ``bs``.

    Stealing, hit-and-run and running values come from the ``cba`` helpers;
    ``bunt`` is currently a constant 0 and ``season_pct`` is fixed at 1.0.
    """
    def create_batting_card(row):
        # Returns the single row of a one-row frame, i.e. a Series labelled 0.
        steal_data = cba.stealing(
            chances=int(row['SBO']),
            sb2s=int(row['SB2']),
            cs2s=int(row['CS2']),
            sb3s=int(row['SB3']),
            cs3s=int(row['CS3']),
            season_pct=1.0
        )
        card = {
            'key_bbref': row['key_bbref'],
            'steal_low': steal_data[0],
            'steal_high': steal_data[1],
            'steal_auto': steal_data[2],
            'steal_jump': steal_data[3],
            'hit_and_run': cba.hit_and_run(
                row['AB_vL'], row['AB_vR'], row['H_vL'], row['H_vR'],
                row['HR_vL'], row['HR_vR'], row['SO_vL'], row['SO_vR']
            ),
            'bunt': 0,
            'running': cba.running(row['XBT%']),
            'hand': row['bat_hand'],
        }
        one_row = pd.DataFrame({field: [value] for field, value in card.items()})
        return one_row.loc[0]

    return bs.apply(create_batting_card, axis=1)
def calc_batter_ratings(bs: pd.DataFrame) -> pd.DataFrame:
    """Attach the two ``cba.get_batter_ratings`` results to each batter's ``key_bbref``.

    The first element of the helper's result is stored as ``ratings_vL`` and
    the second as ``ratings_vR``.
    """
    def create_batting_rating(row):
        # Returns the single row of a one-row frame, i.e. a Series labelled 0.
        ratings = cba.get_batter_ratings(row)
        record = {
            'key_bbref': row['key_bbref'],
            'ratings_vL': ratings[0],
            'ratings_vR': ratings[1],
        }
        one_row = pd.DataFrame({field: [value] for field, value in record.items()})
        return one_row.loc[0]

    return bs.apply(create_batting_rating, axis=1)
def calc_positions(bs: pd.DataFrame) -> pd.DataFrame:
    """Derive defensive position entries for every batter row in ``bs``.

    Applies ``process_pos`` to each row and returns the result of that
    ``apply``. ``process_pos`` has no return statement; its output is the
    entries it appends to ``position_payload``.

    NOTE(review): ``df_1b``/``df_2b``/``df_3b``/``df_ss``/``df_lf``/``df_cf``/
    ``df_rf``/``df_of``/``df_c``, ``position_payload``, ``season_pct`` and the
    rating helpers (``get_if_range``, ``get_of_range``, ``get_any_error``,
    ``arm_outfield``, ``arm_catcher``, ``range_catcher``, ``pb_catcher``,
    ``ot_catcher``) are not defined in this function — confirm they exist at
    module level before calling.
    """
    def process_pos(row):
        """Append this player's position entries to ``position_payload``.

        A player with no usable fielding data gets a single DH entry.
        """
        # Flipped to False as soon as any position entry is produced.
        no_data = True
        # Infield: one entry per infield table that lists this player.
        for pos_data in [(df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')]:
            if row['key_bbref'] in pos_data[0].index:
                logging.info(f'Running {pos_data[1]} stats for {row["p_name"]}')
                try:
                    # Mean of three terms: tz total, bis total, and the smaller of the
                    # two again — so the lower run total is weighted twice.
                    average_range = (int(pos_data[0].at[row["key_bbref"], 'tz_runs_total']) +
                                     int(pos_data[0].at[row["key_bbref"], 'bis_runs_total']) +
                                     min(
                                         int(pos_data[0].at[row["key_bbref"], 'tz_runs_total']),
                                         int(pos_data[0].at[row["key_bbref"], 'bis_runs_total'])
                                     )) / 3

                    position_payload.append({  # TODO: convert position_payload to a list?
                        "player_id": int(row['player_id']),
                        "position": pos_data[1].upper(),
                        "innings": float(pos_data[0].at[row["key_bbref"], 'Inn_def']),
                        "range": get_if_range(
                            pos_code=pos_data[1],
                            tz_runs=round(average_range),
                            r_dp=0,
                            season_pct=season_pct
                        ),
                        "error": get_any_error(
                            pos_code=pos_data[1],
                            errors=int(pos_data[0].at[row["key_bbref"], 'E_def']),
                            chances=int(pos_data[0].at[row["key_bbref"], 'chances']),
                            season_pct=season_pct
                        )
                    })
                    no_data = False
                except Exception as e:
                    # Any failure is logged at INFO level and this position is dropped.
                    logging.info(f'Infield position failed: {e}')

        # Outfield entries are staged here; error and arm are filled in from df_of below.
        of_arms = []
        of_payloads = []
        for pos_data in [(df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')]:
            if row["key_bbref"] in pos_data[0].index:
                try:
                    # Same three-term average as the infield loop.
                    average_range = (int(pos_data[0].at[row["key_bbref"], 'tz_runs_total']) +
                                     int(pos_data[0].at[row["key_bbref"], 'bis_runs_total']) +
                                     min(
                                         int(pos_data[0].at[row["key_bbref"], 'tz_runs_total']),
                                         int(pos_data[0].at[row["key_bbref"], 'bis_runs_total'])
                                     )) / 3
                    of_payloads.append({
                        "player_id": int(row['player_id']),
                        "position": pos_data[1].upper(),
                        "innings": float(pos_data[0].at[row["key_bbref"], 'Inn_def']),
                        "range": get_of_range(
                            pos_code=pos_data[1],
                            tz_runs=round(average_range),
                            season_pct=season_pct
                        )
                    })
                    of_arms.append(int(pos_data[0].at[row["key_bbref"], 'bis_runs_outfield']))
                    no_data = False
                except Exception as e:
                    logging.info(f'Outfield position failed: {e}')

        # Staged outfield entries are only published when the combined OF table
        # also lists the player; otherwise they are discarded.
        if row["key_bbref"] in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0:
            try:
                # NOTE(review): pos_data is the variable left over from the loop above,
                # so pos_code here is always 'rf' — confirm that is intended.
                error_rating = get_any_error(
                    pos_code=pos_data[1],
                    errors=int(df_of.at[row["key_bbref"], 'E_def']),
                    chances=int(df_of.at[row["key_bbref"], 'chances']),
                    season_pct=season_pct
                )
                arm_rating = arm_outfield(of_arms)
                # Every outfield entry shares the same error and arm rating.
                for f in of_payloads:
                    f['error'] = error_rating
                    f['arm'] = arm_rating
                    position_payload.append(f)
                no_data = False
            except Exception as e:
                logging.info(f'Outfield position failed: {e}')

        if row["key_bbref"] in df_c.index:
            try:
                # No steal attempts recorded against this catcher: fixed arm rating of 3.
                if df_c.at[row["key_bbref"], 'SB'] + df_c.at[row["key_bbref"], 'CS'] == 0:
                    arm_rating = 3
                else:
                    arm_rating = arm_catcher(
                        cs_pct=df_c.at[row["key_bbref"], 'caught_stealing_perc'],
                        raa=int(df_c.at[row["key_bbref"], 'bis_runs_catcher_sb']),
                        season_pct=season_pct
                    )
                position_payload.append({
                    "player_id": int(row['player_id']),
                    "position": 'C',
                    "innings": float(df_c.at[row["key_bbref"], 'Inn_def']),
                    "range": range_catcher(
                        rs_value=int(df_c.at[row["key_bbref"], 'tz_runs_catcher']),
                        season_pct=season_pct
                    ),
                    "error": get_any_error(
                        pos_code='c',
                        errors=int(df_c.at[row["key_bbref"], 'E_def']),
                        chances=int(df_c.at[row["key_bbref"], 'chances']),
                        season_pct=season_pct
                    ),
                    "arm": arm_rating,
                    "pb": pb_catcher(
                        pb=int(df_c.at[row["key_bbref"], 'PB']),
                        innings=int(float(df_c.at[row["key_bbref"], 'Inn_def'])),
                        season_pct=season_pct
                    ),
                    "overthrow": ot_catcher(
                        errors=int(df_c.at[row["key_bbref"], 'E_def']),
                        chances=int(df_c.at[row["key_bbref"], 'chances']),
                        season_pct=season_pct
                    )
                })
                no_data = False
            except Exception as e:
                logging.info(f'Catcher position failed: {e}')

        # Fallback: no position produced an entry, so list the player as DH with
        # total plate appearances standing in for innings.
        if no_data:
            position_payload.append({
                "player_id": int(row['player_id']),
                "position": 'DH',
                "innings": row['PA_vL'] + row['PA_vR']
            })

    all_pos = bs.apply(process_pos, axis=1)

    return all_pos
def run_batters(data_input_path: str, start_date: int, end_date: int):
print(f'Running the batter calcs...')
batter_start = datetime.datetime.now()
data = get_batting_stats_by_date(f'{FILE_PATH}{EVENTS_FILENAME}', start_date=19980101, end_date=19980430)
# Get batting stats
batting_stats = get_batting_stats_by_date(f'{RETRO_FILE_PATH}{EVENTS_FILENAME}', start_date=start_date, end_date=end_date)
bs_len = len(batting_stats)
end_calc = datetime.datetime.now()
print(f'Batting stats: {(end_calc - batter_start).total_seconds():.2f}s')
running_start = datetime.datetime.now()
# Get running stats
running_stats = get_run_stat_df(data_input_path)
run_len = len(running_stats)
batting_stats = pd.merge(
left=batting_stats,
right=running_stats,
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
end_calc = datetime.datetime.now()
print(f'Running stats: {(end_calc - running_start).total_seconds():.2f}s')
data.to_csv(f'batting_stats.csv')
end_save = datetime.datetime.now()
print(f'\nBatter time: {(end_calc - batter_start).total_seconds():.2f}s\nSave time: {(end_save - end_calc).total_seconds():.2f}s')
if len(batting_stats) != bs_len:
raise DataMismatchError(f'retrosheet_data - run_batters - We started with {bs_len} batting lines and have {len(batting_stats)} after merging with running_stats')
# Calculate batting cards
card_start = datetime.datetime.now()
all_batting_cards = calc_batting_cards(batting_stats)
card_end = datetime.datetime.now()
pitcher_start = datetime.datetime.now()
end_pitcher = datetime.datetime.now()
print(f'Create batting cards: {(card_end - card_start).total_seconds()}s')
print(f'\nPitcher time: {(end_pitcher - pitcher_start).total_seconds():.2f}s\n\nTotal: {(end_pitcher - batter_start).total_seconds():.2f}s\n\nDone!')
# Calculate batting ratings
rating_start = datetime.datetime.now()
batting_stats['battingcard_id'] = batting_stats['key_fangraphs']
all_batting_ratings = calc_batter_ratings(batting_stats)
rating_end = datetime.datetime.now()
print(f'Create batting ratings: {(rating_end - rating_start).total_seconds()}s')
# Calculate defense ratings
defense_start = datetime.datetime.now()
all_defense_ratings = calc_positions(batting_stats)
defense_end = datetime.datetime.now()
print(f'Create defense ratings: {(defense_end - defense_start).total_seconds()}s')
return batting_stats
async def main(args):
    """Script entry point; currently only writes the 1998 defense CSVs.

    ``args`` is accepted but not used. The batter/pitcher run below is
    commented out.
    """
    # batter_start = datetime.datetime.now()
    # batting_stats = run_batters(f'{DATA_INPUT_FILE_PATH}', start_date=19980101, end_date=19980430)
    # batting_stats.to_csv(f'batting_stats.csv')
    # batter_end = datetime.datetime.now()
    # pitcher_start = datetime.datetime.now()
    # pitcher_end = datetime.datetime.now()
    # print(f'\n\nBatter time: {(batter_end - batter_start).total_seconds():.2f}s \nPitcher time: {(pitcher_end - pitcher_start).total_seconds():.2f}s\nTotal: {(pitcher_end - batter_start).total_seconds():.2f}s\n\nDone!')
    await store_defense_to_csv(1998)
if __name__ == '__main__':