strat-gameplay-webapp/backend/app/core/game_engine.py
Cal Corum e2f1d6079f CLAUDE: Implement Week 7 Task 6 - PlayResolver Integration with RunnerAdvancement
Major Refactor: Outcome-First Architecture
- PlayResolver now accepts league_id and auto_mode in constructor
- Added core resolve_outcome() method - all resolution logic in one place
- Added resolve_manual_play() wrapper for manual submissions (primary)
- Added resolve_auto_play() wrapper for PD auto mode (rare)
- Removed SimplifiedResultChart (obsolete with new architecture)
- Removed play_resolver singleton

RunnerAdvancement Integration:
- All groundball outcomes (GROUNDBALL_A/B/C) now use RunnerAdvancement
- Proper DP probability calculation with positioning modifiers
- Hit location tracked for all relevant outcomes
- 13 result types fully integrated from advancement charts

Game State Updates:
- Added auto_mode field to GameState (stored per-game)
- Updated state_manager.create_game() to accept auto_mode parameter
- GameEngine now uses state.auto_mode to create appropriate resolver

League Configuration:
- Added supports_auto_mode() to BaseGameConfig
- SbaConfig: returns False (no digitized cards)
- PdConfig: returns True (has digitized ratings)
- PlayResolver validates auto mode support and raises error for SBA

Play Results:
- Added hit_location field to PlayResult
- Groundballs include location from RunnerAdvancement
- Flyouts track hit_location for tag-up logic (future)
- Other outcomes have hit_location=None

Testing:
- Completely rewrote test_play_resolver.py for new architecture
- 9 new tests covering initialization, strikeouts, walks, groundballs, home runs
- All 9 tests passing
- All 180 core tests still passing (1 pre-existing failure unrelated)

Terminal Client:
- No changes needed - defaults to manual mode (auto_mode=False)
- Perfect for human testing of manual submissions

This completes Week 7 Task 6 - the final task of Week 7!
Week 7 is now 100% complete with all 8 tasks done.

🎯 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-31 08:20:52 -05:00

1048 lines
40 KiB
Python

"""
Game Engine - Main game orchestration engine.
Coordinates game flow, validates actions, resolves plays, and persists state.
Integrates DiceSystem for roll tracking with context and batch saving.
Phase 3: Enhanced with async decision workflow and AI opponent integration.
Author: Claude
Date: 2025-10-24
"""
import asyncio
import logging
from uuid import UUID
from typing import Optional, List
import pendulum
from app.core.state_manager import state_manager
from app.core.play_resolver import PlayResolver, PlayResult
from app.config import PlayOutcome
from app.core.validators import game_validator, ValidationError
from app.core.dice import dice_system
from app.core.ai_opponent import ai_opponent
from app.database.operations import DatabaseOperations
from app.models.game_models import (
GameState, DefensiveDecision, OffensiveDecision
)
# Module-level logger; its name is this module's dotted path plus ".GameEngine".
logger = logging.getLogger(f'{__name__}.GameEngine')
class GameEngine:
    """Main game orchestration engine."""

    # Phase 3: default number of seconds to wait for a team's decision
    DECISION_TIMEOUT = 30

    def __init__(self):
        # Rolls collected per game since the last batch save; flushed by
        # _batch_save_inning_rolls().
        self._rolls_this_inning: dict[UUID, List] = dict()
        # Database access layer used for all persistence.
        self.db_ops = DatabaseOperations()
