paper-dynasty-card-creation/retrosheet_data.py
2024-10-18 23:31:39 -05:00

274 lines
13 KiB
Python

import asyncio
import datetime
import logging
import os
import sys
from typing import Literal

import pandas as pd
import pybaseball as pb
from pybaseball import cache
cache.enable()
# Date stamp used to name today's log file, e.g. "2024-10-5" (month and day are not
# zero-padded). The clock is read once so all three parts come from the same instant.
_today = datetime.date.today()
date = f'{_today.year}-{_today.month}-{_today.day}'
log_level = logging.INFO

# logging.basicConfig() opens the log file right away and does not create missing
# parent directories, so make sure ``logs/`` exists before configuring logging.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    filename=f'logs/{date}.log',
    format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
    level=log_level
)

# Directory and file names of the Retrosheet CSV inputs read by this module.
FILE_PATH = 'data-input/retrosheet/'
EVENTS_FILENAME = 'retrosheets_events_1998_short.csv'  # Removed last few columns which were throwing dtype errors
PERSONNEL_FILENAME = 'retrosheets_personnel.csv'
def get_events_by_date(file_path: str, start_date: int, end_date: int) -> pd.DataFrame:
    """Load the events CSV and keep only plays whose game date is in the given window.

    A ``date`` column is added by taking each ``game_id`` from its fourth character
    up to (but excluding) its last character and parsing that as an integer. The
    callers in this file pass bounds such as ``19980101``, so this is presumably a
    YYYYMMDD value.

    Args:
        file_path: Path of the events CSV to read.
        start_date: Lowest date to keep (inclusive).
        end_date: Highest date to keep (inclusive).

    Returns:
        The rows of the CSV (plus the ``date`` column) with
        ``start_date <= date <= end_date``.
    """
    events = pd.read_csv(f'{file_path}', dtype={'game_id': 'str'})
    events['date'] = events['game_id'].str[3:-1].astype(int)
    # Series.between is inclusive on both ends by default.
    in_window = events['date'].between(start_date, end_date)
    return events[in_window]
def get_result_series(plays: pd.DataFrame, event_type: str, pitcher_hand: Literal['r', 'l'], col_name: str) -> pd.Series:
    """Count, per batter, the plays with a given event type against one pitcher hand.

    Args:
        plays: Play-by-play rows with ``batter_id``, ``event_type`` and
            ``pitcher_hand`` columns.
        event_type: Value of ``event_type`` to count (e.g. ``'single'``).
        pitcher_hand: Value of ``pitcher_hand`` to restrict to.
        col_name: Name given to the returned series.

    Returns:
        Integer series indexed by ``batter_id``; batters with no matching play
        are absent from the index.
    """
    is_match = plays.event_type.eq(event_type) & plays.pitcher_hand.eq(pitcher_hand)
    per_batter = plays.loc[is_match].groupby('batter_id')['event_type'].count()
    return per_batter.astype(int).rename(col_name)
# def get_batting_handedness(plays: pd.DataFrame) -> pd.DataFrame:
def _classify_bat_hand(pa_as_lefty: int, pa_as_righty: int) -> str:
    """Return 'L', 'R' or 'S' from how many plays a batter had on each batting side."""
    if pa_as_lefty == 0 and pa_as_righty > 0:
        return 'R'
    if pa_as_lefty > 0 and pa_as_righty == 0:
        return 'L'
    # Either the batter appeared on both sides or has no counted plays at all.
    # Under 10 total plays: pick the majority side (ties and zero fall to 'R');
    # otherwise treat the batter as a switch hitter.
    if pa_as_lefty + pa_as_righty < 10:
        return 'L' if pa_as_lefty > pa_as_righty else 'R'
    return 'S'


