"""Retrosheet-based outfield arm rating calculator.

This module calculates outfield arm ratings using play-by-play event data
from Retrosheet. It provides an alternative/supplement to Baseball
Reference's bis_runs_outfield metric, with the advantage of being available
for all historical seasons with Retrosheet data.

Usage:
    from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet

    arm_ratings = calculate_of_arms_from_retrosheet(
        df_events=retrosheet_events_df,
        season_pct=1.0
    )
"""

from pathlib import Path
from typing import Dict, Iterator, Optional, Tuple

import numpy as np
import pandas as pd

try:
    from exceptions import logger
except ImportError:
    # The project logging helper is not importable (e.g. the module is being
    # used or tested outside the project); fall back to a standard logger so
    # the calculations themselves remain usable.
    import logging

    logger = logging.getLogger(__name__)


# (position label, assist column, putout column, fielder-id column).
# f7/f8/f9 hold the actual fielder at positions 7/8/9, not lineup spots.
_OF_POSITIONS = (
    ('LF', 'a7', 'po7', 'f7'),
    ('CF', 'a8', 'po8', 'f8'),
    ('RF', 'a9', 'po9', 'f9'),
)

# Balls fielded required over a full season before a player is rated;
# scaled by ``season_pct`` for partial seasons.
_FULL_SEASON_MIN_SAMPLE = 50

# Baseline used when a position has no qualified players.
_DEFAULT_BASELINE = (0.0, 1.0)

# (exclusive lower z-score bound, rating), checked from best to worst.
# Calibrated to the spread produced by the 300x assist-rate weight so the
# full -6..+5 range is used.
_RATING_THRESHOLDS = (
    (2.5, -6),    # Elite
    (2.0, -5),    # Outstanding
    (1.5, -4),    # Excellent
    (1.0, -3),    # Very Good
    (0.5, -2),    # Above Average
    (0.0, -1),    # Slightly Above
    (-0.15, 0),   # Average
    (-0.5, 1),    # Slightly Below
    (-0.9, 2),    # Below Average
    (-1.3, 3),    # Poor
    (-1.6, 4),    # Very Poor
)
_WEAKEST_RATING = 5  # Very Weak

# Column order of the CSV written by save_arm_ratings_to_csv().
_CSV_COLUMNS = [
    'player_id', 'position', 'season', 'balls_fielded', 'total_assists',
    'home_throws', 'batter_extra_outs', 'assist_rate', 'raw_score',
    'z_score', 'arm_rating',
]


def _min_sample(season_pct: float) -> int:
    """Return the minimum balls fielded required for ``season_pct``."""
    return int(_FULL_SEASON_MIN_SAMPLE * season_pct)


def _iter_fielder_plays(
    df_events: pd.DataFrame,
    fielder_col: str
) -> Iterator[Tuple[str, pd.DataFrame]]:
    """Yield ``(fielder_id, plays)`` for each fielder found in ``fielder_col``.

    A single groupby pass replaces filtering the whole frame once per
    fielder. Missing ids are dropped by groupby; empty-string ids are skipped.
    """
    for fielder, plays in df_events.groupby(fielder_col, sort=False):
        if fielder == '' or plays.empty:
            continue
        yield fielder, plays


def _arm_components(
    player_plays: pd.DataFrame,
    a_col: str,
    po_col: str
) -> Dict[str, float]:
    """Count the arm-related events for one player at one position.

    Returns a dict with ``balls_fielded``, ``total_assists``, ``home_throws``,
    ``batter_extra_outs`` and ``assist_rate``.
    """
    # The position number (7, 8 or 9) is the last character of the column name.
    fielder_num = int(a_col[-1])

    has_assist = player_plays[a_col] > 0
    balls_fielded = int(((player_plays[po_col] > 0) | has_assist).sum())
    total_assists = int(has_assist.sum())

    # Assists on plays where a runner-out column equals this position number.
    # NOTE(review): labelled "home throws" here, but the mask only checks
    # brout1/brout2/brout3 against the position number - confirm the column
    # semantics against the events schema.
    runner_out = (
        (player_plays['brout1'] == fielder_num)
        | (player_plays['brout2'] == fielder_num)
        | (player_plays['brout3'] == fielder_num)
    )
    home_throws = int((has_assist & runner_out).sum())

    # Assists on plays where the batter-out column equals this position number.
    batter_extra_outs = int(
        (has_assist & (player_plays['brout_b'] == fielder_num)).sum()
    )

    assist_rate = total_assists / balls_fielded if balls_fielded > 0 else 0

    return {
        'balls_fielded': balls_fielded,
        'total_assists': total_assists,
        'home_throws': home_throws,
        'batter_extra_outs': batter_extra_outs,
        'assist_rate': assist_rate,
    }