async def start_game(self, game_id: UUID) -> GameState:
    """
    Start a game.

    Transitions the game from 'pending' to 'active' after checking that both
    teams have at least 9 active lineup entries and valid defensive
    positions. Resets inning/half/outs, prepares the first play snapshot and
    persists the opening state.

    Args:
        game_id: Game to start

    Returns:
        The updated GameState (status 'active', top of inning 1)

    Raises:
        ValueError: If the game is not in the state manager
        ValidationError: If game already started or a lineup is
            incomplete or has invalid defensive positions
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found in state manager")
    if state.status != "pending":
        raise ValidationError(f"Game already started (status: {state.status})")

    # HARD REQUIREMENT: both lineups must be complete.
    # At game start we validate BOTH teams (exception to the "defensive only" rule).
    home_lineup = await self.db_ops.get_active_lineup(state.game_id, state.home_team_id)
    away_lineup = await self.db_ops.get_active_lineup(state.game_id, state.away_team_id)
    lineups = (("Home", home_lineup), ("Away", away_lineup))

    # Minimum 9 players per team (home is checked first, then away).
    for label, lineup in lineups:
        if not lineup or len(lineup) < 9:
            raise ValidationError(
                f"{label} team lineup incomplete: {len(lineup) if lineup else 0} players "
                f"(minimum 9 required)"
            )

    # Defensive positions, again for both teams. The validator's error is
    # re-raised with a team prefix and chained so the original cause is kept.
    for label, lineup in lineups:
        try:
            game_validator.validate_defensive_lineup_positions(lineup)
        except ValidationError as e:
            raise ValidationError(f"{label} team: {e}") from e

    # Mark as active
    state.status = "active"
    state.inning = 1
    state.half = "top"
    state.outs = 0

    # Initialize roll tracking for this game
    self._rolls_this_inning[game_id] = []

    # Prepare first play snapshot (batter/pitcher/catcher/on-base code)
    await self._prepare_next_play(state)

    # Update in-memory state
    state_manager.update_state(game_id, state)

    # Persist to DB
    await self.db_ops.update_game_state(
        game_id=game_id,
        inning=1,
        half="top",
        home_score=0,
        away_score=0,
        status="active"
    )
    logger.info(
        f"Started game {game_id} - First batter: lineup_id={state.current_batter_lineup_id}"
    )
    return state
async def submit_defensive_decision(
    self,
    game_id: UUID,
    decision: DefensiveDecision
) -> GameState:
    """
    Record the fielding team's decision for the current play.

    Phase 3: also hands the decision to the state manager's decision queue so
    that a coroutine awaiting it (if any) is released.

    Args:
        game_id: Game the decision belongs to
        decision: The defensive decision to record

    Returns:
        The updated GameState (pending_decision is now "offensive")

    Raises:
        ValueError: If the game is not found
    """
    game_state = state_manager.get_state(game_id)
    if not game_state:
        raise ValueError(f"Game {game_id} not found")

    game_validator.validate_game_active(game_state)
    game_validator.validate_defensive_decision(decision, game_state)

    # Keep the dict form as well (backward compatibility)
    game_state.decisions_this_play['defensive'] = decision.model_dump()
    game_state.pending_decision = "offensive"
    game_state.pending_defensive_decision = decision

    # Phase 3: release a waiting future, if one was registered
    team_id = game_state.get_fielding_team_id()
    try:
        state_manager.submit_decision(game_id, team_id, decision)
        logger.info(f"Resolved pending defensive decision future for game {game_id}")
    except ValueError:
        # Nobody was waiting - fine for a direct submission without await
        logger.debug(f"No pending defensive decision for game {game_id}")

    state_manager.update_state(game_id, game_state)
    logger.info(f"Defensive decision submitted for game {game_id}")
    return game_state
async def submit_offensive_decision(
    self,
    game_id: UUID,
    decision: OffensiveDecision
) -> GameState:
    """
    Record the batting team's decision for the current play.

    Phase 3: also hands the decision to the state manager's decision queue so
    that a coroutine awaiting it (if any) is released.

    Args:
        game_id: Game the decision belongs to
        decision: The offensive decision to record

    Returns:
        The updated GameState (pending_decision is now "resolution")

    Raises:
        ValueError: If the game is not found
    """
    game_state = state_manager.get_state(game_id)
    if not game_state:
        raise ValueError(f"Game {game_id} not found")

    game_validator.validate_game_active(game_state)
    game_validator.validate_offensive_decision(decision, game_state)

    # Keep the dict form as well (backward compatibility)
    game_state.decisions_this_play['offensive'] = decision.model_dump()
    game_state.pending_decision = "resolution"
    game_state.pending_offensive_decision = decision

    # Phase 3: release a waiting future, if one was registered
    team_id = game_state.get_batting_team_id()
    try:
        state_manager.submit_decision(game_id, team_id, decision)
        logger.info(f"Resolved pending offensive decision future for game {game_id}")
    except ValueError:
        # Nobody was waiting - fine for a direct submission without await
        logger.debug(f"No pending offensive decision for game {game_id}")

    state_manager.update_state(game_id, game_state)
    logger.info(f"Offensive decision submitted for game {game_id}")
    return game_state
# ============================================================================
# PHASE 3: ENHANCED DECISION WORKFLOW
# ============================================================================
async def await_defensive_decision(
    self,
    state: GameState,
    timeout: Optional[int] = None
) -> DefensiveDecision:
    """
    Obtain the defensive decision for the current play.

    For AI teams: the decision is generated immediately by ai_opponent.
    For human teams: a pending decision is registered with the state
    manager, the decision phase and deadline are recorded on the state, and
    this coroutine waits for the submission.

    A timeout does NOT propagate as an exception: asyncio.TimeoutError is
    caught here and a default DefensiveDecision() is returned instead.

    Args:
        state: Current game state
        timeout: Seconds to wait before falling back to the default decision
            (None means use DECISION_TIMEOUT)

    Returns:
        The AI-generated decision, the submitted decision, or an all-defaults
        DefensiveDecision if the wait timed out
    """
    if timeout is None:
        timeout = self.DECISION_TIMEOUT

    fielding_team_id = state.get_fielding_team_id()

    # AI team: no waiting needed
    if state.is_fielding_team_ai():
        logger.info(f"Generating AI defensive decision for game {state.game_id}")
        return await ai_opponent.generate_defensive_decision(state)

    # Human team: wait for decision via WebSocket
    logger.info(f"Awaiting human defensive decision for game {state.game_id}, team {fielding_team_id}")

    # Register the pending decision so a later submission can resolve it
    state_manager.set_pending_decision(
        game_id=state.game_id,
        team_id=fielding_team_id,
        decision_type="defensive"
    )

    # Expose the decision phase and its UTC deadline on the state
    state.decision_phase = "awaiting_defensive"
    state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
    state_manager.update_state(state.game_id, state)

    # TODO Week 7 Task 4: Emit WebSocket event to notify frontend
    # (connection_manager.emit_decision_required with game_id, team_id,
    # decision_type="defensive", timeout and a game situation summary)

    try:
        # Wait for decision with timeout
        decision = await asyncio.wait_for(
            state_manager.await_decision(state.game_id, fielding_team_id, "defensive"),
            timeout=timeout
        )
        logger.info(f"Received defensive decision for game {state.game_id}")
        return decision
    except asyncio.TimeoutError:
        # Use default decision on timeout
        logger.warning(f"Defensive decision timeout for game {state.game_id}, using default")
        return DefensiveDecision()  # All defaults
async def await_offensive_decision(
    self,
    state: GameState,
    timeout: Optional[int] = None
) -> OffensiveDecision:
    """
    Obtain the offensive decision for the current play.

    For AI teams: the decision is generated immediately by ai_opponent.
    For human teams: a pending decision is registered with the state
    manager, the decision phase and deadline are recorded on the state, and
    this coroutine waits for the submission.

    A timeout does NOT propagate as an exception: asyncio.TimeoutError is
    caught here and a default OffensiveDecision() is returned instead.

    Args:
        state: Current game state
        timeout: Seconds to wait before falling back to the default decision
            (None means use DECISION_TIMEOUT)

    Returns:
        The AI-generated decision, the submitted decision, or an all-defaults
        OffensiveDecision if the wait timed out
    """
    if timeout is None:
        timeout = self.DECISION_TIMEOUT

    batting_team_id = state.get_batting_team_id()

    # AI team: no waiting needed
    if state.is_batting_team_ai():
        logger.info(f"Generating AI offensive decision for game {state.game_id}")
        return await ai_opponent.generate_offensive_decision(state)

    # Human team: wait for decision via WebSocket
    logger.info(f"Awaiting human offensive decision for game {state.game_id}, team {batting_team_id}")

    # Register the pending decision so a later submission can resolve it
    state_manager.set_pending_decision(
        game_id=state.game_id,
        team_id=batting_team_id,
        decision_type="offensive"
    )

    # Expose the decision phase and its UTC deadline on the state
    state.decision_phase = "awaiting_offensive"
    state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
    state_manager.update_state(state.game_id, state)

    # TODO Week 7 Task 4: Emit WebSocket event to notify frontend
    # (connection_manager.emit_decision_required(...))

    try:
        # Wait for decision with timeout
        decision = await asyncio.wait_for(
            state_manager.await_decision(state.game_id, batting_team_id, "offensive"),
            timeout=timeout
        )
        logger.info(f"Received offensive decision for game {state.game_id}")
        return decision
    except asyncio.TimeoutError:
        # Use default decision on timeout
        logger.warning(f"Offensive decision timeout for game {state.game_id}, using default")
        return OffensiveDecision()  # All defaults
async def resolve_play(self, game_id: UUID, forced_outcome: Optional[PlayOutcome] = None) -> PlayResult:
    """
    Resolve the current play with a server dice roll and a forced outcome.

    This entry point currently only supports a forced outcome (testing).

    Explicit orchestration sequence:
        1. Resolve play (dice roll + forced outcome)
        2. Save play to DB (uses snapshot from GameState)
        3. Apply result to state (outs, score, runners)
        4. Update game state in DB (only when a persisted field changed)
        5. Check for inning change (outs >= 3)
        6. Prepare next play (always last step)

    Args:
        game_id: Game to resolve
        forced_outcome: Outcome to apply instead of a chart lookup (for testing)

    Returns:
        PlayResult with complete outcome

    Raises:
        ValueError: If the game is not found
        NotImplementedError: If forced_outcome is None
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found")
    game_validator.validate_game_active(state)

    # Get decisions (missing entries fall back to model defaults)
    defensive_decision = DefensiveDecision(**state.decisions_this_play.get('defensive', {}))
    offensive_decision = OffensiveDecision(**state.decisions_this_play.get('offensive', {}))

    # STEP 1: Resolve play
    # Create resolver for this game's league and mode
    resolver = PlayResolver(league_id=state.league_id, auto_mode=state.auto_mode)

    # Reject unsupported calls BEFORE rolling, so a call that cannot succeed
    # does not consume a dice roll for this game.
    if forced_outcome is None:
        raise NotImplementedError(
            "This method only supports forced_outcome for testing. "
            "Use resolve_manual_play() for manual mode or resolve_auto_play() for auto mode."
        )

    # Roll dice
    ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)

    result = resolver.resolve_outcome(
        outcome=forced_outcome,
        hit_location=None,  # Testing doesn't specify location
        state=state,
        defensive_decision=defensive_decision,
        offensive_decision=offensive_decision,
        ab_roll=ab_roll
    )

    # Track roll for batch saving at end of inning
    self._rolls_this_inning.setdefault(game_id, []).append(result.ab_roll)

    # STEP 2: Save play to DB (uses snapshot from GameState)
    await self._save_play_to_db(state, result)

    # Capture the persisted fields before applying the result
    persisted_before = (
        state.inning, state.half, state.home_score, state.away_score, state.status
    )

    # STEP 3: Apply result to state (outs, score, runners)
    self._apply_play_result(state, result)

    # STEP 4: Update game state in DB only if something changed
    persisted_after = (
        state.inning, state.half, state.home_score, state.away_score, state.status
    )
    if persisted_after != persisted_before:
        await self.db_ops.update_game_state(
            game_id=state.game_id,
            inning=state.inning,
            half=state.half,
            home_score=state.home_score,
            away_score=state.away_score,
            status=state.status
        )
        logger.info("Updated game state in DB - score/inning/status changed")
    else:
        logger.debug("Skipped game state update - no changes to persist")

    # STEP 5: Check for inning change
    if state.outs >= 3:
        await self._advance_inning(state, game_id)
        # Update DB again after inning change
        await self.db_ops.update_game_state(
            game_id=state.game_id,
            inning=state.inning,
            half=state.half,
            home_score=state.home_score,
            away_score=state.away_score,
            status=state.status
        )
        # Batch save rolls at half-inning boundary
        await self._batch_save_inning_rolls(game_id)

    # STEP 6: Prepare next play (always last step)
    if state.status == "active":  # Only prepare if game is still active
        await self._prepare_next_play(state)

    # Clear decisions for next play
    state.decisions_this_play = {}
    state.pending_decision = "defensive"

    # Update in-memory state
    state_manager.update_state(game_id, state)
    logger.info(f"Resolved play {state.play_count} for game {game_id}: {result.description}")
    return result
