paper-dynasty-database/app/services/season_stats.py
Cal Corum c935c50a96 feat: add ProcessedGame ledger for full idempotency in update_season_stats() (#105)
Closes #105

Replace the last_game FK guard in update_season_stats() with an atomic
INSERT into a new processed_game ledger table. The old guard only blocked
same-game immediate replay; it was silently bypassed if game G+1 was
processed first (last_game already overwritten). The ledger is keyed on
game_id so any re-delivery — including out-of-order — is caught reliably.

Changes:
- app/db_engine.py: add ProcessedGame model (game FK PK + processed_at)
- app/services/season_stats.py: replace last_game check with
  ProcessedGame.get_or_create(); import ProcessedGame; update docstrings
- migrations/2026-03-18_add_processed_game.sql: CREATE TABLE IF NOT EXISTS
  processed_game with FK to stratgame ON DELETE CASCADE
- tests/conftest.py: add ProcessedGame to imports and _TEST_MODELS list
- tests/test_season_stats_update.py: add test_out_of_order_replay_prevented;
  update test_double_count_prevention docstring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 01:05:31 -05:00

555 lines
18 KiB
Python

"""
season_stats.py — Incremental BattingSeasonStats and PitchingSeasonStats update logic.
Called once per completed StratGame to accumulate batting and pitching
statistics into the batting_season_stats and pitching_season_stats tables
respectively.
Idempotency: re-delivery of a game (including out-of-order re-delivery)
is detected via an atomic INSERT into the ProcessedGame ledger table
keyed on game_id. The first call for a given game_id succeeds; all
subsequent calls return early with "skipped": True without modifying
any stats rows.
Peewee upsert strategy:
- SQLite: read-modify-write inside db.atomic() transaction
- PostgreSQL: ON CONFLICT ... DO UPDATE with column-level EXCLUDED increments
"""
import logging
import os
from collections import defaultdict
from datetime import datetime
from peewee import EXCLUDED
from app.db_engine import (
db,
BattingSeasonStats,
Decision,
PitchingSeasonStats,
ProcessedGame,
StratGame,
StratPlay,
)
logger = logging.getLogger(__name__)
DATABASE_TYPE = os.environ.get("DATABASE_TYPE", "sqlite").lower()
def _build_batting_groups(plays):
"""
Aggregate per-play batting stats by (batter_id, batter_team_id).
Only plays where pa > 0 are counted toward games, but all
play-level stat fields are accumulated regardless of pa value so
that rare edge cases (e.g. sac bunt without official PA) are
correctly included in the totals.
Returns a dict keyed by (batter_id, batter_team_id) with stat dicts
matching BattingSeasonStats column names.
"""
groups = defaultdict(
lambda: {
"games": 0,
"pa": 0,
"ab": 0,
"hits": 0,
"doubles": 0,
"triples": 0,
"hr": 0,
"rbi": 0,
"runs": 0,
"bb": 0,
"strikeouts": 0,
"hbp": 0,
"sac": 0,
"ibb": 0,
"gidp": 0,
"sb": 0,
"cs": 0,
"appeared": False, # tracks whether batter appeared at all in this game
}
)
for play in plays:
batter_id = play.batter_id
batter_team_id = play.batter_team_id
if batter_id is None:
continue
key = (batter_id, batter_team_id)
g = groups[key]
g["pa"] += play.pa
g["ab"] += play.ab
g["hits"] += play.hit
g["doubles"] += play.double
g["triples"] += play.triple
g["hr"] += play.homerun
g["rbi"] += play.rbi
g["runs"] += play.run
g["bb"] += play.bb
g["strikeouts"] += play.so
g["hbp"] += play.hbp
g["sac"] += play.sac
g["ibb"] += play.ibb
g["gidp"] += play.gidp
g["sb"] += play.sb
g["cs"] += play.cs
if play.pa > 0 and not g["appeared"]:
g["games"] = 1
g["appeared"] = True
# Clean up the helper flag before returning
for key in groups:
del groups[key]["appeared"]
return groups
def _build_pitching_groups(plays):
"""
Aggregate per-play pitching stats by (pitcher_id, pitcher_team_id).
Stats on StratPlay are recorded from the batter's perspective, so
when accumulating pitcher stats we collect:
- outs → pitcher outs recorded (directly on play)
- so → strikeouts (batter's so = pitcher's strikeouts)
- hit → hits allowed
- bb → walks allowed (batter bb, separate from hbp)
- hbp → hit batters
- homerun → home runs allowed
games counts unique pitchers who appeared (at least one play as
pitcher), capped at 1 per game since this function processes a
single game. games_started is populated later via _apply_decisions().
Fields not available from StratPlay (runs_allowed, earned_runs,
wild_pitches, balks) default to 0 and are not incremented.
Returns a dict keyed by (pitcher_id, pitcher_team_id) with stat dicts
matching PitchingSeasonStats column names.
"""
groups = defaultdict(
lambda: {
"games": 1, # pitcher appeared in this game by definition
"games_started": 0, # populated later via _apply_decisions
"outs": 0,
"strikeouts": 0,
"bb": 0,
"hits_allowed": 0,
"runs_allowed": 0, # not available from StratPlay
"earned_runs": 0, # not available from StratPlay
"hr_allowed": 0,
"hbp": 0,
"wild_pitches": 0, # not available from StratPlay
"balks": 0, # not available from StratPlay
"wins": 0,
"losses": 0,
"holds": 0,
"saves": 0,
"blown_saves": 0,
}
)
for play in plays:
pitcher_id = play.pitcher_id
pitcher_team_id = play.pitcher_team_id
key = (pitcher_id, pitcher_team_id)
g = groups[key]
g["outs"] += play.outs
g["strikeouts"] += play.so
g["hits_allowed"] += play.hit
g["bb"] += play.bb
g["hbp"] += play.hbp
g["hr_allowed"] += play.homerun
return groups
def _apply_decisions(pitching_groups, decisions):
"""
Merge Decision rows into the pitching stat groups.
Each Decision belongs to exactly one pitcher in the game, containing
win/loss/save/hold/blown-save flags and the is_start indicator.
"""
for decision in decisions:
pitcher_id = decision.pitcher_id
pitcher_team_id = decision.pitcher_team_id
key = (pitcher_id, pitcher_team_id)
# Pitcher may have a Decision without plays (rare edge case for
# games where the Decision was recorded without StratPlay rows).
# Initialise a zeroed entry if not already present.
if key not in pitching_groups:
pitching_groups[key] = {
"games": 1,
"games_started": 0,
"outs": 0,
"strikeouts": 0,
"bb": 0,
"hits_allowed": 0,
"runs_allowed": 0,
"earned_runs": 0,
"hr_allowed": 0,
"hbp": 0,
"wild_pitches": 0,
"balks": 0,
"wins": 0,
"losses": 0,
"holds": 0,
"saves": 0,
"blown_saves": 0,
}
g = pitching_groups[key]
g["wins"] += decision.win
g["losses"] += decision.loss
g["saves"] += decision.is_save
g["holds"] += decision.hold
g["blown_saves"] += decision.b_save
g["games_started"] += 1 if decision.is_start else 0
def _upsert_batting_postgres(player_id, team_id, season, game_id, batting):
    """
    PostgreSQL upsert for BattingSeasonStats using ON CONFLICT ... DO UPDATE.

    Each stat column is incremented by the EXCLUDED (incoming) value,
    ensuring concurrent games don't overwrite each other.

    Bug fix: peewee's EXCLUDED container only supports attribute access
    (EXCLUDED.col builds the "EXCLUDED.col" SQL reference); subscripting
    it (EXCLUDED[col]) raises TypeError. Dynamic columns are therefore
    resolved with getattr(), and the FK/timestamp columns use attribute
    access with their DB column names (last_game_id, last_updated_at).

    Args:
        player_id: Player PK for the stats row.
        team_id: Team PK for the stats row.
        season: Season the stats row belongs to.
        game_id: StratGame PK recorded as last_game.
        batting: Stat dict produced by _build_batting_groups.
    """
    now = datetime.now()
    increment_cols = [
        "games",
        "pa",
        "ab",
        "hits",
        "doubles",
        "triples",
        "hr",
        "rbi",
        "runs",
        "bb",
        "strikeouts",
        "hbp",
        "sac",
        "ibb",
        "gidp",
        "sb",
        "cs",
    ]
    conflict_target = [
        BattingSeasonStats.player,
        BattingSeasonStats.team,
        BattingSeasonStats.season,
    ]
    update_dict = {}
    for col in increment_cols:
        field_obj = getattr(BattingSeasonStats, col)
        # existing value + incoming (EXCLUDED) value: an atomic SQL-side
        # increment, safe under concurrent game processing.
        update_dict[field_obj] = field_obj + getattr(EXCLUDED, col)
    # last_game is an FK field; its database column is last_game_id.
    update_dict[BattingSeasonStats.last_game] = EXCLUDED.last_game_id
    update_dict[BattingSeasonStats.last_updated_at] = EXCLUDED.last_updated_at
    BattingSeasonStats.insert(
        player=player_id,
        team=team_id,
        season=season,
        games=batting.get("games", 0),
        pa=batting.get("pa", 0),
        ab=batting.get("ab", 0),
        hits=batting.get("hits", 0),
        doubles=batting.get("doubles", 0),
        triples=batting.get("triples", 0),
        hr=batting.get("hr", 0),
        rbi=batting.get("rbi", 0),
        runs=batting.get("runs", 0),
        bb=batting.get("bb", 0),
        strikeouts=batting.get("strikeouts", 0),
        hbp=batting.get("hbp", 0),
        sac=batting.get("sac", 0),
        ibb=batting.get("ibb", 0),
        gidp=batting.get("gidp", 0),
        sb=batting.get("sb", 0),
        cs=batting.get("cs", 0),
        last_game=game_id,
        last_updated_at=now,
    ).on_conflict(
        conflict_target=conflict_target,
        action="update",
        update=update_dict,
    ).execute()
def _upsert_pitching_postgres(player_id, team_id, season, game_id, pitching):
    """
    PostgreSQL upsert for PitchingSeasonStats using ON CONFLICT ... DO UPDATE.

    Each stat column is incremented by the EXCLUDED (incoming) value,
    ensuring concurrent games don't overwrite each other.

    Bug fix: peewee's EXCLUDED container only supports attribute access
    (EXCLUDED.col builds the "EXCLUDED.col" SQL reference); subscripting
    it (EXCLUDED[col]) raises TypeError. Dynamic columns are therefore
    resolved with getattr(), and the FK/timestamp columns use attribute
    access with their DB column names (last_game_id, last_updated_at).

    Args:
        player_id: Player PK for the stats row.
        team_id: Team PK for the stats row.
        season: Season the stats row belongs to.
        game_id: StratGame PK recorded as last_game.
        pitching: Stat dict produced by _build_pitching_groups /
            _apply_decisions.
    """
    now = datetime.now()
    increment_cols = [
        "games",
        "games_started",
        "outs",
        "strikeouts",
        "bb",
        "hits_allowed",
        "runs_allowed",
        "earned_runs",
        "hr_allowed",
        "hbp",
        "wild_pitches",
        "balks",
        "wins",
        "losses",
        "holds",
        "saves",
        "blown_saves",
    ]
    conflict_target = [
        PitchingSeasonStats.player,
        PitchingSeasonStats.team,
        PitchingSeasonStats.season,
    ]
    update_dict = {}
    for col in increment_cols:
        field_obj = getattr(PitchingSeasonStats, col)
        # existing value + incoming (EXCLUDED) value: an atomic SQL-side
        # increment, safe under concurrent game processing.
        update_dict[field_obj] = field_obj + getattr(EXCLUDED, col)
    # last_game is an FK field; its database column is last_game_id.
    update_dict[PitchingSeasonStats.last_game] = EXCLUDED.last_game_id
    update_dict[PitchingSeasonStats.last_updated_at] = EXCLUDED.last_updated_at
    PitchingSeasonStats.insert(
        player=player_id,
        team=team_id,
        season=season,
        games=pitching.get("games", 0),
        games_started=pitching.get("games_started", 0),
        outs=pitching.get("outs", 0),
        strikeouts=pitching.get("strikeouts", 0),
        bb=pitching.get("bb", 0),
        hits_allowed=pitching.get("hits_allowed", 0),
        runs_allowed=pitching.get("runs_allowed", 0),
        earned_runs=pitching.get("earned_runs", 0),
        hr_allowed=pitching.get("hr_allowed", 0),
        hbp=pitching.get("hbp", 0),
        wild_pitches=pitching.get("wild_pitches", 0),
        balks=pitching.get("balks", 0),
        wins=pitching.get("wins", 0),
        losses=pitching.get("losses", 0),
        holds=pitching.get("holds", 0),
        saves=pitching.get("saves", 0),
        blown_saves=pitching.get("blown_saves", 0),
        last_game=game_id,
        last_updated_at=now,
    ).on_conflict(
        conflict_target=conflict_target,
        action="update",
        update=update_dict,
    ).execute()
def _upsert_batting_sqlite(player_id, team_id, season, game_id, batting):
    """
    SQLite upsert for BattingSeasonStats via read-modify-write.

    SQLite doesn't support EXCLUDED-based increments through Peewee's
    on_conflict(), so the row is fetched (or created) and each counter
    is bumped in Python. This is safe because the whole
    update_season_stats() call runs inside db.atomic().
    """
    now = datetime.now()
    row, _created = BattingSeasonStats.get_or_create(
        player_id=player_id,
        team_id=team_id,
        season=season,
    )
    # Increment every counter column by its per-game delta (missing keys
    # count as zero).
    for col in (
        "games",
        "pa",
        "ab",
        "hits",
        "doubles",
        "triples",
        "hr",
        "rbi",
        "runs",
        "bb",
        "strikeouts",
        "hbp",
        "sac",
        "ibb",
        "gidp",
        "sb",
        "cs",
    ):
        setattr(row, col, getattr(row, col) + batting.get(col, 0))
    row.last_game_id = game_id
    row.last_updated_at = now
    row.save()
def _upsert_pitching_sqlite(player_id, team_id, season, game_id, pitching):
    """
    SQLite upsert for PitchingSeasonStats via read-modify-write.

    SQLite doesn't support EXCLUDED-based increments through Peewee's
    on_conflict(), so the row is fetched (or created) and each counter
    is bumped in Python. This is safe because the whole
    update_season_stats() call runs inside db.atomic().
    """
    now = datetime.now()
    row, _created = PitchingSeasonStats.get_or_create(
        player_id=player_id,
        team_id=team_id,
        season=season,
    )
    # Increment every counter column by its per-game delta (missing keys
    # count as zero).
    for col in (
        "games",
        "games_started",
        "outs",
        "strikeouts",
        "bb",
        "hits_allowed",
        "runs_allowed",
        "earned_runs",
        "hr_allowed",
        "hbp",
        "wild_pitches",
        "balks",
        "wins",
        "losses",
        "holds",
        "saves",
        "blown_saves",
    ):
        setattr(row, col, getattr(row, col) + pitching.get(col, 0))
    row.last_game_id = game_id
    row.last_updated_at = now
    row.save()
def update_season_stats(game_id: int) -> dict:
    """
    Accumulate one game's batting and pitching stats into the season tables.

    Safe to call more than once per game: idempotency is enforced through
    the ProcessedGame ledger. The first call for a game_id inserts the
    ledger row and processes the game; every later call — including
    out-of-order re-delivery after a newer game was already processed —
    finds the ledger row and returns early with "skipped": True, leaving
    all stats rows untouched.

    Processing steps, all inside one transaction:
      1. Claim the game in the ProcessedGame ledger (or bail out).
      2. Load the game's StratPlay rows.
      3. Group batting stats by (batter_id, batter_team_id) and pitching
         stats by (pitcher_id, pitcher_team_id).
      4. Fold Decision rows into the pitching groups.
      5. Upsert every group — ON CONFLICT DO UPDATE on PostgreSQL,
         read-modify-write on SQLite.

    Args:
        game_id: Primary key of the StratGame to process.

    Returns:
        Summary dict with keys: game_id, season, batters_updated,
        pitchers_updated; plus "skipped": True when the game was
        already processed.

    Raises:
        StratGame.DoesNotExist: If no StratGame row matches game_id.
    """
    logger.info("update_season_stats: starting for game_id=%d", game_id)

    # Fetch the game first so the season is known even on the skip path
    # (and a missing game raises before anything is written).
    game = StratGame.get_by_id(game_id)
    season = game.season

    with db.atomic():
        # Idempotency ledger: created=False means some earlier call has
        # already claimed this game_id — same-game replay or out-of-order
        # re-delivery — so nothing else may run.
        _, created = ProcessedGame.get_or_create(game_id=game_id)
        if not created:
            logger.info(
                "update_season_stats: game_id=%d already processed, skipping",
                game_id,
            )
            return {
                "game_id": game_id,
                "season": season,
                "batters_updated": 0,
                "pitchers_updated": 0,
                "skipped": True,
            }

        plays = list(StratPlay.select().where(StratPlay.game == game_id))
        logger.debug(
            "update_season_stats: game_id=%d loaded %d plays", game_id, len(plays)
        )

        # Aggregate per-player groups, then merge win/loss/save decisions
        # into the pitching side.
        batting_groups = _build_batting_groups(plays)
        pitching_groups = _build_pitching_groups(plays)
        decisions = list(Decision.select().where(Decision.game == game_id))
        _apply_decisions(pitching_groups, decisions)

        # Pick the backend-appropriate upsert strategy once.
        is_postgres = DATABASE_TYPE == "postgresql"
        upsert_batting = (
            _upsert_batting_postgres if is_postgres else _upsert_batting_sqlite
        )
        upsert_pitching = (
            _upsert_pitching_postgres if is_postgres else _upsert_pitching_sqlite
        )

        for (player_id, team_id), stats in batting_groups.items():
            upsert_batting(player_id, team_id, season, game_id, stats)
        batters_updated = len(batting_groups)

        for (player_id, team_id), stats in pitching_groups.items():
            upsert_pitching(player_id, team_id, season, game_id, stats)
        pitchers_updated = len(pitching_groups)

    logger.info(
        "update_season_stats: game_id=%d complete — "
        "batters_updated=%d pitchers_updated=%d",
        game_id,
        batters_updated,
        pitchers_updated,
    )
    return {
        "game_id": game_id,
        "season": season,
        "batters_updated": batters_updated,
        "pitchers_updated": pitchers_updated,
    }