"""ASGI entry point: FastAPI REST app wrapped by a Socket.IO server.

Importing this module builds the FastAPI application, registers exception
handlers, CORS and API routers, creates the Socket.IO server plus its
``ConnectionManager``, and exposes ``socket_app`` as the ASGI callable.
Background maintenance tasks are started and stopped by ``lifespan``.
"""

import asyncio
import logging
from contextlib import asynccontextmanager
from urllib.parse import urlsplit, urlunsplit

import socketio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sqlalchemy.exc import SQLAlchemyError

from app.api.routes import auth, games, health, schedule, teams
from app.config import get_settings
from app.core.exceptions import (
    DatabaseError,
    GameEngineError,
    GameNotFoundError,
    InvalidGameStateError,
)
from app.core.state_manager import state_manager
from app.database.session import engine, init_db
from app.middleware.rate_limit import rate_limiter
from app.monitoring.pool_monitor import init_pool_monitor
from app.services import redis_client
from app.utils.logging import setup_logging
from app.websocket.connection_manager import ConnectionManager
from app.websocket.handlers import register_handlers

logger = logging.getLogger(f"{__name__}.main")

# How often (seconds) the session expiration task scans for idle sessions.
_SESSION_CHECK_INTERVAL_SECONDS = 60

# A session is expired after this many multiples of ``ws_connection_timeout``
# without activity. Shared by the expiration task and the health endpoint so
# the reported timeout cannot drift from the enforced one.
_SESSION_TIMEOUT_MULTIPLIER = 5

# Session-count thresholds used by the /api/health/connections status field.
_SESSION_WARNING_THRESHOLD = 50
_SESSION_CRITICAL_THRESHOLD = 100

# Background task handles
_eviction_task: asyncio.Task | None = None
_session_expiration_task: asyncio.Task | None = None
_rate_limit_cleanup_task: asyncio.Task | None = None
_pool_monitoring_task: asyncio.Task | None = None


def _redact_url(url: object) -> str:
    """Return ``url`` as text with any ``user:password@`` userinfo masked.

    Connection URLs may embed credentials; this keeps them out of log output.
    URLs without userinfo are returned unchanged.
    """
    text = str(url)
    try:
        parts = urlsplit(text)
    except ValueError:
        # urlsplit rejects some malformed netlocs (e.g. bad IPv6 brackets);
        # never fall back to logging the raw value.
        return "<unparseable URL>"

    _, at_sign, host_port = parts.netloc.rpartition("@")
    if not at_sign:
        return text
    return urlunsplit(parts._replace(netloc=f"***@{host_port}"))


async def _cancel_task(task: asyncio.Task | None, name: str) -> None:
    """Cancel ``task`` and wait for it to finish without ever raising.

    Args:
        task: The background task to stop, or ``None`` if it never started.
        name: Human-readable task name used in log messages.

    A task that already died with an exception re-raises it when awaited;
    that error is logged here instead of propagating, so one broken task
    cannot prevent the remaining shutdown steps from running.
    """
    if task is None:
        return

    logger.info(f"Stopping {name}")
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.error(f"{name} had failed before shutdown: {e}", exc_info=True)


async def periodic_eviction():
    """
    Background task to periodically evict idle games from memory.

    Sleeps for ``settings.game_eviction_interval_minutes`` between cycles and
    on each cycle:
    1. Calls ``state_manager.evict_idle_games()``
    2. Calls ``state_manager.enforce_memory_limit()``
    3. Logs memory statistics for monitoring

    This prevents unbounded memory growth from abandoned games. The loop
    exits only on cancellation; any other error is logged and the next cycle
    still runs.
    """
    settings = get_settings()
    interval = settings.game_eviction_interval_minutes * 60

    logger.info(
        f"Starting periodic eviction task (interval: {settings.game_eviction_interval_minutes}m, "
        f"idle timeout: {settings.game_idle_timeout_hours}h, max games: {settings.game_max_in_memory})"
    )

    while True:
        try:
            await asyncio.sleep(interval)

            # Run idle game eviction
            evicted = await state_manager.evict_idle_games()

            # Enforce memory limit
            force_evicted = await state_manager.enforce_memory_limit()

            # Log stats
            stats = state_manager.get_memory_stats()
            logger.info(
                f"Eviction cycle complete: {len(evicted)} idle + {len(force_evicted)} forced. "
                f"Active: {stats['active_games']}/{stats['max_games']}, "
                f"Oldest: {stats['oldest_game_hours']:.1f}h"
            )
        except asyncio.CancelledError:
            logger.info("Eviction task cancelled - shutting down")
            break
        except Exception as e:
            # Continue running despite errors - don't let one failure stop eviction
            logger.error(f"Eviction task error: {e}", exc_info=True)


