488 lines
15 KiB
Python
488 lines
15 KiB
Python
"""
|
|
Retrosheet-based Outfield Arm Rating Calculator

This module calculates outfield arm ratings using play-by-play event data from Retrosheet.
It provides an alternative/supplement to Baseball Reference's bis_runs_outfield metric,
with the advantage of being available for all historical seasons with Retrosheet data.

Usage:
    from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet

    arm_ratings = calculate_of_arms_from_retrosheet(
        df_events=retrosheet_events_df,
        season_pct=1.0
    )
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Tuple, Dict, Optional
|
|
from pathlib import Path
|
|
from exceptions import logger
|
|
|
|
|
|
def calculate_position_baselines(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, Tuple[float, float]]:
    """
    Calculate league-average and standard deviation for each OF position.

    These baselines are used to convert raw arm scores to z-scores,
    allowing for position-adjusted ratings. Only fielders with at least
    ``int(50 * season_pct)`` balls fielded at a position contribute to that
    position's baseline.

    Args:
        df_events: DataFrame of retrosheet events for the season. Must contain
            the fielder columns (f7/f8/f9), the assist and putout columns
            (a7-a9, po7-po9) and the columns read by
            _calculate_raw_arm_score().
        season_pct: Percentage of season completed. Scales the 50-ball
            qualification threshold the same way calculate_player_arm_rating()
            scales its own minimum sample. Defaults to 1.0 (threshold of 50).

    Returns:
        Dict mapping position ('LF', 'CF', 'RF') to a (mean, stddev) tuple of
        the qualified fielders' raw arm scores. A position with no qualified
        fielder maps to (0.0, 1.0).
    """
    min_sample = int(50 * season_pct)
    baselines = {}

    for of_pos, a_col, po_col, fielder_col in [
        ('LF', 'a7', 'po7', 'f7'),  # f7 = fielder at position 7 (LF)
        ('CF', 'a8', 'po8', 'f8'),  # f8 = fielder at position 8 (CF)
        ('RF', 'a9', 'po9', 'f9')   # f9 = fielder at position 9 (RF)
    ]:
        scores = []

        # A single grouping pass per position replaces one full-frame scan per
        # fielder. sort=False keeps first-appearance order, and groupby skips
        # missing fielder ids by default.
        for fielder, player_plays in df_events.groupby(fielder_col, sort=False):
            if fielder == '':
                continue

            fielded = (player_plays[po_col] > 0) | (player_plays[a_col] > 0)
            balls_fielded = int(fielded.sum())

            if balls_fielded < min_sample:  # Minimum sample
                continue

            # Raw score for this player at this position
            scores.append(_calculate_raw_arm_score(player_plays, a_col, po_col))

        if scores:
            baselines[of_pos] = (np.mean(scores), np.std(scores))
        else:
            baselines[of_pos] = (0.0, 1.0)  # Default if no qualified players

    logger.info(f"Position baselines calculated: {baselines}")
    return baselines
|
|
|
|
|
|
def _calculate_raw_arm_score(
    player_plays: pd.DataFrame,
    a_col: str,
    po_col: str
) -> float:
    """
    Calculate raw arm score for a player at a specific position.

    Formula (rate-dominant):
        (assist_rate * 300) +
        (home_throws * 1.0) +
        (batter_extra_outs * 1.0) +
        (total_assists * 0.1)

    where, counting rows of ``player_plays``:
        balls_fielded     = rows with a putout or an assist at this position
        total_assists     = rows with an assist at this position
        assist_rate       = total_assists / balls_fielded (0 if none fielded)
        home_throws       = assist rows where brout1, brout2 or brout3 equals
                            the position number
        batter_extra_outs = assist rows where brout_b equals the position number

    Args:
        player_plays: DataFrame of plays where player was at this position.
            Must contain a_col, po_col, brout1, brout2, brout3 and brout_b.
        a_col: Assist column name (a7, a8, or a9); its last character is taken
            as the position number.
        po_col: Putout column name (po7, po8, or po9)

    Returns:
        Raw arm score (before normalization)
    """
    fielder_num = int(a_col[-1])

    # Row masks are built once and reused by every count below.
    has_assist = player_plays[a_col] > 0
    made_play = (player_plays[po_col] > 0) | has_assist

    # NOTE(review): the count is named home_throws, but the mask matches any of
    # the three runner columns -- confirm the brout* semantics against the
    # events schema.
    runner_out = (
        (player_plays['brout1'] == fielder_num) |
        (player_plays['brout2'] == fielder_num) |
        (player_plays['brout3'] == fielder_num)
    )
    batter_out = player_plays['brout_b'] == fielder_num

    # int() keeps the counts (and therefore the score) as plain Python numbers.
    balls_fielded = int(made_play.sum())
    total_assists = int(has_assist.sum())
    home_throws = int((has_assist & runner_out).sum())
    batter_extra_outs = int((has_assist & batter_out).sum())

    assist_rate = total_assists / balls_fielded if balls_fielded > 0 else 0

    # Composite score: the assist rate dominates; the counting terms only
    # separate players with similar rates.
    raw_score = (
        (assist_rate * 300) +          # PRIMARY: assist rate
        (home_throws * 1.0) +          # Quality: runner thrown out
        (batter_extra_outs * 1.0) +    # Quality: batter thrown out
        (total_assists * 0.1)          # Minimal volume bonus
    )

    return raw_score
|
|
|
|
|
|
def _z_score_to_rating(z_score: float) -> int:
    """
    Map a standardized arm score onto the -6 to +5 arm rating scale.

    Lower ratings are better: -6 is the strongest arm, +5 the weakest. The
    bands are not evenly spaced; per the original author's note they were
    calibrated after the 300x assist-rate weight was introduced so that the
    whole -6 to +5 range is used.

    Args:
        z_score: Standardized arm score

    Returns:
        Arm rating from -6 (elite) to +5 (very weak). A value that clears no
        cutoff (including NaN, which fails every comparison) returns 5.
    """
    # (exclusive lower bound, rating), ordered from strongest to weakest arm.
    cutoffs = (
        (2.5, -6),    # Elite
        (2.0, -5),    # Outstanding
        (1.5, -4),    # Excellent
        (1.0, -3),    # Very Good
        (0.5, -2),    # Above Average
        (0.0, -1),    # Slightly Above
        (-0.15, 0),   # Average
        (-0.5, 1),    # Slightly Below
        (-0.9, 2),    # Below Average
        (-1.3, 3),    # Poor
        (-1.6, 4),    # Very Poor
    )

    for lower_bound, rating in cutoffs:
        if z_score > lower_bound:
            return rating

    return 5  # Very Weak
|
|
|
|
|
|
def calculate_player_arm_rating(
    df_events: pd.DataFrame,
    player_bbref_id: str,
    baselines: Dict[str, Tuple[float, float]],
    season_pct: float = 1.0
) -> int:
    """
    Rate one player's outfield arm from Retrosheet event data.

    Each outfield position the player appeared at is scored separately and
    standardized against that position's baseline; the rating is derived from
    the highest resulting z-score.

    Args:
        df_events: DataFrame of retrosheet events for the season
        player_bbref_id: Player's baseball-reference ID (key_bbref)
        baselines: Position baselines from calculate_position_baselines();
            a missing position falls back to (0.0, 1.0)
        season_pct: Percentage of season completed; scales the 50-ball
            minimum sample required at a position

    Returns:
        Arm rating from -6 (elite) to +5 (very weak), or 0 if the player has
        no position with enough balls fielded
    """
    outfield_spots = (
        ('LF', 'a7', 'po7', 'f7'),  # f7 = fielder at position 7 (LF)
        ('CF', 'a8', 'po8', 'f8'),  # f8 = fielder at position 8 (CF)
        ('RF', 'a9', 'po9', 'f9'),  # f9 = fielder at position 9 (RF)
    )
    qualified_z = []

    for of_pos, a_col, po_col, fielder_col in outfield_spots:
        # Every event with this player listed at the position
        at_position = df_events[fielder_col] == player_bbref_id
        player_plays = df_events[at_position]

        if not len(player_plays):
            continue

        # Sample-size gate: plays on which the player recorded a putout or assist
        fielded = (player_plays[po_col] > 0) | (player_plays[a_col] > 0)
        balls_fielded = len(player_plays[fielded])

        min_sample = int(50 * season_pct)
        if balls_fielded < min_sample:
            logger.debug(
                f"{player_bbref_id} at {of_pos}: {balls_fielded} balls fielded "
                f"(< {min_sample} minimum)"
            )
            continue

        raw_score = _calculate_raw_arm_score(player_plays, a_col, po_col)

        # Standardize against the position baseline; a non-positive spread
        # cannot be divided by, so such a position counts as exactly average.
        pos_mean, pos_std = baselines.get(of_pos, (0.0, 1.0))
        z_score = (raw_score - pos_mean) / pos_std if pos_std > 0 else 0.0

        logger.debug(
            f"{player_bbref_id} at {of_pos}: raw={raw_score:.2f}, "
            f"z={z_score:.2f}, balls={balls_fielded}"
        )
        qualified_z.append(z_score)

    if len(qualified_z) == 0:
        logger.debug(f"{player_bbref_id}: No qualifying OF positions, returning 0")
        return 0  # No data or insufficient sample

    # The player's best showing across positions decides the rating
    max_z = max(qualified_z)
    rating = _z_score_to_rating(max_z)

    logger.info(
        f"{player_bbref_id}: max_z={max_z:.2f}, arm_rating={rating}"
    )
    return rating
|
|
|
|
|
|
def calculate_of_arms_from_retrosheet(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, int]:
    """
    Rate the arm of every outfielder that appears in the event data.

    Main entry point for batch calculation: builds the position baselines
    once, then rates each distinct player found in the f7/f8/f9 columns.

    Args:
        df_events: DataFrame of retrosheet events for the season
        season_pct: Percentage of season completed (default 1.0 for full season)

    Returns:
        Dict mapping player key_bbref to arm rating. Players without a
        qualifying sample are included with a rating of 0.
    """
    logger.info("Calculating position baselines...")
    baselines = calculate_position_baselines(df_events)

    logger.info("Calculating individual player arm ratings...")

    # Distinct ids from the fielder columns (actual fielders, not lineup spots)
    outfielders = set()
    for fielder_col in ('f7', 'f8', 'f9'):
        outfielders.update(df_events[fielder_col].dropna().unique())
    outfielders.discard('')  # Remove empty strings

    logger.info(f"Found {len(outfielders)} unique outfielders")

    arm_ratings = {
        player_id: calculate_player_arm_rating(
            df_events, player_id, baselines, season_pct
        )
        for player_id in outfielders
        if not (pd.isna(player_id) or player_id == '')
    }

    # Log how many players landed on each rating
    rating_values = list(arm_ratings.values())
    ratings_dist = pd.Series(rating_values).value_counts().sort_index()
    logger.info(f"Arm ratings distribution:\n{ratings_dist}")

    return arm_ratings
|
|
|
|
|
|
def get_arm_for_player(
    arm_ratings: Dict[str, int],
    player_bbref_id: str,
    default: int = 0
) -> int:
    """
    Return a player's arm rating, falling back to a default when absent.

    Args:
        arm_ratings: Dict from calculate_of_arms_from_retrosheet()
        player_bbref_id: Player's key_bbref
        default: Rating returned when the player is not in arm_ratings
            (default: 0 = average)

    Returns:
        Arm rating for the player
    """
    if player_bbref_id in arm_ratings:
        return arm_ratings[player_bbref_id]
    return default
|
|
|
|
|
|
def save_arm_ratings_to_csv(
    df_events: pd.DataFrame,
    season_year: int,
    output_dir: str = 'data-output',
    season_pct: float = 1.0
) -> str:
    """
    Calculate and save arm ratings to CSV file for future reference.

    One row is written per (player, outfield position) pair that meets the
    ``int(50 * season_pct)`` balls-fielded minimum. The file is named
    ``retrosheet_arm_ratings_{season_year}.csv`` and has the columns
    player_id, position, season, balls_fielded, total_assists, home_throws,
    batter_extra_outs, assist_rate, raw_score, z_score, arm_rating. When no
    player qualifies, a header-only file is written.

    Args:
        df_events: DataFrame of retrosheet events
        season_year: Year of the season (e.g., 2005)
        output_dir: Directory to save output file (default: 'data-output');
            created, including missing parent directories, if it does not exist
        season_pct: Percentage of season completed (default: 1.0)

    Returns:
        Path to the saved CSV file
    """
    logger.info(f"Calculating arm ratings for {season_year} season...")

    # Calculate position baselines
    baselines = calculate_position_baselines(df_events)

    # Fixed column order; also guarantees the columns exist when no player
    # qualifies, so the sort below cannot fail on an empty frame.
    columns = [
        'player_id', 'position', 'season', 'balls_fielded', 'total_assists',
        'home_throws', 'batter_extra_outs', 'assist_rate', 'raw_score',
        'z_score', 'arm_rating'
    ]
    min_sample = int(50 * season_pct)

    # Collect detailed stats for each player
    player_data = []

    for of_pos, a_col, po_col, fielder_col in [
        ('LF', 'a7', 'po7', 'f7'),
        ('CF', 'a8', 'po8', 'f8'),
        ('RF', 'a9', 'po9', 'f9')
    ]:
        fielder_num = int(a_col[-1])
        pos_mean, pos_std = baselines[of_pos]

        for fielder in df_events[fielder_col].dropna().unique():
            if fielder == '':
                continue

            player_plays = df_events[df_events[fielder_col] == fielder]

            has_assist = player_plays[a_col] > 0
            fielded = (player_plays[po_col] > 0) | has_assist

            # Check minimum sample size
            balls_fielded = int(fielded.sum())
            if balls_fielded < min_sample:
                continue

            # Component counts, reported alongside the score for reference
            total_assists = int(has_assist.sum())
            runner_out = (
                (player_plays['brout1'] == fielder_num) |
                (player_plays['brout2'] == fielder_num) |
                (player_plays['brout3'] == fielder_num)
            )
            home_throws = int((has_assist & runner_out).sum())
            batter_extra_outs = int(
                (has_assist & (player_plays['brout_b'] == fielder_num)).sum()
            )
            assist_rate = total_assists / balls_fielded if balls_fielded > 0 else 0

            # Calculate raw score and rating
            raw_score = _calculate_raw_arm_score(player_plays, a_col, po_col)
            z_score = (raw_score - pos_mean) / pos_std if pos_std > 0 else 0
            rating = _z_score_to_rating(z_score)

            player_data.append({
                'player_id': fielder,
                'position': of_pos,
                'season': season_year,
                'balls_fielded': balls_fielded,
                'total_assists': total_assists,
                'home_throws': home_throws,
                'batter_extra_outs': batter_extra_outs,
                'assist_rate': round(assist_rate, 4),
                'raw_score': round(raw_score, 2),
                'z_score': round(z_score, 2),
                'arm_rating': rating
            })

    # Create DataFrame
    df_ratings = pd.DataFrame(player_data, columns=columns)

    # Sort by arm rating (best first), then by player_id
    df_ratings = df_ratings.sort_values(['arm_rating', 'player_id'])

    # Create output directory (and any missing parents) if needed
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Save to CSV
    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = output_path / filename
    df_ratings.to_csv(filepath, index=False)

    logger.info(f"Saved {len(df_ratings)} player arm ratings to {filepath}")
    logger.info("Rating distribution:")
    rating_counts = df_ratings['arm_rating'].value_counts().sort_index()
    for rating, count in rating_counts.items():
        logger.info(f" Rating {rating:+2d}: {count} players")

    return str(filepath)
|
|
|
|
|
|
def load_arm_ratings_from_csv(
    season_year: int,
    input_dir: str = 'data-output'
) -> Dict[str, int]:
    """
    Load arm ratings from previously saved CSV file.

    Reads ``retrosheet_arm_ratings_{season_year}.csv`` from input_dir and
    collapses it to one rating per player using the player_id and arm_rating
    columns.

    Args:
        season_year: Year of the season (e.g., 2005)
        input_dir: Directory containing the CSV file (default: 'data-output')

    Returns:
        Dict mapping player_id to arm_rating. For a player listed at several
        OF positions, the best (lowest) rating is kept.

    Raises:
        FileNotFoundError: If the CSV file for season_year does not exist.
    """
    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = Path(input_dir) / filename

    if not filepath.exists():
        logger.error(f"Arm ratings file not found: {filepath}")
        raise FileNotFoundError(f"Arm ratings file not found: {filepath}")

    df = pd.read_csv(filepath)
    logger.info(f"Loaded {len(df)} player arm ratings from {filepath}")

    # Walk only the two columns that are needed. Unlike iterrows(), this does
    # not build a Series per row and does not coerce each row to one dtype.
    arm_ratings = {}
    for player_id, rating in zip(df['player_id'], df['arm_rating']):
        if player_id in arm_ratings:
            # Keep the better (lower) rating
            arm_ratings[player_id] = min(arm_ratings[player_id], rating)
        else:
            arm_ratings[player_id] = rating

    return arm_ratings
|
|
|
|
|
|
# Example usage in retrosheet_data.py:
"""
# At the top of retrosheet_data.py, after loading events:
from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet

# After loading retrosheet events
df_events = pd.read_csv(EVENTS_FILENAME)
retrosheet_arm_ratings = calculate_of_arms_from_retrosheet(df_events, SEASON_PCT)

# In create_positions(), replace the current arm_outfield() call:
# OLD:
# arm_rating = arm_outfield(of_arms)

# NEW:
from defenders.retrosheet_arm_calculator import get_arm_for_player
arm_rating = get_arm_for_player(retrosheet_arm_ratings, df_data['key_bbref'])
"""
|