146 lines
5.3 KiB
Python
146 lines
5.3 KiB
Python
import asyncio
|
|
import datetime
|
|
import logging
|
|
import sys
|
|
|
|
from typing import Literal
|
|
|
|
import pandas as pd
|
|
import pybaseball as pb
|
|
|
|
from creation_helpers import get_all_pybaseball_ids
|
|
|
|
date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
|
|
log_level = logging.INFO
|
|
logging.basicConfig(
|
|
filename=f'logs/{date}.log',
|
|
format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
|
|
level=log_level
|
|
)
|
|
FILE_PATH = 'data-input/retrosheet/'
|
|
EVENTS_FILENAME = 'retrosheets_events_1998_short.csv' # Removed last few columns which were throwing dtype errors
|
|
PERSONNEL_FILENAME = 'retrosheets_personnel.csv'
|
|
|
|
|
|
def get_events_by_date(file_path: str, start_date: int, end_date: int) -> pd.DataFrame:
|
|
all_plays = pd.read_csv(f'{file_path}', dtype={'game_id': 'str'})
|
|
all_plays['date'] = all_plays['game_id'].str[7:-1].astype(int)
|
|
date_plays = all_plays[(all_plays.date >= start_date) & (all_plays.date <= end_date)]
|
|
return date_plays
|
|
|
|
|
|
def get_result_series(plays: pd.DataFrame, event_type: str, pitcher_hand: Literal['r', 'l'], col_name: str) -> pd.Series:
|
|
this_series = plays[(plays.event_type == event_type) & (plays.pitcher_hand == pitcher_hand)].groupby('batter_id').count()['event_type'].astype(int).rename(col_name)
|
|
return this_series
|
|
|
|
|
|
# def get_batting_handedness(plays: pd.DataFrame) -> pd.DataFrame:
|
|
|
|
|
|
|
|
def get_player_ids(plays: pd.DataFrame, which: Literal['batters', 'pitchers']) -> pd.DataFrame:
|
|
RETRO_PLAYERS = pd.read_csv(f'{FILE_PATH}{PERSONNEL_FILENAME}')
|
|
id_key = 'batter_id' if which == 'batters' else 'pitcher_id'
|
|
|
|
players = pd.DataFrame()
|
|
unique_players = pd.Series(plays[id_key].unique()).to_frame('id')
|
|
players = pd.merge(
|
|
left=RETRO_PLAYERS,
|
|
right=unique_players,
|
|
how='right',
|
|
left_on='id',
|
|
right_on='id'
|
|
).rename(columns={'id': id_key})
|
|
|
|
def get_pids(row):
|
|
# return get_all_pybaseball_ids([row[id_key]], 'retro', full_name=f'{row["use_name"]} {row["last_name"]}')
|
|
pull = pb.playerid_reverse_lookup([row[id_key]], key_type='retro')
|
|
if len(pull.values) == 0:
|
|
print(f'Could not find id {row[id_key]} in pybaseball lookup')
|
|
return pull.loc[0][['key_mlbam', 'key_retro', 'key_bbref', 'key_fangraphs']]
|
|
|
|
players = players[[id_key, 'last_name', 'use_name']]
|
|
start_time = datetime.datetime.now()
|
|
other_ids = players.apply(get_pids, axis=1)
|
|
end_time = datetime.datetime.now()
|
|
print(f'ID lookup time: {(end_time - start_time).total_seconds():.2f}s')
|
|
|
|
players = pd.merge(
|
|
left=players,
|
|
right=other_ids,
|
|
left_on=id_key,
|
|
right_on='key_retro'
|
|
)
|
|
players = players.set_index(id_key)
|
|
|
|
return players
|
|
|
|
|
|
def get_base_batting_df(all_plays: pd.DataFrame) -> pd.DataFrame:
|
|
bs = get_player_ids(all_plays, 'batters')
|
|
|
|
# bs['key_mlbam'] = bs.apply()
|
|
|
|
pal_series = all_plays[(all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vL')
|
|
bs = pd.concat([bs, pal_series], axis=1)
|
|
par_series = all_plays[(all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vR')
|
|
bs = pd.concat([bs, par_series], axis=1)
|
|
|
|
|
|
abl_series = all_plays[(all_plays.ab == 't') & (all_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vL')
|
|
bs = pd.concat([bs, abl_series], axis=1)
|
|
abr_series = all_plays[(all_plays.ab == 't') & (all_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vR')
|
|
bs = pd.concat([bs, abr_series], axis=1)
|
|
|
|
return bs
|
|
|
|
|
|
def get_batting_stats_by_date(file_path, start_date: int, end_date: int) -> pd.DataFrame:
|
|
all_plays = get_events_by_date(file_path, start_date, end_date)
|
|
|
|
batting_stats = get_base_batting_df(all_plays)
|
|
|
|
# Basic counting stats
|
|
for event_type, vs_hand, col_name in [
|
|
('home run', 'r', 'HR_vR'),
|
|
('home run', 'l', 'HR_vL'),
|
|
('single', 'r', '1B_vR'),
|
|
('single', 'l', '1B_vL'),
|
|
('double', 'r', '2B_vR'),
|
|
('double', 'l', '2B_vL'),
|
|
('triple', 'r', '3B_vR'),
|
|
('triple', 'l', '3B_vL'),
|
|
('walk', 'r', 'BB_vR'),
|
|
('walk', 'l', 'BB_vL'),
|
|
('strikeout', 'r', 'SO_vR'),
|
|
('strikeout', 'l', 'SO_vL'),
|
|
('hit by pitch', 'r', 'HBP_vR'),
|
|
('hit by pitch', 'l', 'HBP_vL')
|
|
]:
|
|
this_series = get_result_series(all_plays, event_type, vs_hand, col_name)
|
|
batting_stats = pd.concat([batting_stats, this_series], axis=1)
|
|
|
|
# Bespoke queries
|
|
# Remaining:
|
|
|
|
# fill na to 0 following counting stats
|
|
batting_stats = batting_stats.fillna(0)
|
|
|
|
return batting_stats
|
|
|
|
|
|
async def main(args):
|
|
print(f'Running the calcs...')
|
|
start = datetime.datetime.now()
|
|
data = get_batting_stats_by_date(f'{FILE_PATH}{EVENTS_FILENAME}', start_date=101, end_date=430)
|
|
end_calc = datetime.datetime.now()
|
|
|
|
data.to_csv(f'batting_stats.csv')
|
|
end = datetime.datetime.now()
|
|
|
|
print(f'Stat calc time: {(end_calc - start).total_seconds():.2f}s\nSave time: {(end - end_calc).total_seconds():.2f}s\nTotal: {(end - start).total_seconds():.2f}s\n\nDone!')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main(sys.argv[1:]))
|