strat-gameplay-webapp/backend/app/core/state_manager.py
Cal Corum 64325d7163 CLAUDE: Fix game recovery to load team display info and add score text outline
Backend:
- Add game_metadata to load_game_state() return dict in DatabaseOperations
- Populate team display fields (name, color, thumbnail) in _rebuild_state_from_data()
  so recovered games show team colors/names

Frontend:
- Add text-outline CSS for score visibility on any background (light logos, gradients)
- Handle thumbnail 404 with @error event, show enhanced shadow when no thumbnail
- Apply consistent outline across mobile and desktop layouts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 09:03:59 -06:00

974 lines
36 KiB
Python

"""
State Manager - In-memory game state management.
Manages active game states in memory for fast gameplay (<500ms response time).
Provides CRUD operations, lineup management, and state recovery from database.
This is the single source of truth for active game states during gameplay.
Author: Claude
Date: 2025-10-22
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from uuid import UUID
import pendulum
from app.database.operations import DatabaseOperations
from app.models.game_models import (
DefensiveDecision,
GameState,
LineupPlayerState,
OffensiveDecision,
TeamLineupState,
)
# Module-level logger; the ".StateManager" suffix gives it a child logger name
# under this module's logger so its output can be filtered separately.
logger = logging.getLogger(f"{__name__}.StateManager")
class StateManager:
"""
Manages in-memory game states for active games.
Responsibilities:
- Store game states in memory for fast access
- Manage team lineups per game
- Track last access times for eviction
- Recover game states from database on demand
This class uses dictionaries for O(1) lookups of game state by game_id.
TODO: Redis Migration for Multi-Worker Scalability
=================================================
Current implementation stores state in Python dicts, limiting to single worker.
For multi-worker deployment (high concurrency), migrate to Redis:
Storage mapping:
_states[game_id] → Redis key: game:{id}:state (JSON)
_lineups[game_id][team_id] → Redis key: game:{id}:lineup:{team_id} (JSON)
_last_access[game_id] → Redis key TTL (auto-expiry)
_pending_decisions → Redis pub/sub for cross-worker coordination
Implementation steps:
1. Add async Redis client (aioredis or redis-py async)
2. Create RedisStateManager implementing same interface
3. Serialize GameState/TeamLineupState via .model_dump_json()
4. Add Redis pub/sub for WebSocket broadcast across workers
5. Update Dockerfile to use --workers N
Performance impact: ~1ms latency per Redis call (vs ~1μs in-memory)
Still well under 500ms response target.
Note: Redis is already in docker-compose for rate limiting.
"""
def __init__(self):
    """Set up empty in-memory stores and the database operations helper."""
    # Per-game concurrency primitives.
    # Locks are keyed by game; pending decision futures are keyed by
    # (game_id, team_id, decision_type).
    self._game_locks: dict[UUID, asyncio.Lock] = {}
    self._pending_decisions: dict[tuple[UUID, int, str], asyncio.Future] = {}

    # Per-game data: the state itself, lineups as game_id -> {team_id: lineup},
    # and the timestamp of the most recent access (used for eviction).
    self._states: dict[UUID, GameState] = {}
    self._lineups: dict[UUID, dict[int, TeamLineupState]] = {}
    self._last_access: dict[UUID, pendulum.DateTime] = {}

    self.db_ops = DatabaseOperations()

    logger.info("StateManager initialized")
async def create_game(
    self,
    game_id: UUID,
    league_id: str,
    home_team_id: int,
    away_team_id: int,
    home_team_is_ai: bool = False,
    away_team_is_ai: bool = False,
    auto_mode: bool = False,
    creator_discord_id: str | None = None,
    game_metadata: dict | None = None,
) -> GameState:
    """
    Build a brand-new GameState and register it in memory.

    Args:
        game_id: Unique game identifier
        league_id: League identifier ('sba' or 'pd')
        home_team_id: Home team ID
        away_team_id: Away team ID
        home_team_is_ai: Whether home team is AI-controlled
        away_team_is_ai: Whether away team is AI-controlled
        auto_mode: True = auto-generate outcomes (PD only), False = manual submissions
        creator_discord_id: Optional ID of the game's creator, stored on the state as-is
        game_metadata: Optional dict with "home_team" / "away_team" sub-dicts holding
            team display info (lname, abbrev, color, thumbnail)

    Returns:
        Newly created GameState

    Raises:
        ValueError: If game_id already exists
    """
    if game_id in self._states:
        raise ValueError(f"Game {game_id} already exists in state manager")

    logger.info(
        f"Creating game state for {game_id} ({league_id} league, auto_mode={auto_mode})"
    )

    from app.models.game_models import LineupPlayerState

    # Team display info: a missing or empty metadata dict yields all-None fields.
    metadata = game_metadata if game_metadata else {}
    display_fields = {}
    for side in ("home", "away"):
        team_info = metadata.get(f"{side}_team", {})
        display_fields[f"{side}_team_name"] = team_info.get("lname")
        display_fields[f"{side}_team_abbrev"] = team_info.get("abbrev")
        display_fields[f"{side}_team_color"] = team_info.get("color")
        display_fields[f"{side}_team_thumbnail"] = team_info.get("thumbnail")

    state = GameState(
        game_id=game_id,
        league_id=league_id,
        home_team_id=home_team_id,
        away_team_id=away_team_id,
        home_team_is_ai=home_team_is_ai,
        away_team_is_ai=away_team_is_ai,
        auto_mode=auto_mode,
        creator_discord_id=creator_discord_id,
        # Stand-in batter; per the original note it is replaced by
        # _prepare_next_play() once the game actually starts.
        current_batter=LineupPlayerState(
            lineup_id=0, card_id=0, position="DH", batting_order=None
        ),
        **display_fields,
    )

    # Register the game in every per-game store.
    self._states[game_id] = state
    self._lineups[game_id] = {}
    self._last_access[game_id] = pendulum.now("UTC")

    logger.debug(f"Game {game_id} created in memory")
    return state
def get_state(self, game_id: UUID) -> GameState | None:
    """
    Look up the in-memory state for a game.

    A successful lookup also refreshes the game's last-access timestamp.

    Args:
        game_id: Game identifier

    Returns:
        GameState if found, None otherwise
    """
    try:
        found = self._states[game_id]
    except KeyError:
        return None

    self._last_access[game_id] = pendulum.now("UTC")
    return found
def update_state(self, game_id: UUID, state: GameState) -> None:
    """
    Replace the stored state for an existing game and refresh its access time.

    Args:
        game_id: Game identifier
        state: Updated GameState

    Raises:
        ValueError: If game_id doesn't exist
    """
    known = game_id in self._states
    if not known:
        raise ValueError(f"Game {game_id} not found in state manager")

    self._states[game_id] = state
    self._last_access[game_id] = pendulum.now("UTC")

    logger.debug(
        f"Updated state for game {game_id} (inning {state.inning}, {state.half})"
    )
def set_lineup(self, game_id: UUID, team_id: int, lineup: TeamLineupState) -> None:
    """
    Store a team's lineup for a game that is already in memory.

    Args:
        game_id: Game identifier
        team_id: Team identifier
        lineup: Team lineup state

    Raises:
        ValueError: If game_id doesn't exist
    """
    if game_id not in self._states:
        raise ValueError(f"Game {game_id} not found in state manager")

    # Create the per-game lineup mapping on first use, then store the lineup.
    self._lineups.setdefault(game_id, {})[team_id] = lineup

    logger.info(
        f"Set lineup for team {team_id} in game {game_id} ({len(lineup.players)} players)"
    )
def get_lineup(self, game_id: UUID, team_id: int) -> TeamLineupState | None:
"""
Get team lineup for a game.
Args:
game_id: Game identifier
team_id: Team identifier
Returns:
TeamLineupState if found, None otherwise
"""
return self._lineups.get(game_id, {}).get(team_id)
def remove_game(self, game_id: UUID) -> None:
    """
    Drop a game's state, lineups, access time and lock from memory.

    Call this when a game is completed or being archived. Entries that are
    not present are skipped; if nothing at all was present a warning is logged.

    Args:
        game_id: Game identifier
    """
    stores = (
        ("state", self._states),
        ("lineups", self._lineups),
        ("access", self._last_access),
        ("lock", self._game_locks),
    )

    removed_parts = []
    for label, store in stores:
        if game_id in store:
            del store[game_id]
            removed_parts.append(label)

    if not removed_parts:
        logger.warning(
            f"Attempted to remove game {game_id} but it was not in memory"
        )
        return

    logger.info(
        f"Removed game {game_id} from memory ({', '.join(removed_parts)})"
    )
async def recover_game(self, game_id: UUID) -> GameState | None:
    """
    Load a game from the database and cache the rebuilt state in memory.

    Intended for games that are not currently in memory (e.g. after a server
    restart). The loaded data is turned into a GameState by
    _rebuild_state_from_data() and stored together with a fresh access time.

    Args:
        game_id: Game identifier

    Returns:
        Recovered GameState if found in database, None otherwise
    """
    logger.info(f"Recovering game {game_id} from database")

    game_data = await self.db_ops.load_game_state(game_id)
    if game_data:
        state = await self._rebuild_state_from_data(game_data)

        # Cache the rebuilt state and mark it as just accessed.
        self._states[game_id] = state
        self._last_access[game_id] = pendulum.now("UTC")

        logger.info(f"Recovered game {game_id} - inning {state.inning}, {state.half}")
        return state

    logger.warning(f"Game {game_id} not found in database")
    return None
async def _rebuild_state_from_data(self, game_data: dict) -> GameState:
    """
    Rebuild game state from database data using the last completed play.

    The state is reconstructed without replaying every play: the last
    completed play supplies the outs and the runners' final bases, and the
    last completed play of each half ("top" = away, "bottom" = home)
    supplies that team's next batter index.

    If the last completed play recorded the third out, the recovered state
    starts the new half-inning with zero outs and empty bases.

    Values that may be stored as explicit NULLs (scores, AI flags, outs,
    metadata sub-dicts) are normalized with ``or <default>``, matching the
    handling already used for the current inning and half.

    Args:
        game_data: Dictionary with 'game', 'lineups', and 'plays' keys

    Returns:
        Reconstructed GameState

    Raises:
        ValueError: If no usable active lineup entry exists for the batting team
    """
    lineup_size = 9  # batting order wraps after the ninth batter

    game = game_data["game"]
    lineups = game_data.get("lineups") or []
    plays = game_data.get("plays") or []

    # Lineup rows indexed by their id for O(1) lookup.
    lineup_by_id = {entry["id"]: entry for entry in lineups}

    def get_lineup_player(lineup_id: int | None) -> LineupPlayerState | None:
        """Build a LineupPlayerState for a lineup id, or None if it is unknown."""
        if not lineup_id or lineup_id not in lineup_by_id:
            return None
        entry = lineup_by_id[lineup_id]
        return LineupPlayerState(
            lineup_id=entry["id"],
            card_id=entry["card_id"] or 0,  # card_id may be NULL
            position=entry["position"],
            batting_order=entry.get("batting_order"),
            is_active=entry.get("is_active", True),
        )

    def active_entries(team_id: int) -> list[dict]:
        """Return the active lineup rows of one team, in stored order."""
        return [
            entry
            for entry in lineups
            if entry.get("team_id") == team_id and entry.get("is_active")
        ]

    def next_batter_index(team_plays: list[dict], team_label: str) -> int:
        """Return the 0-based index of the batter due up after a team's last play."""
        if not team_plays:
            logger.info(
                f"Recovery: {team_label} team has no plays yet, starting at batter #1"
            )
            return 0
        last_order = max(team_plays, key=lambda p: p["play_number"]).get(
            "batting_order"
        )
        if not last_order:
            return 0
        # batting_order is 1-based, so the modulo is already the next 0-based index.
        next_idx = last_order % lineup_size
        logger.info(
            f"Recovery: {team_label} team last batter was #{last_order}, "
            f"next is #{next_idx + 1}"
        )
        return next_idx

    # `or` covers an explicit None from the database, not just a missing key.
    current_half = game.get("current_half") or "top"
    home_team_id = game["home_team_id"]
    away_team_id = game["away_team_id"]

    if current_half == "top":
        # Top of inning: away team bats, home team fields.
        batting_team_id, fielding_team_id = away_team_id, home_team_id
    else:
        # Bottom of inning: home team bats, away team fields.
        batting_team_id, fielding_team_id = home_team_id, away_team_id

    batting_active = active_entries(batting_team_id)

    # Placeholder batter: the active #1 hitter, else the first active entry.
    current_batter_placeholder = None
    leadoff = next(
        (entry for entry in batting_active if entry.get("batting_order") == 1),
        None,
    )
    if leadoff is not None:
        current_batter_placeholder = get_lineup_player(leadoff["id"])
    if not current_batter_placeholder and batting_active:
        current_batter_placeholder = get_lineup_player(batting_active[0]["id"])
    if not current_batter_placeholder:
        # Without any batter the game cannot be represented at all.
        raise ValueError(
            f"Cannot recover game {game['id']}: No lineups found for batting team"
        )

    # First active pitcher and catcher of the fielding team.
    current_pitcher = None
    current_catcher = None
    for entry in active_entries(fielding_team_id):
        position = entry.get("position")
        if position == "P" and not current_pitcher:
            current_pitcher = get_lineup_player(entry["id"])
        elif position == "C" and not current_catcher:
            current_catcher = get_lineup_player(entry["id"])
        if current_pitcher and current_catcher:
            break

    # Team display info stored in game_metadata at game creation.
    game_metadata = game.get("game_metadata") or {}
    home_meta = game_metadata.get("home_team") or {}
    away_meta = game_metadata.get("away_team") or {}

    # NOTE(review): auto_mode and creator_discord_id are set by create_game()
    # but are not restored here - confirm whether the loaded game row carries them.
    state = GameState(
        game_id=game["id"],
        league_id=game["league_id"],
        home_team_id=home_team_id,
        away_team_id=away_team_id,
        home_team_is_ai=game.get("home_team_is_ai") or False,
        away_team_is_ai=game.get("away_team_is_ai") or False,
        status=game["status"],
        inning=game.get("current_inning") or 1,
        half=current_half,
        home_score=game.get("home_score") or 0,
        away_score=game.get("away_score") or 0,
        play_count=len(plays),
        current_batter=current_batter_placeholder,
        current_pitcher=current_pitcher,
        current_catcher=current_catcher,
        home_team_name=home_meta.get("lname"),
        home_team_abbrev=home_meta.get("abbrev"),
        home_team_color=home_meta.get("color"),
        home_team_thumbnail=home_meta.get("thumbnail"),
        away_team_name=away_meta.get("lname"),
        away_team_abbrev=away_meta.get("abbrev"),
        away_team_color=away_meta.get("color"),
        away_team_thumbnail=away_meta.get("thumbnail"),
    )

    completed_plays = [p for p in plays if p.get("complete", False)]

    if completed_plays:
        last_play = max(completed_plays, key=lambda p: p["play_number"])

        # Outs after the last play; three or more means the half-inning ended.
        outs_after = (last_play.get("outs_before") or 0) + (
            last_play.get("outs_recorded") or 0
        )
        half_inning_over = outs_after >= 3
        state.outs = 0 if half_inning_over else outs_after

        if not half_inning_over:
            # For each base, the first matching source wins. The batter takes
            # precedence, then the runner who started on that base, then
            # runners who advanced from lower bases.
            base_sources = (
                ("on_first", 1, ("batter", "on_first")),
                ("on_second", 2, ("batter", "on_second", "on_first")),
                ("on_third", 3, ("batter", "on_third", "on_second", "on_first")),
            )
            for base_attr, base_number, sources in base_sources:
                for source in sources:
                    if last_play.get(f"{source}_final") == base_number:
                        setattr(
                            state,
                            base_attr,
                            get_lineup_player(last_play.get(f"{source}_id")),
                        )
                        break
        # When the half-inning is over the bases are left untouched (empty):
        # the previous half's runners must not carry into the new half.

        # Each team's batting position is tracked separately so both survive
        # an inning change: "top" plays are the away team's at-bats.
        state.away_team_batter_idx = next_batter_index(
            [p for p in completed_plays if p.get("half") == "top"], "Away"
        )
        state.home_team_batter_idx = next_batter_index(
            [p for p in completed_plays if p.get("half") == "bottom"], "Home"
        )

        if batting_team_id == away_team_id:
            next_batter_idx = state.away_team_batter_idx
        else:
            next_batter_idx = state.home_team_batter_idx
        logger.info(
            f"Recovery: Set batter indices - away={state.away_team_batter_idx}, "
            f"home={state.home_team_batter_idx}, "
            f"current batting team uses idx={next_batter_idx}"
        )

        # Active batters of the batting team, ordered by batting_order.
        batters = [
            get_lineup_player(entry["id"])
            for entry in batting_active
            if entry.get("batting_order") is not None
        ]
        batting_lineup_sorted = sorted(
            (batter for batter in batters if batter is not None),
            key=lambda batter: batter.batting_order or 0,
        )
        logger.info(
            f"Recovery: Sorted lineup has {len(batting_lineup_sorted)} batters"
        )

        if next_batter_idx < len(batting_lineup_sorted):
            state.current_batter = batting_lineup_sorted[next_batter_idx]
            logger.info(
                f"Recovery: ✓ Set current_batter to idx={next_batter_idx}, "
                f"card_id={state.current_batter.card_id}, "
                f"batting_order={state.current_batter.batting_order}"
            )
        else:
            # Keep the placeholder batter chosen above.
            logger.warning(
                f"Recovery: ✗ Batter index {next_batter_idx} out of range for batting order "
                f"(lineup size: {len(batting_lineup_sorted)})"
            )

        # Always restart the decision workflow at the defensive step;
        # users can re-submit decisions if they refreshed mid-workflow.
        state.decision_phase = "awaiting_defensive"

        runner_count = sum(
            1
            for runner in (state.on_first, state.on_second, state.on_third)
            if runner
        )
        logger.debug(
            f"Recovered state from play {last_play['play_number']}: "
            f"{runner_count} runners on base, {state.outs} outs"
        )
    else:
        # Nothing completed yet: an active game still needs its decision workflow.
        if state.status == "active":
            state.decision_phase = "awaiting_defensive"
        if plays:
            logger.debug("No completed plays found - initializing fresh state")
        else:
            logger.debug("No plays found - initializing fresh state")

    runners_on_base = len(state.get_all_runners())
    logger.info(
        f"Rebuilt state for game {state.game_id}: {state.play_count} plays, {runners_on_base} runners"
    )
    return state
