"""Game state management with Redis-primary, Postgres-backup strategy. This service implements a write-behind caching pattern for game state: - Redis is the primary store for fast reads/writes during gameplay - Postgres is the durable backup, updated at turn boundaries and game end - On server restart, active games are recovered from Postgres → Redis Key Patterns: game:{game_id} - JSON serialized GameState in Redis Write Strategy: 1. Every action: save_to_cache() → Redis only (fast path) 2. Turn boundaries: persist_to_db() → Both Redis and Postgres 3. Game end: persist_to_db() + move to history, delete from active Read Strategy: 1. load_state() → Try Redis first 2. Cache miss → Load from Postgres, populate Redis 3. Not found → Return None Recovery Strategy: 1. On startup, recover_active_games() loads all ActiveGame → Redis 2. Stale games (no activity for X hours) are auto-expired Example: manager = GameStateManager() # Create new game game = create_game_state(...) await manager.save_to_cache(game) await manager.persist_to_db(game) # During gameplay (fast path) game = await manager.load_state(game_id) # ... apply action ... await manager.save_to_cache(game) # At turn boundary await manager.persist_to_db(game) # Game end await manager.end_game(game_id, game_history) """ import logging from dataclasses import dataclass, field from datetime import UTC, datetime from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.core.models.game_state import GameState from app.db.models import ActiveGame, EndReason, GameHistory, GameType from app.db.redis import RedisHelper, redis_helper from app.db.session import get_session logger = logging.getLogger(__name__) @dataclass(frozen=True) class RecoveryResult: """Result of game recovery operation. Attributes: recovered: Number of games successfully recovered. failed: List of (game_id, error_message) tuples for failed recoveries. total: Total number of games attempted. """ recovered: int failed: tuple[tuple[str, str], ...] = field(default_factory=tuple) total: int = 0 @dataclass(frozen=True) class ArchiveResult: """Result of archiving a game to history. Attributes: success: Whether the archive operation succeeded. game_id: The archived game's ID. history_id: The GameHistory record ID (same as game_id). duration_seconds: Game duration in seconds. turn_count: Total turns played. game_type: Type of game that was archived. """ success: bool game_id: str history_id: str duration_seconds: int turn_count: int game_type: GameType # Redis key patterns GAME_KEY_PREFIX = "game:" class GameStateManager: """Manages game state persistence across Redis and Postgres. Uses write-behind caching pattern: - Redis is primary for fast gameplay - Postgres is durable backup at turn boundaries Attributes: redis: RedisHelper instance for cache operations. TODO: Add session_factory parameter for full DI support, similar to ConnectionManager's redis_factory pattern. Currently methods accept optional session parameter, but fallback to get_session() global. This would eliminate need for patching in unit tests. See: - ConnectionManager for reference implementation - tests/unit/services/test_game_state_manager.py for affected tests """ def __init__(self, redis: RedisHelper | None = None) -> None: """Initialize the GameStateManager. Args: redis: Optional RedisHelper instance. Uses global helper if not provided. """ self.redis = redis or redis_helper def _game_key(self, game_id: str) -> str: """Generate Redis key for a game. Args: game_id: Unique game identifier. Returns: Redis key in format "game:{game_id}". """ return f"{GAME_KEY_PREFIX}{game_id}" # ========================================================================= # Cache Operations (Redis) # ========================================================================= async def save_to_cache(self, game: GameState) -> None: """Save game state to Redis cache. This is the fast path used during gameplay. Game state is serialized to JSON and stored with a TTL to prevent stale games from lingering. Args: game: GameState to cache. Example: await manager.save_to_cache(game_state) """ key = self._game_key(game.game_id) data = game.model_dump(mode="json") await self.redis.set_json(key, data, expire_seconds=settings.game_cache_ttl_seconds) logger.debug(f"Cached game state: {game.game_id}") async def load_from_cache(self, game_id: str) -> GameState | None: """Load game state from Redis cache. Args: game_id: Unique game identifier. Returns: GameState if found in cache, None otherwise. """ key = self._game_key(game_id) data = await self.redis.get_json(key) if data is None: return None return GameState.model_validate(data) async def delete_from_cache(self, game_id: str) -> bool: """Delete game state from Redis cache. Args: game_id: Unique game identifier. Returns: True if deleted, False if not found. """ key = self._game_key(game_id) deleted = await self.redis.delete(key) if deleted: logger.debug(f"Deleted game from cache: {game_id}") return deleted async def cache_exists(self, game_id: str) -> bool: """Check if game exists in cache. Args: game_id: Unique game identifier. Returns: True if game is in cache. """ key = self._game_key(game_id) return await self.redis.exists(key) # ========================================================================= # Database Operations (Postgres) # ========================================================================= async def persist_to_db( self, game: GameState, game_type: GameType = GameType.CAMPAIGN, player1_id: UUID | None = None, player2_id: UUID | None = None, npc_id: str | None = None, rules_config: dict | None = None, session: AsyncSession | None = None, ) -> ActiveGame: """Persist game state to Postgres (and update Redis). Creates or updates an ActiveGame record in Postgres. This is called at turn boundaries and game end for durability. Args: game: GameState to persist. game_type: Type of game (campaign, freeplay, ranked). player1_id: First player's user ID. player2_id: Second player's user ID (None for campaign). npc_id: NPC opponent ID for campaign games. rules_config: Optional rules override dict. session: Optional existing session (for transaction support). Returns: The created or updated ActiveGame record. Example: await manager.persist_to_db( game_state, game_type=GameType.CAMPAIGN, player1_id=user.id, npc_id="grass_trainer_1" ) """ # Serialize game state to JSON game_state_json = game.model_dump(mode="json") rules_json = rules_config or game.rules.model_dump(mode="json") async def _persist(db: AsyncSession) -> ActiveGame: # Try to find existing game result = await db.execute(select(ActiveGame).where(ActiveGame.id == UUID(game.game_id))) active_game = result.scalar_one_or_none() if active_game is None: # Create new record active_game = ActiveGame( id=UUID(game.game_id), game_type=game_type, player1_id=player1_id, player2_id=player2_id, npc_id=npc_id, rules_config=rules_json, game_state=game_state_json, turn_number=game.turn_number, started_at=datetime.now(UTC), last_action_at=datetime.now(UTC), ) db.add(active_game) logger.info(f"Created ActiveGame: {game.game_id}") else: # Update existing record active_game.game_state = game_state_json active_game.turn_number = game.turn_number active_game.last_action_at = datetime.now(UTC) logger.debug(f"Updated ActiveGame: {game.game_id}") await db.flush() await db.refresh(active_game) return active_game if session: return await _persist(session) else: async with get_session() as db: result = await _persist(db) await db.commit() return result async def load_from_db( self, game_id: str, session: AsyncSession | None = None, ) -> GameState | None: """Load game state from Postgres. Args: game_id: Unique game identifier. session: Optional existing session. Returns: GameState if found, None otherwise. """ async def _load(db: AsyncSession) -> GameState | None: result = await db.execute(select(ActiveGame).where(ActiveGame.id == UUID(game_id))) active_game = result.scalar_one_or_none() if active_game is None: return None return GameState.model_validate(active_game.game_state) if session: return await _load(session) else: async with get_session() as db: return await _load(db) async def delete_from_db( self, game_id: str, session: AsyncSession | None = None, ) -> bool: """Delete game from Postgres ActiveGame table. Args: game_id: Unique game identifier. session: Optional existing session. Returns: True if deleted, False if not found. """ async def _delete(db: AsyncSession) -> bool: result = await db.execute(select(ActiveGame).where(ActiveGame.id == UUID(game_id))) active_game = result.scalar_one_or_none() if active_game is None: return False await db.delete(active_game) await db.flush() logger.info(f"Deleted ActiveGame: {game_id}") return True if session: return await _delete(session) else: async with get_session() as db: result = await _delete(db) await db.commit() return result # ========================================================================= # High-Level Operations # ========================================================================= async def load_state( self, game_id: str, session: AsyncSession | None = None, ) -> GameState | None: """Load game state, trying cache first then database. This is the primary read operation. It checks Redis first for fast access during gameplay. On cache miss, it loads from Postgres and repopulates the cache. Args: game_id: Unique game identifier. session: Optional existing session for DB fallback. Returns: GameState if found, None otherwise. Example: game = await manager.load_state("abc-123") if game is None: raise GameNotFoundError(game_id) """ # Try cache first (fast path) state = await self.load_from_cache(game_id) if state is not None: logger.debug(f"Cache hit for game: {game_id}") return state # Cache miss - try database logger.debug(f"Cache miss for game: {game_id}, checking database") state = await self.load_from_db(game_id, session=session) if state is not None: # Repopulate cache await self.save_to_cache(state) logger.info(f"Loaded game from database and cached: {game_id}") return state async def delete_game( self, game_id: str, session: AsyncSession | None = None, ) -> bool: """Delete game from both cache and database. Called when a game ends and has been moved to game history. Args: game_id: Unique game identifier. session: Optional existing session for DB delete. Returns: True if deleted from at least one location. """ cache_deleted = await self.delete_from_cache(game_id) db_deleted = await self.delete_from_db(game_id, session=session) deleted = cache_deleted or db_deleted if deleted: logger.info(f"Deleted game completely: {game_id}") else: logger.warning(f"Game not found for deletion: {game_id}") return deleted async def archive_to_history( self, game_id: str, state: GameState, winner_id: UUID | None, winner_is_npc: bool, end_reason: EndReason, replay_data: dict | None = None, session: AsyncSession | None = None, ) -> ArchiveResult: """Archive a completed game to history and cleanup active game. This method handles the complete end-of-game persistence: 1. Loads ActiveGame to get metadata (started_at, player IDs, game type) 2. Creates GameHistory record with replay data 3. Deletes ActiveGame record 4. Deletes from Redis cache All database operations happen in a single transaction. Args: game_id: Unique game identifier. state: Final GameState to archive. winner_id: UUID of the winner (None for draws or NPC wins). winner_is_npc: True if NPC won (campaign games). end_reason: Database EndReason enum value. replay_data: Optional replay data dict (action log, rules, etc.). session: Optional existing session for transaction control. Returns: ArchiveResult with archive metadata. Example: result = await manager.archive_to_history( game_id=game.game_id, state=game, winner_id=UUID(winner_id) if winner_id else None, winner_is_npc=False, end_reason=EndReason.PRIZES_TAKEN, replay_data={"action_log": game.action_log}, ) """ now = datetime.now(UTC) async def _archive(db: AsyncSession) -> ArchiveResult: # Load ActiveGame to get metadata result = await db.execute(select(ActiveGame).where(ActiveGame.id == UUID(game_id))) active_game = result.scalar_one_or_none() # Calculate duration and get metadata if active_game is not None: started_at = active_game.started_at duration_seconds = int((now - started_at).total_seconds()) game_type = active_game.game_type db_player1_id = active_game.player1_id db_player2_id = active_game.player2_id npc_id = active_game.npc_id else: # Fallback if ActiveGame not found (shouldn't happen normally) # Player IDs in GameState are strings that may not be valid UUIDs duration_seconds = 0 game_type = GameType.FREEPLAY db_player1_id = None db_player2_id = None npc_id = None logger.warning(f"ActiveGame not found for {game_id}, using defaults") # Create GameHistory record history = GameHistory( id=UUID(game_id), game_type=game_type, player1_id=db_player1_id, player2_id=db_player2_id, npc_id=npc_id, winner_id=winner_id, winner_is_npc=winner_is_npc, end_reason=end_reason, turn_count=state.turn_number, duration_seconds=duration_seconds, replay_data=replay_data, played_at=now, ) db.add(history) # Delete ActiveGame record if active_game is not None: await db.delete(active_game) logger.debug(f"Deleted ActiveGame record: {game_id}") await db.flush() return ArchiveResult( success=True, game_id=game_id, history_id=game_id, duration_seconds=duration_seconds, turn_count=state.turn_number, game_type=game_type, ) if session: archive_result = await _archive(session) else: async with get_session() as db: archive_result = await _archive(db) await db.commit() # Delete from Redis cache (outside transaction) await self.delete_from_cache(game_id) logger.info( f"Archived game {game_id} to history: " f"turns={archive_result.turn_count}, duration={archive_result.duration_seconds}s" ) return archive_result async def recover_active_games( self, session: AsyncSession | None = None, ) -> RecoveryResult: """Recover all active games from Postgres to Redis. Called on server startup to restore game state. Loads all games from the ActiveGame table and populates Redis cache. Args: session: Optional existing session. Returns: RecoveryResult with counts and any failures. Example: @app.on_event("startup") async def startup(): result = await game_state_manager.recover_active_games() logger.info(f"Recovered {result.recovered}/{result.total} games") if result.failed: logger.error(f"Failed to recover {len(result.failed)} games") """ async def _recover(db: AsyncSession) -> RecoveryResult: recovered = 0 failures: list[tuple[str, str]] = [] result = await db.execute(select(ActiveGame)) active_games = result.scalars().all() total = len(active_games) for active_game in active_games: game_id = str(active_game.id) try: state = GameState.model_validate(active_game.game_state) await self.save_to_cache(state) recovered += 1 logger.debug(f"Recovered game: {game_id}") except Exception as e: error_msg = str(e) failures.append((game_id, error_msg)) logger.error(f"Failed to recover game {game_id}: {error_msg}") logger.info(f"Recovered {recovered}/{total} active games from database") if failures: logger.warning( f"Failed to recover {len(failures)} games: " f"{[gid for gid, _ in failures[:5]]}" + (f" and {len(failures) - 5} more" if len(failures) > 5 else "") ) return RecoveryResult( recovered=recovered, failed=tuple(failures), total=total, ) if session: return await _recover(session) else: async with get_session() as db: return await _recover(db) async def get_active_game_count( self, session: AsyncSession | None = None, ) -> int: """Get the number of active games in the database. Useful for monitoring and admin dashboards. Args: session: Optional existing session. Returns: Number of active games. """ async def _count(db: AsyncSession) -> int: result = await db.execute(select(ActiveGame)) return len(result.scalars().all()) if session: return await _count(session) else: async with get_session() as db: return await _count(db) async def get_player_active_games( self, player_id: UUID, session: AsyncSession | None = None, ) -> list[ActiveGame]: """Get all active games for a player. Args: player_id: User ID to lookup. session: Optional existing session. Returns: List of ActiveGame records where the player is player1 or player2. """ async def _get(db: AsyncSession) -> list[ActiveGame]: result = await db.execute( select(ActiveGame).where( (ActiveGame.player1_id == player_id) | (ActiveGame.player2_id == player_id) ) ) return list(result.scalars().all()) if session: return await _get(session) else: async with get_session() as db: return await _get(db) # Global singleton instance game_state_manager = GameStateManager()