"""Build 2005 player-card data from Retrosheet play-by-play event files.

Aggregates batting/pitching splits vs. LHP/RHP from Retrosheet events,
joins bbref/fangraphs ids via pybaseball, and prepares card attributes
for upload through the DB API.
"""

import asyncio
import datetime
import logging
import sys
from typing import Literal

import pandas as pd
import pybaseball as pb
from pybaseball import cache
import urllib

from creation_helpers import CLUB_LIST, FRANCHISE_LIST, sanitize_name
from batters.stat_prep import DataMismatchError
from db_calls import DB_URL, db_get, db_patch, db_post, db_put, db_delete
from exceptions import log_exception, logger
from retrosheet_transformer import load_retrosheet_csv
import batters.calcs_batter as cba
import defenders.calcs_defense as cde
import pitchers.calcs_pitcher as cpi

# Cache pybaseball lookups on disk so repeated runs avoid network calls.
cache.enable()

# Previous file-based logging configuration, kept for reference:
# date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
# log_level = logger.INFO
# logger.basicConfig(
#     filename=f'logs/{date}.log',
#     format='%(asctime)s - retrosheet_data - %(levelname)s - %(message)s',
#     level=log_level
# )

# Input/output locations.
RETRO_FILE_PATH = "data-input/retrosheet/"
EVENTS_FILENAME = (
    "retrosheets_events_2005.csv"  # Now using transformer for new format compatibility
)
PERSONNEL_FILENAME = "retrosheets_personnel.csv"
DATA_INPUT_FILE_PATH = "data-input/2005 Live Cardset/"
CARD_BASE_URL = f"{DB_URL}/v2/players/"

start_time = datetime.datetime.now()
# Release directory is stamped with today's date (no zero padding).
RELEASE_DIRECTORY = f"{start_time.year}-{start_time.month}-{start_time.day}"

# Which card set this run produces; drives the PA/TBF minimums and set id below.
PLAYER_DESCRIPTION = "Live"  # Live for Live Series
# PLAYER_DESCRIPTION = 'May PotM'  # PotM for promos

# For promo runs only: the retro ids of the players to include.
PROMO_INCLUSION_RETRO_IDS = [
    # AL
    # 'rodra001',  # Alex Rodriguez (IF)
    # 'menck001',  # Kevin Mench (OF)
    # 'colob001',  # Bartolo Colon (SP)
    # 'ryanb001',  # BJ Ryan (RP)
    # NL
    # 'delgc001',  # Carlos Delgado (IF)
    # 'abreb001',  # Bobby Abreu (OF)
    # 'haraa001',  # Aaron Harang (SP)
    # 'hofft001',  # Trevor Hoffman (RP)
]

# Qualification thresholds: Live runs require a minimum sample per split;
# promo runs accept everyone on the inclusion list.
MIN_PA_VL = 20 if "live" in PLAYER_DESCRIPTION.lower() else 1  # 1 for PotM
MIN_PA_VR = 40 if "live" in PLAYER_DESCRIPTION.lower() else 1  # 1 for PotM
MIN_TBF_VL = MIN_PA_VL
MIN_TBF_VR = MIN_PA_VR
CARDSET_ID = (
    27 if "live" in PLAYER_DESCRIPTION.lower() else 28
)  # 27: 2005 Live, 28: 2005 Promos

# Per-Update
# Per-update parameters.
SEASON_PCT = 81 / 162  # Through end of July (~half season)
START_DATE = 20050403  # YYYYMMDD format - 2005 Opening Day
# END_DATE = 20050531  # YYYYMMDD format - May PotM
END_DATE = 20050731  # End of July 2005
POST_DATA = True
# Recency weighting ratios (0.0 disables the duplication passes downstream).
LAST_WEEK_RATIO = 0.0
LAST_TWOWEEKS_RATIO = 0.0
LAST_MONTH_RATIO = 0.0


def date_from_int(integer_date: int) -> datetime.datetime:
    """Convert a YYYYMMDD integer (e.g. 20050403) into a datetime."""
    digits = str(integer_date)
    return datetime.datetime(int(digits[:4]), int(digits[4:6]), int(digits[-2:]))