def get_player_ids(plays: pd.DataFrame, which: Literal['batters', 'pitchers']) -> pd.DataFrame:
    """Build an id table for every batter (or pitcher) that appears in ``plays``.

    Names come from the personnel CSV at ``FILE_PATH + PERSONNEL_FILENAME``; the
    MLBAM / Baseball-Reference / FanGraphs ids come from pybaseball's reverse
    lookup keyed on the Retrosheet id.

    Args:
        plays: Play-by-play rows. Must contain ``batter_id`` or ``pitcher_id``
            (depending on ``which``); for batters it must also contain
            ``pitcher_hand``, ``result_batter_hand`` and ``game_id``.
        which: ``'batters'`` selects ``batter_id``; any other value selects
            ``pitcher_id``.

    Returns:
        DataFrame indexed by the selected id column with ``last_name``,
        ``use_name``, ``key_mlbam``, ``key_retro``, ``key_bbref`` and
        ``key_fangraphs``. For batters a ``bat_hand`` column ('L', 'R' or 'S') is
        added. Players whose id is not found by pybaseball are reported on
        stdout and left out of the result.
    """
    retro_players = pd.read_csv(f'{FILE_PATH}{PERSONNEL_FILENAME}')
    id_key = 'batter_id' if which == 'batters' else 'pitcher_id'

    # Right join: keep every id seen in the plays, even if the personnel file lacks it.
    unique_players = pd.Series(plays[id_key].unique()).to_frame('id')
    players = pd.merge(
        left=retro_players,
        right=unique_players,
        how='right',
        left_on='id',
        right_on='id'
    ).rename(columns={'id': id_key})
    players = players[[id_key, 'last_name', 'use_name']]

    # Resolve all ids in one lookup call instead of one call per player.
    start_time = datetime.datetime.now()
    lookup = pb.playerid_reverse_lookup(players[id_key].tolist(), key_type='retro')
    # Keep the first register row per Retrosheet id.
    other_ids = (
        lookup[['key_mlbam', 'key_retro', 'key_bbref', 'key_fangraphs']]
        .drop_duplicates(subset='key_retro')
    )
    end_time = datetime.datetime.now()
    print(f'ID lookup: {(end_time - start_time).total_seconds():.2f}s')

    # Report ids the lookup could not resolve; the inner merge below drops them.
    unresolved = players.loc[~players[id_key].isin(other_ids['key_retro']), id_key]
    for missing_id in unresolved:
        print(f'Could not find id {missing_id} in pybaseball lookup')

    players = pd.merge(
        left=players,
        right=other_ids,
        left_on=id_key,
        right_on='key_retro'
    )
    players = players.set_index(id_key)

    if which == 'batters':
        # Count each batter's plays per batting side in a single pass. Only plays
        # against a pitcher recorded as 'l' or 'r' are counted.
        faced = plays[plays.pitcher_hand.isin(['l', 'r'])]
        pa_by_side = (
            faced.groupby(['batter_id', 'result_batter_hand'])['game_id'].count().to_dict()
        )
        players['bat_hand'] = [
            _classify_bat_hand(
                pa_by_side.get((retro_id, 'l'), 0),
                pa_by_side.get((retro_id, 'r'), 0),
            )
            for retro_id in players['key_retro']
        ]

    return players
def get_base_batting_df(all_plays: pd.DataFrame, min_pa_vl: int = 20, min_pa_vr: int = 40) -> pd.DataFrame:
    """Return the qualifying batters with their PA and AB counts split by pitcher hand.

    Starts from ``get_player_ids(all_plays, 'batters')`` and appends four count
    columns: ``PA_vL`` / ``PA_vR`` (plays with ``batter_event == 't'``) and
    ``AB_vL`` / ``AB_vR`` (plays with ``ab == 't'``), against left- and
    right-handed pitchers respectively.

    Args:
        all_plays: Play-by-play rows with ``batter_id``, ``event_type``,
            ``pitcher_hand``, ``batter_event`` and ``ab`` columns.
        min_pa_vl: Minimum ``PA_vL`` a batter needs to be kept.
        min_pa_vr: Minimum ``PA_vR`` a batter needs to be kept.

    Returns:
        The id table plus the four count columns, restricted to batters that
        meet both thresholds. Any row with a missing value in any column is
        dropped first, so a batter lacking one of the four counts is excluded.
    """
    bs = get_player_ids(all_plays, 'batters')

    for flag_col, hand, col_name in (
        ('batter_event', 'l', 'PA_vL'),
        ('batter_event', 'r', 'PA_vR'),
        ('ab', 'l', 'AB_vL'),
        ('ab', 'r', 'AB_vR'),
    ):
        selected = all_plays[(all_plays[flag_col] == 't') & (all_plays.pitcher_hand == hand)]
        counts = selected.groupby('batter_id')['event_type'].count().astype(int).rename(col_name)
        # Outer-joins on the batter id index; batters without a count get NaN here.
        bs = pd.concat([bs, counts], axis=1)

    return bs.dropna().query(f'PA_vL >= {min_pa_vl} & PA_vR >= {min_pa_vr}')
