"""Build per-batter batting statistics, split by pitcher hand, from Retrosheet event data.

The script reads a Retrosheet events CSV, resolves each batter's other player ids
through pybaseball, counts outcomes separately against left- and right-handed
pitchers, derives rate stats from those counts and writes the result to
``batting_stats.csv``.
"""
import asyncio
import datetime
import logging
import sys
from typing import Callable, List, Literal, Optional

import pandas as pd
import pybaseball as pb
from pybaseball import cache

cache.enable()

# Read the clock once so year/month/day cannot straddle midnight.
_today = datetime.date.today()
# Not zero-padded (e.g. 2024-1-5); this keeps the existing log file naming.
date = f'{_today.year}-{_today.month}-{_today.day}'
log_level = logging.INFO
logging.basicConfig(
    filename=f'logs/{date}.log',
    format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
    level=log_level
)

FILE_PATH = 'data-input/retrosheet/'
# Removed last few columns which were throwing dtype errors
EVENTS_FILENAME = 'retrosheets_events_1998_short.csv'
PERSONNEL_FILENAME = 'retrosheets_personnel.csv'

# Column suffix -> value of the ``pitcher_hand`` column for that split.
_PITCHER_HAND = {'L': 'l', 'R': 'r'}
# Order in which split columns are written (vs-L first, then vs-R).
_SIDES = ('L', 'R')
# (event_type value, column prefix) for the simple per-event counts.
_BASIC_EVENTS = (
    ('home run', 'HR'),
    ('single', '1B'),
    ('double', '2B'),
    ('triple', '3B'),
    ('walk', 'BB'),
    ('strikeout', 'SO'),
    ('hit by pitch', 'HBP'),
)
_ID_COLUMNS = ['key_mlbam', 'key_retro', 'key_bbref', 'key_fangraphs']


def _seconds_since(start: datetime.datetime) -> float:
    """Return the wall-clock seconds elapsed since *start*."""
    return (datetime.datetime.now() - start).total_seconds()


def _as_text(column: pd.Series) -> pd.Series:
    """Return *column* as strings, with missing values mapped to ``''``.

    This makes ``.str`` matching safe regardless of the dtype ``read_csv`` inferred.
    """
    return column.astype(str).where(column.notna(), '')


def _count_by_batter(plays: pd.DataFrame, mask: pd.Series,
                     index: Optional[pd.Index] = None) -> pd.Series:
    """Count the plays selected by *mask* for each ``batter_id``.

    Args:
        plays: Event rows; must contain ``batter_id`` and ``event_type``.
        mask: Boolean selector aligned with *plays*.
        index: When given, the result is aligned to it and batters with no
            matching play get ``0``. When omitted, such batters are simply absent.

    Returns:
        Integer count of non-null ``event_type`` values per batter.
    """
    counts = plays[mask].groupby('batter_id')['event_type'].count().astype(int)
    if index is not None:
        counts = counts.reindex(index, fill_value=0)
    return counts


def get_events_by_date(file_path: str, start_date: int, end_date: int) -> pd.DataFrame:
    """Load the events CSV and keep plays whose game date is in the given range.

    The date is taken from ``game_id`` by dropping the first three characters and
    the last one (presumably team code and game number - confirm against the data)
    and is stored as an integer in a new ``date`` column.

    Args:
        file_path: Path of the events CSV.
        start_date: First date to keep, as an integer such as ``19980101``.
        end_date: Last date to keep (inclusive), in the same format.

    Returns:
        The matching rows, including the added ``date`` column.
    """
    all_plays = pd.read_csv(f'{file_path}', dtype={'game_id': 'str'})
    all_plays['date'] = all_plays['game_id'].str[3:-1].astype(int)
    in_range = all_plays['date'].between(start_date, end_date)
    return all_plays[in_range]


def get_result_series(plays: pd.DataFrame, event_type: str, pitcher_hand: Literal['r', 'l'],
                      col_name: str) -> pd.Series:
    """Count, per batter, plays with the given event type against one pitcher hand.

    Args:
        plays: Event rows.
        event_type: Value to match in the ``event_type`` column.
        pitcher_hand: Value to match in the ``pitcher_hand`` column.
        col_name: Name given to the returned series.

    Returns:
        Integer counts indexed by ``batter_id``; batters with no match are absent.
    """
    selected = (plays.event_type == event_type) & (plays.pitcher_hand == pitcher_hand)
    return _count_by_batter(plays, selected).rename(col_name)