async def evict_idle_games(self) -> list[UUID]:
    """
    Evict every game whose last access is older than the idle timeout.

    The timeout comes from settings (game_idle_timeout_hours). Each game is
    evicted through _evict_game(), which attempts to persist it first.

    Returns:
        List of evicted game IDs
    """
    from app.config import get_settings

    settings = get_settings()
    now = pendulum.now("UTC")
    timeout_seconds = settings.game_idle_timeout_hours * 3600

    # Snapshot the access map so eviction cannot disturb the scan.
    evicted = [
        candidate
        for candidate, seen_at in list(self._last_access.items())
        if (now - seen_at).total_seconds() > timeout_seconds
    ]

    for game_id in evicted:
        # Read the idle time before eviction removes the access entry.
        idle_hours = (now - self._last_access.get(game_id, now)).total_seconds() / 3600
        await self._evict_game(game_id)
        logger.info(f"Evicted idle game {game_id} (idle {idle_hours:.1f} hours)")

    if evicted:
        logger.info(f"Evicted {len(evicted)} idle games. Active: {len(self._states)}")

    return evicted
async def _evict_game(self, game_id: UUID) -> None:
    """
    Persist a game's current state (best effort) and drop it from memory.

    A failed database write is logged and does not stop the eviction.

    Args:
        game_id: Game identifier to evict
    """
    if game_id in self._states:
        game_state = self._states[game_id]
        try:
            snapshot = {
                "game_id": game_id,
                "status": game_state.status,
                "current_inning": game_state.inning,
                "current_half": game_state.half,
                "home_score": game_state.home_score,
                "away_score": game_state.away_score,
                "outs": game_state.outs,
            }
            await self.db_ops.update_game_state(**snapshot)
            logger.debug(f"Persisted game {game_id} before eviction")
        except Exception as e:
            # Deliberately best-effort: eviction proceeds, and the game remains
            # recoverable from whatever was last written successfully.
            logger.error(f"Failed to persist game {game_id} before eviction: {e}")

    self.remove_game(game_id)
