Merge pull request 'feat: add ProcessedGame ledger for full idempotency in update_season_stats() (#105)' (#106) from ai/paper-dynasty-database#105 into card-evolution

Merge PR #106: ProcessedGame ledger for full idempotency in update_season_stats()
This commit is contained in:
cal 2026-03-18 20:30:35 +00:00
commit c69082e3ee
5 changed files with 428 additions and 310 deletions

View File

@ -1152,9 +1152,25 @@ pitss_player_season_index = ModelIndex(
PitchingSeasonStats.add_index(pitss_player_season_index) PitchingSeasonStats.add_index(pitss_player_season_index)
class ProcessedGame(BaseModel):
    """Ledger of StratGame rows whose stats have already been accumulated.

    Holds at most one row per game: the foreign key to StratGame is also
    the primary key, so a second insert for the same game cannot produce
    another row. update_season_stats() uses get_or_create() on this table
    to detect re-delivered games and skip them.
    """

    # ON DELETE CASCADE mirrors the processed_game SQL migration, so tables
    # built through db.create_tables() carry the same constraint as
    # migrated databases.
    game = ForeignKeyField(StratGame, primary_key=True, on_delete="CASCADE")
    # Defaults to datetime.now(), i.e. a naive local timestamp.
    processed_at = DateTimeField(default=datetime.now)

    class Meta:
        database = db
        table_name = "processed_game"
if not SKIP_TABLE_CREATION: if not SKIP_TABLE_CREATION:
db.create_tables( db.create_tables(
[StratGame, StratPlay, Decision, BattingSeasonStats, PitchingSeasonStats], [
StratGame,
StratPlay,
Decision,
BattingSeasonStats,
PitchingSeasonStats,
ProcessedGame,
],
safe=True, safe=True,
) )
@ -1194,75 +1210,6 @@ if not SKIP_TABLE_CREATION:
db.create_tables([ScoutOpportunity, ScoutClaim], safe=True) db.create_tables([ScoutOpportunity, ScoutClaim], safe=True)
class PlayerSeasonStats(BaseModel):
    """Combined batting and pitching season totals for one player on one team.

    Every counter defaults to 0 so a freshly created row can be incremented
    directly. A unique index declared after the class keeps a single row per
    (player, team, season).
    """

    player = ForeignKeyField(Player)
    team = ForeignKeyField(Team)
    season = IntegerField()
    # Batting stats
    games_batting = IntegerField(default=0)
    pa = IntegerField(default=0)
    ab = IntegerField(default=0)
    hits = IntegerField(default=0)
    doubles = IntegerField(default=0)
    triples = IntegerField(default=0)
    hr = IntegerField(default=0)
    bb = IntegerField(default=0)
    hbp = IntegerField(default=0)
    so = IntegerField(default=0)
    rbi = IntegerField(default=0)
    runs = IntegerField(default=0)
    sb = IntegerField(default=0)
    cs = IntegerField(default=0)
    # Pitching stats
    games_pitching = IntegerField(default=0)
    outs = IntegerField(default=0)
    k = IntegerField(default=0)  # strikeouts, taken from the batter-side `so`
    bb_allowed = IntegerField(default=0)  # accumulated as play.bb + play.hbp
    hits_allowed = IntegerField(default=0)
    hr_allowed = IntegerField(default=0)
    wins = IntegerField(default=0)
    losses = IntegerField(default=0)
    saves = IntegerField(default=0)
    holds = IntegerField(default=0)
    blown_saves = IntegerField(default=0)
    # Meta
    # Overwritten with the game id and timestamp on every upsert; both are
    # nullable so a row can exist before any game has been applied.
    last_game = ForeignKeyField(StratGame, null=True)
    last_updated_at = DateTimeField(null=True)

    class Meta:
        database = db
        table_name = "player_season_stats"
# Unique index: at most one stats row per (player, team, season).
player_season_stats_unique_index = ModelIndex(
    PlayerSeasonStats,
    (PlayerSeasonStats.player, PlayerSeasonStats.team, PlayerSeasonStats.season),
    unique=True,
)
PlayerSeasonStats.add_index(player_season_stats_unique_index)

# Non-unique index on (team, season).
player_season_stats_team_season_index = ModelIndex(
    PlayerSeasonStats,
    (PlayerSeasonStats.team, PlayerSeasonStats.season),
    unique=False,
)
PlayerSeasonStats.add_index(player_season_stats_team_season_index)

# Non-unique index on (player, season).
player_season_stats_player_season_index = ModelIndex(
    PlayerSeasonStats,
    (PlayerSeasonStats.player, PlayerSeasonStats.season),
    unique=False,
)
PlayerSeasonStats.add_index(player_season_stats_player_season_index)

# Create the table unless table creation is disabled; safe=True makes peewee
# emit CREATE TABLE IF NOT EXISTS, so an existing table is left alone.
if not SKIP_TABLE_CREATION:
    db.create_tables([PlayerSeasonStats], safe=True)
class EvolutionTrack(BaseModel): class EvolutionTrack(BaseModel):
name = CharField(unique=True) name = CharField(unique=True)
card_type = CharField() # 'batter', 'sp', 'rp' card_type = CharField() # 'batter', 'sp', 'rp'

View File

@ -1,20 +1,19 @@
""" """
season_stats.py Incremental PlayerSeasonStats update logic. season_stats.py Incremental BattingSeasonStats and PitchingSeasonStats update logic.
Called once per completed StratGame to accumulate batting and pitching Called once per completed StratGame to accumulate batting and pitching
statistics into the player_season_stats table. statistics into the batting_season_stats and pitching_season_stats tables
respectively.
Idempotency limitation: re-delivery of a game is detected by checking Idempotency: re-delivery of a game (including out-of-order re-delivery)
whether any PlayerSeasonStats row still carries that game_id as last_game. is detected via an atomic INSERT into the ProcessedGame ledger table
This guard only works if no later game has been processed for the same keyed on game_id. The first call for a given game_id succeeds; all
players if game G+1 is processed first, a re-delivery of game G will subsequent calls return early with "skipped": True without modifying
bypass the guard and double-count stats. A persistent processed-game any stats rows.
ledger is needed for full idempotency across out-of-order re-delivery
(see issue #105).
Peewee upsert strategy: Peewee upsert strategy:
- SQLite: on_conflict_replace() simplest path, deletes + re-inserts - SQLite: read-modify-write inside db.atomic() transaction
- PostgreSQL: on_conflict() with EXCLUDED true atomic increment via SQL - PostgreSQL: ON CONFLICT ... DO UPDATE with column-level EXCLUDED increments
""" """
import logging import logging
@ -26,8 +25,10 @@ from peewee import EXCLUDED
from app.db_engine import ( from app.db_engine import (
db, db,
BattingSeasonStats,
Decision, Decision,
PlayerSeasonStats, PitchingSeasonStats,
ProcessedGame,
StratGame, StratGame,
StratPlay, StratPlay,
) )
@ -41,27 +42,31 @@ def _build_batting_groups(plays):
""" """
Aggregate per-play batting stats by (batter_id, batter_team_id). Aggregate per-play batting stats by (batter_id, batter_team_id).
Only plays where pa > 0 are counted toward games_batting, but all Only plays where pa > 0 are counted toward games, but all
play-level stat fields are accumulated regardless of pa value so play-level stat fields are accumulated regardless of pa value so
that rare edge cases (e.g. sac bunt without official PA) are that rare edge cases (e.g. sac bunt without official PA) are
correctly included in the totals. correctly included in the totals.
Returns a dict keyed by (batter_id, batter_team_id) with stat dicts. Returns a dict keyed by (batter_id, batter_team_id) with stat dicts
matching BattingSeasonStats column names.
""" """
groups = defaultdict( groups = defaultdict(
lambda: { lambda: {
"games_batting": 0, "games": 0,
"pa": 0, "pa": 0,
"ab": 0, "ab": 0,
"hits": 0, "hits": 0,
"doubles": 0, "doubles": 0,
"triples": 0, "triples": 0,
"hr": 0, "hr": 0,
"bb": 0,
"hbp": 0,
"so": 0,
"rbi": 0, "rbi": 0,
"runs": 0, "runs": 0,
"bb": 0,
"strikeouts": 0,
"hbp": 0,
"sac": 0,
"ibb": 0,
"gidp": 0,
"sb": 0, "sb": 0,
"cs": 0, "cs": 0,
"appeared": False, # tracks whether batter appeared at all in this game "appeared": False, # tracks whether batter appeared at all in this game
@ -84,16 +89,19 @@ def _build_batting_groups(plays):
g["doubles"] += play.double g["doubles"] += play.double
g["triples"] += play.triple g["triples"] += play.triple
g["hr"] += play.homerun g["hr"] += play.homerun
g["bb"] += play.bb
g["hbp"] += play.hbp
g["so"] += play.so
g["rbi"] += play.rbi g["rbi"] += play.rbi
g["runs"] += play.run g["runs"] += play.run
g["bb"] += play.bb
g["strikeouts"] += play.so
g["hbp"] += play.hbp
g["sac"] += play.sac
g["ibb"] += play.ibb
g["gidp"] += play.gidp
g["sb"] += play.sb g["sb"] += play.sb
g["cs"] += play.cs g["cs"] += play.cs
if play.pa > 0 and not g["appeared"]: if play.pa > 0 and not g["appeared"]:
g["games_batting"] = 1 g["games"] = 1
g["appeared"] = True g["appeared"] = True
# Clean up the helper flag before returning # Clean up the helper flag before returning
@ -110,30 +118,40 @@ def _build_pitching_groups(plays):
Stats on StratPlay are recorded from the batter's perspective, so Stats on StratPlay are recorded from the batter's perspective, so
when accumulating pitcher stats we collect: when accumulating pitcher stats we collect:
- outs pitcher outs recorded (directly on play) - outs pitcher outs recorded (directly on play)
- so strikeouts (batter's so = pitcher's k) - so strikeouts (batter's so = pitcher's strikeouts)
- hit hits allowed - hit hits allowed
- bb+hbp base-on-balls allowed - bb walks allowed (batter bb, separate from hbp)
- hbp hit batters
- homerun home runs allowed - homerun home runs allowed
games_pitching counts unique pitchers who appeared (at least one games counts unique pitchers who appeared (at least one play as
play as pitcher), capped at 1 per game since this function processes pitcher), capped at 1 per game since this function processes a
a single game. single game. games_started is populated later via _apply_decisions().
Returns a dict keyed by (pitcher_id, pitcher_team_id) with stat dicts. Fields not available from StratPlay (runs_allowed, earned_runs,
wild_pitches, balks) default to 0 and are not incremented.
Returns a dict keyed by (pitcher_id, pitcher_team_id) with stat dicts
matching PitchingSeasonStats column names.
""" """
groups = defaultdict( groups = defaultdict(
lambda: { lambda: {
"games_pitching": 1, # pitcher appeared in this game by definition "games": 1, # pitcher appeared in this game by definition
"games_started": 0, # populated later via _apply_decisions
"outs": 0, "outs": 0,
"k": 0, "strikeouts": 0,
"bb": 0,
"hits_allowed": 0, "hits_allowed": 0,
"bb_allowed": 0, "runs_allowed": 0, # not available from StratPlay
"earned_runs": 0, # not available from StratPlay
"hr_allowed": 0, "hr_allowed": 0,
# Decision stats added later "hbp": 0,
"wild_pitches": 0, # not available from StratPlay
"balks": 0, # not available from StratPlay
"wins": 0, "wins": 0,
"losses": 0, "losses": 0,
"saves": 0,
"holds": 0, "holds": 0,
"saves": 0,
"blown_saves": 0, "blown_saves": 0,
} }
) )
@ -141,13 +159,18 @@ def _build_pitching_groups(plays):
for play in plays: for play in plays:
pitcher_id = play.pitcher_id pitcher_id = play.pitcher_id
pitcher_team_id = play.pitcher_team_id pitcher_team_id = play.pitcher_team_id
if pitcher_id is None:
continue
key = (pitcher_id, pitcher_team_id) key = (pitcher_id, pitcher_team_id)
g = groups[key] g = groups[key]
g["outs"] += play.outs g["outs"] += play.outs
g["k"] += play.so g["strikeouts"] += play.so
g["hits_allowed"] += play.hit g["hits_allowed"] += play.hit
g["bb_allowed"] += play.bb + play.hbp g["bb"] += play.bb
g["hbp"] += play.hbp
g["hr_allowed"] += play.homerun g["hr_allowed"] += play.homerun
return groups return groups
@ -170,16 +193,22 @@ def _apply_decisions(pitching_groups, decisions):
# Initialise a zeroed entry if not already present. # Initialise a zeroed entry if not already present.
if key not in pitching_groups: if key not in pitching_groups:
pitching_groups[key] = { pitching_groups[key] = {
"games_pitching": 1, "games": 1,
"games_started": 0,
"outs": 0, "outs": 0,
"k": 0, "strikeouts": 0,
"bb": 0,
"hits_allowed": 0, "hits_allowed": 0,
"bb_allowed": 0, "runs_allowed": 0,
"earned_runs": 0,
"hr_allowed": 0, "hr_allowed": 0,
"hbp": 0,
"wild_pitches": 0,
"balks": 0,
"wins": 0, "wins": 0,
"losses": 0, "losses": 0,
"saves": 0,
"holds": 0, "holds": 0,
"saves": 0,
"blown_saves": 0, "blown_saves": 0,
} }
@ -189,124 +218,71 @@ def _apply_decisions(pitching_groups, decisions):
g["saves"] += decision.is_save g["saves"] += decision.is_save
g["holds"] += decision.hold g["holds"] += decision.hold
g["blown_saves"] += decision.b_save g["blown_saves"] += decision.b_save
g["games_started"] += 1 if decision.is_start else 0
def _upsert_postgres(player_id, team_id, season, game_id, batting, pitching): def _upsert_batting_postgres(player_id, team_id, season, game_id, batting):
""" """
PostgreSQL upsert using ON CONFLICT ... DO UPDATE with column-level PostgreSQL upsert for BattingSeasonStats using ON CONFLICT ... DO UPDATE.
increments. Each stat column is incremented by the value from the Each stat column is incremented by the EXCLUDED (incoming) value,
EXCLUDED (incoming) row, ensuring concurrent games don't overwrite ensuring concurrent games don't overwrite each other.
each other.
""" """
now = datetime.now() now = datetime.now()
row = {
"player_id": player_id,
"team_id": team_id,
"season": season,
"games_batting": batting.get("games_batting", 0),
"pa": batting.get("pa", 0),
"ab": batting.get("ab", 0),
"hits": batting.get("hits", 0),
"doubles": batting.get("doubles", 0),
"triples": batting.get("triples", 0),
"hr": batting.get("hr", 0),
"bb": batting.get("bb", 0),
"hbp": batting.get("hbp", 0),
"so": batting.get("so", 0),
"rbi": batting.get("rbi", 0),
"runs": batting.get("runs", 0),
"sb": batting.get("sb", 0),
"cs": batting.get("cs", 0),
"games_pitching": pitching.get("games_pitching", 0),
"outs": pitching.get("outs", 0),
"k": pitching.get("k", 0),
"hits_allowed": pitching.get("hits_allowed", 0),
"bb_allowed": pitching.get("bb_allowed", 0),
"hr_allowed": pitching.get("hr_allowed", 0),
"wins": pitching.get("wins", 0),
"losses": pitching.get("losses", 0),
"saves": pitching.get("saves", 0),
"holds": pitching.get("holds", 0),
"blown_saves": pitching.get("blown_saves", 0),
"last_game_id": game_id,
"last_updated_at": now,
}
# Incrementable stat columns (all batting + pitching accumulators)
increment_cols = [ increment_cols = [
"games_batting", "games",
"pa", "pa",
"ab", "ab",
"hits", "hits",
"doubles", "doubles",
"triples", "triples",
"hr", "hr",
"bb",
"hbp",
"so",
"rbi", "rbi",
"runs", "runs",
"bb",
"strikeouts",
"hbp",
"sac",
"ibb",
"gidp",
"sb", "sb",
"cs", "cs",
"games_pitching",
"outs",
"k",
"hits_allowed",
"bb_allowed",
"hr_allowed",
"wins",
"losses",
"saves",
"holds",
"blown_saves",
] ]
# Build the conflict-target field objects
conflict_target = [ conflict_target = [
PlayerSeasonStats.player, BattingSeasonStats.player,
PlayerSeasonStats.team, BattingSeasonStats.team,
PlayerSeasonStats.season, BattingSeasonStats.season,
] ]
# Build the update dict: increment accumulators, overwrite metadata
update_dict = {} update_dict = {}
for col in increment_cols: for col in increment_cols:
field_obj = getattr(PlayerSeasonStats, col) field_obj = getattr(BattingSeasonStats, col)
update_dict[field_obj] = field_obj + EXCLUDED[col] update_dict[field_obj] = field_obj + EXCLUDED[col]
update_dict[BattingSeasonStats.last_game] = EXCLUDED["last_game_id"]
update_dict[BattingSeasonStats.last_updated_at] = EXCLUDED["last_updated_at"]
update_dict[PlayerSeasonStats.last_game] = EXCLUDED["last_game_id"] BattingSeasonStats.insert(
update_dict[PlayerSeasonStats.last_updated_at] = EXCLUDED["last_updated_at"]
PlayerSeasonStats.insert(
player=player_id, player=player_id,
team=team_id, team=team_id,
season=season, season=season,
games_batting=row["games_batting"], games=batting.get("games", 0),
pa=row["pa"], pa=batting.get("pa", 0),
ab=row["ab"], ab=batting.get("ab", 0),
hits=row["hits"], hits=batting.get("hits", 0),
doubles=row["doubles"], doubles=batting.get("doubles", 0),
triples=row["triples"], triples=batting.get("triples", 0),
hr=row["hr"], hr=batting.get("hr", 0),
bb=row["bb"], rbi=batting.get("rbi", 0),
hbp=row["hbp"], runs=batting.get("runs", 0),
so=row["so"], bb=batting.get("bb", 0),
rbi=row["rbi"], strikeouts=batting.get("strikeouts", 0),
runs=row["runs"], hbp=batting.get("hbp", 0),
sb=row["sb"], sac=batting.get("sac", 0),
cs=row["cs"], ibb=batting.get("ibb", 0),
games_pitching=row["games_pitching"], gidp=batting.get("gidp", 0),
outs=row["outs"], sb=batting.get("sb", 0),
k=row["k"], cs=batting.get("cs", 0),
hits_allowed=row["hits_allowed"],
bb_allowed=row["bb_allowed"],
hr_allowed=row["hr_allowed"],
wins=row["wins"],
losses=row["losses"],
saves=row["saves"],
holds=row["holds"],
blown_saves=row["blown_saves"],
last_game=game_id, last_game=game_id,
last_updated_at=now, last_updated_at=now,
).on_conflict( ).on_conflict(
@ -316,9 +292,80 @@ def _upsert_postgres(player_id, team_id, season, game_id, batting, pitching):
).execute() ).execute()
def _upsert_sqlite(player_id, team_id, season, game_id, batting, pitching): def _upsert_pitching_postgres(player_id, team_id, season, game_id, pitching):
""" """
SQLite upsert: read-modify-write inside the outer atomic() block. PostgreSQL upsert for PitchingSeasonStats using ON CONFLICT ... DO UPDATE.
Each stat column is incremented by the EXCLUDED (incoming) value,
ensuring concurrent games don't overwrite each other.
"""
now = datetime.now()
increment_cols = [
"games",
"games_started",
"outs",
"strikeouts",
"bb",
"hits_allowed",
"runs_allowed",
"earned_runs",
"hr_allowed",
"hbp",
"wild_pitches",
"balks",
"wins",
"losses",
"holds",
"saves",
"blown_saves",
]
conflict_target = [
PitchingSeasonStats.player,
PitchingSeasonStats.team,
PitchingSeasonStats.season,
]
update_dict = {}
for col in increment_cols:
field_obj = getattr(PitchingSeasonStats, col)
update_dict[field_obj] = field_obj + EXCLUDED[col]
update_dict[PitchingSeasonStats.last_game] = EXCLUDED["last_game_id"]
update_dict[PitchingSeasonStats.last_updated_at] = EXCLUDED["last_updated_at"]
PitchingSeasonStats.insert(
player=player_id,
team=team_id,
season=season,
games=pitching.get("games", 0),
games_started=pitching.get("games_started", 0),
outs=pitching.get("outs", 0),
strikeouts=pitching.get("strikeouts", 0),
bb=pitching.get("bb", 0),
hits_allowed=pitching.get("hits_allowed", 0),
runs_allowed=pitching.get("runs_allowed", 0),
earned_runs=pitching.get("earned_runs", 0),
hr_allowed=pitching.get("hr_allowed", 0),
hbp=pitching.get("hbp", 0),
wild_pitches=pitching.get("wild_pitches", 0),
balks=pitching.get("balks", 0),
wins=pitching.get("wins", 0),
losses=pitching.get("losses", 0),
holds=pitching.get("holds", 0),
saves=pitching.get("saves", 0),
blown_saves=pitching.get("blown_saves", 0),
last_game=game_id,
last_updated_at=now,
).on_conflict(
conflict_target=conflict_target,
action="update",
update=update_dict,
).execute()
def _upsert_batting_sqlite(player_id, team_id, season, game_id, batting):
"""
SQLite upsert for BattingSeasonStats: read-modify-write inside the outer atomic() block.
SQLite doesn't support EXCLUDED-based increments via Peewee's SQLite doesn't support EXCLUDED-based increments via Peewee's
on_conflict(), so we use get_or_create + field-level addition. on_conflict(), so we use get_or_create + field-level addition.
@ -327,37 +374,68 @@ def _upsert_sqlite(player_id, team_id, season, game_id, batting, pitching):
""" """
now = datetime.now() now = datetime.now()
obj, _ = PlayerSeasonStats.get_or_create( obj, _ = BattingSeasonStats.get_or_create(
player_id=player_id, player_id=player_id,
team_id=team_id, team_id=team_id,
season=season, season=season,
) )
obj.games_batting += batting.get("games_batting", 0) obj.games += batting.get("games", 0)
obj.pa += batting.get("pa", 0) obj.pa += batting.get("pa", 0)
obj.ab += batting.get("ab", 0) obj.ab += batting.get("ab", 0)
obj.hits += batting.get("hits", 0) obj.hits += batting.get("hits", 0)
obj.doubles += batting.get("doubles", 0) obj.doubles += batting.get("doubles", 0)
obj.triples += batting.get("triples", 0) obj.triples += batting.get("triples", 0)
obj.hr += batting.get("hr", 0) obj.hr += batting.get("hr", 0)
obj.bb += batting.get("bb", 0)
obj.hbp += batting.get("hbp", 0)
obj.so += batting.get("so", 0)
obj.rbi += batting.get("rbi", 0) obj.rbi += batting.get("rbi", 0)
obj.runs += batting.get("runs", 0) obj.runs += batting.get("runs", 0)
obj.bb += batting.get("bb", 0)
obj.strikeouts += batting.get("strikeouts", 0)
obj.hbp += batting.get("hbp", 0)
obj.sac += batting.get("sac", 0)
obj.ibb += batting.get("ibb", 0)
obj.gidp += batting.get("gidp", 0)
obj.sb += batting.get("sb", 0) obj.sb += batting.get("sb", 0)
obj.cs += batting.get("cs", 0) obj.cs += batting.get("cs", 0)
obj.games_pitching += pitching.get("games_pitching", 0) obj.last_game_id = game_id
obj.last_updated_at = now
obj.save()
def _upsert_pitching_sqlite(player_id, team_id, season, game_id, pitching):
"""
SQLite upsert for PitchingSeasonStats: read-modify-write inside the outer atomic() block.
SQLite doesn't support EXCLUDED-based increments via Peewee's
on_conflict(), so we use get_or_create + field-level addition.
This is safe because the entire update_season_stats() call is
wrapped in db.atomic().
"""
now = datetime.now()
obj, _ = PitchingSeasonStats.get_or_create(
player_id=player_id,
team_id=team_id,
season=season,
)
obj.games += pitching.get("games", 0)
obj.games_started += pitching.get("games_started", 0)
obj.outs += pitching.get("outs", 0) obj.outs += pitching.get("outs", 0)
obj.k += pitching.get("k", 0) obj.strikeouts += pitching.get("strikeouts", 0)
obj.bb += pitching.get("bb", 0)
obj.hits_allowed += pitching.get("hits_allowed", 0) obj.hits_allowed += pitching.get("hits_allowed", 0)
obj.bb_allowed += pitching.get("bb_allowed", 0) obj.runs_allowed += pitching.get("runs_allowed", 0)
obj.earned_runs += pitching.get("earned_runs", 0)
obj.hr_allowed += pitching.get("hr_allowed", 0) obj.hr_allowed += pitching.get("hr_allowed", 0)
obj.hbp += pitching.get("hbp", 0)
obj.wild_pitches += pitching.get("wild_pitches", 0)
obj.balks += pitching.get("balks", 0)
obj.wins += pitching.get("wins", 0) obj.wins += pitching.get("wins", 0)
obj.losses += pitching.get("losses", 0) obj.losses += pitching.get("losses", 0)
obj.saves += pitching.get("saves", 0)
obj.holds += pitching.get("holds", 0) obj.holds += pitching.get("holds", 0)
obj.saves += pitching.get("saves", 0)
obj.blown_saves += pitching.get("blown_saves", 0) obj.blown_saves += pitching.get("blown_saves", 0)
obj.last_game_id = game_id obj.last_game_id = game_id
@ -367,29 +445,28 @@ def _upsert_sqlite(player_id, team_id, season, game_id, batting, pitching):
def update_season_stats(game_id: int) -> dict: def update_season_stats(game_id: int) -> dict:
""" """
Accumulate per-game batting and pitching stats into PlayerSeasonStats. Accumulate per-game batting and pitching stats into BattingSeasonStats
and PitchingSeasonStats respectively.
This function is safe to call exactly once per game. If called again This function is safe to call exactly once per game. Idempotency is
for the same game_id while it is still the most-recently-processed enforced via an atomic INSERT into the ProcessedGame ledger table.
game for at least one affected player (detected by checking last_game The first call for a given game_id succeeds and returns full results;
FK), it returns early without modifying any data. any subsequent call (including out-of-order re-delivery after a later
game has been processed) finds the existing row and returns early with
Limitation: the guard only detects re-delivery if no later game has "skipped": True without touching any stats rows.
been processed for the same players. Out-of-order re-delivery (e.g.
game G re-delivered after game G+1 was already processed) will not be
caught and will silently double-count stats. See issue #105 for the
planned ProcessedGame ledger fix.
Algorithm: Algorithm:
1. Fetch StratGame to get the season. 1. Fetch StratGame to get the season.
2. Guard against re-processing via last_game_id check. 2. Atomic INSERT into ProcessedGame if the row already exists,
return early (skipped).
3. Collect all StratPlay rows for the game. 3. Collect all StratPlay rows for the game.
4. Group batting stats by (batter_id, batter_team_id). 4. Group batting stats by (batter_id, batter_team_id).
5. Group pitching stats by (pitcher_id, pitcher_team_id). 5. Group pitching stats by (pitcher_id, pitcher_team_id).
6. Merge Decision rows into pitching groups. 6. Merge Decision rows into pitching groups.
7. Upsert each player's contribution using either: 7. Upsert each batter into BattingSeasonStats using either:
- PostgreSQL: atomic SQL increment via ON CONFLICT DO UPDATE - PostgreSQL: atomic SQL increment via ON CONFLICT DO UPDATE
- SQLite: read-modify-write inside a transaction - SQLite: read-modify-write inside a transaction
8. Upsert each pitcher into PitchingSeasonStats using the same strategy.
Args: Args:
game_id: Primary key of the StratGame to process. game_id: Primary key of the StratGame to process.
@ -409,16 +486,13 @@ def update_season_stats(game_id: int) -> dict:
season = game.season season = game.season
with db.atomic(): with db.atomic():
# Step 2 — Double-count prevention: check if any row still # Step 2 — Full idempotency via ProcessedGame ledger.
# carries this game_id as last_game. Note: only detects replay # Atomic INSERT: if the row already exists (same game_id), get_or_create
# of the most-recently-processed game; out-of-order re-delivery # returns created=False and we skip. This handles same-game immediate
# bypasses this guard (see issue #105). # replay AND out-of-order re-delivery (game G re-delivered after G+1
already_processed = ( # was already processed).
PlayerSeasonStats.select() _, created = ProcessedGame.get_or_create(game_id=game_id)
.where(PlayerSeasonStats.last_game == game_id) if not created:
.exists()
)
if already_processed:
logger.info( logger.info(
"update_season_stats: game_id=%d already processed, skipping", "update_season_stats: game_id=%d already processed, skipping",
game_id, game_id,
@ -445,28 +519,28 @@ def update_season_stats(game_id: int) -> dict:
decisions = list(Decision.select().where(Decision.game == game_id)) decisions = list(Decision.select().where(Decision.game == game_id))
_apply_decisions(pitching_groups, decisions) _apply_decisions(pitching_groups, decisions)
# Collect all unique player keys across both perspectives. upsert_batting = (
# A two-way player (batter who also pitched, or vice-versa) gets _upsert_batting_postgres
# a single combined row in PlayerSeasonStats. if DATABASE_TYPE == "postgresql"
all_keys = set(batting_groups.keys()) | set(pitching_groups.keys()) else _upsert_batting_sqlite
)
batters_updated = 0 upsert_pitching = (
pitchers_updated = 0 _upsert_pitching_postgres
if DATABASE_TYPE == "postgresql"
upsert_fn = ( else _upsert_pitching_sqlite
_upsert_postgres if DATABASE_TYPE == "postgresql" else _upsert_sqlite
) )
for player_id, team_id in all_keys: # Step 7 — Upsert batting rows into BattingSeasonStats
batting = batting_groups.get((player_id, team_id), {}) batters_updated = 0
pitching = pitching_groups.get((player_id, team_id), {}) for (player_id, team_id), batting in batting_groups.items():
upsert_batting(player_id, team_id, season, game_id, batting)
batters_updated += 1
upsert_fn(player_id, team_id, season, game_id, batting, pitching) # Step 8 — Upsert pitching rows into PitchingSeasonStats
pitchers_updated = 0
if batting: for (player_id, team_id), pitching in pitching_groups.items():
batters_updated += 1 upsert_pitching(player_id, team_id, season, game_id, pitching)
if pitching: pitchers_updated += 1
pitchers_updated += 1
logger.info( logger.info(
"update_season_stats: game_id=%d complete — " "update_season_stats: game_id=%d complete — "

View File

@ -0,0 +1,26 @@
-- Migration: Add processed_game ledger for full update_season_stats() idempotency
-- Date: 2026-03-18
-- Issue: #105
-- Purpose: Replace the last_game FK check in update_season_stats() with an
-- atomic INSERT into processed_game. This prevents out-of-order
-- re-delivery (game G re-delivered after G+1 was already processed)
-- from bypassing the guard and double-counting stats.
BEGIN;
-- One row per processed game. IF NOT EXISTS lets the migration be re-run
-- without failing when the table is already present.
CREATE TABLE IF NOT EXISTS processed_game (
    -- Primary key and foreign key in one: a game can be recorded only once,
    -- and deleting the stratgame row removes its ledger row as well.
    game_id INTEGER PRIMARY KEY REFERENCES stratgame(id) ON DELETE CASCADE,
    -- Filled in by the database with the current time when not supplied.
    processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
COMMIT;
-- ============================================
-- VERIFICATION QUERIES
-- ============================================
-- \d processed_game
-- ============================================
-- ROLLBACK (if needed)
-- ============================================
-- DROP TABLE IF EXISTS processed_game;

View File

@ -40,7 +40,9 @@ from app.db_engine import (
StratGame, StratGame,
StratPlay, StratPlay,
Decision, Decision,
PlayerSeasonStats, BattingSeasonStats,
PitchingSeasonStats,
ProcessedGame,
EvolutionTrack, EvolutionTrack,
EvolutionCardState, EvolutionCardState,
EvolutionTierBoost, EvolutionTierBoost,
@ -68,9 +70,11 @@ _TEST_MODELS = [
StratGame, StratGame,
StratPlay, StratPlay,
Decision, Decision,
BattingSeasonStats,
PitchingSeasonStats,
ProcessedGame,
ScoutOpportunity, ScoutOpportunity,
ScoutClaim, ScoutClaim,
PlayerSeasonStats,
EvolutionTrack, EvolutionTrack,
EvolutionCardState, EvolutionCardState,
EvolutionTierBoost, EvolutionTierBoost,

View File

@ -2,8 +2,9 @@
Tests for app/services/season_stats.py update_season_stats(). Tests for app/services/season_stats.py update_season_stats().
What: Verify that the incremental stat accumulation function correctly What: Verify that the incremental stat accumulation function correctly
aggregates StratPlay and Decision rows into PlayerSeasonStats, handles aggregates StratPlay and Decision rows into BattingSeasonStats and
duplicate calls idempotently, and accumulates stats across multiple games. PitchingSeasonStats, handles duplicate calls idempotently, and
accumulates stats across multiple games.
Why: This is the core bookkeeping engine for card evolution scoring. A Why: This is the core bookkeeping engine for card evolution scoring. A
double-count bug, a missed Decision merge, or a team-isolation failure double-count bug, a missed Decision merge, or a team-isolation failure
@ -20,10 +21,11 @@ import app.services.season_stats as _season_stats_module
import pytest import pytest
from app.db_engine import ( from app.db_engine import (
BattingSeasonStats,
Cardset, Cardset,
Decision, Decision,
PitchingSeasonStats,
Player, Player,
PlayerSeasonStats,
Rarity, Rarity,
StratGame, StratGame,
StratPlay, StratPlay,
@ -36,9 +38,9 @@ from tests.conftest import _test_db
# Module-level patch: redirect season_stats.db to the test database # Module-level patch: redirect season_stats.db to the test database
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# season_stats.py holds a module-level reference to the `db` object imported # season_stats.py holds a module-level reference to the `db` object imported
# from db_engine. When test models are rebound to _test_db via bind(), the # from db_engine. When test models are rebound to _test_db via bind(), the
# `db` object inside season_stats still points at the original production db # `db` object inside season_stats still points at the original production db
# (SQLite file or PostgreSQL). We replace it here so that db.atomic() in # (SQLite file or PostgreSQL). We replace it here so that db.atomic() in
# update_season_stats() operates on the same in-memory connection that the # update_season_stats() operates on the same in-memory connection that the
# test fixtures write to. # test fixtures write to.
_season_stats_module.db = _test_db _season_stats_module.db = _test_db
@ -262,20 +264,20 @@ def test_single_game_batting_stats(team_a, team_b, player_batter, player_pitcher
result = update_season_stats(game.id) result = update_season_stats(game.id)
assert result["batters_updated"] >= 1 assert result["batters_updated"] >= 1
stats = PlayerSeasonStats.get( stats = BattingSeasonStats.get(
PlayerSeasonStats.player == player_batter, BattingSeasonStats.player == player_batter,
PlayerSeasonStats.team == team_a, BattingSeasonStats.team == team_a,
PlayerSeasonStats.season == 11, BattingSeasonStats.season == 11,
) )
assert stats.pa == 4 assert stats.pa == 4
assert stats.ab == 3 assert stats.ab == 3
assert stats.hits == 2 assert stats.hits == 2
assert stats.hr == 1 assert stats.hr == 1
assert stats.so == 1 assert stats.strikeouts == 1
assert stats.bb == 1 assert stats.bb == 1
assert stats.rbi == 1 assert stats.rbi == 1
assert stats.runs == 1 assert stats.runs == 1
assert stats.games_batting == 1 assert stats.games == 1
def test_single_game_pitching_stats( def test_single_game_pitching_stats(
@ -332,16 +334,16 @@ def test_single_game_pitching_stats(
update_season_stats(game.id) update_season_stats(game.id)
stats = PlayerSeasonStats.get( stats = PitchingSeasonStats.get(
PlayerSeasonStats.player == player_pitcher, PitchingSeasonStats.player == player_pitcher,
PlayerSeasonStats.team == team_b, PitchingSeasonStats.team == team_b,
PlayerSeasonStats.season == 11, PitchingSeasonStats.season == 11,
) )
assert stats.outs == 1 # one strikeout = one out recorded assert stats.outs == 1 # one strikeout = one out recorded
assert stats.k == 1 # batter's so → pitcher's k assert stats.strikeouts == 1 # batter's so → pitcher's strikeouts
assert stats.hits_allowed == 1 # batter's hit → pitcher hits_allowed assert stats.hits_allowed == 1 # batter's hit → pitcher hits_allowed
assert stats.bb_allowed == 1 # batter's bb → pitcher bb_allowed assert stats.bb == 1 # batter's bb → pitcher bb (walks allowed)
assert stats.games_pitching == 1 assert stats.games == 1
def test_decision_integration(team_a, team_b, player_batter, player_pitcher, game): def test_decision_integration(team_a, team_b, player_batter, player_pitcher, game):
@ -382,10 +384,10 @@ def test_decision_integration(team_a, team_b, player_batter, player_pitcher, gam
update_season_stats(game.id) update_season_stats(game.id)
stats = PlayerSeasonStats.get( stats = PitchingSeasonStats.get(
PlayerSeasonStats.player == player_pitcher, PitchingSeasonStats.player == player_pitcher,
PlayerSeasonStats.team == team_b, PitchingSeasonStats.team == team_b,
PlayerSeasonStats.season == 11, PitchingSeasonStats.season == 11,
) )
assert stats.wins == 1 assert stats.wins == 1
assert stats.losses == 0 assert stats.losses == 0
@ -395,17 +397,13 @@ def test_double_count_prevention(team_a, team_b, player_batter, player_pitcher,
"""Calling update_season_stats() twice for the same game must not double the stats. """Calling update_season_stats() twice for the same game must not double the stats.
What: Process a game once (pa=3), then immediately call the function What: Process a game once (pa=3), then immediately call the function
again with the same game_id. The second call detects via the again with the same game_id. The second call finds the ProcessedGame
PlayerSeasonStats.last_game FK check that this game is still the ledger row and returns early with 'skipped'=True. The resulting pa
most-recently-processed game and returns early with 'skipped'=True. should still be 3, not 6.
The resulting pa should still be 3, not 6.
Why: The bot infrastructure may deliver game-complete events more than Why: The bot infrastructure may deliver game-complete events more than
once (network retries, message replays). The guard prevents once (network retries, message replays). The ProcessedGame ledger
double-counting when the replayed game is still the last game provides full idempotency for all replay scenarios.
processed for those players. Note: this test only covers same-game
immediate replay out-of-order re-delivery (game G after G+1) is a
known limitation tracked in issue #105.
""" """
for i in range(3): for i in range(3):
make_play( make_play(
@ -428,17 +426,17 @@ def test_double_count_prevention(team_a, team_b, player_batter, player_pitcher,
assert second_result["batters_updated"] == 0 assert second_result["batters_updated"] == 0
assert second_result["pitchers_updated"] == 0 assert second_result["pitchers_updated"] == 0
stats = PlayerSeasonStats.get( stats = BattingSeasonStats.get(
PlayerSeasonStats.player == player_batter, BattingSeasonStats.player == player_batter,
PlayerSeasonStats.team == team_a, BattingSeasonStats.team == team_a,
PlayerSeasonStats.season == 11, BattingSeasonStats.season == 11,
) )
# Must still be 3, not 6 # Must still be 3, not 6
assert stats.pa == 3 assert stats.pa == 3
def test_two_games_accumulate(team_a, team_b, player_batter, player_pitcher): def test_two_games_accumulate(team_a, team_b, player_batter, player_pitcher):
"""Stats from two separate games are summed in a single PlayerSeasonStats row. """Stats from two separate games are summed in a single BattingSeasonStats row.
What: Process game 1 (pa=2) then game 2 (pa=3) for the same batter/team. What: Process game 1 (pa=2) then game 2 (pa=3) for the same batter/team.
After both updates the stats row should show pa=5. After both updates the stats row should show pa=5.
@ -485,13 +483,13 @@ def test_two_games_accumulate(team_a, team_b, player_batter, player_pitcher):
update_season_stats(game1.id) update_season_stats(game1.id)
update_season_stats(game2.id) update_season_stats(game2.id)
stats = PlayerSeasonStats.get( stats = BattingSeasonStats.get(
PlayerSeasonStats.player == player_batter, BattingSeasonStats.player == player_batter,
PlayerSeasonStats.team == team_a, BattingSeasonStats.team == team_a,
PlayerSeasonStats.season == 11, BattingSeasonStats.season == 11,
) )
assert stats.pa == 5 assert stats.pa == 5
assert stats.games_batting == 2 assert stats.games == 2
def test_two_team_game(team_a, team_b): def test_two_team_game(team_a, team_b):
@ -562,33 +560,102 @@ def test_two_team_game(team_a, team_b):
update_season_stats(game.id) update_season_stats(game.id)
# Team A's batter: 2 PA, 1 hit, 1 SO # Team A's batter: 2 PA, 1 hit, 1 SO
stats_ba = PlayerSeasonStats.get( stats_ba = BattingSeasonStats.get(
PlayerSeasonStats.player == batter_a, BattingSeasonStats.player == batter_a,
PlayerSeasonStats.team == team_a, BattingSeasonStats.team == team_a,
) )
assert stats_ba.pa == 2 assert stats_ba.pa == 2
assert stats_ba.hits == 1 assert stats_ba.hits == 1
assert stats_ba.so == 1 assert stats_ba.strikeouts == 1
# Team B's batter: 1 PA, 1 BB # Team B's batter: 1 PA, 1 BB
stats_bb = PlayerSeasonStats.get( stats_bb = BattingSeasonStats.get(
PlayerSeasonStats.player == batter_b, BattingSeasonStats.player == batter_b,
PlayerSeasonStats.team == team_b, BattingSeasonStats.team == team_b,
) )
assert stats_bb.pa == 1 assert stats_bb.pa == 1
assert stats_bb.bb == 1 assert stats_bb.bb == 1
# Team B's pitcher (faced team A's batter): 1 hit allowed, 1 K # Team B's pitcher (faced team A's batter): 1 hit allowed, 1 strikeout
stats_pb = PlayerSeasonStats.get( stats_pb = PitchingSeasonStats.get(
PlayerSeasonStats.player == pitcher_b, PitchingSeasonStats.player == pitcher_b,
PlayerSeasonStats.team == team_b, PitchingSeasonStats.team == team_b,
) )
assert stats_pb.hits_allowed == 1 assert stats_pb.hits_allowed == 1
assert stats_pb.k == 1 assert stats_pb.strikeouts == 1
# Team A's pitcher (faced team B's batter): 1 BB allowed # Team A's pitcher (faced team B's batter): 1 BB allowed
stats_pa = PlayerSeasonStats.get( stats_pa = PitchingSeasonStats.get(
PlayerSeasonStats.player == pitcher_a, PitchingSeasonStats.player == pitcher_a,
PlayerSeasonStats.team == team_a, PitchingSeasonStats.team == team_a,
) )
assert stats_pa.bb_allowed == 1 assert stats_pa.bb == 1
def test_out_of_order_replay_prevented(team_a, team_b, player_batter, player_pitcher):
    """Out-of-order re-delivery of game G (after G+1 was processed) must not double-count.

    What: Process game G+1 first (pa=2), then process game G (pa=3). Now
        re-deliver game G. The third call must return 'skipped'=True and leave
        the batter's pa unchanged at 5 (3 + 2), not 8 (3 + 2 + 3).

    Why: This is the failure mode that the old last_game FK guard could not
        catch. After G+1 is processed, no BattingSeasonStats row carries
        last_game=G anymore (it was overwritten to G+1). The old guard would
        have returned already_processed=False and double-counted. The
        ProcessedGame ledger fixes this by keying on game_id independently of
        the stats rows.
    """

    def fetch_batter_stats():
        # Always read the row straight from the database. Re-querying avoids
        # relying on an instance-reload helper (stock Peewee models do not
        # provide one) and guarantees each assertion sees persisted state
        # rather than a stale in-memory copy.
        return BattingSeasonStats.get(
            BattingSeasonStats.player == player_batter,
            BattingSeasonStats.team == team_a,
            BattingSeasonStats.season == 11,
        )

    game_g = StratGame.create(
        season=11, game_type="ranked", away_team=team_a, home_team=team_b
    )
    game_g1 = StratGame.create(
        season=11, game_type="ranked", away_team=team_a, home_team=team_b
    )

    # Game G gets 3 plate appearances, game G+1 gets 2. Every play is a
    # one-out at-bat by the same batter against the same pitcher.
    for target_game, pa_count in ((game_g, 3), (game_g1, 2)):
        for play_num in range(1, pa_count + 1):
            make_play(
                target_game,
                play_num,
                player_batter,
                team_a,
                player_pitcher,
                team_b,
                pa=1,
                ab=1,
                outs=1,
            )

    # Process G+1 first, then G — simulates out-of-order delivery
    update_season_stats(game_g1.id)
    update_season_stats(game_g.id)

    assert fetch_batter_stats().pa == 5  # 3 (game G) + 2 (game G+1)

    # Re-deliver game G — must be blocked by ProcessedGame ledger
    replay_result = update_season_stats(game_g.id)
    assert replay_result.get("skipped") is True

    # Stats must remain at 5, not 8
    assert fetch_batter_stats().pa == 5