mantimon-tcg/backend/app/services/game_state_manager.py
Cal Corum 2a95316f04 Add FastAPI lifespan hooks and fix Phase 1 gaps
- Add lifespan context manager to app/main.py with startup/shutdown hooks
- Wire startup: init_db(), init_redis(), CardService.load_all()
- Wire shutdown: close_db(), close_redis()
- Add /health/ready endpoint for readiness checks
- Add CORS middleware with configurable origins
- Disable docs in production (only available in dev)

- Export get_session_dependency from app/db/__init__.py for FastAPI DI
- Add game_cache_ttl_seconds to Settings (configurable, was hardcoded)
- Fix datetime.utcnow() deprecation (4 occurrences) -> datetime.now(UTC)
- Update test to match S3 image URL (was placeholder CDN)

All 974 tests passing.
2026-01-27 15:37:19 -06:00

478 lines
15 KiB
Python

"""Game state management with Redis-primary, Postgres-backup strategy.
This service implements a write-behind caching pattern for game state:
- Redis is the primary store for fast reads/writes during gameplay
- Postgres is the durable backup, updated at turn boundaries and game end
- On server restart, active games are recovered from Postgres → Redis
Key Patterns:
game:{game_id} - JSON serialized GameState in Redis
Write Strategy:
1. Every action: save_to_cache() → Redis only (fast path)
2. Turn boundaries: save_to_cache() + persist_to_db() → Redis and Postgres (persist_to_db() itself writes Postgres only)
3. Game end: persist_to_db() + move to history, delete from active
Read Strategy:
1. load_state() → Try Redis first
2. Cache miss → Load from Postgres, populate Redis
3. Not found → Return None
Recovery Strategy:
1. On startup, recover_active_games() loads all ActiveGame → Redis
2. Stale games (no activity for X hours) are auto-expired
Example:
manager = GameStateManager()
# Create new game
game = create_game_state(...)
await manager.save_to_cache(game)
await manager.persist_to_db(game)
# During gameplay (fast path)
game = await manager.load_state(game_id)
# ... apply action ...
await manager.save_to_cache(game)
# At turn boundary
await manager.persist_to_db(game)
# Game end (after the result has been moved to game history)
await manager.delete_game(game_id)
"""
import logging
from datetime import UTC, datetime
from uuid import UUID

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.core.models.game_state import GameState
from app.db.models import ActiveGame, GameType
from app.db.redis import RedisHelper, redis_helper
from app.db.session import get_session
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Redis key patterns
# Prefix prepended to a game ID to build that game's Redis key ("game:{game_id}").
GAME_KEY_PREFIX = "game:"
class GameStateManager:
    """Manage game state persistence across Redis and Postgres.

    Uses a write-behind caching pattern:
    - Redis is primary for fast gameplay
    - Postgres is the durable backup, written at turn boundaries

    Every database method accepts an optional ``session``. When one is given
    the caller owns the transaction (nothing is committed here); when it is
    omitted a session is opened via ``get_session()`` and write operations
    commit before returning.

    Attributes:
        redis: RedisHelper instance for cache operations.
    """

    def __init__(self, redis: RedisHelper | None = None) -> None:
        """Initialize the GameStateManager.

        Args:
            redis: Optional RedisHelper instance. Uses global helper if not provided.
        """
        # Compare against None explicitly so that a supplied helper is never
        # replaced just because it evaluates as falsy.
        self.redis = redis if redis is not None else redis_helper

    def _game_key(self, game_id: str) -> str:
        """Generate Redis key for a game.

        Args:
            game_id: Unique game identifier.

        Returns:
            Redis key in format "game:{game_id}".
        """
        return f"{GAME_KEY_PREFIX}{game_id}"

    @staticmethod
    def _parse_game_id(game_id: str) -> UUID | None:
        """Convert a game ID string to a UUID for database lookups.

        Args:
            game_id: Game identifier as a string.

        Returns:
            The parsed UUID, or None when the string is not a valid UUID. A
            malformed ID cannot match any ActiveGame row, so lookups treat it
            as "not found" rather than raising.
        """
        try:
            return UUID(game_id)
        except ValueError:
            logger.warning("Ignoring malformed game id: %r", game_id)
            return None

    # =========================================================================
    # Cache Operations (Redis)
    # =========================================================================

    async def save_to_cache(self, game: GameState) -> None:
        """Save game state to Redis cache.

        This is the fast path used during gameplay. Game state is serialized
        to JSON and stored with a TTL (``settings.game_cache_ttl_seconds``)
        to prevent stale games from lingering.

        Args:
            game: GameState to cache.

        Example:
            await manager.save_to_cache(game_state)
        """
        key = self._game_key(game.game_id)
        data = game.model_dump(mode="json")
        await self.redis.set_json(key, data, expire_seconds=settings.game_cache_ttl_seconds)
        logger.debug("Cached game state: %s", game.game_id)

    async def load_from_cache(self, game_id: str) -> GameState | None:
        """Load game state from Redis cache.

        Args:
            game_id: Unique game identifier.

        Returns:
            GameState if found in cache, None otherwise.
        """
        data = await self.redis.get_json(self._game_key(game_id))
        if data is None:
            return None
        return GameState.model_validate(data)

    async def delete_from_cache(self, game_id: str) -> bool:
        """Delete game state from Redis cache.

        Args:
            game_id: Unique game identifier.

        Returns:
            True if deleted, False if not found.
        """
        deleted = await self.redis.delete(self._game_key(game_id))
        if deleted:
            logger.debug("Deleted game from cache: %s", game_id)
        return deleted

    async def cache_exists(self, game_id: str) -> bool:
        """Check if game exists in cache.

        Args:
            game_id: Unique game identifier.

        Returns:
            True if game is in cache.
        """
        return await self.redis.exists(self._game_key(game_id))

    # =========================================================================
    # Database Operations (Postgres)
    # =========================================================================

    async def persist_to_db(
        self,
        game: GameState,
        game_type: GameType = GameType.CAMPAIGN,
        player1_id: UUID | None = None,
        player2_id: UUID | None = None,
        npc_id: str | None = None,
        rules_config: dict | None = None,
        session: AsyncSession | None = None,
    ) -> ActiveGame:
        """Persist game state to Postgres.

        Creates or updates an ActiveGame record in Postgres. This is called
        at turn boundaries and game end for durability. It does NOT touch
        Redis; call ``save_to_cache()`` as well when the cache must reflect
        the same state.

        On update only ``game_state``, ``turn_number`` and ``last_action_at``
        are rewritten; ``game_type``, the player/NPC IDs and ``rules_config``
        are used only when the record is first created.

        Args:
            game: GameState to persist.
            game_type: Type of game (campaign, freeplay, ranked).
            player1_id: First player's user ID.
            player2_id: Second player's user ID (None for campaign).
            npc_id: NPC opponent ID for campaign games.
            rules_config: Optional rules override dict. When omitted (or
                empty) the game's own rules are serialized instead.
            session: Optional existing session (for transaction support).
                When given, the record is flushed but not committed.

        Returns:
            The created or updated ActiveGame record.

        Raises:
            ValueError: If ``game.game_id`` is not a valid UUID string.

        Example:
            await manager.persist_to_db(
                game_state,
                game_type=GameType.CAMPAIGN,
                player1_id=user.id,
                npc_id="grass_trainer_1"
            )
        """
        # Parse once; an unparseable ID cannot be stored, so let it raise.
        game_uuid = UUID(game.game_id)
        game_state_json = game.model_dump(mode="json")

        async def _persist(db: AsyncSession) -> ActiveGame:
            result = await db.execute(select(ActiveGame).where(ActiveGame.id == game_uuid))
            active_game = result.scalar_one_or_none()
            # Single timestamp so started_at and last_action_at agree on creation.
            now = datetime.now(UTC)

            if active_game is None:
                active_game = ActiveGame(
                    id=game_uuid,
                    game_type=game_type,
                    player1_id=player1_id,
                    player2_id=player2_id,
                    npc_id=npc_id,
                    # Rules are only stored on creation, so serialize them here
                    # rather than on every turn-boundary update.
                    rules_config=rules_config or game.rules.model_dump(mode="json"),
                    game_state=game_state_json,
                    turn_number=game.turn_number,
                    started_at=now,
                    last_action_at=now,
                )
                db.add(active_game)
                logger.info("Created ActiveGame: %s", game.game_id)
            else:
                active_game.game_state = game_state_json
                active_game.turn_number = game.turn_number
                active_game.last_action_at = now
                logger.debug("Updated ActiveGame: %s", game.game_id)

            await db.flush()
            await db.refresh(active_game)
            return active_game

        if session is not None:
            return await _persist(session)

        async with get_session() as db:
            record = await _persist(db)
            await db.commit()
            return record

    async def load_from_db(
        self,
        game_id: str,
        session: AsyncSession | None = None,
    ) -> GameState | None:
        """Load game state from Postgres.

        Args:
            game_id: Unique game identifier.
            session: Optional existing session.

        Returns:
            GameState if found, None otherwise (including when ``game_id``
            is not a valid UUID string).
        """
        game_uuid = self._parse_game_id(game_id)
        if game_uuid is None:
            return None

        async def _load(db: AsyncSession) -> GameState | None:
            result = await db.execute(select(ActiveGame).where(ActiveGame.id == game_uuid))
            active_game = result.scalar_one_or_none()
            if active_game is None:
                return None
            return GameState.model_validate(active_game.game_state)

        if session is not None:
            return await _load(session)

        async with get_session() as db:
            return await _load(db)

    async def delete_from_db(
        self,
        game_id: str,
        session: AsyncSession | None = None,
    ) -> bool:
        """Delete game from Postgres ActiveGame table.

        Args:
            game_id: Unique game identifier.
            session: Optional existing session. When given, the delete is
                flushed but not committed.

        Returns:
            True if deleted, False if not found (including when ``game_id``
            is not a valid UUID string).
        """
        game_uuid = self._parse_game_id(game_id)
        if game_uuid is None:
            return False

        async def _delete(db: AsyncSession) -> bool:
            result = await db.execute(select(ActiveGame).where(ActiveGame.id == game_uuid))
            active_game = result.scalar_one_or_none()
            if active_game is None:
                return False
            await db.delete(active_game)
            await db.flush()
            logger.info("Deleted ActiveGame: %s", game_id)
            return True

        if session is not None:
            return await _delete(session)

        async with get_session() as db:
            removed = await _delete(db)
            await db.commit()
            return removed

    # =========================================================================
    # High-Level Operations
    # =========================================================================

    async def load_state(
        self,
        game_id: str,
        session: AsyncSession | None = None,
    ) -> GameState | None:
        """Load game state, trying cache first then database.

        This is the primary read operation. It checks Redis first for fast
        access during gameplay. On cache miss, it loads from Postgres and
        repopulates the cache.

        Args:
            game_id: Unique game identifier.
            session: Optional existing session for DB fallback.

        Returns:
            GameState if found, None otherwise.

        Example:
            game = await manager.load_state(game_id)
            if game is None:
                raise GameNotFoundError(game_id)
        """
        # Fast path: Redis.
        state = await self.load_from_cache(game_id)
        if state is not None:
            logger.debug("Cache hit for game: %s", game_id)
            return state

        # Slow path: Postgres, then warm the cache for subsequent reads.
        logger.debug("Cache miss for game: %s, checking database", game_id)
        state = await self.load_from_db(game_id, session=session)
        if state is not None:
            await self.save_to_cache(state)
            logger.info("Loaded game from database and cached: %s", game_id)
        return state

    async def delete_game(
        self,
        game_id: str,
        session: AsyncSession | None = None,
    ) -> bool:
        """Delete game from both cache and database.

        Called when a game ends and has been moved to game history. The
        cache entry is removed first, then the database row.

        Args:
            game_id: Unique game identifier.
            session: Optional existing session for DB delete.

        Returns:
            True if deleted from at least one location.
        """
        cache_deleted = await self.delete_from_cache(game_id)
        db_deleted = await self.delete_from_db(game_id, session=session)
        deleted = cache_deleted or db_deleted
        if deleted:
            logger.info("Deleted game completely: %s", game_id)
        else:
            logger.warning("Game not found for deletion: %s", game_id)
        return deleted

    async def recover_active_games(
        self,
        session: AsyncSession | None = None,
    ) -> int:
        """Recover all active games from Postgres to Redis.

        Called on server startup to restore game state. Loads all games
        from the ActiveGame table and populates Redis cache. A game that
        fails to validate or cache is logged and skipped so that one bad
        record does not block recovery of the rest.

        Args:
            session: Optional existing session.

        Returns:
            Number of games recovered.

        Example:
            # From the application's startup hook:
            count = await game_state_manager.recover_active_games()
            logger.info(f"Recovered {count} active games")
        """

        async def _recover(db: AsyncSession) -> int:
            count = 0
            result = await db.execute(select(ActiveGame))
            for active_game in result.scalars().all():
                try:
                    state = GameState.model_validate(active_game.game_state)
                    await self.save_to_cache(state)
                except Exception as e:
                    # Deliberately broad: recovery is best-effort per game.
                    # logger.exception keeps the traceback for diagnosis.
                    logger.exception("Failed to recover game %s: %s", active_game.id, e)
                else:
                    count += 1
                    logger.debug("Recovered game: %s", active_game.id)
            logger.info("Recovered %s active games from database", count)
            return count

        if session is not None:
            return await _recover(session)

        async with get_session() as db:
            return await _recover(db)

    async def get_active_game_count(
        self,
        session: AsyncSession | None = None,
    ) -> int:
        """Get the number of active games in the database.

        Useful for monitoring and admin dashboards. The count is computed by
        the database (``SELECT count(*)``) so no game rows or their JSON
        state are transferred.

        Args:
            session: Optional existing session.

        Returns:
            Number of active games.
        """

        async def _count(db: AsyncSession) -> int:
            result = await db.execute(select(func.count()).select_from(ActiveGame))
            return result.scalar_one()

        if session is not None:
            return await _count(session)

        async with get_session() as db:
            return await _count(db)

    async def get_player_active_games(
        self,
        player_id: UUID,
        session: AsyncSession | None = None,
    ) -> list[ActiveGame]:
        """Get all active games for a player.

        Args:
            player_id: User ID to lookup.
            session: Optional existing session.

        Returns:
            List of ActiveGame records where the player is player1 or player2.
        """

        async def _get(db: AsyncSession) -> list[ActiveGame]:
            result = await db.execute(
                select(ActiveGame).where(
                    (ActiveGame.player1_id == player_id) | (ActiveGame.player2_id == player_id)
                )
            )
            return list(result.scalars().all())

        if session is not None:
            return await _get(session)

        async with get_session() as db:
            return await _get(db)
# Global singleton instance
# Built with no arguments, so it uses the module-level redis_helper for caching.
game_state_manager = GameStateManager()