async def resolve_manual_play(
    self,
    game_id: UUID,
    ab_roll: 'AbRoll',
    outcome: PlayOutcome,
    hit_location: Optional[str] = None
) -> PlayResult:
    """
    Resolve a play whose outcome was submitted by the players (manual mode).

    Manual mode flow (SBA + PD manual):
        1. Server rolls dice for fairness/auditing
        2. Players read their physical cards based on those dice
        3. Players submit the outcome they see
        4. Server processes the play with the provided outcome

    Orchestration sequence (same as resolve_play):
        1. Resolve play with manual outcome (ab_roll kept for the audit trail)
        2. Save play to DB
        3. Apply result to state
        4. Update game state in DB
        5. Check for inning change
        6. Prepare next play

    Args:
        game_id: Game to resolve
        ab_roll: The dice roll (server-rolled for fairness)
        outcome: PlayOutcome enum (from player's physical card)
        hit_location: Optional hit location for groundballs/flyouts

    Returns:
        PlayResult with complete outcome

    Raises:
        ValueError: If the game is not found
        ValidationError: If the outcome requires a hit location and none was given
    """
    game_state = state_manager.get_state(game_id)
    if not game_state:
        raise ValueError(f"Game {game_id} not found")
    game_validator.validate_game_active(game_state)

    # A location is mandatory for outcomes that declare they need one
    if outcome.requires_hit_location() and not hit_location:
        raise ValidationError(
            f"Outcome {outcome.value} requires hit_location "
            f"(one of: 1B, 2B, SS, 3B, LF, CF, RF, P, C)"
        )

    # Decisions recorded for this play (model defaults when absent)
    recorded = game_state.decisions_this_play
    defense = DefensiveDecision(**recorded.get('defensive', {}))
    offense = OffensiveDecision(**recorded.get('offensive', {}))

    # STEP 1: resolve with the submitted outcome, using a resolver built for
    # this game's league and mode
    play_resolver = PlayResolver(league_id=game_state.league_id, auto_mode=game_state.auto_mode)
    result = play_resolver.resolve_outcome(
        outcome=outcome,
        hit_location=hit_location,
        state=game_state,
        defensive_decision=defense,
        offensive_decision=offense,
        ab_roll=ab_roll
    )

    # Queue the roll for the end-of-half-inning batch save
    self._rolls_this_inning.setdefault(game_id, []).append(ab_roll)

    # STEP 2: persist the play
    await self._save_play_to_db(game_state, result)

    # Snapshot of the fields that are mirrored in the DB
    before = (
        game_state.inning,
        game_state.half,
        game_state.home_score,
        game_state.away_score,
        game_state.status,
    )

    # STEP 3: apply the result in memory
    self._apply_play_result(game_state, result)

    # STEP 4: write the game row only when a mirrored field changed
    after = (
        game_state.inning,
        game_state.half,
        game_state.home_score,
        game_state.away_score,
        game_state.status,
    )
    if after == before:
        logger.debug("Skipped game state update - no changes to persist")
    else:
        await self.db_ops.update_game_state(
            game_id=game_state.game_id,
            inning=game_state.inning,
            half=game_state.half,
            home_score=game_state.home_score,
            away_score=game_state.away_score,
            status=game_state.status
        )
        logger.info("Updated game state in DB - score/inning/status changed")

    # STEP 5: three outs ends the half inning
    if game_state.outs >= 3:
        await self._advance_inning(game_state, game_id)
        # Persist the post-transition inning/half/status
        await self.db_ops.update_game_state(
            game_id=game_state.game_id,
            inning=game_state.inning,
            half=game_state.half,
            home_score=game_state.home_score,
            away_score=game_state.away_score,
            status=game_state.status
        )
        # Flush rolls at the half-inning boundary
        await self._batch_save_inning_rolls(game_id)

    # STEP 6: snapshot the next play while the game is still running
    if game_state.status == "active":
        await self._prepare_next_play(game_state)

    # Reset per-play decisions
    game_state.decisions_this_play = {}
    game_state.pending_decision = "defensive"

    # Publish the in-memory state
    state_manager.update_state(game_id, game_state)

    location_note = f" (hit to {hit_location})" if hit_location else ""
    logger.info(
        f"Resolved manual play {game_state.play_count} for game {game_id}: "
        f"{result.description}{location_note}"
    )
    return result
