Implement /game namespace event handlers (WS-005, WS-006)

Add GameNamespaceHandler with full event handling for real-time gameplay:
- handle_join: Join/rejoin games with visibility-filtered state
- handle_action: Execute actions and broadcast state to participants
- handle_resign: Process resignation and end game
- handle_disconnect: Notify opponent of disconnection
- Broadcast helpers for state, game over, and opponent status

Includes 28 unit tests covering all handler methods.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-01-29 20:40:06 -06:00
parent 531d3e1e79
commit 154d466ff1
5 changed files with 1607 additions and 25 deletions

View File

@ -0,0 +1,606 @@
"""Game namespace event handlers for WebSocket communication.
This module implements all /game namespace event handling for real-time
game communication. It orchestrates between the WebSocket layer and
GameService to handle:
- Game joining and state synchronization
- Action execution and result broadcasting
- Resignation handling
- Disconnect notifications to opponents
Architecture:
Socket.IO Events -> GameNamespaceHandler -> GameService
-> ConnectionManager (for routing)
-> Socket.IO Emits (responses)
The handler is designed with dependency injection for testability.
All game logic is delegated to GameService; this module only handles
the WebSocket communication layer.
Example:
from app.socketio.game_namespace import game_namespace_handler
@sio.on("game:join", namespace="/game")
async def on_game_join(sid: str, data: dict) -> dict:
return await game_namespace_handler.handle_join(sio, sid, data)
"""
import logging
from typing import TYPE_CHECKING, Any
from pydantic import ValidationError
from app.core.models.actions import parse_action
from app.core.visibility import get_visible_state
from app.schemas.ws_messages import (
ConnectionStatus,
GameOverMessage,
GameStateMessage,
OpponentStatusMessage,
WSErrorCode,
)
from app.services.connection_manager import ConnectionManager, connection_manager
from app.services.game_service import (
ForcedActionRequiredError,
GameAlreadyEndedError,
GameNotFoundError,
GameService,
InvalidActionError,
NotPlayerTurnError,
PlayerNotInGameError,
game_service,
)
if TYPE_CHECKING:
import socketio
logger = logging.getLogger(__name__)
class GameNamespaceHandler:
"""Handler for /game namespace WebSocket events.
This class encapsulates all game-related event handling logic with
dependencies injected via constructor for testability.
Attributes:
_game_service: GameService for game operations.
_connection_manager: ConnectionManager for connection tracking.
"""
def __init__(
self,
game_svc: GameService | None = None,
conn_manager: ConnectionManager | None = None,
) -> None:
"""Initialize the GameNamespaceHandler.
Args:
game_svc: GameService instance. Uses global if not provided.
conn_manager: ConnectionManager instance. Uses global if not provided.
"""
self._game_service = game_svc or game_service
self._connection_manager = conn_manager or connection_manager
# =========================================================================
# Event Handlers
# =========================================================================
async def handle_join(
self,
sio: "socketio.AsyncServer",
sid: str,
user_id: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Handle game:join event.
Joins or rejoins a player to a game session. On success, the player
is added to the game room and receives the current game state.
Args:
sio: Socket.IO server instance.
sid: Socket session ID.
user_id: Authenticated user's ID.
data: Message data with game_id and optional last_event_id.
Returns:
Response dict with success status and game state or error.
"""
game_id = data.get("game_id")
last_event_id = data.get("last_event_id")
message_id = data.get("message_id", "")
if not game_id:
logger.warning(f"game:join missing game_id from {sid}")
return self._error_response(
WSErrorCode.INVALID_MESSAGE,
"game_id is required",
message_id,
)
try:
# Join the game via GameService
result = await self._game_service.join_game(
game_id=game_id,
player_id=user_id,
last_event_id=last_event_id,
)
if not result.success:
error_code = WSErrorCode.GAME_NOT_FOUND
if "not a participant" in result.message:
error_code = WSErrorCode.NOT_IN_GAME
return self._error_response(error_code, result.message, message_id)
# Register this connection with the game
await self._connection_manager.join_game(sid, game_id)
# Join the Socket.IO room for this game
await sio.enter_room(sid, f"game:{game_id}", namespace="/game")
# Notify opponent that player connected
await self._notify_opponent_status(
sio, sid, game_id, user_id, ConnectionStatus.CONNECTED
)
# Build response with game state
response: dict[str, Any] = {
"success": True,
"game_id": game_id,
"is_your_turn": result.is_your_turn,
"game_over": result.game_over,
}
if result.visible_state:
response["state"] = result.visible_state.model_dump(mode="json")
if result.pending_forced_action:
response["pending_forced_action"] = {
"player_id": result.pending_forced_action.player_id,
"action_type": result.pending_forced_action.action_type,
"reason": result.pending_forced_action.reason,
"params": result.pending_forced_action.params,
}
logger.info(f"Player {user_id} joined game {game_id}")
return response
except Exception as e:
logger.exception(f"Error joining game {game_id}: {e}")
return self._error_response(
WSErrorCode.INTERNAL_ERROR,
"Failed to join game",
message_id,
)
async def handle_action(
self,
sio: "socketio.AsyncServer",
sid: str,
user_id: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Handle game:action event.
Executes a game action and broadcasts results to all participants.
Args:
sio: Socket.IO server instance.
sid: Socket session ID.
user_id: Authenticated user's ID.
data: Message data with game_id and action.
Returns:
Response dict with action result or error.
"""
game_id = data.get("game_id")
action_data = data.get("action")
message_id = data.get("message_id", "")
if not game_id:
return self._error_response(
WSErrorCode.INVALID_MESSAGE,
"game_id is required",
message_id,
)
if not action_data:
return self._error_response(
WSErrorCode.INVALID_MESSAGE,
"action is required",
message_id,
)
# Parse the action
try:
action = parse_action(action_data)
except (ValidationError, ValueError) as e:
logger.warning(f"Invalid action from {sid}: {e}")
return self._error_response(
WSErrorCode.INVALID_ACTION,
f"Invalid action format: {e}",
message_id,
)
# Execute the action
try:
result = await self._game_service.execute_action(
game_id=game_id,
player_id=user_id,
action=action,
)
# Broadcast state to all participants
await self._broadcast_game_state(sio, game_id)
# Build response
response: dict[str, Any] = {
"success": True,
"game_id": game_id,
"action_type": result.action_type,
"message": result.message,
"turn_changed": result.turn_changed,
"current_player_id": result.current_player_id,
}
if result.state_changes:
response["changes"] = result.state_changes
if result.pending_forced_action:
response["pending_forced_action"] = {
"player_id": result.pending_forced_action.player_id,
"action_type": result.pending_forced_action.action_type,
"reason": result.pending_forced_action.reason,
"params": result.pending_forced_action.params,
}
# Handle game over
if result.game_over:
response["game_over"] = True
response["winner_id"] = result.winner_id
response["end_reason"] = result.end_reason.value if result.end_reason else None
# End the game and archive to history
if result.end_reason:
await self._game_service.end_game(
game_id=game_id,
winner_id=result.winner_id,
end_reason=result.end_reason,
)
# Broadcast game over to room
await self._broadcast_game_over(sio, game_id, result.winner_id, result.end_reason)
logger.debug(f"Action executed: game={game_id}, player={user_id}, type={action.type}")
return response
except GameNotFoundError:
return self._error_response(
WSErrorCode.GAME_NOT_FOUND,
f"Game {game_id} not found",
message_id,
)
except PlayerNotInGameError:
return self._error_response(
WSErrorCode.NOT_IN_GAME,
"You are not a participant in this game",
message_id,
)
except GameAlreadyEndedError:
return self._error_response(
WSErrorCode.GAME_ENDED,
"Game has already ended",
message_id,
)
except NotPlayerTurnError as e:
return self._error_response(
WSErrorCode.NOT_YOUR_TURN,
f"Not your turn. Current player: {e.current_player_id}",
message_id,
)
except ForcedActionRequiredError as e:
return self._error_response(
WSErrorCode.ACTION_NOT_ALLOWED,
f"Forced action required: {e.required_action_type}",
message_id,
)
except InvalidActionError as e:
return self._error_response(
WSErrorCode.INVALID_ACTION,
e.reason,
message_id,
)
except Exception as e:
logger.exception(f"Error executing action in game {game_id}: {e}")
return self._error_response(
WSErrorCode.INTERNAL_ERROR,
"Failed to execute action",
message_id,
)
async def handle_resign(
self,
sio: "socketio.AsyncServer",
sid: str,
user_id: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Handle game:resign event.
Processes a player's resignation from the game.
Args:
sio: Socket.IO server instance.
sid: Socket session ID.
user_id: Authenticated user's ID.
data: Message data with game_id.
Returns:
Response dict with resignation result or error.
"""
game_id = data.get("game_id")
message_id = data.get("message_id", "")
if not game_id:
return self._error_response(
WSErrorCode.INVALID_MESSAGE,
"game_id is required",
message_id,
)
try:
result = await self._game_service.resign_game(
game_id=game_id,
player_id=user_id,
)
# End the game and archive
if result.game_over and result.end_reason:
await self._game_service.end_game(
game_id=game_id,
winner_id=result.winner_id,
end_reason=result.end_reason,
)
# Broadcast game over
await self._broadcast_game_over(sio, game_id, result.winner_id, result.end_reason)
logger.info(f"Player {user_id} resigned from game {game_id}")
return {
"success": True,
"game_id": game_id,
"game_over": True,
"winner_id": result.winner_id,
"end_reason": result.end_reason.value if result.end_reason else None,
}
except GameNotFoundError:
return self._error_response(
WSErrorCode.GAME_NOT_FOUND,
f"Game {game_id} not found",
message_id,
)
except PlayerNotInGameError:
return self._error_response(
WSErrorCode.NOT_IN_GAME,
"You are not a participant in this game",
message_id,
)
except GameAlreadyEndedError:
return self._error_response(
WSErrorCode.GAME_ENDED,
"Game has already ended",
message_id,
)
except Exception as e:
logger.exception(f"Error resigning from game {game_id}: {e}")
return self._error_response(
WSErrorCode.INTERNAL_ERROR,
"Failed to resign from game",
message_id,
)
async def handle_disconnect(
self,
sio: "socketio.AsyncServer",
sid: str,
user_id: str,
) -> None:
"""Handle disconnect event.
Notifies opponents in any active game that the player disconnected.
Args:
sio: Socket.IO server instance.
sid: Socket session ID.
user_id: Authenticated user's ID.
"""
# Get connection info to find active game
conn_info = await self._connection_manager.get_connection(sid)
if conn_info is None or conn_info.game_id is None:
return
game_id = conn_info.game_id
# Notify opponent of disconnect
await self._notify_opponent_status(
sio, sid, game_id, user_id, ConnectionStatus.DISCONNECTED
)
logger.info(f"Player {user_id} disconnected from game {game_id}")
# =========================================================================
# Broadcast Helpers
# =========================================================================
async def _broadcast_game_state(
self,
sio: "socketio.AsyncServer",
game_id: str,
) -> None:
"""Broadcast filtered game state to all participants.
Each player receives their own visibility-filtered view of the game.
Args:
sio: Socket.IO server instance.
game_id: The game to broadcast.
"""
try:
# Get full game state
state = await self._game_service.get_game_state(game_id)
# Get all connected players for this game
user_sids = await self._connection_manager.get_game_user_sids(game_id)
# Send filtered state to each player
for player_id, player_sid in user_sids.items():
try:
visible_state = get_visible_state(state, player_id)
message = GameStateMessage(
game_id=game_id,
state=visible_state,
)
await sio.emit(
"game:state",
message.model_dump(mode="json"),
to=player_sid,
namespace="/game",
)
except ValueError:
# Player not in game - skip
logger.warning(f"Player {player_id} not in game {game_id}")
continue
except GameNotFoundError:
logger.warning(f"Cannot broadcast state: game {game_id} not found")
except Exception as e:
logger.exception(f"Error broadcasting state for game {game_id}: {e}")
async def _broadcast_game_over(
self,
sio: "socketio.AsyncServer",
game_id: str,
winner_id: str | None,
end_reason: Any,
) -> None:
"""Broadcast game over notification to all participants.
Args:
sio: Socket.IO server instance.
game_id: The game that ended.
winner_id: The winner's player ID, or None for draw.
end_reason: The GameEndReason for why the game ended.
"""
try:
# Get final state for each player
state = await self._game_service.get_game_state(game_id)
user_sids = await self._connection_manager.get_game_user_sids(game_id)
for player_id, player_sid in user_sids.items():
try:
visible_state = get_visible_state(state, player_id)
message = GameOverMessage(
game_id=game_id,
winner_id=winner_id,
end_reason=end_reason,
final_state=visible_state,
)
await sio.emit(
"game:game_over",
message.model_dump(mode="json"),
to=player_sid,
namespace="/game",
)
except ValueError:
continue
except GameNotFoundError:
# Game already archived - just emit to room without state
message = GameOverMessage(
game_id=game_id,
winner_id=winner_id,
end_reason=end_reason,
final_state=None, # type: ignore[arg-type]
)
await sio.emit(
"game:game_over",
{"game_id": game_id, "winner_id": winner_id, "end_reason": end_reason.value},
room=f"game:{game_id}",
namespace="/game",
)
except Exception as e:
logger.exception(f"Error broadcasting game over for {game_id}: {e}")
async def _notify_opponent_status(
self,
sio: "socketio.AsyncServer",
sender_sid: str,
game_id: str,
user_id: str,
status: ConnectionStatus,
) -> None:
"""Notify opponent of connection status change.
Args:
sio: Socket.IO server instance.
sender_sid: The sid of the player whose status changed.
game_id: The game ID.
user_id: The player whose status changed.
status: The new connection status.
"""
try:
# Find opponent's sid
opponent_sid = await self._connection_manager.get_opponent_sid(game_id, user_id)
if opponent_sid:
message = OpponentStatusMessage(
game_id=game_id,
opponent_id=user_id,
status=status,
)
await sio.emit(
"game:opponent_status",
message.model_dump(mode="json"),
to=opponent_sid,
namespace="/game",
)
logger.debug(f"Notified opponent of {user_id} status: {status}")
except Exception as e:
logger.exception(f"Error notifying opponent status: {e}")
# =========================================================================
# Helper Methods
# =========================================================================
def _error_response(
self,
code: WSErrorCode,
message: str,
request_message_id: str = "",
) -> dict[str, Any]:
"""Create a standardized error response.
Args:
code: The error code.
message: Human-readable error message.
request_message_id: The original request's message_id.
Returns:
Dict with error information.
"""
return {
"success": False,
"error": {
"code": code.value,
"message": message,
},
"request_message_id": request_message_id,
}
# Global singleton instance
game_namespace_handler = GameNamespaceHandler()

View File

@ -9,6 +9,7 @@ Architecture:
- CORS settings match FastAPI configuration
- JWT authentication on connect via auth.py
- Namespaces are registered for different communication domains
- Event handlers delegate to GameNamespaceHandler for game logic
Namespaces:
/game - Active game communication (actions, state updates)
@ -34,6 +35,7 @@ import socketio
from app.config import settings
from app.socketio.auth import auth_handler, require_auth
from app.socketio.game_namespace import game_namespace_handler
if TYPE_CHECKING:
from fastapi import FastAPI
@ -55,8 +57,8 @@ sio = socketio.AsyncServer(
# =============================================================================
# /game Namespace - Active Game Communication
# =============================================================================
# These are skeleton handlers that will be fully implemented in WS-005.
# For now, they provide basic connection lifecycle handling.
# Event handlers for real-time game communication. These handlers authenticate
# requests and delegate to GameNamespaceHandler for game logic.
@sio.event(namespace="/game")
@ -113,12 +115,18 @@ async def disconnect(sid: str) -> None:
Args:
sid: Socket session ID of disconnecting client.
"""
# Clean up session and get user info
user_id = await auth_handler.cleanup_authenticated_session(sid, namespace="/game")
# Get user_id before cleanup for opponent notification
user_id = await require_auth(sio, sid)
# Notify opponent of disconnection if in a game
if user_id:
logger.info(f"Client disconnected from /game: sid={sid}, user_id={user_id}")
# TODO (WS-005): Notify opponent of disconnection if in game
await game_namespace_handler.handle_disconnect(sio, sid, user_id)
# Clean up session and connection tracking
cleanup_user_id = await auth_handler.cleanup_authenticated_session(sid, namespace="/game")
if cleanup_user_id:
logger.info(f"Client disconnected from /game: sid={sid}, user_id={cleanup_user_id}")
else:
logger.debug(f"Unauthenticated client disconnected: {sid}")
@ -127,6 +135,9 @@ async def disconnect(sid: str) -> None:
async def on_game_join(sid: str, data: dict[str, object]) -> dict[str, object]:
"""Handle request to join/rejoin a game session.
Authenticates the request and delegates to GameNamespaceHandler.
On success, the player receives their filtered game state.
Args:
sid: Socket session ID.
data: Message containing game_id and optional last_event_id for resume.
@ -135,40 +146,71 @@ async def on_game_join(sid: str, data: dict[str, object]) -> dict[str, object]:
Response with game state or error.
"""
logger.debug(f"game:join from {sid}: {data}")
# TODO (WS-005): Implement with GameService
return {"error": "Not implemented yet"}
# Require authentication
user_id = await require_auth(sio, sid)
if not user_id:
return {
"success": False,
"error": {"code": "unauthenticated", "message": "Not authenticated"},
}
return await game_namespace_handler.handle_join(sio, sid, user_id, dict(data))
@sio.on("game:action", namespace="/game")
async def on_game_action(sid: str, data: dict[str, object]) -> dict[str, object]:
"""Handle game action from player.
Authenticates the request, validates the action, and delegates to
GameNamespaceHandler for execution. On success, broadcasts the updated
game state to all participants.
Args:
sid: Socket session ID.
data: Action message with type and parameters.
data: Action message with game_id and action object.
Returns:
Action result or error.
"""
logger.debug(f"game:action from {sid}: {data}")
# TODO (WS-005): Implement with GameService
return {"error": "Not implemented yet"}
# Require authentication
user_id = await require_auth(sio, sid)
if not user_id:
return {
"success": False,
"error": {"code": "unauthenticated", "message": "Not authenticated"},
}
return await game_namespace_handler.handle_action(sio, sid, user_id, dict(data))
@sio.on("game:resign", namespace="/game")
async def on_game_resign(sid: str, data: dict[str, object]) -> dict[str, object]:
"""Handle player resignation.
Authenticates the request and delegates to GameNamespaceHandler.
On success, ends the game and broadcasts the result to all participants.
Args:
sid: Socket session ID.
data: Resignation message (may be empty).
data: Resignation message containing game_id.
Returns:
Confirmation or error.
Confirmation with game result or error.
"""
logger.debug(f"game:resign from {sid}: {data}")
# TODO (WS-005): Implement with GameService
return {"error": "Not implemented yet"}
# Require authentication
user_id = await require_auth(sio, sid)
if not user_id:
return {
"success": False,
"error": {"code": "unauthenticated", "message": "Not authenticated"},
}
return await game_namespace_handler.handle_resign(sio, sid, user_id, dict(data))
@sio.on("game:heartbeat", namespace="/game")

View File

@ -2,14 +2,14 @@
"meta": {
"version": "1.0.0",
"created": "2026-01-28",
"lastUpdated": "2026-01-28",
"lastUpdated": "2026-01-29",
"planType": "phase",
"phaseId": "PHASE_4",
"phaseName": "Game Service + WebSocket",
"description": "Real-time gameplay infrastructure - WebSocket communication, game lifecycle management, reconnection handling, and turn timeout system",
"totalEstimatedHours": 45,
"totalTasks": 18,
"completedTasks": 10,
"completedTasks": 12,
"status": "in_progress",
"masterPlan": "../PROJECT_PLAN_MASTER.json"
},
@ -321,12 +321,12 @@
"description": "Socket.IO event handlers for game communication",
"category": "websocket",
"priority": 10,
"completed": false,
"tested": false,
"completed": true,
"tested": true,
"dependencies": ["WS-004", "GS-003", "GS-004"],
"files": [
{"path": "app/socketio/game_namespace.py", "status": "create"},
{"path": "app/socketio/server.py", "status": "modify"}
{"path": "app/socketio/game_namespace.py", "status": "created"},
{"path": "app/socketio/server.py", "status": "modified"}
],
"details": [
"Event: connect - Auth, register connection, check for resumable games",
@ -347,11 +347,11 @@
"description": "Utilities to broadcast game state to all participants with proper filtering",
"category": "websocket",
"priority": 11,
"completed": false,
"tested": false,
"completed": true,
"tested": true,
"dependencies": ["WS-005"],
"files": [
{"path": "app/socketio/broadcast.py", "status": "create"}
{"path": "app/socketio/game_namespace.py", "status": "modified", "note": "Integrated into GameNamespaceHandler"}
],
"details": [
"broadcast_game_state: Send filtered state to each player in game room",
@ -363,7 +363,7 @@
"Handle spectators (spectator view uses get_spectator_state)"
],
"estimatedHours": 2,
"notes": "Consider delta updates for bandwidth optimization (future)"
"notes": "Integrated into GameNamespaceHandler rather than separate file. Spectator mode deferred to OPT-001."
},
{
"id": "TO-001",

View File

@ -0,0 +1 @@
"""Unit tests for Socket.IO modules."""

View File

@ -0,0 +1,933 @@
"""Tests for GameNamespaceHandler.
This module tests the WebSocket event handlers for the /game namespace.
All tests use dependency injection to mock GameService and ConnectionManager,
following the project's no-monkey-patching testing pattern.
"""
from unittest.mock import AsyncMock
import pytest
from app.core.enums import GameEndReason, TurnPhase
from app.core.models.game_state import GameState, PlayerState
from app.core.visibility import VisibleGameState
from app.schemas.ws_messages import ConnectionStatus, WSErrorCode
from app.services.connection_manager import ConnectionInfo
from app.services.game_service import (
ForcedActionRequiredError,
GameActionResult,
GameAlreadyEndedError,
GameEndResult,
GameJoinResult,
GameNotFoundError,
InvalidActionError,
NotPlayerTurnError,
PendingForcedAction,
)
from app.socketio.game_namespace import GameNamespaceHandler
@pytest.fixture
def mock_game_service() -> AsyncMock:
"""Create a mock GameService.
The GameService orchestrates game operations - join, execute action,
resign, end game, and state retrieval.
"""
service = AsyncMock()
service.join_game = AsyncMock()
service.execute_action = AsyncMock()
service.resign_game = AsyncMock()
service.end_game = AsyncMock()
service.get_game_state = AsyncMock()
return service
@pytest.fixture
def mock_connection_manager() -> AsyncMock:
"""Create a mock ConnectionManager.
The ConnectionManager tracks WebSocket connections and their
association with games.
"""
cm = AsyncMock()
cm.join_game = AsyncMock(return_value=True)
cm.leave_game = AsyncMock()
cm.get_connection = AsyncMock(return_value=None)
cm.get_game_user_sids = AsyncMock(return_value={})
cm.get_opponent_sid = AsyncMock(return_value=None)
return cm
@pytest.fixture
def mock_sio() -> AsyncMock:
"""Create a mock Socket.IO AsyncServer.
The server is used to emit events and manage rooms.
"""
sio = AsyncMock()
sio.emit = AsyncMock()
sio.enter_room = AsyncMock()
sio.leave_room = AsyncMock()
return sio
@pytest.fixture
def handler(
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
) -> GameNamespaceHandler:
"""Create a GameNamespaceHandler with injected mock dependencies."""
return GameNamespaceHandler(
game_svc=mock_game_service,
conn_manager=mock_connection_manager,
)
@pytest.fixture
def sample_visible_state() -> VisibleGameState:
"""Create a sample visible game state for testing.
This represents what a player would see after joining a game.
"""
from app.core.visibility import VisiblePlayerState, VisibleZone
return VisibleGameState(
game_id="game-123",
viewer_id="player-1",
players={
"player-1": VisiblePlayerState(
player_id="player-1",
is_current_player=True,
deck_count=40,
hand=VisibleZone(count=7, cards=[], zone_type="hand"),
),
"player-2": VisiblePlayerState(
player_id="player-2",
is_current_player=False,
deck_count=40,
hand=VisibleZone(count=7, cards=[], zone_type="hand"),
),
},
current_player_id="player-1",
turn_number=1,
phase=TurnPhase.MAIN,
is_my_turn=True,
)
@pytest.fixture
def sample_game_state() -> GameState:
"""Create a sample game state for testing broadcasts."""
player1 = PlayerState(player_id="player-1")
player2 = PlayerState(player_id="player-2")
return GameState(
game_id="game-123",
players={"player-1": player1, "player-2": player2},
current_player_id="player-1",
turn_number=1,
phase=TurnPhase.MAIN,
)
class TestHandleJoin:
"""Tests for the handle_join event handler."""
@pytest.mark.asyncio
async def test_join_success(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_visible_state: VisibleGameState,
) -> None:
"""Test successful game join returns game state.
When a player joins a game they're a participant in, they should
receive the filtered game state and be added to the game room.
"""
mock_game_service.join_game.return_value = GameJoinResult(
success=True,
game_id="game-123",
player_id="player-1",
visible_state=sample_visible_state,
is_your_turn=True,
game_over=False,
)
result = await handler.handle_join(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "message_id": "msg-1"},
)
assert result["success"] is True
assert result["game_id"] == "game-123"
assert result["is_your_turn"] is True
assert "state" in result
# Should have joined the connection manager
mock_connection_manager.join_game.assert_called_once_with("sid-123", "game-123")
# Should have entered the Socket.IO room
mock_sio.enter_room.assert_called_once_with("sid-123", "game:game-123", namespace="/game")
@pytest.mark.asyncio
async def test_join_missing_game_id(
self,
handler: GameNamespaceHandler,
mock_sio: AsyncMock,
) -> None:
"""Test that missing game_id returns error.
The game_id is required to join a game.
"""
result = await handler.handle_join(
mock_sio,
"sid-123",
"player-1",
{"message_id": "msg-1"}, # No game_id
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_MESSAGE.value
assert "game_id" in result["error"]["message"].lower()
@pytest.mark.asyncio
async def test_join_game_not_found(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test joining non-existent game returns error.
When GameService indicates the game wasn't found, we should
return a game_not_found error.
"""
mock_game_service.join_game.return_value = GameJoinResult(
success=False,
game_id="nonexistent",
player_id="player-1",
message="Game not found",
)
result = await handler.handle_join(
mock_sio,
"sid-123",
"player-1",
{"game_id": "nonexistent"},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.GAME_NOT_FOUND.value
@pytest.mark.asyncio
async def test_join_not_participant(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test joining as non-participant returns error.
Only players in the game can join it.
"""
mock_game_service.join_game.return_value = GameJoinResult(
success=False,
game_id="game-123",
player_id="stranger",
message="You are not a participant in this game",
)
result = await handler.handle_join(
mock_sio,
"sid-123",
"stranger",
{"game_id": "game-123"},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.NOT_IN_GAME.value
@pytest.mark.asyncio
async def test_join_includes_pending_forced_action(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_visible_state: VisibleGameState,
) -> None:
"""Test that pending forced action is included in join result.
When rejoining a game with a pending forced action, the client
needs to know what action is required.
"""
mock_game_service.join_game.return_value = GameJoinResult(
success=True,
game_id="game-123",
player_id="player-1",
visible_state=sample_visible_state,
is_your_turn=True,
pending_forced_action=PendingForcedAction(
player_id="player-1",
action_type="select_active",
reason="Your active Pokemon was knocked out.",
params={"available_bench_ids": ["bench-1"]},
),
)
result = await handler.handle_join(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
assert result["success"] is True
assert "pending_forced_action" in result
assert result["pending_forced_action"]["action_type"] == "select_active"
@pytest.mark.asyncio
async def test_join_notifies_opponent(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_visible_state: VisibleGameState,
) -> None:
"""Test that joining notifies the opponent.
When a player joins/rejoins, the opponent should be notified
of their connection status.
"""
mock_game_service.join_game.return_value = GameJoinResult(
success=True,
game_id="game-123",
player_id="player-1",
visible_state=sample_visible_state,
is_your_turn=True,
)
mock_connection_manager.get_opponent_sid.return_value = "opponent-sid"
await handler.handle_join(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
# Should have tried to notify opponent
mock_connection_manager.get_opponent_sid.assert_called()
# Should have emitted opponent status to opponent
emit_calls = [
call for call in mock_sio.emit.call_args_list if call[0][0] == "game:opponent_status"
]
assert len(emit_calls) == 1
class TestHandleAction:
"""Tests for the handle_action event handler."""
@pytest.mark.asyncio
async def test_action_success(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_game_state: GameState,
) -> None:
"""Test successful action execution.
A valid action should be executed and the state should be
broadcast to all participants.
"""
mock_game_service.execute_action.return_value = GameActionResult(
success=True,
game_id="game-123",
action_type="pass",
message="Turn ended",
turn_changed=True,
current_player_id="player-2",
)
mock_game_service.get_game_state.return_value = sample_game_state
mock_connection_manager.get_game_user_sids.return_value = {
"player-1": "sid-1",
"player-2": "sid-2",
}
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{
"game_id": "game-123",
"action": {"type": "pass"},
"message_id": "msg-1",
},
)
assert result["success"] is True
assert result["action_type"] == "pass"
assert result["turn_changed"] is True
# Should have broadcast state to participants
assert mock_sio.emit.called
@pytest.mark.asyncio
async def test_action_missing_game_id(
self,
handler: GameNamespaceHandler,
mock_sio: AsyncMock,
) -> None:
"""Test that missing game_id returns error."""
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"action": {"type": "pass"}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_MESSAGE.value
@pytest.mark.asyncio
async def test_action_missing_action(
self,
handler: GameNamespaceHandler,
mock_sio: AsyncMock,
) -> None:
"""Test that missing action returns error."""
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_MESSAGE.value
@pytest.mark.asyncio
async def test_action_invalid_action_format(
self,
handler: GameNamespaceHandler,
mock_sio: AsyncMock,
) -> None:
"""Test that invalid action format returns error.
The action data must match a valid Action type.
"""
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "invalid_action_type"}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_ACTION.value
@pytest.mark.asyncio
async def test_action_game_not_found(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test action on non-existent game returns error."""
mock_game_service.execute_action.side_effect = GameNotFoundError("game-123")
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "pass"}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.GAME_NOT_FOUND.value
@pytest.mark.asyncio
async def test_action_not_your_turn(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test action when not player's turn returns error."""
mock_game_service.execute_action.side_effect = NotPlayerTurnError(
"game-123", "player-2", "player-1"
)
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-2",
{"game_id": "game-123", "action": {"type": "pass"}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.NOT_YOUR_TURN.value
@pytest.mark.asyncio
async def test_action_forced_action_required(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test action when forced action required returns error."""
mock_game_service.execute_action.side_effect = ForcedActionRequiredError(
"game-123", "player-1", "select_active", "pass"
)
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "pass"}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.ACTION_NOT_ALLOWED.value
@pytest.mark.asyncio
async def test_action_invalid_action(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test invalid action (rejected by engine) returns error."""
mock_game_service.execute_action.side_effect = InvalidActionError(
"game-123", "player-1", "Not enough energy"
)
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "attack", "attack_index": 0}},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_ACTION.value
assert "Not enough energy" in result["error"]["message"]
@pytest.mark.asyncio
async def test_action_game_over(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_game_state: GameState,
) -> None:
"""Test action that ends game broadcasts game over.
When an action results in game over, the result should include
game_over=True and the game should be ended.
"""
mock_game_service.execute_action.return_value = GameActionResult(
success=True,
game_id="game-123",
action_type="attack",
game_over=True,
winner_id="player-1",
end_reason=GameEndReason.PRIZES_TAKEN,
)
mock_game_service.get_game_state.return_value = sample_game_state
mock_game_service.end_game.return_value = GameEndResult(
success=True,
game_id="game-123",
winner_id="player-1",
)
mock_connection_manager.get_game_user_sids.return_value = {}
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "attack", "attack_index": 0}},
)
assert result["success"] is True
assert result["game_over"] is True
assert result["winner_id"] == "player-1"
# Should have called end_game
mock_game_service.end_game.assert_called_once()
@pytest.mark.asyncio
async def test_action_includes_pending_forced_action(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_game_state: GameState,
) -> None:
"""Test that action result includes pending forced action.
When an action creates a forced action (e.g., knockout triggers
select_active), the result should include it.
"""
mock_game_service.execute_action.return_value = GameActionResult(
success=True,
game_id="game-123",
action_type="attack",
pending_forced_action=PendingForcedAction(
player_id="player-2",
action_type="select_active",
reason="Your active was knocked out.",
),
)
mock_game_service.get_game_state.return_value = sample_game_state
mock_connection_manager.get_game_user_sids.return_value = {}
result = await handler.handle_action(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123", "action": {"type": "attack", "attack_index": 0}},
)
assert result["success"] is True
assert "pending_forced_action" in result
assert result["pending_forced_action"]["action_type"] == "select_active"
class TestHandleResign:
"""Tests for the handle_resign event handler."""
@pytest.mark.asyncio
async def test_resign_success(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_game_state: GameState,
) -> None:
"""Test successful resignation.
Resignation should end the game with the opponent as winner.
"""
mock_game_service.resign_game.return_value = GameActionResult(
success=True,
game_id="game-123",
action_type="resign",
game_over=True,
winner_id="player-2",
end_reason=GameEndReason.RESIGNATION,
)
mock_game_service.get_game_state.return_value = sample_game_state
mock_game_service.end_game.return_value = GameEndResult(
success=True,
game_id="game-123",
winner_id="player-2",
)
mock_connection_manager.get_game_user_sids.return_value = {}
result = await handler.handle_resign(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
assert result["success"] is True
assert result["game_over"] is True
assert result["winner_id"] == "player-2"
# Should have called end_game
mock_game_service.end_game.assert_called_once()
@pytest.mark.asyncio
async def test_resign_missing_game_id(
self,
handler: GameNamespaceHandler,
mock_sio: AsyncMock,
) -> None:
"""Test resignation with missing game_id returns error."""
result = await handler.handle_resign(
mock_sio,
"sid-123",
"player-1",
{},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.INVALID_MESSAGE.value
@pytest.mark.asyncio
async def test_resign_game_not_found(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test resignation from non-existent game returns error."""
mock_game_service.resign_game.side_effect = GameNotFoundError("game-123")
result = await handler.handle_resign(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.GAME_NOT_FOUND.value
@pytest.mark.asyncio
async def test_resign_game_already_ended(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test resignation from ended game returns error."""
mock_game_service.resign_game.side_effect = GameAlreadyEndedError("game-123")
result = await handler.handle_resign(
mock_sio,
"sid-123",
"player-1",
{"game_id": "game-123"},
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.GAME_ENDED.value
class TestHandleDisconnect:
"""Tests for the handle_disconnect event handler."""
@pytest.mark.asyncio
async def test_disconnect_notifies_opponent(
self,
handler: GameNamespaceHandler,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test that disconnect notifies opponent.
When a player disconnects from a game, the opponent should
be notified of the disconnection.
"""
from datetime import UTC, datetime
mock_connection_manager.get_connection.return_value = ConnectionInfo(
sid="sid-123",
user_id="player-1",
game_id="game-123",
connected_at=datetime.now(UTC),
last_seen=datetime.now(UTC),
)
mock_connection_manager.get_opponent_sid.return_value = "opponent-sid"
await handler.handle_disconnect(mock_sio, "sid-123", "player-1")
# Should have looked up connection to find game
mock_connection_manager.get_connection.assert_called_once_with("sid-123")
# Should have emitted opponent status to opponent
emit_calls = [
call for call in mock_sio.emit.call_args_list if call[0][0] == "game:opponent_status"
]
assert len(emit_calls) == 1
assert emit_calls[0][1]["to"] == "opponent-sid"
@pytest.mark.asyncio
async def test_disconnect_no_game(
self,
handler: GameNamespaceHandler,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test disconnect when not in a game does nothing.
If the player wasn't in a game, there's no opponent to notify.
"""
mock_connection_manager.get_connection.return_value = None
await handler.handle_disconnect(mock_sio, "sid-123", "player-1")
# Should not emit anything
mock_sio.emit.assert_not_called()
@pytest.mark.asyncio
async def test_disconnect_no_game_id_in_connection(
self,
handler: GameNamespaceHandler,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test disconnect when connection exists but not in game.
The connection may exist but game_id may be None if they
haven't joined a game yet.
"""
from datetime import UTC, datetime
mock_connection_manager.get_connection.return_value = ConnectionInfo(
sid="sid-123",
user_id="player-1",
game_id=None, # Not in a game
connected_at=datetime.now(UTC),
last_seen=datetime.now(UTC),
)
await handler.handle_disconnect(mock_sio, "sid-123", "player-1")
# Should not emit anything
mock_sio.emit.assert_not_called()
class TestBroadcastGameState:
"""Tests for the _broadcast_game_state helper method."""
@pytest.mark.asyncio
async def test_broadcast_sends_to_each_player(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
sample_game_state: GameState,
) -> None:
"""Test that broadcast sends filtered state to each player.
Each player should receive their own visibility-filtered
view of the game state.
"""
mock_game_service.get_game_state.return_value = sample_game_state
mock_connection_manager.get_game_user_sids.return_value = {
"player-1": "sid-1",
"player-2": "sid-2",
}
await handler._broadcast_game_state(mock_sio, "game-123")
# Should have emitted to both players
assert mock_sio.emit.call_count == 2
# Verify each emission was to a different player
emit_calls = mock_sio.emit.call_args_list
to_sids = {call[1]["to"] for call in emit_calls}
assert to_sids == {"sid-1", "sid-2"}
# All emissions should be game:state events
for call in emit_calls:
assert call[0][0] == "game:state"
@pytest.mark.asyncio
async def test_broadcast_handles_game_not_found(
self,
handler: GameNamespaceHandler,
mock_game_service: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test that broadcast handles missing game gracefully.
If the game can't be found (e.g., already archived), the
broadcast should log a warning but not raise an error.
"""
mock_game_service.get_game_state.side_effect = GameNotFoundError("game-123")
# Should not raise
await handler._broadcast_game_state(mock_sio, "game-123")
# Should not have emitted anything
mock_sio.emit.assert_not_called()
class TestNotifyOpponentStatus:
"""Tests for the _notify_opponent_status helper method."""
@pytest.mark.asyncio
async def test_notify_sends_to_opponent(
self,
handler: GameNamespaceHandler,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test that notify sends status to opponent.
The opponent's connection should receive a status update
about the player's connection state.
"""
mock_connection_manager.get_opponent_sid.return_value = "opponent-sid"
await handler._notify_opponent_status(
mock_sio,
"sender-sid",
"game-123",
"player-1",
ConnectionStatus.CONNECTED,
)
mock_sio.emit.assert_called_once()
call = mock_sio.emit.call_args
assert call[0][0] == "game:opponent_status"
assert call[1]["to"] == "opponent-sid"
message = call[0][1]
assert message["opponent_id"] == "player-1"
assert message["status"] == ConnectionStatus.CONNECTED.value
@pytest.mark.asyncio
async def test_notify_no_opponent_connected(
self,
handler: GameNamespaceHandler,
mock_connection_manager: AsyncMock,
mock_sio: AsyncMock,
) -> None:
"""Test that notify does nothing when opponent not connected.
If the opponent isn't connected, we can't notify them.
"""
mock_connection_manager.get_opponent_sid.return_value = None
await handler._notify_opponent_status(
mock_sio,
"sender-sid",
"game-123",
"player-1",
ConnectionStatus.CONNECTED,
)
mock_sio.emit.assert_not_called()
class TestErrorResponse:
"""Tests for the _error_response helper method."""
def test_error_response_format(
self,
handler: GameNamespaceHandler,
) -> None:
"""Test that error response has correct format.
Error responses should include success=False, error code,
message, and the original request's message_id.
"""
result = handler._error_response(
WSErrorCode.GAME_NOT_FOUND,
"Game not found",
"msg-123",
)
assert result["success"] is False
assert result["error"]["code"] == WSErrorCode.GAME_NOT_FOUND.value
assert result["error"]["message"] == "Game not found"
assert result["request_message_id"] == "msg-123"