paper-dynasty-card-creation/defenders/retrosheet_arm_calculator.py
2025-11-23 01:28:33 -06:00

488 lines
15 KiB
Python

"""
Retrosheet-based Outfield Arm Rating Calculator
This module calculates outfield arm ratings using play-by-play event data from Retrosheet.
It provides an alternative/supplement to Baseball Reference's bis_runs_outfield metric,
with the advantage of being available for all historical seasons with Retrosheet data.
Usage:
from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet
arm_ratings = calculate_of_arms_from_retrosheet(
df_events=retrosheet_events_df,
season_pct=1.0
)
"""
import pandas as pd
import numpy as np
from typing import Tuple, Dict, Optional
from pathlib import Path
from exceptions import logger
def calculate_position_baselines(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, Tuple[float, float]]:
    """
    Calculate the mean and standard deviation of raw arm scores per OF position.

    These baselines are used to convert raw arm scores to z-scores,
    allowing for position-adjusted ratings.

    Args:
        df_events: DataFrame of retrosheet events for the season. Must contain
            the fielder columns (f7/f8/f9), assist columns (a7/a8/a9), putout
            columns (po7/po8/po9) and the brout1/brout2/brout3/brout_b columns
            read by _calculate_raw_arm_score().
        season_pct: Fraction of the season covered by df_events. Scales the
            50-balls-fielded qualification threshold the same way the
            per-player rating functions do. Defaults to 1.0 (threshold of 50),
            which matches the previous hard-coded behavior.

    Returns:
        Dict mapping position ('LF', 'CF', 'RF') to a (mean, stddev) tuple of
        plain floats. The stddev is the population standard deviation (NumPy's
        default, ddof=0), so it is 0.0 when only one player qualifies.
        Positions with no qualified players get (0.0, 1.0).
    """
    # Same threshold expression the per-player functions use, so a partial
    # season does not end up with an empty (default) baseline.
    min_sample = int(50 * season_pct)

    baselines = {}
    for of_pos, a_col, po_col, fielder_col in [
        ('LF', 'a7', 'po7', 'f7'),  # f7 = fielder at position 7 (LF)
        ('CF', 'a8', 'po8', 'f8'),  # f8 = fielder at position 8 (CF)
        ('RF', 'a9', 'po9', 'f9')   # f9 = fielder at position 9 (RF)
    ]:
        scores = []

        # One grouped pass per position instead of re-filtering the whole
        # frame for every fielder. groupby drops missing fielder ids, and
        # sort=False keeps groups in order of first appearance.
        for fielder, player_plays in df_events.groupby(fielder_col, sort=False):
            if fielder == '':
                continue

            fielded = (player_plays[po_col] > 0) | (player_plays[a_col] > 0)
            if int(fielded.sum()) < min_sample:  # Minimum sample
                continue

            scores.append(_calculate_raw_arm_score(player_plays, a_col, po_col))

        if scores:
            baselines[of_pos] = (float(np.mean(scores)), float(np.std(scores)))
        else:
            baselines[of_pos] = (0.0, 1.0)  # Default if no qualified players

    logger.info(f"Position baselines calculated: {baselines}")
    return baselines