async def periodic_session_expiration(connection_manager):
    """
    Background task to periodically expire inactive WebSocket sessions.

    Every ``_SESSION_CHECK_INTERVAL_SECONDS`` it asks ``connection_manager``
    to expire sessions idle for longer than
    ``settings.ws_connection_timeout * _SESSION_TIMEOUT_MULTIPLIER`` seconds,
    cleaning up zombie connections that were not properly closed (network
    failure, browser crash, etc.).

    This works alongside Socket.io's ping/pong mechanism:
    - Socket.io ping/pong: Detects transport-level disconnections
    - This task: Detects application-level inactivity (no events for extended period)

    Args:
        connection_manager: Object providing ``expire_inactive_sessions(timeout)``
            (awaitable) and ``get_stats()``.
    """
    settings = get_settings()
    interval = _SESSION_CHECK_INTERVAL_SECONDS
    timeout = settings.ws_connection_timeout * _SESSION_TIMEOUT_MULTIPLIER

    logger.info(
        f"Starting session expiration task (interval: {interval}s, timeout: {timeout}s)"
    )

    while True:
        try:
            await asyncio.sleep(interval)

            # Expire inactive sessions
            await connection_manager.expire_inactive_sessions(timeout)

            # Log stats periodically (only when there is something to report)
            stats = connection_manager.get_stats()
            if stats["total_sessions"] > 0:
                logger.debug(
                    f"Session stats: {stats['total_sessions']} sessions, "
                    f"{stats['unique_users']} users, "
                    f"{stats['active_game_rooms']} active games"
                )
        except asyncio.CancelledError:
            logger.info("Session expiration task cancelled - shutting down")
            break
        except Exception as e:
            # Continue running despite errors
            logger.error(f"Session expiration task error: {e}", exc_info=True)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events.

    Startup: configures logging, initializes the database, starts pool
    monitoring, connects Redis (best effort - a failure is only logged) and
    launches the background maintenance tasks.

    Shutdown: runs in a ``finally`` so it also executes when the application
    exits with an error; cancels every background task and disconnects Redis.
    """
    global _eviction_task, _session_expiration_task, _rate_limit_cleanup_task, _pool_monitoring_task

    settings = get_settings()

    # Startup. Logging is configured first so the startup banner itself goes
    # through the configured handlers.
    setup_logging()
    logger.info("Starting Paper Dynasty Game Backend")

    # Initialize database
    await init_db()
    logger.info("Database initialized")

    # Initialize pool monitor for connection pool observability
    pool_monitor = init_pool_monitor(engine)
    _pool_monitoring_task = asyncio.create_task(
        pool_monitor.start_monitoring(interval_seconds=60)
    )
    logger.info("Pool monitoring initialized")

    # Initialize Redis
    try:
        redis_url = settings.redis_url
        await redis_client.connect(redis_url)
        # The URL may carry credentials, so only a redacted form is logged.
        logger.info(f"Redis initialized: {_redact_url(redis_url)}")
    except Exception as e:
        logger.warning(
            f"Redis connection failed: {e}. Position rating caching will be unavailable."
        )

    # Start background eviction task
    logger.info("Starting background eviction task")
    _eviction_task = asyncio.create_task(periodic_eviction())

    # Start session expiration task (connection_manager created in module scope below)
    # Note: This task cleans up zombie WebSocket connections
    logger.info("Starting session expiration task")
    _session_expiration_task = asyncio.create_task(
        periodic_session_expiration(connection_manager)
    )

    # Start rate limiter cleanup task
    # Note: This task cleans up stale rate limit buckets to prevent memory leaks
    logger.info("Starting rate limiter cleanup task")
    _rate_limit_cleanup_task = asyncio.create_task(
        rate_limiter.cleanup_stale_buckets()
    )

    try:
        yield
    finally:
        # Shutdown
        logger.info("Shutting down Paper Dynasty Game Backend")

        running_tasks = (
            (_eviction_task, "background eviction task"),
            (_session_expiration_task, "session expiration task"),
            (_rate_limit_cleanup_task, "rate limiter cleanup task"),
            (_pool_monitoring_task, "pool monitoring task"),
        )
        for task, name in running_tasks:
            await _cancel_task(task, name)

        _eviction_task = None
        _session_expiration_task = None
        _rate_limit_cleanup_task = None
        _pool_monitoring_task = None

        # Disconnect Redis
        if redis_client.is_connected:
            await redis_client.disconnect()
            logger.info("Redis disconnected")


# Initialize FastAPI app
app = FastAPI(
    title="Paper Dynasty Game Backend",
    description="Real-time baseball game engine for Paper Dynasty leagues",
    version="1.0.0",
    lifespan=lifespan,
)


# Global exception handlers for REST API
@app.exception_handler(GameNotFoundError)
async def game_not_found_handler(request: Request, exc: GameNotFoundError):
    """Handle game not found errors with 404 response."""
    logger.warning(f"Game not found: {exc.game_id}")
    return JSONResponse(
        status_code=404,
        content={"detail": str(exc), "error_code": "GAME_NOT_FOUND"},
    )


@app.exception_handler(InvalidGameStateError)
async def invalid_game_state_handler(request: Request, exc: InvalidGameStateError):
    """Handle invalid game state errors with 400 response."""
    logger.warning(f"Invalid game state: {exc}")
    return JSONResponse(
        status_code=400,
        content={
            "detail": str(exc),
            "error_code": "INVALID_GAME_STATE",
            "current_state": exc.current_state,
            "expected_state": exc.expected_state,
        },
    )


@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
    """Handle database errors with 500 response.

    The underlying error is logged but deliberately not returned to the client.
    """
    logger.error(f"Database error during {exc.operation}: {exc.original_error}")
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Database error occurred",
            "error_code": "DATABASE_ERROR",
            "operation": exc.operation,
        },
    )


@app.exception_handler(GameEngineError)
async def game_engine_error_handler(request: Request, exc: GameEngineError):
    """Handle generic game engine errors with 400 response."""
    logger.warning(f"Game engine error: {exc}")
    return JSONResponse(
        status_code=400,
        content={"detail": str(exc), "error_code": "GAME_ENGINE_ERROR"},
    )


@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_error_handler(request: Request, exc: SQLAlchemyError):
    """Handle SQLAlchemy errors with 500 response."""
    logger.error(f"SQLAlchemy error: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Database error occurred - please retry",
            "error_code": "DATABASE_ERROR",
        },
    )


# CORS middleware
settings = get_settings()
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Socket.io with ping/pong for connection health
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins=settings.cors_origins,
    # Ping/pong for detecting dead connections:
    # - Server sends ping every ping_interval seconds
    # - Client must respond with pong within ping_timeout seconds
    # - Total: connection dies after (ping_interval + ping_timeout) of no response
    # NOTE(review): the 30s / 60s defaults below come from the original
    # comments - confirm against the settings definition.
    ping_interval=settings.ws_heartbeat_interval,  # Default: 30s
    ping_timeout=settings.ws_connection_timeout,  # Default: 60s
    logger=True,
    engineio_logger=False,
)

# Create Socket.io ASGI app
socket_app = socketio.ASGIApp(sio, app)

# Initialize connection manager and register handlers
connection_manager = ConnectionManager(sio)
register_handlers(sio, connection_manager)

# Configure game engine with connection manager for real-time event emission.
# NOTE(review): imported here rather than at the top of the file - presumably
# to avoid a circular import; confirm before moving it.
from app.core.game_engine import game_engine

game_engine.set_connection_manager(connection_manager)

# Include API routes
app.include_router(health.router, prefix="/api", tags=["health"])
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(games.router, prefix="/api/games", tags=["games"])
app.include_router(teams.router, prefix="/api/teams", tags=["teams"])
app.include_router(schedule.router, prefix="/api/schedule", tags=["schedule"])


@app.get("/")
async def root():
    """Return the service name and version."""
    return {"message": "Paper Dynasty Game Backend", "version": "1.0.0"}


@app.get("/api/health/connections", tags=["health"])
async def connections_health():
    """
    WebSocket connection statistics for health monitoring.

    Returns a dict containing:
    - status: healthy (<=50 sessions), warning (51-100), critical (>100)
    - every key of ``connection_manager.get_stats()`` (including
      ``total_sessions``, which drives ``status``)
    - rate_limiter: ``rate_limiter.get_stats()``
    - ping_interval_seconds / ping_timeout_seconds: Socket.io ping settings
    - session_expiration_timeout_seconds: inactivity timeout applied by the
      session expiration task
    - timestamp: current UTC time as an ISO 8601 string
    """
    import pendulum

    stats = connection_manager.get_stats()

    # Determine health status based on session count
    # Thresholds can be adjusted based on expected load
    total = stats["total_sessions"]
    if total > _SESSION_CRITICAL_THRESHOLD:
        status = "critical"
    elif total > _SESSION_WARNING_THRESHOLD:
        status = "warning"
    else:
        status = "healthy"

    # Include rate limiter stats
    rate_limit_stats = rate_limiter.get_stats()

    return {
        "status": status,
        **stats,
        "rate_limiter": rate_limit_stats,
        "ping_interval_seconds": settings.ws_heartbeat_interval,
        "ping_timeout_seconds": settings.ws_connection_timeout,
        "session_expiration_timeout_seconds": (
            settings.ws_connection_timeout * _SESSION_TIMEOUT_MULTIPLIER
        ),
        "timestamp": pendulum.now("UTC").to_iso8601_string(),
    }


if __name__ == "__main__":
    import uvicorn

    # Development entry point: auto-reload enabled, listening on all interfaces.
    uvicorn.run(
        "app.main:socket_app", host="0.0.0.0", port=8000, reload=True, log_level="info"
    )