def _apply_play_result(self, state: GameState, result: PlayResult) -> None:
    """
    Fold a play result into the in-memory game state.

    Updates outs, base occupancy, score, play count and last play
    description. Performs NO database writes (the orchestration layer does
    those).
    """
    state.outs += result.outs_recorded

    # from_base -> to_base for every runner the result moved
    moves = {start: end for start, end in result.runners_advanced}

    # Base occupancy after the play, keyed by base number
    occupants = {1: None, 2: None, 3: None}

    for base, runner in state.get_all_runners():
        if base in moves:
            destination = moves[base]
            # A destination of 4 means the runner scored and leaves the bases
            if destination < 4 and destination in occupants:
                occupants[destination] = runner
        elif base in occupants:
            # No entry in the result: runner stays put
            occupants[base] = runner

    # Place the batter if he reached a base short of home
    if result.batter_result and result.batter_result < 4:
        # Look the batter up in the cached lineup of the batting team
        if state.half == "top":
            batting_team_id = state.away_team_id
        else:
            batting_team_id = state.home_team_id
        cached_lineup = state_manager.get_lineup(state.game_id, batting_team_id)

        batter = None
        if cached_lineup and state.current_batter_lineup_id:
            batter = cached_lineup.get_player_by_lineup_id(state.current_batter_lineup_id)

        if not batter:
            # Fallback: minimal placeholder player.
            # Shouldn't happen if _prepare_next_play was called correctly.
            from app.models.game_models import LineupPlayerState
            logger.warning(f"Could not find batter lineup_id={state.current_batter_lineup_id} in cached lineup, using fallback")
            batter = LineupPlayerState(
                lineup_id=state.current_batter_lineup_id or 0,
                card_id=0,
                position="DH",  # Use DH as fallback position
                batting_order=None
            )

        if result.batter_result in occupants:
            occupants[result.batter_result] = batter

    # Commit the new base occupancy
    state.on_first = occupants[1]
    state.on_second = occupants[2]
    state.on_third = occupants[3]

    # Runs go to whichever side is batting
    if state.half == "top":
        state.away_score += result.runs_scored
    else:
        state.home_score += result.runs_scored

    state.play_count += 1
    state.last_play_result = result.description

    runner_count = sum(
        1 for occupant in (state.on_first, state.on_second, state.on_third) if occupant
    )
    logger.debug(
        f"Applied play result: outs={state.outs}, "
        f"score={state.away_score}-{state.home_score}, "
        f"runners={runner_count}"
    )
