"""
season_stats.py — Incremental PlayerSeasonStats update logic.

Called once per completed StratGame to accumulate batting and pitching
statistics into the player_season_stats table.

Idempotency limitation: re-delivery of a game is detected by checking whether
any PlayerSeasonStats row still carries that game_id as last_game. This guard
only works if no later game has been processed for the same players — if game
G+1 is processed first, a re-delivery of game G will bypass the guard and
double-count stats. A persistent processed-game ledger is needed for full
idempotency across out-of-order re-delivery (see issue #105).

Peewee upsert strategy:
- SQLite: get_or_create() followed by a read-modify-write save(), executed
  inside the db.atomic() block opened by update_season_stats()
- PostgreSQL: on_conflict() with EXCLUDED — true atomic increment via SQL
"""

import logging
import os
from collections import defaultdict
from datetime import datetime

from peewee import EXCLUDED

from app.db_engine import (
    db,
    Decision,
    PlayerSeasonStats,
    StratGame,
    StratPlay,
)

logger = logging.getLogger(__name__)

DATABASE_TYPE = os.environ.get("DATABASE_TYPE", "sqlite").lower()

# Accumulator columns on PlayerSeasonStats. These tuples are the single source
# of truth for both upsert paths, so a column cannot be incremented in one
# backend and forgotten in the other.
_BATTING_COLS = (
    "games_batting",
    "pa",
    "ab",
    "hits",
    "doubles",
    "triples",
    "hr",
    "bb",
    "hbp",
    "so",
    "rbi",
    "runs",
    "sb",
    "cs",
)
_PITCHING_COLS = (
    "games_pitching",
    "outs",
    "k",
    "hits_allowed",
    "bb_allowed",
    "hr_allowed",
    "wins",
    "losses",
    "saves",
    "holds",
    "blown_saves",
)


def _new_batting_stats():
    """Return a zeroed batting stat dict for one player in one game."""
    return {col: 0 for col in _BATTING_COLS}


def _new_pitching_stats():
    """
    Return a fresh pitching stat dict for one pitcher in one game.

    games_pitching starts at 1 because an entry is only ever created for a
    pitcher who appeared in the game (via a play or a Decision row); every
    other accumulator starts at 0.
    """
    stats = {col: 0 for col in _PITCHING_COLS}
    stats["games_pitching"] = 1
    return stats


def _build_batting_groups(plays):
    """
    Aggregate per-play batting stats by (batter_id, batter_team_id).

    Plays whose batter_id is None are skipped. For every other play all stat
    fields are accumulated regardless of the play's pa value; games_batting
    is set to 1 only once the batter has at least one play with pa > 0.

    Args:
        plays: Iterable of play objects exposing batter_id, batter_team_id
            and the numeric attributes pa, ab, hit, double, triple, homerun,
            bb, hbp, so, rbi, run, sb and cs.

    Returns:
        A defaultdict keyed by (batter_id, batter_team_id) whose values are
        stat dicts with the keys listed in _BATTING_COLS.
    """
    groups = defaultdict(_new_batting_stats)

    for play in plays:
        if play.batter_id is None:
            continue

        g = groups[(play.batter_id, play.batter_team_id)]
        g["pa"] += play.pa
        g["ab"] += play.ab
        g["hits"] += play.hit
        g["doubles"] += play.double
        g["triples"] += play.triple
        g["hr"] += play.homerun
        g["bb"] += play.bb
        g["hbp"] += play.hbp
        g["so"] += play.so
        g["rbi"] += play.rbi
        g["runs"] += play.run
        g["sb"] += play.sb
        g["cs"] += play.cs

        # One game is being processed, so the flag is 0 or 1, never a count.
        if play.pa > 0:
            g["games_batting"] = 1

    return groups