async def enforce_memory_limit(self) -> list[UUID]:
    """
    Keep the number of in-memory games at or below the configured maximum.

    When the limit (settings.game_max_in_memory) is exceeded, the least
    recently accessed games are evicted until the count fits again.

    Returns:
        List of force-evicted game IDs
    """
    from app.config import get_settings

    settings = get_settings()

    to_evict_count = len(self._states) - settings.game_max_in_memory
    if to_evict_count <= 0:
        return []

    # Game ids ordered from least to most recently accessed.
    oldest_first = sorted(self._last_access, key=self._last_access.get)

    evicted = []
    for game_id in oldest_first[:to_evict_count]:
        await self._evict_game(game_id)
        evicted.append(game_id)
        logger.warning(f"Force-evicted game {game_id} (memory limit reached)")

    if evicted:
        logger.warning(
            f"Force-evicted {len(evicted)} games to stay under {settings.game_max_in_memory} limit"
        )

    return evicted
def get_memory_stats(self) -> dict:
    """
    Summarize in-memory usage for health monitoring.

    Returns:
        Dictionary with memory stats:
        - active_games: Current game count
        - max_games: Configured limit
        - oldest_game_hours: Hours since the least recently accessed game was touched
        - total_lineups_cached: Total lineup entries
        - total_locks: Number of per-game locks currently held in the lock table
    """
    from app.config import get_settings

    settings = get_settings()

    lineup_total = 0
    for team_lineups in self._lineups.values():
        lineup_total += len(team_lineups)

    stats = {}
    stats["active_games"] = len(self._states)
    stats["max_games"] = settings.game_max_in_memory
    stats["oldest_game_hours"] = self._get_oldest_game_age_hours()
    stats["total_lineups_cached"] = lineup_total
    stats["total_locks"] = len(self._game_locks)
    return stats
