"""Socket.IO server configuration and ASGI app creation.

This module sets up the python-socketio AsyncServer and creates the combined
ASGI application that mounts Socket.IO alongside FastAPI.

Architecture:
    - AsyncServer handles WebSocket connections with async_mode='asgi'
    - Socket.IO app is mounted at /socket.io path
    - CORS settings match FastAPI configuration
    - JWT authentication on connect via auth.py
    - Namespaces are registered for different communication domains
    - Event handlers delegate to GameNamespaceHandler for game logic

Namespaces:
    /game  - Active game communication (actions, state updates)
    /lobby - Pre-game lobby (matchmaking, invites) - Phase 6

Authentication:
    Clients must provide a JWT access token in the auth parameter:

        socket.connect({ auth: { token: "JWT_ACCESS_TOKEN" } })

Example:
    from app.socketio import create_socketio_app
    from fastapi import FastAPI

    fastapi_app = FastAPI()
    combined_app = create_socketio_app(fastapi_app)
    # Run with: uvicorn app.main:app
"""

import logging
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING

import socketio

from app.config import settings
from app.socketio.auth import auth_handler, require_auth
from app.socketio.game_namespace import game_namespace_handler

if TYPE_CHECKING:
    from fastapi import FastAPI

logger = logging.getLogger(__name__)

# Create the AsyncServer instance
# - async_mode='asgi' for ASGI compatibility with uvicorn
# - cors_allowed_origins matches FastAPI CORS settings
# - logger enables Socket.IO internal logging in debug mode
#
# NOTE(review): per the python-engineio documentation, passing ``[]`` as
# ``cors_allowed_origins`` *disables* CORS handling instead of restricting
# origins. Confirm that this is the intended fallback when
# ``settings.cors_origins`` is empty.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins=settings.cors_origins if settings.cors_origins else [],
    logger=settings.debug,
    engineio_logger=settings.debug,
)


# =============================================================================
# Internal helpers
# =============================================================================


def _error_response(code: str, message: str) -> dict[str, object]:
    """Build the standard ``{"success": False, "error": {...}}`` ack payload."""
    return {"success": False, "error": {"code": code, "message": message}}


async def _dispatch(
    event: str,
    handler: Callable[..., Awaitable[dict[str, object]]],
    sid: str,
    data: object,
) -> dict[str, object]:
    """Authenticate ``sid``, validate the payload, and delegate to ``handler``.

    Args:
        event: Event name, used only for the debug log line.
        handler: Coroutine function called as ``handler(sio, sid, user_id, data)``.
        sid: Socket session ID of the caller.
        data: Raw payload received from the client (untrusted).

    Returns:
        The handler's response, or a structured error when the session is not
        authenticated or the payload is not a JSON object.
    """
    logger.debug("%s from %s: %s", event, sid, data)

    # Require authentication
    user_id = await require_auth(sio, sid)
    if not user_id:
        return _error_response("unauthenticated", "Not authenticated")

    # Clients control the payload; reject anything that is not a mapping
    # instead of letting dict(data) raise inside the event loop.
    if not isinstance(data, dict):
        return _error_response("invalid_payload", "Message payload must be an object")

    # Pass a shallow copy so the handler cannot mutate the caller's object.
    return await handler(sio, sid, user_id, dict(data))


# =============================================================================
# /game Namespace - Active Game Communication
# =============================================================================
# Event handlers for real-time game communication. These handlers authenticate
# requests and delegate to GameNamespaceHandler for game logic.


@sio.event(namespace="/game")
async def connect(
    sid: str, environ: dict[str, object], auth: dict[str, object] | None = None
) -> bool | None:
    """Handle client connection to /game namespace.

    Authenticates the connection using JWT from auth data. Rejects connections
    without valid authentication. After successful auth, checks for active
    games and auto-rejoins if found.

    Args:
        sid: Socket session ID assigned by Socket.IO.
        environ: WSGI/ASGI environ dict with request info.
        auth: Authentication data sent by client (JWT token).
            Expected format: { token: "JWT_ACCESS_TOKEN" }

    Returns:
        True to accept connection, False to reject.
        None is treated as True (accept).
    """
    # Authenticate the connection
    auth_result = await auth_handler.authenticate_connection(sid, auth)

    if not auth_result.success:
        logger.warning(
            "Connection rejected for %s: %s - %s",
            sid,
            auth_result.error_code,
            auth_result.error_message,
        )
        # Emit error before rejecting (client may receive this)
        await sio.emit(
            "auth_error",
            {
                "code": auth_result.error_code,
                "message": auth_result.error_message,
            },
            to=sid,
            namespace="/game",
        )
        return False

    # Set up authenticated session and register connection
    await auth_handler.setup_authenticated_session(
        sio, sid, auth_result.user_id, namespace="/game"
    )
    logger.info(
        "Client authenticated to /game: sid=%s, user_id=%s", sid, auth_result.user_id
    )

    # Check for active games and auto-rejoin
    user_id_str = str(auth_result.user_id)
    reconnect_info = await game_namespace_handler.handle_reconnect(sio, sid, user_id_str)

    if reconnect_info:
        # Emit reconnection event to inform client they're back in a game
        await sio.emit(
            "game:reconnected",
            reconnect_info,
            to=sid,
            namespace="/game",
        )
        logger.info(
            "Emitted game:reconnected for %s: game_id=%s",
            sid,
            reconnect_info.get("game_id"),
        )

    return True


