strat-gameplay-webapp/backend/app/core/game_engine.py
Cal Corum 95d8703f56 CLAUDE: Implement Week 7 Task 1 - Strategic Decision Integration
Enhanced game engine with async decision workflow and AI opponent integration:

GameState Model Enhancements:
- Added pending_defensive_decision and pending_offensive_decision fields
- Added decision_phase tracking (idle, awaiting_defensive, awaiting_offensive, resolving, completed)
- Added decision_deadline field for timeout handling
- Added is_batting_team_ai() and is_fielding_team_ai() helper methods
- Added validator for decision_phase

StateManager Enhancements:
- Added _pending_decisions dict for asyncio.Future-based decision queue
- Added set_pending_decision() to create decision futures
- Added await_decision() to wait for decision submission
- Added submit_decision() to resolve pending futures
- Added cancel_pending_decision() for cleanup

GameEngine Enhancements:
- Added await_defensive_decision() with AI/human branching and timeout
- Added await_offensive_decision() with AI/human branching and timeout
- Enhanced submit_defensive_decision() to resolve pending futures
- Enhanced submit_offensive_decision() to resolve pending futures
- Added DECISION_TIMEOUT constant (30 seconds)

AI Opponent (Stub):
- Created ai_opponent.py with stub implementations
- generate_defensive_decision() returns default "normal" positioning
- generate_offensive_decision() returns default "normal" approach
- TODO markers for Week 9 full AI logic implementation

Integration:
- Backward compatible with existing terminal client workflow
- New async methods ready for WebSocket integration (Week 7 Task 4)
- AI teams get instant decisions, human teams wait with timeout
- Default decisions applied on timeout (no game blocking)

Testing:
- Config tests: 58/58 passing 
- Terminal client: Working perfectly 
- Existing workflows: Fully compatible 

Week 7 Task 1: Complete
Next: Task 2 - Decision Validators

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-29 21:38:11 -05:00

816 lines
31 KiB
Python

"""
Game Engine - Main game orchestration engine.
Coordinates game flow, validates actions, resolves plays, and persists state.
Integrates DiceSystem for roll tracking with context and batch saving.
Phase 3: Enhanced with async decision workflow and AI opponent integration.
Author: Claude
Date: 2025-10-24
"""
import asyncio
import logging
from uuid import UUID
from typing import Optional, List
import pendulum
from app.core.state_manager import state_manager
from app.core.play_resolver import play_resolver, PlayResult
from app.config import PlayOutcome
from app.core.validators import game_validator, ValidationError
from app.core.dice import dice_system
from app.core.ai_opponent import ai_opponent
from app.database.operations import DatabaseOperations
from app.models.game_models import (
GameState, DefensiveDecision, OffensiveDecision
)
# Module-level logger; the ".GameEngine" suffix gives this engine its own logger name
logger = logging.getLogger(f'{__name__}.GameEngine')
class GameEngine:
"""
Main game orchestration engine.

Starts and ends games, accepts or awaits defensive/offensive decisions,
resolves plays, advances half-innings, and persists plays, rolls and game
state through DatabaseOperations. Live state is read from and written back
to the shared state_manager.
"""
# Phase 3: Default number of seconds await_defensive_decision and
# await_offensive_decision wait for a human decision before using defaults
DECISION_TIMEOUT = 30
def __init__(self):
    """Set up the database gateway and an empty per-game dice-roll buffer."""
    self.db_ops = DatabaseOperations()
    # Rolls are buffered per game and flushed in one batch at half-inning boundaries
    self._rolls_this_inning: dict[UUID, List] = dict()