def _score_from_components(components: Dict[str, float]) -> float:
    """Combine arm components into the rate-dominant composite raw score."""
    return (
        (components['assist_rate'] * 300)           # PRIMARY: assist rate dominates
        + (components['home_throws'] * 1.0)         # Quality: runner thrown out
        + (components['batter_extra_outs'] * 1.0)   # Quality: batter thrown out
        + (components['total_assists'] * 0.1)       # Minimal volume bonus
    )


def _to_z_score(raw_score: float, baseline: Tuple[float, float]) -> float:
    """Standardize ``raw_score`` against ``(mean, stddev)``; 0.0 if stddev is 0."""
    pos_mean, pos_std = baseline
    if pos_std > 0:
        return (raw_score - pos_mean) / pos_std
    return 0.0


def calculate_position_baselines(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, Tuple[float, float]]:
    """
    Calculate league-average and standard deviation for each OF position.

    These baselines are used to convert raw arm scores to z-scores,
    allowing for position-adjusted ratings.

    Args:
        df_events: DataFrame of retrosheet events for the season
        season_pct: Percentage of season completed. Scales the minimum
            sample so the baseline population uses the same qualification
            rule as the individual ratings (default 1.0 keeps the
            full-season minimum of 50 balls fielded).

    Returns:
        Dict mapping position to (mean, stddev) tuple. A position with no
        qualified players maps to (0.0, 1.0).
    """
    min_sample = _min_sample(season_pct)
    baselines = {}

    for of_pos, a_col, po_col, fielder_col in _OF_POSITIONS:
        scores = []
        for _fielder, player_plays in _iter_fielder_plays(df_events, fielder_col):
            components = _arm_components(player_plays, a_col, po_col)
            if components['balls_fielded'] < min_sample:
                continue
            scores.append(_score_from_components(components))

        if scores:
            # np.std is the population standard deviation (ddof=0).
            baselines[of_pos] = (float(np.mean(scores)), float(np.std(scores)))
        else:
            baselines[of_pos] = _DEFAULT_BASELINE

    logger.info(f"Position baselines calculated: {baselines}")
    return baselines


def _calculate_raw_arm_score(
    player_plays: pd.DataFrame,
    a_col: str,
    po_col: str
) -> float:
    """
    Calculate raw arm score for a player at a specific position.

    Formula (rate-dominant):
        (assist_rate * 300) + (home_throws * 1.0) +
        (batter_extra_outs * 1.0) + (total_assists * 0.1)

    Assists are already outs by definition, so there is no separate
    throwout-rate term.

    Args:
        player_plays: DataFrame of plays where player was at this position
        a_col: Assist column name (a7, a8, or a9)
        po_col: Putout column name (po7, po8, or po9)

    Returns:
        Raw arm score (before normalization)
    """
    return _score_from_components(_arm_components(player_plays, a_col, po_col))


def _z_score_to_rating(z_score: float) -> int:
    """
    Convert z-score to arm rating on -6 to +5 scale.

    Each threshold is an exclusive lower bound; the first one the z-score
    exceeds determines the rating.

    Args:
        z_score: Standardized arm score

    Returns:
        Arm rating from -6 (elite) to +5 (very weak)
    """
    for lower_bound, rating in _RATING_THRESHOLDS:
        if z_score > lower_bound:
            return rating
    return _WEAKEST_RATING