def _lookup_other_ids(retro_ids: pd.Series) -> pd.DataFrame:
    """Resolve Retrosheet ids to the other id systems with one pybaseball lookup.

    Ids missing from the lookup table are reported on stdout and left out of the
    result instead of raising.
    """
    wanted = retro_ids.dropna().unique().tolist()
    found = pb.playerid_reverse_lookup(wanted, key_type='retro')
    # Keep the first row per id, as a one-id-at-a-time lookup taking row 0 would.
    found = found.drop_duplicates(subset='key_retro')[_ID_COLUMNS]
    known = set(found['key_retro'])
    for retro_id in wanted:
        if retro_id not in known:
            print(f'Could not find id {retro_id} in pybaseball lookup')
    return found


def _classify_bat_hand(left_pa: int, right_pa: int) -> str:
    """Classify a batter as ``'L'``, ``'R'`` or ``'S'`` (switch) from plays per side.

    A batter seen from one side only gets that side. A batter seen from both sides
    is a switch hitter once there are at least 10 plays in total; below that the
    more frequent side wins, with ties (and no plays at all) going to ``'R'``.
    """
    if left_pa == 0 and right_pa > 0:
        return 'R'
    if left_pa > 0 and right_pa == 0:
        return 'L'
    if left_pa + right_pa < 10:
        return 'L' if left_pa > right_pa else 'R'
    return 'S'


def _infer_bat_hands(plays: pd.DataFrame, batter_ids: pd.Index) -> List[str]:
    """Return the inferred batting hand for each id in *batter_ids*, in order."""
    counted = plays[plays.pitcher_hand.isin(['l', 'r']) & plays.game_id.notna()]
    pa_by_side = pd.crosstab(counted['batter_id'], counted['result_batter_hand']).reindex(
        index=batter_ids, columns=['l', 'r'], fill_value=0
    )
    return [
        _classify_bat_hand(left_pa, right_pa)
        for left_pa, right_pa in zip(pa_by_side['l'], pa_by_side['r'])
    ]


def get_player_ids(plays: pd.DataFrame, which: Literal['batters', 'pitchers']) -> pd.DataFrame:
    """Build a table of the players appearing in *plays* with their id keys.

    Args:
        plays: Event rows.
        which: ``'batters'`` uses the ``batter_id`` column, anything else
            ``pitcher_id``.

    Returns:
        A frame indexed by the chosen id column with ``last_name``, ``use_name``,
        the pybaseball id keys and, for batters, an inferred ``bat_hand``.
        Players pybaseball cannot resolve are dropped by the inner merge.
    """
    retro_players = pd.read_csv(f'{FILE_PATH}{PERSONNEL_FILENAME}')
    id_key = 'batter_id' if which == 'batters' else 'pitcher_id'

    unique_players = pd.Series(plays[id_key].unique()).to_frame('id')
    players = pd.merge(
        left=retro_players,
        right=unique_players,
        how='right',
        on='id'
    ).rename(columns={'id': id_key})
    players = players[[id_key, 'last_name', 'use_name']]

    start_time = datetime.datetime.now()
    other_ids = _lookup_other_ids(players[id_key])
    print(f'ID lookup: {_seconds_since(start_time):.2f}s')

    players = pd.merge(
        left=players,
        right=other_ids,
        left_on=id_key,
        right_on='key_retro'
    )
    players = players.set_index(id_key)

    if which == 'batters':
        players['bat_hand'] = _infer_bat_hands(plays, players.index)

    return players


def get_base_batting_df(all_plays: pd.DataFrame) -> pd.DataFrame:
    """Return batter ids plus PA/AB counts by pitcher hand for qualifying batters.

    Rows with any missing value are dropped, then only batters with at least
    20 PA against left-handers and 40 PA against right-handers are kept.
    """
    bs = get_player_ids(all_plays, 'batters')

    is_pa = all_plays.batter_event == 't'
    is_ab = all_plays.ab == 't'
    vs_l = all_plays.pitcher_hand == 'l'
    vs_r = all_plays.pitcher_hand == 'r'

    for col_name, selected in (
        ('PA_vL', is_pa & vs_l),
        ('PA_vR', is_pa & vs_r),
        ('AB_vL', is_ab & vs_l),
        ('AB_vR', is_ab & vs_r),
    ):
        bs = pd.concat([bs, _count_by_batter(all_plays, selected).rename(col_name)], axis=1)

    return bs.dropna().query('PA_vL >= 20 & PA_vR >= 40')