@sio.event(namespace="/game")
async def disconnect(sid: str) -> None:
    """Handle client disconnection from /game namespace.

    Notifies other game participants and then cleans up connection state.
    Cleanup runs even if the notification step raises, so a failure there
    cannot leave the session registered; the exception still propagates.

    Args:
        sid: Socket session ID of disconnecting client.
    """
    try:
        # Get user_id before cleanup for opponent notification
        user_id = await require_auth(sio, sid)

        # Notify opponent of disconnection if in a game
        if user_id:
            await game_namespace_handler.handle_disconnect(sio, sid, user_id)
    finally:
        # Clean up session and connection tracking
        cleanup_user_id = await auth_handler.cleanup_authenticated_session(
            sid, namespace="/game"
        )

        if cleanup_user_id:
            logger.info(
                "Client disconnected from /game: sid=%s, user_id=%s",
                sid,
                cleanup_user_id,
            )
        else:
            logger.debug("Unauthenticated client disconnected: %s", sid)


@sio.on("game:join", namespace="/game")
async def on_game_join(sid: str, data: dict[str, object] | None = None) -> dict[str, object]:
    """Handle request to join/rejoin a game session.

    Authenticates the request and delegates to GameNamespaceHandler.
    On success, the player receives their filtered game state.

    Args:
        sid: Socket session ID.
        data: Message containing game_id and optional last_event_id for resume.

    Returns:
        Response with game state or error.
    """
    return await _dispatch("game:join", game_namespace_handler.handle_join, sid, data)


@sio.on("game:action", namespace="/game")
async def on_game_action(sid: str, data: dict[str, object] | None = None) -> dict[str, object]:
    """Handle game action from player.

    Authenticates the request, validates the action, and delegates to
    GameNamespaceHandler for execution. On success, broadcasts the updated
    game state to all participants.

    Args:
        sid: Socket session ID.
        data: Action message with game_id and action object.

    Returns:
        Action result or error.
    """
    return await _dispatch("game:action", game_namespace_handler.handle_action, sid, data)


@sio.on("game:resign", namespace="/game")
async def on_game_resign(sid: str, data: dict[str, object] | None = None) -> dict[str, object]:
    """Handle player resignation.

    Authenticates the request and delegates to GameNamespaceHandler.
    On success, ends the game and broadcasts the result to all participants.

    Args:
        sid: Socket session ID.
        data: Resignation message containing game_id.

    Returns:
        Confirmation with game result or error.
    """
    return await _dispatch("game:resign", game_namespace_handler.handle_resign, sid, data)


@sio.on("game:heartbeat", namespace="/game")
async def on_game_heartbeat(sid: str, data: dict[str, object] | None = None) -> dict[str, object]:
    """Handle heartbeat to keep connection alive.

    Updates last_seen timestamp in ConnectionManager to prevent the connection
    from being marked as stale.

    Args:
        sid: Socket session ID.
        data: Optional heartbeat data (message_id for tracking).

    Returns:
        Heartbeat acknowledgment with server timestamp.
    """
    # Imported lazily, as in the original module layout.
    from app.services.connection_manager import connection_manager

    # Require authentication
    user_id = await require_auth(sio, sid)
    if not user_id:
        # Flat error shape kept as-is for existing heartbeat clients.
        return {"error": "Not authenticated", "code": "unauthenticated"}

    # Update last_seen in ConnectionManager
    await connection_manager.update_heartbeat(sid)

    # Only read message_id from a real mapping; any other payload type sent by
    # a client is ignored rather than raising AttributeError.
    message_id = data.get("message_id") if isinstance(data, dict) else None

    # Return acknowledgment with timestamp
    return {
        "type": "heartbeat_ack",
        "timestamp": datetime.now(UTC).isoformat(),
        "message_id": message_id,
    }


@sio.on("game:spectate", namespace="/game")
async def on_game_spectate(sid: str, data: dict[str, object] | None = None) -> dict[str, object]:
    """Handle request to spectate a game.

    Authenticates the request and delegates to GameNamespaceHandler.
    On success, the user receives a spectator-filtered game state
    (no hands visible).

    Args:
        sid: Socket session ID.
        data: Message containing game_id to spectate.

    Returns:
        Response with spectator game state or error.
    """
    return await _dispatch("game:spectate", game_namespace_handler.handle_spectate, sid, data)


@sio.on("game:leave_spectate", namespace="/game")
async def on_game_leave_spectate(
    sid: str, data: dict[str, object] | None = None
) -> dict[str, object]:
    """Handle request to stop spectating a game.

    Authenticates the request and removes the user from spectator list.

    Args:
        sid: Socket session ID.
        data: Message containing game_id to stop spectating.

    Returns:
        Confirmation of spectate leave.
    """
    return await _dispatch(
        "game:leave_spectate", game_namespace_handler.handle_leave_spectate, sid, data
    )


# =============================================================================
# ASGI App Creation
# =============================================================================


def create_socketio_app(fastapi_app: "FastAPI") -> socketio.ASGIApp:
    """Create combined ASGI app with Socket.IO mounted alongside FastAPI.

    This wraps the FastAPI app with Socket.IO, handling:
    - WebSocket connections at /socket.io/
    - HTTP requests passed through to FastAPI

    Args:
        fastapi_app: The FastAPI application instance.

    Returns:
        Combined ASGI application to use with uvicorn.

    Example:
        app = FastAPI()
        combined = create_socketio_app(app)
        # uvicorn will use 'combined' as the ASGI app
    """
    return socketio.ASGIApp(
        sio,
        other_asgi_app=fastapi_app,
        socketio_path="socket.io",
    )