async def start_game(self, game_id: UUID) -> GameState:
    """
    Start a game.

    Transitions the game from 'pending' to 'active', after checking that both
    teams have at least 9 active lineup entries and valid defensive positions.
    Prepares the first play snapshot and persists the initial game state.

    Args:
        game_id: Identifier of the game to start

    Returns:
        The updated GameState (status 'active', top of inning 1, 0 outs)

    Raises:
        ValueError: If the game is not present in the state manager
        ValidationError: If the game is not 'pending', a lineup has fewer than
            9 players, or a lineup fails defensive position validation
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found in state manager")
    if state.status != "pending":
        raise ValidationError(f"Game already started (status: {state.status})")

    # HARD REQUIREMENT: Validate both lineups are complete
    # At game start, we validate BOTH teams (exception to the "defensive only" rule)
    home_lineup = await self.db_ops.get_active_lineup(state.game_id, state.home_team_id)
    away_lineup = await self.db_ops.get_active_lineup(state.game_id, state.away_team_id)
    lineups = (("Home", home_lineup), ("Away", away_lineup))

    # Check minimum 9 players per team (home first, then away)
    for label, lineup in lineups:
        if not lineup or len(lineup) < 9:
            raise ValidationError(
                f"{label} team lineup incomplete: {len(lineup) if lineup else 0} players "
                f"(minimum 9 required)"
            )

    # Validate defensive positions - at game start, check BOTH teams
    for label, lineup in lineups:
        try:
            game_validator.validate_defensive_lineup_positions(lineup)
        except ValidationError as e:
            # Prefix the team label; chain explicitly so the original cause is kept
            raise ValidationError(f"{label} team: {e}") from e

    # Mark as active
    state.status = "active"
    state.inning = 1
    state.half = "top"
    state.outs = 0

    # Initialize roll tracking for this game
    self._rolls_this_inning[game_id] = []

    # Prepare first play snapshot
    await self._prepare_next_play(state)

    # Update in-memory state, then persist to DB
    state_manager.update_state(game_id, state)
    await self.db_ops.update_game_state(
        game_id=game_id,
        inning=1,
        half="top",
        home_score=0,
        away_score=0,
        status="active"
    )

    logger.info(
        f"Started game {game_id} - First batter: lineup_id={state.current_batter_lineup_id}"
    )
    return state
async def submit_defensive_decision(
    self,
    game_id: UUID,
    decision: DefensiveDecision
) -> GameState:
    """
    Record the fielding team's decision for the current play.

    The decision is validated, stored on the game state, and handed to the
    state manager's decision queue so that a coroutine waiting on it (if any)
    is released.

    Raises:
        ValueError: If the game is not found
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found")

    game_validator.validate_game_active(state)
    game_validator.validate_defensive_decision(decision, state)

    # Keep the dict form for the older (terminal client) workflow
    state.decisions_this_play['defensive'] = decision.model_dump()
    state.pending_decision = "offensive"
    state.pending_defensive_decision = decision

    # Phase 3: hand the decision to whoever is awaiting it
    team_id = state.get_fielding_team_id()
    try:
        state_manager.submit_decision(game_id, team_id, decision)
    except ValueError:
        # Nothing was waiting - a direct submission without a prior await is fine
        logger.debug(f"No pending defensive decision for game {game_id}")
    else:
        logger.info(f"Resolved pending defensive decision future for game {game_id}")

    state_manager.update_state(game_id, state)
    logger.info(f"Defensive decision submitted for game {game_id}")
    return state