def _build_pitching_groups(plays):
    """
    Aggregate per-play pitching stats by (pitcher_id, pitcher_team_id).

    The play fields are read as follows when building pitcher totals:
    - outs     → outs
    - so       → k
    - hit      → hits_allowed
    - bb + hbp → bb_allowed
    - homerun  → hr_allowed

    Plays whose pitcher_id is None are skipped, mirroring the batter_id
    guard in _build_batting_groups, so that no (None, team) group is ever
    handed to the upsert step.

    Args:
        plays: Iterable of play objects exposing pitcher_id, pitcher_team_id
            and the numeric attributes outs, so, hit, bb, hbp and homerun.

    Returns:
        A defaultdict keyed by (pitcher_id, pitcher_team_id) whose values are
        stat dicts with the keys listed in _PITCHING_COLS. Decision-derived
        keys (wins, losses, saves, holds, blown_saves) are left at 0 here.
    """
    groups = defaultdict(_new_pitching_stats)

    for play in plays:
        if play.pitcher_id is None:
            continue

        g = groups[(play.pitcher_id, play.pitcher_team_id)]
        g["outs"] += play.outs
        g["k"] += play.so
        g["hits_allowed"] += play.hit
        g["bb_allowed"] += play.bb + play.hbp
        g["hr_allowed"] += play.homerun

    return groups


def _apply_decisions(pitching_groups, decisions):
    """
    Merge Decision rows into the pitching stat groups, in place.

    Only the win, loss, is_save, hold and b_save attributes of each decision
    are read; they are added to wins, losses, saves, holds and blown_saves
    respectively.

    Args:
        pitching_groups: Mapping keyed by (pitcher_id, pitcher_team_id), as
            produced by _build_pitching_groups. Mutated in place.
        decisions: Iterable of decision objects exposing pitcher_id,
            pitcher_team_id, win, loss, is_save, hold and b_save.
    """
    for decision in decisions:
        key = (decision.pitcher_id, decision.pitcher_team_id)

        # A pitcher may have a Decision without any plays; give them a fresh
        # entry. An explicit lookup is used so that this works for a plain
        # dict as well as a defaultdict.
        g = pitching_groups.get(key)
        if g is None:
            g = pitching_groups[key] = _new_pitching_stats()

        g["wins"] += decision.win
        g["losses"] += decision.loss
        g["saves"] += decision.is_save
        g["holds"] += decision.hold
        g["blown_saves"] += decision.b_save


def _upsert_postgres(player_id, team_id, season, game_id, batting, pitching):
    """
    PostgreSQL upsert using ON CONFLICT ... DO UPDATE with column increments.

    Inserts a row holding this game's contribution. On conflict with an
    existing (player, team, season) row, every accumulator column is set to
    its current value plus the EXCLUDED (incoming) value, while last_game
    and last_updated_at are overwritten with the incoming values.

    Args:
        player_id: Player primary key.
        team_id: Team primary key.
        season: Season identifier taken from the StratGame.
        game_id: Game being processed; stored as last_game.
        batting: Batting stat dict (may be empty; missing keys count as 0).
        pitching: Pitching stat dict (may be empty; missing keys count as 0).
    """
    now = datetime.now()

    # This game's contribution for every accumulator column.
    values = {col: batting.get(col, 0) for col in _BATTING_COLS}
    values.update({col: pitching.get(col, 0) for col in _PITCHING_COLS})

    conflict_target = [
        PlayerSeasonStats.player,
        PlayerSeasonStats.team,
        PlayerSeasonStats.season,
    ]

    # Increment accumulators, overwrite metadata.
    update_dict = {}
    for col in _BATTING_COLS + _PITCHING_COLS:
        field_obj = getattr(PlayerSeasonStats, col)
        update_dict[field_obj] = field_obj + EXCLUDED[col]
    update_dict[PlayerSeasonStats.last_game] = EXCLUDED["last_game_id"]
    update_dict[PlayerSeasonStats.last_updated_at] = EXCLUDED["last_updated_at"]

    PlayerSeasonStats.insert(
        player=player_id,
        team=team_id,
        season=season,
        last_game=game_id,
        last_updated_at=now,
        **values,
    ).on_conflict(
        conflict_target=conflict_target,
        action="update",
        update=update_dict,
    ).execute()


def _upsert_sqlite(player_id, team_id, season, game_id, batting, pitching):
    """
    SQLite upsert: read-modify-write via get_or_create() and save().

    Fetches (or creates) the (player, team, season) row, adds this game's
    contribution to every accumulator column, stamps last_game_id and
    last_updated_at, and saves. update_season_stats() calls this inside its
    db.atomic() block, so the read and the write share one transaction.

    Args:
        player_id: Player primary key.
        team_id: Team primary key.
        season: Season identifier taken from the StratGame.
        game_id: Game being processed; stored as last_game_id.
        batting: Batting stat dict (may be empty; missing keys count as 0).
        pitching: Pitching stat dict (may be empty; missing keys count as 0).
    """
    now = datetime.now()

    obj, _ = PlayerSeasonStats.get_or_create(
        player_id=player_id,
        team_id=team_id,
        season=season,
    )

    for col in _BATTING_COLS:
        setattr(obj, col, getattr(obj, col) + batting.get(col, 0))
    for col in _PITCHING_COLS:
        setattr(obj, col, getattr(obj, col) + pitching.get(col, 0))

    obj.last_game_id = game_id
    obj.last_updated_at = now
    obj.save()