def _add_basic_counts(stats: pd.DataFrame, plays: pd.DataFrame) -> None:
    """Add one count column per basic event type and pitcher hand to *stats*."""
    for event_type, prefix in _BASIC_EVENTS:
        # vs-R is written before vs-L to keep the established column order.
        for hand in ('r', 'l'):
            col_name = f'{prefix}_v{hand.upper()}'
            stats[col_name] = get_result_series(plays, event_type, hand, col_name)


def _add_batted_ball_counts(stats: pd.DataFrame, plays: pd.DataFrame) -> None:
    """Add FB/GB/LD, GDP (double plus triple plays) and bunt counts to *stats*."""
    # Codes are compared case-insensitively so 'G'/'g', 'F'/'f', 'L'/'l' all match.
    ball_type = _as_text(plays['batted_ball_type']).str.lower()
    vs = {side: plays.pitcher_hand == hand for side, hand in _PITCHER_HAND.items()}
    is_pa = plays.batter_event == 't'

    for prefix, code in (('FB', 'f'), ('GB', 'g'), ('LD', 'l')):
        for side in _SIDES:
            stats[f'{prefix}_v{side}'] = _count_by_batter(
                plays, (ball_type == code) & vs[side], index=stats.index
            )

    for side in _SIDES:
        double_plays = _count_by_batter(
            plays, is_pa & vs[side] & (plays.dp == 't'), index=stats.index
        )
        triple_plays = _count_by_batter(
            plays, is_pa & vs[side] & (plays.tp == 't'), index=stats.index
        )
        stats[f'GDP_v{side}'] = double_plays + triple_plays

    stats['Bunts'] = _count_by_batter(plays, plays.bunt == 't', index=stats.index)


def _add_spray_counts(stats: pd.DataFrame, plays: pd.DataFrame) -> None:
    """Add infield-hit, pull, center and opposite-field counts to *stats*.

    Hit locations are matched on digits: ``5|7`` on one side, ``3|9`` on the other
    and ``1|4|6|8`` in the middle (presumably Retrosheet fielder positions, with
    5/7 the left side of the field - confirm against the data dictionary).
    """
    hit_loc = _as_text(plays['hit_location'])
    hit_val = _as_text(plays['hit_val'])
    vs = {side: plays.pitcher_hand == hand for side, hand in _PITCHER_HAND.items()}

    # Infield hits: hit value 1-3, an infield position in the location, not marked deep.
    infield_hit = (
        hit_val.str.contains('1|2|3')
        & hit_loc.str.contains('1|2|3|4|5|6')
        & ~hit_loc.str.contains('d', case=False)
    )
    for side in _SIDES:
        stats[f'ifh_v{side}'] = _count_by_batter(plays, infield_hit & vs[side])

    # Batting hand for each play's batter; plays by batters outside *stats* get NaN
    # and are discarded when counts are aligned to stats.index.
    hands = stats['bat_hand']
    bat_hand = plays['batter_id'].map(hands[~hands.index.duplicated()])
    # Switch hitters ('S') bat right against left-handers and left against right-handers.
    bats_left = (vs['L'] & (bat_hand == 'L')) | (vs['R'] & (bat_hand != 'R'))

    to_57 = hit_loc.str.contains('5|7')
    to_39 = hit_loc.str.contains('3|9')
    to_center = hit_loc.str.contains('1|4|6|8')
    pulled = (bats_left & to_39) | (~bats_left & to_57)
    # Opposite field is the mirror image of the pull side, so it depends on the
    # side the batter hits from; a fixed '5|7' would count a right-handed
    # batter's pulled balls twice.
    opposite = (bats_left & to_57) | (~bats_left & to_39)

    for side in _SIDES:
        stats[f'pull_v{side}'] = _count_by_batter(plays, pulled & vs[side], index=stats.index)
    for side in _SIDES:
        stats[f'center_v{side}'] = _count_by_batter(plays, to_center & vs[side])
    for side in _SIDES:
        stats[f'oppo_v{side}'] = _count_by_batter(plays, opposite & vs[side])


