## Refactoring - Changed `ManualOutcomeSubmission.outcome` from `str` to `PlayOutcome` enum type - Removed custom validator (Pydantic handles enum validation automatically) - Added direct import of PlayOutcome (no circular dependency due to TYPE_CHECKING guard) - Updated tests to use enum values while maintaining backward compatibility Benefits: - Better type safety with IDE autocomplete - Cleaner code (removed 15 lines of validator boilerplate) - Backward compatible (Pydantic auto-converts strings to enum) - Access to helper methods (is_hit(), is_out(), etc.) Files modified: - app/models/game_models.py: Enum type + import - tests/unit/config/test_result_charts.py: Updated 7 tests + added compatibility test ## Documentation Created comprehensive CLAUDE.md files for all backend/app/ subdirectories to help future AI agents quickly understand and work with the code. Added 8,799 lines of documentation covering: - api/ (906 lines): FastAPI routes, health checks, auth patterns - config/ (906 lines): League configs, PlayOutcome enum, result charts - core/ (1,288 lines): GameEngine, StateManager, PlayResolver, dice system - data/ (937 lines): API clients (planned), caching layer - database/ (945 lines): Async sessions, operations, recovery - models/ (1,270 lines): Pydantic/SQLAlchemy models, polymorphic patterns - utils/ (959 lines): Logging, JWT auth, security - websocket/ (1,588 lines): Socket.io handlers, real-time events - tests/ (475 lines): Testing patterns and structure Each CLAUDE.md includes: - Purpose & architecture overview - Key components with detailed explanations - Patterns & conventions - Integration points - Common tasks (step-by-step guides) - Troubleshooting with solutions - Working code examples - Testing guidance Total changes: +9,294 lines / -24 lines Tests: All passing (62/62 model tests, 7/7 ManualOutcomeSubmission tests)
41 KiB
WebSocket Module - Real-Time Game Communication
Purpose
Real-time bidirectional communication layer for Paper Dynasty game engine using Socket.io. Handles connection lifecycle, room management, game event broadcasting, and player action processing.
Critical Role: This is the primary interface between players and the game engine. All game actions flow through WebSocket events, ensuring real-time updates for all participants.
Architecture Overview
Client (Browser)
↓ Socket.io
ConnectionManager
↓
Event Handlers
↓
Game Engine → StateManager → Database
↓
Broadcast to All Players
Key Characteristics:
- Async-first: All handlers use async/await
- Room-based: Games are isolated rooms (game_id as room name)
- JWT Authentication: All connections require valid token
- Event-driven: Actions trigger events, results broadcast to rooms
- Error isolation: Exceptions caught per-event, emit error to client
Structure
Module Files
app/websocket/
├── __init__.py # Package marker (minimal/empty)
├── connection_manager.py # Connection lifecycle & broadcasting
└── handlers.py # Event handler registration
Dependencies
Internal:
app.core.state_manager- In-memory game stateapp.core.game_engine- Play resolution logicapp.core.dice- Dice rolling systemapp.core.validators- Rule validationapp.models.game_models- Pydantic game state modelsapp.utils.auth- JWT token verificationapp.config.result_charts- PlayOutcome enum
External:
socketio.AsyncServer- Socket.io server implementationpydantic- Data validation
Key Components
1. ConnectionManager (connection_manager.py)
Purpose: Manages WebSocket connection lifecycle, room membership, and message broadcasting.
State Tracking:
self.sio: socketio.AsyncServer # Socket.io server instance
self.user_sessions: Dict[str, str] # sid → user_id mapping
self.game_rooms: Dict[str, Set[str]] # game_id → set of sids
Core Methods:
async connect(sid: str, user_id: str) -> None
Register a new connection after authentication.
await manager.connect(sid, user_id)
# Logs: "User {user_id} connected with session {sid}"
async disconnect(sid: str) -> None
Handle disconnection - cleanup sessions and notify game rooms.
await manager.disconnect(sid)
# Automatically:
# - Removes user from user_sessions
# - Removes from all game_rooms
# - Broadcasts "user_disconnected" to affected games
async join_game(sid: str, game_id: str, role: str) -> None
Add user to game room and broadcast join event.
await manager.join_game(sid, game_id, role="player")
# - Calls sio.enter_room(sid, game_id)
# - Tracks in game_rooms dict
# - Broadcasts "user_connected" to room
async leave_game(sid: str, game_id: str) -> None
Remove user from game room.
await manager.leave_game(sid, game_id)
# - Calls sio.leave_room(sid, game_id)
# - Updates game_rooms tracking
async broadcast_to_game(game_id: str, event: str, data: dict) -> None
Send event to all users in game room.
await manager.broadcast_to_game(
game_id="123e4567-e89b-12d3-a456-426614174000",
event="play_resolved",
data={"description": "Single to CF", "runs_scored": 1}
)
# All players in game receive event
async emit_to_user(sid: str, event: str, data: dict) -> None
Send event to specific user.
await manager.emit_to_user(
sid="abc123",
event="error",
data={"message": "Invalid action"}
)
# Only that user receives event
get_game_participants(game_id: str) -> Set[str]
Get all session IDs currently in game room.
sids = manager.get_game_participants(game_id)
print(f"{len(sids)} players connected")
2. Event Handlers (handlers.py)
Purpose: Register and process all client-initiated events. Validates inputs, coordinates with game engine, emits responses.
Registration Pattern:
def register_handlers(sio: AsyncServer, manager: ConnectionManager) -> None:
"""Register all WebSocket event handlers"""
@sio.event
async def event_name(sid, data):
# Handler implementation
Handler Design Pattern:
@sio.event
async def some_event(sid, data):
"""
Event description.
Event data:
field1: type - description
field2: type - description
Emits:
success_event: To requester/room on success
error: To requester on failure
"""
try:
# 1. Extract and validate inputs
game_id = UUID(data.get("game_id"))
field = data.get("field")
# 2. Get game state
state = state_manager.get_state(game_id)
if not state:
await manager.emit_to_user(sid, "error", {"message": "Game not found"})
return
# 3. Validate authorization (TODO: implement)
# user_id = manager.user_sessions.get(sid)
# 4. Process action
result = await game_engine.do_something(game_id, field)
# 5. Emit success
await manager.emit_to_user(sid, "success_event", result)
# 6. Broadcast to game room if needed
await manager.broadcast_to_game(game_id, "state_update", data)
except ValidationError as e:
# Pydantic validation error - user-friendly message
await manager.emit_to_user(sid, "error_event", {"message": str(e)})
except Exception as e:
# Unexpected error - log and return generic message
logger.error(f"Event error: {e}", exc_info=True)
await manager.emit_to_user(sid, "error", {"message": str(e)})
Core Event Handlers
connect(sid, environ, auth) -> bool
Purpose: Authenticate new WebSocket connections using JWT.
Flow:
- Extract JWT token from
authdict - Verify token using
verify_token() - Extract
user_idfrom token payload - Register connection with ConnectionManager
- Emit "connected" event to user
Returns: True to accept, False to reject connection
Security: First line of defense - all connections must have valid JWT.
# Client connection attempt
socket.connect({
auth: {
token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
})
# On success, receives:
{"user_id": "12345"}
disconnect(sid)
Purpose: Clean up when user disconnects (intentional or network failure).
Flow:
- Remove from
user_sessions - Remove from all
game_rooms - Broadcast "user_disconnected" to affected games
Automatic: Called by Socket.io on connection loss.
join_game(sid, data)
Purpose: Add user to game room for real-time updates.
Event Data:
{
"game_id": "123e4567-e89b-12d3-a456-426614174000",
"role": "player" # or "spectator"
}
Emits:
game_joined→ To requester with confirmationuser_connected→ Broadcast to game room
TODO: Verify user has access to game (authorization check)
leave_game(sid, data)
Purpose: Remove user from game room.
Event Data:
{
"game_id": "123e4567-e89b-12d3-a456-426614174000"
}
Use Case: User navigates away, switches games, or voluntarily leaves.
heartbeat(sid)
Purpose: Keep-alive mechanism to detect stale connections.
Flow:
- Client sends periodic "heartbeat" events
- Server immediately responds with "heartbeat_ack"
Usage: Client can detect server unresponsiveness if ack not received.
roll_dice(sid, data)
Purpose: Roll dice for manual outcome selection (core gameplay event).
Event Data:
{
"game_id": "123e4567-e89b-12d3-a456-426614174000"
}
Flow:
- Validate game_id (UUID format, game exists)
- Verify user is participant (TODO: implement authorization)
- Roll dice using
dice_system.roll_ab() - Store roll in
state.pending_manual_roll(one-time use) - Broadcast dice results to all players in game
Emits:
dice_rolled→ Broadcast to game room with roll resultserror→ To requester if validation fails
Dice Roll Structure:
{
"game_id": "123e4567-...",
"roll_id": "unique-roll-identifier",
"d6_one": 4, # First d6 (card selection)
"d6_two_total": 7, # Sum of 2d6 (row selection)
"chaos_d20": 14, # d20 for split results
"resolution_d20": 8, # d20 for secondary checks
"check_wild_pitch": False,
"check_passed_ball": False,
"timestamp": "2025-10-31T12:34:56Z",
"message": "Dice rolled - read your card and submit outcome"
}
Players' Workflow:
- Receive
dice_rolledevent - d6_one determines column (1-3: batter card, 4-6: pitcher card)
- d6_two_total determines row on card (2-12)
- Read physical card result at that position
- Submit outcome using
submit_manual_outcome
Security: Roll stored in pending_manual_roll to prevent replay attacks. Cleared after single use.
submit_manual_outcome(sid, data)
Purpose: Submit manually-selected play outcome after reading physical card.
Event Data:
{
"game_id": "123e4567-e89b-12d3-a456-426614174000",
"outcome": "single", # PlayOutcome enum value
"hit_location": "CF" # Optional: required for hits
}
Flow:
- Validate game_id (UUID format, game exists)
- Verify user is authorized (TODO: implement - active batter or game admin)
- Extract outcome and hit_location
- Validate using
ManualOutcomeSubmissionPydantic model - Convert outcome string to
PlayOutcomeenum - Check if outcome requires hit_location (groundball, flyball, line drive)
- Verify
pending_manual_rollexists (must callroll_dicefirst) - Emit
outcome_acceptedto requester (immediate feedback) - Process play through
game_engine.resolve_manual_play() - Clear
pending_manual_roll(one-time use) - Broadcast
play_resolvedto game room with full result
Emits:
outcome_accepted→ To requester (immediate confirmation)play_resolved→ Broadcast to game room (full play result)outcome_rejected→ To requester if validation failserror→ To requester if processing fails
Validation Errors:
# Missing game_id
{"message": "Missing game_id", "field": "game_id"}
# Invalid UUID format
{"message": "Invalid game_id format", "field": "game_id"}
# Game not found
{"message": "Game {game_id} not found"}
# Missing outcome
{"message": "Missing outcome", "field": "outcome"}
# Invalid outcome value
{"message": "Invalid outcome", "field": "outcome", "errors": [...]}
# Missing required hit_location
{"message": "Outcome groundball_c requires hit_location", "field": "hit_location"}
# No pending roll
{"message": "No pending dice roll - call roll_dice first", "field": "game_state"}
Play Result Structure:
{
"game_id": "123e4567-...",
"play_number": 15,
"outcome": "single",
"hit_location": "CF",
"description": "Single to center field",
"outs_recorded": 0,
"runs_scored": 1,
"batter_result": "1B",
"runners_advanced": [
{"from": 2, "to": 4}, # Runner scored from 2nd
{"from": 0, "to": 1} # Batter to 1st
],
"is_hit": true,
"is_out": false,
"is_walk": false,
"roll_id": "unique-roll-identifier"
}
Error Handling:
ValidationError(Pydantic): User-friendly field-level errors →outcome_rejectedGameValidationError: Business rule violations →outcome_rejectedException: Unexpected errors → logged with stack trace,erroremitted
Security:
- Validates
pending_manual_rollexists (prevents fabricated submissions) - One-time use: roll cleared after processing
- TODO: Verify user authorization (active batter or game admin)
Patterns & Conventions
1. Error Handling
Three-tier error handling:
try:
# Main logic
result = await process_action()
except ValidationError as e:
# Pydantic validation - user-friendly error
first_error = e.errors()[0]
field = first_error['loc'][0] if first_error['loc'] else 'unknown'
message = first_error['msg']
await manager.emit_to_user(
sid,
"outcome_rejected", # or "error"
{"message": message, "field": field, "errors": e.errors()}
)
logger.warning(f"Validation failed: {message}")
return # Don't continue
except GameValidationError as e:
# Business rule violation
await manager.emit_to_user(
sid,
"outcome_rejected",
{"message": str(e), "field": "validation"}
)
logger.warning(f"Game validation failed: {e}")
return
except Exception as e:
# Unexpected error - log full stack trace
logger.error(f"Unexpected error: {e}", exc_info=True)
await manager.emit_to_user(
sid,
"error",
{"message": f"Failed to process action: {str(e)}"}
)
return
Error Event Types:
error- Generic error (connection, processing failures)outcome_rejected- Play-specific validation failure (user-friendly)
2. Logging
All logs use structured format with module name:
import logging
logger = logging.getLogger(f'{__name__}.ConnectionManager')
logger = logging.getLogger(f'{__name__}.handlers')
# Log levels
logger.info(f"User {user_id} connected") # Normal operations
logger.warning(f"Validation failed: {message}") # Expected errors
logger.error(f"Error: {e}", exc_info=True) # Unexpected errors
logger.debug(f"Broadcast {event} to game") # Verbose details
3. UUID Validation
All game_id values must be validated as UUIDs:
from uuid import UUID
try:
game_id = UUID(data.get("game_id"))
except (ValueError, AttributeError):
await manager.emit_to_user(
sid,
"error",
{"message": "Invalid game_id format"}
)
return
4. State Validation
Always verify game state exists:
state = state_manager.get_state(game_id)
if not state:
await manager.emit_to_user(
sid,
"error",
{"message": f"Game {game_id} not found"}
)
return
5. Authorization Pattern (TODO)
Framework for future authorization checks:
# Get user_id from session
user_id = manager.user_sessions.get(sid)
# Verify user has access (not yet implemented)
# if not is_game_participant(game_id, user_id):
# await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
# return
# Verify user can perform action (not yet implemented)
# if not is_active_batter(game_id, user_id):
# await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
# return
6. Pydantic Validation
Use Pydantic models for input validation:
from app.models.game_models import ManualOutcomeSubmission
try:
submission = ManualOutcomeSubmission(
outcome=data.get("outcome"),
hit_location=data.get("hit_location")
)
except ValidationError as e:
# Extract user-friendly error
first_error = e.errors()[0]
field = first_error['loc'][0]
message = first_error['msg']
await manager.emit_to_user(sid, "outcome_rejected", {
"message": message,
"field": field,
"errors": e.errors()
})
return
7. Event Response Flow
Request → Validate → Process → Respond → Broadcast
@sio.event
async def some_action(sid, data):
# 1. VALIDATE inputs
game_id = validate_game_id(data)
state = get_and_verify_state(game_id)
# 2. PROCESS action
result = await game_engine.process(game_id, data)
# 3. RESPOND to requester
await manager.emit_to_user(sid, "action_accepted", {"status": "success"})
# 4. BROADCAST to game room
await manager.broadcast_to_game(game_id, "state_update", result)
8. Async Best Practices
- All handlers are
async def - Use
awaitfor I/O operations (database, game engine) - Non-blocking: multiple events can be processed concurrently
- Game engine operations are async for database writes
Integration Points
With Game Engine
Event handlers coordinate with game engine for play resolution:
from app.core.game_engine import game_engine
# Roll dice
ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)
# Store in state (pending)
state.pending_manual_roll = ab_roll
state_manager.update_state(game_id, state)
# Resolve manual play
result = await game_engine.resolve_manual_play(
game_id=game_id,
ab_roll=ab_roll,
outcome=PlayOutcome.SINGLE,
hit_location="CF"
)
With State Manager
Real-time state access and updates:
from app.core.state_manager import state_manager
# Get current state (O(1) lookup)
state = state_manager.get_state(game_id)
# Update state (in-memory)
state.pending_manual_roll = ab_roll
state_manager.update_state(game_id, state)
# Clear pending roll after use
state.pending_manual_roll = None
state_manager.update_state(game_id, state)
With Database
Async database writes happen in game engine (non-blocking):
from app.core.game_engine import game_engine
# Game engine handles async DB operations
result = await game_engine.resolve_manual_play(...)
# - Updates in-memory state (immediate)
# - Writes to database (async, non-blocking)
# - Returns result for broadcasting
With Clients
Client-side Socket.io integration:
// Connect with JWT
const socket = io('ws://localhost:8000', {
auth: {
token: localStorage.getItem('jwt')
}
});
// Connection confirmed
socket.on('connected', (data) => {
console.log('Connected as user', data.user_id);
});
// Join game room
socket.emit('join_game', {
game_id: '123e4567-e89b-12d3-a456-426614174000',
role: 'player'
});
// Roll dice
socket.emit('roll_dice', {
game_id: '123e4567-e89b-12d3-a456-426614174000'
});
// Receive dice results
socket.on('dice_rolled', (data) => {
console.log('Dice:', data.d6_one, data.d6_two_total);
// Show UI for outcome selection
});
// Submit outcome
socket.emit('submit_manual_outcome', {
game_id: '123e4567-e89b-12d3-a456-426614174000',
outcome: 'single',
hit_location: 'CF'
});
// Receive play result
socket.on('play_resolved', (data) => {
console.log('Play:', data.description);
console.log('Runs scored:', data.runs_scored);
// Update game UI
});
// Handle errors
socket.on('error', (data) => {
console.error('Error:', data.message);
});
socket.on('outcome_rejected', (data) => {
console.error('Rejected:', data.message, 'Field:', data.field);
});
Common Tasks
Adding a New Event Handler
- Define handler function in
handlers.py:
@sio.event
async def new_event(sid, data):
"""
Description of what this event does.
Event data:
field1: type - description
field2: type - description
Emits:
success_event: To requester/room on success
error: To requester on failure
"""
try:
# 1. Extract and validate inputs
game_id = UUID(data.get("game_id"))
# 2. Get game state
state = state_manager.get_state(game_id)
if not state:
await manager.emit_to_user(sid, "error", {"message": "Game not found"})
return
# 3. Process action
result = await game_engine.some_action(game_id, data)
# 4. Emit success
await manager.emit_to_user(sid, "success_event", result)
# 5. Broadcast to game room
await manager.broadcast_to_game(game_id, "state_update", result)
except Exception as e:
logger.error(f"New event error: {e}", exc_info=True)
await manager.emit_to_user(sid, "error", {"message": str(e)})
-
Register automatically:
@sio.eventdecorator auto-registers -
Add client-side handler:
socket.emit('new_event', { game_id: '...', field1: 'value' });
socket.on('success_event', (data) => { /* handle */ });
Modifying Broadcast Logic
To broadcast to specific users:
# Get participants
sids = manager.get_game_participants(game_id)
# Emit to each with custom logic
for sid in sids:
user_id = manager.user_sessions.get(sid)
# Custom data per user
custom_data = build_user_specific_data(user_id)
await manager.emit_to_user(sid, "custom_event", custom_data)
To broadcast to teams separately:
# Get participants
sids = manager.get_game_participants(game_id)
for sid in sids:
user_id = manager.user_sessions.get(sid)
# Determine team
if is_home_team(user_id, game_id):
await manager.emit_to_user(sid, "home_event", data)
else:
await manager.emit_to_user(sid, "away_event", data)
Adding Authorization Checks
TODO: Implement authorization service and add checks:
from app.utils.auth import verify_game_access, verify_active_player
@sio.event
async def protected_event(sid, data):
game_id = UUID(data.get("game_id"))
user_id = manager.user_sessions.get(sid)
# Verify user has access to game
if not verify_game_access(user_id, game_id):
await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
return
# Verify user is active player
if not verify_active_player(user_id, game_id):
await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
return
# Process action
...
Testing Event Handlers
Unit tests (using pytest-asyncio):
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.websocket.handlers import register_handlers
@pytest.mark.asyncio
async def test_roll_dice():
# Mock Socket.io server
sio = MagicMock()
manager = MagicMock()
# Register handlers
register_handlers(sio, manager)
# Get the roll_dice handler
roll_dice_handler = sio.event.call_args_list[2][0][0] # 3rd registered event
# Mock data
sid = "test-sid"
data = {"game_id": "123e4567-e89b-12d3-a456-426614174000"}
# Call handler
await roll_dice_handler(sid, data)
# Verify broadcast
manager.broadcast_to_game.assert_called_once()
Integration tests (with test database):
import pytest
from socketio import AsyncClient
from app.main import app
@pytest.mark.asyncio
async def test_roll_dice_integration():
# Create test client
client = AsyncClient()
await client.connect('http://localhost:8000', auth={'token': test_jwt})
# Join game
await client.emit('join_game', {'game_id': test_game_id})
# Roll dice
await client.emit('roll_dice', {'game_id': test_game_id})
# Wait for response
result = await client.receive()
assert result[0] == 'dice_rolled'
assert 'roll_id' in result[1]
await client.disconnect()
Troubleshooting
Connection Issues
Problem: Client can't connect to WebSocket
Checklist:
- Verify JWT token is valid and not expired
- Check CORS settings in
app/config.py - Ensure Socket.io versions match (client and server)
- Check server logs for connection rejection reasons
- Verify network/firewall allows WebSocket connections
- Test with curl:
curl -H "Authorization: Bearer TOKEN" http://localhost:8000/socket.io/
Debug logs:
# Enable Socket.io debug logging
import socketio
sio = socketio.AsyncServer(logger=True, engineio_logger=True)
Events Not Received
Problem: Client emits event but no response
Checklist:
- Verify event name matches exactly (case-sensitive)
- Check server logs for handler errors
- Ensure game_id exists and is valid UUID
- Verify user is in game room (call
join_gamefirst) - Check data format matches expected structure
Debug:
// Client-side logging
socket.onAny((event, data) => {
console.log('Received:', event, data);
});
socket.on('error', (data) => {
console.error('Error:', data);
});
Game State Desynchronization
Problem: Client UI doesn't match server state
Common Causes:
- Client missed broadcast due to disconnection
- Event handler error prevented broadcast
- Client state update logic has bug
Solutions:
- Add state synchronization event:
@sio.event
async def request_game_state(sid, data):
"""Client requests full game state (recovery after disconnect)"""
game_id = UUID(data.get("game_id"))
state = state_manager.get_state(game_id)
if state:
await manager.emit_to_user(sid, "game_state", state.model_dump())
else:
await manager.emit_to_user(sid, "error", {"message": "Game not found"})
- Implement reconnection logic:
socket.on('reconnect', () => {
console.log('Reconnected - requesting state');
socket.emit('request_game_state', { game_id: currentGameId });
});
Broadcast Not Reaching All Players
Problem: Some users don't receive broadcasts
Checklist:
- Verify all users called
join_game - Check
game_roomsdict in ConnectionManager - Ensure room name matches game_id exactly
- Verify users haven't silently disconnected
Debug:
# Add logging to broadcasts
async def broadcast_to_game(self, game_id: str, event: str, data: dict) -> None:
participants = self.get_game_participants(game_id)
logger.info(f"Broadcasting {event} to {len(participants)} participants")
await self.sio.emit(event, data, room=game_id)
# Verify delivery
for sid in participants:
user_id = self.user_sessions.get(sid)
logger.debug(f"Sent to user {user_id} (sid={sid})")
Pending Roll Not Found
Problem: submit_manual_outcome fails with "No pending dice roll"
Common Causes:
- User didn't call
roll_dicefirst - Roll expired due to timeout
- Another user already submitted outcome
- Server restarted (in-memory state lost)
Solutions:
- Enforce UI workflow: disable submit button until
dice_rolledreceived - Add roll expiration check (optional):
# In roll_dice handler
state.pending_manual_roll = ab_roll
state.pending_manual_roll_expires_at = pendulum.now('UTC').add(minutes=5)
# In submit_manual_outcome handler
if state.pending_manual_roll_expires_at < pendulum.now('UTC'):
state.pending_manual_roll = None
await manager.emit_to_user(sid, "outcome_rejected", {
"message": "Roll expired - please roll again",
"field": "game_state"
})
return
- Implement roll persistence in database for recovery
Authorization Not Enforced
Problem: Users can perform actions they shouldn't be able to
Current Status: Authorization checks are stubbed out with TODO comments
Implementation Plan:
- Create authorization service:
# app/utils/authorization.py
async def is_game_participant(game_id: UUID, user_id: str) -> bool:
"""Check if user is a participant in this game"""
# Query database for game participants
pass
async def is_active_batter(game_id: UUID, user_id: str) -> bool:
"""Check if user is the active batter"""
state = state_manager.get_state(game_id)
# Check current batter lineup ID against user's lineup assignments
pass
async def is_game_admin(game_id: UUID, user_id: str) -> bool:
"""Check if user is game creator or admin"""
pass
- Add checks to handlers:
from app.utils.authorization import is_game_participant, is_active_batter
@sio.event
async def submit_manual_outcome(sid, data):
game_id = UUID(data.get("game_id"))
user_id = manager.user_sessions.get(sid)
# Verify participation
if not await is_game_participant(game_id, user_id):
await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
return
# Verify active player
if not await is_active_batter(game_id, user_id):
await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
return
# Process action
...
Memory Leaks in ConnectionManager
Problem: user_sessions or game_rooms grows indefinitely
Prevention:
disconnect()handler automatically cleans up sessions- Monitor dict sizes:
@sio.event
async def heartbeat(sid):
await sio.emit("heartbeat_ack", {}, room=sid)
# Periodic cleanup (every 100 heartbeats)
if random.randint(1, 100) == 1:
logger.info(f"Active sessions: {len(manager.user_sessions)}")
logger.info(f"Active games: {len(manager.game_rooms)}")
- Add periodic cleanup task:
async def cleanup_stale_sessions():
"""Remove sessions that haven't sent heartbeat in 5 minutes"""
while True:
await asyncio.sleep(300) # 5 minutes
stale_sids = []
for sid, user_id in manager.user_sessions.items():
# Check last heartbeat timestamp
if is_stale(sid):
stale_sids.append(sid)
for sid in stale_sids:
await manager.disconnect(sid)
if stale_sids:
logger.info(f"Cleaned up {len(stale_sids)} stale sessions")
Examples
Example 1: Complete Dice Roll → Outcome Flow
Server-side handlers:
# handlers.py
@sio.event
async def roll_dice(sid, data):
game_id = UUID(data.get("game_id"))
state = state_manager.get_state(game_id)
# Roll dice
ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)
# Store pending roll
state.pending_manual_roll = ab_roll
state_manager.update_state(game_id, state)
# Broadcast results
await manager.broadcast_to_game(
str(game_id),
"dice_rolled",
{
"roll_id": ab_roll.roll_id,
"d6_one": ab_roll.d6_one,
"d6_two_total": ab_roll.d6_two_total,
"chaos_d20": ab_roll.chaos_d20,
"message": "Read your card and submit outcome"
}
)
@sio.event
async def submit_manual_outcome(sid, data):
game_id = UUID(data.get("game_id"))
outcome_str = data.get("outcome")
hit_location = data.get("hit_location")
# Validate
submission = ManualOutcomeSubmission(
outcome=outcome_str,
hit_location=hit_location
)
outcome = PlayOutcome(submission.outcome)
# Get pending roll
state = state_manager.get_state(game_id)
ab_roll = state.pending_manual_roll
# Confirm acceptance
await manager.emit_to_user(sid, "outcome_accepted", {
"outcome": outcome.value,
"hit_location": submission.hit_location
})
# Clear pending roll
state.pending_manual_roll = None
state_manager.update_state(game_id, state)
# Resolve play
result = await game_engine.resolve_manual_play(
game_id=game_id,
ab_roll=ab_roll,
outcome=outcome,
hit_location=submission.hit_location
)
# Broadcast result
await manager.broadcast_to_game(
str(game_id),
"play_resolved",
{
"description": result.description,
"runs_scored": result.runs_scored,
"outs_recorded": result.outs_recorded
}
)
Client-side flow:
// 1. Roll dice button clicked
document.getElementById('roll-btn').addEventListener('click', () => {
socket.emit('roll_dice', { game_id: currentGameId });
setButtonState('rolling');
});
// 2. Receive dice results
socket.on('dice_rolled', (data) => {
console.log('Dice:', data.d6_one, data.d6_two_total, data.chaos_d20);
// Show dice animation
displayDiceResults(data);
// Enable outcome selection
showOutcomeSelector();
});
// 3. User selects outcome from UI
document.getElementById('submit-outcome-btn').addEventListener('click', () => {
const outcome = document.getElementById('outcome-select').value;
const hitLocation = document.getElementById('hit-location-select').value;
socket.emit('submit_manual_outcome', {
game_id: currentGameId,
outcome: outcome,
hit_location: hitLocation
});
setButtonState('submitting');
});
// 4. Receive confirmation
socket.on('outcome_accepted', (data) => {
console.log('Outcome accepted:', data.outcome);
showSuccessMessage('Outcome accepted - resolving play...');
});
// 5. Receive play result
socket.on('play_resolved', (data) => {
console.log('Play result:', data.description);
// Update game state UI
updateScore(data.runs_scored);
updateOuts(data.outs_recorded);
addPlayToLog(data.description);
// Reset for next play
resetDiceRoller();
setButtonState('ready');
});
// 6. Handle errors
socket.on('outcome_rejected', (data) => {
console.error('Outcome rejected:', data.message, data.field);
showErrorMessage(`Error: ${data.message}`);
setButtonState('ready');
});
Example 2: Broadcasting Team-Specific Data
@sio.event
async def request_hand_cards(sid, data):
"""Send player's hand to them (but not opponents)"""
game_id = UUID(data.get("game_id"))
user_id = manager.user_sessions.get(sid)
# Get user's team
team_id = get_user_team(user_id, game_id)
# Get hand for that team
hand = get_team_hand(game_id, team_id)
# Send ONLY to requesting user (private data)
await manager.emit_to_user(sid, "hand_cards", {
"cards": hand,
"team_id": team_id
})
# Broadcast to game that player viewed hand (no details)
await manager.broadcast_to_game(str(game_id), "player_action", {
"user_id": user_id,
"action": "viewed_hand"
})
Example 3: Handling Spectators vs Players
@sio.event
async def join_game(sid, data):
game_id = data.get("game_id")
role = data.get("role", "player") # "player" or "spectator"
await manager.join_game(sid, game_id, role)
# Store role in session data (extend ConnectionManager)
manager.user_roles[sid] = role
if role == "spectator":
# Send spectator-specific state (no hidden info)
state = state_manager.get_state(UUID(game_id))
spectator_state = state.to_spectator_view()
await manager.emit_to_user(sid, "game_state", spectator_state)
else:
# Send full player state
state = state_manager.get_state(UUID(game_id))
await manager.emit_to_user(sid, "game_state", state.model_dump())
# When broadcasting, respect roles
async def broadcast_play_result(game_id: str, result: PlayResult):
sids = manager.get_game_participants(game_id)
for sid in sids:
role = manager.user_roles.get(sid, "player")
if role == "spectator":
# Send spectator-safe data (no hole cards, etc.)
await manager.emit_to_user(sid, "play_resolved", result.to_spectator_view())
else:
# Send full data
await manager.emit_to_user(sid, "play_resolved", result.model_dump())
Example 4: Reconnection Recovery
@sio.event
async def request_game_state(sid, data):
"""
Client requests full game state after reconnection.
Use this to recover from disconnections without reloading page.
"""
game_id = UUID(data.get("game_id"))
user_id = manager.user_sessions.get(sid)
# Verify user is participant
if not await is_game_participant(game_id, user_id):
await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
return
# Get current state
state = state_manager.get_state(game_id)
if not state:
# Try to recover from database
state = await state_manager.recover_game(game_id)
if not state:
await manager.emit_to_user(sid, "error", {"message": "Game not found"})
return
# Get recent plays for context
plays = await db_ops.get_plays(game_id, limit=10)
# Send full state
await manager.emit_to_user(sid, "game_state_sync", {
"state": state.model_dump(),
"recent_plays": [p.to_dict() for p in plays],
"timestamp": pendulum.now('UTC').to_iso8601_string()
})
logger.info(f"Game state synced for user {user_id} in game {game_id}")
Client-side:
socket.on('reconnect', () => {
console.log('Reconnected - syncing state');
socket.emit('request_game_state', { game_id: currentGameId });
});
socket.on('game_state_sync', (data) => {
console.log('State synced:', data.timestamp);
// Rebuild UI from full state
rebuildGameUI(data.state);
// Show recent plays
displayRecentPlays(data.recent_plays);
// Resume normal operation
enableGameControls();
});
Performance Considerations
Broadcasting Efficiency
- Socket.io's room-based broadcasting is O(n) where n = room size
- Keep room sizes reasonable (players + spectators, not entire league)
- Use targeted
emit_to_user()for private data - Serialize Pydantic models once, broadcast same dict to all users
Connection Scalability
- Each connection consumes one socket + memory for session tracking
- Target: Support 100+ concurrent games (1000+ connections)
- Consider horizontal scaling with Redis pub/sub for multi-server:
# Future: Redis-backed Socket.io manager
sio = socketio.AsyncServer(
client_manager=socketio.AsyncRedisManager('redis://localhost:6379')
)
Event Loop Blocking
- Never use blocking I/O in event handlers (always
async/await) - Database writes are async (non-blocking)
- Heavy computation should use thread pool executor
Memory Management
- ConnectionManager dicts are bounded by active connections
- StateManager handles game state eviction (idle timeout)
- No memory leaks if
disconnect()handler works correctly
Security Considerations
Authentication
- ✅ All connections require valid JWT token
- ✅ Token verified in
connect()handler before accepting - ❌ TODO: Token expiration handling (refresh mechanism)
Authorization
- ❌ TODO: Verify user is participant before allowing actions
- ❌ TODO: Verify user is active player for turn-based actions
- ❌ TODO: Prevent spectators from performing player actions
Input Validation
- ✅ Pydantic models validate all inputs
- ✅ UUID validation for game_id
- ✅ Enum validation for outcomes
- ✅ Required field checks
Anti-Cheating
- ✅ Dice rolls are cryptographically secure (server-side)
- ✅ Pending roll is one-time use (cleared after submission)
- ❌ TODO: Rate limiting on dice rolls (prevent spam)
- ❌ TODO: Verify outcome matches roll (if cards are digitized)
- ❌ TODO: Track submission history for audit trail
Data Privacy
- Emit private data only to authorized users
- Use
emit_to_user()for sensitive information - Broadcasts should only contain public game state
- TODO: Implement spectator-safe data filtering
Future Enhancements
Planned Features
-
Authorization System
- User-game participant mapping
- Role-based permissions (player, spectator, admin)
- Turn-based action validation
-
Reconnection Improvements
- Automatic state synchronization on reconnect
- Missed event replay
- Persistent pending actions
-
Spectator Mode
- Separate spectator rooms
- Filtered game state (no hidden information)
- Spectator chat
-
Rate Limiting
- Prevent event spam
- Configurable limits per event type
- IP-based blocking for abuse
-
Analytics Events
- Track user actions for analytics
- Performance monitoring
- Error rate tracking
-
Advanced Broadcasting
- Team-specific channels
- Private player-to-player messaging
- Game admin announcements
Related Documentation
- Game Engine:
../core/CLAUDE.md- Play resolution logic - State Manager:
../core/CLAUDE.md- In-memory state management - Database:
../database/CLAUDE.md- Persistence layer - Models:
../models/CLAUDE.md- Pydantic game state models - WebSocket Protocol:
../../../.claude/implementation/websocket-protocol.md- Event specifications
Quick Reference
Event Summary
| Event | Direction | Purpose | Authentication |
|---|---|---|---|
connect |
Client → Server | Establish connection | JWT required |
disconnect |
Client → Server | End connection | Automatic |
join_game |
Client → Server | Join game room | ✅ Token |
leave_game |
Client → Server | Leave game room | ✅ Token |
heartbeat |
Client → Server | Keep-alive ping | ✅ Token |
roll_dice |
Client → Server | Roll dice for play | ✅ Token |
submit_manual_outcome |
Client → Server | Submit card outcome | ✅ Token |
connected |
Server → Client | Connection confirmed | - |
dice_rolled |
Server → Room | Dice results | - |
outcome_accepted |
Server → Client | Outcome confirmed | - |
play_resolved |
Server → Room | Play result | - |
outcome_rejected |
Server → Client | Validation error | - |
error |
Server → Client | Generic error | - |
Common Imports
# WebSocket
from socketio import AsyncServer
from app.websocket.connection_manager import ConnectionManager
# Game Logic
from app.core.state_manager import state_manager
from app.core.game_engine import game_engine
from app.core.dice import dice_system
# Models
from app.models.game_models import ManualOutcomeSubmission
from app.config.result_charts import PlayOutcome
# Validation
from uuid import UUID
from pydantic import ValidationError
# Logging
import logging
logger = logging.getLogger(f'{__name__}.handlers')
Last Updated: 2025-10-31 Module Version: Week 5 Implementation Status: Production-ready for manual outcome gameplay