async def submit_offensive_decision(
    self,
    game_id: UUID,
    decision: OffensiveDecision
) -> GameState:
    """
    Record the batting team's decision for the current play.

    The decision is validated, stored on the game state, and handed to the
    state manager's decision queue so that a coroutine waiting on it (if any)
    is released.

    Raises:
        ValueError: If the game is not found
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found")

    game_validator.validate_game_active(state)
    game_validator.validate_offensive_decision(decision, state)

    # Keep the dict form for the older (terminal client) workflow
    state.decisions_this_play['offensive'] = decision.model_dump()
    state.pending_decision = "resolution"
    state.pending_offensive_decision = decision

    # Phase 3: hand the decision to whoever is awaiting it
    team_id = state.get_batting_team_id()
    try:
        state_manager.submit_decision(game_id, team_id, decision)
    except ValueError:
        # Nothing was waiting - a direct submission without a prior await is fine
        logger.debug(f"No pending offensive decision for game {game_id}")
    else:
        logger.info(f"Resolved pending offensive decision future for game {game_id}")

    state_manager.update_state(game_id, state)
    logger.info(f"Offensive decision submitted for game {game_id}")
    return state
# ============================================================================
# PHASE 3: ENHANCED DECISION WORKFLOW
# ============================================================================
async def await_defensive_decision(
    self,
    state: GameState,
    timeout: Optional[int] = None
) -> DefensiveDecision:
    """
    Obtain the fielding team's decision for the current play.

    For AI teams: a decision is generated immediately by ai_opponent.
    For human teams: a pending decision is registered with the state manager
    and this coroutine waits up to `timeout` seconds for it to be submitted.

    Args:
        state: Current game state
        timeout: Seconds to wait before using the default decision
            (None means DECISION_TIMEOUT)

    Returns:
        The AI-generated or submitted DefensiveDecision, or a DefensiveDecision
        built from all defaults when the wait times out. A timeout is handled
        here and is NOT raised to the caller.
    """
    if timeout is None:
        timeout = self.DECISION_TIMEOUT

    fielding_team_id = state.get_fielding_team_id()

    # AI teams never wait
    if state.is_fielding_team_ai():
        logger.info(f"Generating AI defensive decision for game {state.game_id}")
        return await ai_opponent.generate_defensive_decision(state)

    # Human team: wait for decision via WebSocket
    logger.info(f"Awaiting human defensive decision for game {state.game_id}, team {fielding_team_id}")

    # Register the pending decision before waiting on it
    state_manager.set_pending_decision(
        game_id=state.game_id,
        team_id=fielding_team_id,
        decision_type="defensive"
    )

    # Publish the decision phase and its deadline on the game state
    state.decision_phase = "awaiting_defensive"
    state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
    state_manager.update_state(state.game_id, state)

    # TODO Week 7 Task 4: Emit WebSocket event to notify frontend
    # await self.connection_manager.emit_decision_required(
    #     game_id=state.game_id,
    #     team_id=fielding_team_id,
    #     decision_type="defensive",
    #     timeout=timeout,
    #     game_situation=state.to_situation_summary()
    # )

    try:
        # Wait for decision with timeout
        decision = await asyncio.wait_for(
            state_manager.await_decision(state.game_id, fielding_team_id, "defensive"),
            timeout=timeout
        )
        logger.info(f"Received defensive decision for game {state.game_id}")
        return decision
    except asyncio.TimeoutError:
        # NOTE(review): the pending entry registered above is not explicitly
        # cancelled here - confirm state_manager cleans it up when the wait is
        # cancelled, otherwise a late submission may hit a stale future.
        logger.warning(f"Defensive decision timeout for game {state.game_id}, using default")
        return DefensiveDecision()  # All defaults
async def await_offensive_decision(
    self,
    state: GameState,
    timeout: Optional[int] = None
) -> OffensiveDecision:
    """
    Obtain the batting team's decision for the current play.

    For AI teams: a decision is generated immediately by ai_opponent.
    For human teams: a pending decision is registered with the state manager
    and this coroutine waits up to `timeout` seconds for it to be submitted.

    Args:
        state: Current game state
        timeout: Seconds to wait before using the default decision
            (None means DECISION_TIMEOUT)

    Returns:
        The AI-generated or submitted OffensiveDecision, or an OffensiveDecision
        built from all defaults when the wait times out. A timeout is handled
        here and is NOT raised to the caller.
    """
    if timeout is None:
        timeout = self.DECISION_TIMEOUT

    batting_team_id = state.get_batting_team_id()

    # AI teams never wait
    if state.is_batting_team_ai():
        logger.info(f"Generating AI offensive decision for game {state.game_id}")
        return await ai_opponent.generate_offensive_decision(state)

    # Human team: wait for decision via WebSocket
    logger.info(f"Awaiting human offensive decision for game {state.game_id}, team {batting_team_id}")

    # Register the pending decision before waiting on it
    state_manager.set_pending_decision(
        game_id=state.game_id,
        team_id=batting_team_id,
        decision_type="offensive"
    )

    # Publish the decision phase and its deadline on the game state
    state.decision_phase = "awaiting_offensive"
    state.decision_deadline = pendulum.now('UTC').add(seconds=timeout).to_iso8601_string()
    state_manager.update_state(state.game_id, state)

    # TODO Week 7 Task 4: Emit WebSocket event to notify frontend
    # await self.connection_manager.emit_decision_required(...)

    try:
        # Wait for decision with timeout
        decision = await asyncio.wait_for(
            state_manager.await_decision(state.game_id, batting_team_id, "offensive"),
            timeout=timeout
        )
        logger.info(f"Received offensive decision for game {state.game_id}")
        return decision
    except asyncio.TimeoutError:
        # NOTE(review): the pending entry registered above is not explicitly
        # cancelled here - confirm state_manager cleans it up when the wait is
        # cancelled, otherwise a late submission may hit a stale future.
        logger.warning(f"Offensive decision timeout for game {state.game_id}, using default")
        return OffensiveDecision()  # All defaults
async def resolve_play(self, game_id: UUID) -> PlayResult:
    """
    Resolve the current play and move the game forward.

    Orchestration sequence:
    1. Resolve the play (dice roll happens inside the resolver)
    2. Save the play to DB using the snapshot held on GameState
    3. Apply the result to in-memory state (outs, score, runners)
    4. Persist game state if inning/half/score/status changed
    5. On 3+ outs: advance the half-inning, persist again, flush rolls
    6. Prepare the next play snapshot (only while the game is active)

    Raises:
        ValueError: If the game is not found
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found")
    game_validator.validate_game_active(state)

    async def push_state_to_db() -> None:
        # Persist the fields tracked in the games table
        await self.db_ops.update_game_state(
            game_id=state.game_id,
            inning=state.inning,
            half=state.half,
            home_score=state.home_score,
            away_score=state.away_score,
            status=state.status
        )

    def persisted_fields() -> tuple:
        # Snapshot of exactly the fields push_state_to_db writes
        return (state.inning, state.half, state.home_score, state.away_score, state.status)

    # Rebuild decisions from their stored dict form (missing ones fall back to defaults)
    stored = state.decisions_this_play
    defensive_decision = DefensiveDecision(**stored.get('defensive', {}))
    offensive_decision = OffensiveDecision(**stored.get('offensive', {}))

    # STEP 1: Resolve play (this internally calls dice_system.roll_ab)
    result = play_resolver.resolve_play(state, defensive_decision, offensive_decision)

    # Buffer the roll; it is written out in a batch at the end of the half-inning
    self._rolls_this_inning.setdefault(game_id, []).append(result.ab_roll)

    # STEP 2: Save play to DB (uses snapshot from GameState)
    await self._save_play_to_db(state, result)

    # STEP 3: Apply result to state, remembering what was persisted before
    before = persisted_fields()
    self._apply_play_result(state, result)

    # STEP 4: Update game state in DB only if something changed
    if persisted_fields() != before:
        await push_state_to_db()
        logger.info("Updated game state in DB - score/inning/status changed")
    else:
        logger.debug("Skipped game state update - no changes to persist")

    # STEP 5: Check for inning change
    if state.outs >= 3:
        await self._advance_inning(state, game_id)
        # Update DB again after inning change
        await push_state_to_db()
        # Batch save rolls at half-inning boundary
        await self._batch_save_inning_rolls(game_id)

    # STEP 6: Prepare next play (always last step, and only for a live game)
    if state.status == "active":
        await self._prepare_next_play(state)
        # Clear decisions for next play
        state.decisions_this_play = {}
        state.pending_decision = "defensive"

    # Update in-memory state
    state_manager.update_state(game_id, state)
    logger.info(f"Resolved play {state.play_count} for game {game_id}: {result.description}")
    return result