def calculate_player_arm_rating(
    df_events: pd.DataFrame,
    player_bbref_id: str,
    baselines: Dict[str, Tuple[float, float]],
    season_pct: float = 1.0
) -> int:
    """
    Calculate arm rating for a specific player using Retrosheet data.

    The player is scored separately at each outfield position where the
    minimum sample is met; the best (highest) z-score determines the rating.

    Args:
        df_events: DataFrame of retrosheet events for the season
        player_bbref_id: Player's baseball-reference ID (key_bbref)
        baselines: Position baselines from calculate_position_baselines()
        season_pct: Percentage of season completed (for sample size adjustment)

    Returns:
        Arm rating from -6 (elite) to +5 (very weak), or 0 if insufficient data
    """
    min_sample = _min_sample(season_pct)
    z_scores = []

    for of_pos, a_col, po_col, fielder_col in _OF_POSITIONS:
        # All plays at this position for this player
        player_plays = df_events[df_events[fielder_col] == player_bbref_id]
        if len(player_plays) == 0:
            continue

        components = _arm_components(player_plays, a_col, po_col)
        balls_fielded = components['balls_fielded']
        if balls_fielded < min_sample:
            logger.debug(
                f"{player_bbref_id} at {of_pos}: {balls_fielded} balls fielded "
                f"(< {min_sample} minimum)"
            )
            continue

        raw_score = _score_from_components(components)
        z_score = _to_z_score(
            raw_score, baselines.get(of_pos, _DEFAULT_BASELINE)
        )

        logger.debug(
            f"{player_bbref_id} at {of_pos}: raw={raw_score:.2f}, "
            f"z={z_score:.2f}, balls={balls_fielded}"
        )
        z_scores.append(z_score)

    if not z_scores:
        logger.debug(f"{player_bbref_id}: No qualifying OF positions, returning 0")
        return 0  # No data or insufficient sample

    # Use maximum z-score (best arm showing)
    max_z = max(z_scores)
    rating = _z_score_to_rating(max_z)

    logger.info(
        f"{player_bbref_id}: max_z={max_z:.2f}, arm_rating={rating}"
    )
    return rating


def calculate_of_arms_from_retrosheet(
    df_events: pd.DataFrame,
    season_pct: float = 1.0
) -> Dict[str, int]:
    """
    Calculate arm ratings for all outfielders in the dataset.

    This is the main entry point for batch calculation of arm ratings.
    Every player appearing in f7/f8/f9 gets an entry; those without a
    qualifying sample are rated 0.

    Args:
        df_events: DataFrame of retrosheet events for the season
        season_pct: Percentage of season completed (default 1.0 for full season)

    Returns:
        Dict mapping player key_bbref to arm rating
    """
    logger.info("Calculating position baselines...")
    baselines = calculate_position_baselines(df_events, season_pct)

    logger.info("Calculating individual player arm ratings...")

    # Unique outfielders from the fielder columns (not lineup spots)
    all_fielders = set()
    for _of_pos, _a_col, _po_col, fielder_col in _OF_POSITIONS:
        all_fielders.update(df_events[fielder_col].dropna().unique())
    all_fielders.discard('')  # Remove empty strings

    logger.info(f"Found {len(all_fielders)} unique outfielders")

    arm_ratings = {}
    for player_id in all_fielders:
        if pd.isna(player_id):
            continue
        arm_ratings[player_id] = calculate_player_arm_rating(
            df_events, player_id, baselines, season_pct
        )

    # Log summary stats
    ratings_dist = pd.Series(list(arm_ratings.values())).value_counts().sort_index()
    logger.info(f"Arm ratings distribution:\n{ratings_dist}")

    return arm_ratings


def get_arm_for_player(
    arm_ratings: Dict[str, int],
    player_bbref_id: str,
    default: int = 0
) -> int:
    """
    Lookup arm rating for a player, with default fallback.

    Args:
        arm_ratings: Dict from calculate_of_arms_from_retrosheet()
        player_bbref_id: Player's key_bbref
        default: Default rating if player not found (default: 0 = average)

    Returns:
        Arm rating for the player
    """
    return arm_ratings.get(player_bbref_id, default)


