"""Resolve offense_col for players in the retrosheet pipeline. Three-tier resolution: 1. Cache hit → stored value from data-input/offense_col_cache.csv 2. API pre-fetch → bulk-fetch all MlbPlayers, merge new entries into cache 3. Hash fallback → deterministic hash(player_name) % 3 + 1 """ import hashlib import os import pandas as pd from db_calls import db_get from exceptions import logger CACHE_PATH = "data-input/offense_col_cache.csv" def hash_offense_col(player_name: str) -> int: """Deterministic offense_col from player name. Returns 1, 2, or 3.""" normalized = player_name.strip().lower() digest = hashlib.md5(normalized.encode()).hexdigest() return int(digest, 16) % 3 + 1 def load_cache(path: str = CACHE_PATH) -> dict[str, int]: """Load {key_bbref: offense_col} from CSV cache.""" if not os.path.exists(path): return {} df = pd.read_csv(path, dtype={"key_bbref": str, "offense_col": int}) return dict(zip(df["key_bbref"], df["offense_col"])) def save_cache(cache: dict[str, tuple[str, int]], path: str = CACHE_PATH): """Write cache to CSV. cache values are (player_name, offense_col).""" rows = sorted( [ {"key_bbref": k, "player_name": v[0], "offense_col": v[1]} for k, v in cache.items() ], key=lambda r: r["key_bbref"], ) pd.DataFrame(rows).to_csv(path, index=False) async def resolve_offense_cols( df: pd.DataFrame, api_available: bool = True ) -> pd.DataFrame: """Add offense_col column to a stats DataFrame. Args: df: DataFrame with key_bbref, use_name, last_name columns. api_available: If True, fetch from API to refresh cache. Returns: df with offense_col column added. """ cache = load_cache() full_cache: dict[str, tuple[str, int]] = {} # Seed full_cache from existing file cache for bbref, oc in cache.items(): full_cache[bbref] = ("", oc) # Refresh from API if available if api_available: try: result = await db_get("mlbplayers") if result and "players" in result: api_count = 0 for p in result["players"]: bbref = p.get("key_bbref") oc = p.get("offense_col") name = f'{p.get("first_name", "")} {p.get("last_name", "")}'.strip() if bbref and oc: full_cache[bbref] = (name, int(oc)) api_count += 1 logger.info( f"offense_col_resolver: loaded {api_count} entries from API" ) save_cache(full_cache) except Exception as e: logger.warning( f"offense_col_resolver: API fetch failed, using cache only: {e}" ) # Build lookup from full_cache lookup = {k: v[1] for k, v in full_cache.items()} # Resolve for each row def resolve_row(row): bbref = row.get("key_bbref", "") if bbref in lookup: return lookup[bbref] name = f'{row.get("use_name", "")} {row.get("last_name", "")}'.strip() oc = hash_offense_col(name) logger.debug(f"offense_col_resolver: hash fallback for {name} ({bbref}) → {oc}") return oc df["offense_col"] = df.apply(resolve_row, axis=1) return df