def _get_oldest_game_age_hours(self) -> float:
    """Return hours elapsed since the earliest last-access time (0.0 if no games)."""
    if self._last_access:
        earliest = min(self._last_access.values())
        elapsed = pendulum.now("UTC") - earliest
        return elapsed.total_seconds() / 3600
    return 0.0
def get_stats(self) -> dict:
"""
Get state manager statistics.
Returns:
Dictionary with current state statistics:
- active_games: Number of games in memory
- total_lineups: Total lineups across all games
- games_by_league: Count of games per league
- games_by_status: Count of games by status
"""
stats = {
"active_games": len(self._states),
"total_lineups": sum(len(lineups) for lineups in self._lineups.values()),
"games_by_league": {},
"games_by_status": {},
}
# Count by league
for state in self._states.values():
league = state.league_id
stats["games_by_league"][league] = (
stats["games_by_league"].get(league, 0) + 1
)
# Count by status
for state in self._states.values():
status = state.status
stats["games_by_status"][status] = (
stats["games_by_status"].get(status, 0) + 1
)
return stats
def exists(self, game_id: UUID) -> bool:
"""
Check if game exists in memory.
Args:
game_id: Game identifier
Returns:
True if game is in memory, False otherwise
"""
return game_id in self._states
def get_all_game_ids(self) -> list[UUID]:
"""
Get list of all game IDs currently in memory.
Returns:
List of game UUIDs
"""
return list(self._states.keys())
# ============================================================================
# CONCURRENCY CONTROL
# ============================================================================
def _get_game_lock(self, game_id: UUID) -> asyncio.Lock:
"""
Get or create a lock for the specified game.
Args:
game_id: Game identifier
Returns:
asyncio.Lock for the game
"""
if game_id not in self._game_locks:
self._game_locks[game_id] = asyncio.Lock()
return self._game_locks[game_id]
@asynccontextmanager
async def game_lock(self, game_id: UUID, timeout: float = 30.0):
"""
Acquire exclusive lock for game operations with timeout.
Use this context manager for any operation that modifies game state
to prevent race conditions from concurrent WebSocket handlers.
Args:
game_id: Game identifier
timeout: Maximum seconds to wait for lock (default 30.0)
Raises:
asyncio.TimeoutError: If lock cannot be acquired within timeout
Usage:
async with state_manager.game_lock(game_id):
# Perform state modifications
state = state_manager.get_state(game_id)
state.pending_manual_roll = roll
state_manager.update_state(game_id, state)
"""
lock = self._get_game_lock(game_id)
try:
await asyncio.wait_for(lock.acquire(), timeout=timeout)
try:
yield
finally:
lock.release()
except asyncio.TimeoutError:
logger.error(
f"Failed to acquire lock for game {game_id} within {timeout}s"
)
raise asyncio.TimeoutError(
f"Could not acquire lock for game {game_id} - operation timed out"
)
# ============================================================================
# PHASE 3: DECISION QUEUE MANAGEMENT
# ============================================================================
def set_pending_decision(
    self, game_id: UUID, team_id: int, decision_type: str
) -> None:
    """
    Register a fresh future for a decision that a team must make.

    Any future previously stored under the same key is replaced.

    Args:
        game_id: Game identifier
        team_id: Team that needs to make the decision
        decision_type: Type of decision ('defensive' or 'offensive')
    """
    pending = asyncio.Future()
    self._pending_decisions[(game_id, team_id, decision_type)] = pending

    logger.debug(
        f"Set pending {decision_type} decision for game {game_id}, team {team_id}"
    )