def get_batting_stats_by_date(file_path, start_date: int, end_date: int) -> pd.DataFrame:
    """Build per-batter batting stats, split by pitcher hand, for a date window.

    The events file is filtered with ``get_events_by_date`` and the qualifying
    batters come from ``get_base_batting_df``. On top of that table this adds,
    for each of vL / vR:

    * counting stats: HR, 1B, 2B, 3B, BB, SO, HBP, FB, GB, LD, GDP (and an
      unsplit ``Bunts`` column);
    * derived stats: H, AVG, OBP, SLG, HR/FB, FB%, GB%, LD%, Hard%, Med%, Soft%.

    Missing counts are filled with 0 before the derived stats are computed. The
    HR/FB and batted-ball percentage columns are plain divisions, so they are
    NaN or inf for a batter whose denominator is 0.

    Elapsed time for each stage is printed to stdout.

    Args:
        file_path: Path of the events CSV.
        start_date: First date to include (inclusive), as passed to
            ``get_events_by_date``.
        end_date: Last date to include (inclusive).

    Returns:
        One row per qualifying batter with the columns described above.
    """
    start = datetime.datetime.now()
    all_plays = get_events_by_date(file_path, start_date, end_date)
    print(f'Pull events: {(datetime.datetime.now() - start).total_seconds():.2f}s')

    start = datetime.datetime.now()
    batting_stats = get_base_batting_df(all_plays)
    print(f'Get base dataframe: {(datetime.datetime.now() - start).total_seconds():.2f}s')

    # Basic counting stats
    start = datetime.datetime.now()
    for event_type, vs_hand, col_name in [
        ('home run', 'r', 'HR_vR'),
        ('home run', 'l', 'HR_vL'),
        ('single', 'r', '1B_vR'),
        ('single', 'l', '1B_vL'),
        ('double', 'r', '2B_vR'),
        ('double', 'l', '2B_vL'),
        ('triple', 'r', '3B_vR'),
        ('triple', 'l', '3B_vL'),
        ('walk', 'r', 'BB_vR'),
        ('walk', 'l', 'BB_vL'),
        ('strikeout', 'r', 'SO_vR'),
        ('strikeout', 'l', 'SO_vL'),
        ('hit by pitch', 'r', 'HBP_vR'),
        ('hit by pitch', 'l', 'HBP_vL')
    ]:
        batting_stats[col_name] = get_result_series(all_plays, event_type, vs_hand, col_name)
    print(f'Count basic stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')

    # Bespoke counting stats
    start = datetime.datetime.now()
    hands = ('L', 'R')
    vs_hand_mask = {
        'L': all_plays.pitcher_hand == 'l',
        'R': all_plays.pitcher_hand == 'r',
    }

    def count_per_batter(mask: pd.Series) -> pd.Series:
        """Count the plays selected by ``mask`` for each row of batting_stats (0 if none)."""
        # One grouped pass over the plays instead of re-filtering them per batter.
        per_batter = all_plays[mask].groupby('batter_id')['event_type'].count()
        return batting_stats['key_retro'].map(per_batter).fillna(0).astype(int)

    # NOTE(review): ground balls are matched with an upper-case 'G' while fly balls
    # and line drives use lower-case codes -- confirm against the events file.
    for label, ball_code in (('FB', 'f'), ('GB', 'G'), ('LD', 'l')):
        of_type = all_plays.batted_ball_type == ball_code
        for hand in hands:
            batting_stats[f'{label}_v{hand}'] = count_per_batter(of_type & vs_hand_mask[hand])

    # GDP is the number of double-play plays plus the number of triple-play plays
    # on batter events.
    is_batter_event = all_plays.batter_event == 't'
    is_dp = all_plays.dp == 't'
    is_tp = all_plays.tp == 't'
    for hand in hands:
        base_mask = is_batter_event & vs_hand_mask[hand]
        batting_stats[f'GDP_v{hand}'] = (
            count_per_batter(base_mask & is_dp) + count_per_batter(base_mask & is_tp)
        )

    batting_stats['Bunts'] = count_per_batter(all_plays.bunt == 't')
    print(f'Custom counting stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')

    # fill na to 0 following counting stats
    batting_stats = batting_stats.fillna(0)

    # Calculated Fields
    # Each stat is added for vL and then vR so the column order stays
    # <stat>_vL, <stat>_vR for every stat.
    start = datetime.datetime.now()
    for hand in hands:
        batting_stats[f'H_v{hand}'] = (
            batting_stats[f'1B_v{hand}'] + batting_stats[f'2B_v{hand}']
            + batting_stats[f'3B_v{hand}'] + batting_stats[f'HR_v{hand}']
        )
    for hand in hands:
        batting_stats[f'AVG_v{hand}'] = round(
            batting_stats[f'H_v{hand}'] / batting_stats[f'AB_v{hand}'], 5
        )
    for hand in hands:
        batting_stats[f'OBP_v{hand}'] = round(
            (batting_stats[f'H_v{hand}'] + batting_stats[f'BB_v{hand}'] + batting_stats[f'HBP_v{hand}'])
            / batting_stats[f'PA_v{hand}'],
            5
        )
    for hand in hands:
        batting_stats[f'SLG_v{hand}'] = round(
            (
                batting_stats[f'1B_v{hand}'] + batting_stats[f'2B_v{hand}'] * 2
                + batting_stats[f'3B_v{hand}'] * 3 + batting_stats[f'HR_v{hand}'] * 4
            ) / batting_stats[f'AB_v{hand}'],
            5
        )
    for hand in hands:
        batting_stats[f'HR/FB_v{hand}'] = batting_stats[f'HR_v{hand}'] / batting_stats[f'FB_v{hand}']
    # Share of each batted-ball type among FB + GB + LD.
    for label in ('FB', 'GB', 'LD'):
        for hand in hands:
            batted_balls = (
                batting_stats[f'FB_v{hand}'] + batting_stats[f'GB_v{hand}'] + batting_stats[f'LD_v{hand}']
            )
            batting_stats[f'{label}%_v{hand}'] = batting_stats[f'{label}_v{hand}'] / batted_balls
    # Hard% is 0.2 plus the gap between SLG and AVG.
    for hand in hands:
        batting_stats[f'Hard%_v{hand}'] = round(
            0.2 + batting_stats[f'SLG_v{hand}'] - batting_stats[f'AVG_v{hand}'], 5
        )

    def med_share(hard: float, slg: float, avg: float) -> float:
        """Med%: the smaller of (0.9 - Hard%) and 1.5 * (SLG - AVG), floored at 0.1."""
        high = 0.9 - hard
        low = (slg - avg) * 1.5
        return round(max(min(high, low), 0.1), 5)

    for hand in hands:
        batting_stats[f'Med%_v{hand}'] = [
            med_share(hard, slg, avg)
            for hard, slg, avg in zip(
                batting_stats[f'Hard%_v{hand}'],
                batting_stats[f'SLG_v{hand}'],
                batting_stats[f'AVG_v{hand}'],
            )
        ]
    # Soft% is whatever remains after Hard% and Med%.
    for hand in hands:
        batting_stats[f'Soft%_v{hand}'] = round(
            1 - batting_stats[f'Hard%_v{hand}'] - batting_stats[f'Med%_v{hand}'], 5
        )
    print(f'Calculated fields: {(datetime.datetime.now() - start).total_seconds():.2f}s')

    # Bespoke Queries
    # Remaining (not implemented yet):
    #   Pull%_vL
    #   Cent%_vL
    #   Oppo%_vL
    #   IFH%_vL

    return batting_stats
