strat-gameplay-webapp/backend/app/core/game_engine.py
Cal Corum 72a3b94ce7 CLAUDE: Add transaction handling for multi-step DB operations
Issue #4 from code review - multi-step database operations were not
wrapped in transactions, risking partial state on failure.

Changes:
- Modified save_play() and update_game_state() in DatabaseOperations
  to accept optional session parameter for transaction grouping
- Wrapped resolve_play() DB operations in single atomic transaction
- Wrapped resolve_manual_play() DB operations in single atomic transaction
- Transaction commits atomically or rolls back entirely on failure

Pattern: When session provided, use flush() for IDs without committing;
caller controls transaction. When no session, create internal transaction.

All 739 unit tests passing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-19 16:39:01 -06:00

1205 lines
46 KiB
Python

"""
Game Engine - Main game orchestration engine.
Coordinates game flow, validates actions, resolves plays, and persists state.
Integrates DiceSystem for roll tracking with context and batch saving.
Phase 3: Enhanced with async decision workflow and AI opponent integration.
Author: Claude
Date: 2025-10-24
"""
import asyncio
import logging
from uuid import UUID
from typing import Optional, List
import pendulum
from app.core.state_manager import state_manager
from app.core.play_resolver import PlayResolver, PlayResult
from app.config import PlayOutcome, get_league_config
from app.core.validators import game_validator, ValidationError
from app.core.dice import dice_system
from app.core.ai_opponent import ai_opponent
from app.database.operations import DatabaseOperations
from app.database.session import AsyncSessionLocal
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.game_models import (
GameState, DefensiveDecision, OffensiveDecision
)
from app.services.position_rating_service import position_rating_service
from app.services.lineup_service import lineup_service
from app.services import PlayStatCalculator
logger = logging.getLogger(f'{__name__}.GameEngine')
class GameEngine:
"""Main game orchestration engine"""
# Phase 3: Decision timeout in seconds
DECISION_TIMEOUT = 30
def __init__(self):
self.db_ops = DatabaseOperations()
# Track rolls per inning for batch saving
self._rolls_this_inning: dict[UUID, List] = {}
# Locks for concurrent decision submission (prevents race conditions)
self._game_locks: dict[UUID, asyncio.Lock] = {}
def _get_game_lock(self, game_id: UUID) -> asyncio.Lock:
"""Get or create a lock for the specified game to prevent race conditions."""
if game_id not in self._game_locks:
self._game_locks[game_id] = asyncio.Lock()
return self._game_locks[game_id]
async def _load_position_ratings_for_lineup(
self,
game_id: UUID,
team_id: int,
league_id: str
) -> None:
"""
Load position ratings for all players in a team's lineup.
Only loads for PD league games. Sets position_rating field on each
LineupPlayerState object in the StateManager's lineup cache.
Args:
game_id: Game identifier
team_id: Team identifier
league_id: League identifier ('sba' or 'pd')
Phase 3E-Main: Loads ratings at game start for X-Check resolution
"""
# Check if league supports ratings
league_config = get_league_config(league_id)
if not league_config.supports_position_ratings():
logger.debug(f"League {league_id} doesn't support position ratings, skipping")
return
# Get lineup from cache
lineup = state_manager.get_lineup(game_id, team_id)
if not lineup:
logger.warning(f"No lineup found for team {team_id} in game {game_id}")
return
logger.info(f"Loading position ratings for team {team_id} lineup ({len(lineup.players)} players)")
# Load ratings for each player
loaded_count = 0
for player in lineup.players:
try:
# Get rating for this player's position
rating = await position_rating_service.get_rating_for_position(
card_id=player.card_id,
position=player.position,
league_id=league_id
)
if rating:
player.position_rating = rating
loaded_count += 1
logger.debug(
f"Loaded rating for card {player.card_id} at {player.position}: "
f"range={rating.range}, error={rating.error}"
)
else:
logger.warning(
f"No rating found for card {player.card_id} at {player.position}"
)
except Exception as e:
logger.error(
f"Failed to load rating for card {player.card_id} at {player.position}: {e}"
)
logger.info(f"Loaded {loaded_count}/{len(lineup.players)} position ratings for team {team_id}")
async def start_game(self, game_id: UUID) -> GameState:
"""
Start a game
Transitions from 'pending' to 'active'.
Validates that both teams have complete lineups (minimum 9 players each).
Prepares the first play snapshot.
Raises:
ValidationError: If game already started or lineups incomplete
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found in state manager")
if state.status != "pending":
raise ValidationError(f"Game already started (status: {state.status})")
# HARD REQUIREMENT: Validate both lineups are complete
# At game start, we validate BOTH teams (exception to the "defensive only" rule)
home_lineup = await self.db_ops.get_active_lineup(state.game_id, state.home_team_id)
away_lineup = await self.db_ops.get_active_lineup(state.game_id, state.away_team_id)
# Check minimum 9 players per team
if not home_lineup or len(home_lineup) < 9:
raise ValidationError(
f"Home team lineup incomplete: {len(home_lineup) if home_lineup else 0} players "
f"(minimum 9 required)"
)
if not away_lineup or len(away_lineup) < 9:
raise ValidationError(
f"Away team lineup incomplete: {len(away_lineup) if away_lineup else 0} players "
f"(minimum 9 required)"
)
# Validate defensive positions - at game start, check BOTH teams
try:
game_validator.validate_defensive_lineup_positions(home_lineup)
except ValidationError as e:
raise ValidationError(f"Home team: {e}")
try:
game_validator.validate_defensive_lineup_positions(away_lineup)
except ValidationError as e:
raise ValidationError(f"Away team: {e}")
# Phase 3E-Main: Load position ratings for both teams (PD league only)
await self._load_position_ratings_for_lineup(
game_id=game_id,
team_id=state.home_team_id,
league_id=state.league_id
)
await self._load_position_ratings_for_lineup(
game_id=game_id,
team_id=state.away_team_id,
league_id=state.league_id
)
# Mark as active
state.status = "active"
state.inning = 1
state.half = "top"
state.outs = 0
# Initialize roll tracking for this game
self._rolls_this_inning[game_id] = []
# Prepare first play snapshot
await self._prepare_next_play(state)
# Update state
state_manager.update_state(game_id, state)
# Persist to DB
await self.db_ops.update_game_state(
game_id=game_id,
inning=1,
half="top",
home_score=0,
away_score=0,
status="active"
)
logger.info(
f"Started game {game_id} - First batter: lineup_id={state.current_batter.lineup_id}"
)
return state
async def submit_defensive_decision(
self,
game_id: UUID,
decision: DefensiveDecision
) -> GameState:
"""
Submit defensive team decision.
Phase 3: Now integrates with decision queue to resolve pending futures.
Uses per-game lock to prevent race conditions with concurrent submissions.
"""
async with self._get_game_lock(game_id):
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
game_validator.validate_defensive_decision(decision, state)
# Store decision in state (for backward compatibility)
state.decisions_this_play['defensive'] = decision.model_dump()
state.pending_decision = "offensive"
state.pending_defensive_decision = decision
# Phase 3: Resolve pending future if exists
fielding_team_id = state.get_fielding_team_id()
try:
state_manager.submit_decision(game_id, fielding_team_id, decision)
logger.info(f"Resolved pending defensive decision future for game {game_id}")
except ValueError:
# No pending future - that's okay (direct submission without await)
logger.debug(f"No pending defensive decision for game {game_id}")
state_manager.update_state(game_id, state)
logger.info(f"Defensive decision submitted for game {game_id}")
return state
async def submit_offensive_decision(
self,
game_id: UUID,
decision: OffensiveDecision
) -> GameState:
"""
Submit offensive team decision.
Phase 3: Now integrates with decision queue to resolve pending futures.
Uses per-game lock to prevent race conditions with concurrent submissions.
"""
async with self._get_game_lock(game_id):
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
game_validator.validate_offensive_decision(decision, state)
# Store decision in state (for backward compatibility)
state.decisions_this_play['offensive'] = decision.model_dump()
state.pending_decision = "resolution"
state.pending_offensive_decision = decision
# Phase 3: Resolve pending future if exists
batting_team_id = state.get_batting_team_id()
try:
state_manager.submit_decision(game_id, batting_team_id, decision)
logger.info(f"Resolved pending offensive decision future for game {game_id}")
except ValueError:
# No pending future - that's okay (direct submission without await)
logger.debug(f"No pending offensive decision for game {game_id}")
state_manager.update_state(game_id, state)
logger.info(f"Offensive decision submitted for game {game_id}")
return state
# ============================================================================
# PHASE 3: ENHANCED DECISION WORKFLOW
# ============================================================================
async def await_defensive_decision(
self,
state: GameState,
timeout: int = None
) -> DefensiveDecision:
"""
Wait for defensive team to submit decision.
For AI teams: Generate decision immediately
For human teams: Wait for WebSocket submission (with timeout)
Args:
state: Current game state
timeout: Seconds to wait before using default decision (default: class constant)
Returns:
DefensiveDecision (validated)
Raises:
asyncio.TimeoutError: If timeout exceeded (async games only)
"""
if timeout is None:
timeout = self.DECISION_TIMEOUT
fielding_team_id = state.get_fielding_team_id()
# Check if fielding team is AI
if state.is_fielding_team_ai():
logger.info(f"Generating AI defensive decision for game {state.game_id}")
return await ai_opponent.generate_defensive_decision(state)
# Human team: wait for decision via WebSocket
logger.info(f"Awaiting human defensive decision for game {state.game_id}, team {fielding_team_id}")
# Set pending decision in state manager
state_manager.set_pending_decision(
game_id=state.game_id,
team_id=fielding_team_id,
decision_type="defensive"
)
# Update state with decision phase
state.decision_phase = "awaiting_defensive"
state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
state_manager.update_state(state.game_id, state)
try:
# Wait for decision with timeout
decision = await asyncio.wait_for(
state_manager.await_decision(state.game_id, fielding_team_id, "defensive"),
timeout=timeout
)
logger.info(f"Received defensive decision for game {state.game_id}")
return decision
except asyncio.TimeoutError:
# Use default decision on timeout
logger.warning(f"Defensive decision timeout for game {state.game_id}, using default")
return DefensiveDecision() # All defaults
async def await_offensive_decision(
self,
state: GameState,
timeout: int = None
) -> OffensiveDecision:
"""
Wait for offensive team to submit decision.
Similar to await_defensive_decision but for batting team.
Args:
state: Current game state
timeout: Seconds to wait before using default decision
Returns:
OffensiveDecision (validated)
Raises:
asyncio.TimeoutError: If timeout exceeded (async games only)
"""
if timeout is None:
timeout = self.DECISION_TIMEOUT
batting_team_id = state.get_batting_team_id()
# Check if batting team is AI
if state.is_batting_team_ai():
logger.info(f"Generating AI offensive decision for game {state.game_id}")
return await ai_opponent.generate_offensive_decision(state)
# Human team: wait for decision via WebSocket
logger.info(f"Awaiting human offensive decision for game {state.game_id}, team {batting_team_id}")
# Set pending decision in state manager
state_manager.set_pending_decision(
game_id=state.game_id,
team_id=batting_team_id,
decision_type="offensive"
)
# Update state with decision phase
state.decision_phase = "awaiting_offensive"
state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
state_manager.update_state(state.game_id, state)
try:
# Wait for decision with timeout
decision = await asyncio.wait_for(
state_manager.await_decision(state.game_id, batting_team_id, "offensive"),
timeout=timeout
)
logger.info(f"Received offensive decision for game {state.game_id}")
return decision
except asyncio.TimeoutError:
# Use default decision on timeout
logger.warning(f"Offensive decision timeout for game {state.game_id}, using default")
return OffensiveDecision() # All defaults
async def resolve_play(
self,
game_id: UUID,
forced_outcome: Optional[PlayOutcome] = None,
xcheck_position: Optional[str] = None,
xcheck_result: Optional[str] = None,
xcheck_error: Optional[str] = None
) -> PlayResult:
"""
Resolve the current play with dice roll
Explicit orchestration sequence:
1. Resolve play with dice rolls
2. Save play to DB (uses snapshot from GameState)
3. Apply result to state (outs, score, runners)
4. Update game state in DB
5. Check for inning change (outs >= 3)
6. Prepare next play (always last step)
Args:
game_id: Game to resolve
forced_outcome: If provided, use this outcome instead of rolling dice (for testing)
xcheck_position: For X_CHECK outcomes, the position to check (SS, LF, etc.)
xcheck_result: For X_CHECK, force the converted result (G1, G2, SI2, DO2, etc.)
xcheck_error: For X_CHECK, force the error result (NO, E1, E2, E3, RP)
Returns:
PlayResult with complete outcome
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
# Get decisions
defensive_decision = DefensiveDecision(**state.decisions_this_play.get('defensive', {}))
offensive_decision = OffensiveDecision(**state.decisions_this_play.get('offensive', {}))
# STEP 1: Resolve play
# Create resolver for this game's league and mode
resolver = PlayResolver(league_id=state.league_id, auto_mode=state.auto_mode, state_manager=state_manager)
# Roll dice
ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)
# Use forced outcome if provided (for testing), otherwise need to implement chart lookup
if forced_outcome is None:
raise NotImplementedError(
"This method only supports forced_outcome for testing. "
"Use resolve_manual_play() for manual mode or resolve_auto_play() for auto mode."
)
# For X_CHECK, use xcheck_position as the hit_location parameter
hit_location = xcheck_position if forced_outcome == PlayOutcome.X_CHECK else None
result = resolver.resolve_outcome(
outcome=forced_outcome,
hit_location=hit_location, # For X_CHECK, this is the position being checked
state=state,
defensive_decision=defensive_decision,
offensive_decision=offensive_decision,
ab_roll=ab_roll,
forced_xcheck_result=xcheck_result,
forced_xcheck_error=xcheck_error
)
# Track roll for batch saving at end of inning
if game_id not in self._rolls_this_inning:
self._rolls_this_inning[game_id] = []
self._rolls_this_inning[game_id].append(result.ab_roll)
# Capture state before applying result
state_before = {
'inning': state.inning,
'half': state.half,
'home_score': state.home_score,
'away_score': state.away_score,
'status': state.status
}
# STEP 3: Apply result to state (outs, score, runners) - before transaction
self._apply_play_result(state, result)
# STEP 2, 4, 5: Database operations in single transaction
async with AsyncSessionLocal() as session:
try:
# STEP 2: Save play to DB (uses snapshot from GameState)
await self._save_play_to_db(state, result, session=session)
# STEP 4: Update game state in DB only if something changed
if (state.inning != state_before['inning'] or
state.half != state_before['half'] or
state.home_score != state_before['home_score'] or
state.away_score != state_before['away_score'] or
state.status != state_before['status']):
await self.db_ops.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
session=session
)
logger.info("Updated game state in DB - score/inning/status changed")
else:
logger.debug("Skipped game state update - no changes to persist")
# STEP 5: Check for inning change
if state.outs >= 3:
await self._advance_inning(state, game_id)
# Update DB again after inning change
await self.db_ops.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
session=session
)
# Commit entire transaction
await session.commit()
logger.debug("Committed play transaction successfully")
except Exception as e:
await session.rollback()
logger.error(f"Transaction failed, rolled back: {e}")
raise
# Batch save rolls at half-inning boundary (separate transaction - audit data)
if state.outs >= 3:
await self._batch_save_inning_rolls(game_id)
# STEP 6: Prepare next play or clean up if game completed
if state.status == "active":
await self._prepare_next_play(state)
elif state.status == "completed":
# Clean up per-game resources to prevent memory leaks
self._cleanup_game_resources(game_id)
# Clear decisions for next play
state.decisions_this_play = {}
state.pending_decision = "defensive"
# Update in-memory state
state_manager.update_state(game_id, state)
logger.info(f"Resolved play {state.play_count} for game {game_id}: {result.description}")
return result
async def resolve_manual_play(
self,
game_id: UUID,
ab_roll: 'AbRoll',
outcome: PlayOutcome,
hit_location: Optional[str] = None
) -> PlayResult:
"""
Resolve play with manually-submitted outcome (manual mode).
In manual mode (SBA + PD manual):
1. Server rolls dice for fairness/auditing
2. Players read their physical cards based on those dice
3. Players submit the outcome they see
4. Server validates and processes with the provided outcome
Orchestration sequence (same as resolve_play):
1. Resolve play with manual outcome (uses ab_roll for audit trail)
2. Save play to DB
3. Apply result to state
4. Update game state in DB
5. Check for inning change
6. Prepare next play
Args:
game_id: Game to resolve
ab_roll: The dice roll (server-rolled for fairness)
outcome: PlayOutcome enum (from player's physical card)
hit_location: Optional hit location for groundballs/flyouts
Returns:
PlayResult with complete outcome
Raises:
ValidationError: If game not active or hit location missing when required
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
# Validate hit location provided when required
if outcome.requires_hit_location() and not hit_location:
raise ValidationError(
f"Outcome {outcome.value} requires hit_location "
f"(one of: 1B, 2B, SS, 3B, LF, CF, RF, P, C)"
)
# Get decisions
defensive_decision = DefensiveDecision(**state.decisions_this_play.get('defensive', {}))
offensive_decision = OffensiveDecision(**state.decisions_this_play.get('offensive', {}))
# STEP 1: Resolve play with manual outcome
# Create resolver for this game's league and mode
resolver = PlayResolver(league_id=state.league_id, auto_mode=state.auto_mode, state_manager=state_manager)
# Call core resolution with manual outcome
result = resolver.resolve_outcome(
outcome=outcome,
hit_location=hit_location,
state=state,
defensive_decision=defensive_decision,
offensive_decision=offensive_decision,
ab_roll=ab_roll
)
# Track roll for batch saving at end of inning (same as auto mode)
if game_id not in self._rolls_this_inning:
self._rolls_this_inning[game_id] = []
self._rolls_this_inning[game_id].append(ab_roll)
# Capture state before applying result
state_before = {
'inning': state.inning,
'half': state.half,
'home_score': state.home_score,
'away_score': state.away_score,
'status': state.status
}
# STEP 3: Apply result to state - before transaction
self._apply_play_result(state, result)
# STEP 2, 4, 5: Database operations in single transaction
async with AsyncSessionLocal() as session:
try:
# STEP 2: Save play to DB
await self._save_play_to_db(state, result, session=session)
# STEP 4: Update game state in DB only if something changed
if (state.inning != state_before['inning'] or
state.half != state_before['half'] or
state.home_score != state_before['home_score'] or
state.away_score != state_before['away_score'] or
state.status != state_before['status']):
await self.db_ops.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
session=session
)
logger.info("Updated game state in DB - score/inning/status changed")
else:
logger.debug("Skipped game state update - no changes to persist")
# STEP 5: Check for inning change
if state.outs >= 3:
await self._advance_inning(state, game_id)
# Update DB again after inning change
await self.db_ops.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
session=session
)
# Commit entire transaction
await session.commit()
logger.debug("Committed manual play transaction successfully")
except Exception as e:
await session.rollback()
logger.error(f"Manual play transaction failed, rolled back: {e}")
raise
# Batch save rolls at half-inning boundary (separate transaction - audit data)
if state.outs >= 3:
await self._batch_save_inning_rolls(game_id)
# STEP 6: Prepare next play or clean up if game completed
if state.status == "active":
await self._prepare_next_play(state)
elif state.status == "completed":
# Clean up per-game resources to prevent memory leaks
self._cleanup_game_resources(game_id)
# Clear decisions for next play
state.decisions_this_play = {}
state.pending_decision = "defensive"
# Update in-memory state
state_manager.update_state(game_id, state)
logger.info(
f"Resolved manual play {state.play_count} for game {game_id}: "
f"{result.description}" + (f" (hit to {hit_location})" if hit_location else "")
)
return result
def _apply_play_result(self, state: GameState, result: PlayResult) -> None:
"""
Apply play result to in-memory game state.
Only updates state - NO database writes (handled by orchestration layer).
"""
# Update outs
state.outs += result.outs_recorded
# Build advancement lookup
advancement_map = {from_base: to_base for from_base, to_base in result.runners_advanced}
# Create temporary storage for new runner positions
new_first = None
new_second = None
new_third = None
# Process existing runners
for base, runner in state.get_all_runners():
if base in advancement_map:
to_base = advancement_map[base]
if to_base < 4: # Not scored
if to_base == 1:
new_first = runner
elif to_base == 2:
new_second = runner
elif to_base == 3:
new_third = runner
# If to_base == 4, runner scored (don't add to new positions)
else:
# Runner stays put
if base == 1:
new_first = runner
elif base == 2:
new_second = runner
elif base == 3:
new_third = runner
# Add batter if reached base
if result.batter_result and result.batter_result < 4:
# GameState now has the full batter object (set by _prepare_next_play)
batter = state.current_batter
if result.batter_result == 1:
new_first = batter
elif result.batter_result == 2:
new_second = batter
elif result.batter_result == 3:
new_third = batter
# Update state with new runner positions
state.on_first = new_first
state.on_second = new_second
state.on_third = new_third
# Update score
if state.half == "top":
state.away_score += result.runs_scored
else:
state.home_score += result.runs_scored
# Increment play count
state.play_count += 1
state.last_play_result = result.description
runner_count = len([r for r in [state.on_first, state.on_second, state.on_third] if r])
logger.debug(
f"Applied play result: outs={state.outs}, "
f"score={state.away_score}-{state.home_score}, "
f"runners={runner_count}"
)
async def _advance_inning(self, state: GameState, game_id: UUID) -> None:
"""
Advance to next half inning.
Only handles inning transition - NO database writes, NO prepare_next_play.
Those are handled by the orchestration layer.
Validates defensive team lineup positions at start of each half inning.
"""
if state.half == "top":
state.half = "bottom"
else:
state.half = "top"
state.inning += 1
# Clear bases and reset outs
state.outs = 0
state.clear_bases()
# Validate defensive team lineup positions
# Top of inning: home team is defending
# Bottom of inning: away team is defending
defensive_team = state.home_team_id if state.half == "top" else state.away_team_id
defensive_lineup = await self.db_ops.get_active_lineup(state.game_id, defensive_team)
if not defensive_lineup:
raise ValidationError(f"No lineup found for defensive team {defensive_team}")
game_validator.validate_defensive_lineup_positions(defensive_lineup)
logger.info(f"Advanced to inning {state.inning} {state.half}")
# Check if game is over
if game_validator.is_game_over(state):
state.status = "completed"
logger.info(f"Game {state.game_id} completed - Final: Away {state.away_score}, Home {state.home_score}")
async def _prepare_next_play(self, state: GameState) -> None:
"""
Prepare snapshot for the next play.
This method:
1. Determines current batter based on batting order index
2. Advances the appropriate team's batter index (with wraparound)
3. Fetches active lineups from database
4. Sets snapshot fields: current_batter/pitcher/catcher_lineup_id
5. Calculates on_base_code from current runners
This snapshot is used when saving the Play record to DB.
"""
# Determine which team is batting
if state.half == "top":
# Away team batting
current_idx = state.away_team_batter_idx
state.away_team_batter_idx = (current_idx + 1) % 9
batting_team = state.away_team_id
fielding_team = state.home_team_id
else:
# Home team batting
current_idx = state.home_team_batter_idx
state.home_team_batter_idx = (current_idx + 1) % 9
batting_team = state.home_team_id
fielding_team = state.away_team_id
# Try to get lineups from cache first, only fetch from DB if not cached
from app.models.game_models import TeamLineupState, LineupPlayerState
batting_lineup_state = state_manager.get_lineup(state.game_id, batting_team)
fielding_lineup_state = state_manager.get_lineup(state.game_id, fielding_team)
# Fetch from database only if not in cache
if not batting_lineup_state:
batting_lineup_state = await lineup_service.load_team_lineup_with_player_data(
game_id=state.game_id,
team_id=batting_team,
league_id=state.league_id
)
if batting_lineup_state:
state_manager.set_lineup(state.game_id, batting_team, batting_lineup_state)
if not fielding_lineup_state:
fielding_lineup_state = await lineup_service.load_team_lineup_with_player_data(
game_id=state.game_id,
team_id=fielding_team,
league_id=state.league_id
)
if fielding_lineup_state:
state_manager.set_lineup(state.game_id, fielding_team, fielding_lineup_state)
# Set current player snapshot using cached lineup data
# Batter: use the batting order index to find the player
if batting_lineup_state and current_idx < len(batting_lineup_state.players):
# Get batting order sorted list
batting_order = sorted(
[p for p in batting_lineup_state.players if p.batting_order is not None],
key=lambda x: x.batting_order or 0
)
if current_idx < len(batting_order):
state.current_batter = batting_order[current_idx]
else:
# Create placeholder - this shouldn't happen in normal gameplay
state.current_batter = LineupPlayerState(
lineup_id=0,
card_id=0,
position="DH",
batting_order=None
)
logger.warning(f"Batter index {current_idx} out of range for batting order")
else:
# Create placeholder - this shouldn't happen in normal gameplay
state.current_batter = LineupPlayerState(
lineup_id=0,
card_id=0,
position="DH",
batting_order=None
)
logger.warning(f"No batting lineup found for team {batting_team}")
# Pitcher and catcher: find by position from cached lineup
if fielding_lineup_state:
state.current_pitcher = next((p for p in fielding_lineup_state.players if p.position == "P"), None)
state.current_catcher = next((p for p in fielding_lineup_state.players if p.position == "C"), None)
else:
state.current_pitcher = None
state.current_catcher = None
# Calculate on_base_code from current runners (bit field)
state.current_on_base_code = 0
if state.on_first:
state.current_on_base_code |= 1 # Bit 0: first base
if state.on_second:
state.current_on_base_code |= 2 # Bit 1: second base
if state.on_third:
state.current_on_base_code |= 4 # Bit 2: third base
logger.debug(
f"Prepared next play: batter={state.current_batter.lineup_id}, "
f"pitcher={state.current_pitcher.lineup_id if state.current_pitcher else None}, "
f"catcher={state.current_catcher.lineup_id if state.current_catcher else None}, "
f"on_base_code={state.current_on_base_code}"
)
async def _batch_save_inning_rolls(self, game_id: UUID) -> None:
"""
Batch save all rolls from the inning
This is called at end of each half-inning to persist
all dice rolls with their context to the database.
"""
if game_id not in self._rolls_this_inning:
logger.debug(f"No rolls to save for game {game_id}")
return
rolls = self._rolls_this_inning[game_id]
if not rolls:
logger.debug(f"Empty roll list for game {game_id}")
return
try:
await self.db_ops.save_rolls_batch(rolls)
logger.info(f"Batch saved {len(rolls)} rolls for game {game_id}")
# Clear rolls for this inning
self._rolls_this_inning[game_id] = []
except Exception as e:
logger.error(f"Failed to batch save rolls for game {game_id}: {e}")
# Re-raise to notify caller - audit data loss is critical
# Rolls are still in _rolls_this_inning for retry on next inning boundary
raise
async def _save_play_to_db(
self,
state: GameState,
result: PlayResult,
session: Optional[AsyncSession] = None
) -> None:
"""
Save play to database using snapshot from GameState.
Uses the pre-calculated snapshot fields (no database lookbacks).
Args:
state: Current game state
result: Play result to save
session: Optional external session for transaction grouping
Raises:
ValueError: If required player IDs are missing
"""
# Use snapshot from GameState (set by _prepare_next_play)
# Extract IDs from objects for database persistence
batter_id = state.current_batter.lineup_id
pitcher_id = state.current_pitcher.lineup_id if state.current_pitcher else None
catcher_id = state.current_catcher.lineup_id if state.current_catcher else None
on_base_code = state.current_on_base_code
# VERIFY required fields are present
if batter_id is None:
raise ValueError(
f"Cannot save play: batter_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
if pitcher_id is None:
raise ValueError(
f"Cannot save play: pitcher_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
if catcher_id is None:
raise ValueError(
f"Cannot save play: catcher_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
# Runners on base BEFORE play (from state.on_first/second/third)
on_first_id = state.on_first.lineup_id if state.on_first else None
on_second_id = state.on_second.lineup_id if state.on_second else None
on_third_id = state.on_third.lineup_id if state.on_third else None
# Runners AFTER play (from result.runners_advanced)
# Build dict of from_base -> to_base for quick lookup
finals = {from_base: to_base for from_base, to_base in result.runners_advanced}
on_first_final = finals.get(1) # None if out/scored, 1-4 if advanced
on_second_final = finals.get(2) # None if out/scored, 1-4 if advanced
on_third_final = finals.get(3) # None if out/scored, 1-4 if advanced
# Batter result (None=out, 1-4=base reached)
batter_final = result.batter_result
play_data = {
"game_id": state.game_id,
"play_number": state.play_count,
"inning": state.inning,
"half": state.half,
"outs_before": state.outs, # Capture current outs BEFORE applying result
"outs_recorded": result.outs_recorded,
# Player IDs from snapshot
"batter_id": batter_id,
"pitcher_id": pitcher_id,
"catcher_id": catcher_id,
# Base situation snapshot
"on_base_code": on_base_code,
"on_first_id": on_first_id,
"on_second_id": on_second_id,
"on_third_id": on_third_id,
# Final positions
"on_first_final": on_first_final,
"on_second_final": on_second_final,
"on_third_final": on_third_final,
"batter_final": batter_final,
# Play outcome
"dice_roll": str(result.ab_roll),
"hit_type": result.outcome.value,
"result_description": result.description,
"runs_scored": result.runs_scored,
"away_score": state.away_score,
"home_score": state.home_score,
"complete": True,
# Strategic decisions
"defensive_choices": state.decisions_this_play.get('defensive', {}),
"offensive_choices": state.decisions_this_play.get('offensive', {})
}
# Add metadata for uncapped hits (Phase 3: will include runner advancement decisions)
play_metadata = {}
if result.outcome in [PlayOutcome.SINGLE_UNCAPPED, PlayOutcome.DOUBLE_UNCAPPED]:
play_metadata["uncapped"] = True
play_metadata["outcome_type"] = result.outcome.value
play_data["play_metadata"] = play_metadata
# Calculate statistical fields (Phase 3.5: Materialized Views)
# Create state_after by cloning state and applying result
state_after = state.model_copy(deep=True)
state_after.outs += result.outs_recorded
if state.half == 'top':
state_after.away_score += result.runs_scored
else:
state_after.home_score += result.runs_scored
# Calculate stats using PlayStatCalculator
stats = PlayStatCalculator.calculate_stats(
outcome=result.outcome,
result=result,
state_before=state,
state_after=state_after
)
# Add stat fields to play_data
play_data.update(stats)
await self.db_ops.save_play(play_data, session=session)
logger.debug(f"Saved play {state.play_count}: batter={batter_id}, on_base={on_base_code}")
async def get_game_state(self, game_id: UUID) -> Optional[GameState]:
"""Get current game state"""
return state_manager.get_state(game_id)
async def rollback_plays(self, game_id: UUID, num_plays: int) -> GameState:
"""
Roll back the last N plays.
Deletes plays from the database and reconstructs game state by replaying
remaining plays. Also removes any substitutions that occurred during the
rolled-back plays.
Args:
game_id: Game to roll back
num_plays: Number of plays to roll back (must be > 0)
Returns:
Updated GameState after rollback
Raises:
ValueError: If num_plays invalid, game not found, or game completed
"""
# 1. Validate
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
if num_plays <= 0:
raise ValueError("num_plays must be greater than 0")
if state.play_count < num_plays:
raise ValueError(
f"Cannot roll back {num_plays} plays (only {state.play_count} exist)"
)
if state.status == "completed":
raise ValueError("Cannot roll back a completed game")
# 2. Calculate target play number
target_play = state.play_count - num_plays
logger.info(
f"Rolling back {num_plays} plays for game {game_id} "
f"(from play {state.play_count} to play {target_play})"
)
# 3. Delete plays from database
deleted_plays = await self.db_ops.delete_plays_after(game_id, target_play)
logger.info(f"Deleted {deleted_plays} plays")
# 4. Delete substitutions that occurred after target play
deleted_subs = await self.db_ops.delete_substitutions_after(game_id, target_play)
logger.info(f"Deleted {deleted_subs} substitutions")
# Note: We don't delete dice rolls from the rolls table - they're kept for auditing
# and don't affect game state reconstruction
# 5. Clear in-memory roll tracking for this game
if game_id in self._rolls_this_inning:
del self._rolls_this_inning[game_id]
# 6. Recover game state by replaying remaining plays
logger.info(f"Recovering game state for {game_id}")
new_state = await state_manager.recover_game(game_id)
logger.info(
f"Rollback complete - now at play {new_state.play_count}, "
f"inning {new_state.inning} {new_state.half}"
)
return new_state
async def end_game(self, game_id: UUID) -> GameState:
"""
Manually end a game
For forfeit, abandonment, etc.
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
# Batch save any remaining rolls
await self._batch_save_inning_rolls(game_id)
state.status = "completed"
state_manager.update_state(game_id, state)
await self.db_ops.update_game_state(
game_id=game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status="completed"
)
# Clean up per-game resources to prevent memory leaks
self._cleanup_game_resources(game_id)
logger.info(f"Game {game_id} ended manually")
return state
def _cleanup_game_resources(self, game_id: UUID) -> None:
"""
Clean up per-game resources when a game completes.
Prevents memory leaks from unbounded dictionary growth.
"""
# Clean up rolls tracking
if game_id in self._rolls_this_inning:
del self._rolls_this_inning[game_id]
# Clean up game locks
if game_id in self._game_locks:
del self._game_locks[game_id]
logger.debug(f"Cleaned up resources for game {game_id}")
# Singleton instance
game_engine = GameEngine()