def _calculate_raw_arm_score(
player_plays: pd.DataFrame,
a_col: str,
po_col: str
) -> float:
"""
Calculate raw arm score for a player at a specific position.
Formula:
(assist_rate * 30) +
(throwout_rate * 5) +
(home_throws * 2) +
(batter_extra_outs * 1.5) +
(total_assists * 0.5)
Args:
player_plays: DataFrame of plays where player was at this position
a_col: Assist column name (a7, a8, or a9)
po_col: Putout column name (po7, po8, or po9)
Returns:
Raw arm score (before normalization)
"""
fielder_num = int(a_col[-1])
# Basic counting stats
balls_fielded = player_plays[
(player_plays[po_col] > 0) | (player_plays[a_col] > 0)
].shape[0]
total_assists = player_plays[player_plays[a_col] > 0].shape[0]
# Throwouts (any assist that resulted in an out)
throwouts = player_plays[
(player_plays[a_col] > 0) &
((player_plays['brout1'] == fielder_num) |
(player_plays['brout2'] == fielder_num) |
(player_plays['brout3'] == fielder_num) |
(player_plays['brout_b'] == fielder_num))
].shape[0]
# High-value throws to home plate
home_throws = player_plays[
(player_plays[a_col] > 0) &
((player_plays['brout1'] == fielder_num) |
(player_plays['brout2'] == fielder_num) |
(player_plays['brout3'] == fielder_num))
].shape[0]
# Batter thrown out trying for extra base
batter_extra_outs = player_plays[
(player_plays[a_col] > 0) &
(player_plays['brout_b'] == fielder_num)
].shape[0]
# Calculate rates
assist_rate = total_assists / balls_fielded if balls_fielded > 0 else 0
# Composite score (Rate-Dominant Formula)
# Assists are already outs by definition, so no need for separate throwout_rate
raw_score = (
(assist_rate * 300) + # PRIMARY: Assist rate dominates
(home_throws * 1.0) + # Quality: home plate throws
(batter_extra_outs * 1.0) + # Quality: preventing extra bases
(total_assists * 0.1) # Minimal volume bonus
)
return raw_score
def _z_score_to_rating(z_score: float) -> int:
"""
Convert z-score to arm rating on -6 to +5 scale.
Thresholds adjusted to match actual data distribution after formula changes.
The 300x weight on assist_rate compressed the z-score spread, so thresholds
are calibrated to ensure full -6 to +5 range is used.
Args:
z_score: Standardized arm score
Returns:
Arm rating from -6 (elite) to +5 (very weak)
"""
if z_score > 2.5:
return -6 # Elite (top ~1%)
elif z_score > 2.0:
return -5 # Outstanding (top ~2%)
elif z_score > 1.5:
return -4 # Excellent (top ~3%)
elif z_score > 1.0:
return -3 # Very Good (top ~5%)
elif z_score > 0.5:
return -2 # Above Average (top ~15%)
elif z_score > 0.0:
return -1 # Slightly Above
elif z_score > -0.15:
return 0 # Average (target 45-50%)
elif z_score > -0.5:
return 1 # Slightly Below (expanded)
elif z_score > -0.9:
return 2 # Below Average
elif z_score > -1.3:
return 3 # Poor
elif z_score > -1.6:
return 4 # Very Poor
else:
return 5 # Very Weak
def calculate_player_arm_rating(
    df_events: pd.DataFrame,
    player_bbref_id: str,
    baselines: Dict[str, Tuple[float, float]],
    season_pct: float = 1.0
) -> int:
    """
    Rate one player's outfield arm from Retrosheet events.

    Each outfield position the player appeared at is scored separately,
    standardized against that position's baseline, and the best (highest)
    z-score across qualifying positions determines the rating.

    Args:
        df_events: DataFrame of retrosheet events for the season
        player_bbref_id: Player's baseball-reference ID (key_bbref)
        baselines: Position baselines from calculate_position_baselines();
            a missing position falls back to (0.0, 1.0)
        season_pct: Percentage of season completed; scales the 50-ball
            minimum sample

    Returns:
        Arm rating from -6 (elite) to +5 (very weak), or 0 if the player has
        no position with enough balls fielded
    """
    of_slots = (
        ('LF', 'a7', 'po7', 'f7'),  # position 7
        ('CF', 'a8', 'po8', 'f8'),  # position 8
        ('RF', 'a9', 'po9', 'f9'),  # position 9
    )

    best_z: Optional[float] = None
    for of_pos, a_col, po_col, fielder_col in of_slots:
        plays_here = df_events[df_events[fielder_col] == player_bbref_id]
        if plays_here.empty:
            continue

        # A ball counts as fielded when the player got a putout or an assist.
        involved = (plays_here[po_col] > 0) | (plays_here[a_col] > 0)
        balls_fielded = len(plays_here[involved])
        required = int(50 * season_pct)
        if balls_fielded < required:
            logger.debug(
                f"{player_bbref_id} at {of_pos}: {balls_fielded} balls fielded "
                f"(< {required} minimum)"
            )
            continue

        raw_score = _calculate_raw_arm_score(plays_here, a_col, po_col)

        # Standardize against the position; a zero spread yields a neutral z.
        pos_mean, pos_std = baselines.get(of_pos, (0.0, 1.0))
        z_score = (raw_score - pos_mean) / pos_std if pos_std > 0 else 0.0

        logger.debug(
            f"{player_bbref_id} at {of_pos}: raw={raw_score:.2f}, "
            f"z={z_score:.2f}, balls={balls_fielded}"
        )

        # Keep the best showing across positions.
        if best_z is None or z_score > best_z:
            best_z = z_score

    if best_z is None:
        logger.debug(f"{player_bbref_id}: No qualifying OF positions, returning 0")
        return 0  # No data or insufficient sample

    rating = _z_score_to_rating(best_z)
    logger.info(
        f"{player_bbref_id}: max_z={best_z:.2f}, arm_rating={rating}"
    )
    return rating