async def _advance_inning(self, state: GameState, game_id: UUID) -> None:
    """
    Advance to the next half inning.

    Only handles the inning transition - NO database writes and NO
    _prepare_next_play (both belong to the orchestration layer).

    If the transition ends the game, the state is marked 'completed' and the
    method returns without fetching or validating a lineup: there is no
    further half inning to defend, so a lineup problem must not block
    completion. Otherwise the defensive lineup for the new half inning is
    loaded from the DB and its positions are validated.

    Args:
        state: Game state to mutate in place
        game_id: Game identifier (unused here; state.game_id is used)

    Raises:
        ValidationError: If the game continues and the defensive team has no
            lineup or its positions are invalid
    """
    if state.half == "top":
        state.half = "bottom"
    else:
        state.half = "top"
        state.inning += 1

    # Clear bases and reset outs
    state.outs = 0
    state.clear_bases()

    # Check for game over first, against the already-advanced state
    if game_validator.is_game_over(state):
        state.status = "completed"
        logger.info(f"Game {state.game_id} completed - Final: Away {state.away_score}, Home {state.home_score}")
        return

    # Validate defensive team lineup positions
    # Top of inning: home team is defending
    # Bottom of inning: away team is defending
    defensive_team = state.home_team_id if state.half == "top" else state.away_team_id
    defensive_lineup = await self.db_ops.get_active_lineup(state.game_id, defensive_team)
    if not defensive_lineup:
        raise ValidationError(f"No lineup found for defensive team {defensive_team}")
    game_validator.validate_defensive_lineup_positions(defensive_lineup)

    logger.info(f"Advanced to inning {state.inning} {state.half}")
