strat-gameplay-webapp/backend/app/core/state_manager.py
Cal Corum a696473d0a CLAUDE: Integrate flyball advancement with RunnerAdvancement system
Major Phase 2 refactoring to consolidate runner advancement logic:

**Flyball System Enhancement**:
- Add FLYOUT_BQ variant (medium-shallow depth)
- 4 flyball types with clear semantics: A (deep), B (medium), BQ (medium-shallow), C (shallow)
- Updated helper methods to include FLYOUT_BQ
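
The four flyball types above could be modeled as an enum. This is a minimal sketch, not the project's actual definition: only `FLYOUT_BQ` is named in this commit message, so `FLYOUT_A`, `FLYOUT_B`, `FLYOUT_C`, the `FlyballType` class, and the `is_flyball` helper are assumed names used for illustration.

```python
from enum import Enum


class FlyballType(Enum):
    """Hypothetical enum; each value is the depth listed in the commit message."""
    FLYOUT_A = "deep"
    FLYOUT_B = "medium"
    FLYOUT_BQ = "medium-shallow"
    FLYOUT_C = "shallow"


def is_flyball(outcome_name: str) -> bool:
    """Return True if the name matches one of the four flyball variants."""
    return outcome_name in FlyballType.__members__
```

A helper like this has to know about every variant, which is presumably why the message notes that the helper methods were updated to include `FLYOUT_BQ`.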

**RunnerAdvancement Integration**:
- Extend runner_advancement.py to handle both groundballs AND flyballs
- advance_runners() routes to _advance_runners_groundball() or _advance_runners_flyball()
- Comprehensive flyball logic with proper DECIDE mechanics per flyball type
- No-op movements recorded for state recovery consistency
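
The routing described above might look roughly like the sketch below. Everything except the three function names is an assumption: the `Movement` record, the string outcome names, the convention that base 4 means "scored", and the advancement rules themselves are placeholders, not the game's real charts or DECIDE mechanics.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Movement:
    """Hypothetical record of one runner's move; from_base == to_base is a no-op."""
    lineup_id: int
    from_base: int
    to_base: int  # 4 means the runner scored (assumed convention)


def _advance_runners_groundball(bases: Dict[int, int]) -> List[Movement]:
    # Placeholder: every runner holds. The real groundball chart is not shown here.
    return [Movement(runner, base, base) for base, runner in sorted(bases.items())]


def _advance_runners_flyball(outcome: str, bases: Dict[int, int]) -> List[Movement]:
    # Illustrative rule only: on the deepest fly the runner on third tags up and scores.
    moves = []
    for base, runner in sorted(bases.items()):
        scores = outcome == "FLYOUT_A" and base == 3
        moves.append(Movement(runner, base, 4 if scores else base))
    return moves


def advance_runners(outcome: str, bases: Dict[int, int]) -> List[Movement]:
    """Route to the handler for this batted-ball family."""
    if outcome.startswith("FLYOUT"):
        return _advance_runners_flyball(outcome, bases)
    if outcome.startswith("GROUNDBALL"):
        return _advance_runners_groundball(bases)
    raise ValueError(f"Unsupported outcome: {outcome}")
```

Recording a `Movement` even when a runner holds means every play carries a final position for every runner, which is what lets recovery read the last play instead of replaying the game.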

**PlayResolver Refactoring**:
- Consolidate all 4 flyball outcomes to delegate to RunnerAdvancement (DRY)
- Eliminate duplicate flyball resolution code
- Rename helpers for clarity: _advance_on_single_1/_advance_on_single_2 (was _advance_on_single)
- Fix single/double advancement logic for different hit types

**State Recovery Fix**:
- Fix state_manager.py game recovery to build LineupPlayerState objects properly
- Use get_lineup_player() helper to construct from lineup data
- Correctly track runners in on_first/on_second/on_third fields (matches Phase 2 model)

**Database Support**:
- Add runner tracking fields to play data for accurate recovery
- Include batter_id, on_first_id, on_second_id, on_third_id, and *_final fields
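
As a rough illustration of how these fields support recovery, the sketch below rebuilds base occupancy from a single play record. The field names come from the list above; the `recover_bases` function is hypothetical, and it assumes that a `*_final` value of 1 to 3 names the base where the runner ended up, while any other value means the runner is no longer on base.

```python
from typing import Dict, Optional


def recover_bases(last_play: dict) -> Dict[int, Optional[int]]:
    """Map base number (1-3) to the lineup id occupying it after the play."""
    bases: Dict[int, Optional[int]] = {1: None, 2: None, 3: None}
    # Trailing runners first, so a lead runner wins any collision among runners;
    # the batter is processed last.
    pairs = [
        ("on_first_id", "on_first_final"),
        ("on_second_id", "on_second_final"),
        ("on_third_id", "on_third_final"),
        ("batter_id", "batter_final"),
    ]
    for id_field, final_field in pairs:
        final = last_play.get(final_field)
        runner = last_play.get(id_field)
        if final in bases and runner is not None:
            bases[final] = runner
    return bases


last_play = {
    "batter_id": 7, "batter_final": 1,
    "on_first_id": 5, "on_first_final": 3,
    "on_third_id": 9, "on_third_final": 4,
}
print(recover_bases(last_play))
```

Here the batter ends on first, the runner from first ends on third, and the runner from third has left the bases, so only bases 1 and 3 are occupied.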

**Type Safety Improvements**:
- Fix lineup_id access throughout runner_advancement.py (was accessing on_first directly, now on_first.lineup_id)
- Make current_batter_lineup_id non-optional (always set by _prepare_next_play)
- Add type: ignore for known SQLAlchemy false positives
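
The `lineup_id` fix above amounts to reading the ID off the runner object rather than treating the base field as the ID itself. A minimal sketch, using a cut-down stand-in for `LineupPlayerState` (the real model has more fields) and a hypothetical `runner_lineup_id` helper:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LineupPlayerState:
    """Stand-in with a single field; not the project's actual model."""
    lineup_id: int


def runner_lineup_id(runner: Optional[LineupPlayerState]) -> Optional[int]:
    """Return the runner's lineup ID, or None when the base is empty."""
    return runner.lineup_id if runner is not None else None
```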

**Documentation**:
- Update CLAUDE.md with comprehensive flyball documentation
- Add flyball types table, usage examples, and test coverage notes
- Document differences between groundball and flyball mechanics

**Testing**:
- Add test_flyball_advancement.py with 21 flyball tests
- Coverage: all 4 types, DECIDE scenarios, no-op movements, edge cases

🚀 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-31 17:04:23 -05:00

570 lines
20 KiB
Python

"""
State Manager - In-memory game state management.

Manages active game states in memory for fast gameplay (<500ms response time).
Provides CRUD operations, lineup management, and state recovery from database.

This is the single source of truth for active game states during gameplay.

Author: Claude
Date: 2025-10-22
"""

import asyncio
import logging
from typing import Dict, Optional, Union
from uuid import UUID

import pendulum

from app.models.game_models import GameState, TeamLineupState, LineupPlayerState, DefensiveDecision, OffensiveDecision
from app.database.operations import DatabaseOperations

logger = logging.getLogger(f'{__name__}.StateManager')


class StateManager:
    """
    Manages in-memory game states for active games.

    Responsibilities:
    - Store game states in memory for fast access
    - Manage team lineups per game
    - Track last access times for eviction
    - Recover game states from database on demand

    This class uses dictionaries for O(1) lookups of game state by game_id.
    """

    def __init__(self):
        """Initialize the state manager with empty storage"""
        self._states: Dict[UUID, GameState] = {}
        self._lineups: Dict[UUID, Dict[int, TeamLineupState]] = {}  # game_id -> {team_id: lineup}
        self._last_access: Dict[UUID, pendulum.DateTime] = {}

        # Phase 3: Decision queue for async decision awaiting
        # Key: (game_id, team_id, decision_type)
        self._pending_decisions: Dict[tuple[UUID, int, str], asyncio.Future] = {}

        self.db_ops = DatabaseOperations()

        logger.info("StateManager initialized")

    async def create_game(
        self,
        game_id: UUID,
        league_id: str,
        home_team_id: int,
        away_team_id: int,
        home_team_is_ai: bool = False,
        away_team_is_ai: bool = False,
        auto_mode: bool = False
    ) -> GameState:
        """
        Create a new game state in memory.

        Args:
            game_id: Unique game identifier
            league_id: League identifier ('sba' or 'pd')
            home_team_id: Home team ID
            away_team_id: Away team ID
            home_team_is_ai: Whether home team is AI-controlled
            away_team_is_ai: Whether away team is AI-controlled
            auto_mode: True = auto-generate outcomes (PD only), False = manual submissions

        Returns:
            Newly created GameState

        Raises:
            ValueError: If game_id already exists
        """
        if game_id in self._states:
            raise ValueError(f"Game {game_id} already exists in state manager")

        logger.info(f"Creating game state for {game_id} ({league_id} league, auto_mode={auto_mode})")

        state = GameState(
            game_id=game_id,
            league_id=league_id,
            home_team_id=home_team_id,
            away_team_id=away_team_id,
            home_team_is_ai=home_team_is_ai,
            away_team_is_ai=away_team_is_ai,
            auto_mode=auto_mode,
            current_batter_lineup_id=0  # Will be set by _prepare_next_play() when game starts
        )

        self._states[game_id] = state
        self._lineups[game_id] = {}
        self._last_access[game_id] = pendulum.now('UTC')

        logger.debug(f"Game {game_id} created in memory")
        return state

    def get_state(self, game_id: UUID) -> Optional[GameState]:
        """
        Get game state by ID.

        Updates last access time when accessed.

        Args:
            game_id: Game identifier

        Returns:
            GameState if found, None otherwise
        """
        if game_id in self._states:
            self._last_access[game_id] = pendulum.now('UTC')
            return self._states[game_id]
        return None

    def update_state(self, game_id: UUID, state: GameState) -> None:
        """
        Update game state.

        Args:
            game_id: Game identifier
            state: Updated GameState

        Raises:
            ValueError: If game_id doesn't exist
        """
        if game_id not in self._states:
            raise ValueError(f"Game {game_id} not found in state manager")

        self._states[game_id] = state
        self._last_access[game_id] = pendulum.now('UTC')

        logger.debug(f"Updated state for game {game_id} (inning {state.inning}, {state.half})")

    def set_lineup(self, game_id: UUID, team_id: int, lineup: TeamLineupState) -> None:
        """
        Set team lineup for a game.

        Args:
            game_id: Game identifier
            team_id: Team identifier
            lineup: Team lineup state

        Raises:
            ValueError: If game_id doesn't exist
        """
        if game_id not in self._states:
            raise ValueError(f"Game {game_id} not found in state manager")

        if game_id not in self._lineups:
            self._lineups[game_id] = {}

        self._lineups[game_id][team_id] = lineup
        logger.info(f"Set lineup for team {team_id} in game {game_id} ({len(lineup.players)} players)")

    def get_lineup(self, game_id: UUID, team_id: int) -> Optional[TeamLineupState]:
        """
        Get team lineup for a game.

        Args:
            game_id: Game identifier
            team_id: Team identifier

        Returns:
            TeamLineupState if found, None otherwise
        """
        return self._lineups.get(game_id, {}).get(team_id)

    def remove_game(self, game_id: UUID) -> None:
        """
        Remove game from memory.

        Call this when a game is completed or being archived.

        Args:
            game_id: Game identifier
        """
        removed_parts = []

        if game_id in self._states:
            self._states.pop(game_id)
            removed_parts.append("state")

        if game_id in self._lineups:
            self._lineups.pop(game_id)
            removed_parts.append("lineups")

        if game_id in self._last_access:
            self._last_access.pop(game_id)
            removed_parts.append("access")

        if removed_parts:
            logger.info(f"Removed game {game_id} from memory ({', '.join(removed_parts)})")
        else:
            logger.warning(f"Attempted to remove game {game_id} but it was not in memory")

    async def recover_game(self, game_id: UUID) -> Optional[GameState]:
        """
        Recover game state from database.

        This is called when a game needs to be loaded (e.g., after server restart,
        or when a game is accessed that's not currently in memory).

        Loads game data from database and rebuilds the in-memory state.

        Args:
            game_id: Game identifier

        Returns:
            Recovered GameState if found in database, None otherwise
        """
        logger.info(f"Recovering game {game_id} from database")

        # Load from database
        game_data = await self.db_ops.load_game_state(game_id)

        if not game_data:
            logger.warning(f"Game {game_id} not found in database")
            return None

        # Rebuild state from loaded data
        state = await self._rebuild_state_from_data(game_data)

        # Cache in memory
        self._states[game_id] = state
        self._last_access[game_id] = pendulum.now('UTC')

        logger.info(f"Recovered game {game_id} - inning {state.inning}, {state.half}")
        return state

    async def _rebuild_state_from_data(self, game_data: dict) -> GameState:
        """
        Rebuild game state from database data using the last completed play.

        This method recovers the game state without replaying all plays.
        It uses the final positions from the last completed play to reconstruct
        the runners on base. Batter indices are reset to 0 and are expected to
        be corrected by _prepare_next_play().

        Args:
            game_data: Dictionary with 'game', 'lineups', and 'plays' keys

        Returns:
            Reconstructed GameState
        """
        game = game_data['game']

        state = GameState(
            game_id=game['id'],
            league_id=game['league_id'],
            home_team_id=game['home_team_id'],
            away_team_id=game['away_team_id'],
            home_team_is_ai=game.get('home_team_is_ai', False),
            away_team_is_ai=game.get('away_team_is_ai', False),
            status=game['status'],
            inning=game.get('current_inning', 1),
            half=game.get('current_half', 'top'),
            home_score=game.get('home_score', 0),
            away_score=game.get('away_score', 0),
            play_count=len(game_data.get('plays', [])),
            # Placeholder, as in create_game(); set by _prepare_next_play()
            current_batter_lineup_id=0
        )

        # Get last completed play to recover runner state
        plays = game_data.get('plays', [])
        if plays:
            # Take the completed play with the highest play_number
            completed_plays = [p for p in plays if p.get('complete', False)]
            if completed_plays:
                last_play = max(completed_plays, key=lambda p: p['play_number'])

                # Build lineup lookup dict for quick access
                lineups = game_data.get('lineups', [])
                lineup_dict = {entry['id']: entry for entry in lineups}

                # Helper function to create LineupPlayerState from lineup_id
                def get_lineup_player(lineup_id: int) -> Optional[LineupPlayerState]:
                    if not lineup_id or lineup_id not in lineup_dict:
                        return None
                    lineup = lineup_dict[lineup_id]
                    return LineupPlayerState(
                        lineup_id=lineup['id'],
                        card_id=lineup['card_id'] or 0,  # Handle nullable
                        position=lineup['position'],
                        batting_order=lineup.get('batting_order'),
                        is_active=lineup.get('is_active', True)
                    )

                # Recover runners from *_final fields (where they ended up after last play)
                # Check each base - if a runner ended on that base, place them there
                runner_count = 0

                # Check if on_first_id runner ended on first (on_first_final == 1)
                if last_play.get('on_first_final') == 1:
                    state.on_first = get_lineup_player(last_play.get('on_first_id'))
                    if state.on_first:
                        runner_count += 1

                # Check if on_second_id runner ended on second OR if on_first_id runner advanced to second
                if last_play.get('on_second_final') == 2:
                    state.on_second = get_lineup_player(last_play.get('on_second_id'))
                    if state.on_second:
                        runner_count += 1
                elif last_play.get('on_first_final') == 2:
                    state.on_second = get_lineup_player(last_play.get('on_first_id'))
                    if state.on_second:
                        runner_count += 1

                # Check if any runner ended on third
                if last_play.get('on_third_final') == 3:
                    state.on_third = get_lineup_player(last_play.get('on_third_id'))
                    if state.on_third:
                        runner_count += 1
                elif last_play.get('on_second_final') == 3:
                    state.on_third = get_lineup_player(last_play.get('on_second_id'))
                    if state.on_third:
                        runner_count += 1
                elif last_play.get('on_first_final') == 3:
                    state.on_third = get_lineup_player(last_play.get('on_first_id'))
                    if state.on_third:
                        runner_count += 1

                # Check if batter reached base (and didn't score)
                batter_final = last_play.get('batter_final')
                if batter_final == 1:
                    state.on_first = get_lineup_player(last_play.get('batter_id'))
                    if state.on_first:
                        runner_count += 1
                elif batter_final == 2:
                    state.on_second = get_lineup_player(last_play.get('batter_id'))
                    if state.on_second:
                        runner_count += 1
                elif batter_final == 3:
                    state.on_third = get_lineup_player(last_play.get('batter_id'))
                    if state.on_third:
                        runner_count += 1

                # Batter indices are not recovered here: reset to 0 and let
                # _prepare_next_play() correct them
                state.away_team_batter_idx = 0
                state.home_team_batter_idx = 0

                logger.debug(
                    f"Recovered state from play {last_play['play_number']}: "
                    f"{runner_count} runners on base"
                )
            else:
                logger.debug("No completed plays found - initializing fresh state")
        else:
            logger.debug("No plays found - initializing fresh state")

        # Count runners on base
        runners_on_base = len(state.get_all_runners())
        logger.info(f"Rebuilt state for game {state.game_id}: {state.play_count} plays, {runners_on_base} runners")
        return state

    def evict_idle_games(self, idle_minutes: int = 60) -> int:
        """
        Remove games that haven't been accessed recently.

        This helps manage memory by removing inactive games. Evicted games
        can be recovered from database if needed later.

        Args:
            idle_minutes: Minutes of inactivity before eviction (default 60)

        Returns:
            Number of games evicted
        """
        cutoff = pendulum.now('UTC').subtract(minutes=idle_minutes)
        to_evict = [
            game_id for game_id, last_access in self._last_access.items()
            if last_access < cutoff
        ]

        for game_id in to_evict:
            self.remove_game(game_id)

        if to_evict:
            logger.info(f"Evicted {len(to_evict)} idle games (idle > {idle_minutes}m)")

        return len(to_evict)

    def get_stats(self) -> dict:
        """
        Get state manager statistics.

        Returns:
            Dictionary with current state statistics:
            - active_games: Number of games in memory
            - total_lineups: Total lineups across all games
            - games_by_league: Count of games per league
            - games_by_status: Count of games by status
        """
        stats = {
            "active_games": len(self._states),
            "total_lineups": sum(len(lineups) for lineups in self._lineups.values()),
            "games_by_league": {},
            "games_by_status": {},
        }

        # Count by league
        for state in self._states.values():
            league = state.league_id
            stats["games_by_league"][league] = stats["games_by_league"].get(league, 0) + 1

        # Count by status
        for state in self._states.values():
            status = state.status
            stats["games_by_status"][status] = stats["games_by_status"].get(status, 0) + 1

        return stats

    def exists(self, game_id: UUID) -> bool:
        """
        Check if game exists in memory.

        Args:
            game_id: Game identifier

        Returns:
            True if game is in memory, False otherwise
        """
        return game_id in self._states

    def get_all_game_ids(self) -> list[UUID]:
        """
        Get list of all game IDs currently in memory.

        Returns:
            List of game UUIDs
        """
        return list(self._states.keys())

    # ============================================================================
    # PHASE 3: DECISION QUEUE MANAGEMENT
    # ============================================================================

    def set_pending_decision(
        self,
        game_id: UUID,
        team_id: int,
        decision_type: str
    ) -> None:
        """
        Mark that a decision is required and create a future for it.

        Args:
            game_id: Game identifier
            team_id: Team that needs to make the decision
            decision_type: Type of decision ('defensive' or 'offensive')
        """
        key = (game_id, team_id, decision_type)

        # Create a new future for this decision
        self._pending_decisions[key] = asyncio.Future()

        logger.debug(f"Set pending {decision_type} decision for game {game_id}, team {team_id}")

    async def await_decision(
        self,
        game_id: UUID,
        team_id: int,
        decision_type: str
    ) -> Union[DefensiveDecision, OffensiveDecision]:
        """
        Wait for a decision to be submitted.

        This coroutine will block until submit_decision() is called
        with matching parameters.

        Args:
            game_id: Game identifier
            team_id: Team making the decision
            decision_type: Type of decision expected

        Returns:
            The submitted decision (DefensiveDecision or OffensiveDecision)

        Raises:
            ValueError: If no pending decision exists for these parameters
            asyncio.TimeoutError: If decision not received within timeout (handled by caller)
        """
        key = (game_id, team_id, decision_type)

        if key not in self._pending_decisions:
            raise ValueError(
                f"No pending {decision_type} decision for game {game_id}, team {team_id}"
            )

        # Await the future (will be resolved by submit_decision)
        decision = await self._pending_decisions[key]

        logger.debug(f"Received {decision_type} decision for game {game_id}, team {team_id}")
        return decision

    def submit_decision(
        self,
        game_id: UUID,
        team_id: int,
        decision: Union[DefensiveDecision, OffensiveDecision]
    ) -> None:
        """
        Submit a decision (called by WebSocket handler or AI opponent).

        This resolves the pending future created by set_pending_decision().

        Args:
            game_id: Game identifier
            team_id: Team making the decision
            decision: The decision being submitted

        Raises:
            ValueError: If no pending decision exists
        """
        # Determine decision type from the decision object
        # (DefensiveDecision is already imported at module level)
        decision_type = "defensive" if isinstance(decision, DefensiveDecision) else "offensive"

        key = (game_id, team_id, decision_type)

        if key not in self._pending_decisions:
            raise ValueError(
                f"No pending {decision_type} decision for game {game_id}, team {team_id}"
            )

        future = self._pending_decisions[key]

        # Check if already resolved (should not happen)
        if future.done():
            logger.warning(f"Decision already submitted for {key}")
            return

        # Resolve the future with the decision
        future.set_result(decision)

        # Clean up the future
        del self._pending_decisions[key]

        logger.info(f"Submitted {decision_type} decision for game {game_id}, team {team_id}")

    def cancel_pending_decision(
        self,
        game_id: UUID,
        team_id: int,
        decision_type: str
    ) -> None:
        """
        Cancel a pending decision (e.g., on timeout or game abort).

        Args:
            game_id: Game identifier
            team_id: Team that was expected to decide
            decision_type: Type of decision
        """
        key = (game_id, team_id, decision_type)

        if key in self._pending_decisions:
            future = self._pending_decisions[key]
            if not future.done():
                future.cancel()
            del self._pending_decisions[key]
            logger.debug(f"Cancelled pending {decision_type} decision for game {game_id}, team {team_id}")


# Singleton instance for global access
state_manager = StateManager()