def _apply_play_result(self, state: GameState, result: PlayResult) -> None:
    """
    Fold a play result into the in-memory game state.

    Updates outs, base occupancy, the batting team's score, the play counter
    and the last play description. Performs NO database writes; persistence
    is the caller's job.
    """
    state.outs += result.outs_recorded

    # from_base -> to_base for every runner the result moved
    moves = {origin: target for origin, target in result.runners_advanced}

    # Base occupancy after the play, keyed by base number
    occupants = {1: None, 2: None, 3: None}

    for base, runner in state.get_all_runners():
        if base in moves:
            destination = moves[base]
            if not destination < 4:
                # Runner scored - leaves the bases
                continue
        else:
            # Runner was not moved by this play
            destination = base
        if destination in occupants:
            occupants[destination] = runner

    # Place the batter if he reached a base short of home
    if result.batter_result and result.batter_result < 4:
        if state.half == "top":
            batting_team_id = state.away_team_id
        else:
            batting_team_id = state.home_team_id
        batting_lineup = state_manager.get_lineup(state.game_id, batting_team_id)

        batter = None
        if batting_lineup and state.current_batter_lineup_id:
            batter = batting_lineup.get_player_by_lineup_id(state.current_batter_lineup_id)

        if not batter:
            # Fallback: build a minimal placeholder player.
            # This shouldn't happen if _prepare_next_play was called correctly.
            from app.models.game_models import LineupPlayerState
            logger.warning(f"Could not find batter lineup_id={state.current_batter_lineup_id} in cached lineup, using fallback")
            batter = LineupPlayerState(
                lineup_id=state.current_batter_lineup_id or 0,
                card_id=0,
                position="DH",  # DH used as the placeholder position
                batting_order=None
            )

        if result.batter_result in occupants:
            occupants[result.batter_result] = batter

    # Commit the new base occupancy
    state.on_first = occupants[1]
    state.on_second = occupants[2]
    state.on_third = occupants[3]

    # Runs go to whichever team is batting this half
    if state.half == "top":
        state.away_score += result.runs_scored
    else:
        state.home_score += result.runs_scored

    state.play_count += 1
    state.last_play_result = result.description

    runner_count = sum(1 for r in (state.on_first, state.on_second, state.on_third) if r)
    logger.debug(
        f"Applied play result: outs={state.outs}, "
        f"score={state.away_score}-{state.home_score}, "
        f"runners={runner_count}"
    )