async def _prepare_next_play(self, state: GameState) -> None:
    """
    Prepare the snapshot for the next play.

    This method:
        1. Reads the batting team's batter index and advances it (wraps at 9)
        2. Gets both lineups from the state manager cache, loading them from
           the database only when not cached
        3. Sets current_batter/pitcher/catcher_lineup_id on the state
        4. Calculates current_on_base_code from the current runners

    The snapshot is what _save_play_to_db() writes into the Play record.
    """
    # Determine which team is batting and advance its batter index
    if state.half == "top":
        # Away team batting
        current_idx = state.away_team_batter_idx
        state.away_team_batter_idx = (current_idx + 1) % 9
        batting_team, fielding_team = state.away_team_id, state.home_team_id
    else:
        # Home team batting
        current_idx = state.home_team_batter_idx
        state.home_team_batter_idx = (current_idx + 1) % 9
        batting_team, fielding_team = state.home_team_id, state.away_team_id

    # Cache first; hit the database only on a miss
    batting_lineup_state = state_manager.get_lineup(state.game_id, batting_team)
    fielding_lineup_state = state_manager.get_lineup(state.game_id, fielding_team)
    if not batting_lineup_state:
        batting_lineup_state = await self._load_lineup_state(state.game_id, batting_team)
    if not fielding_lineup_state:
        fielding_lineup_state = await self._load_lineup_state(state.game_id, fielding_team)

    # Batter: index into the players that have a batting order, sorted by it
    state.current_batter_lineup_id = None
    if batting_lineup_state:
        batting_order = sorted(
            (p for p in batting_lineup_state.players if p.batting_order is not None),
            key=lambda p: p.batting_order
        )
        if current_idx < len(batting_order):
            state.current_batter_lineup_id = batting_order[current_idx].lineup_id
        else:
            logger.warning(f"Batter index {current_idx} out of range for batting order")
    else:
        logger.warning(f"No batting lineup found for team {batting_team}")

    # Pitcher and catcher: first fielding player at position "P" / "C"
    state.current_pitcher_lineup_id = None
    state.current_catcher_lineup_id = None
    if fielding_lineup_state:
        pitcher = next((p for p in fielding_lineup_state.players if p.position == "P"), None)
        catcher = next((p for p in fielding_lineup_state.players if p.position == "C"), None)
        state.current_pitcher_lineup_id = pitcher.lineup_id if pitcher else None
        state.current_catcher_lineup_id = catcher.lineup_id if catcher else None

    # on_base_code is a bit field: bit 0 = first, bit 1 = second, bit 2 = third
    on_base_code = 0
    if state.on_first:
        on_base_code |= 1
    if state.on_second:
        on_base_code |= 2
    if state.on_third:
        on_base_code |= 4
    state.current_on_base_code = on_base_code

    logger.debug(
        f"Prepared next play: batter={state.current_batter_lineup_id}, "
        f"pitcher={state.current_pitcher_lineup_id}, "
        f"catcher={state.current_catcher_lineup_id}, "
        f"on_base_code={state.current_on_base_code}"
    )