def update_season_stats(game_id: int) -> dict:
    """
    Accumulate per-game batting and pitching stats into PlayerSeasonStats.

    This function is safe to call exactly once per game. If called again for
    the same game_id while it is still the most-recently-processed game for
    at least one affected player (detected by checking the last_game FK), it
    returns early without modifying any data.

    Limitation: the guard only detects re-delivery if no later game has been
    processed for the same players. Out-of-order re-delivery (e.g. game G
    re-delivered after game G+1 was already processed) will not be caught and
    will silently double-count stats. See issue #105 for the planned
    ProcessedGame ledger fix.

    Algorithm:
        1. Fetch StratGame to get the season.
        2. Guard against re-processing via last_game_id check.
        3. Collect all StratPlay rows for the game.
        4. Group batting stats by (batter_id, batter_team_id).
        5. Group pitching stats by (pitcher_id, pitcher_team_id).
        6. Merge Decision rows into pitching groups.
        7. Upsert each player's contribution using either:
           - PostgreSQL: atomic SQL increment via ON CONFLICT DO UPDATE
           - SQLite: read-modify-write inside a transaction

    Args:
        game_id: Primary key of the StratGame to process.

    Returns:
        Summary dict with keys: game_id, season, batters_updated,
        pitchers_updated. If the game was already processed, also includes
        "skipped": True.

    Raises:
        StratGame.DoesNotExist: If no StratGame row matches game_id.
    """
    logger.info("update_season_stats: starting for game_id=%d", game_id)

    # Step 1 — Fetch the game to get season
    game = StratGame.get_by_id(game_id)
    season = game.season

    with db.atomic():
        # Step 2 — Double-count prevention: check if any row still carries
        # this game_id as last_game. Note: only detects replay of the
        # most-recently-processed game; out-of-order re-delivery bypasses
        # this guard (see issue #105).
        already_processed = (
            PlayerSeasonStats.select()
            .where(PlayerSeasonStats.last_game == game_id)
            .exists()
        )
        if already_processed:
            logger.info(
                "update_season_stats: game_id=%d already processed, skipping",
                game_id,
            )
            return {
                "game_id": game_id,
                "season": season,
                "batters_updated": 0,
                "pitchers_updated": 0,
                "skipped": True,
            }

        # Step 3 — Load plays
        plays = list(StratPlay.select().where(StratPlay.game == game_id))
        logger.debug(
            "update_season_stats: game_id=%d loaded %d plays", game_id, len(plays)
        )

        # Steps 4 & 5 — Aggregate batting and pitching groups
        batting_groups = _build_batting_groups(plays)
        pitching_groups = _build_pitching_groups(plays)

        # Step 6 — Merge Decision rows into pitching groups
        decisions = list(Decision.select().where(Decision.game == game_id))
        _apply_decisions(pitching_groups, decisions)

        # Step 7 — Upsert. A player who both batted and pitched for the same
        # team appears under one key and therefore gets one combined upsert.
        all_keys = set(batting_groups) | set(pitching_groups)

        upsert_fn = (
            _upsert_postgres if DATABASE_TYPE == "postgresql" else _upsert_sqlite
        )

        batters_updated = 0
        pitchers_updated = 0

        for player_id, team_id in all_keys:
            # .get() rather than indexing: both mappings are defaultdicts and
            # indexing would silently create entries for absent players.
            batting = batting_groups.get((player_id, team_id), {})
            pitching = pitching_groups.get((player_id, team_id), {})

            upsert_fn(player_id, team_id, season, game_id, batting, pitching)

            if batting:
                batters_updated += 1
            if pitching:
                pitchers_updated += 1

    logger.info(
        "update_season_stats: game_id=%d complete — "
        "batters_updated=%d pitchers_updated=%d",
        game_id,
        batters_updated,
        pitchers_updated,
    )

    return {
        "game_id": game_id,
        "season": season,
        "batters_updated": batters_updated,
        "pitchers_updated": pitchers_updated,
    }