def calculate_of_arms_from_retrosheet(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, int]:
    """
    Compute an arm rating for every player who appears in an outfield slot.

    This is the batch entry point: it builds the position baselines once and
    then rates each distinct fielder id found in the f7/f8/f9 columns.

    Args:
        df_events: DataFrame of retrosheet events for the season
        season_pct: Percentage of season completed (default 1.0 for full season)

    Returns:
        Dict mapping player key_bbref to arm rating
    """
    logger.info("Calculating position baselines...")
    baselines = calculate_position_baselines(df_events)

    logger.info("Calculating individual player arm ratings...")

    # Distinct ids from the actual fielder columns (not lineup spots).
    all_fielders = set().union(
        *(df_events[col].dropna().unique() for col in ['f7', 'f8', 'f9'])
    )
    all_fielders.discard('')  # Remove empty strings
    logger.info(f"Found {len(all_fielders)} unique outfielders")

    arm_ratings = {
        player_id: calculate_player_arm_rating(
            df_events, player_id, baselines, season_pct
        )
        for player_id in all_fielders
        if not (pd.isna(player_id) or player_id == '')
    }

    # Log summary stats
    ratings_dist = pd.Series(list(arm_ratings.values())).value_counts().sort_index()
    logger.info(f"Arm ratings distribution:\n{ratings_dist}")
    return arm_ratings
def get_arm_for_player(
    arm_ratings: Dict[str, int],
    player_bbref_id: str,
    default: int = 0
) -> int:
    """
    Return a player's arm rating, or a fallback when the player is unknown.

    Args:
        arm_ratings: Dict from calculate_of_arms_from_retrosheet()
        player_bbref_id: Player's key_bbref
        default: Rating to return if the player is not in arm_ratings
            (default: 0 = average)

    Returns:
        Arm rating for the player
    """
    if player_bbref_id in arm_ratings:
        return arm_ratings[player_bbref_id]
    return default