def _add_calculated_fields(stats: pd.DataFrame) -> None:
    """Add the rate stats derived from the counting columns already in *stats*.

    Every stat is computed by one expression applied to both pitcher-hand splits,
    so the vs-L and vs-R formulas cannot drift apart.
    """
    def col(name: str, side: str) -> pd.Series:
        return stats[f'{name}_v{side}']

    def add(name: str, compute: Callable[[str], object]) -> None:
        for side in _SIDES:
            stats[f'{name}_v{side}'] = compute(side)

    def batted_balls(side: str) -> pd.Series:
        return col('FB', side) + col('GB', side) + col('LD', side)

    def sprayed_balls(side: str) -> pd.Series:
        return col('pull', side) + col('center', side) + col('oppo', side)

    def medium_rate(side: str) -> List[float]:
        # Kept on Python's round() per value so results match row-wise evaluation.
        highs = 0.9 - col('Hard%', side)
        lows = (col('SLG', side) - col('AVG', side)) * 1.5
        return [round(max(min(high, low), 0.1), 5) for high, low in zip(highs, lows)]

    add('H', lambda s: col('1B', s) + col('2B', s) + col('3B', s) + col('HR', s))
    add('AVG', lambda s: (col('H', s) / col('AB', s)).round(5))
    add('OBP', lambda s: ((col('H', s) + col('BB', s) + col('HBP', s)) / col('PA', s)).round(5))
    add('SLG', lambda s: (
        (col('1B', s) + col('2B', s) * 2 + col('3B', s) * 3 + col('HR', s) * 4) / col('AB', s)
    ).round(5))

    add('HR/FB', lambda s: col('HR', s) / col('FB', s))
    add('FB%', lambda s: col('FB', s) / batted_balls(s))
    add('GB%', lambda s: col('GB', s) / batted_balls(s))
    add('LD%', lambda s: col('LD', s) / batted_balls(s))

    add('Hard%', lambda s: (0.2 + col('SLG', s) - col('AVG', s)).round(5))
    add('Med%', medium_rate)
    add('Soft%', lambda s: (1 - col('Hard%', s) - col('Med%', s)).round(5))

    add('IFH%', lambda s: (col('ifh', s) / col('H', s)).round(5))
    add('Pull%', lambda s: (col('pull', s) / sprayed_balls(s)).round(5))
    add('Cent%', lambda s: (col('center', s) / sprayed_balls(s)).round(5))
    add('Oppo%', lambda s: (1 - col('Pull%', s) - col('Cent%', s)).round(5))


def get_batting_stats_by_date(file_path, start_date: int, end_date: int) -> pd.DataFrame:
    """Compute split batting statistics for plays between two dates.

    Args:
        file_path: Path of the events CSV.
        start_date: First date to include, as an integer such as ``19980101``.
        end_date: Last date to include (inclusive), in the same format.

    Returns:
        One row per qualifying batter (see ``get_base_batting_df``) with ids,
        counting stats and rate stats, each split into ``_vL`` and ``_vR`` columns.
        Timing for each stage is printed to stdout.
    """
    start = datetime.datetime.now()
    all_plays = get_events_by_date(file_path, start_date, end_date)
    print(f'Pull events: {_seconds_since(start):.2f}s')

    start = datetime.datetime.now()
    batting_stats = get_base_batting_df(all_plays)
    print(f'Get base dataframe: {_seconds_since(start):.2f}s')

    # Basic counting stats
    start = datetime.datetime.now()
    _add_basic_counts(batting_stats, all_plays)
    print(f'Count basic stats: {_seconds_since(start):.2f}s')

    # Bespoke counting stats
    start = datetime.datetime.now()
    _add_batted_ball_counts(batting_stats, all_plays)
    print(f'Custom counting stats: {_seconds_since(start):.2f}s')

    _add_spray_counts(batting_stats, all_plays)

    # Batters with no matching play have NaN counts; treat them as zero.
    batting_stats = batting_stats.fillna(0)

    # Calculated Fields
    start = datetime.datetime.now()
    _add_calculated_fields(batting_stats)
    print(f'Calculated fields: {_seconds_since(start):.2f}s')

    return batting_stats


async def main(args):
    """Run the batter calculations for the configured events file and save them.

    Args:
        args: Command-line arguments; currently unused.
    """
    print('Running the batter calcs...')
    batter_start = datetime.datetime.now()
    data = get_batting_stats_by_date(
        f'{FILE_PATH}{EVENTS_FILENAME}', start_date=19980101, end_date=19980430
    )
    end_calc = datetime.datetime.now()
    data.to_csv('batting_stats.csv')
    end_save = datetime.datetime.now()
    print(f'\nBatter time: {(end_calc - batter_start).total_seconds():.2f}s\n'
          f'Save time: {(end_save - end_calc).total_seconds():.2f}s')

    # No pitcher calculations run between these two timestamps yet.
    pitcher_start = datetime.datetime.now()
    end_pitcher = datetime.datetime.now()
    print(f'\nPitcher time: {(end_pitcher - pitcher_start).total_seconds():.2f}s\n\n'
          f'Total: {(end_pitcher - batter_start).total_seconds():.2f}s\n\nDone!')


if __name__ == '__main__':
    asyncio.run(main(sys.argv[1:]))