async def _advance_inning(self, state: GameState, game_id: UUID) -> None:
    """
    Move the game to the next half-inning.

    Flips top/bottom (bumping the inning number after a bottom half), resets
    outs, clears the bases, validates the new defensive team's lineup
    positions, and marks the game completed if the validator says it is over.
    Performs no database writes and does not prepare the next play; the
    orchestration layer does both.

    Raises:
        ValidationError: If the defensive team has no active lineup
    """
    finishing_top = state.half == "top"
    state.half = "bottom" if finishing_top else "top"
    if not finishing_top:
        state.inning += 1

    # Fresh half-inning: no outs, empty bases
    state.outs = 0
    state.clear_bases()

    # Home defends in the top half, away defends in the bottom half
    if state.half == "top":
        defensive_team = state.home_team_id
    else:
        defensive_team = state.away_team_id

    defensive_lineup = await self.db_ops.get_active_lineup(state.game_id, defensive_team)
    if not defensive_lineup:
        raise ValidationError(f"No lineup found for defensive team {defensive_team}")
    game_validator.validate_defensive_lineup_positions(defensive_lineup)

    logger.info(f"Advanced to inning {state.inning} {state.half}")

    # Check if game is over
    if game_validator.is_game_over(state):
        state.status = "completed"
        logger.info(f"Game {state.game_id} completed - Final: Away {state.away_score}, Home {state.home_score}")