async def main(args):
    """Compute the batting stats for the hard-coded 1998 window and save them as CSV.

    Reads the events file at ``FILE_PATH + EVENTS_FILENAME``, builds the stats for
    19980101 through 19980430, writes them to ``batting_stats.csv`` in the current
    directory and prints how long the calculation and the save took.

    Args:
        args: Command-line arguments; currently not used.
    """
    print('Running the calcs...')

    t_begin = datetime.datetime.now()
    events_csv = f'{FILE_PATH}{EVENTS_FILENAME}'
    data = get_batting_stats_by_date(events_csv, start_date=19980101, end_date=19980430)
    t_calculated = datetime.datetime.now()

    data.to_csv('batting_stats.csv')
    t_saved = datetime.datetime.now()

    calc_seconds = (t_calculated - t_begin).total_seconds()
    save_seconds = (t_saved - t_calculated).total_seconds()
    total_seconds = (t_saved - t_begin).total_seconds()
    print(
        f'\nTotal stat time: {calc_seconds:.2f}s'
        f'\nSave time: {save_seconds:.2f}s'
        f'\n\nTotal: {total_seconds:.2f}s'
        '\n\nDone!'
    )
if __name__ == '__main__':
    # Script entry point: hand the command-line arguments (without the program
    # name) to main() and run it to completion on a fresh event loop.
    cli_args = sys.argv[1:]
    asyncio.run(main(cli_args))