paper-dynasty-card-creation/retrosheet_data.py

import asyncio
import datetime
import logging
from logging.handlers import RotatingFileHandler
import math
import sys
from typing import Literal
import pandas as pd
import pybaseball as pb
from pybaseball import cache
import urllib
from creation_helpers import get_args, CLUB_LIST, FRANCHISE_LIST, sanitize_name
from batters.stat_prep import DataMismatchError
from db_calls import DB_URL, db_get, db_patch, db_post, db_put, db_delete
from exceptions import log_exception, logger
from retrosheet_transformer import load_retrosheet_csv
import batters.calcs_batter as cba
import defenders.calcs_defense as cde
import pitchers.calcs_pitcher as cpi
cache.enable()
# date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
# log_level = logger.INFO
# logger.basicConfig(
# filename=f'logs/{date}.log',
# format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
# level=log_level
# )
RETRO_FILE_PATH = 'data-input/retrosheet/'
EVENTS_FILENAME = 'retrosheets_events_2005.csv' # Now using transformer for new format compatibility
PERSONNEL_FILENAME = 'retrosheets_personnel.csv'
DATA_INPUT_FILE_PATH = 'data-input/2005 Live Cardset/'
CARD_BASE_URL = f'{DB_URL}/v2/players/'
start_time = datetime.datetime.now()
RELEASE_DIRECTORY = f'{start_time.year}-{start_time.month}-{start_time.day}'
PLAYER_DESCRIPTION = 'Live' # Live for Live Series
# PLAYER_DESCRIPTION = 'September PotM' # <Month> PotM for promos
PROMO_INCLUSION_RETRO_IDS = [
# 'marte001',
# 'willg001',
# 'sampb003',
# 'ruscg001',
# 'larkb001',
# 'sosas001',
# 'smolj001',
# 'acevj001'
]
MIN_PA_VL = 20 if 'live' in PLAYER_DESCRIPTION.lower() else 1 # 1 for PotM
MIN_PA_VR = 40 if 'live' in PLAYER_DESCRIPTION.lower() else 1 # 1 for PotM
MIN_TBF_VL = MIN_PA_VL
MIN_TBF_VR = MIN_PA_VR
CARDSET_ID = 27 if 'live' in PLAYER_DESCRIPTION.lower() else 28 # 27: 2005 Live, 28: 2005 Promos
# Per-Update Parameters
SEASON_PCT = 28 / 162 # Full season
START_DATE = 20050301 # YYYYMMDD format - 2005 Opening Day
END_DATE = 20050430 # YYYYMMDD format - Month 1 of play
POST_DATA = True
LAST_WEEK_RATIO = 0.0 if PLAYER_DESCRIPTION == 'Live' else 0.0
LAST_TWOWEEKS_RATIO = 0.0
LAST_MONTH_RATIO = 0.0
def date_from_int(integer_date: int) -> datetime.datetime:
return datetime.datetime(int(str(integer_date)[:4]), int(str(integer_date)[4:6]), int(str(integer_date)[-2:]))
def date_math(start_date: int, operator: Literal['+', '-'], day_delta: int = 0, month_delta: int = 0, year_delta: int = 0) -> int:
if len(str(start_date)) != 8:
log_exception(ValueError, 'Start date must be 8 digits long')
if True in [day_delta < 0, month_delta < 0, year_delta < 0]:
log_exception(ValueError, 'Time deltas must be greater than or equal to 0; use `-` operator to go back in time')
if day_delta > 28:
log_exception(ValueError, 'Use month_delta for days > 28')
if month_delta > 12:
log_exception(ValueError, 'Use year_delta for months > 12')
s_date = date_from_int(start_date)
if year_delta > 0:
s_date = datetime.datetime(
s_date.year + year_delta if operator == '+' else s_date.year - year_delta,
s_date.month,
s_date.day
)
if month_delta > 0:
month_range = [12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
new_index = s_date.month + month_delta if operator == '+' else s_date.month - month_delta
new_month = month_range[(new_index % 12)]
new_year = s_date.year
if new_index > 12:
new_year += 1
elif new_index < 1:
new_year -= 1
s_date = datetime.datetime(
new_year,
new_month,
s_date.day
)
fd = s_date + datetime.timedelta(days=day_delta if operator == '+' else day_delta * -1)
return int(f'{str(fd.year).zfill(4)}{str(fd.month).zfill(2)}{str(fd.day).zfill(2)}')
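# For example, date_math(20050430, '-', day_delta=7) walks back one week and returns the
# 8-digit YYYYMMDD value 20050423; year_delta and month_delta roll the calendar the same
# way before the day offset is applied.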
def weeks_between(start_date_int: int, end_date_int: int) -> int:
start_date = date_from_int(start_date_int)
end_date = date_from_int(end_date_int)
delta = end_date - start_date
return abs(round(delta.days / 7))
async def store_defense_to_csv(season: int):
for position in ['c', '1b', '2b', '3b', 'ss', 'lf', 'cf', 'rf', 'of', 'p']:
pos_df = cde.get_bbref_fielding_df(position, season)
pos_df.to_csv(f'{DATA_INPUT_FILE_PATH}defense_{position}.csv')
await asyncio.sleep(8)
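# The two helpers below collapse the play-by-play frame into per-player counts: each
# returns a Series indexed by batter_id (or pitcher_id) whose values are the number of
# plays matching one event_type against one opposing hand, renamed to the stat column it
# will fill (e.g. 'HR_vL').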
def get_batting_result_series(plays: pd.DataFrame, event_type: str, pitcher_hand: Literal['r', 'l'], col_name: str) -> pd.Series:
this_series = plays[(plays.event_type == event_type) & (plays.pitcher_hand == pitcher_hand)].groupby('batter_id').count()['event_type'].astype(int).rename(col_name)
return this_series
def get_pitching_result_series(plays: pd.DataFrame, event_type: str, batter_hand: Literal['r', 'l'], col_name: str) -> pd.Series:
this_series = plays[(plays.event_type == event_type) & (plays.batter_hand == batter_hand)].groupby('pitcher_id').count()['event_type'].astype(int).rename(col_name)
return this_series
def get_run_stat_df(input_path: str):
run_data = pd.read_csv(f'{input_path}running.csv') #.set_index('Name-additional'))
# if 'Player' in run_data:
# run_data = run_data.rename(columns={'Player': 'Full Name'})
# if 'Name' in run_data:
# run_data = run_data.rename(columns={'Name': 'Full Name'})
if 'Player-additional' in run_data:
run_data = run_data.rename(columns={'Player-additional': 'key_bbref'})
if 'Name-additional' in run_data:
run_data = run_data.rename(columns={'Name-additional': 'key_bbref'})
run_data = run_data[['key_bbref', 'Tm', 'ROE', 'XI', 'RS%', 'SBO', 'SB', 'CS', 'SB%', 'SB2', 'CS2', 'SB3', 'CS3', 'SBH', 'CSH', 'PO', 'PCS', 'OOB', 'OOB1', 'OOB2', 'OOB3', 'OOBHm', 'BT', 'XBT%', '1stS', '1stS2', '1stS3', '1stD', '1stD3', '1stDH', '2ndS', '2ndS3', '2ndSH']]
run_data = run_data.fillna(0)
return run_data.set_index('key_bbref')
def get_periph_stat_df(input_path: str):
pit_data = pd.read_csv(f'{input_path}pitching.csv')
if 'Player-additional' in pit_data:
pit_data = pit_data.rename(columns={'Player-additional': 'key_bbref'})
if 'Name-additional' in pit_data:
pit_data = pit_data.rename(columns={'Name-additional': 'key_bbref'})
if 'Team' in pit_data:
pit_data = pit_data.rename(columns={'Team': 'Tm'})
pit_data = pit_data[['key_bbref', 'Tm', 'GF', 'SHO', 'SV', 'IP', 'BK', 'WP']]
pit_data = pit_data.fillna(0)
return pit_data
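# get_player_ids joins the Retrosheet personnel file onto the unique batter/pitcher ids
# found in the plays, attaches key_mlbam / key_bbref / key_fangraphs via pybaseball's
# playerid_reverse_lookup, sanitizes names, and infers batting or throwing hand from the
# play-by-play splits. Promo cardsets are first filtered down to PROMO_INCLUSION_RETRO_IDS.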
def get_player_ids(plays: pd.DataFrame, which: Literal['batters', 'pitchers']) -> pd.DataFrame:
RETRO_PLAYERS = pd.read_csv(f'{RETRO_FILE_PATH}{PERSONNEL_FILENAME}')
id_key = 'batter_id' if which == 'batters' else 'pitcher_id'
players = pd.DataFrame()
unique_players = pd.Series(plays[id_key].unique()).to_frame('id')
players = pd.merge(
left=RETRO_PLAYERS,
right=unique_players,
how='right',
left_on='id',
right_on='id'
).rename(columns={'id': id_key})
if PLAYER_DESCRIPTION not in ['Live', '1998']:
msg = f'Player description is *{PLAYER_DESCRIPTION}* so dropping players not in PROMO_INCLUSION_RETRO_IDS'
print(msg)
logger.info(msg)
# players = players.drop(players[players.index not in PROMO_INCLUSION_RETRO_IDS].index)
players = players[players[id_key].isin(PROMO_INCLUSION_RETRO_IDS)]
def get_pids(row):
# return get_all_pybaseball_ids([row[id_key]], 'retro', full_name=f'{row["use_name"]} {row["last_name"]}')
pull = pb.playerid_reverse_lookup([row[id_key]], key_type='retro')
if len(pull.values) == 0:
print(f'Could not find id {row[id_key]} in pybaseball lookup')
return pull.loc[0][['key_mlbam', 'key_retro', 'key_bbref', 'key_fangraphs']]
players = players[[id_key, 'last_name', 'use_name']]
start_time = datetime.datetime.now()
other_ids = players.apply(get_pids, axis=1)
end_time = datetime.datetime.now()
print(f'ID lookup: {(end_time - start_time).total_seconds():.2f}s')
def clean_first(row):
return sanitize_name(row['use_name'])
def clean_last(row):
return sanitize_name(row['last_name'])
players['use_name'] = players.apply(clean_first, axis=1)
players['last_name'] = players.apply(clean_last, axis=1)
players = pd.merge(
left=players,
right=other_ids,
left_on=id_key,
right_on='key_retro'
)
players = players.set_index(id_key)
def get_bat_hand(row):
pa_vl = plays[(plays.batter_id == row['key_retro']) & (plays.pitcher_hand == 'l')].groupby('result_batter_hand').count()['game_id'].astype(int)
pa_vr = plays[(plays.batter_id == row['key_retro']) & (plays.pitcher_hand == 'r')].groupby('result_batter_hand').count()['game_id'].astype(int)
l_vs_l = 0 if 'l' not in pa_vl else pa_vl['l']
l_vs_r = 0 if 'l' not in pa_vr else pa_vr['l']
r_vs_l = 0 if 'r' not in pa_vl else pa_vl['r']
r_vs_r = 0 if 'r' not in pa_vr else pa_vr['r']
if sum([l_vs_l, l_vs_r]) == 0 and sum([r_vs_l, r_vs_r]) > 0:
return 'R'
elif sum([l_vs_l, l_vs_r]) > 0 and sum([r_vs_l, r_vs_r]) == 0:
return 'L'
if sum([l_vs_l, l_vs_r, r_vs_l, r_vs_r]) < 10:
if sum([l_vs_l, l_vs_r]) > sum([r_vs_l, r_vs_r]):
return 'L'
else:
return 'R'
else:
return 'S'
def get_pitch_hand(row):
first_event = plays.drop_duplicates('pitcher_id').loc[plays.pitcher_id == row['key_retro'], 'pitcher_hand']
return first_event.item()
if which == 'batters':
players['bat_hand'] = players.apply(get_bat_hand, axis=1)
elif which == 'pitchers':
players['pitch_hand'] = players.apply(get_pitch_hand, axis=1)
return players
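# get_base_batting_df / get_base_pitching_df build the PA (or TBF) and AB splits for the
# requested date window, apply the MIN_PA / MIN_TBF thresholds, and, when any
# LAST_*_RATIO is non-zero, over-weight recent form by appending extra copies of the last
# week / two weeks / month of plays (scaled by weeks_between) before re-counting the splits.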
def get_base_batting_df(file_path: str, start_date: int, end_date: int) -> list[pd.DataFrame]:
all_plays = load_retrosheet_csv(file_path)
all_plays['date'] = all_plays['game_id'].str[3:-1].astype(int)
date_plays = all_plays[(all_plays.date >= start_date) & (all_plays.date <= end_date)]
all_player_ids = get_player_ids(all_plays, 'batters')
pal_series = date_plays[(date_plays.batter_event == 't') & (date_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vL')
bs = pd.concat([all_player_ids, pal_series], axis=1)
par_series = date_plays[(date_plays.batter_event == 't') & (date_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vR')
bs = pd.concat([bs, par_series], axis=1)
abl_series = date_plays[(date_plays.ab == 't') & (date_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vL')
bs = pd.concat([bs, abl_series], axis=1)
abr_series = date_plays[(date_plays.ab == 't') & (date_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vR')
bs = pd.concat([bs, abr_series], axis=1)
core_df = bs.dropna().query(f'PA_vL >= {MIN_PA_VL} & PA_vR >= {MIN_PA_VR}')
if LAST_WEEK_RATIO == 0.0 and LAST_TWOWEEKS_RATIO == 0.0 and LAST_MONTH_RATIO == 0.0:
return [date_plays, core_df]
base_num_weeks = weeks_between(start_date, end_date)
if LAST_WEEK_RATIO > 0:
new_start = date_math(end_date, '-', day_delta=7)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_WEEK_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
if LAST_TWOWEEKS_RATIO > 0:
new_start = date_math(end_date, '-', day_delta=14)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_TWOWEEKS_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
if LAST_MONTH_RATIO > 0:
new_start = date_math(end_date, '-', month_delta=1)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_MONTH_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
core_df = core_df.drop(columns=['PA_vL', 'PA_vR', 'AB_vL', 'AB_vR'])
pal_series = date_plays[(date_plays.batter_event == 't') & (date_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vL')
core_df['PA_vL'] = pal_series
par_series = date_plays[(date_plays.batter_event == 't') & (date_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('PA_vR')
core_df['PA_vR'] = par_series
abl_series = date_plays[(date_plays.ab == 't') & (date_plays.pitcher_hand == 'l')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vL')
core_df['AB_vL'] = abl_series
abr_series = date_plays[(date_plays.ab == 't') & (date_plays.pitcher_hand == 'r')].groupby('batter_id').count()['event_type'].astype(int).rename('AB_vR')
core_df['AB_vR'] = abr_series
return [date_plays, core_df]
def get_base_pitching_df(file_path: str, start_date: int, end_date: int) -> list[pd.DataFrame]:
all_plays = load_retrosheet_csv(file_path)
all_plays['date'] = all_plays['game_id'].str[3:-1].astype(int)
date_plays = all_plays[(all_plays.date >= start_date) & (all_plays.date <= end_date)]
ps = get_player_ids(all_plays, 'pitchers')
tbfl_series = date_plays[(date_plays.batter_event == 't') & (date_plays.batter_hand == 'l')].groupby('pitcher_id').count()['event_type'].astype(int).rename('TBF_vL')
ps = pd.concat([ps, tbfl_series], axis=1)
tbfr_series = date_plays[(date_plays.batter_event == 't') & (date_plays.batter_hand == 'r')].groupby('pitcher_id').count()['event_type'].astype(int).rename('TBF_vR')
ps = pd.concat([ps, tbfr_series], axis=1)
abl_series = date_plays[(date_plays.ab == 't') & (date_plays.batter_hand == 'l')].groupby('pitcher_id').count()['event_type'].astype(int).rename('AB_vL')
ps = pd.concat([ps, abl_series], axis=1)
abr_series = date_plays[(date_plays.ab == 't') & (date_plays.batter_hand == 'r')].groupby('pitcher_id').count()['event_type'].astype(int).rename('AB_vR')
ps = pd.concat([ps, abr_series], axis=1)
if PLAYER_DESCRIPTION in ['Live', '1998']:
core_df = ps.dropna().query(f'TBF_vL >= {MIN_TBF_VL} & TBF_vR >= {MIN_TBF_VR}')
else:
core_df = ps.dropna()
if LAST_WEEK_RATIO == 0.0 and LAST_TWOWEEKS_RATIO == 0.0 and LAST_MONTH_RATIO == 0.0:
return [date_plays, core_df]
base_num_weeks = weeks_between(start_date, end_date)
if LAST_WEEK_RATIO > 0:
new_start = date_math(end_date, '-', day_delta=7)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_WEEK_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
if LAST_TWOWEEKS_RATIO > 0:
new_start = date_math(end_date, '-', day_delta=14)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_TWOWEEKS_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
if LAST_MONTH_RATIO > 0:
new_start = date_math(end_date, '-', month_delta=1)
week_plays = date_plays[(date_plays.date >= int(new_start)) & (date_plays.date <= end_date)]
copies = round(base_num_weeks * LAST_MONTH_RATIO)
for x in range(copies):
date_plays = pd.concat([date_plays, week_plays], ignore_index=True)
core_df = core_df.drop(columns=['TBF_vL', 'TBF_vR', 'AB_vL', 'AB_vR'])
tbfl_series = date_plays[(date_plays.batter_event == 't') & (date_plays.batter_hand == 'l')].groupby('pitcher_id').count()['event_type'].astype(int).rename('TBF_vL')
core_df['TBF_vL'] = tbfl_series
tbfr_series = date_plays[(date_plays.batter_event == 't') & (date_plays.batter_hand == 'r')].groupby('pitcher_id').count()['event_type'].astype(int).rename('TBF_vR')
core_df['TBF_vR'] = tbfr_series
abl_series = date_plays[(date_plays.ab == 't') & (date_plays.batter_hand == 'l')].groupby('pitcher_id').count()['event_type'].astype(int).rename('AB_vL')
core_df['AB_vL'] = abl_series
abr_series = date_plays[(date_plays.ab == 't') & (date_plays.batter_hand == 'r')].groupby('pitcher_id').count()['event_type'].astype(int).rename('AB_vR')
core_df['AB_vR'] = abr_series
return [date_plays, core_df]
def get_med_vL(row):
high = 0.9 - row['Hard%_vL']
low = (row['SLG_vL'] - row['AVG_vL']) * 1.5
return round(max(min(high, low),0.1), 5)
def get_med_vR(row):
high = 0.9 - row['Hard%_vR']
low = (row['SLG_vR'] - row['AVG_vR']) * 1.5
return round(max(min(high, low),0.1), 5)
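# Hard%/Med%/Soft% are proxies rather than measured batted-ball quality: Hard% is
# 0.2 + ISO (SLG minus AVG), Med% is the smaller of 1.5 * ISO and 0.9 - Hard% (floored at
# 0.1), and Soft% is whatever remains. For example, a .300 AVG / .500 SLG split gives
# Hard% 0.400, Med% 0.300 and Soft% 0.300.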
def get_batting_stats_by_date(retro_file_path, start_date: int, end_date: int) -> pd.DataFrame:
start = datetime.datetime.now()
all_plays, batting_stats = get_base_batting_df(retro_file_path, start_date, end_date)
print(f'Get base dataframe: {(datetime.datetime.now() - start).total_seconds():.2f}s')
start = datetime.datetime.now()
all_player_ids = batting_stats['key_retro']
logger.info(f'all_player_ids: {all_player_ids}')
all_plays = all_plays[all_plays['batter_id'].isin(all_player_ids)]
print(f'Shrink all_plays: {(datetime.datetime.now() - start).total_seconds():.2f}s')
# Basic counting stats
start = datetime.datetime.now()
for event_type, vs_hand, col_name in [
('home run', 'r', 'HR_vR'),
('home run', 'l', 'HR_vL'),
('single', 'r', '1B_vR'),
('single', 'l', '1B_vL'),
('double', 'r', '2B_vR'),
('double', 'l', '2B_vL'),
('triple', 'r', '3B_vR'),
('triple', 'l', '3B_vL'),
('walk', 'r', 'BB_vR'),
('walk', 'l', 'BB_vL'),
('strikeout', 'r', 'SO_vR'),
('strikeout', 'l', 'SO_vL'),
('hit by pitch', 'r', 'HBP_vR'),
('hit by pitch', 'l', 'HBP_vL')
]:
this_series = get_batting_result_series(all_plays, event_type, vs_hand, col_name)
batting_stats[col_name] = this_series
print(f'Count basic stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')
# Bespoke counting stats
start = datetime.datetime.now()
def get_fb_vl(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'f') & (all_plays.pitcher_hand == 'l')].count()['event_type'].astype(int)
def get_fb_vr(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'f') & (all_plays.pitcher_hand == 'r')].count()['event_type'].astype(int)
def get_gb_vl(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'G') & (all_plays.pitcher_hand == 'l')].count()['event_type'].astype(int)
def get_gb_vr(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'G') & (all_plays.pitcher_hand == 'r')].count()['event_type'].astype(int)
def get_ld_vl(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'l') & (all_plays.pitcher_hand == 'l')].count()['event_type'].astype(int)
def get_ld_vr(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batted_ball_type == 'l') & (all_plays.pitcher_hand == 'r')].count()['event_type'].astype(int)
def get_gdp_vl(row):
dp = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'l') & (all_plays.dp == 't')].count()['event_type'].astype(int)
tp = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'l') & (all_plays.tp == 't')].count()['event_type'].astype(int)
return dp + tp
def get_gdp_vr(row):
dp = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'r') & (all_plays.dp == 't')].count()['event_type'].astype(int)
tp = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.batter_event == 't') & (all_plays.pitcher_hand == 'r') & (all_plays.tp == 't')].count()['event_type'].astype(int)
return dp + tp
def get_bunt(row):
return all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.bunt == 't')].count()['event_type'].astype(int)
batting_stats['FB_vL'] = batting_stats.apply(get_fb_vl, axis=1)
batting_stats['FB_vR'] = batting_stats.apply(get_fb_vr, axis=1)
batting_stats['GB_vL'] = batting_stats.apply(get_gb_vl, axis=1)
batting_stats['GB_vR'] = batting_stats.apply(get_gb_vr, axis=1)
batting_stats['LD_vL'] = batting_stats.apply(get_ld_vl, axis=1)
batting_stats['LD_vR'] = batting_stats.apply(get_ld_vr, axis=1)
batting_stats['GDP_vL'] = batting_stats.apply(get_gdp_vl, axis=1)
batting_stats['GDP_vR'] = batting_stats.apply(get_gdp_vr, axis=1)
batting_stats['Bunts'] = batting_stats.apply(get_bunt, axis=1)
print(f'Custom counting stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')
# Infield Hit %
ifh_vl = all_plays[(all_plays.hit_val.str.contains('1|2|3')) & (all_plays.pitcher_hand == 'l') & (all_plays.hit_location.str.contains('1|2|3|4|5|6')) & (~all_plays.hit_location.str.contains('D', na=False))].groupby('batter_id').count()['event_type'].astype(int).rename('ifh_vL')
ifh_vr = all_plays[(all_plays.hit_val.str.contains('1|2|3')) & (all_plays.pitcher_hand == 'r') & (all_plays.hit_location.str.contains('1|2|3|4|5|6')) & (~all_plays.hit_location.str.contains('D', na=False))].groupby('batter_id').count()['event_type'].astype(int).rename('ifh_vR')
batting_stats['ifh_vL'] = ifh_vl
batting_stats['ifh_vR'] = ifh_vr
def get_pull_vl(row):
pull_loc = '5|7' if row['bat_hand'] != 'L' else '3|9'
x = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.pitcher_hand == 'l') & (all_plays.hit_location.str.contains(pull_loc))].count()['event_type'].astype(int)
return x
def get_pull_vr(row):
pull_loc = '5|7' if row['bat_hand'] == 'R' else '3|9'
x = all_plays[(all_plays.batter_id == row['key_retro']) & (all_plays.pitcher_hand == 'r') & (all_plays.hit_location.str.contains(pull_loc))].count()['event_type'].astype(int)
return x
# Bespoke Queries
batting_stats['pull_vL'] = batting_stats.apply(get_pull_vl, axis=1)
batting_stats['pull_vR'] = batting_stats.apply(get_pull_vr, axis=1)
center_vl = all_plays[(all_plays.pitcher_hand == 'l') & (all_plays.hit_location.str.contains('1|4|6|8'))].groupby('batter_id').count()['event_type'].astype(int).rename('center_vl')
center_vr = all_plays[(all_plays.pitcher_hand == 'r') & (all_plays.hit_location.str.contains('1|4|6|8'))].groupby('batter_id').count()['event_type'].astype(int).rename('center_vr')
batting_stats['center_vL'] = center_vl
batting_stats['center_vR'] = center_vr
oppo_vl = all_plays[(all_plays.pitcher_hand == 'l') & (all_plays.hit_location.str.contains('5|7'))].groupby('batter_id').count()['event_type'].astype(int).rename('oppo_vL')
oppo_vr = all_plays[(all_plays.pitcher_hand == 'r') & (all_plays.hit_location.str.contains('5|7'))].groupby('batter_id').count()['event_type'].astype(int).rename('oppo_vR')
batting_stats['oppo_vL'] = oppo_vl
batting_stats['oppo_vR'] = oppo_vr
# fill na to 0 following counting stats
batting_stats = batting_stats.fillna(0)
# Calculated Fields
start = datetime.datetime.now()
batting_stats['H_vL'] = batting_stats['1B_vL'] + batting_stats['2B_vL'] + batting_stats['3B_vL'] + batting_stats['HR_vL']
batting_stats['H_vR'] = batting_stats['1B_vR'] + batting_stats['2B_vR'] + batting_stats['3B_vR'] + batting_stats['HR_vR']
batting_stats['AVG_vL'] = round(batting_stats['H_vL'] / batting_stats['AB_vL'], 5)
batting_stats['AVG_vR'] = round(batting_stats['H_vR'] / batting_stats['AB_vR'], 5)
batting_stats['OBP_vL'] = round((batting_stats['H_vL'] + batting_stats['BB_vL'] + batting_stats['HBP_vL']) / batting_stats['PA_vL'], 5)
batting_stats['OBP_vR'] = round((batting_stats['H_vR'] + batting_stats['BB_vR'] + batting_stats['HBP_vR']) / batting_stats['PA_vR'], 5)
batting_stats['SLG_vL'] = round((batting_stats['1B_vL'] + batting_stats['2B_vL'] * 2 + batting_stats['3B_vL'] * 3 + batting_stats['HR_vL'] * 4) / batting_stats['AB_vL'], 5)
batting_stats['SLG_vR'] = round((batting_stats['1B_vR'] + batting_stats['2B_vR'] * 2 + batting_stats['3B_vR'] * 3 + batting_stats['HR_vR'] * 4) / batting_stats['AB_vR'], 5)
batting_stats['HR/FB_vL'] = round(batting_stats['HR_vL'] / batting_stats['FB_vL'], 5)
batting_stats['HR/FB_vR'] = round(batting_stats['HR_vR'] / batting_stats['FB_vR'], 5)
batting_stats['FB%_vL'] = round(batting_stats['FB_vL'] / (batting_stats['FB_vL'] + batting_stats['GB_vL'] + batting_stats['LD_vL']), 5)
batting_stats['FB%_vR'] = round(batting_stats['FB_vR'] / (batting_stats['FB_vR'] + batting_stats['GB_vR'] + batting_stats['LD_vR']), 5)
batting_stats['GB%_vL'] = round(batting_stats['GB_vL'] / (batting_stats['FB_vL'] + batting_stats['GB_vL'] + batting_stats['LD_vL']), 5)
batting_stats['GB%_vR'] = round(batting_stats['GB_vR'] / (batting_stats['FB_vR'] + batting_stats['GB_vR'] + batting_stats['LD_vR']), 5)
batting_stats['LD%_vL'] = round(batting_stats['LD_vL'] / (batting_stats['FB_vL'] + batting_stats['GB_vL'] + batting_stats['LD_vL']), 5)
batting_stats['LD%_vR'] = round(batting_stats['LD_vR'] / (batting_stats['FB_vR'] + batting_stats['GB_vR'] + batting_stats['LD_vR']), 5)
batting_stats['Hard%_vL'] = round(0.2 + batting_stats['SLG_vL'] - batting_stats['AVG_vL'], 5)
batting_stats['Hard%_vR'] = round(0.2 + batting_stats['SLG_vR'] - batting_stats['AVG_vR'], 5)
# def get_med_vL(row):
# high = 0.9 - row['Hard%_vL']
# low = (row['SLG_vL'] - row['AVG_vL']) * 1.5
# return round(max(min(high, low),0.1), 5)
# def get_med_vR(row):
# high = 0.9 - row['Hard%_vR']
# low = (row['SLG_vR'] - row['AVG_vR']) * 1.5
# return round(max(min(high, low),0.1), 5)
batting_stats['Med%_vL'] = batting_stats.apply(get_med_vL, axis=1)
batting_stats['Med%_vR'] = batting_stats.apply(get_med_vR, axis=1)
batting_stats['Soft%_vL'] = round(1 - batting_stats['Hard%_vL'] - batting_stats['Med%_vL'], 5)
batting_stats['Soft%_vR'] = round(1 - batting_stats['Hard%_vR'] - batting_stats['Med%_vR'], 5)
batting_stats['IFH%_vL'] = round(batting_stats['ifh_vL'] / batting_stats['H_vL'], 5)
batting_stats['IFH%_vR'] = round(batting_stats['ifh_vR'] / batting_stats['H_vR'], 5)
pull_val = round(batting_stats['pull_vL'] / (batting_stats['pull_vL'] + batting_stats['center_vL'] + batting_stats['oppo_vL']), 5)
batting_stats['Pull%_vL'] = pull_val.clip(0.1, 0.6)
pull_val = round(batting_stats['pull_vR'] / (batting_stats['pull_vR'] + batting_stats['center_vR'] + batting_stats['oppo_vR']), 5)
batting_stats['Pull%_vR'] = pull_val.clip(0.1, 0.6)
cent_val = round(batting_stats['center_vL'] / (batting_stats['pull_vL'] + batting_stats['center_vL'] + batting_stats['oppo_vL']), 5)
batting_stats['Cent%_vL'] = cent_val.clip(0.1, 0.6)
cent_val = round(batting_stats['center_vR'] / (batting_stats['pull_vR'] + batting_stats['center_vR'] + batting_stats['oppo_vR']), 5)
batting_stats['Cent%_vR'] = cent_val.clip(0.1, 0.6)
batting_stats['Oppo%_vL'] = round(1 - batting_stats['Pull%_vL'] - batting_stats['Cent%_vL'], 5)
batting_stats['Oppo%_vR'] = round(1 - batting_stats['Pull%_vR'] - batting_stats['Cent%_vR'], 5)
batting_stats = batting_stats.fillna(0)
print(f'Calculated fields: {(datetime.datetime.now() - start).total_seconds():.2f}s')
return batting_stats
def get_pitching_stats_by_date(retro_file_path, start_date: int, end_date: int) -> pd.DataFrame:
start = datetime.datetime.now()
all_plays, pitching_stats = get_base_pitching_df(retro_file_path, start_date, end_date)
print(f'Get base dataframe: {(datetime.datetime.now() - start).total_seconds():.2f}s')
start = datetime.datetime.now()
all_player_ids = pitching_stats['key_retro']
all_plays = all_plays[all_plays['pitcher_id'].isin(all_player_ids)]
print(f'Shrink all_plays: {(datetime.datetime.now() - start).total_seconds():.2f}s')
# Basic counting stats
start = datetime.datetime.now()
for event_type, vs_hand, col_name in [
('home run', 'r', 'HR_vR'),
('home run', 'l', 'HR_vL'),
('single', 'r', '1B_vR'),
('single', 'l', '1B_vL'),
('double', 'r', '2B_vR'),
('double', 'l', '2B_vL'),
('triple', 'r', '3B_vR'),
('triple', 'l', '3B_vL'),
('walk', 'r', 'BB_vR'),
('walk', 'l', 'BB_vL'),
('strikeout', 'r', 'SO_vR'),
('strikeout', 'l', 'SO_vL'),
('hit by pitch', 'r', 'HBP_vR'),
('hit by pitch', 'l', 'HBP_vL'),
('intentional walk', 'l', 'IBB_vL'),
('intentional walk', 'r', 'IBB_vR')
]:
this_series = get_pitching_result_series(all_plays, event_type, vs_hand, col_name)
pitching_stats[col_name] = this_series
print(f'Count basic stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')
pitching_stats = pitching_stats.fillna(0)
# Bespoke counting stats
start = datetime.datetime.now()
def get_fb_vl(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'f') & (all_plays.batter_hand == 'l')].count()['event_type'].astype(int)
def get_fb_vr(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'f') & (all_plays.batter_hand == 'r')].count()['event_type'].astype(int)
def get_gb_vl(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'G') & (all_plays.batter_hand == 'l')].count()['event_type'].astype(int)
def get_gb_vr(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'G') & (all_plays.batter_hand == 'r')].count()['event_type'].astype(int)
def get_ld_vl(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'l') & (all_plays.batter_hand == 'l')].count()['event_type'].astype(int)
def get_ld_vr(row):
return all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batted_ball_type == 'l') & (all_plays.batter_hand == 'r')].count()['event_type'].astype(int)
pitching_stats['FB_vL'] = pitching_stats.apply(get_fb_vl, axis=1)
pitching_stats['FB_vR'] = pitching_stats.apply(get_fb_vr, axis=1)
pitching_stats['GB_vL'] = pitching_stats.apply(get_gb_vl, axis=1)
pitching_stats['GB_vR'] = pitching_stats.apply(get_gb_vr, axis=1)
pitching_stats['LD_vL'] = pitching_stats.apply(get_ld_vl, axis=1)
pitching_stats['LD_vR'] = pitching_stats.apply(get_ld_vr, axis=1)
pitching_stats['H_vL'] = pitching_stats['1B_vL'] + pitching_stats['2B_vL'] + pitching_stats['3B_vL'] + pitching_stats['HR_vL']
pitching_stats['H_vR'] = pitching_stats['1B_vR'] + pitching_stats['2B_vR'] + pitching_stats['3B_vR'] + pitching_stats['HR_vR']
print(f'Custom counting stats: {(datetime.datetime.now() - start).total_seconds():.2f}s')
# Calculated Fields
"""
Oppo%_vL & R
"""
start = datetime.datetime.now()
pitching_stats['AVG_vL'] = round(pitching_stats['H_vL'] / pitching_stats['AB_vL'], 5)
pitching_stats['AVG_vR'] = round(pitching_stats['H_vR'] / pitching_stats['AB_vR'], 5)
pitching_stats['OBP_vL'] = round((pitching_stats['H_vL'] + pitching_stats['BB_vL'] + pitching_stats['HBP_vL'] + pitching_stats['IBB_vL']) / pitching_stats['TBF_vL'], 5)
pitching_stats['OBP_vR'] = round((pitching_stats['H_vR'] + pitching_stats['BB_vR'] + pitching_stats['HBP_vR'] + pitching_stats['IBB_vR']) / pitching_stats['TBF_vR'], 5)
pitching_stats['SLG_vL'] = round((pitching_stats['1B_vL'] + pitching_stats['2B_vL'] * 2 + pitching_stats['3B_vL'] * 3 + pitching_stats['HR_vL'] * 4) / pitching_stats['AB_vL'], 5)
pitching_stats['SLG_vR'] = round((pitching_stats['1B_vR'] + pitching_stats['2B_vR'] * 2 + pitching_stats['3B_vR'] * 3 + pitching_stats['HR_vR'] * 4) / pitching_stats['AB_vR'], 5)
pitching_stats['HR/FB_vL'] = round(pitching_stats['HR_vL'] / pitching_stats['FB_vL'], 5)
pitching_stats['HR/FB_vR'] = round(pitching_stats['HR_vR'] / pitching_stats['FB_vR'], 5)
pitching_stats['Hard%_vL'] = round(0.2 + pitching_stats['SLG_vL'] - pitching_stats['AVG_vL'], 5)
pitching_stats['Hard%_vR'] = round(0.2 + pitching_stats['SLG_vR'] - pitching_stats['AVG_vR'], 5)
pitching_stats['Med%_vL'] = pitching_stats.apply(get_med_vL, axis=1)
pitching_stats['Med%_vR'] = pitching_stats.apply(get_med_vR, axis=1)
pitching_stats['Soft%_vL'] = round(1 - pitching_stats['Hard%_vL'] - pitching_stats['Med%_vL'], 5)
pitching_stats['Soft%_vR'] = round(1 - pitching_stats['Hard%_vR'] - pitching_stats['Med%_vR'], 5)
pitching_stats['FB%_vL'] = round(pitching_stats['FB_vL'] / (pitching_stats['FB_vL'] + pitching_stats['GB_vL'] + pitching_stats['LD_vL']), 5)
pitching_stats['FB%_vR'] = round(pitching_stats['FB_vR'] / (pitching_stats['FB_vR'] + pitching_stats['GB_vR'] + pitching_stats['LD_vR']), 5)
pitching_stats['GB%_vL'] = round(pitching_stats['GB_vL'] / (pitching_stats['FB_vL'] + pitching_stats['GB_vL'] + pitching_stats['LD_vL']), 5)
pitching_stats['GB%_vR'] = round(pitching_stats['GB_vR'] / (pitching_stats['FB_vR'] + pitching_stats['GB_vR'] + pitching_stats['LD_vR']), 5)
def get_oppo_vl(row):
count = all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batter_hand == 'l') & (all_plays.hit_location.str.contains('5|7'))].count()['event_type'].astype(int)
denom = all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batter_hand == 'l') & (all_plays.batter_event == 't')].count()['event_type'].astype(int)
return round(count / denom, 5)
def get_oppo_vr(row):
count = all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batter_hand == 'r') & (all_plays.hit_location.str.contains('3|9'))].count()['event_type'].astype(int)
denom = all_plays[(all_plays.pitcher_id == row['key_retro']) & (all_plays.batter_hand == 'r') & (all_plays.batter_event == 't')].count()['event_type'].astype(int)
return round(count / denom, 5)
pitching_stats['Oppo%_vL'] = pitching_stats.apply(get_oppo_vl, axis=1)
pitching_stats['Oppo%_vR'] = pitching_stats.apply(get_oppo_vr, axis=1)
pitching_stats = pitching_stats.fillna(0)
print(f'Calculated fields: {(datetime.datetime.now() - start).total_seconds():.2f}s')
return pitching_stats
def calc_batting_cards(bs: pd.DataFrame, season_pct: float) -> pd.DataFrame:
def create_batting_card(row):
steal_data = cba.stealing(
chances=int(row['SBO']),
sb2s=int(row['SB2']),
cs2s=int(row['CS2']),
sb3s=int(row['SB3']),
cs3s=int(row['CS3']),
season_pct=1.0
)
y = pd.DataFrame({
'key_bbref': [row['key_bbref']],
'steal_low': [steal_data[0]],
'steal_high': [steal_data[1]],
'steal_auto': [steal_data[2]],
'steal_jump': [steal_data[3]],
'hit_and_run': [cba.hit_and_run(
row['AB_vL'], row['AB_vR'], row['H_vL'], row['H_vR'],
row['HR_vL'], row['HR_vR'], row['SO_vL'], row['SO_vR']
)],
'bunt': [cba.bunting(row['Bunts'], season_pct)],
'running': [cba.running(row['XBT%'])],
'hand': [row['bat_hand']],
})
return y.loc[0]
all_cards = bs.apply(create_batting_card, axis=1)
all_cards = all_cards.set_index('key_bbref')
return all_cards
def calc_pitching_cards(ps: pd.DataFrame, season_pct: float) -> pd.DataFrame:
def create_pitching_card(row):
pow_data = cde.pow_ratings(row['IP'], row['GS'], row['G'])
y = pd.DataFrame({
"key_bbref": [row['key_bbref']],
"balk": [cpi.balks(row['BK'], row['IP'], season_pct)],
"wild_pitch": [cpi.wild_pitches(row['WP'], row['IP'], season_pct)],
"hold": [cde.hold_pitcher(str(row['caught_stealing_perc']), int(row['pickoffs']), season_pct)],
"starter_rating": [pow_data[0]],
"relief_rating": [pow_data[1]],
"closer_rating": [cpi.closer_rating(int(row['GF']), int(row['SV']), int(row['G']))],
"batting": [f'#1W{row["pitch_hand"].upper()}-C']
})
return y.loc[0]
all_cards = ps.apply(create_pitching_card, axis=1)
all_cards = all_cards.set_index('key_bbref')
return all_cards
def calc_batter_ratings(bs: pd.DataFrame) -> pd.DataFrame:
def create_batting_rating(row):
ratings = cba.get_batter_ratings(row)
ops_vl = ratings[0]['obp'] + ratings[0]['slg']
ops_vr = ratings[1]['obp'] + ratings[1]['slg']
total_ops = (ops_vl + ops_vr + min(ops_vr, ops_vl)) / 3
def calc_cost(total_ops, base_cost, base_ops, max_delta) -> int:
delta = ((total_ops - base_ops) / 0.1) * 2
if delta < 1:
delta = (max_delta * (1 - (total_ops / base_ops))) * -0.1
final_cost = base_cost + (max_delta * delta)
return round(final_cost)
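# calc_cost interpolates around each rarity tier's base cost: with total_ops exactly at
# base_ops the delta is 0 and the card prices at base_cost (e.g. 0.95 OPS in the 0.9-1.0
# tier stays at 270). Well above the anchor the cost rises roughly max_delta per 0.05 of
# OPS; near or below the anchor the second branch damps the adjustment.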
if total_ops >= 1.2:
rarity_id = 99
cost = calc_cost(total_ops, base_cost=2400, base_ops=1.215, max_delta=810)
elif total_ops >= 1:
rarity_id = 1
cost = calc_cost(total_ops, base_cost=810, base_ops=1.05, max_delta=270)
elif total_ops >= 0.9:
rarity_id = 2
cost = calc_cost(total_ops, base_cost=270, base_ops=0.95, max_delta=90)
elif total_ops >= 0.8:
rarity_id = 3
cost = calc_cost(total_ops, base_cost=90, base_ops=0.85, max_delta=30)
elif total_ops >= 0.7:
rarity_id = 4
cost = calc_cost(total_ops, base_cost=30, base_ops=0.75, max_delta=10)
else:
rarity_id = 5
cost = calc_cost(total_ops, base_cost=10, base_ops=0.61, max_delta=8)
x = pd.DataFrame({
'key_bbref': [row['key_bbref']],
'ratings_vL': [ratings[0]],
'ratings_vR': [ratings[1]],
'ops_vL': ops_vl,
'ops_vR': ops_vr,
'total_ops': total_ops,
'rarity_id': rarity_id,
'cost': cost
})
return x.loc[0]
all_ratings = bs.apply(create_batting_rating, axis=1)
all_ratings = all_ratings.set_index('key_bbref')
return all_ratings
def calc_pitcher_ratings(ps: pd.DataFrame) -> pd.DataFrame:
def create_pitching_rating(row):
row['pitchingcard_id'] = row['key_fangraphs']
row['pitch_hand'] = row['pitch_hand'].upper()
ratings = cpi.get_pitcher_ratings(row)
ops_vl = ratings[0]['obp'] + ratings[0]['slg']
ops_vr = ratings[1]['obp'] + ratings[1]['slg']
total_ops = (ops_vl + ops_vr + min(ops_vr, ops_vl)) / 3
def calc_cost(total_ops, base_cost, base_ops, max_delta) -> int:
delta = ((base_ops - total_ops) / 0.1) * 2
if delta < -0.9:
delta = -0.95
final_cost = base_cost + (max_delta * delta)
return round(final_cost)
if row['starter_rating'] > 3:
if total_ops <= 0.4:
rarity_id = 99
cost = calc_cost(total_ops, 2400, 0.38, 810)
elif total_ops <= 0.475:
rarity_id = 1
cost = calc_cost(total_ops, 810, 0.44, 270)
elif total_ops <= 0.53:
rarity_id = 2
cost = calc_cost(total_ops, 270, 0.51, 90)
elif total_ops <= 0.6:
rarity_id = 3
cost = calc_cost(total_ops, 90, 0.575, 30)
elif total_ops <= 0.675:
rarity_id = 4
cost = calc_cost(total_ops, 30, 0.64, 10)
else:
rarity_id = 5
cost = calc_cost(total_ops, 10, 0.7, 8)
else:
if total_ops <= 0.325:
rarity_id = 99
cost = calc_cost(total_ops, 2400, 0.38, 810)
elif total_ops <= 0.4:
rarity_id = 1
cost = calc_cost(total_ops, 810, 0.44, 270)
elif total_ops <= 0.475:
rarity_id = 2
cost = calc_cost(total_ops, 270, 0.51, 90)
elif total_ops <= 0.55:
rarity_id = 3
cost = calc_cost(total_ops, 90, 0.575, 30)
elif total_ops <= 0.625:
rarity_id = 4
cost = calc_cost(total_ops, 30, 0.64, 10)
else:
rarity_id = 5
cost = calc_cost(total_ops, 10, 0.7, 8)
x = pd.DataFrame({
'key_bbref': [row['key_bbref']],
'ratings_vL': [ratings[0]],
'ratings_vR': [ratings[1]],
'ops_vL': ops_vl,
'ops_vR': ops_vr,
'total_ops': total_ops,
'rarity_id': rarity_id,
'cost': cost
})
return x.loc[0]
all_ratings = ps.apply(create_pitching_rating, axis=1)
all_ratings = all_ratings.set_index('key_bbref')
return all_ratings
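# calc_positions reads the per-position defense CSVs written by store_defense_to_csv and,
# for every batter with at least 10 defensive innings at a position, derives range and
# error ratings (plus arm for outfielders, and arm/PB/overthrow for catchers, with the
# shared OF error and arm coming from the combined defense_of.csv). Batters with no
# usable defensive data fall back to a DH-only entry whose "innings" are total PA.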
def calc_positions(bs: pd.DataFrame) -> pd.DataFrame:
df_c = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_c.csv').set_index('key_bbref')
df_1b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_1b.csv').set_index('key_bbref')
df_2b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_2b.csv').set_index('key_bbref')
df_3b = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_3b.csv').set_index('key_bbref')
df_ss = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_ss.csv').set_index('key_bbref')
df_lf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_lf.csv').set_index('key_bbref')
df_cf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_cf.csv').set_index('key_bbref')
df_rf = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_rf.csv').set_index('key_bbref')
df_of = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_of.csv').set_index('key_bbref')
season_pct = 1.0
all_pos = []
def process_pos(row):
no_data = True
for pos_df, position in [(df_1b, '1b'), (df_2b, '2b'), (df_3b, '3b'), (df_ss, 'ss')]:
if row['key_bbref'] in pos_df.index:
logger.info(f'Running {position} stats for {row["use_name"]} {row["last_name"]}')
try:
if 'bis_runs_total' in pos_df.columns:
average_range = (int(pos_df.at[row["key_bbref"], 'tz_runs_total']) +
int(pos_df.at[row["key_bbref"], 'bis_runs_total']) +
min(
int(pos_df.at[row["key_bbref"], 'tz_runs_total']),
int(pos_df.at[row["key_bbref"], 'bis_runs_total'])
)) / 3
else:
average_range = pos_df.at[row["key_bbref"], 'tz_runs_total']
if float(pos_df.at[row["key_bbref"], 'Inn_def']) >= 10.0:
all_pos.append({
"key_bbref": row['key_bbref'],
"position": position.upper(),
"innings": float(pos_df.at[row["key_bbref"], 'Inn_def']),
"range": cde.get_if_range(
pos_code=position,
tz_runs=round(average_range),
r_dp=0,
season_pct=season_pct
),
"error": cde.get_any_error(
pos_code=position,
errors=int(pos_df.at[row["key_bbref"], 'E_def']),
chances=int(pos_df.at[row["key_bbref"], 'chances']),
season_pct=season_pct
)
})
no_data = False
except Exception as e:
logger.info(f'Infield position failed: {e}')
of_arms = []
of_payloads = []
for pos_df, position in [(df_lf, 'lf'), (df_cf, 'cf'), (df_rf, 'rf')]:
if row["key_bbref"] in pos_df.index:
try:
if 'bis_runs_total' in pos_df.columns:
average_range = (int(pos_df.at[row["key_bbref"], 'tz_runs_total']) +
int(pos_df.at[row["key_bbref"], 'bis_runs_total']) +
min(
int(pos_df.at[row["key_bbref"], 'tz_runs_total']),
int(pos_df.at[row["key_bbref"], 'bis_runs_total'])
)) / 3
else:
average_range = pos_df.at[row["key_bbref"], 'tz_runs_total']
if float(pos_df.at[row["key_bbref"], 'Inn_def']) >= 10.0:
of_payloads.append({
"key_bbref": row['key_bbref'],
"position": position.upper(),
"innings": float(pos_df.at[row["key_bbref"], 'Inn_def']),
"range": cde.get_of_range(
pos_code=position,
tz_runs=round(average_range),
season_pct=season_pct
)
})
of_run_rating = 'bis_runs_outfield' if 'bis_runs_outfield' in pos_df.columns else 'tz_runs_total'
of_arms.append(int(pos_df.at[row["key_bbref"], of_run_rating]))
no_data = False
except Exception as e:
logger.info(f'Outfield position failed: {e}')
if row["key_bbref"] in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0:
try:
error_rating = cde.get_any_error(
pos_code=position,
errors=int(df_of.at[row["key_bbref"], 'E_def']),
chances=int(df_of.at[row["key_bbref"], 'chances']),
season_pct=season_pct
)
arm_rating = cde.arm_outfield(of_arms)
for f in of_payloads:
f['error'] = error_rating
f['arm'] = arm_rating
all_pos.append(f)
no_data = False
except Exception as e:
logger.info(f'Outfield position failed: {e}')
if row["key_bbref"] in df_c.index:
try:
run_rating = 'bis_runs_catcher_sb' if 'bis_runs_catcher_sb' in df_c else 'tz_runs_catcher'
if df_c.at[row["key_bbref"], 'SB'] + df_c.at[row["key_bbref"], 'CS'] == 0:
arm_rating = 3
else:
arm_rating = cde.arm_catcher(
cs_pct=df_c.at[row["key_bbref"], 'caught_stealing_perc'],
raa=int(df_c.at[row["key_bbref"], run_rating]),
season_pct=season_pct
)
if float(df_c.at[row["key_bbref"], 'Inn_def']) >= 10.0:
all_pos.append({
"key_bbref": row['key_bbref'],
"position": 'C',
"innings": float(df_c.at[row["key_bbref"], 'Inn_def']),
"range": cde.range_catcher(
rs_value=int(df_c.at[row["key_bbref"], 'tz_runs_catcher']),
season_pct=season_pct
),
"error": cde.get_any_error(
pos_code='c',
errors=int(df_c.at[row["key_bbref"], 'E_def']),
chances=int(df_c.at[row["key_bbref"], 'chances']),
season_pct=season_pct
),
"arm": arm_rating,
"pb": cde.pb_catcher(
pb=int(df_c.at[row["key_bbref"], 'PB']),
innings=int(float(df_c.at[row["key_bbref"], 'Inn_def'])),
season_pct=season_pct
),
"overthrow": cde.ot_catcher(
errors=int(df_c.at[row["key_bbref"], 'E_def']),
chances=int(df_c.at[row["key_bbref"], 'chances']),
season_pct=season_pct
)
})
no_data = False
except Exception as e:
logger.info(f'Catcher position failed: {e}')
if no_data:
all_pos.append({
"key_bbref": row['key_bbref'],
"position": 'DH',
"innings": row['PA_vL'] + row['PA_vR']
})
bs.apply(process_pos, axis=1)
pos_df = pd.DataFrame(all_pos)
pos_df = pos_df.set_index('key_bbref')
return pos_df
def calc_pitcher_defense(ps: pd.DataFrame) -> pd.DataFrame:
df_p = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_p.csv').set_index('key_bbref')
all_pos = []
def process_def(row):
if row['key_bbref'] in df_p.index:
# Only read the defense file once we know the pitcher is actually in it
if 'bis_runs_total' in df_p:
range_val = cde.range_pitcher(rs_value=int(df_p.at[row['key_bbref'], 'bis_runs_total']))
else:
range_val = cde.range_pitcher(rf_per9_value=df_p.at[row['key_bbref'], 'range_factor_per_nine'])
all_pos.append({
'key_bbref': row['key_bbref'],
'position': 'P',
'innings': float(df_p.at[row['key_bbref'], 'Inn_def']),
'range': range_val,
'error': cde.get_any_error(
pos_code='p',
errors=int(df_p.at[row["key_bbref"], 'E_def']),
chances=int(df_p.at[row["key_bbref"], 'chances']),
season_pct=1.0
)
})
else:
# Default defensive line for pitchers with no fielding data
all_pos.append({
"key_bbref": row['key_bbref'],
"position": 'P',
"innings": 1,
"range": 5,
"error": 51
})
ps.apply(process_def, axis=1)
pos_df = pd.DataFrame(all_pos)
pos_df = pos_df.set_index('key_bbref')
return pos_df
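# get_or_post_players reconciles each stat row with the card database: players already in
# the cardset (matched on bbref_id) are patched with the new cost, rarity, card image URL
# and position slots, while new players get an mlbplayers record if needed, a freshly
# posted players record, and an image patch. Changes are summarized in *-deltas.csv and
# new-*.csv for review before release.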
async def get_or_post_players(bstat_df: pd.DataFrame | None = None, bat_rat_df: pd.DataFrame | None = None, def_rat_df: pd.DataFrame | None = None, pstat_df: pd.DataFrame | None = None, pit_rat_df: pd.DataFrame | None = None) -> pd.DataFrame:
all_players = []
player_deltas = [['player_id', 'player_name', 'old-cost', 'new-cost', 'old-rarity', 'new-rarity']]
new_players = [['player_id', 'player_name', 'cost', 'rarity', 'pos1']]
async def player_search(bbref_id: str):
p_query = await db_get('players', params=[('bbref_id', bbref_id), ('cardset_id', CARDSET_ID)])
if p_query['count'] > 0:
return p_query['players'][0]
else:
return None
async def mlb_search_or_post(retro_id: int):
mlb_query = await db_get('mlbplayers', params=[('key_retro', retro_id)])
if mlb_query['count'] > 0:
return mlb_query['players'][0]
else:
mlb_player = await db_post(
'mlbplayers/one',
payload={
'first_name': row['use_name'],
'last_name': row['last_name'],
'key_mlbam': row['key_mlbam'],
'key_fangraphs': row['key_fangraphs'],
'key_bbref': row['key_bbref'],
'key_retro': row['key_retro']
}
)
return mlb_player
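# NOTE: mlb_search_or_post reads `row` from the enclosing iterrows() loop via closure
# rather than taking it as a parameter, so it is only safe to call from inside those loops.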
def new_player_payload(row, ratings_df: pd.DataFrame):
return {
'p_name': f'{row["use_name"]} {row["last_name"]}',
'cost': f'{ratings_df.loc[row['key_bbref']]["cost"]}',
'image': f'change-me',
'mlbclub': CLUB_LIST[row['Tm']],
'franchise': FRANCHISE_LIST[row['Tm']],
'cardset_id': CARDSET_ID,
'set_num': int(float(row['key_fangraphs'])),
'rarity_id': int(ratings_df.loc[row['key_bbref']]['rarity_id']),
'description': PLAYER_DESCRIPTION,
'bbref_id': row['key_bbref'],
'fangr_id': int(float(row['key_fangraphs'])),
'mlbplayer_id': mlb_player['id']
}
def get_player_record_pos(def_rat_df: pd.DataFrame, row) -> list[str]:
all_pos = [None, None, None, None, None, None, None, None]
try:
count = 0
all_pos_df = def_rat_df.loc[row['key_bbref']].sort_values(by='innings', ascending=False)
for index, pos_row in all_pos_df.iterrows():
all_pos[count] = pos_row.position
count += 1
except KeyError:
logger.info(f'No positions found for {row['use_name']} {row['last_name']}')
all_pos[0] = 'DH'
except TypeError:
logger.info(f'Only one position found for {row['use_name']} {row['last_name']}')
all_pos[0] = def_rat_df.loc[row['key_bbref']].position
return all_pos
dev_count = 0
if bstat_df is not None and bat_rat_df is not None and def_rat_df is not None:
for index, row in bstat_df.iterrows():
if dev_count < 0:
break
p_search = await player_search(row['key_bbref'])
if p_search is not None:
if 'id' in p_search:
player_id = p_search['id']
else:
player_id = p_search['player_id']
# Update positions for existing players too
all_pos = get_player_record_pos(def_rat_df, row)
patch_params = [
('cost', f'{bat_rat_df.loc[row['key_bbref']]["cost"]}'),
('rarity_id', int(bat_rat_df.loc[row['key_bbref']]['rarity_id'])),
('image', f'{CARD_BASE_URL}{player_id}/battingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}')
]
# Add position updates - set all 8 slots to clear any old positions
for slot, pos in enumerate(all_pos, start=1):
patch_params.append((f'pos_{slot}', pos))
new_player = await db_patch('players', object_id=player_id, params=patch_params)
new_player['bbref_id'] = row['key_bbref']
all_players.append(new_player)
player_deltas.append([
new_player['player_id'], new_player['p_name'], p_search['cost'], new_player['cost'], p_search['rarity']['name'], new_player['rarity']['name']
])
else:
mlb_player = await mlb_search_or_post(row['key_retro'])
player_payload = new_player_payload(row, bat_rat_df)
all_pos = get_player_record_pos(def_rat_df, row)
for slot, pos in enumerate(all_pos, start=1):
player_payload[f'pos_{slot}'] = pos
new_player = await db_post('players', payload=player_payload)
if 'id' in new_player:
player_id = new_player['id']
else:
player_id = new_player['player_id']
new_player = await db_patch('players', object_id=player_id, params=[('image', f'{CARD_BASE_URL}{player_id}/battingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}')])
if 'paperdex' in new_player:
del new_player['paperdex']
# all_bbref_ids.append(row['key_bbref'])
# all_player_ids.append(player_id)
new_player['bbref_id'] = row['key_bbref']
all_players.append(new_player)
new_players.append([new_player['player_id'], new_player['p_name'], new_player['cost'], new_player['rarity']['name'], new_player['pos_1']])
dev_count += 1
elif pstat_df is not None and pit_rat_df is not None and def_rat_df is not None:
starter_index = pstat_df.columns.get_loc('starter_rating')
closer_index = pstat_df.columns.get_loc('closer_rating')
for index, row in pstat_df.iterrows():
if dev_count < 0:
break
p_search = await player_search(row['key_bbref'])
if p_search is not None:
if 'id' in p_search:
player_id = p_search['id']
else:
player_id = p_search['player_id']
# Determine pitcher positions based on ratings
patch_params = [
('cost', f'{pit_rat_df.loc[row['key_bbref']]["cost"]}'),
('rarity_id', int(pit_rat_df.loc[row['key_bbref']]['rarity_id'])),
('image', f'{CARD_BASE_URL}{player_id}/pitchingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}')
]
player_index = pstat_df.index[pstat_df['key_bbref'] == row['key_bbref']].tolist()
stat_row = pstat_df.iloc[player_index]
starter_rating = stat_row.iat[0, starter_index]
if starter_rating >= 4:
patch_params.append(('pos_1', 'SP'))
# Clear other position slots
for i in range(2, 9):
patch_params.append((f'pos_{i}', None))
else:
patch_params.append(('pos_1', 'RP'))
closer_rating = stat_row.iat[0, closer_index]
if not pd.isna(closer_rating):
patch_params.append(('pos_2', 'CP'))
# Clear remaining position slots
for i in range(3, 9):
patch_params.append((f'pos_{i}', None))
else:
# Clear remaining position slots
for i in range(2, 9):
patch_params.append((f'pos_{i}', None))
new_player = await db_patch('players', object_id=player_id, params=patch_params)
new_player['bbref_id'] = row['key_bbref']
all_players.append(new_player)
player_deltas.append([
new_player['player_id'], new_player['p_name'], p_search['cost'], new_player['cost'], p_search['rarity']['name'], new_player['rarity']['name']
])
else:
mlb_player = await mlb_search_or_post(row['key_retro'])
player_payload = new_player_payload(row, pit_rat_df)
player_index = pstat_df.index[pstat_df['key_bbref'] == row['key_bbref']].tolist()
stat_row = pstat_df.iloc[player_index]
starter_rating = stat_row.iat[0, starter_index]
if starter_rating >= 4:
player_payload['pos_1'] = 'SP'
else:
player_payload['pos_1'] = 'RP'
closer_rating = stat_row.iat[0, closer_index]
if not pd.isna(closer_rating):
player_payload['pos_2'] = 'CP'
new_player = await db_post('players', payload=player_payload)
if 'id' in new_player:
player_id = new_player['id']
else:
player_id = new_player['player_id']
new_player = await db_patch('players', object_id=player_id, params=[('image', f'{CARD_BASE_URL}{player_id}/pitchingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}')])
if 'paperdex' in new_player:
del new_player['paperdex']
new_player['bbref_id'] = row['key_bbref']
all_players.append(new_player)
new_players.append([new_player['player_id'], new_player['p_name'], new_player['cost'], new_player['rarity']['name'], new_player['pos_1']])
dev_count += 1
else:
raise KeyError('Could not get players - not enough stat DFs were supplied')
pd.DataFrame(player_deltas[1:], columns=player_deltas[0]).to_csv(f'{"batter" if bstat_df is not None else "pitcher"}-deltas.csv')
pd.DataFrame(new_players[1:], columns=new_players[0]).to_csv(f'new-{"batter" if bstat_df is not None else "pitcher"}s.csv')
players_df = pd.DataFrame(all_players).set_index('bbref_id')
return players_df
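# The post_* helpers below bulk-PUT card payloads and then GET them back from the API so
# downstream steps can pick up the server-assigned battingcard_id / pitchingcard_id.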
async def post_batting_cards(cards_df: pd.DataFrame):
all_cards = []
cards_df.apply(lambda x: all_cards.append({
'player_id': int(x["player_id"]),
'steal_low': x['steal_low'],
'steal_high': x['steal_high'],
'steal_auto': x['steal_auto'],
'steal_jump': x['steal_jump'],
'bunting': x['bunt'],
'hit_and_run': x['hit_and_run'],
'running': x['running'],
'hand': x['hand']
}), axis=1)
resp = await db_put('battingcards', payload={'cards': all_cards}, timeout=6)
if resp is not None:
pass
else:
log_exception(ValueError, 'Unable to post batting cards')
bc_query = await db_get('battingcards', params=[('cardset_id', CARDSET_ID)])
if bc_query['count'] > 0:
bc_data = bc_query['cards']
for line in bc_data:
line['player_id'] = line['player']['player_id']
line['key_bbref'] = line['player']['bbref_id']
line['battingcard_id'] = line['id']
return pd.DataFrame(bc_data)
else:
log_exception(ValueError, 'Unable to pull newly posted batting cards')
async def post_pitching_cards(cards_df: pd.DataFrame):
all_cards = []
def get_closer_rating(raw_rating):
try:
if pd.isnull(raw_rating):
return None
else:
return raw_rating
except AttributeError:
return None
cards_df.apply(lambda x: all_cards.append({
'player_id': int(x['player_id']),
'balk': x['balk'],
'wild_pitch': x['wild_pitch'],
'hold': x['hold'],
'starter_rating': x['starter_rating'],
'relief_rating': x['relief_rating'],
'closer_rating': get_closer_rating(x['closer_rating']),
'batting': x['batting'],
'hand': x['pitch_hand'].upper()
}), axis=1)
resp = await db_put('pitchingcards', payload={'cards': all_cards}, timeout=6)
if resp is not None:
pass
else:
log_exception(ValueError, 'Unable to post pitcher cards')
pc_query = await db_get('pitchingcards', params=[('cardset_id', CARDSET_ID)])
if pc_query['count'] > 0:
pc_data = pc_query['cards']
if PLAYER_DESCRIPTION.lower() not in ['live', '1998']:
pc_data = [x for x in pc_query['cards'] if x['player']['mlbplayer']['key_retro'] in PROMO_INCLUSION_RETRO_IDS]
for line in pc_data:
line['player_id'] = line['player']['player_id']
line['key_bbref'] = line['player']['bbref_id']
line['pitchingcard_id'] = line['id']
return pd.DataFrame(pc_data)
else:
log_exception(ValueError, 'Unable to pull newly posted pitcher cards')
async def post_batting_ratings(ratings_df: pd.DataFrame):
all_ratings = []
def append_ratings(row):
vl = row['ratings_vL']
vl['player_id'] = row['player_id']
vl['battingcard_id'] = row['battingcard_id']
vr = row['ratings_vR']
vr['player_id'] = row['player_id']
vr['battingcard_id'] = row['battingcard_id']
all_ratings.append(vl)
all_ratings.append(vr)
ratings_df.apply(append_ratings, axis=1)
resp = await db_put('battingcardratings', payload={'ratings': all_ratings}, timeout=6)
if resp is not None:
return True
else:
log_exception(ValueError, 'Unable to post batting ratings')
async def post_pitching_ratings(ratings_df: pd.DataFrame):
all_ratings = []
def append_ratings(row):
vl = row['ratings_vL']
vl['player_id'] = row['player_id']
vl['pitchingcard_id'] = row['pitchingcard_id']
vr = row['ratings_vR']
vr['player_id'] = row['player_id']
vr['pitchingcard_id'] = row['pitchingcard_id']
all_ratings.append(vl)
all_ratings.append(vr)
ratings_df.apply(append_ratings, axis=1)
resp = await db_put('pitchingcardratings', payload={'ratings': all_ratings}, timeout=6)
if resp is not None:
return True
else:
log_exception(ValueError, 'Unable to post pitching ratings')
async def post_positions(pos_df: pd.DataFrame):
# Delete all existing cardpositions for this cardset to avoid stale data
# (e.g., DH positions from buggy runs where outfielders had no defensive positions)
logger.info(f'Deleting existing cardpositions for cardset {CARDSET_ID}')
existing_positions = await db_get('cardpositions', params=[('cardset_id', CARDSET_ID)])
if existing_positions and existing_positions.get('count', 0) > 0:
for pos in existing_positions['positions']:
try:
await db_delete('cardpositions', object_id=pos['id'], timeout=1)
except Exception as e:
logger.warning(f'Failed to delete cardposition {pos["id"]}: {e}')
logger.info(f'Deleted {existing_positions["count"]} old cardpositions')
all_pos = []
def append_positions(row):
clean_row = row.dropna()
new_val = clean_row.to_dict()
new_val['player_id'] = int(row['player_id'])
all_pos.append(new_val)
pos_df.apply(append_positions, axis=1)
resp = await db_put('cardpositions', payload={'positions': all_pos}, timeout=6)
if resp is not None:
return True
else:
log_exception(ValueError, 'Unable to post positions')
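# post_batter_data / post_pitcher_data chain the steps above: create or update the player
# records, merge the returned player_id and card ids back onto the card, rating and
# defense frames, then push each table in turn (cards, then ratings, then positions).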
async def post_batter_data(bs: pd.DataFrame, bc: pd.DataFrame, br: pd.DataFrame, dr: pd.DataFrame) -> int:
all_players = await get_or_post_players(bstat_df=bs, bat_rat_df=br, def_rat_df=dr)
# Post Batting Cards
bc = pd.merge(
left=bc,
right=all_players,
how='left',
left_on='key_bbref',
right_on='bbref_id'
)
bc = await post_batting_cards(bc)
# Post Batting Ratings
# Only merge the columns we need to avoid corrupting dict columns in br
br = pd.merge(
left=br,
right=bc[['key_bbref', 'player_id', 'battingcard_id']],
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
br = await post_batting_ratings(br)
# Post Positions
dr = pd.merge(
left=dr,
right=all_players,
how='right', # 'left',
left_on='key_bbref',
right_on='bbref_id'
)
await post_positions(dr)
return len(all_players)
async def post_pitcher_data(ps: pd.DataFrame, pc: pd.DataFrame, pr: pd.DataFrame, dr: pd.DataFrame) -> int:
all_players = await get_or_post_players(pstat_df=ps, pit_rat_df=pr, def_rat_df=dr)
ps = pd.merge(
left=all_players,
right=ps,
how='left',
left_on='bbref_id',
right_on='key_bbref'
)
    # Post Pitching Cards (post_pitching_cards works from the merged stats; the incoming pc frame is overwritten by the posted cards)
    pc = await post_pitching_cards(ps)
# Post Pitching Ratings
# Only merge the columns we need to avoid corrupting dict columns in pr
pr = pd.merge(
left=pr,
right=pc[['key_bbref', 'player_id', 'pitchingcard_id']],
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
    await post_pitching_ratings(pr)  # returns a success flag, not a DataFrame
# Post Positions
dr = pd.merge(
left=all_players,
right=dr,
how='left',
left_on='bbref_id',
right_on='key_bbref'
)
await post_positions(dr)
return len(all_players)
async def run_batters(data_input_path: str, start_date: int, end_date: int, post_data: bool = False, season_pct: float = 1.0):
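    """Build batting stats from the Retrosheet events file for the date window, merge in running stats, calculate cards, ratings, and positions, and optionally post everything to the database."""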
    print('Running the batter calcs...')
# batter_start = datetime.datetime.now()
# Get batting stats
batting_stats = get_batting_stats_by_date(f'{RETRO_FILE_PATH}{EVENTS_FILENAME}', start_date=start_date, end_date=end_date)
bs_len = len(batting_stats)
# end_calc = datetime.datetime.now()
# print(f'Combined batting stats: {(end_calc - batter_start).total_seconds():.2f}s\n')
running_start = datetime.datetime.now()
# Get running stats
running_stats = get_run_stat_df(data_input_path)
batting_stats = pd.merge(
left=batting_stats,
right=running_stats,
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
# Handle players who played for multiple teams - keep only combined totals
# Players traded during season have multiple rows: one per team + one combined (2TM, 3TM, etc.)
duplicated_mask = batting_stats['key_bbref'].duplicated(keep=False)
if duplicated_mask.any():
# For duplicates, keep rows where Tm contains 'TM' (combined totals: 2TM, 3TM, etc.)
# For non-duplicates, keep all rows
multi_team_mask = batting_stats['Tm'].str.contains('TM', na=False)
batting_stats = batting_stats[~duplicated_mask | multi_team_mask]
logger.info(f"Removed {duplicated_mask.sum() - multi_team_mask.sum()} team-specific rows for traded batters")
bs_len = len(batting_stats) # Update length after removing duplicates
end_calc = datetime.datetime.now()
print(f'Running stats: {(end_calc - running_start).total_seconds():.2f}s')
    # Guard: the left merge with running_stats must not add rows (bs_len is refreshed above when traded-player duplicates are dropped)
    if len(batting_stats) != bs_len:
        raise DataMismatchError(f'retrosheet_data - run_batters - We started with {bs_len} batting lines and have {len(batting_stats)} after merging with running_stats')
# Calculate batting cards
card_start = datetime.datetime.now()
all_batting_cards = calc_batting_cards(batting_stats, season_pct)
card_end = datetime.datetime.now()
print(f'Create batting cards: {(card_end - card_start).total_seconds():.2f}s')
# Calculate batting ratings
rating_start = datetime.datetime.now()
    batting_stats['battingcard_id'] = batting_stats['key_fangraphs']  # stand-in id; the real battingcard_id is merged in after cards are posted (see post_batter_data)
all_batting_ratings = calc_batter_ratings(batting_stats)
rating_end = datetime.datetime.now()
print(f'Create batting ratings: {(rating_end - rating_start).total_seconds():.2f}s')
# Calculate defense ratings
defense_start = datetime.datetime.now()
all_defense_ratings = calc_positions(batting_stats)
defense_end = datetime.datetime.now()
print(f'Create defense ratings: {(defense_end - defense_start).total_seconds():.2f}s')
# Post all data
if post_data:
        print('Posting player data...')
post_start = datetime.datetime.now()
num_players = await post_batter_data(batting_stats, all_batting_cards, all_batting_ratings, all_defense_ratings)
post_end = datetime.datetime.now()
        print(f'Post player data: {(post_end - post_start).total_seconds():.2f}s')
post_msg = f'Posted {num_players} players to the database'
logger.info(post_msg)
print(post_msg)
else:
post_msg = f'{batting_stats.index.size} total batters\n\nPlayers are NOT being posted to the database'
logger.warning(post_msg)
print(post_msg)
return batting_stats
async def run_pitchers(data_input_path: str, start_date: int, end_date: int, post_data: bool = False, season_pct: float = 1.0):
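    """Build pitching stats from the Retrosheet events file for the date window, merge peripheral and defensive data, calculate cards and ratings, and optionally post everything to the database."""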
# Get pitching stats
pitching_stats = get_pitching_stats_by_date(f'{RETRO_FILE_PATH}{EVENTS_FILENAME}', start_date=start_date, end_date=end_date)
# Get peripheral stats
start_time = datetime.datetime.now()
periph_stats = get_periph_stat_df(data_input_path)
pitching_stats = pd.merge(
left=pitching_stats,
right=periph_stats,
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
# Handle players who played for multiple teams - keep only combined totals
# Players traded during season have multiple rows: one per team + one combined (2TM, 3TM, etc.)
duplicated_mask = pitching_stats['key_bbref'].duplicated(keep=False)
if duplicated_mask.any():
# For duplicates, keep rows where Tm contains 'TM' (combined totals: 2TM, 3TM, etc.)
# For non-duplicates, keep all rows
multi_team_mask = pitching_stats['Tm'].str.contains('TM', na=False)
pitching_stats = pitching_stats[~duplicated_mask | multi_team_mask]
logger.info(f"Removed {duplicated_mask.sum() - multi_team_mask.sum()} team-specific rows for traded players")
end_time = datetime.datetime.now()
print(f'Peripheral stats: {(end_time - start_time).total_seconds():.2f}s')
# Calculate defense ratings
start_time = datetime.datetime.now()
df_p = pd.read_csv(f'{DATA_INPUT_FILE_PATH}defense_p.csv').set_index('key_bbref')
# Drop 'Tm' from defense data to avoid column name conflicts (we already have it from periph_stats)
if 'Tm' in df_p.columns:
df_p = df_p.drop(columns=['Tm'])
pitching_stats = pd.merge(
left=pitching_stats,
right=df_p,
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
pitching_stats = pitching_stats.fillna(0)
all_defense_ratings = calc_pitcher_defense(pitching_stats)
end_time = datetime.datetime.now()
print(f'Defense stats: {(end_time - start_time).total_seconds():.2f}s')
# Calculate pitching cards
start_time = datetime.datetime.now()
all_pitching_cards = calc_pitching_cards(pitching_stats, season_pct)
pitching_stats = pd.merge(
left=pitching_stats,
right=all_pitching_cards,
how='left',
left_on='key_bbref',
right_on='key_bbref'
)
end_time = datetime.datetime.now()
print(f'Pit cards: {(end_time - start_time).total_seconds():.2f}s')
# Calculate pitching card ratings
start_time = datetime.datetime.now()
all_pitching_ratings = calc_pitcher_ratings(pitching_stats)
end_time = datetime.datetime.now()
print(f'Pit ratings: {(end_time - start_time).total_seconds():.2f}s')
# Post all data
if post_data:
        print('\nPosting player data...')
post_start = datetime.datetime.now()
num_players = await post_pitcher_data(pitching_stats, all_pitching_cards, all_pitching_ratings, all_defense_ratings)
post_end = datetime.datetime.now()
        print(f'Post player data: {(post_end - post_start).total_seconds():.2f}s')
post_msg = f'\nPosted {num_players} pitchers to the database'
logger.info(post_msg)
print(post_msg)
else:
post_msg = f'{pitching_stats.index.size} total pitchers\n\nPlayers are NOT being posted to the database'
logger.warning(post_msg)
print(post_msg)
return pitching_stats
async def main(args):
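    """Validate the Live/promo configuration, then run the batter and pitcher pipelines and dump the raw stats to CSV."""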
if len(PROMO_INCLUSION_RETRO_IDS) > 0 and PLAYER_DESCRIPTION == 'Live':
msg = f'Player description is set to *Live*, but there are {len(PROMO_INCLUSION_RETRO_IDS)} IDs in the promo inclusion list. Clear the promo list or change the player description.'
log_exception(ValueError, msg=msg, level='error')
if weeks_between(START_DATE, END_DATE) > 5 and len(PROMO_INCLUSION_RETRO_IDS) > 0:
        msg = 'More than 5 weeks are included for a promo cardset. Please adjust START_DATE and/or END_DATE.'
log_exception(ValueError, msg=msg, level='error')
batter_start = datetime.datetime.now()
batting_stats = await run_batters(f'{DATA_INPUT_FILE_PATH}', start_date=START_DATE, end_date=END_DATE, post_data=POST_DATA, season_pct=SEASON_PCT)
    batting_stats.to_csv('batting_stats.csv')
batter_end = datetime.datetime.now()
print(f'\nBatter time: {(batter_end - batter_start).total_seconds():.2f}s\n')
pitcher_start = datetime.datetime.now()
pitching_stats = await run_pitchers(f'{DATA_INPUT_FILE_PATH}', start_date=START_DATE, end_date=END_DATE, post_data=POST_DATA, season_pct=SEASON_PCT)
    pitching_stats.to_csv('pitching_stats.csv')
pitcher_end = datetime.datetime.now()
print(f'\nPitcher time: {(pitcher_end - pitcher_start).total_seconds():.2f}s')
print(f'Total: {(pitcher_end - batter_start).total_seconds():.2f}s\n\nDone!')
# await store_defense_to_csv(1998)
if __name__ == '__main__':
asyncio.run(main(sys.argv[1:]))