async def _prepare_next_play(self, state: GameState) -> None:
    """
    Prepare the snapshot for the next play.

    This method:
    1. Reads the batting team's batter index and advances it (wraps at 9)
    2. Gets both lineups from the state manager cache, loading from the
       database only on a cache miss
    3. Sets snapshot fields: current_batter/pitcher/catcher_lineup_id
    4. Calculates current_on_base_code from the runners currently on base

    The snapshot is what _save_play_to_db writes with the Play record.
    """
    # Determine which team is batting and advance its batter index
    if state.half == "top":
        # Away team batting
        current_idx = state.away_team_batter_idx
        state.away_team_batter_idx = (current_idx + 1) % 9
        batting_team, fielding_team = state.away_team_id, state.home_team_id
    else:
        # Home team batting
        current_idx = state.home_team_batter_idx
        state.home_team_batter_idx = (current_idx + 1) % 9
        batting_team, fielding_team = state.home_team_id, state.away_team_id

    batting_lineup_state = await self._get_or_load_lineup_state(state.game_id, batting_team)
    fielding_lineup_state = await self._get_or_load_lineup_state(state.game_id, fielding_team)

    # Batter: index into the players that have a batting order, sorted by it
    if batting_lineup_state:
        batting_order = sorted(
            [p for p in batting_lineup_state.players if p.batting_order is not None],
            key=lambda x: x.batting_order or 0
        )
        if current_idx < len(batting_order):
            state.current_batter_lineup_id = batting_order[current_idx].lineup_id
        else:
            state.current_batter_lineup_id = None
            logger.warning(f"Batter index {current_idx} out of range for batting order")
    else:
        state.current_batter_lineup_id = None
        logger.warning(f"No batting lineup found for team {batting_team}")

    # Pitcher and catcher: first fielding player at position "P" / "C"
    if fielding_lineup_state:
        pitcher = next((p for p in fielding_lineup_state.players if p.position == "P"), None)
        state.current_pitcher_lineup_id = pitcher.lineup_id if pitcher else None
        catcher = next((p for p in fielding_lineup_state.players if p.position == "C"), None)
        state.current_catcher_lineup_id = catcher.lineup_id if catcher else None
    else:
        state.current_pitcher_lineup_id = None
        state.current_catcher_lineup_id = None

    # Calculate on_base_code from current runners (bit field)
    state.current_on_base_code = 0
    if state.on_first:
        state.current_on_base_code |= 1  # Bit 0: first base
    if state.on_second:
        state.current_on_base_code |= 2  # Bit 1: second base
    if state.on_third:
        state.current_on_base_code |= 4  # Bit 2: third base

    logger.debug(
        f"Prepared next play: batter={state.current_batter_lineup_id}, "
        f"pitcher={state.current_pitcher_lineup_id}, "
        f"catcher={state.current_catcher_lineup_id}, "
        f"on_base_code={state.current_on_base_code}"
    )

async def _get_or_load_lineup_state(self, game_id: UUID, team_id):
    """
    Return a team's cached lineup state, loading and caching it on a miss.

    Returns:
        The TeamLineupState from the state manager cache, or one freshly built
        from the database's active lineup (and stored in the cache), or None
        when the database has no active lineup for the team.
    """
    from app.models.game_models import TeamLineupState, LineupPlayerState

    cached = state_manager.get_lineup(game_id, team_id)
    if cached:
        return cached

    lineup = await self.db_ops.get_active_lineup(game_id, team_id)
    if not lineup:
        return None

    lineup_state = TeamLineupState(
        team_id=team_id,
        players=[
            LineupPlayerState(
                lineup_id=p.id,  # type: ignore[assignment]
                card_id=p.card_id if p.card_id else 0,  # type: ignore[assignment]
                position=p.position,  # type: ignore[assignment]
                batting_order=p.batting_order,  # type: ignore[assignment]
                is_active=p.is_active  # type: ignore[assignment]
            )
            for p in lineup
        ]
    )
    state_manager.set_lineup(game_id, team_id, lineup_state)
    return lineup_state
async def _batch_save_inning_rolls(self, game_id: UUID) -> None:
    """
    Flush the buffered dice rolls for a game to the database in one batch.

    Called at the end of each half-inning. A failed save is logged and
    swallowed so the game keeps going; the buffer is only emptied after a
    successful save.
    """
    buffers = self._rolls_this_inning
    if game_id not in buffers:
        logger.debug(f"No rolls to save for game {game_id}")
        return

    pending_rolls = buffers[game_id]
    if not pending_rolls:
        logger.debug(f"Empty roll list for game {game_id}")
        return

    try:
        await self.db_ops.save_rolls_batch(pending_rolls)
        logger.info(f"Batch saved {len(pending_rolls)} rolls for game {game_id}")
        # Start a fresh buffer for the next half-inning
        buffers[game_id] = []
    except Exception as e:
        # Best effort by design: don't fail the game over roll persistence.
        # Rolls are still in dice_system history and can be recovered later.
        logger.error(f"Failed to batch save rolls for game {game_id}: {e}")
