strat-gameplay-webapp/backend/app/core/game_engine.py
Cal Corum 6880b6d5ad CLAUDE: Complete Week 6 - granular PlayOutcome integration and metadata support
- Renamed check_d20 → chaos_d20 throughout dice system
- Expanded PlayOutcome enum with granular variants (SINGLE_1/2, DOUBLE_2/3, GROUNDBALL_A/B/C, etc.)
- Integrated PlayOutcome from app.config into PlayResolver
- Added play_metadata support for uncapped hit tracking
- Updated all tests (139/140 passing)

Week 6: 100% Complete - Ready for Phase 3

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-29 20:29:06 -05:00

645 lines
25 KiB
Python

"""
Game Engine - Main game orchestration engine.
Coordinates game flow, validates actions, resolves plays, and persists state.
Integrates DiceSystem for roll tracking with context and batch saving.
Author: Claude
Date: 2025-10-24
"""
import logging
from uuid import UUID
from typing import Optional, List
from app.core.state_manager import state_manager
from app.core.play_resolver import play_resolver, PlayResult
from app.config import PlayOutcome
from app.core.validators import game_validator, ValidationError
from app.core.dice import dice_system
from app.database.operations import DatabaseOperations
from app.models.game_models import (
GameState, DefensiveDecision, OffensiveDecision
)
# Module logger, named as a ".GameEngine" child of this module's logger so engine
# output can be filtered separately in logging configuration.
logger = logging.getLogger(f'{__name__}.GameEngine')
class GameEngine:
    """Main game orchestration engine.

    Drives a game through its lifecycle: starting it, collecting the defensive
    and offensive decisions for each play, resolving the play, applying the
    result to the in-memory ``GameState`` and persisting plays, game state and
    dice rolls through ``DatabaseOperations``.
    """

    def __init__(self):
        """Create the engine with its own DB accessor and an empty roll buffer."""
        self.db_ops = DatabaseOperations()
        # Rolls accumulated per game since the last batch save (flushed at each
        # half-inning boundary and when a game is ended manually).
        self._rolls_this_inning: dict[UUID, List] = {}

    async def start_game(self, game_id: UUID) -> GameState:
        """
        Start a game.

        Transitions the game from 'pending' to 'active', validates that both
        teams have complete lineups (minimum 9 players each, valid defensive
        positions), prepares the first play snapshot and persists the new
        state.

        Args:
            game_id: Identifier of the game to start.

        Returns:
            The updated in-memory game state.

        Raises:
            ValueError: If the game is not present in the state manager.
            ValidationError: If game already started or lineups incomplete.
        """
        state = state_manager.get_state(game_id)
        if not state:
            raise ValueError(f"Game {game_id} not found in state manager")

        if state.status != "pending":
            raise ValidationError(f"Game already started (status: {state.status})")

        # HARD REQUIREMENT: Validate both lineups are complete.
        # At game start, we validate BOTH teams (exception to the "defensive only" rule).
        home_lineup = await self.db_ops.get_active_lineup(state.game_id, state.home_team_id)
        away_lineup = await self.db_ops.get_active_lineup(state.game_id, state.away_team_id)

        # Check minimum 9 players per team
        if not home_lineup or len(home_lineup) < 9:
            raise ValidationError(
                f"Home team lineup incomplete: {len(home_lineup) if home_lineup else 0} players "
                f"(minimum 9 required)"
            )
        if not away_lineup or len(away_lineup) < 9:
            raise ValidationError(
                f"Away team lineup incomplete: {len(away_lineup) if away_lineup else 0} players "
                f"(minimum 9 required)"
            )

        # Validate defensive positions - at game start, check BOTH teams.
        # The error is re-raised with a team prefix; "from e" keeps the
        # original exception attached as the explicit cause.
        try:
            game_validator.validate_defensive_lineup_positions(home_lineup)
        except ValidationError as e:
            raise ValidationError(f"Home team: {e}") from e
        try:
            game_validator.validate_defensive_lineup_positions(away_lineup)
        except ValidationError as e:
            raise ValidationError(f"Away team: {e}") from e

        # Mark as active
        state.status = "active"
        state.inning = 1
        state.half = "top"
        state.outs = 0

        # Initialize roll tracking for this game
        self._rolls_this_inning[game_id] = []

        # Prepare first play snapshot
        await self._prepare_next_play(state)

        # Update in-memory state, then persist to DB
        state_manager.update_state(game_id, state)
        await self.db_ops.update_game_state(
            game_id=game_id,
            inning=1,
            half="top",
            home_score=0,
            away_score=0,
            status="active"
        )

        logger.info(
            f"Started game {game_id} - First batter: lineup_id={state.current_batter_lineup_id}"
        )
        return state

    async def submit_defensive_decision(
        self,
        game_id: UUID,
        decision: DefensiveDecision
    ) -> GameState:
        """
        Submit defensive team decision.

        Validates the decision against the current state, stores it for the
        current play and marks the offensive decision as the next one pending.

        Raises:
            ValueError: If the game is not present in the state manager.
        """
        state = state_manager.get_state(game_id)
        if not state:
            raise ValueError(f"Game {game_id} not found")

        game_validator.validate_game_active(state)
        game_validator.validate_defensive_decision(decision, state)

        # Store decision
        state.decisions_this_play['defensive'] = decision.model_dump()
        state.pending_decision = "offensive"
        state_manager.update_state(game_id, state)

        logger.info(f"Defensive decision submitted for game {game_id}")
        return state

    async def submit_offensive_decision(
        self,
        game_id: UUID,
        decision: OffensiveDecision
    ) -> GameState:
        """
        Submit offensive team decision.

        Validates the decision against the current state, stores it for the
        current play and marks the play as ready for resolution.

        Raises:
            ValueError: If the game is not present in the state manager.
        """
        state = state_manager.get_state(game_id)
        if not state:
            raise ValueError(f"Game {game_id} not found")

        game_validator.validate_game_active(state)
        game_validator.validate_offensive_decision(decision, state)

        # Store decision
        state.decisions_this_play['offensive'] = decision.model_dump()
        state.pending_decision = "resolution"
        state_manager.update_state(game_id, state)

        logger.info(f"Offensive decision submitted for game {game_id}")
        return state

    async def resolve_play(self, game_id: UUID) -> PlayResult:
        """
        Resolve the current play with dice roll.

        Explicit orchestration sequence:
        1. Resolve play with dice rolls
        2. Save play to DB (uses snapshot from GameState)
        3. Apply result to state (outs, score, runners)
        4. Update game state in DB
        5. Check for inning change (outs >= 3)
        6. Prepare next play (always last step)

        Returns:
            The PlayResult produced by the play resolver.

        Raises:
            ValueError: If the game is not in the state manager, or the play
                snapshot is missing a required player id.
        """
        state = state_manager.get_state(game_id)
        if not state:
            raise ValueError(f"Game {game_id} not found")

        game_validator.validate_game_active(state)

        # Rebuild decision models from the stored dicts; a missing decision
        # becomes a model constructed with no arguments.
        defensive_decision = DefensiveDecision(**state.decisions_this_play.get('defensive', {}))
        offensive_decision = OffensiveDecision(**state.decisions_this_play.get('offensive', {}))

        # STEP 1: Resolve play (this internally calls dice_system.roll_ab)
        result = play_resolver.resolve_play(state, defensive_decision, offensive_decision)

        # Track roll for batch saving at end of inning
        self._rolls_this_inning.setdefault(game_id, []).append(result.ab_roll)

        # STEP 2: Save play to DB (uses snapshot from GameState). This must run
        # before the result is applied so "before play" values are recorded.
        await self._save_play_to_db(state, result)

        # Capture the persisted fields before applying the result
        before = self._persisted_fields(state)

        # STEP 3: Apply result to state (outs, score, runners)
        self._apply_play_result(state, result)

        # STEP 4: Update game state in DB only if something changed
        if self._persisted_fields(state) != before:
            await self._persist_game_state(state)
            logger.info("Updated game state in DB - score/inning/status changed")
        else:
            logger.debug("Skipped game state update - no changes to persist")

        # STEP 5: Check for inning change
        if state.outs >= 3:
            await self._advance_inning(state, game_id)
            # Update DB again after inning change
            await self._persist_game_state(state)
            # Batch save rolls at half-inning boundary
            await self._batch_save_inning_rolls(game_id)

        # STEP 6: Prepare next play (always last step)
        if state.status == "active":  # Only prepare if game is still active
            await self._prepare_next_play(state)
        else:
            # Game finished: release the per-game roll buffer (kept if a failed
            # batch save left unsaved rolls in it).
            self._discard_roll_buffer(game_id)

        # Clear decisions for next play
        state.decisions_this_play = {}
        state.pending_decision = "defensive"

        # Update in-memory state
        state_manager.update_state(game_id, state)

        logger.info(f"Resolved play {state.play_count} for game {game_id}: {result.description}")
        return result

    @staticmethod
    def _persisted_fields(state: GameState) -> tuple:
        """Return the state fields that are written by update_game_state."""
        return (state.inning, state.half, state.home_score, state.away_score, state.status)

    async def _persist_game_state(self, state: GameState) -> None:
        """Write inning, half, scores and status of ``state`` to the database."""
        await self.db_ops.update_game_state(
            game_id=state.game_id,
            inning=state.inning,
            half=state.half,
            home_score=state.home_score,
            away_score=state.away_score,
            status=state.status
        )

    def _discard_roll_buffer(self, game_id: UUID) -> None:
        """Drop the roll buffer of a finished game, but only if it is empty."""
        if not self._rolls_this_inning.get(game_id):
            self._rolls_this_inning.pop(game_id, None)

    def _apply_play_result(self, state: GameState, result: PlayResult) -> None:
        """
        Apply play result to in-memory game state.

        Updates outs, runner positions, score, play count and last play
        description. Only updates state - NO database writes (handled by
        orchestration layer).
        """
        # Update outs
        state.outs += result.outs_recorded

        # Build advancement lookup: from_base -> to_base
        advancement_map = {from_base: to_base for from_base, to_base in result.runners_advanced}

        # New occupants of first/second/third, filled in below
        bases = {1: None, 2: None, 3: None}

        # Process existing runners: a runner listed in the advancement map moves
        # to the mapped base, any other runner stays put. Destinations other
        # than 1-3 (e.g. 4 = scored) leave the runner off the bases.
        for base, runner in state.get_all_runners():
            destination = advancement_map.get(base, base)
            if destination in bases:
                bases[destination] = runner

        # Add batter if reached base (placed last, so the batter takes the base
        # over any runner mapped to the same one)
        if result.batter_result in bases:
            # Look up the actual batter from cached lineup
            batting_team_id = state.away_team_id if state.half == "top" else state.home_team_id
            batting_lineup = state_manager.get_lineup(state.game_id, batting_team_id)

            batter = None
            if batting_lineup and state.current_batter_lineup_id:
                # Find the batter in the lineup
                batter = batting_lineup.get_player_by_lineup_id(state.current_batter_lineup_id)

            if not batter:
                # Fallback: create minimal LineupPlayerState
                # This shouldn't happen if _prepare_next_play was called correctly
                from app.models.game_models import LineupPlayerState
                logger.warning(f"Could not find batter lineup_id={state.current_batter_lineup_id} in cached lineup, using fallback")
                batter = LineupPlayerState(
                    lineup_id=state.current_batter_lineup_id or 0,
                    card_id=0,
                    position="DH",  # Use DH as fallback position
                    batting_order=None
                )

            bases[result.batter_result] = batter

        # Update state with new runner positions
        state.on_first = bases[1]
        state.on_second = bases[2]
        state.on_third = bases[3]

        # Update score: the away team bats in the top half
        if state.half == "top":
            state.away_score += result.runs_scored
        else:
            state.home_score += result.runs_scored

        # Increment play count
        state.play_count += 1
        state.last_play_result = result.description

        runner_count = sum(1 for r in (state.on_first, state.on_second, state.on_third) if r)
        logger.debug(
            f"Applied play result: outs={state.outs}, "
            f"score={state.away_score}-{state.home_score}, "
            f"runners={runner_count}"
        )

    async def _advance_inning(self, state: GameState, game_id: UUID) -> None:
        """
        Advance to next half inning.

        Flips the half (incrementing the inning after the bottom half), resets
        outs and bases, validates the new defensive team's lineup positions and
        marks the game completed when the validator reports it is over.

        Does not write game state to the database and does not prepare the
        next play; those are handled by the orchestration layer. It does read
        the defensive lineup from the database.

        Raises:
            ValidationError: If the defensive team has no active lineup.
        """
        if state.half == "top":
            state.half = "bottom"
        else:
            state.half = "top"
            state.inning += 1

        # Clear bases and reset outs
        state.outs = 0
        state.clear_bases()

        # Validate defensive team lineup positions
        # Top of inning: home team is defending
        # Bottom of inning: away team is defending
        defensive_team = state.home_team_id if state.half == "top" else state.away_team_id
        defensive_lineup = await self.db_ops.get_active_lineup(state.game_id, defensive_team)
        if not defensive_lineup:
            raise ValidationError(f"No lineup found for defensive team {defensive_team}")
        game_validator.validate_defensive_lineup_positions(defensive_lineup)

        logger.info(f"Advanced to inning {state.inning} {state.half}")

        # Check if game is over
        if game_validator.is_game_over(state):
            state.status = "completed"
            logger.info(f"Game {state.game_id} completed - Final: Away {state.away_score}, Home {state.home_score}")

    async def _get_or_load_lineup(self, game_id: UUID, team_id):
        """
        Return the team's lineup state from cache, loading it from the DB if absent.

        A lineup loaded from the database is stored in the state manager cache.
        Returns None when the team has no cached lineup and no active lineup
        rows in the database.
        """
        cached = state_manager.get_lineup(game_id, team_id)
        if cached:
            return cached

        rows = await self.db_ops.get_active_lineup(game_id, team_id)
        if not rows:
            return None

        from app.models.game_models import TeamLineupState, LineupPlayerState
        lineup_state = TeamLineupState(
            team_id=team_id,
            players=[
                LineupPlayerState(
                    lineup_id=p.id,  # type: ignore[assignment]
                    card_id=p.card_id if p.card_id else 0,  # type: ignore[assignment]
                    position=p.position,  # type: ignore[assignment]
                    batting_order=p.batting_order,  # type: ignore[assignment]
                    is_active=p.is_active  # type: ignore[assignment]
                )
                for p in rows
            ]
        )
        state_manager.set_lineup(game_id, team_id, lineup_state)
        return lineup_state

    @staticmethod
    def _on_base_code(state) -> int:
        """Encode occupied bases as a bit field: 1 = first, 2 = second, 4 = third."""
        code = 0
        if state.on_first:
            code |= 1  # Bit 0: first base
        if state.on_second:
            code |= 2  # Bit 1: second base
        if state.on_third:
            code |= 4  # Bit 2: third base
        return code

    async def _prepare_next_play(self, state: GameState) -> None:
        """
        Prepare snapshot for the next play.

        This method:
        1. Determines current batter based on batting order index
        2. Advances the appropriate team's batter index (with wraparound)
        3. Gets both lineups from cache, fetching from database only if missing
        4. Sets snapshot fields: current_batter/pitcher/catcher_lineup_id
        5. Calculates on_base_code from current runners

        This snapshot is used when saving the Play record to DB.
        """
        # Determine which team is batting and advance its batter index.
        # The index wraps after 9 batters.
        if state.half == "top":
            # Away team batting
            current_idx = state.away_team_batter_idx
            state.away_team_batter_idx = (current_idx + 1) % 9
            batting_team = state.away_team_id
            fielding_team = state.home_team_id
        else:
            # Home team batting
            current_idx = state.home_team_batter_idx
            state.home_team_batter_idx = (current_idx + 1) % 9
            batting_team = state.home_team_id
            fielding_team = state.away_team_id

        # Cache first, database only on a cache miss
        batting_lineup_state = await self._get_or_load_lineup(state.game_id, batting_team)
        fielding_lineup_state = await self._get_or_load_lineup(state.game_id, fielding_team)

        # Batter: use the batting order index to find the player
        if batting_lineup_state:
            # Players with a batting order, sorted by that order
            batting_order = sorted(
                [p for p in batting_lineup_state.players if p.batting_order is not None],
                key=lambda x: x.batting_order or 0
            )
            if current_idx < len(batting_order):
                state.current_batter_lineup_id = batting_order[current_idx].lineup_id
            else:
                state.current_batter_lineup_id = None
                logger.warning(f"Batter index {current_idx} out of range for batting order")
        else:
            state.current_batter_lineup_id = None
            logger.warning(f"No batting lineup found for team {batting_team}")

        # Pitcher and catcher: find by position from cached lineup
        if fielding_lineup_state:
            pitcher = next((p for p in fielding_lineup_state.players if p.position == "P"), None)
            state.current_pitcher_lineup_id = pitcher.lineup_id if pitcher else None
            catcher = next((p for p in fielding_lineup_state.players if p.position == "C"), None)
            state.current_catcher_lineup_id = catcher.lineup_id if catcher else None
        else:
            state.current_pitcher_lineup_id = None
            state.current_catcher_lineup_id = None

        # Calculate on_base_code from current runners (bit field)
        state.current_on_base_code = self._on_base_code(state)

        logger.debug(
            f"Prepared next play: batter={state.current_batter_lineup_id}, "
            f"pitcher={state.current_pitcher_lineup_id}, "
            f"catcher={state.current_catcher_lineup_id}, "
            f"on_base_code={state.current_on_base_code}"
        )

    async def _batch_save_inning_rolls(self, game_id: UUID) -> None:
        """
        Batch save all rolls from the inning.

        This is called at end of each half-inning to persist all buffered dice
        rolls to the database. A failed save is logged and swallowed; the
        buffered rolls are kept so a later call can save them.
        """
        if game_id not in self._rolls_this_inning:
            logger.debug(f"No rolls to save for game {game_id}")
            return

        rolls = self._rolls_this_inning[game_id]
        if not rolls:
            logger.debug(f"Empty roll list for game {game_id}")
            return

        try:
            await self.db_ops.save_rolls_batch(rolls)
        except Exception as e:
            # Deliberate best-effort: don't fail the game over roll persistence.
            # Rolls are still in dice_system history and can be recovered later
            # if needed. exc_info keeps the traceback in the log.
            logger.error(f"Failed to batch save rolls for game {game_id}: {e}", exc_info=True)
        else:
            logger.info(f"Batch saved {len(rolls)} rolls for game {game_id}")
            # Clear rolls for this inning
            self._rolls_this_inning[game_id] = []

    async def _save_play_to_db(self, state: GameState, result: PlayResult) -> None:
        """
        Save play to database using snapshot from GameState.

        Uses the pre-calculated snapshot fields (no database lookbacks). Must
        be called before the result is applied to ``state`` so that outs,
        runners and play number reflect the situation before the play.

        Raises:
            ValueError: If required player IDs are missing
        """
        # Use snapshot from GameState (set by _prepare_next_play)
        batter_id = state.current_batter_lineup_id
        pitcher_id = state.current_pitcher_lineup_id
        catcher_id = state.current_catcher_lineup_id
        on_base_code = state.current_on_base_code

        # VERIFY required fields are present
        required_ids = (
            ("batter_id", batter_id),
            ("pitcher_id", pitcher_id),
            ("catcher_id", catcher_id),
        )
        for field_name, value in required_ids:
            if value is None:
                raise ValueError(
                    f"Cannot save play: {field_name} is None. "
                    f"Game {state.game_id} may need _prepare_next_play() called after recovery."
                )

        # Runners on base BEFORE play (from state.on_first/second/third)
        on_first_id = state.on_first.lineup_id if state.on_first else None
        on_second_id = state.on_second.lineup_id if state.on_second else None
        on_third_id = state.on_third.lineup_id if state.on_third else None

        # Runners AFTER play (from result.runners_advanced)
        # Build dict of from_base -> to_base for quick lookup. A base that does
        # not appear in runners_advanced yields None.
        finals = {from_base: to_base for from_base, to_base in result.runners_advanced}
        on_first_final = finals.get(1)
        on_second_final = finals.get(2)
        on_third_final = finals.get(3)

        # Batter result (None=out, 1-4=base reached)
        batter_final = result.batter_result

        play_data = {
            "game_id": state.game_id,
            "play_number": state.play_count,
            "inning": state.inning,
            "half": state.half,
            "outs_before": state.outs,  # Capture current outs BEFORE applying result
            "outs_recorded": result.outs_recorded,
            # Player IDs from snapshot
            "batter_id": batter_id,
            "pitcher_id": pitcher_id,
            "catcher_id": catcher_id,
            # Base situation snapshot
            "on_base_code": on_base_code,
            "on_first_id": on_first_id,
            "on_second_id": on_second_id,
            "on_third_id": on_third_id,
            # Final positions
            "on_first_final": on_first_final,
            "on_second_final": on_second_final,
            "on_third_final": on_third_final,
            "batter_final": batter_final,
            # Play outcome
            "dice_roll": str(result.ab_roll),
            "hit_type": result.outcome.value,
            "result_description": result.description,
            "runs_scored": result.runs_scored,
            "away_score": state.away_score,
            "home_score": state.home_score,
            "complete": True,
            # Strategic decisions
            "defensive_choices": state.decisions_this_play.get('defensive', {}),
            "offensive_choices": state.decisions_this_play.get('offensive', {})
        }

        # Add metadata for uncapped hits (Phase 3: will include runner advancement decisions)
        play_metadata = {}
        if result.outcome in [PlayOutcome.SINGLE_UNCAPPED, PlayOutcome.DOUBLE_UNCAPPED]:
            play_metadata["uncapped"] = True
            play_metadata["outcome_type"] = result.outcome.value
        play_data["play_metadata"] = play_metadata

        await self.db_ops.save_play(play_data)
        logger.debug(f"Saved play {state.play_count}: batter={batter_id}, on_base={on_base_code}")

    async def get_game_state(self, game_id: UUID) -> Optional[GameState]:
        """Get current game state from the state manager."""
        return state_manager.get_state(game_id)

    async def end_game(self, game_id: UUID) -> GameState:
        """
        Manually end a game.

        For forfeit, abandonment, etc. Flushes any buffered rolls, marks the
        game completed in memory and persists the final state.

        Raises:
            ValueError: If the game is not present in the state manager.
        """
        state = state_manager.get_state(game_id)
        if not state:
            raise ValueError(f"Game {game_id} not found")

        # Batch save any remaining rolls, then release the per-game buffer
        # (kept if the save failed and rolls are still pending).
        await self._batch_save_inning_rolls(game_id)
        self._discard_roll_buffer(game_id)

        state.status = "completed"
        state_manager.update_state(game_id, state)

        await self.db_ops.update_game_state(
            game_id=game_id,
            inning=state.inning,
            half=state.half,
            home_score=state.home_score,
            away_score=state.away_score,
            status="completed"
        )

        logger.info(f"Game {game_id} ended manually")
        return state
# Singleton instance shared by every importer of this module. Creating it here
# runs GameEngine.__init__ (which constructs DatabaseOperations) at import time.
game_engine = GameEngine()