paper-dynasty-card-creation/offense_col_resolver.py
Cal Corum db3822565c Add offense_col resolver for retrosheet pipeline to fix 883 silent KeyErrors
The FullCard migration requires offense_col and player_id on each player's
DataFrame row. The retrosheet pipeline calculates ratings before posting,
so both fields were missing — causing silent card layout builder failures.

Adds a three-tier resolution: CSV cache → API bulk fetch → deterministic
hash fallback. Also includes player_id fallback in both calcs modules.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:37:59 -06:00

103 lines
3.3 KiB
Python

"""Resolve offense_col for players in the retrosheet pipeline.
Three-tier resolution:
1. Cache hit → stored value from data-input/offense_col_cache.csv
2. API pre-fetch → bulk-fetch all MlbPlayers, merge new entries into cache
3. Hash fallback → deterministic hash(player_name) % 3 + 1
"""
import hashlib
import os
import pandas as pd
from db_calls import db_get
from exceptions import logger
CACHE_PATH = "data-input/offense_col_cache.csv"
def hash_offense_col(player_name: str) -> int:
"""Deterministic offense_col from player name. Returns 1, 2, or 3."""
normalized = player_name.strip().lower()
digest = hashlib.md5(normalized.encode()).hexdigest()
return int(digest, 16) % 3 + 1
def load_cache(path: str = CACHE_PATH) -> dict[str, int]:
"""Load {key_bbref: offense_col} from CSV cache."""
if not os.path.exists(path):
return {}
df = pd.read_csv(path, dtype={"key_bbref": str, "offense_col": int})
return dict(zip(df["key_bbref"], df["offense_col"]))
def save_cache(cache: dict[str, tuple[str, int]], path: str = CACHE_PATH):
"""Write cache to CSV. cache values are (player_name, offense_col)."""
rows = sorted(
[
{"key_bbref": k, "player_name": v[0], "offense_col": v[1]}
for k, v in cache.items()
],
key=lambda r: r["key_bbref"],
)
pd.DataFrame(rows).to_csv(path, index=False)
async def resolve_offense_cols(
df: pd.DataFrame, api_available: bool = True
) -> pd.DataFrame:
"""Add offense_col column to a stats DataFrame.
Args:
df: DataFrame with key_bbref, use_name, last_name columns.
api_available: If True, fetch from API to refresh cache.
Returns:
df with offense_col column added.
"""
cache = load_cache()
full_cache: dict[str, tuple[str, int]] = {}
# Seed full_cache from existing file cache
for bbref, oc in cache.items():
full_cache[bbref] = ("", oc)
# Refresh from API if available
if api_available:
try:
result = await db_get("mlbplayers")
if result and "players" in result:
api_count = 0
for p in result["players"]:
bbref = p.get("key_bbref")
oc = p.get("offense_col")
name = f'{p.get("first_name", "")} {p.get("last_name", "")}'.strip()
if bbref and oc:
full_cache[bbref] = (name, int(oc))
api_count += 1
logger.info(
f"offense_col_resolver: loaded {api_count} entries from API"
)
save_cache(full_cache)
except Exception as e:
logger.warning(
f"offense_col_resolver: API fetch failed, using cache only: {e}"
)
# Build lookup from full_cache
lookup = {k: v[1] for k, v in full_cache.items()}
# Resolve for each row
def resolve_row(row):
bbref = row.get("key_bbref", "")
if bbref in lookup:
return lookup[bbref]
name = f'{row.get("use_name", "")} {row.get("last_name", "")}'.strip()
oc = hash_offense_col(name)
logger.debug(f"offense_col_resolver: hash fallback for {name} ({bbref}) → {oc}")
return oc
df["offense_col"] = df.apply(resolve_row, axis=1)
return df