async def _save_play_to_db(self, state: GameState, result: PlayResult) -> None:
    """
    Write the play record using the snapshot stored on GameState.

    Everything comes from the state snapshot and the play result; no
    database lookups are made. Must run before the result is applied to the
    state, since outs, runners and scores are read from the state as-is.

    Raises:
        ValueError: If the batter, pitcher or catcher snapshot ID is missing
    """
    # Snapshot values set by _prepare_next_play
    snapshot_ids = {
        "batter_id": state.current_batter_lineup_id,
        "pitcher_id": state.current_pitcher_lineup_id,
        "catcher_id": state.current_catcher_lineup_id,
    }
    on_base_code = state.current_on_base_code

    # All three player IDs are mandatory
    for field_name, lineup_id in snapshot_ids.items():
        if lineup_id is None:
            raise ValueError(
                f"Cannot save play: {field_name} is None. "
                f"Game {state.game_id} may need _prepare_next_play() called after recovery."
            )

    def lineup_id_of(runner):
        # Lineup ID of the runner on a base, or None for an empty base
        return runner.lineup_id if runner else None

    # Where each starting base's runner ended up; a base absent from the
    # result maps to None
    destinations = {origin: target for origin, target in result.runners_advanced}

    play_data = {
        "game_id": state.game_id,
        "play_number": state.play_count,
        "inning": state.inning,
        "half": state.half,
        "outs_before": state.outs,  # State has not had the result applied yet
        "outs_recorded": result.outs_recorded,
        # Player IDs from snapshot
        **snapshot_ids,
        # Base situation before the play
        "on_base_code": on_base_code,
        "on_first_id": lineup_id_of(state.on_first),
        "on_second_id": lineup_id_of(state.on_second),
        "on_third_id": lineup_id_of(state.on_third),
        # Final positions
        "on_first_final": destinations.get(1),
        "on_second_final": destinations.get(2),
        "on_third_final": destinations.get(3),
        "batter_final": result.batter_result,
        # Play outcome
        "dice_roll": str(result.ab_roll),
        "hit_type": result.outcome.value,
        "result_description": result.description,
        "runs_scored": result.runs_scored,
        "away_score": state.away_score,
        "home_score": state.home_score,
        "complete": True,
        # Strategic decisions
        "defensive_choices": state.decisions_this_play.get('defensive', {}),
        "offensive_choices": state.decisions_this_play.get('offensive', {})
    }

    # Metadata for uncapped hits (Phase 3: will include runner advancement decisions)
    play_metadata = {}
    if result.outcome in (PlayOutcome.SINGLE_UNCAPPED, PlayOutcome.DOUBLE_UNCAPPED):
        play_metadata["uncapped"] = True
        play_metadata["outcome_type"] = result.outcome.value
    play_data["play_metadata"] = play_metadata

    await self.db_ops.save_play(play_data)
    logger.debug(f"Saved play {state.play_count}: batter={snapshot_ids['batter_id']}, on_base={on_base_code}")
async def get_game_state(self, game_id: UUID) -> Optional[GameState]:
    """Return whatever the state manager holds for the game."""
    current_state = state_manager.get_state(game_id)
    return current_state
async def end_game(self, game_id: UUID) -> GameState:
    """
    Manually end a game (forfeit, abandonment, etc.).

    Flushes any buffered rolls, marks the game 'completed' in memory and in
    the database, and releases the game's roll buffer.

    Returns:
        The updated GameState with status 'completed'

    Raises:
        ValueError: If the game is not found
    """
    state = state_manager.get_state(game_id)
    if not state:
        raise ValueError(f"Game {game_id} not found")

    # Batch save any remaining rolls
    await self._batch_save_inning_rolls(game_id)

    # Drop the finished game's buffer so this long-lived engine does not
    # accumulate one entry per game. If the flush failed, the unsaved rolls
    # are kept.
    if not self._rolls_this_inning.get(game_id):
        self._rolls_this_inning.pop(game_id, None)

    state.status = "completed"
    state_manager.update_state(game_id, state)

    await self.db_ops.update_game_state(
        game_id=game_id,
        inning=state.inning,
        half=state.half,
        home_score=state.home_score,
        away_score=state.away_score,
        status="completed"
    )

    logger.info(f"Game {game_id} ended manually")
    return state
# Singleton instance, created at import time (constructs a DatabaseOperations)
game_engine = GameEngine()