def save_arm_ratings_to_csv(
    df_events: pd.DataFrame,
    season_year: int,
    output_dir: str = 'data-output',
    season_pct: float = 1.0
) -> str:
    """
    Calculate and save arm ratings to CSV file for future reference.

    One row is written per (player, outfield position) that meets the minimum
    sample of int(50 * season_pct) balls fielded. The file is named
    retrosheet_arm_ratings_<season_year>.csv. When no player qualifies, a
    header-only CSV is written instead of raising.

    Args:
        df_events: DataFrame of retrosheet events
        season_year: Year of the season (e.g., 2005)
        output_dir: Directory to save output file (default: 'data-output');
            created, including missing parent directories, if needed
        season_pct: Percentage of season completed (default: 1.0)

    Returns:
        Path to the saved CSV file
    """
    logger.info(f"Calculating arm ratings for {season_year} season...")

    # Calculate position baselines
    baselines = calculate_position_baselines(df_events)
    min_sample = int(50 * season_pct)

    # Fixed column list so the frame has these columns even with zero rows;
    # otherwise sort_values() below raises KeyError on an empty result.
    columns = [
        'player_id', 'position', 'season', 'balls_fielded', 'total_assists',
        'home_throws', 'batter_extra_outs', 'assist_rate', 'raw_score',
        'z_score', 'arm_rating',
    ]

    # Collect detailed stats for each player
    player_data = []
    for of_pos, a_col, po_col, fielder_col in [
        ('LF', 'a7', 'po7', 'f7'),
        ('CF', 'a8', 'po8', 'f8'),
        ('RF', 'a9', 'po9', 'f9')
    ]:
        fielder_num = int(a_col[-1])
        pos_mean, pos_std = baselines[of_pos]

        # One grouped pass per position; groupby skips missing fielder ids.
        for fielder, player_plays in df_events.groupby(fielder_col, sort=False):
            if fielder == '':
                continue

            has_assist = player_plays[a_col] > 0

            # Check minimum sample size
            balls_fielded = int(((player_plays[po_col] > 0) | has_assist).sum())
            if balls_fielded < min_sample:
                continue

            # Component stats reported alongside the composite score
            total_assists = int(has_assist.sum())
            runner_out = (
                (player_plays['brout1'] == fielder_num) |
                (player_plays['brout2'] == fielder_num) |
                (player_plays['brout3'] == fielder_num)
            )
            home_throws = int((has_assist & runner_out).sum())
            batter_extra_outs = int(
                (has_assist & (player_plays['brout_b'] == fielder_num)).sum()
            )
            assist_rate = total_assists / balls_fielded if balls_fielded > 0 else 0

            # Calculate raw score and rating
            raw_score = _calculate_raw_arm_score(player_plays, a_col, po_col)
            z_score = (raw_score - pos_mean) / pos_std if pos_std > 0 else 0
            rating = _z_score_to_rating(z_score)

            player_data.append({
                'player_id': fielder,
                'position': of_pos,
                'season': season_year,
                'balls_fielded': balls_fielded,
                'total_assists': total_assists,
                'home_throws': home_throws,
                'batter_extra_outs': batter_extra_outs,
                'assist_rate': round(assist_rate, 4),
                'raw_score': round(raw_score, 2),
                'z_score': round(z_score, 2),
                'arm_rating': rating
            })

    # Create DataFrame, sorted by arm rating (best first), then by player_id
    df_ratings = pd.DataFrame(player_data, columns=columns)
    df_ratings = df_ratings.sort_values(['arm_rating', 'player_id'])

    # Create output directory (and any missing parents) if needed
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Save to CSV
    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = output_path / filename
    df_ratings.to_csv(filepath, index=False)

    logger.info(f"Saved {len(df_ratings)} player arm ratings to {filepath}")
    logger.info("Rating distribution:")
    for rating in sorted(df_ratings['arm_rating'].unique()):
        count = (df_ratings['arm_rating'] == rating).sum()
        logger.info(f" Rating {rating:+2d}: {count} players")

    return str(filepath)
def load_arm_ratings_from_csv(
    season_year: int,
    input_dir: str = 'data-output'
) -> Dict[str, int]:
    """
    Load arm ratings from previously saved CSV file.

    Reads retrosheet_arm_ratings_<season_year>.csv from input_dir. The file
    must have 'player_id' and 'arm_rating' columns. A player listed at
    several outfield positions keeps the best (lowest) rating.

    Args:
        season_year: Year of the season (e.g., 2005)
        input_dir: Directory containing the CSV file (default: 'data-output')

    Returns:
        Dict mapping player_id to arm_rating (plain ints), in order of first
        appearance in the file. Rows missing a player_id or arm_rating are
        skipped.

    Raises:
        FileNotFoundError: If the CSV file for season_year does not exist
    """
    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = Path(input_dir) / filename

    if not filepath.exists():
        logger.error(f"Arm ratings file not found: {filepath}")
        raise FileNotFoundError(f"Arm ratings file not found: {filepath}")

    df = pd.read_csv(filepath)
    logger.info(f"Loaded {len(df)} player arm ratings from {filepath}")

    # Best (lowest) rating per player in one grouped pass rather than a
    # row-by-row loop; sort=False preserves the file's player order.
    best_by_player = (
        df.dropna(subset=['player_id', 'arm_rating'])
        .groupby('player_id', sort=False)['arm_rating']
        .min()
    )

    # Coerce NumPy scalars to plain ints to match the declared return type.
    return {
        player_id: int(rating)
        for player_id, rating in best_by_player.items()
    }
# Example usage in retrosheet_data.py:
"""
# At the top of retrosheet_data.py, after loading events:
from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet
# After loading retrosheet events
df_events = pd.read_csv(EVENTS_FILENAME)
retrosheet_arm_ratings = calculate_of_arms_from_retrosheet(df_events, SEASON_PCT)
# In create_positions(), replace the current arm_outfield() call:
# OLD:
# arm_rating = arm_outfield(of_arms)
# NEW:
from defenders.retrosheet_arm_calculator import get_arm_for_player
arm_rating = get_arm_for_player(retrosheet_arm_ratings, df_data['key_bbref'])
"""