async def _load_lineup_state(self, game_id: UUID, team_id):
    """Load a team's active lineup from the DB, cache it in the state manager and return it (None if the DB has no lineup)."""
    from app.models.game_models import TeamLineupState, LineupPlayerState

    db_lineup = await self.db_ops.get_active_lineup(game_id, team_id)
    if not db_lineup:
        return None

    lineup_state = TeamLineupState(
        team_id=team_id,
        players=[
            LineupPlayerState(
                lineup_id=p.id,  # type: ignore[assignment]
                card_id=p.card_id if p.card_id else 0,  # type: ignore[assignment]
                position=p.position,  # type: ignore[assignment]
                batting_order=p.batting_order,  # type: ignore[assignment]
                is_active=p.is_active  # type: ignore[assignment]
            )
            for p in db_lineup
        ]
    )
    state_manager.set_lineup(game_id, team_id, lineup_state)
    return lineup_state
async def _batch_save_inning_rolls(self, game_id: UUID) -> None:
    """
    Persist every roll tracked for this game in one batch.

    Called at the end of each half-inning (and when a game is ended
    manually). A failed save is logged and swallowed, and the tracked rolls
    are left in place.
    """
    try:
        pending_rolls = self._rolls_this_inning[game_id]
    except KeyError:
        logger.debug(f"No rolls to save for game {game_id}")
        return

    if not pending_rolls:
        logger.debug(f"Empty roll list for game {game_id}")
        return

    try:
        await self.db_ops.save_rolls_batch(pending_rolls)
        logger.info(f"Batch saved {len(pending_rolls)} rolls for game {game_id}")
        # Start a fresh list for the next half-inning
        self._rolls_this_inning[game_id] = []
    except Exception as e:
        # Best effort: don't fail the game - rolls are still in dice_system
        # history and can be recovered later if needed
        logger.error(f"Failed to batch save rolls for game {game_id}: {e}")
async def _save_play_to_db(self, state: GameState, result: PlayResult) -> None:
    """
    Write the play record, built from the GameState snapshot and the result.

    Relies on the snapshot fields set by _prepare_next_play (no database
    lookbacks). Must run BEFORE the result is applied to the state, because
    outs, runners and scores are read as the pre-play situation.

    Raises:
        ValueError: If the batter, pitcher or catcher id is missing
    """
    # Required player ids from the snapshot, checked in this order
    required_ids = {
        "batter_id": state.current_batter_lineup_id,
        "pitcher_id": state.current_pitcher_lineup_id,
        "catcher_id": state.current_catcher_lineup_id,
    }
    on_base_code = state.current_on_base_code

    for field_name, lineup_id in required_ids.items():
        if lineup_id is None:
            raise ValueError(
                f"Cannot save play: {field_name} is None. "
                f"Game {state.game_id} may need _prepare_next_play() called after recovery."
            )

    # Runners on base BEFORE the play
    runner_ids_before = [
        runner.lineup_id if runner else None
        for runner in (state.on_first, state.on_second, state.on_third)
    ]

    # Where each starting base's runner ended up, per the result.
    # NOTE(review): a base missing from runners_advanced yields None here,
    # while _apply_play_result treats a missing entry as "runner stays put" -
    # confirm which meaning the play record expects.
    destination_by_base = {start: end for start, end in result.runners_advanced}

    play_data = {
        "game_id": state.game_id,
        "play_number": state.play_count,
        "inning": state.inning,
        "half": state.half,
        "outs_before": state.outs,  # outs BEFORE the result is applied
        "outs_recorded": result.outs_recorded,
        # Player IDs from snapshot (batter_id, pitcher_id, catcher_id)
        **required_ids,
        # Base situation snapshot
        "on_base_code": on_base_code,
        "on_first_id": runner_ids_before[0],
        "on_second_id": runner_ids_before[1],
        "on_third_id": runner_ids_before[2],
        # Final positions
        "on_first_final": destination_by_base.get(1),
        "on_second_final": destination_by_base.get(2),
        "on_third_final": destination_by_base.get(3),
        "batter_final": result.batter_result,
        # Play outcome
        "dice_roll": str(result.ab_roll),
        "hit_type": result.outcome.value,
        "result_description": result.description,
        "runs_scored": result.runs_scored,
        "away_score": state.away_score,
        "home_score": state.home_score,
        "complete": True,
        # Strategic decisions
        "defensive_choices": state.decisions_this_play.get('defensive', {}),
        "offensive_choices": state.decisions_this_play.get('offensive', {})
    }

    # Metadata for uncapped hits (Phase 3: will include runner advancement decisions)
    play_metadata = {}
    if result.outcome in (PlayOutcome.SINGLE_UNCAPPED, PlayOutcome.DOUBLE_UNCAPPED):
        play_metadata["uncapped"] = True
        play_metadata["outcome_type"] = result.outcome.value
    play_data["play_metadata"] = play_metadata

    await self.db_ops.save_play(play_data)
    logger.debug(f"Saved play {state.play_count}: batter={required_ids['batter_id']}, on_base={on_base_code}")