def date_math(
    start_date: int,
    operator: Literal["+", "-"],
    day_delta: int = 0,
    month_delta: int = 0,
    year_delta: int = 0,
) -> int:
    """Shift a YYYYMMDD integer date by the given deltas and return YYYYMMDD.

    Deltas must be non-negative; direction is controlled by `operator`.
    Validation failures are routed through log_exception.

    Bug fix: previously returned a zero-padded *string* despite the `-> int`
    annotation; it now returns the YYYYMMDD integer. Callers that wrapped the
    result in int() are unaffected.
    """
    if len(str(start_date)) != 8:
        log_exception(ValueError, "Start date must be 8 digits long")
    if True in [day_delta < 0, month_delta < 0, year_delta < 0]:
        log_exception(
            ValueError,
            "Time deltas must greater than or equal to 0; use `-` operator to go back in time",
        )
    if day_delta > 28:
        log_exception(ValueError, "Use month_delta for days > 28")
    if month_delta > 12:
        log_exception(ValueError, "Use year_delta for months > 12")
    s_date = date_from_int(start_date)
    if year_delta > 0:
        s_date = datetime.datetime(
            s_date.year + year_delta if operator == "+" else s_date.year - year_delta,
            s_date.month,
            s_date.day,
        )
    if month_delta > 0:
        # Index 0 and index 12 of this table both map to December so that the
        # `new_index % 12` arithmetic wraps correctly in either direction.
        month_range = [12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
        new_index = (
            s_date.month + month_delta
            if operator == "+"
            else s_date.month - month_delta
        )
        new_month = month_range[(new_index % 12)]
        new_year = s_date.year
        if new_index > 12:
            new_year += 1
        elif new_index < 1:
            new_year -= 1
        s_date = datetime.datetime(new_year, new_month, s_date.day)
    fd = s_date + datetime.timedelta(
        days=day_delta if operator == "+" else day_delta * -1
    )
    return int(f"{str(fd.year).zfill(4)}{str(fd.month).zfill(2)}{str(fd.day).zfill(2)}")


def weeks_between(start_date_int: int, end_date_int: int) -> int:
    """Whole weeks (rounded) between two YYYYMMDD integer dates."""
    start_date = date_from_int(start_date_int)
    end_date = date_from_int(end_date_int)
    delta = end_date - start_date
    return abs(round(delta.days / 7))
async def store_defense_to_csv(season: int):
    """Fetch bbref fielding tables for every position and cache them as CSVs."""
    positions = ["c", "1b", "2b", "3b", "ss", "lf", "cf", "rf", "of", "p"]
    for pos in positions:
        fielding = cde.get_bbref_fielding_df(pos, season)
        fielding.to_csv(f"{DATA_INPUT_FILE_PATH}defense_{pos}.csv")
        # Pause between pulls to be polite to the upstream site.
        await asyncio.sleep(8)


def get_batting_result_series(
    plays: pd.DataFrame, event_type: str, pitcher_hand: Literal["r", "l"], col_name: str
) -> pd.Series:
    """Per-batter count of one event type against one pitcher hand."""
    mask = (plays.event_type == event_type) & (plays.pitcher_hand == pitcher_hand)
    counts = plays[mask].groupby("batter_id")["event_type"].count()
    return counts.astype(int).rename(col_name)


def get_pitching_result_series(
    plays: pd.DataFrame, event_type: str, batter_hand: Literal["r", "l"], col_name: str
) -> pd.Series:
    """Per-pitcher count of one event type allowed against one batter hand."""
    mask = (plays.event_type == event_type) & (plays.batter_hand == batter_hand)
    counts = plays[mask].groupby("pitcher_id")["event_type"].count()
    return counts.astype(int).rename(col_name)


def get_run_stat_df(input_path: str):
    """Load the bbref baserunning CSV, normalize the id column, index by bbref key."""
    run_data = pd.read_csv(f"{input_path}running.csv")
    # bbref exports label the id column inconsistently; normalize to key_bbref.
    for id_col in ("Player-additional", "Name-additional"):
        if id_col in run_data:
            run_data = run_data.rename(columns={id_col: "key_bbref"})
    keep_cols = [
        "key_bbref", "Tm", "ROE", "XI", "RS%", "SBO", "SB", "CS", "SB%",
        "SB2", "CS2", "SB3", "CS3", "SBH", "CSH", "PO", "PCS", "OOB",
        "OOB1", "OOB2", "OOB3", "OOBHm", "BT", "XBT%", "1stS", "1stS2",
        "1stS3", "1stD", "1stD3", "1stDH", "2ndS", "2ndS3", "2ndSH",
    ]
    run_data = run_data[keep_cols].fillna(0)
    return run_data.set_index("key_bbref")
def get_periph_stat_df(input_path: str):
    """Load the bbref pitching CSV and keep the peripheral columns used for cards."""
    pit_data = pd.read_csv(f"{input_path}pitching.csv")
    # Normalize the inconsistently named id/team columns.
    for id_col in ("Player-additional", "Name-additional"):
        if id_col in pit_data:
            pit_data = pit_data.rename(columns={id_col: "key_bbref"})
    if "Team" in pit_data:
        pit_data = pit_data.rename(columns={"Team": "Tm"})
    pit_data = pit_data[["key_bbref", "Tm", "GF", "SHO", "SV", "IP", "BK", "WP"]]
    return pit_data.fillna(0)


def get_player_ids(
    plays: pd.DataFrame, which: Literal["batters", "pitchers"]
) -> pd.DataFrame:
    """Build a roster frame for the given plays: names, cross-site ids, handedness.

    Indexed by the retro id column (batter_id or pitcher_id). For promo runs
    (PLAYER_DESCRIPTION not Live/1998) the roster is filtered down to
    PROMO_INCLUSION_RETRO_IDS first.
    """
    personnel = pd.read_csv(f"{RETRO_FILE_PATH}{PERSONNEL_FILENAME}")
    id_key = "batter_id" if which == "batters" else "pitcher_id"
    seen_ids = pd.Series(plays[id_key].unique()).to_frame("id")
    players = pd.merge(
        left=personnel,
        right=seen_ids,
        how="right",
        left_on="id",
        right_on="id",
    ).rename(columns={"id": id_key})

    if PLAYER_DESCRIPTION not in ["Live", "1998"]:
        msg = f"Player description is *{PLAYER_DESCRIPTION}* so dropping players not in PROMO_INCLUSION_RETRO_IDS"
        print(msg)
        logger.info(msg)
        players = players[players[id_key].isin(PROMO_INCLUSION_RETRO_IDS)]

    def lookup_ids(row):
        # Cross-site id lookup from the retro id (network call; pybaseball caches).
        pull = pb.playerid_reverse_lookup([row[id_key]], key_type="retro")
        if len(pull.values) == 0:
            print(f"Could not find id {row[id_key]} in pybaseball lookup")
        return pull.loc[0][["key_mlbam", "key_retro", "key_bbref", "key_fangraphs"]]

    players = players[[id_key, "last_name", "use_name"]]
    lookup_started = datetime.datetime.now()
    other_ids = players.apply(lookup_ids, axis=1)
    lookup_finished = datetime.datetime.now()
    print(f"ID lookup: {(lookup_finished - lookup_started).total_seconds():.2f}s")

    # Strip accents/odd characters so names match downstream sources.
    players["use_name"] = players.apply(lambda r: sanitize_name(r["use_name"]), axis=1)
    players["last_name"] = players.apply(lambda r: sanitize_name(r["last_name"]), axis=1)
    players = pd.merge(
        left=players, right=other_ids, left_on=id_key, right_on="key_retro"
    )
    players = players.set_index(id_key)

    def bat_hand(row):
        # PA counts by the side the batter actually hit from, split by pitcher hand.
        by_side_vl = (
            plays[(plays.batter_id == row["key_retro"]) & (plays.pitcher_hand == "l")]
            .groupby("result_batter_hand")["game_id"]
            .count()
            .astype(int)
        )
        by_side_vr = (
            plays[(plays.batter_id == row["key_retro"]) & (plays.pitcher_hand == "r")]
            .groupby("result_batter_hand")["game_id"]
            .count()
            .astype(int)
        )
        as_lefty = by_side_vl.get("l", 0) + by_side_vr.get("l", 0)
        as_righty = by_side_vl.get("r", 0) + by_side_vr.get("r", 0)
        # Only ever batted from one side (zero PAs from the other) -> single-handed.
        if as_lefty == 0 and as_righty > 0:
            return "R"
        if as_lefty > 0 and as_righty == 0:
            return "L"
        # Batted from both sides (even a limited sample) -> switch hitter.
        if as_lefty > 0 and as_righty > 0:
            return "S"
        # Fallback for edge cases (shouldn't be reached in normal flow).
        return "L" if as_lefty > as_righty else "R"

    def pitch_hand(row):
        # One row per pitcher survives drop_duplicates; read the hand off it.
        first_event = plays.drop_duplicates("pitcher_id").loc[
            plays.pitcher_id == row["key_retro"], "pitcher_hand"
        ]
        return first_event.item()

    if which == "batters":
        players["bat_hand"] = players.apply(bat_hand, axis=1)
    elif which == "pitchers":
        players["pitch_hand"] = players.apply(pitch_hand, axis=1)
    return players
def get_base_batting_df(
    file_path: str, start_date: int, end_date: int
) -> list[pd.DataFrame, pd.DataFrame]:
    """Load events and build the base per-batter frame with PA/AB splits.

    Returns [date_plays, core_df]: the (possibly recency-weighted) plays in
    the date window, and the roster frame filtered to the PA minimums with
    PA_vL/PA_vR/AB_vL/AB_vR attached.
    """
    all_plays = load_retrosheet_csv(file_path)
    # Game ids embed the date (e.g. ANA200504030): strip club code + game number.
    all_plays["date"] = all_plays["game_id"].str[3:-1].astype(int)
    date_plays = all_plays[
        (all_plays.date >= start_date) & (all_plays.date <= end_date)
    ]
    # NOTE(review): roster is built from the FULL season's plays, not just the
    # date window — presumably intentional; confirm.
    all_player_ids = get_player_ids(all_plays, "batters")

    def split_counts(plays: pd.DataFrame) -> list[pd.Series]:
        # PA/AB counts per batter, split by opposing pitcher hand.
        specs = [
            ("batter_event", "l", "PA_vL"),
            ("batter_event", "r", "PA_vR"),
            ("ab", "l", "AB_vL"),
            ("ab", "r", "AB_vR"),
        ]
        series = []
        for flag_col, hand, label in specs:
            subset = plays[(plays[flag_col] == "t") & (plays.pitcher_hand == hand)]
            series.append(
                subset.groupby("batter_id")["event_type"]
                .count()
                .astype(int)
                .rename(label)
            )
        return series

    bs = pd.concat([all_player_ids, *split_counts(date_plays)], axis=1)
    core_df = bs.dropna().query(f"PA_vL >= {MIN_PA_VL} & PA_vR >= {MIN_PA_VR}")

    if (
        LAST_WEEK_RATIO == 0.0
        and LAST_TWOWEEKS_RATIO == 0.0
        and LAST_MONTH_RATIO == 0.0
    ):
        return [date_plays, core_df]

    # Recency weighting: duplicate recent plays so late-season form counts more.
    # Each window is re-read from the already-weighted play set, matching the
    # original sequential behavior.
    base_num_weeks = weeks_between(start_date, end_date)
    for ratio, delta_kwargs in [
        (LAST_WEEK_RATIO, {"day_delta": 7}),
        (LAST_TWOWEEKS_RATIO, {"day_delta": 14}),
        (LAST_MONTH_RATIO, {"month_delta": 1}),
    ]:
        if ratio > 0:
            new_start = date_math(end_date, "-", **delta_kwargs)
            recent = date_plays[
                (date_plays.date >= int(new_start)) & (date_plays.date <= end_date)
            ]
            for _ in range(round(base_num_weeks * ratio)):
                date_plays = pd.concat([date_plays, recent], ignore_index=True)

    # Rebuild the counting columns against the reweighted play set.
    core_df = core_df.drop(columns=["PA_vL", "PA_vR", "AB_vL", "AB_vR"])
    for series in split_counts(date_plays):
        core_df[series.name] = series
    return [date_plays, core_df]
def get_base_pitching_df(
    file_path: str, start_date: int, end_date: int
) -> list[pd.DataFrame, pd.DataFrame]:
    """Load events and build the base per-pitcher frame with TBF/AB splits.

    Returns [date_plays, core_df]: the (possibly recency-weighted) plays in
    the date window, and the roster frame with TBF_vL/TBF_vR/AB_vL/AB_vR
    attached (filtered to the TBF minimums for Live/1998 runs only).
    """
    all_plays = load_retrosheet_csv(file_path)
    # Game ids embed the date (e.g. ANA200504030): strip club code + game number.
    all_plays["date"] = all_plays["game_id"].str[3:-1].astype(int)
    date_plays = all_plays[
        (all_plays.date >= start_date) & (all_plays.date <= end_date)
    ]
    ps = get_player_ids(all_plays, "pitchers")

    def split_counts(plays: pd.DataFrame) -> list[pd.Series]:
        # TBF/AB counts per pitcher, split by opposing batter hand.
        specs = [
            ("batter_event", "l", "TBF_vL"),
            ("batter_event", "r", "TBF_vR"),
            ("ab", "l", "AB_vL"),
            ("ab", "r", "AB_vR"),
        ]
        series = []
        for flag_col, hand, label in specs:
            subset = plays[(plays[flag_col] == "t") & (plays.batter_hand == hand)]
            series.append(
                subset.groupby("pitcher_id")["event_type"]
                .count()
                .astype(int)
                .rename(label)
            )
        return series

    ps = pd.concat([ps, *split_counts(date_plays)], axis=1)

    if PLAYER_DESCRIPTION in ["Live", "1998"]:
        core_df = ps.dropna().query(f"TBF_vL >= {MIN_TBF_VL} & TBF_vR >= {MIN_TBF_VR}")
    else:
        # Promo runs keep every listed player regardless of sample size.
        core_df = ps.dropna()

    if (
        LAST_WEEK_RATIO == 0.0
        and LAST_TWOWEEKS_RATIO == 0.0
        and LAST_MONTH_RATIO == 0.0
    ):
        return [date_plays, core_df]

    # Recency weighting: duplicate recent plays so late-season form counts more.
    base_num_weeks = weeks_between(start_date, end_date)
    for ratio, delta_kwargs in [
        (LAST_WEEK_RATIO, {"day_delta": 7}),
        (LAST_TWOWEEKS_RATIO, {"day_delta": 14}),
        (LAST_MONTH_RATIO, {"month_delta": 1}),
    ]:
        if ratio > 0:
            new_start = date_math(end_date, "-", **delta_kwargs)
            recent = date_plays[
                (date_plays.date >= int(new_start)) & (date_plays.date <= end_date)
            ]
            for _ in range(round(base_num_weeks * ratio)):
                date_plays = pd.concat([date_plays, recent], ignore_index=True)

    # Rebuild the counting columns against the reweighted play set.
    core_df = core_df.drop(columns=["TBF_vL", "TBF_vR", "AB_vL", "AB_vR"])
    for series in split_counts(date_plays):
        core_df[series.name] = series
    return [date_plays, core_df]


def get_med_vL(row):
    """Medium-contact share vs LHP: bounded by the hard% cap and a 0.1 floor."""
    cap = 0.9 - row["Hard%_vL"]
    iso_estimate = (row["SLG_vL"] - row["AVG_vL"]) * 1.5
    return round(max(min(cap, iso_estimate), 0.1), 5)


def get_med_vR(row):
    """Medium-contact share vs RHP: bounded by the hard% cap and a 0.1 floor."""
    cap = 0.9 - row["Hard%_vR"]
    iso_estimate = (row["SLG_vR"] - row["AVG_vR"]) * 1.5
    return round(max(min(cap, iso_estimate), 0.1), 5)
def get_batting_stats_by_date(
    retro_file_path, start_date: int, end_date: int
) -> pd.DataFrame:
    """Aggregate per-batter vL/vR counting and rate stats from Retrosheet plays.

    Builds the base PA/AB frame, counts event outcomes and batted-ball types
    per pitcher hand, then derives AVG/OBP/SLG, contact-quality shares and
    spray-direction shares.

    Bug fix: Cent%_vR previously used the vs-LHP center count (center_vL) in
    its numerator; it now correctly uses center_vR.
    """
    start = datetime.datetime.now()
    all_plays, batting_stats = get_base_batting_df(
        retro_file_path, start_date, end_date
    )
    print(
        f"Get base dataframe: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )

    start = datetime.datetime.now()
    all_player_ids = batting_stats["key_retro"]
    logging.info(f"all_player_ids: {all_player_ids}")
    # Only plays by qualified batters matter from here on.
    all_plays = all_plays[all_plays["batter_id"].isin(all_player_ids)]
    print(f"Shrink all_plays: {(datetime.datetime.now() - start).total_seconds():.2f}s")

    # Basic counting stats
    start = datetime.datetime.now()
    for event_type, vs_hand, col_name in [
        ("home run", "r", "HR_vR"),
        ("home run", "l", "HR_vL"),
        ("single", "r", "1B_vR"),
        ("single", "l", "1B_vL"),
        ("double", "r", "2B_vR"),
        ("double", "l", "2B_vL"),
        ("triple", "r", "3B_vR"),
        ("triple", "l", "3B_vL"),
        ("walk", "r", "BB_vR"),
        ("walk", "l", "BB_vL"),
        ("strikeout", "r", "SO_vR"),
        ("strikeout", "l", "SO_vL"),
        ("hit by pitch", "r", "HBP_vR"),
        ("hit by pitch", "l", "HBP_vL"),
    ]:
        batting_stats[col_name] = get_batting_result_series(
            all_plays, event_type, vs_hand, col_name
        )
    print(
        f"Count basic stats: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )

    # Bespoke counting stats
    start = datetime.datetime.now()

    def _count_plays(mask):
        # Scalar count of plays matching the boolean mask.
        return all_plays[mask].count()["event_type"].astype(int)

    def _batted_ball(row, bb_type, hand):
        # Batted-ball-type count for one batter vs one pitcher hand.
        # Type codes as found in the data: 'f' fly, 'G' ground, 'l' line drive.
        return _count_plays(
            (all_plays.batter_id == row["key_retro"])
            & (all_plays.batted_ball_type == bb_type)
            & (all_plays.pitcher_hand == hand)
        )

    def _gdp(row, hand):
        # Double plays plus triple plays hit into, vs one pitcher hand.
        base = (
            (all_plays.batter_id == row["key_retro"])
            & (all_plays.batter_event == "t")
            & (all_plays.pitcher_hand == hand)
        )
        return _count_plays(base & (all_plays.dp == "t")) + _count_plays(
            base & (all_plays.tp == "t")
        )

    batting_stats["FB_vL"] = batting_stats.apply(lambda r: _batted_ball(r, "f", "l"), axis=1)
    batting_stats["FB_vR"] = batting_stats.apply(lambda r: _batted_ball(r, "f", "r"), axis=1)
    batting_stats["GB_vL"] = batting_stats.apply(lambda r: _batted_ball(r, "G", "l"), axis=1)
    batting_stats["GB_vR"] = batting_stats.apply(lambda r: _batted_ball(r, "G", "r"), axis=1)
    batting_stats["LD_vL"] = batting_stats.apply(lambda r: _batted_ball(r, "l", "l"), axis=1)
    batting_stats["LD_vR"] = batting_stats.apply(lambda r: _batted_ball(r, "l", "r"), axis=1)
    batting_stats["GDP_vL"] = batting_stats.apply(lambda r: _gdp(r, "l"), axis=1)
    batting_stats["GDP_vR"] = batting_stats.apply(lambda r: _gdp(r, "r"), axis=1)
    batting_stats["Bunts"] = batting_stats.apply(
        lambda r: _count_plays(
            (all_plays.batter_id == r["key_retro"]) & (all_plays.bunt == "t")
        ),
        axis=1,
    )
    print(
        f"Custom counting stats: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )

    # Infield hits: singles/doubles/triples fielded at positions 1-6,
    # excluding 'D' (deep) locations.
    for hand, col in [("l", "ifh_vL"), ("r", "ifh_vR")]:
        batting_stats[col] = (
            all_plays[
                (all_plays.hit_val.str.contains("1|2|3"))
                & (all_plays.pitcher_hand == hand)
                & (all_plays.hit_location.str.contains("1|2|3|4|5|6"))
                & (~all_plays.hit_location.str.contains("D", na=False))
            ]
            .groupby("batter_id")["event_type"]
            .count()
            .astype(int)
            .rename(col)
        )

    def _pulled(row, hand):
        # The pull side depends on which side the batter hits from; the two
        # defaults differ so switch hitters are assumed to bat opposite the
        # pitcher's hand.
        if hand == "l":
            pull_loc = "5|7" if row["bat_hand"] != "L" else "3|9"
        else:
            pull_loc = "5|7" if row["bat_hand"] == "R" else "3|9"
        return _count_plays(
            (all_plays.batter_id == row["key_retro"])
            & (all_plays.pitcher_hand == hand)
            & (all_plays.hit_location.str.contains(pull_loc))
        )

    # Bespoke Queries
    batting_stats["pull_vL"] = batting_stats.apply(lambda r: _pulled(r, "l"), axis=1)
    batting_stats["pull_vR"] = batting_stats.apply(lambda r: _pulled(r, "r"), axis=1)

    # Center (up the middle) and opposite-field ball counts per pitcher hand.
    for hand, cent_col, oppo_col in [
        ("l", "center_vL", "oppo_vL"),
        ("r", "center_vR", "oppo_vR"),
    ]:
        batting_stats[cent_col] = (
            all_plays[
                (all_plays.pitcher_hand == hand)
                & (all_plays.hit_location.str.contains("1|4|6|8"))
            ]
            .groupby("batter_id")["event_type"]
            .count()
            .astype(int)
            .rename(cent_col)
        )
        batting_stats[oppo_col] = (
            all_plays[
                (all_plays.pitcher_hand == hand)
                & (all_plays.hit_location.str.contains("5|7"))
            ]
            .groupby("batter_id")["event_type"]
            .count()
            .astype(int)
            .rename(oppo_col)
        )

    # fill na to 0 following counting stats
    batting_stats = batting_stats.fillna(0)

    # Calculated Fields
    start = datetime.datetime.now()
    for v in ("vL", "vR"):
        batting_stats[f"H_{v}"] = (
            batting_stats[f"1B_{v}"]
            + batting_stats[f"2B_{v}"]
            + batting_stats[f"3B_{v}"]
            + batting_stats[f"HR_{v}"]
        )
        batting_stats[f"AVG_{v}"] = round(
            batting_stats[f"H_{v}"] / batting_stats[f"AB_{v}"], 5
        )
        batting_stats[f"OBP_{v}"] = round(
            (batting_stats[f"H_{v}"] + batting_stats[f"BB_{v}"] + batting_stats[f"HBP_{v}"])
            / batting_stats[f"PA_{v}"],
            5,
        )
        batting_stats[f"SLG_{v}"] = round(
            (
                batting_stats[f"1B_{v}"]
                + batting_stats[f"2B_{v}"] * 2
                + batting_stats[f"3B_{v}"] * 3
                + batting_stats[f"HR_{v}"] * 4
            )
            / batting_stats[f"AB_{v}"],
            5,
        )
        batting_stats[f"HR/FB_{v}"] = round(
            batting_stats[f"HR_{v}"] / batting_stats[f"FB_{v}"], 5
        )
        balls_in_play = (
            batting_stats[f"FB_{v}"] + batting_stats[f"GB_{v}"] + batting_stats[f"LD_{v}"]
        )
        batting_stats[f"FB%_{v}"] = round(batting_stats[f"FB_{v}"] / balls_in_play, 5)
        batting_stats[f"GB%_{v}"] = round(batting_stats[f"GB_{v}"] / balls_in_play, 5)
        batting_stats[f"LD%_{v}"] = round(batting_stats[f"LD_{v}"] / balls_in_play, 5)
        # Hard% proxied from isolated power plus a 0.2 base.
        batting_stats[f"Hard%_{v}"] = round(
            0.2 + batting_stats[f"SLG_{v}"] - batting_stats[f"AVG_{v}"], 5
        )

    batting_stats["Med%_vL"] = batting_stats.apply(get_med_vL, axis=1)
    batting_stats["Med%_vR"] = batting_stats.apply(get_med_vR, axis=1)

    for v in ("vL", "vR"):
        batting_stats[f"Soft%_{v}"] = round(
            1 - batting_stats[f"Hard%_{v}"] - batting_stats[f"Med%_{v}"], 5
        )
        batting_stats[f"IFH%_{v}"] = round(
            batting_stats[f"ifh_{v}"] / batting_stats[f"H_{v}"], 5
        )
        spray_total = (
            batting_stats[f"pull_{v}"]
            + batting_stats[f"center_{v}"]
            + batting_stats[f"oppo_{v}"]
        )
        batting_stats[f"Pull%_{v}"] = round(
            batting_stats[f"pull_{v}"] / spray_total, 5
        ).clip(0.1, 0.6)
        # FIX: was batting_stats["center_vL"] for both hands.
        batting_stats[f"Cent%_{v}"] = round(
            batting_stats[f"center_{v}"] / spray_total, 5
        ).clip(0.1, 0.6)
        batting_stats[f"Oppo%_{v}"] = round(
            1 - batting_stats[f"Pull%_{v}"] - batting_stats[f"Cent%_{v}"], 5
        )

    batting_stats = batting_stats.fillna(0)
    print(
        f"Calculated fields: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )
    return batting_stats
def get_pitching_stats_by_date(
    retro_file_path, start_date: int, end_date: int
) -> pd.DataFrame:
    """Aggregate per-pitcher vL/vR counting and rate stats from Retrosheet plays.

    Builds the base TBF/AB frame, counts allowed outcomes and batted-ball
    types per batter hand, then derives AVG/OBP/SLG, contact-quality shares
    and opposite-field rates.

    Bug fix: LD_vL/LD_vR previously split line drives by *pitcher* hand —
    which is constant per pitcher, so one of the two columns was always
    zero. They now split by batter hand, matching every other vL/vR column.
    """
    start = datetime.datetime.now()
    all_plays, pitching_stats = get_base_pitching_df(
        retro_file_path, start_date, end_date
    )
    print(
        f"Get base dataframe: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )

    start = datetime.datetime.now()
    all_player_ids = pitching_stats["key_retro"]
    # Only plays by qualified pitchers matter from here on.
    all_plays = all_plays[all_plays["pitcher_id"].isin(all_player_ids)]
    print(f"Shrink all_plays: {(datetime.datetime.now() - start).total_seconds():.2f}s")

    # Basic counting stats
    start = datetime.datetime.now()
    for event_type, vs_hand, col_name in [
        ("home run", "r", "HR_vR"),
        ("home run", "l", "HR_vL"),
        ("single", "r", "1B_vR"),
        ("single", "l", "1B_vL"),
        ("double", "r", "2B_vR"),
        ("double", "l", "2B_vL"),
        ("triple", "r", "3B_vR"),
        ("triple", "l", "3B_vL"),
        ("walk", "r", "BB_vR"),
        ("walk", "l", "BB_vL"),
        ("strikeout", "r", "SO_vR"),
        ("strikeout", "l", "SO_vL"),
        ("hit by pitch", "r", "HBP_vR"),
        ("hit by pitch", "l", "HBP_vL"),
        ("intentional walk", "l", "IBB_vL"),
        ("intentional walk", "r", "IBB_vR"),
    ]:
        pitching_stats[col_name] = get_pitching_result_series(
            all_plays, event_type, vs_hand, col_name
        )
    print(
        f"Count basic stats: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )
    pitching_stats = pitching_stats.fillna(0)

    # Bespoke counting stats
    start = datetime.datetime.now()

    def _batted_ball(row, bb_type, batter_hand):
        # Batted-ball-type count allowed by one pitcher vs one batter hand.
        # Type codes as found in the data: 'f' fly, 'G' ground, 'l' line drive.
        # FIX: line drives now filter on batter_hand (was pitcher_hand).
        return (
            all_plays[
                (all_plays.pitcher_id == row["key_retro"])
                & (all_plays.batted_ball_type == bb_type)
                & (all_plays.batter_hand == batter_hand)
            ]
            .count()["event_type"]
            .astype(int)
        )

    pitching_stats["FB_vL"] = pitching_stats.apply(lambda r: _batted_ball(r, "f", "l"), axis=1)
    pitching_stats["FB_vR"] = pitching_stats.apply(lambda r: _batted_ball(r, "f", "r"), axis=1)
    pitching_stats["GB_vL"] = pitching_stats.apply(lambda r: _batted_ball(r, "G", "l"), axis=1)
    pitching_stats["GB_vR"] = pitching_stats.apply(lambda r: _batted_ball(r, "G", "r"), axis=1)
    pitching_stats["LD_vL"] = pitching_stats.apply(lambda r: _batted_ball(r, "l", "l"), axis=1)
    pitching_stats["LD_vR"] = pitching_stats.apply(lambda r: _batted_ball(r, "l", "r"), axis=1)
    pitching_stats["H_vL"] = (
        pitching_stats["1B_vL"]
        + pitching_stats["2B_vL"]
        + pitching_stats["3B_vL"]
        + pitching_stats["HR_vL"]
    )
    pitching_stats["H_vR"] = (
        pitching_stats["1B_vR"]
        + pitching_stats["2B_vR"]
        + pitching_stats["3B_vR"]
        + pitching_stats["HR_vR"]
    )
    print(
        f"Custom counting stats: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )

    # Calculated Fields
    start = datetime.datetime.now()
    for v in ("vL", "vR"):
        pitching_stats[f"AVG_{v}"] = round(
            pitching_stats[f"H_{v}"] / pitching_stats[f"AB_{v}"], 5
        )
        # OBP allowed includes intentional walks; denominator is batters faced.
        pitching_stats[f"OBP_{v}"] = round(
            (
                pitching_stats[f"H_{v}"]
                + pitching_stats[f"BB_{v}"]
                + pitching_stats[f"HBP_{v}"]
                + pitching_stats[f"IBB_{v}"]
            )
            / pitching_stats[f"TBF_{v}"],
            5,
        )
        pitching_stats[f"SLG_{v}"] = round(
            (
                pitching_stats[f"1B_{v}"]
                + pitching_stats[f"2B_{v}"] * 2
                + pitching_stats[f"3B_{v}"] * 3
                + pitching_stats[f"HR_{v}"] * 4
            )
            / pitching_stats[f"AB_{v}"],
            5,
        )
        pitching_stats[f"HR/FB_{v}"] = round(
            pitching_stats[f"HR_{v}"] / pitching_stats[f"FB_{v}"], 5
        )
        # Hard% proxied from isolated power allowed plus a 0.2 base.
        pitching_stats[f"Hard%_{v}"] = round(
            0.2 + pitching_stats[f"SLG_{v}"] - pitching_stats[f"AVG_{v}"], 5
        )

    pitching_stats["Med%_vL"] = pitching_stats.apply(get_med_vL, axis=1)
    pitching_stats["Med%_vR"] = pitching_stats.apply(get_med_vR, axis=1)

    for v in ("vL", "vR"):
        pitching_stats[f"Soft%_{v}"] = round(
            1 - pitching_stats[f"Hard%_{v}"] - pitching_stats[f"Med%_{v}"], 5
        )
        balls_in_play = (
            pitching_stats[f"FB_{v}"]
            + pitching_stats[f"GB_{v}"]
            + pitching_stats[f"LD_{v}"]
        )
        pitching_stats[f"FB%_{v}"] = round(pitching_stats[f"FB_{v}"] / balls_in_play, 5)
        pitching_stats[f"GB%_{v}"] = round(pitching_stats[f"GB_{v}"] / balls_in_play, 5)

    def _oppo_rate(row, batter_hand, oppo_loc):
        # Share of batter events vs this hand that were hit to the opposite
        # field (5|7 for LHB, 3|9 for RHB).
        went_oppo = (
            all_plays[
                (all_plays.pitcher_id == row["key_retro"])
                & (all_plays.batter_hand == batter_hand)
                & (all_plays.hit_location.str.contains(oppo_loc))
            ]
            .count()["event_type"]
            .astype(int)
        )
        faced = (
            all_plays[
                (all_plays.pitcher_id == row["key_retro"])
                & (all_plays.batter_hand == batter_hand)
                & (all_plays.batter_event == "t")
            ]
            .count()["event_type"]
            .astype(int)
        )
        return round(went_oppo / faced, 5)

    pitching_stats["Oppo%_vL"] = pitching_stats.apply(
        lambda r: _oppo_rate(r, "l", "5|7"), axis=1
    )
    pitching_stats["Oppo%_vR"] = pitching_stats.apply(
        lambda r: _oppo_rate(r, "r", "3|9"), axis=1
    )
    pitching_stats = pitching_stats.fillna(0)
    print(
        f"Calculated fields: {(datetime.datetime.now() - start).total_seconds():.2f}s"
    )
    return pitching_stats
chances=int(row["SBO"]), sb2s=int(row["SB2"]), cs2s=int(row["CS2"]), sb3s=int(row["SB3"]), cs3s=int(row["CS3"]), season_pct=1.0, ) y = pd.DataFrame( { "key_bbref": [row["key_bbref"]], "steal_low": [steal_data[0]], "steal_high": [steal_data[1]], "steal_auto": [steal_data[2]], "steal_jump": [steal_data[3]], "hit_and_run": [ cba.hit_and_run( row["AB_vL"], row["AB_vR"], row["H_vL"], row["H_vR"], row["HR_vL"], row["HR_vR"], row["SO_vL"], row["SO_vR"], ) ], "bunt": [cba.bunting(row["Bunts"], season_pct)], "running": [cba.running(row["XBT%"])], "hand": [row["bat_hand"]], } ) return y.loc[0] all_cards = bs.apply(create_batting_card, axis=1) all_cards = all_cards.set_index("key_bbref") return all_cards def calc_pitching_cards(ps: pd.DataFrame, season_pct: float) -> pd.DataFrame: def create_pitching_card(row): pow_data = cde.pow_ratings(row["IP"], row["GS"], row["G"]) y = pd.DataFrame( { "key_bbref": [row["key_bbref"]], "balk": [cpi.balks(row["BK"], row["IP"], season_pct)], "wild_pitch": [cpi.wild_pitches(row["WP"], row["IP"], season_pct)], "hold": [ cde.hold_pitcher( str(row["caught_stealing_perc"]), int(row["pickoffs"]), season_pct, ) ], "starter_rating": [pow_data[0]], "relief_rating": [pow_data[1]], "closer_rating": [ cpi.closer_rating(int(row["GF"]), int(row["SV"]), int(row["G"])) ], "batting": [f'#1W{row["pitch_hand"].upper()}-C'], } ) return y.loc[0] all_cards = ps.apply(create_pitching_card, axis=1) all_cards = all_cards.set_index("key_bbref") return all_cards def calc_batter_ratings(bs: pd.DataFrame) -> pd.DataFrame: def create_batting_rating(row): if row["key_bbref"] == "galaran01": pass ratings = cba.get_batter_ratings(row) ops_vl = ratings[0]["obp"] + ratings[0]["slg"] ops_vr = ratings[1]["obp"] + ratings[1]["slg"] total_ops = (ops_vl + ops_vr + min(ops_vr, ops_vl)) / 3 def calc_cost(total_ops, base_cost, base_ops, max_delta) -> int: delta = ((total_ops - base_ops) / 0.1) * 2 if delta < 1: delta = (max_delta * (1 - (total_ops / base_ops))) * -0.1 final_cost = 
base_cost + (max_delta * delta) return round(final_cost) if total_ops >= 1.2: rarity_id = 99 cost = calc_cost(total_ops, base_cost=2400, base_ops=1.215, max_delta=810) elif total_ops >= 1: rarity_id = 1 cost = calc_cost(total_ops, base_cost=810, base_ops=1.05, max_delta=270) elif total_ops >= 0.9: rarity_id = 2 cost = calc_cost(total_ops, base_cost=270, base_ops=0.95, max_delta=90) elif total_ops >= 0.8: rarity_id = 3 cost = calc_cost(total_ops, base_cost=90, base_ops=0.85, max_delta=30) elif total_ops >= 0.7: rarity_id = 4 cost = calc_cost(total_ops, base_cost=30, base_ops=0.75, max_delta=10) else: rarity_id = 5 cost = calc_cost(total_ops, base_cost=10, base_ops=0.61, max_delta=8) x = pd.DataFrame( { "key_bbref": [row["key_bbref"]], "ratings_vL": [ratings[0]], "ratings_vR": [ratings[1]], "ops_vL": ops_vl, "ops_vR": ops_vr, "total_ops": total_ops, "rarity_id": rarity_id, "cost": cost, } ) return x.loc[0] all_ratings = bs.apply(create_batting_rating, axis=1) all_ratings = all_ratings.set_index("key_bbref") return all_ratings def calc_pitcher_ratings(ps: pd.DataFrame) -> pd.DataFrame: def create_pitching_rating(row): row["pitchingcard_id"] = row["key_fangraphs"] row["pitch_hand"] = row["pitch_hand"].upper() ratings = cpi.get_pitcher_ratings(row) ops_vl = ratings[0]["obp"] + ratings[0]["slg"] ops_vr = ratings[1]["obp"] + ratings[1]["slg"] total_ops = (ops_vl + ops_vr + min(ops_vr, ops_vl)) / 3 def calc_cost(total_ops, base_cost, base_ops, max_delta) -> int: delta = ((base_ops - total_ops) / 0.1) * 2 if delta < -0.9: delta = -0.95 final_cost = base_cost + (max_delta * delta) return round(final_cost) if row["starter_rating"] > 3: if total_ops <= 0.4: rarity_id = 99 cost = calc_cost(total_ops, 2400, 0.38, 810) elif total_ops <= 0.475: rarity_id = 1 cost = calc_cost(total_ops, 810, 0.44, 270) elif total_ops <= 0.53: rarity_id = 2 cost = calc_cost(total_ops, 270, 0.51, 90) elif total_ops <= 0.6: rarity_id = 3 cost = calc_cost(total_ops, 90, 0.575, 30) elif total_ops <= 
0.675: rarity_id = 4 cost = calc_cost(total_ops, 30, 0.64, 10) else: rarity_id = 5 cost = calc_cost(total_ops, 10, 0.7, 8) else: if total_ops <= 0.325: rarity_id = 99 cost = calc_cost(total_ops, 2400, 0.38, 810) elif total_ops <= 0.4: rarity_id = 1 cost = calc_cost(total_ops, 810, 0.44, 270) elif total_ops <= 0.475: rarity_id = 2 cost = calc_cost(total_ops, 270, 0.51, 90) elif total_ops <= 0.55: rarity_id = 3 cost = calc_cost(total_ops, 90, 0.575, 30) elif total_ops <= 0.625: rarity_id = 4 cost = calc_cost(total_ops, 30, 0.64, 10) else: rarity_id = 5 cost = calc_cost(total_ops, 10, 0.7, 8) x = pd.DataFrame( { "key_bbref": [row["key_bbref"]], "ratings_vL": [ratings[0]], "ratings_vR": [ratings[1]], "ops_vL": ops_vl, "ops_vR": ops_vr, "total_ops": total_ops, "rarity_id": rarity_id, "cost": cost, } ) return x.loc[0] all_ratings = ps.apply(create_pitching_rating, axis=1) all_ratings = all_ratings.set_index("key_bbref") return all_ratings def calc_positions(bs: pd.DataFrame) -> pd.DataFrame: df_c = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_c.csv").set_index("key_bbref") df_1b = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_1b.csv").set_index("key_bbref") df_2b = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_2b.csv").set_index("key_bbref") df_3b = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_3b.csv").set_index("key_bbref") df_ss = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_ss.csv").set_index("key_bbref") df_lf = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_lf.csv").set_index("key_bbref") df_cf = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_cf.csv").set_index("key_bbref") df_rf = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_rf.csv").set_index("key_bbref") df_of = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_of.csv").set_index("key_bbref") season_pct = 1.0 all_pos = [] def process_pos(row): no_data = True for pos_df, position in [ (df_1b, "1b"), (df_2b, "2b"), (df_3b, "3b"), (df_ss, "ss"), ]: if row["key_bbref"] in pos_df.index: logger.info( f'Running {position} stats for 
{row["use_name"]} {row["last_name"]}' ) try: if "bis_runs_total" in pos_df.columns: average_range = ( int(pos_df.at[row["key_bbref"], "tz_runs_total"]) + int(pos_df.at[row["key_bbref"], "bis_runs_total"]) + min( int(pos_df.at[row["key_bbref"], "tz_runs_total"]), int(pos_df.at[row["key_bbref"], "bis_runs_total"]), ) ) / 3 else: average_range = pos_df.at[row["key_bbref"], "tz_runs_total"] if float(pos_df.at[row["key_bbref"], "Inn_def"]) >= 10.0: all_pos.append( { "key_bbref": row["key_bbref"], "position": position.upper(), "innings": float( pos_df.at[row["key_bbref"], "Inn_def"] ), "range": cde.get_if_range( pos_code=position, tz_runs=round(average_range), r_dp=0, season_pct=season_pct, ), "error": cde.get_any_error( pos_code=position, errors=int(pos_df.at[row["key_bbref"], "E_def"]), chances=int(pos_df.at[row["key_bbref"], "chances"]), season_pct=season_pct, ), } ) no_data = False except Exception as e: logger.info(f"Infield position failed: {e}") of_arms = [] of_payloads = [] for pos_df, position in [(df_lf, "lf"), (df_cf, "cf"), (df_rf, "rf")]: if row["key_bbref"] in pos_df.index: try: if "bis_runs_total" in pos_df.columns: average_range = ( int(pos_df.at[row["key_bbref"], "tz_runs_total"]) + int(pos_df.at[row["key_bbref"], "bis_runs_total"]) + min( int(pos_df.at[row["key_bbref"], "tz_runs_total"]), int(pos_df.at[row["key_bbref"], "bis_runs_total"]), ) ) / 3 else: average_range = pos_df.at[row["key_bbref"], "tz_runs_total"] if float(pos_df.at[row["key_bbref"], "Inn_def"]) >= 10.0: of_payloads.append( { "key_bbref": row["key_bbref"], "position": position.upper(), "innings": float( pos_df.at[row["key_bbref"], "Inn_def"] ), "range": cde.get_of_range( pos_code=position, tz_runs=round(average_range), season_pct=season_pct, ), } ) of_run_rating = ( "bis_runs_outfield" if "bis_runs_outfield" in pos_df.columns else "tz_runs_total" ) of_arms.append(int(pos_df.at[row["key_bbref"], of_run_rating])) no_data = False except Exception as e: logger.info(f"Outfield position 
failed: {e}") if ( row["key_bbref"] in df_of.index and len(of_arms) > 0 and len(of_payloads) > 0 ): try: error_rating = cde.get_any_error( pos_code=position, errors=int(df_of.at[row["key_bbref"], "E_def"]), chances=int(df_of.at[row["key_bbref"], "chances"]), season_pct=season_pct, ) arm_rating = cde.arm_outfield(of_arms) for f in of_payloads: f["error"] = error_rating f["arm"] = arm_rating all_pos.append(f) no_data = False except Exception as e: logger.info(f"Outfield position failed: {e}") if row["key_bbref"] in df_c.index: try: run_rating = ( "bis_runs_catcher_sb" if "bis_runs_catcher_sb" in df_c else "tz_runs_catcher" ) if ( df_c.at[row["key_bbref"], "SB"] + df_c.at[row["key_bbref"], "CS"] == 0 ): arm_rating = 3 else: arm_rating = cde.arm_catcher( cs_pct=df_c.at[row["key_bbref"], "caught_stealing_perc"], raa=int(df_c.at[row["key_bbref"], run_rating]), season_pct=season_pct, ) if float(df_c.at[row["key_bbref"], "Inn_def"]) >= 10.0: all_pos.append( { "key_bbref": row["key_bbref"], "position": "C", "innings": float(df_c.at[row["key_bbref"], "Inn_def"]), "range": cde.range_catcher( rs_value=int( df_c.at[row["key_bbref"], "tz_runs_catcher"] ), season_pct=season_pct, ), "error": cde.get_any_error( pos_code="c", errors=int(df_c.at[row["key_bbref"], "E_def"]), chances=int(df_c.at[row["key_bbref"], "chances"]), season_pct=season_pct, ), "arm": arm_rating, "pb": cde.pb_catcher( pb=int(df_c.at[row["key_bbref"], "PB"]), innings=int( float(df_c.at[row["key_bbref"], "Inn_def"]) ), season_pct=season_pct, ), "overthrow": cde.ot_catcher( errors=int(df_c.at[row["key_bbref"], "E_def"]), chances=int(df_c.at[row["key_bbref"], "chances"]), season_pct=season_pct, ), } ) no_data = False except Exception as e: logger.info(f"Catcher position failed: {e}") if no_data: all_pos.append( { "key_bbref": row["key_bbref"], "position": "DH", "innings": row["PA_vL"] + row["PA_vR"], } ) bs.apply(process_pos, axis=1) pos_df = pd.DataFrame(all_pos) pos_df = pos_df.set_index("key_bbref") return pos_df 
def calc_pitcher_defense(ps: pd.DataFrame) -> pd.DataFrame:
    """Derive a defensive ("P") position row per pitcher.

    Pitchers present in defense_p.csv get computed range/error ratings;
    pitchers missing from the file get a neutral default card.
    Returns a DataFrame indexed by key_bbref.
    """
    df_p = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_p.csv").set_index("key_bbref")
    all_pos = []

    def process_def(row):
        if row["key_bbref"] in df_p.index:
            # FIX: the range lookup previously ran BEFORE the membership
            # check, so pitchers absent from defense_p.csv raised KeyError
            # and the default branch below was unreachable.
            if "bis_runs_total" in df_p:
                range_val = cde.range_pitcher(
                    rs_value=int(df_p.at[row["key_bbref"], "bis_runs_total"])
                )
            else:
                range_val = cde.range_pitcher(
                    rf_per9_value=df_p.at[row["key_bbref"], "range_factor_per_nine"]
                )
            all_pos.append(
                {
                    "key_bbref": row["key_bbref"],
                    "position": "P",
                    "innings": float(df_p.at[row["key_bbref"], "Inn_def"]),
                    "range": range_val,
                    "error": cde.get_any_error(
                        pos_code="p",
                        errors=int(df_p.at[row["key_bbref"], "E_def"]),
                        chances=int(df_p.at[row["key_bbref"], "chances"]),
                        season_pct=1.0,
                    ),
                }
            )
        else:
            # No defensive data: neutral defaults. (FIX: dropped the int()
            # cast around the key — bbref IDs are strings and int() raised.)
            all_pos.append(
                {
                    "key_bbref": row["key_bbref"],
                    "position": "P",
                    "innings": 1,
                    "range": 5,
                    "error": 51,
                }
            )

    ps.apply(process_def, axis=1)
    pos_df = pd.DataFrame(all_pos)
    pos_df = pos_df.set_index("key_bbref")
    return pos_df


async def get_or_post_players(
    bstat_df: pd.DataFrame = None,
    bat_rat_df: pd.DataFrame = None,
    def_rat_df: pd.DataFrame = None,
    pstat_df: pd.DataFrame = None,
    pit_rat_df: pd.DataFrame = None,
) -> pd.DataFrame:
    """Patch existing player records or create new ones for the cardset.

    Supply EITHER the batter trio (bstat_df/bat_rat_df/def_rat_df) or the
    pitcher trio (pstat_df/pit_rat_df/def_rat_df); raises KeyError otherwise.
    Side effects: writes *-deltas.csv and new-*.csv audit files.
    Returns a DataFrame of all posted/patched players indexed by bbref_id.
    """
    all_players = []
    player_deltas = [
        ["player_id", "player_name", "old-cost", "new-cost", "old-rarity", "new-rarity"]
    ]
    new_players = [["player_id", "player_name", "cost", "rarity", "pos1"]]

    async def player_search(bbref_id: str):
        # Look up an existing player record in this cardset, or None.
        p_query = await db_get(
            "players", params=[("bbref_id", bbref_id), ("cardset_id", CARDSET_ID)]
        )
        if p_query["count"] > 0:
            return p_query["players"][0]
        else:
            return None

    async def mlb_search_or_post(retro_id: int):
        # Find the mlbplayer master record, creating it if missing.
        # NOTE(review): relies on closure over the enclosing loop's `row`.
        mlb_query = await db_get("mlbplayers", params=[("key_retro", retro_id)])
        if mlb_query["count"] > 0:
            return mlb_query["players"][0]
        else:
            mlb_player = await db_post(
                "mlbplayers/one",
                payload={
                    "first_name": row["use_name"],
                    "last_name": row["last_name"],
                    "key_mlbam": row["key_mlbam"],
                    "key_fangraphs": row["key_fangraphs"],
                    "key_bbref": row["key_bbref"],
                    "key_retro": row["key_retro"],
                },
            )
            return mlb_player

    def new_player_payload(row, ratings_df: pd.DataFrame):
        # Payload for a brand-new player record. NOTE(review): relies on
        # closure over `mlb_player` assigned in the enclosing loop.
        # FIX: f-strings no longer reuse the enclosing quote character inside
        # the replacement field (a SyntaxError before Python 3.12, PEP 701).
        return {
            "p_name": f'{row["use_name"]} {row["last_name"]}',
            "cost": f'{ratings_df.loc[row["key_bbref"]]["cost"]}',
            "image": "change-me",
            "mlbclub": CLUB_LIST[row["Tm"]],
            "franchise": FRANCHISE_LIST[row["Tm"]],
            "cardset_id": CARDSET_ID,
            "set_num": int(float(row["key_fangraphs"])),
            "rarity_id": int(ratings_df.loc[row["key_bbref"]]["rarity_id"]),
            "description": PLAYER_DESCRIPTION,
            "bbref_id": row["key_bbref"],
            "fangr_id": int(float(row["key_fangraphs"])),
            "mlbplayer_id": mlb_player["id"],
        }

    def get_player_record_pos(def_rat_df: pd.DataFrame, row) -> list[str]:
        # Up to 8 position slots, ordered by defensive innings (most first).
        all_pos = [None, None, None, None, None, None, None, None]
        try:
            count = 0
            all_pos_df = def_rat_df.loc[row["key_bbref"]].sort_values(
                by="innings", ascending=False
            )
            for index, pos_row in all_pos_df.iterrows():
                all_pos[count] = pos_row.position
                count += 1
        except KeyError:
            logger.info(f"No positions found for {row['use_name']} {row['last_name']}")
            all_pos[0] = "DH"
        except TypeError:
            # .loc returned a Series (single position), which has no iterrows.
            logger.info(
                f"Only one position found for {row['use_name']} {row['last_name']}"
            )
            all_pos[0] = def_rat_df.loc[row["key_bbref"]].position
        return all_pos

    dev_count = 0
    if bstat_df is not None and bat_rat_df is not None and def_rat_df is not None:
        for index, row in bstat_df.iterrows():
            if dev_count < 0:
                break
            p_search = await player_search(row["key_bbref"])
            if p_search is not None:
                if "id" in p_search:
                    player_id = p_search["id"]
                else:
                    player_id = p_search["player_id"]
                # Update positions for existing players too
                all_pos = get_player_record_pos(def_rat_df, row)
                patch_params = [
                    ("cost", f'{bat_rat_df.loc[row["key_bbref"]]["cost"]}'),
                    ("rarity_id", int(bat_rat_df.loc[row["key_bbref"]]["rarity_id"])),
                    (
                        "image",
                        f'{CARD_BASE_URL}{player_id}/battingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}',
                    ),
                ]
                # Add position updates - set all 8 slots to clear any old positions
                for x in enumerate(all_pos):
                    patch_params.append((f"pos_{x[0] + 1}", x[1]))
                new_player = await db_patch(
                    "players", object_id=player_id, params=patch_params
                )
                new_player["bbref_id"] = row["key_bbref"]
                all_players.append(new_player)
                player_deltas.append(
                    [
                        new_player["player_id"],
                        new_player["p_name"],
                        p_search["cost"],
                        new_player["cost"],
                        p_search["rarity"]["name"],
                        new_player["rarity"]["name"],
                    ]
                )
            else:
                mlb_player = await mlb_search_or_post(row["key_retro"])
                player_payload = new_player_payload(row, bat_rat_df)
                all_pos = get_player_record_pos(def_rat_df, row)
                for x in enumerate(all_pos):
                    player_payload[f"pos_{x[0] + 1}"] = x[1]
                new_player = await db_post("players", payload=player_payload)
                if "id" in new_player:
                    player_id = new_player["id"]
                else:
                    player_id = new_player["player_id"]
                # Image URL needs the freshly assigned player_id, so patch it in.
                new_player = await db_patch(
                    "players",
                    object_id=player_id,
                    params=[
                        (
                            "image",
                            f'{CARD_BASE_URL}{player_id}/battingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}',
                        )
                    ],
                )
                if "paperdex" in new_player:
                    del new_player["paperdex"]
                # all_bbref_ids.append(row['key_bbref'])
                # all_player_ids.append(player_id)
                new_player["bbref_id"] = row["key_bbref"]
                all_players.append(new_player)
                new_players.append(
                    [
                        new_player["player_id"],
                        new_player["p_name"],
                        new_player["cost"],
                        new_player["rarity"]["name"],
                        new_player["pos_1"],
                    ]
                )
            dev_count += 1
    elif pstat_df is not None and pit_rat_df is not None and def_rat_df is not None:
        starter_index = pstat_df.columns.get_loc("starter_rating")
        closer_index = pstat_df.columns.get_loc("closer_rating")
        for index, row in pstat_df.iterrows():
            if dev_count < 0:
                break
            p_search = await player_search(row["key_bbref"])
            if p_search is not None:
                if "id" in p_search:
                    player_id = p_search["id"]
                else:
                    player_id = p_search["player_id"]
                # Determine pitcher positions based on ratings
                patch_params = [
                    ("cost", f'{pit_rat_df.loc[row["key_bbref"]]["cost"]}'),
                    ("rarity_id", int(pit_rat_df.loc[row["key_bbref"]]["rarity_id"])),
                    (
                        "image",
                        f'{CARD_BASE_URL}{player_id}/pitchingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}',
                    ),
                ]
                player_index = pstat_df.index[
                    pstat_df["key_bbref"] == row["key_bbref"]
                ].tolist()
                stat_row = pstat_df.iloc[player_index]
                starter_rating = stat_row.iat[0, starter_index]
                if starter_rating >= 4:
                    patch_params.append(("pos_1", "SP"))
                    # Clear other position slots
                    for i in range(2, 9):
                        patch_params.append((f"pos_{i}", None))
                else:
                    patch_params.append(("pos_1", "RP"))
                    closer_rating = stat_row.iat[0, closer_index]
                    if not pd.isna(closer_rating):
                        patch_params.append(("pos_2", "CP"))
                        # Clear remaining position slots
                        for i in range(3, 9):
                            patch_params.append((f"pos_{i}", None))
                    else:
                        # Clear remaining position slots
                        for i in range(2, 9):
                            patch_params.append((f"pos_{i}", None))
                new_player = await db_patch(
                    "players", object_id=player_id, params=patch_params
                )
                new_player["bbref_id"] = row["key_bbref"]
                all_players.append(new_player)
                player_deltas.append(
                    [
                        new_player["player_id"],
                        new_player["p_name"],
                        p_search["cost"],
                        new_player["cost"],
                        p_search["rarity"]["name"],
                        new_player["rarity"]["name"],
                    ]
                )
            else:
                mlb_player = await mlb_search_or_post(row["key_retro"])
                player_payload = new_player_payload(row, pit_rat_df)
                player_index = pstat_df.index[
                    pstat_df["key_bbref"] == row["key_bbref"]
                ].tolist()
                stat_row = pstat_df.iloc[player_index]
                starter_rating = stat_row.iat[0, starter_index]
                if starter_rating >= 4:
                    player_payload["pos_1"] = "SP"
                else:
                    player_payload["pos_1"] = "RP"
                    closer_rating = stat_row.iat[0, closer_index]
                    if not pd.isna(closer_rating):
                        player_payload["pos_2"] = "CP"
                new_player = await db_post("players", payload=player_payload)
                if "id" in new_player:
                    player_id = new_player["id"]
                else:
                    player_id = new_player["player_id"]
                new_player = await db_patch(
                    "players",
                    object_id=player_id,
                    params=[
                        (
                            "image",
                            f'{CARD_BASE_URL}{player_id}/pitchingcard{urllib.parse.quote("?d=")}{RELEASE_DIRECTORY}',
                        )
                    ],
                )
                if "paperdex" in new_player:
                    del new_player["paperdex"]
                new_player["bbref_id"] = row["key_bbref"]
                all_players.append(new_player)
                new_players.append(
                    [
                        new_player["player_id"],
                        new_player["p_name"],
                        new_player["cost"],
                        new_player["rarity"]["name"],
                        new_player["pos_1"],
                    ]
                )
            dev_count += 1
    else:
        raise KeyError("Could not get players - not enough stat DFs were supplied")
    # Audit trails: cost/rarity changes and newly created players.
    pd.DataFrame(player_deltas[1:], columns=player_deltas[0]).to_csv(
        f'{"batter" if bstat_df is not None else "pitcher"}-deltas.csv'
    )
    pd.DataFrame(new_players[1:], columns=new_players[0]).to_csv(
        f'new-{"batter" if bstat_df is not None else "pitcher"}s.csv'
    )
    players_df = pd.DataFrame(all_players).set_index("bbref_id")
    return players_df


async def post_batting_cards(cards_df: pd.DataFrame):
    """PUT all batting cards, then re-pull them to attach DB ids.

    Returns a DataFrame of the stored cards (with player_id / key_bbref /
    battingcard_id columns) or None if the re-pull fails.
    """
    all_cards = []
    cards_df.apply(
        lambda x: all_cards.append(
            {
                "player_id": int(x["player_id"]),
                "steal_low": x["steal_low"],
                "steal_high": x["steal_high"],
                "steal_auto": x["steal_auto"],
                "steal_jump": x["steal_jump"],
                "bunting": x["bunt"],
                "hit_and_run": x["hit_and_run"],
                "running": x["running"],
                "hand": x["hand"],
            }
        ),
        axis=1,
    )
    resp = await db_put("battingcards", payload={"cards": all_cards}, timeout=6)
    if resp is not None:
        pass
    else:
        log_exception(ValueError, "Unable to post batting cards")
    bc_query = await db_get("battingcards", params=[("cardset_id", CARDSET_ID)])
    if bc_query["count"] > 0:
        bc_data = bc_query["cards"]
        for line in bc_data:
            # Flatten nested player info for downstream merges.
            line["player_id"] = line["player"]["player_id"]
            line["key_bbref"] = line["player"]["bbref_id"]
            line["battingcard_id"] = line["id"]
        return pd.DataFrame(bc_data)
    else:
        log_exception(ValueError, "Unable to pull newly posted batting cards")


async def post_pitching_cards(cards_df: pd.DataFrame):
    """PUT all pitching cards, then re-pull them to attach DB ids.

    For promo cardsets, the re-pull is filtered to PROMO_INCLUSION_RETRO_IDS.
    Returns a DataFrame of the stored cards or None if the re-pull fails.
    """
    all_cards = []

    def get_closer_rating(raw_rating):
        # Normalize NaN closer ratings to None for the API.
        try:
            if pd.isnull(raw_rating):
                return None
            else:
                return raw_rating
        except AttributeError:
            return None

    cards_df.apply(
        lambda x: all_cards.append(
            {
                "player_id": int(x["player_id"]),
                "balk": x["balk"],
                "wild_pitch": x["wild_pitch"],
                "hold": x["hold"],
                "starter_rating": x["starter_rating"],
                "relief_rating": x["relief_rating"],
                "closer_rating": get_closer_rating(x["closer_rating"]),
                "batting": x["batting"],
                "hand": x["pitch_hand"].upper(),
            }
        ),
        axis=1,
    )
    resp = await db_put("pitchingcards", payload={"cards": all_cards}, timeout=6)
    if resp is not None:
        pass
    else:
        log_exception(ValueError, "Unable to post pitcher cards")
    pc_query = await db_get("pitchingcards", params=[("cardset_id", CARDSET_ID)])
    if pc_query["count"] > 0:
        pc_data = pc_query["cards"]
        if PLAYER_DESCRIPTION.lower() not in ["live", "1998"]:
            pc_data = [
                x
                for x in pc_query["cards"]
                if x["player"]["mlbplayer"]["key_retro"] in PROMO_INCLUSION_RETRO_IDS
            ]
        for line in pc_data:
            line["player_id"] = line["player"]["player_id"]
            line["key_bbref"] = line["player"]["bbref_id"]
            line["pitchingcard_id"] = line["id"]
        return pd.DataFrame(pc_data)
    else:
        log_exception(ValueError, "Unable to pull newly posted pitcher cards")


async def post_batting_ratings(ratings_df: pd.DataFrame):
    """Flatten each row's vL/vR rating dicts and PUT them; True on success."""
    all_ratings = []

    def append_ratings(row):
        vl = row["ratings_vL"]
        vl["player_id"] = row["player_id"]
        vl["battingcard_id"] = row["battingcard_id"]
        vr = row["ratings_vR"]
        vr["player_id"] = row["player_id"]
        vr["battingcard_id"] = row["battingcard_id"]
        all_ratings.append(vl)
        all_ratings.append(vr)

    ratings_df.apply(append_ratings, axis=1)
    resp = await db_put(
        "battingcardratings", payload={"ratings": all_ratings}, timeout=6
    )
    if resp is not None:
        return True
    else:
        log_exception(ValueError, "Unable to post batting ratings")


async def post_pitching_ratings(ratings_df: pd.DataFrame):
    """Flatten each row's vL/vR rating dicts and PUT them; True on success."""
    all_ratings = []

    def append_ratings(row):
        vl = row["ratings_vL"]
        vl["player_id"] = row["player_id"]
        vl["pitchingcard_id"] = row["pitchingcard_id"]
        vr = row["ratings_vR"]
        vr["player_id"] = row["player_id"]
        vr["pitchingcard_id"] = row["pitchingcard_id"]
        all_ratings.append(vl)
        all_ratings.append(vr)

    ratings_df.apply(append_ratings, axis=1)
    resp = await db_put(
        "pitchingcardratings", payload={"ratings": all_ratings}, timeout=6
    )
    if resp is not None:
        return True
    else:
        log_exception(ValueError, "Unable to post pitching ratings")


async def post_positions(pos_df: pd.DataFrame, delete_existing: bool = False):
    """PUT card positions; optionally delete this run's stale positions first.

    Returns True on success; logs an exception otherwise.
    """
    # Delete existing cardpositions ONLY for players in this run to avoid stale data
    # (e.g., DH positions from buggy runs where outfielders had no defensive positions)
    # Only delete on the first call (batters), not the second call (pitchers)
    if delete_existing:
        player_ids = pos_df["player_id"].unique().tolist()
        logger.info(
            f"Deleting existing cardpositions for {len(player_ids)} players in current run"
        )
        existing_positions = await db_get(
            "cardpositions", params=[("cardset_id", CARDSET_ID)]
        )
        if existing_positions and existing_positions.get("count", 0) > 0:
            deleted_count = 0
            for pos in existing_positions["positions"]:
                # Only delete positions for players being processed in this run
                if pos["player"]["player_id"] in player_ids:
                    try:
                        await db_delete("cardpositions", object_id=pos["id"], timeout=1)
                        deleted_count += 1
                    except Exception as e:
                        logger.warning(
                            f'Failed to delete cardposition {pos["id"]}: {e}'
                        )
            logger.info(f"Deleted {deleted_count} positions for players in current run")
    all_pos = []

    def append_positions(row):
        # Drop NaN columns (e.g. arm/pb on non-catchers) before posting.
        clean_row = row.dropna()
        new_val = clean_row.to_dict()
        new_val["player_id"] = int(row["player_id"])
        all_pos.append(new_val)

    pos_df.apply(append_positions, axis=1)
    resp = await db_put("cardpositions", payload={"positions": all_pos}, timeout=6)
    if resp is not None:
        return True
    else:
        log_exception(ValueError, "Unable to post positions")


async def post_batter_data(
    bs: pd.DataFrame, bc: pd.DataFrame, br: pd.DataFrame, dr: pd.DataFrame
) -> int:
    """Post batter players, cards, ratings and positions; return player count."""
    all_players = await get_or_post_players(bstat_df=bs, bat_rat_df=br, def_rat_df=dr)
    # Post Batting Cards
    bc = pd.merge(
        left=bc, right=all_players, how="left", left_on="key_bbref", right_on="bbref_id"
    )
    bc = await post_batting_cards(bc)
    # Post Batting Ratings
    # Only merge the columns we need to avoid corrupting dict columns in br
    br = pd.merge(
        left=br,
        right=bc[["key_bbref", "player_id", "battingcard_id"]],
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    br = await post_batting_ratings(br)
    # Post Positions
    dr = pd.merge(
        left=dr,
        right=all_players,
        how="right",  # 'left',
        left_on="key_bbref",
        right_on="bbref_id",
    )
    await post_positions(dr, delete_existing=True)  # Delete on first call (batters)
    return len(all_players)


async def post_pitcher_data(
    ps: pd.DataFrame, pc: pd.DataFrame, pr: pd.DataFrame, dr: pd.DataFrame
) -> int:
    """Post pitcher players, cards, ratings and positions; return player count."""
    all_players = await get_or_post_players(pstat_df=ps, pit_rat_df=pr, def_rat_df=dr)
    ps = pd.merge(
        left=all_players, right=ps, how="left", left_on="bbref_id", right_on="key_bbref"
    )
    # Post Pitching Cards
    pc = await post_pitching_cards(ps)
    # Post Pitching Ratings
    # Only merge the columns we need to avoid corrupting dict columns in pr
    pr = pd.merge(
        left=pr,
        right=pc[["key_bbref", "player_id", "pitchingcard_id"]],
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    pr = await post_pitching_ratings(pr)
    # Post Positions
    dr = pd.merge(
        left=all_players, right=dr, how="left", left_on="bbref_id", right_on="key_bbref"
    )
    await post_positions(
        dr, delete_existing=False
    )  # Don't delete on second call (pitchers)
    return len(all_players)


async def run_batters(
    data_input_path: str,
    start_date: int,
    end_date: int,
    post_data: bool = False,
    season_pct: float = 1.0,
):
    """Build batter stats, cards, ratings and positions; optionally post them."""
    print("Running the batter calcs...")
    # batter_start = datetime.datetime.now()
    # Get batting stats
    batting_stats = get_batting_stats_by_date(
        f"{RETRO_FILE_PATH}{EVENTS_FILENAME}", start_date=start_date, end_date=end_date
    )
    bs_len = len(batting_stats)
    # end_calc = datetime.datetime.now()
    # print(f'Combined batting stats: {(end_calc - batter_start).total_seconds():.2f}s\n')
    running_start = datetime.datetime.now()
    # Get running stats
    running_stats = get_run_stat_df(data_input_path)
    batting_stats = pd.merge(
        left=batting_stats,
        right=running_stats,
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    # Handle players who played for multiple teams - keep only highest-level combined totals
    # Players traded during season have multiple rows: one per team + one combined (2TM, 3TM, etc.)
    # Prefer: 3TM > 2TM > TOT > individual teams
    duplicated_mask = batting_stats["key_bbref"].duplicated(keep=False)
    if duplicated_mask.any():
        # Sort by Tm (descending) to prioritize higher-numbered combined totals (3TM > 2TM)
        # Then drop duplicates, keeping only the first (highest priority) row per player
        batting_stats = batting_stats.sort_values("Tm", ascending=False)
        batting_stats = batting_stats.drop_duplicates(subset="key_bbref", keep="first")
        logger.info("Removed team-specific rows for traded batters")
        bs_len = len(batting_stats)  # Update length after removing duplicates
    end_calc = datetime.datetime.now()
    print(f"Running stats: {(end_calc - running_start).total_seconds():.2f}s")
    # Guard against the merge unexpectedly multiplying rows.
    if len(batting_stats) != bs_len:
        raise DataMismatchError(
            f"retrosheet_data - run_batters - We started with {bs_len} batting lines and have {len(batting_stats)} after merging with running_stats"
        )
    # Calculate batting cards
    card_start = datetime.datetime.now()
    all_batting_cards = calc_batting_cards(batting_stats, season_pct)
    card_end = datetime.datetime.now()
    print(f"Create batting cards: {(card_end - card_start).total_seconds():.2f}s")
    # Calculate batting ratings
    rating_start = datetime.datetime.now()
    batting_stats["battingcard_id"] = batting_stats["key_fangraphs"]
    all_batting_ratings = calc_batter_ratings(batting_stats)
    rating_end = datetime.datetime.now()
    print(f"Create batting ratings: {(rating_end - rating_start).total_seconds():.2f}s")
    # Calculate defense ratings
    defense_start = datetime.datetime.now()
    all_defense_ratings = calc_positions(batting_stats)
    defense_end = datetime.datetime.now()
    print(
        f"Create defense ratings: {(defense_end - defense_start).total_seconds():.2f}s"
    )
    # Post all data
    if post_data:
        print("Posting player data...")
        post_start = datetime.datetime.now()
        num_players = await post_batter_data(
            batting_stats, all_batting_cards, all_batting_ratings, all_defense_ratings
        )
        post_end = datetime.datetime.now()
        print(f"Post player data: {(post_end - post_start).total_seconds()}s")
        post_msg = f"Posted {num_players} players to the database"
        logger.info(post_msg)
        print(post_msg)
    else:
        post_msg = f"{batting_stats.index.size} total batters\n\nPlayers are NOT being posted to the database"
        logger.warning(post_msg)
        print(post_msg)
    return batting_stats


async def run_pitchers(
    data_input_path: str,
    start_date: int,
    end_date: int,
    post_data: bool = False,
    season_pct: float = 1.0,
):
    """Build pitcher stats, cards, ratings and defense; optionally post them."""
    # Get pitching stats
    pitching_stats = get_pitching_stats_by_date(
        f"{RETRO_FILE_PATH}{EVENTS_FILENAME}", start_date=start_date, end_date=end_date
    )
    # Get peripheral stats
    start_time = datetime.datetime.now()
    periph_stats = get_periph_stat_df(data_input_path)
    pitching_stats = pd.merge(
        left=pitching_stats,
        right=periph_stats,
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    # Handle players who played for multiple teams - keep only highest-level combined totals
    # Players traded during season have multiple rows: one per team + one combined (2TM, 3TM, etc.)
    # Prefer: 3TM > 2TM > TOT > individual teams
    duplicated_mask = pitching_stats["key_bbref"].duplicated(keep=False)
    if duplicated_mask.any():
        # Sort by Tm (descending) to prioritize higher-numbered combined totals (3TM > 2TM)
        # Then drop duplicates, keeping only the first (highest priority) row per player
        pitching_stats = pitching_stats.sort_values("Tm", ascending=False)
        pitching_stats = pitching_stats.drop_duplicates(
            subset="key_bbref", keep="first"
        )
        logger.info("Removed team-specific rows for traded players")
    end_time = datetime.datetime.now()
    print(f"Peripheral stats: {(end_time - start_time).total_seconds():.2f}s")
    # Calculate defense ratings
    start_time = datetime.datetime.now()
    df_p = pd.read_csv(f"{DATA_INPUT_FILE_PATH}defense_p.csv").set_index("key_bbref")
    # Drop 'Tm' from defense data to avoid column name conflicts (we already have it from periph_stats)
    if "Tm" in df_p.columns:
        df_p = df_p.drop(columns=["Tm"])
    pitching_stats = pd.merge(
        left=pitching_stats,
        right=df_p,
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    pitching_stats = pitching_stats.fillna(0)
    all_defense_ratings = calc_pitcher_defense(pitching_stats)
    end_time = datetime.datetime.now()
    print(f"Defense stats: {(end_time - start_time).total_seconds():.2f}s")
    # Calculate pitching cards
    start_time = datetime.datetime.now()
    all_pitching_cards = calc_pitching_cards(pitching_stats, season_pct)
    pitching_stats = pd.merge(
        left=pitching_stats,
        right=all_pitching_cards,
        how="left",
        left_on="key_bbref",
        right_on="key_bbref",
    )
    end_time = datetime.datetime.now()
    print(f"Pit cards: {(end_time - start_time).total_seconds():.2f}s")
    # Calculate pitching card ratings
    start_time = datetime.datetime.now()
    all_pitching_ratings = calc_pitcher_ratings(pitching_stats)
    end_time = datetime.datetime.now()
    print(f"Pit ratings: {(end_time - start_time).total_seconds():.2f}s")
    # Post all data
    if post_data:
        print("\nPosting player data...")
        post_start = datetime.datetime.now()
        num_players = await post_pitcher_data(
            pitching_stats,
            all_pitching_cards,
            all_pitching_ratings,
            all_defense_ratings,
        )
        post_end = datetime.datetime.now()
        print(f"Post player data: {(post_end - post_start).total_seconds()}s")
        post_msg = f"\nPosted {num_players} pitchers to the database"
        logger.info(post_msg)
        print(post_msg)
    else:
        post_msg = f"{pitching_stats.index.size} total pitchers\n\nPlayers are NOT being posted to the database"
        logger.warning(post_msg)
        print(post_msg)
    return pitching_stats


async def main(args):
    """Entry point: run the batter then pitcher pipelines, dump CSV snapshots."""
    if len(PROMO_INCLUSION_RETRO_IDS) > 0 and PLAYER_DESCRIPTION == "Live":
        msg = f"Player description is set to *Live*, but there are {len(PROMO_INCLUSION_RETRO_IDS)} IDs in the promo inclusion list. Clear the promo list or change the player description."
        log_exception(ValueError, msg=msg, level="error")
    # Temporarily commented out for Ryan Zimmerman full season run
    # if weeks_between(START_DATE, END_DATE) > 5 and len(PROMO_INCLUSION_RETRO_IDS) > 0:
    #     msg = f'More than 5 weeks are included for a promo cardset. Please adjust START_DATE and/or END_DATE.'
    #     log_exception(ValueError, msg=msg, level='error')
    batter_start = datetime.datetime.now()
    batting_stats = await run_batters(
        f"{DATA_INPUT_FILE_PATH}",
        start_date=START_DATE,
        end_date=END_DATE,
        post_data=POST_DATA,
        season_pct=SEASON_PCT,
    )
    batting_stats.to_csv("batting_stats.csv")
    batter_end = datetime.datetime.now()
    print(f"\nBatter time: {(batter_end - batter_start).total_seconds():.2f}s\n")
    pitcher_start = datetime.datetime.now()
    pitching_stats = await run_pitchers(
        f"{DATA_INPUT_FILE_PATH}",
        start_date=START_DATE,
        end_date=END_DATE,
        post_data=POST_DATA,
        season_pct=SEASON_PCT,
    )
    pitching_stats.to_csv("pitching_stats.csv")
    pitcher_end = datetime.datetime.now()
    print(f"\nPitcher time: {(pitcher_end - pitcher_start).total_seconds():.2f}s")
    print(f"Total: {(pitcher_end - batter_start).total_seconds():.2f}s\n\nDone!")
    # await store_defense_to_csv(1998)


if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))