async def await_decision(
    self, game_id: UUID, team_id: int, decision_type: str
) -> DefensiveDecision | OffensiveDecision:
    """
    Suspend until the matching pending decision has been resolved.

    The future awaited here is the one registered by set_pending_decision()
    and is resolved by submit_decision().

    Args:
        game_id: Game identifier
        team_id: Team making the decision
        decision_type: Type of decision expected

    Returns:
        The submitted decision (DefensiveDecision or OffensiveDecision)

    Raises:
        ValueError: If no pending decision exists for these parameters
        asyncio.TimeoutError: If decision not received within timeout (handled by caller)
    """
    pending = self._pending_decisions.get((game_id, team_id, decision_type))
    if pending is None:
        raise ValueError(
            f"No pending {decision_type} decision for game {game_id}, team {team_id}"
        )

    decision = await pending

    logger.debug(
        f"Received {decision_type} decision for game {game_id}, team {team_id}"
    )
    return decision
def submit_decision(
    self,
    game_id: UUID,
    team_id: int,
    decision: DefensiveDecision | OffensiveDecision,
) -> None:
    """
    Deliver a decision to whoever is awaiting it.

    Called by a WebSocket handler or AI opponent. The decision type is
    derived from the object's class; the matching future created by
    set_pending_decision() is resolved and then removed from the queue.

    Args:
        game_id: Game identifier
        team_id: Team making the decision
        decision: The decision being submitted

    Raises:
        ValueError: If no pending decision exists
    """
    from app.models.game_models import DefensiveDecision

    if isinstance(decision, DefensiveDecision):
        decision_type = "defensive"
    else:
        decision_type = "offensive"

    key = (game_id, team_id, decision_type)
    future = self._pending_decisions.get(key)
    if future is None:
        raise ValueError(
            f"No pending {decision_type} decision for game {game_id}, team {team_id}"
        )

    # A future that is already done is left in place and only reported.
    if future.done():
        logger.warning(f"Decision already submitted for {key}")
        return

    future.set_result(decision)
    self._pending_decisions.pop(key)

    logger.info(
        f"Submitted {decision_type} decision for game {game_id}, team {team_id}"
    )
def cancel_pending_decision(
    self, game_id: UUID, team_id: int, decision_type: str
) -> None:
    """
    Drop a pending decision, cancelling its future if still unresolved.

    Intended for timeouts or game aborts. Does nothing when no decision is
    pending for the given key.

    Args:
        game_id: Game identifier
        team_id: Team that was expected to decide
        decision_type: Type of decision
    """
    future = self._pending_decisions.pop((game_id, team_id, decision_type), None)
    if future is None:
        return

    if not future.done():
        future.cancel()

    logger.debug(
        f"Cancelled pending {decision_type} decision for game {game_id}, team {team_id}"
    )
# Singleton instance for global access.
# Created at import time; constructing it also constructs a DatabaseOperations
# instance (see StateManager.__init__).
state_manager = StateManager()