def save_arm_ratings_to_csv(
    df_events: pd.DataFrame,
    season_year: int,
    output_dir: str = 'data-output',
    season_pct: float = 1.0
) -> str:
    """
    Calculate and save arm ratings to CSV file for future reference.

    One row is written per (player, position) that meets the minimum
    sample. If nobody qualifies, a header-only CSV is written.

    Args:
        df_events: DataFrame of retrosheet events
        season_year: Year of the season (e.g., 2005)
        output_dir: Directory to save output file (default: 'data-output');
            created, including parents, if missing
        season_pct: Percentage of season completed (default: 1.0)

    Returns:
        Path to the saved CSV file
    """
    logger.info(f"Calculating arm ratings for {season_year} season...")

    baselines = calculate_position_baselines(df_events, season_pct)
    min_sample = _min_sample(season_pct)

    # Collect detailed stats for each qualifying player/position
    player_data = []
    for of_pos, a_col, po_col, fielder_col in _OF_POSITIONS:
        for fielder, player_plays in _iter_fielder_plays(df_events, fielder_col):
            components = _arm_components(player_plays, a_col, po_col)
            if components['balls_fielded'] < min_sample:
                continue

            raw_score = _score_from_components(components)
            z_score = _to_z_score(raw_score, baselines[of_pos])

            player_data.append({
                'player_id': fielder,
                'position': of_pos,
                'season': season_year,
                'balls_fielded': components['balls_fielded'],
                'total_assists': components['total_assists'],
                'home_throws': components['home_throws'],
                'batter_extra_outs': components['batter_extra_outs'],
                'assist_rate': round(components['assist_rate'], 4),
                'raw_score': round(raw_score, 2),
                'z_score': round(z_score, 2),
                'arm_rating': _z_score_to_rating(z_score),
            })

    # Explicit columns keep the frame sortable and the CSV header intact
    # even when no player qualifies.
    df_ratings = pd.DataFrame(player_data, columns=_CSV_COLUMNS)

    # Sort by arm rating (best first), then by player_id
    df_ratings = df_ratings.sort_values(['arm_rating', 'player_id'])

    # Create output directory (and any missing parents) if needed
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = output_path / filename
    df_ratings.to_csv(filepath, index=False)

    logger.info(f"Saved {len(df_ratings)} player arm ratings to {filepath}")
    logger.info("Rating distribution:")
    for rating in sorted(df_ratings['arm_rating'].unique()):
        count = (df_ratings['arm_rating'] == rating).sum()
        logger.info(f"  Rating {rating:+2d}: {count} players")

    return str(filepath)


def load_arm_ratings_from_csv(
    season_year: int,
    input_dir: str = 'data-output'
) -> Dict[str, int]:
    """
    Load arm ratings from previously saved CSV file.

    For players who appear at multiple OF positions, the best (lowest)
    rating is kept.

    Args:
        season_year: Year of the season (e.g., 2005)
        input_dir: Directory containing the CSV file (default: 'data-output')

    Returns:
        Dict mapping player_id to arm_rating

    Raises:
        FileNotFoundError: If the CSV for ``season_year`` does not exist
    """
    filename = f"retrosheet_arm_ratings_{season_year}.csv"
    filepath = Path(input_dir) / filename

    if not filepath.exists():
        logger.error(f"Arm ratings file not found: {filepath}")
        raise FileNotFoundError(f"Arm ratings file not found: {filepath}")

    df = pd.read_csv(filepath)
    logger.info(f"Loaded {len(df)} player arm ratings from {filepath}")

    arm_ratings = {}
    for player_id, rating in zip(df['player_id'], df['arm_rating']):
        rating = int(rating)  # plain int rather than a numpy scalar
        if player_id not in arm_ratings or rating < arm_ratings[player_id]:
            arm_ratings[player_id] = rating

    return arm_ratings


# Example usage in retrosheet_data.py:
#
#     # At the top of retrosheet_data.py, after loading events:
#     from defenders.retrosheet_arm_calculator import calculate_of_arms_from_retrosheet
#
#     # After loading retrosheet events
#     df_events = pd.read_csv(EVENTS_FILENAME)
#     retrosheet_arm_ratings = calculate_of_arms_from_retrosheet(df_events, SEASON_PCT)
#
#     # In create_positions(), replace the current arm_outfield() call:
#     # OLD:
#     # arm_rating = arm_outfield(of_arms)
#
#     # NEW:
#     from defenders.retrosheet_arm_calculator import get_arm_for_player
#     arm_rating = get_arm_for_player(retrosheet_arm_ratings, df_data['key_bbref'])