strat-gameplay-webapp/backend/app/core/game_engine.py
Cal Corum 2b8fea36a8 CLAUDE: Redesign dice display with team colors and consolidate player cards
Backend:
- Add home_team_dice_color and away_team_dice_color to GameState model
- Extract dice_color from game metadata in StateManager (default: cc0000)
- Add runners_on_base param to roll_ab for chaos check skipping

Frontend - Dice Display:
- Create DiceShapes.vue with SVG d6 (square) and d20 (hexagon) shapes
- Apply home team's dice_color to d6 dice, white for resolution d20
- Show chaos d20 in amber only when WP/PB check triggered
- Add automatic text contrast based on color luminance
- Reduce blank space and remove info bubble from dice results

Frontend - Player Cards:
- Consolidate pitcher/batter cards to single location below diamond
- Add active card highlighting based on dice roll (d6_one: 1-3=batter, 4-6=pitcher)
- New card header format: [Team] Position [Name] with full card image
- Remove redundant card displays from GameBoard and GameplayPanel
- Enlarge PlayerCardModal on desktop (max-w-3xl at 1024px+)

Tests:
- Add DiceShapes.spec.ts with 34 tests for color calculations and rendering
- Update DiceRoller.spec.ts for new DiceShapes integration
- Fix test_roll_dice_success for new runners_on_base parameter

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 00:16:32 -06:00

1316 lines
50 KiB
Python