async def get_game_state(self, game_id: UUID) -> Optional[GameState]:
    """Return the in-memory state the state manager holds for ``game_id``."""
    current_state = state_manager.get_state(game_id)
    return current_state
async def rollback_plays(self, game_id: UUID, num_plays: int) -> GameState:
    """
    Undo the last N plays of a game.

    Deletes the plays (and any substitutions recorded after the target
    play) from the database, drops the in-memory roll tracking for the game,
    then asks the state manager to recover the game state from what remains.

    Args:
        game_id: Game to roll back
        num_plays: Number of plays to roll back (must be > 0)

    Returns:
        The recovered GameState after rollback

    Raises:
        ValueError: If num_plays is invalid, the game is not found, or the
            game is completed
    """
    # 1. Validate
    current = state_manager.get_state(game_id)
    if not current:
        raise ValueError(f"Game {game_id} not found")
    if num_plays <= 0:
        raise ValueError("num_plays must be greater than 0")
    if current.play_count < num_plays:
        raise ValueError(
            f"Cannot roll back {num_plays} plays (only {current.play_count} exist)"
        )
    if current.status == "completed":
        raise ValueError("Cannot roll back a completed game")

    # 2. The play number we are rolling back to
    target_play = current.play_count - num_plays
    logger.info(
        f"Rolling back {num_plays} plays for game {game_id} "
        f"(from play {current.play_count} to play {target_play})"
    )

    # 3. Remove the plays after the target
    deleted_plays = await self.db_ops.delete_plays_after(game_id, target_play)
    logger.info(f"Deleted {deleted_plays} plays")

    # 4. Remove substitutions made after the target play
    deleted_subs = await self.db_ops.delete_substitutions_after(game_id, target_play)
    logger.info(f"Deleted {deleted_subs} substitutions")

    # Dice rolls already in the rolls table are left alone - they are kept
    # for auditing and don't affect game state reconstruction.

    # 5. Forget the rolls tracked in memory for this game
    self._rolls_this_inning.pop(game_id, None)

    # 6. Rebuild the state from the remaining plays
    logger.info(f"Recovering game state for {game_id}")
    recovered = await state_manager.recover_game(game_id)
    logger.info(
        f"Rollback complete - now at play {recovered.play_count}, "
        f"inning {recovered.inning} {recovered.half}"
    )
    return recovered
async def end_game(self, game_id: UUID) -> GameState:
    """
    End a game manually (forfeit, abandonment, etc.).

    Flushes any tracked rolls, marks the state 'completed' in memory and
    persists the final inning, half, scores and status.

    Raises:
        ValueError: If the game is not found
    """
    final_state = state_manager.get_state(game_id)
    if not final_state:
        raise ValueError(f"Game {game_id} not found")

    # Flush whatever rolls are still waiting for a batch save
    await self._batch_save_inning_rolls(game_id)

    final_state.status = "completed"
    state_manager.update_state(game_id, final_state)

    persisted_fields = {
        "game_id": game_id,
        "inning": final_state.inning,
        "half": final_state.half,
        "home_score": final_state.home_score,
        "away_score": final_state.away_score,
        "status": "completed",
    }
    await self.db_ops.update_game_state(**persisted_fields)

    logger.info(f"Game {game_id} ended manually")
    return final_state
# Singleton instance, created when this module is imported.
# NOTE: GameEngine.__init__ constructs DatabaseOperations(), so that happens at import time too.
game_engine = GameEngine()