"""
Game Engine - Main game orchestration engine.
Coordinates game flow, validates actions, resolves plays, and persists state.
Integrates DiceSystem for roll tracking with context and batch saving.
Phase 3: Enhanced with async decision workflow and AI opponent integration.
Author: Claude
Date: 2025-10-24
"""
import asyncio
import logging
from uuid import UUID
import pendulum
from sqlalchemy.exc import IntegrityError, OperationalError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import PlayOutcome, get_league_config
from app.core.exceptions import DatabaseError, GameNotFoundError, PlayerDataError
from app.core.ai_opponent import ai_opponent
from app.core.dice import dice_system
from app.core.play_resolver import PlayResolver, PlayResult
from app.core.state_manager import state_manager
from app.core.validators import ValidationError, game_validator
from app.database.operations import DatabaseOperations
from app.database.session import AsyncSessionLocal
from app.models.game_models import DefensiveDecision, GameState, OffensiveDecision
from app.services import PlayStatCalculator
from app.services.lineup_service import lineup_service
from app.services.position_rating_service import position_rating_service
logger = logging.getLogger(f"{__name__}.GameEngine")
class GameEngine:
"""Main game orchestration engine"""
# Phase 3: Decision timeout in seconds
DECISION_TIMEOUT = 30
def __init__(self):
self.db_ops = DatabaseOperations()
# Track rolls per inning for batch saving
self._rolls_this_inning: dict[UUID, list] = {}
# WebSocket connection manager for real-time events (set by main.py)
self._connection_manager = None
def set_connection_manager(self, connection_manager):
"""
Set the WebSocket connection manager for real-time event emission.
Called by main.py after both singletons are initialized.
"""
self._connection_manager = connection_manager
logger.info("WebSocket connection manager configured for game engine")
async def _emit_decision_required(
self, game_id: UUID, state: GameState, phase: str, timeout_seconds: int = 300
):
"""
Emit decision_required event to notify frontend a decision is needed.
Args:
game_id: Game identifier
state: Current game state
phase: Decision phase ('awaiting_defensive' or 'awaiting_offensive')
timeout_seconds: Decision timeout in seconds (default 5 minutes)
"""
if not self._connection_manager:
logger.warning("No connection manager - cannot emit decision_required event")
return
# Determine which team needs to decide
if phase == "awaiting_defensive":
# Fielding team = home if top, away if bottom
role = "home" if state.half == "top" else "away"
elif phase == "awaiting_offensive":
# Batting team = away if top, home if bottom
role = "away" if state.half == "top" else "home"
else:
logger.warning(f"Unknown decision phase for emission: {phase}")
return
try:
await self._connection_manager.broadcast_to_game(
str(game_id),
"decision_required",
{
"phase": phase,
"role": role,
"timeout_seconds": timeout_seconds,
"message": f"{role.title()} team: {phase.replace('_', ' ').title()} decision required"
}
)
logger.info(f"Emitted decision_required for game {game_id}: phase={phase}, role={role}")
except (ConnectionError, OSError) as e:
# Network/socket errors - connection manager may be unavailable
logger.warning(f"Network error emitting decision_required: {e}")
except AttributeError as e:
# Connection manager not properly initialized
logger.error(f"Connection manager not ready: {e}")
async def _load_position_ratings_for_lineup(
self, game_id: UUID, team_id: int, league_id: str
) -> None:
"""
Load position ratings for all players in a team's lineup.
Only loads for PD league games. Sets position_rating field on each
LineupPlayerState object in the StateManager's lineup cache.
Args:
game_id: Game identifier
team_id: Team identifier
league_id: League identifier ('sba' or 'pd')
Phase 3E-Main: Loads ratings at game start for X-Check resolution
"""
# Check if league supports ratings
league_config = get_league_config(league_id)
if not league_config.supports_position_ratings():
logger.debug(
f"League {league_id} doesn't support position ratings, skipping"
)
return
# Get lineup from cache
lineup = state_manager.get_lineup(game_id, team_id)
if not lineup:
logger.warning(f"No lineup found for team {team_id} in game {game_id}")
return
logger.info(
f"Loading position ratings for team {team_id} lineup ({len(lineup.players)} players)"
)
# Load ratings for each player
loaded_count = 0
for player in lineup.players:
try:
# Get rating for this player's position
rating = await position_rating_service.get_rating_for_position(
card_id=player.card_id,
position=player.position,
league_id=league_id,
)
if rating:
player.position_rating = rating
loaded_count += 1
logger.debug(
f"Loaded rating for card {player.card_id} at {player.position}: "
f"range={rating.range}, error={rating.error}"
)
else:
logger.warning(
f"No rating found for card {player.card_id} at {player.position}"
)
except (KeyError, ValueError) as e:
# Missing or invalid rating data - player may not have rating for this position
logger.warning(
f"Invalid rating data for card {player.card_id} at {player.position}: {e}"
)
except (ConnectionError, TimeoutError) as e:
# Network error fetching from external API - continue with other players
logger.error(
f"Network error loading rating for card {player.card_id} at {player.position}: {e}"
)
logger.info(
f"Loaded {loaded_count}/{len(lineup.players)} position ratings for team {team_id}"
)
async def start_game(self, game_id: UUID) -> GameState:
"""
Start a game
Transitions from 'pending' to 'active'.
Validates that both teams have complete lineups (minimum 9 players each).
Prepares the first play snapshot.
Raises:
ValidationError: If game already started or lineups incomplete
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found in state manager")
if state.status != "pending":
raise ValidationError(f"Game already started (status: {state.status})")
# HARD REQUIREMENT: Validate both lineups are complete
# At game start, we validate BOTH teams (exception to the "defensive only" rule)
home_lineup = await self.db_ops.get_active_lineup(
state.game_id, state.home_team_id
)
away_lineup = await self.db_ops.get_active_lineup(
state.game_id, state.away_team_id
)
# Check minimum 9 players per team
if not home_lineup or len(home_lineup) < 9:
raise ValidationError(
f"Home team lineup incomplete: {len(home_lineup) if home_lineup else 0} players "
f"(minimum 9 required)"
)
if not away_lineup or len(away_lineup) < 9:
raise ValidationError(
f"Away team lineup incomplete: {len(away_lineup) if away_lineup else 0} players "
f"(minimum 9 required)"
)
# Validate defensive positions - at game start, check BOTH teams
try:
game_validator.validate_defensive_lineup_positions(home_lineup)
except ValidationError as e:
raise ValidationError(f"Home team: {e}")
try:
game_validator.validate_defensive_lineup_positions(away_lineup)
except ValidationError as e:
raise ValidationError(f"Away team: {e}")
# Phase 3E-Main: Load position ratings for both teams (PD league only)
await self._load_position_ratings_for_lineup(
game_id=game_id, team_id=state.home_team_id, league_id=state.league_id
)
await self._load_position_ratings_for_lineup(
game_id=game_id, team_id=state.away_team_id, league_id=state.league_id
)
# Mark as active
state.status = "active"
state.inning = 1
state.half = "top"
state.outs = 0
# Initialize roll tracking for this game
self._rolls_this_inning[game_id] = []
# Prepare first play snapshot
await self._prepare_next_play(state)
# Set initial decision phase to awaiting defensive decision
# This allows the frontend to immediately show the defensive setup panel
state.decision_phase = "awaiting_defensive"
# Update state
state_manager.update_state(game_id, state)
# Broadcast game_state_update so all connected clients get the new active state
if self._connection_manager:
await self._connection_manager.broadcast_to_game(
str(game_id),
"game_state_update",
state.model_dump(mode='json')
)
# Emit decision_required event for real-time frontend notification
await self._emit_decision_required(game_id, state, "awaiting_defensive", timeout_seconds=self.DECISION_TIMEOUT)
# Persist to DB
await self.db_ops.update_game_state(
game_id=game_id,
inning=1,
half="top",
home_score=0,
away_score=0,
status="active",
)
logger.info(
f"Started game {game_id} - First batter: lineup_id={state.current_batter.lineup_id}, "
f"decision_phase={state.decision_phase}"
)
return state
async def submit_defensive_decision(
self, game_id: UUID, decision: DefensiveDecision
) -> GameState:
"""
Submit defensive team decision.
Phase 3: Now integrates with decision queue to resolve pending futures.
Uses per-game lock to prevent race conditions with concurrent submissions.
"""
async with state_manager.game_lock(game_id):
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
game_validator.validate_defensive_decision(decision, state)
# Store decision in state (for backward compatibility)
state.decisions_this_play["defensive"] = decision.model_dump()
state.pending_decision = "offensive"
state.decision_phase = "awaiting_offensive"
state.pending_defensive_decision = decision
# Phase 3: Resolve pending future if exists
fielding_team_id = state.get_fielding_team_id()
try:
state_manager.submit_decision(game_id, fielding_team_id, decision)
logger.info(
f"Resolved pending defensive decision future for game {game_id}"
)
except ValueError:
# No pending future - that's okay (direct submission without await)
logger.debug(f"No pending defensive decision for game {game_id}")
state_manager.update_state(game_id, state)
logger.info(f"Defensive decision submitted for game {game_id}")
# Emit decision_required for offensive phase
await self._emit_decision_required(
game_id, state, "awaiting_offensive", timeout_seconds=self.DECISION_TIMEOUT
)
return state
async def submit_offensive_decision(
self, game_id: UUID, decision: OffensiveDecision
) -> GameState:
"""
Submit offensive team decision.
Phase 3: Now integrates with decision queue to resolve pending futures.
Uses per-game lock to prevent race conditions with concurrent submissions.
"""
async with state_manager.game_lock(game_id):
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
game_validator.validate_offensive_decision(decision, state)
# Store decision in state (for backward compatibility)
state.decisions_this_play["offensive"] = decision.model_dump()
state.pending_decision = "resolution"
state.decision_phase = "resolution"
state.pending_offensive_decision = decision
# Phase 3: Resolve pending future if exists
batting_team_id = state.get_batting_team_id()
try:
state_manager.submit_decision(game_id, batting_team_id, decision)
logger.info(
f"Resolved pending offensive decision future for game {game_id}"
)
except ValueError:
# No pending future - that's okay (direct submission without await)
logger.debug(f"No pending offensive decision for game {game_id}")
state_manager.update_state(game_id, state)
logger.info(f"Offensive decision submitted for game {game_id}")
return state
# ============================================================================
# PHASE 3: ENHANCED DECISION WORKFLOW
# ============================================================================
async def await_defensive_decision(
self, state: GameState, timeout: int = None
) -> DefensiveDecision:
"""
Wait for defensive team to submit decision.
For AI teams: Generate decision immediately
For human teams: Wait for WebSocket submission (with timeout)
Args:
state: Current game state
timeout: Seconds to wait before using default decision (default: class constant)
Returns:
DefensiveDecision (validated)
Raises:
asyncio.TimeoutError: If timeout exceeded (async games only)
"""
if timeout is None:
timeout = self.DECISION_TIMEOUT
fielding_team_id = state.get_fielding_team_id()
# Check if fielding team is AI
if state.is_fielding_team_ai():
logger.info(f"Generating AI defensive decision for game {state.game_id}")
return await ai_opponent.generate_defensive_decision(state)
# Human team: wait for decision via WebSocket
logger.info(
f"Awaiting human defensive decision for game {state.game_id}, team {fielding_team_id}"
)
# Set pending decision in state manager
state_manager.set_pending_decision(
game_id=state.game_id, team_id=fielding_team_id, decision_type="defensive"
)
# Update state with decision phase
state.decision_phase = "awaiting_defensive"
state.decision_deadline = (
pendulum.now("UTC").add(seconds=timeout).to_iso8601_string()
)
state_manager.update_state(state.game_id, state)
# Emit decision_required event for real-time frontend notification
await self._emit_decision_required(state.game_id, state, "awaiting_defensive", timeout_seconds=timeout)
try:
# Wait for decision with timeout
decision = await asyncio.wait_for(
state_manager.await_decision(
state.game_id, fielding_team_id, "defensive"
),
timeout=timeout,
)
logger.info(f"Received defensive decision for game {state.game_id}")
return decision
except TimeoutError:
# Use default decision on timeout
logger.warning(
f"Defensive decision timeout for game {state.game_id}, using default"
)
return DefensiveDecision() # All defaults
async def await_offensive_decision(
self, state: GameState, timeout: int = None
) -> OffensiveDecision:
"""
Wait for offensive team to submit decision.
Similar to await_defensive_decision but for batting team.
Args:
state: Current game state
timeout: Seconds to wait before using default decision
Returns:
OffensiveDecision (validated)
Raises:
asyncio.TimeoutError: If timeout exceeded (async games only)
"""
if timeout is None:
timeout = self.DECISION_TIMEOUT
batting_team_id = state.get_batting_team_id()
# Check if batting team is AI
if state.is_batting_team_ai():
logger.info(f"Generating AI offensive decision for game {state.game_id}")
return await ai_opponent.generate_offensive_decision(state)
# Human team: wait for decision via WebSocket
logger.info(
f"Awaiting human offensive decision for game {state.game_id}, team {batting_team_id}"
)
# Set pending decision in state manager
state_manager.set_pending_decision(
game_id=state.game_id, team_id=batting_team_id, decision_type="offensive"
)
# Update state with decision phase
state.decision_phase = "awaiting_offensive"
state.decision_deadline = (
pendulum.now("UTC").add(seconds=timeout).to_iso8601_string()
)
state_manager.update_state(state.game_id, state)
# Emit decision_required event for real-time frontend notification
await self._emit_decision_required(state.game_id, state, "awaiting_offensive", timeout_seconds=timeout)
try:
# Wait for decision with timeout
decision = await asyncio.wait_for(
state_manager.await_decision(
state.game_id, batting_team_id, "offensive"
),
timeout=timeout,
)
logger.info(f"Received offensive decision for game {state.game_id}")
return decision
except TimeoutError:
# Use default decision on timeout
logger.warning(
f"Offensive decision timeout for game {state.game_id}, using default"
)
return OffensiveDecision() # All defaults
async def _finalize_play(
self, state: GameState, result: PlayResult, ab_roll, log_suffix: str = ""
) -> None:
"""
Common finalization logic for both resolve_play and resolve_manual_play.
Handles:
- Roll tracking for batch saving
- State capture and result application
- Transaction with DB operations (save play, update state, advance inning)
- Batch save rolls at inning boundary
- Prepare next play or cleanup on completion
- Clear decisions and update state
Args:
state: Current game state (modified in place)
result: Play result to apply
ab_roll: The dice roll to track
log_suffix: Optional suffix for log message (e.g., " (hit to SS)")
"""
game_id = state.game_id
# Track roll for batch saving at end of inning
if game_id not in self._rolls_this_inning:
self._rolls_this_inning[game_id] = []
self._rolls_this_inning[game_id].append(ab_roll)
# Capture state before applying result
outs_before = state.outs # Capture BEFORE _apply_play_result modifies it
# Capture runners BEFORE _apply_play_result modifies them
runners_before = {
"on_first_id": state.on_first.lineup_id if state.on_first else None,
"on_second_id": state.on_second.lineup_id if state.on_second else None,
"on_third_id": state.on_third.lineup_id if state.on_third else None,
}
state_before = {
"inning": state.inning,
"half": state.half,
"home_score": state.home_score,
"away_score": state.away_score,
"status": state.status,
}
# Apply result to state (outs, score, runners) - before transaction
self._apply_play_result(state, result)
# Database operations in single transaction
async with AsyncSessionLocal() as session:
try:
# Create session-injected db_ops for this transaction
db_ops_tx = DatabaseOperations(session)
# Save play to DB (uses snapshot from GameState)
await self._save_play_to_db(
state,
result,
outs_before=outs_before,
runners_before=runners_before,
db_ops=db_ops_tx,
)
# Update game state in DB only if something changed
if (
state.inning != state_before["inning"]
or state.half != state_before["half"]
or state.home_score != state_before["home_score"]
or state.away_score != state_before["away_score"]
or state.status != state_before["status"]
):
await db_ops_tx.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
)
logger.info(
"Updated game state in DB - score/inning/status changed"
)
else:
logger.debug("Skipped game state update - no changes to persist")
# Check for inning change
if state.outs >= state.outs_per_inning:
await self._advance_inning(state, game_id)
# Update DB again after inning change
await db_ops_tx.update_game_state(
game_id=state.game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status=state.status,
)
# Commit entire transaction
await session.commit()
logger.debug("Committed play transaction successfully")
except IntegrityError as e:
await session.rollback()
logger.error(f"Data integrity error during play save: {e}")
raise DatabaseError("save_play", e)
except OperationalError as e:
await session.rollback()
logger.error(f"Database connection error during play save: {e}")
raise DatabaseError("save_play", e)
except SQLAlchemyError as e:
await session.rollback()
logger.error(f"Database error during play save: {e}")
raise DatabaseError("save_play", e)
# Batch save rolls at half-inning boundary (separate transaction - audit data)
if state.outs >= state.outs_per_inning:
await self._batch_save_inning_rolls(game_id)
# Prepare next play or clean up if game completed
if state.status == "active":
await self._prepare_next_play(state)
# Reset decision phase for next play
state.decision_phase = "awaiting_defensive"
elif state.status == "completed":
# Clean up per-game resources to prevent memory leaks
self._cleanup_game_resources(game_id)
# Clear decisions for next play
state.decisions_this_play = {}
state.pending_decision = "defensive"
# Update in-memory state
state_manager.update_state(game_id, state)
logger.info(
f"Resolved play {state.play_count} for game {game_id}: {result.description}{log_suffix}"
)
async def resolve_play(
self,
game_id: UUID,
forced_outcome: PlayOutcome | None = None,
xcheck_position: str | None = None,
xcheck_result: str | None = None,
xcheck_error: str | None = None,
) -> PlayResult:
"""
Resolve the current play with dice roll (testing/forced outcome method).
Args:
game_id: Game to resolve
forced_outcome: If provided, use this outcome instead of rolling dice (for testing)
xcheck_position: For X_CHECK outcomes, the position to check (SS, LF, etc.)
xcheck_result: For X_CHECK, force the converted result (G1, G2, SI2, DO2, etc.)
xcheck_error: For X_CHECK, force the error result (NO, E1, E2, E3, RP)
Returns:
PlayResult with complete outcome
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
# Get decisions
defensive_decision = DefensiveDecision(
**state.decisions_this_play.get("defensive", {})
)
offensive_decision = OffensiveDecision(
**state.decisions_this_play.get("offensive", {})
)
# Create resolver for this game's league and mode
resolver = PlayResolver(
league_id=state.league_id,
auto_mode=state.auto_mode,
state_manager=state_manager,
)
# Check if there are runners on base (affects chaos check)
runners_on_base = bool(state.on_first or state.on_second or state.on_third)
# Roll dice
ab_roll = dice_system.roll_ab(
league_id=state.league_id,
game_id=game_id,
runners_on_base=runners_on_base,
)
# Use forced outcome if provided (for testing), otherwise need to implement chart lookup
if forced_outcome is None:
raise NotImplementedError(
"This method only supports forced_outcome for testing. "
"Use resolve_manual_play() for manual mode or resolve_auto_play() for auto mode."
)
# For X_CHECK, use xcheck_position as the hit_location parameter
hit_location = (
xcheck_position if forced_outcome == PlayOutcome.X_CHECK else None
)
result = resolver.resolve_outcome(
outcome=forced_outcome,
hit_location=hit_location, # For X_CHECK, this is the position being checked
state=state,
defensive_decision=defensive_decision,
offensive_decision=offensive_decision,
ab_roll=ab_roll,
forced_xcheck_result=xcheck_result,
forced_xcheck_error=xcheck_error,
)
# Finalize the play (common logic)
await self._finalize_play(state, result, result.ab_roll)
return result
async def resolve_manual_play(
self,
game_id: UUID,
ab_roll: "AbRoll",
outcome: PlayOutcome,
hit_location: str | None = None,
) -> PlayResult:
"""
Resolve play with manually-submitted outcome (manual mode).
In manual mode (SBA + PD manual):
1. Server rolls dice for fairness/auditing
2. Players read their physical cards based on those dice
3. Players submit the outcome they see
4. Server validates and processes with the provided outcome
Args:
game_id: Game to resolve
ab_roll: The dice roll (server-rolled for fairness)
outcome: PlayOutcome enum (from player's physical card)
hit_location: Optional hit location for groundballs/flyouts
Returns:
PlayResult with complete outcome
Raises:
ValidationError: If game not active or hit location missing when required
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
game_validator.validate_game_active(state)
# NOTE: Business rule validation (e.g., when hit_location is required based on
# game state) is handled in PlayResolver, not here. The transport layer should
# not make business logic decisions about contextual requirements.
# Get decisions
defensive_decision = DefensiveDecision(
**state.decisions_this_play.get("defensive", {})
)
offensive_decision = OffensiveDecision(
**state.decisions_this_play.get("offensive", {})
)
# Create resolver for this game's league and mode
resolver = PlayResolver(
league_id=state.league_id,
auto_mode=state.auto_mode,
state_manager=state_manager,
)
# Call core resolution with manual outcome
result = resolver.resolve_outcome(
outcome=outcome,
hit_location=hit_location,
state=state,
defensive_decision=defensive_decision,
offensive_decision=offensive_decision,
ab_roll=ab_roll,
)
# Finalize the play (common logic)
log_suffix = f" (hit to {hit_location})" if hit_location else ""
await self._finalize_play(state, result, ab_roll, log_suffix)
return result
def _apply_play_result(self, state: GameState, result: PlayResult) -> None:
"""
Apply play result to in-memory game state.
Only updates state - NO database writes (handled by orchestration layer).
"""
# Update outs
state.outs += result.outs_recorded
# Build advancement lookup
advancement_map = {
adv.from_base: adv.to_base for adv in result.runners_advanced
}
# Create temporary storage for new runner positions
new_first = None
new_second = None
new_third = None
# Process existing runners
for base, runner in state.get_all_runners():
if base in advancement_map:
to_base = advancement_map[base]
if to_base < 4: # Not scored
if to_base == 1:
new_first = runner
elif to_base == 2:
new_second = runner
elif to_base == 3:
new_third = runner
# If to_base == 4, runner scored (don't add to new positions)
else:
# Runner stays put
if base == 1:
new_first = runner
elif base == 2:
new_second = runner
elif base == 3:
new_third = runner
# Add batter if reached base
if result.batter_result and result.batter_result < 4:
# GameState now has the full batter object (set by _prepare_next_play)
batter = state.current_batter
if result.batter_result == 1:
new_first = batter
elif result.batter_result == 2:
new_second = batter
elif result.batter_result == 3:
new_third = batter
# Update state with new runner positions
state.on_first = new_first
state.on_second = new_second
state.on_third = new_third
# Update score
if state.half == "top":
state.away_score += result.runs_scored
else:
state.home_score += result.runs_scored
# Increment play count
state.play_count += 1
state.last_play_result = result.description
runner_count = len(
[r for r in [state.on_first, state.on_second, state.on_third] if r]
)
logger.debug(
f"Applied play result: outs={state.outs}, "
f"score={state.away_score}-{state.home_score}, "
f"runners={runner_count}"
)
async def _advance_inning(self, state: GameState, game_id: UUID) -> None:
"""
Advance to next half inning.
Only handles inning transition - NO database writes, NO prepare_next_play.
Those are handled by the orchestration layer.
Validates defensive team lineup positions at start of each half inning.
"""
if state.half == "top":
state.half = "bottom"
else:
state.half = "top"
state.inning += 1
# Clear bases and reset outs
state.outs = 0
state.clear_bases()
# Validate defensive team lineup positions
# Top of inning: home team is defending
# Bottom of inning: away team is defending
defensive_team = (
state.home_team_id if state.half == "top" else state.away_team_id
)
defensive_lineup = await self.db_ops.get_active_lineup(
state.game_id, defensive_team
)
if not defensive_lineup:
raise ValidationError(
f"No lineup found for defensive team {defensive_team}"
)
game_validator.validate_defensive_lineup_positions(defensive_lineup)
logger.info(f"Advanced to inning {state.inning} {state.half}")
# Check if game is over
if game_validator.is_game_over(state):
state.status = "completed"
logger.info(
f"Game {state.game_id} completed - Final: Away {state.away_score}, Home {state.home_score}"
)
async def _prepare_next_play(self, state: GameState) -> None:
"""
Prepare snapshot for the next play.
This method:
1. Determines current batter based on batting order index
2. Advances the appropriate team's batter index (with wraparound)
3. Fetches active lineups from database
4. Sets snapshot fields: current_batter/pitcher/catcher_lineup_id
5. Calculates on_base_code from current runners
This snapshot is used when saving the Play record to DB.
"""
# Determine which team is batting
if state.half == "top":
# Away team batting
current_idx = state.away_team_batter_idx
state.away_team_batter_idx = (current_idx + 1) % 9
batting_team = state.away_team_id
fielding_team = state.home_team_id
logger.debug(f"_prepare_next_play: AWAY team batting, idx {current_idx}{state.away_team_batter_idx}")
else:
# Home team batting
current_idx = state.home_team_batter_idx
state.home_team_batter_idx = (current_idx + 1) % 9
batting_team = state.home_team_id
fielding_team = state.away_team_id
logger.debug(f"_prepare_next_play: HOME team batting, idx {current_idx}{state.home_team_batter_idx}")
# Try to get lineups from cache first, only fetch from DB if not cached
from app.models.game_models import LineupPlayerState
batting_lineup_state = state_manager.get_lineup(state.game_id, batting_team)
fielding_lineup_state = state_manager.get_lineup(state.game_id, fielding_team)
# Fetch from database only if not in cache
if not batting_lineup_state:
batting_lineup_state = (
await lineup_service.load_team_lineup_with_player_data(
game_id=state.game_id,
team_id=batting_team,
league_id=state.league_id,
)
)
if batting_lineup_state:
state_manager.set_lineup(
state.game_id, batting_team, batting_lineup_state
)
if not fielding_lineup_state:
fielding_lineup_state = (
await lineup_service.load_team_lineup_with_player_data(
game_id=state.game_id,
team_id=fielding_team,
league_id=state.league_id,
)
)
if fielding_lineup_state:
state_manager.set_lineup(
state.game_id, fielding_team, fielding_lineup_state
)
# Set current player snapshot using cached lineup data
# Batter: use the batting order index to find the player
if batting_lineup_state and current_idx < len(batting_lineup_state.players):
# Get batting order sorted list
batting_order = sorted(
[
p
for p in batting_lineup_state.players
if p.batting_order is not None
],
key=lambda x: x.batting_order or 0,
)
if current_idx < len(batting_order):
state.current_batter = batting_order[current_idx]
else:
# Create placeholder - this shouldn't happen in normal gameplay
state.current_batter = LineupPlayerState(
lineup_id=0, card_id=0, position="DH", batting_order=None
)
logger.warning(
f"Batter index {current_idx} out of range for batting order"
)
else:
# Create placeholder - this shouldn't happen in normal gameplay
state.current_batter = LineupPlayerState(
lineup_id=0, card_id=0, position="DH", batting_order=None
)
logger.warning(f"No batting lineup found for team {batting_team}")
# Pitcher and catcher: find by position from cached lineup
if fielding_lineup_state:
state.current_pitcher = next(
(p for p in fielding_lineup_state.players if p.position == "P"), None
)
state.current_catcher = next(
(p for p in fielding_lineup_state.players if p.position == "C"), None
)
else:
state.current_pitcher = None
state.current_catcher = None
# Calculate on_base_code from current runners (bit field)
state.current_on_base_code = 0
if state.on_first:
state.current_on_base_code |= 1 # Bit 0: first base
if state.on_second:
state.current_on_base_code |= 2 # Bit 1: second base
if state.on_third:
state.current_on_base_code |= 4 # Bit 2: third base
logger.info(
f"Prepared next play: batter lineup_id={state.current_batter.lineup_id}, "
f"batting_order={state.current_batter.batting_order}, "
f"pitcher={state.current_pitcher.lineup_id if state.current_pitcher else None}"
)
async def _batch_save_inning_rolls(self, game_id: UUID) -> None:
"""
Batch save all rolls from the inning
This is called at end of each half-inning to persist
all dice rolls with their context to the database.
"""
if game_id not in self._rolls_this_inning:
logger.debug(f"No rolls to save for game {game_id}")
return
rolls = self._rolls_this_inning[game_id]
if not rolls:
logger.debug(f"Empty roll list for game {game_id}")
return
try:
await self.db_ops.save_rolls_batch(rolls)
logger.info(f"Batch saved {len(rolls)} rolls for game {game_id}")
# Clear rolls for this inning
self._rolls_this_inning[game_id] = []
except IntegrityError as e:
logger.error(f"Integrity error saving rolls for game {game_id}: {e}")
# Re-raise - audit data loss is critical
raise DatabaseError("save_rolls_batch", e)
except OperationalError as e:
logger.error(f"Database connection error saving rolls for game {game_id}: {e}")
# Re-raise - audit data loss is critical
raise DatabaseError("save_rolls_batch", e)
except SQLAlchemyError as e:
logger.error(f"Database error saving rolls for game {game_id}: {e}")
# Re-raise - audit data loss is critical
raise DatabaseError("save_rolls_batch", e)
async def _save_play_to_db(
self,
state: GameState,
result: PlayResult,
outs_before: int,
runners_before: dict[str, int | None],
db_ops: DatabaseOperations | None = None,
) -> None:
"""
Save play to database using snapshot from GameState.
Uses the pre-calculated snapshot fields (no database lookbacks).
Args:
state: Current game state
result: Play result to save
outs_before: Number of outs BEFORE this play (captured before _apply_play_result)
runners_before: Dict with runner IDs BEFORE play (on_first_id, on_second_id, on_third_id)
db_ops: Optional DatabaseOperations for transaction grouping
Raises:
ValueError: If required player IDs are missing
"""
# Use snapshot from GameState (set by _prepare_next_play)
# Extract IDs from objects for database persistence
batter_id = state.current_batter.lineup_id
pitcher_id = state.current_pitcher.lineup_id if state.current_pitcher else None
catcher_id = state.current_catcher.lineup_id if state.current_catcher else None
on_base_code = state.current_on_base_code
# VERIFY required fields are present
if batter_id is None:
raise ValueError(
f"Cannot save play: batter_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
if pitcher_id is None:
raise ValueError(
f"Cannot save play: pitcher_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
if catcher_id is None:
raise ValueError(
f"Cannot save play: catcher_id is None. "
f"Game {state.game_id} may need _prepare_next_play() called after recovery."
)
# Runners on base BEFORE play (captured before _apply_play_result modifies state)
on_first_id = runners_before["on_first_id"]
on_second_id = runners_before["on_second_id"]
on_third_id = runners_before["on_third_id"]
# Runners AFTER play (from result.runners_advanced)
# Build dict of from_base -> to_base for quick lookup
finals = {adv.from_base: adv.to_base for adv in result.runners_advanced}
on_first_final = finals.get(1) # None if out/scored, 1-4 if advanced
on_second_final = finals.get(2) # None if out/scored, 1-4 if advanced
on_third_final = finals.get(3) # None if out/scored, 1-4 if advanced
# Batter result (None=out, 1-4=base reached)
batter_final = result.batter_result
play_data = {
"game_id": state.game_id,
"play_number": state.play_count,
"inning": state.inning,
"half": state.half,
"outs_before": outs_before, # Passed from _finalize_play (captured before _apply_play_result)
"outs_recorded": result.outs_recorded,
"batting_order": state.current_batter.batting_order if state.current_batter else 1,
# Player IDs from snapshot
"batter_id": batter_id,
"pitcher_id": pitcher_id,
"catcher_id": catcher_id,
# Base situation snapshot
"on_base_code": on_base_code,
"on_first_id": on_first_id,
"on_second_id": on_second_id,
"on_third_id": on_third_id,
# Final positions
"on_first_final": on_first_final,
"on_second_final": on_second_final,
"on_third_final": on_third_final,
"batter_final": batter_final,
# Play outcome
"dice_roll": str(result.ab_roll),
"hit_type": result.outcome.value,
"result_description": result.description,
"runs_scored": result.runs_scored,
"away_score": state.away_score,
"home_score": state.home_score,
"complete": True,
# Strategic decisions
"defensive_choices": state.decisions_this_play.get("defensive", {}),
"offensive_choices": state.decisions_this_play.get("offensive", {}),
}
# Add metadata for uncapped hits (Phase 3: will include runner advancement decisions)
play_metadata = {}
if result.outcome in [PlayOutcome.SINGLE_UNCAPPED, PlayOutcome.DOUBLE_UNCAPPED]:
play_metadata["uncapped"] = True
play_metadata["outcome_type"] = result.outcome.value
play_data["play_metadata"] = play_metadata
# Calculate statistical fields (Phase 3.5: Materialized Views)
# Create state_after by cloning state and applying result
state_after = state.model_copy(deep=True)
state_after.outs += result.outs_recorded
if state.half == "top":
state_after.away_score += result.runs_scored
else:
state_after.home_score += result.runs_scored
# Calculate stats using PlayStatCalculator
stats = PlayStatCalculator.calculate_stats(
outcome=result.outcome,
result=result,
state_before=state,
state_after=state_after,
)
# Add stat fields to play_data
play_data.update(stats)
# Use provided db_ops or fall back to instance's db_ops
ops = db_ops or self.db_ops
await ops.save_play(play_data)
logger.debug(
f"Saved play {state.play_count}: batter={batter_id}, on_base={on_base_code}"
)
async def get_game_state(self, game_id: UUID) -> GameState | None:
"""Get current game state"""
return state_manager.get_state(game_id)
async def rollback_plays(self, game_id: UUID, num_plays: int) -> GameState:
"""
Roll back the last N plays.
Deletes plays from the database and reconstructs game state by replaying
remaining plays. Also removes any substitutions that occurred during the
rolled-back plays.
Args:
game_id: Game to roll back
num_plays: Number of plays to roll back (must be > 0)
Returns:
Updated GameState after rollback
Raises:
ValueError: If num_plays invalid, game not found, or game completed
"""
# 1. Validate
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
if num_plays <= 0:
raise ValueError("num_plays must be greater than 0")
if state.play_count < num_plays:
raise ValueError(
f"Cannot roll back {num_plays} plays (only {state.play_count} exist)"
)
if state.status == "completed":
raise ValueError("Cannot roll back a completed game")
# 2. Calculate target play number
target_play = state.play_count - num_plays
logger.info(
f"Rolling back {num_plays} plays for game {game_id} "
f"(from play {state.play_count} to play {target_play})"
)
# 3. Delete plays from database
deleted_plays = await self.db_ops.delete_plays_after(game_id, target_play)
logger.info(f"Deleted {deleted_plays} plays")
# 4. Delete substitutions that occurred after target play
deleted_subs = await self.db_ops.delete_substitutions_after(
game_id, target_play
)
logger.info(f"Deleted {deleted_subs} substitutions")
# Note: We don't delete dice rolls from the rolls table - they're kept for auditing
# and don't affect game state reconstruction
# 5. Clear in-memory roll tracking for this game
if game_id in self._rolls_this_inning:
del self._rolls_this_inning[game_id]
# 6. Recover game state by replaying remaining plays
logger.info(f"Recovering game state for {game_id}")
new_state = await state_manager.recover_game(game_id)
logger.info(
f"Rollback complete - now at play {new_state.play_count}, "
f"inning {new_state.inning} {new_state.half}"
)
return new_state
async def end_game(self, game_id: UUID) -> GameState:
"""
Manually end a game
For forfeit, abandonment, etc.
"""
state = state_manager.get_state(game_id)
if not state:
raise ValueError(f"Game {game_id} not found")
# Batch save any remaining rolls
await self._batch_save_inning_rolls(game_id)
state.status = "completed"
state_manager.update_state(game_id, state)
await self.db_ops.update_game_state(
game_id=game_id,
inning=state.inning,
half=state.half,
home_score=state.home_score,
away_score=state.away_score,
status="completed",
)
# Clean up per-game resources to prevent memory leaks
self._cleanup_game_resources(game_id)
logger.info(f"Game {game_id} ended manually")
return state
def _cleanup_game_resources(self, game_id: UUID) -> None:
"""
Clean up per-game resources when a game completes.
Prevents memory leaks from unbounded dictionary growth.
Note: Game locks are now managed by StateManager.
"""
# Clean up rolls tracking
if game_id in self._rolls_this_inning:
del self._rolls_this_inning[game_id]
logger.debug(f"Cleaned up game engine resources for game {game_id}")
# Singleton instance
game_engine = GameEngine()