strat-gameplay-webapp/backend/app/websocket/CLAUDE.md
Cal Corum 76e24ab22b CLAUDE: Refactor ManualOutcomeSubmission to use PlayOutcome enum + comprehensive documentation
## Refactoring
- Changed `ManualOutcomeSubmission.outcome` from `str` to `PlayOutcome` enum type
- Removed custom validator (Pydantic handles enum validation automatically)
- Added direct import of PlayOutcome (no circular dependency due to TYPE_CHECKING guard)
- Updated tests to use enum values while maintaining backward compatibility

Benefits:
- Better type safety with IDE autocomplete
- Cleaner code (removed 15 lines of validator boilerplate)
- Backward compatible (Pydantic auto-converts strings to enum)
- Access to helper methods (is_hit(), is_out(), etc.)

Files modified:
- app/models/game_models.py: Enum type + import
- tests/unit/config/test_result_charts.py: Updated 7 tests + added compatibility test

## Documentation
Created comprehensive CLAUDE.md files for all backend/app/ subdirectories to help future AI agents quickly understand and work with the code.

Added 8,799 lines of documentation covering:
- api/ (906 lines): FastAPI routes, health checks, auth patterns
- config/ (906 lines): League configs, PlayOutcome enum, result charts
- core/ (1,288 lines): GameEngine, StateManager, PlayResolver, dice system
- data/ (937 lines): API clients (planned), caching layer
- database/ (945 lines): Async sessions, operations, recovery
- models/ (1,270 lines): Pydantic/SQLAlchemy models, polymorphic patterns
- utils/ (959 lines): Logging, JWT auth, security
- websocket/ (1,588 lines): Socket.io handlers, real-time events
- tests/ (475 lines): Testing patterns and structure

Each CLAUDE.md includes:
- Purpose & architecture overview
- Key components with detailed explanations
- Patterns & conventions
- Integration points
- Common tasks (step-by-step guides)
- Troubleshooting with solutions
- Working code examples
- Testing guidance

Total changes: +9,294 lines / -24 lines
Tests: All passing (62/62 model tests, 7/7 ManualOutcomeSubmission tests)
2025-10-31 16:03:54 -05:00


WebSocket Module - Real-Time Game Communication

Purpose

Real-time bidirectional communication layer for the Paper Dynasty game engine, built on Socket.io. It handles connection lifecycle, room management, game event broadcasting, and player action processing.

Critical Role: This is the primary interface between players and the game engine. All game actions flow through WebSocket events, ensuring real-time updates for all participants.

Architecture Overview

Client (Browser)
    ↓ Socket.io
ConnectionManager
    ↓
Event Handlers
    ↓
Game Engine → StateManager → Database
    ↓
Broadcast to All Players

Key Characteristics:

  • Async-first: All handlers use async/await
  • Room-based: Games are isolated rooms (game_id as room name)
  • JWT Authentication: All connections require valid token
  • Event-driven: Actions trigger events, results broadcast to rooms
  • Error isolation: Exceptions caught per-event, emit error to client

Structure

Module Files

app/websocket/
├── __init__.py                # Package marker (minimal/empty)
├── connection_manager.py      # Connection lifecycle & broadcasting
└── handlers.py                # Event handler registration

Dependencies

Internal:

  • app.core.state_manager - In-memory game state
  • app.core.game_engine - Play resolution logic
  • app.core.dice - Dice rolling system
  • app.core.validators - Rule validation
  • app.models.game_models - Pydantic game state models
  • app.utils.auth - JWT token verification
  • app.config.result_charts - PlayOutcome enum

External:

  • socketio.AsyncServer - Socket.io server implementation
  • pydantic - Data validation

Key Components

1. ConnectionManager (connection_manager.py)

Purpose: Manages WebSocket connection lifecycle, room membership, and message broadcasting.

State Tracking:

self.sio: socketio.AsyncServer          # Socket.io server instance
self.user_sessions: Dict[str, str]      # sid → user_id mapping
self.game_rooms: Dict[str, Set[str]]    # game_id → set of sids

Core Methods:

async connect(sid: str, user_id: str) -> None

Register a new connection after authentication.

await manager.connect(sid, user_id)
# Logs: "User {user_id} connected with session {sid}"

async disconnect(sid: str) -> None

Handle disconnection - cleanup sessions and notify game rooms.

await manager.disconnect(sid)
# Automatically:
# - Removes user from user_sessions
# - Removes from all game_rooms
# - Broadcasts "user_disconnected" to affected games

async join_game(sid: str, game_id: str, role: str) -> None

Add user to game room and broadcast join event.

await manager.join_game(sid, game_id, role="player")
# - Calls sio.enter_room(sid, game_id)
# - Tracks in game_rooms dict
# - Broadcasts "user_connected" to room

async leave_game(sid: str, game_id: str) -> None

Remove user from game room.

await manager.leave_game(sid, game_id)
# - Calls sio.leave_room(sid, game_id)
# - Updates game_rooms tracking

async broadcast_to_game(game_id: str, event: str, data: dict) -> None

Send event to all users in game room.

await manager.broadcast_to_game(
    game_id="123e4567-e89b-12d3-a456-426614174000",
    event="play_resolved",
    data={"description": "Single to CF", "runs_scored": 1}
)
# All players in game receive event

async emit_to_user(sid: str, event: str, data: dict) -> None

Send event to specific user.

await manager.emit_to_user(
    sid="abc123",
    event="error",
    data={"message": "Invalid action"}
)
# Only that user receives event

get_game_participants(game_id: str) -> Set[str]

Get all session IDs currently in game room.

sids = manager.get_game_participants(game_id)
print(f"{len(sids)} players connected")
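The dictionary bookkeeping behind connect, join_game, and disconnect can be sketched without Socket.io. This is a simplified stand-in, not the project's ConnectionManager: the class name RoomTracker and the list returned by disconnect are assumptions made for illustration.

```python
from typing import Dict, List, Set


class RoomTracker:
    """Bookkeeping only; the real ConnectionManager also talks to Socket.io."""

    def __init__(self) -> None:
        self.user_sessions: Dict[str, str] = {}    # sid -> user_id
        self.game_rooms: Dict[str, Set[str]] = {}  # game_id -> set of sids

    def connect(self, sid: str, user_id: str) -> None:
        self.user_sessions[sid] = user_id

    def join_game(self, sid: str, game_id: str) -> None:
        self.game_rooms.setdefault(game_id, set()).add(sid)

    def disconnect(self, sid: str) -> List[str]:
        """Drop the session and return the game_ids that should be notified."""
        self.user_sessions.pop(sid, None)
        affected = [gid for gid, sids in self.game_rooms.items() if sid in sids]
        for gid in affected:
            self.game_rooms[gid].discard(sid)
            if not self.game_rooms[gid]:
                del self.game_rooms[gid]  # Drop empty rooms so the dict cannot grow forever
        return affected
```

In the real manager, each returned game_id would correspond to one "user_disconnected" broadcast.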

2. Event Handlers (handlers.py)

Purpose: Register and process all client-initiated events. Validates inputs, coordinates with game engine, emits responses.

Registration Pattern:

def register_handlers(sio: AsyncServer, manager: ConnectionManager) -> None:
    """Register all WebSocket event handlers"""

    @sio.event
    async def event_name(sid, data):
        # Handler implementation

Handler Design Pattern:

@sio.event
async def some_event(sid, data):
    """
    Event description.

    Event data:
        field1: type - description
        field2: type - description

    Emits:
        success_event: To requester/room on success
        error: To requester on failure
    """
    try:
        # 1. Extract and validate inputs
        game_id = UUID(data.get("game_id"))
        field = data.get("field")

        # 2. Get game state
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(sid, "error", {"message": "Game not found"})
            return

        # 3. Validate authorization (TODO: implement)
        # user_id = manager.user_sessions.get(sid)

        # 4. Process action
        result = await game_engine.do_something(game_id, field)

        # 5. Emit success
        await manager.emit_to_user(sid, "success_event", result)

        # 6. Broadcast to game room if needed
        await manager.broadcast_to_game(game_id, "state_update", data)

    except ValidationError as e:
        # Pydantic validation error - user-friendly message
        await manager.emit_to_user(sid, "error", {"message": str(e)})
    except Exception as e:
        # Unexpected error - log with stack trace and report to requester
        logger.error(f"Event error: {e}", exc_info=True)
        await manager.emit_to_user(sid, "error", {"message": str(e)})

Core Event Handlers

connect(sid, environ, auth) -> bool

Purpose: Authenticate new WebSocket connections using JWT.

Flow:

  1. Extract JWT token from auth dict
  2. Verify token using verify_token()
  3. Extract user_id from token payload
  4. Register connection with ConnectionManager
  5. Emit "connected" event to user

Returns: True to accept, False to reject connection

Security: First line of defense - all connections must have valid JWT.
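The first three steps of the flow above can be sketched as a pure function. This is a hedged illustration, not the project's handler: the function name authenticate, the injected verify_token callable, the "user_id" payload key, and the fake_verify stand-in are all assumptions.

```python
from typing import Callable, Optional


def authenticate(auth: Optional[dict], verify_token: Callable[[str], dict]) -> Optional[str]:
    """Return the user_id for a valid token, or None to reject the connection."""
    token = (auth or {}).get("token")
    if not token:
        return None  # Step 1 failed: no token supplied
    try:
        payload = verify_token(token)  # Step 2: expected to raise on a bad or expired token
    except Exception:
        return None
    user_id = payload.get("user_id")  # Step 3: payload key name is an assumption
    return str(user_id) if user_id is not None else None


def fake_verify(token: str) -> dict:
    """Stand-in verifier for this example; it accepts exactly one token."""
    if token != "good-token":
        raise ValueError("invalid token")
    return {"user_id": 12345}
```

A connect handler built on this would return False whenever authenticate returns None, and otherwise register the session and emit "connected".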

// Client connection attempt
const socket = io('ws://localhost:8000', {
    auth: {
        token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
    }
});

// On success, the "connected" event carries:
// {"user_id": "12345"}

disconnect(sid)

Purpose: Clean up when user disconnects (intentional or network failure).

Flow:

  1. Remove from user_sessions
  2. Remove from all game_rooms
  3. Broadcast "user_disconnected" to affected games

Automatic: Called by Socket.io on connection loss.


join_game(sid, data)

Purpose: Add user to game room for real-time updates.

Event Data:

{
    "game_id": "123e4567-e89b-12d3-a456-426614174000",
    "role": "player"  # or "spectator"
}

Emits:

  • game_joined → To requester with confirmation
  • user_connected → Broadcast to game room

TODO: Verify user has access to game (authorization check)


leave_game(sid, data)

Purpose: Remove user from game room.

Event Data:

{
    "game_id": "123e4567-e89b-12d3-a456-426614174000"
}

Use Case: User navigates away, switches games, or voluntarily leaves.


heartbeat(sid)

Purpose: Keep-alive mechanism to detect stale connections.

Flow:

  1. Client sends periodic "heartbeat" events
  2. Server immediately responds with "heartbeat_ack"

Usage: Client can detect server unresponsiveness if ack not received.


roll_dice(sid, data)

Purpose: Roll dice for manual outcome selection (core gameplay event).

Event Data:

{
    "game_id": "123e4567-e89b-12d3-a456-426614174000"
}

Flow:

  1. Validate game_id (UUID format, game exists)
  2. Verify user is participant (TODO: implement authorization)
  3. Roll dice using dice_system.roll_ab()
  4. Store roll in state.pending_manual_roll (one-time use)
  5. Broadcast dice results to all players in game

Emits:

  • dice_rolled → Broadcast to game room with roll results
  • error → To requester if validation fails

Dice Roll Structure:

{
    "game_id": "123e4567-...",
    "roll_id": "unique-roll-identifier",
    "d6_one": 4,              # First d6 (card selection)
    "d6_two_total": 7,        # Sum of 2d6 (row selection)
    "chaos_d20": 14,          # d20 for split results
    "resolution_d20": 8,      # d20 for secondary checks
    "check_wild_pitch": False,
    "check_passed_ball": False,
    "timestamp": "2025-10-31T12:34:56Z",
    "message": "Dice rolled - read your card and submit outcome"
}

Players' Workflow:

  1. Receive dice_rolled event
  2. d6_one determines column (1-3: batter card, 4-6: pitcher card)
  3. d6_two_total determines row on card (2-12)
  4. Read physical card result at that position
  5. Submit outcome using submit_manual_outcome

Security: Roll stored in pending_manual_roll to prevent replay attacks. Cleared after single use.
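To make the roll structure and the card-selection rule concrete, here is a minimal sketch using only the standard library. It is not the project's dice_system: SketchRoll, roll_sketch, and card_for are hypothetical names, and only the numeric fields from the structure above are modeled.

```python
import random
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchRoll:
    roll_id: str
    d6_one: int          # 1-6: selects the card
    d6_two_total: int    # 2-12: selects the row
    chaos_d20: int       # 1-20
    resolution_d20: int  # 1-20


def roll_sketch(rng: random.Random) -> SketchRoll:
    """Roll one d6, a 2d6 total, and two d20s."""
    return SketchRoll(
        roll_id=str(uuid.uuid4()),
        d6_one=rng.randint(1, 6),
        d6_two_total=rng.randint(1, 6) + rng.randint(1, 6),
        chaos_d20=rng.randint(1, 20),
        resolution_d20=rng.randint(1, 20),
    )


def card_for(roll: SketchRoll) -> str:
    """d6_one of 1-3 reads the batter card, 4-6 the pitcher card."""
    return "batter" if roll.d6_one <= 3 else "pitcher"
```

Passing in a random.Random instance keeps the sketch testable with a fixed seed.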


submit_manual_outcome(sid, data)

Purpose: Submit manually-selected play outcome after reading physical card.

Event Data:

{
    "game_id": "123e4567-e89b-12d3-a456-426614174000",
    "outcome": "single",           # PlayOutcome enum value
    "hit_location": "CF"           # Optional: required for hits
}

Flow:

  1. Validate game_id (UUID format, game exists)
  2. Verify user is authorized (TODO: implement - active batter or game admin)
  3. Extract outcome and hit_location
  4. Validate using ManualOutcomeSubmission Pydantic model
  5. Convert outcome string to PlayOutcome enum
  6. Check if outcome requires hit_location (groundball, flyball, line drive)
  7. Verify pending_manual_roll exists (must call roll_dice first)
  8. Emit outcome_accepted to requester (immediate feedback)
  9. Process play through game_engine.resolve_manual_play()
  10. Clear pending_manual_roll (one-time use)
  11. Broadcast play_resolved to game room with full result

Emits:

  • outcome_accepted → To requester (immediate confirmation)
  • play_resolved → Broadcast to game room (full play result)
  • outcome_rejected → To requester if validation fails
  • error → To requester if processing fails
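Steps 4 to 6 of the flow (validate, convert to an enum, check hit_location) can be illustrated with a standard-library stand-in. The real handler uses the Pydantic model ManualOutcomeSubmission and the PlayOutcome enum; SketchOutcome, its two members, requires_hit_location, and check_submission are assumptions for this sketch only.

```python
from enum import Enum
from typing import Optional, Tuple


class SketchOutcome(str, Enum):
    """Tiny stand-in for PlayOutcome with two values used in this document."""
    SINGLE = "single"
    GROUNDBALL_C = "groundball_c"

    def requires_hit_location(self) -> bool:
        # Assumption: in this sketch only groundballs need a location
        return self.value.startswith("groundball")


def check_submission(data: dict) -> Tuple[Optional[SketchOutcome], Optional[dict]]:
    """Return (outcome, None) when valid, or (None, rejection_payload)."""
    raw = data.get("outcome")
    if raw is None:
        return None, {"message": "Missing outcome", "field": "outcome"}
    try:
        outcome = SketchOutcome(raw)  # String-to-enum conversion; raises on unknown values
    except ValueError:
        return None, {"message": "Invalid outcome", "field": "outcome"}
    if outcome.requires_hit_location() and not data.get("hit_location"):
        return None, {
            "message": f"Outcome {outcome.value} requires hit_location",
            "field": "hit_location",
        }
    return outcome, None
```

The rejection payload would be what the handler emits as outcome_rejected.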

Validation Errors:

# Missing game_id
{"message": "Missing game_id", "field": "game_id"}

# Invalid UUID format
{"message": "Invalid game_id format", "field": "game_id"}

# Game not found
{"message": "Game {game_id} not found"}

# Missing outcome
{"message": "Missing outcome", "field": "outcome"}

# Invalid outcome value
{"message": "Invalid outcome", "field": "outcome", "errors": [...]}

# Missing required hit_location
{"message": "Outcome groundball_c requires hit_location", "field": "hit_location"}

# No pending roll
{"message": "No pending dice roll - call roll_dice first", "field": "game_state"}

Play Result Structure:

{
    "game_id": "123e4567-...",
    "play_number": 15,
    "outcome": "single",
    "hit_location": "CF",
    "description": "Single to center field",
    "outs_recorded": 0,
    "runs_scored": 1,
    "batter_result": "1B",
    "runners_advanced": [
        {"from": 2, "to": 4},  # Runner scored from 2nd
        {"from": 0, "to": 1}   # Batter to 1st
    ],
    "is_hit": true,
    "is_out": false,
    "is_walk": false,
    "roll_id": "unique-roll-identifier"
}

Error Handling:

  • ValidationError (Pydantic): User-friendly field-level errors → outcome_rejected
  • GameValidationError: Business rule violations → outcome_rejected
  • Exception: Unexpected errors → logged with stack trace, error emitted

Security:

  • Validates pending_manual_roll exists (prevents fabricated submissions)
  • One-time use: roll cleared after processing
  • TODO: Verify user authorization (active batter or game admin)
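The one-time-use rule for the pending roll can be sketched in isolation. PendingRollSlot is a hypothetical helper, not part of the codebase, which stores the roll directly on state.pending_manual_roll.

```python
from typing import Any, Optional


class PendingRollSlot:
    """Holds at most one roll; consuming it empties the slot."""

    def __init__(self) -> None:
        self._roll: Optional[Any] = None

    def store(self, roll: Any) -> None:
        self._roll = roll  # A new roll replaces any unconsumed one

    def consume(self) -> Any:
        if self._roll is None:
            raise LookupError("No pending dice roll - call roll_dice first")
        roll, self._roll = self._roll, None  # Take the roll and clear in one step
        return roll
```

Because consume clears the slot before returning, a second submission against the same roll fails with the same message the handler reports.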

Patterns & Conventions

1. Error Handling

Three-tier error handling:

try:
    # Main logic
    result = await process_action()

except ValidationError as e:
    # Pydantic validation - user-friendly error
    first_error = e.errors()[0]
    field = first_error['loc'][0] if first_error['loc'] else 'unknown'
    message = first_error['msg']

    await manager.emit_to_user(
        sid,
        "outcome_rejected",  # or "error"
        {"message": message, "field": field, "errors": e.errors()}
    )
    logger.warning(f"Validation failed: {message}")
    return  # Don't continue

except GameValidationError as e:
    # Business rule violation
    await manager.emit_to_user(
        sid,
        "outcome_rejected",
        {"message": str(e), "field": "validation"}
    )
    logger.warning(f"Game validation failed: {e}")
    return

except Exception as e:
    # Unexpected error - log full stack trace
    logger.error(f"Unexpected error: {e}", exc_info=True)
    await manager.emit_to_user(
        sid,
        "error",
        {"message": f"Failed to process action: {str(e)}"}
    )
    return

Error Event Types:

  • error - Generic error (connection, processing failures)
  • outcome_rejected - Play-specific validation failure (user-friendly)

2. Logging

All logs use structured format with module name:

import logging

# In connection_manager.py
logger = logging.getLogger(f'{__name__}.ConnectionManager')

# In handlers.py
logger = logging.getLogger(f'{__name__}.handlers')

# Log levels
logger.info(f"User {user_id} connected")           # Normal operations
logger.warning(f"Validation failed: {message}")    # Expected errors
logger.error(f"Error: {e}", exc_info=True)         # Unexpected errors
logger.debug(f"Broadcast {event} to game")         # Verbose details

3. UUID Validation

All game_id values must be validated as UUIDs:

from uuid import UUID

try:
    game_id = UUID(data.get("game_id"))
except (ValueError, AttributeError, TypeError):  # TypeError covers a missing game_id (UUID(None))
    await manager.emit_to_user(
        sid,
        "error",
        {"message": "Invalid game_id format"}
    )
    return
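The same check can be factored into a small helper so every handler treats missing and malformed IDs alike. This is a sketch; parse_game_id is a hypothetical name, not an existing function in the module.

```python
from typing import Optional
from uuid import UUID


def parse_game_id(data: dict) -> Optional[UUID]:
    """Return the game_id as a UUID, or None if it is missing or malformed."""
    raw = data.get("game_id")
    if not isinstance(raw, str):
        return None  # Covers a missing key and non-string values
    try:
        return UUID(raw)
    except ValueError:
        return None  # Badly formed UUID string
```

A handler would emit the "Invalid game_id format" error and return whenever this helper yields None.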

4. State Validation

Always verify game state exists:

state = state_manager.get_state(game_id)
if not state:
    await manager.emit_to_user(
        sid,
        "error",
        {"message": f"Game {game_id} not found"}
    )
    return

5. Authorization Pattern (TODO)

Framework for future authorization checks:

# Get user_id from session
user_id = manager.user_sessions.get(sid)

# Verify user has access (not yet implemented)
# if not is_game_participant(game_id, user_id):
#     await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
#     return

# Verify user can perform action (not yet implemented)
# if not is_active_batter(game_id, user_id):
#     await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
#     return

6. Pydantic Validation

Use Pydantic models for input validation:

from app.models.game_models import ManualOutcomeSubmission

try:
    submission = ManualOutcomeSubmission(
        outcome=data.get("outcome"),
        hit_location=data.get("hit_location")
    )
except ValidationError as e:
    # Extract user-friendly error
    first_error = e.errors()[0]
    field = first_error['loc'][0]
    message = first_error['msg']

    await manager.emit_to_user(sid, "outcome_rejected", {
        "message": message,
        "field": field,
        "errors": e.errors()
    })
    return

7. Event Response Flow

Request → Validate → Process → Respond → Broadcast

@sio.event
async def some_action(sid, data):
    # 1. VALIDATE inputs
    game_id = validate_game_id(data)
    state = get_and_verify_state(game_id)

    # 2. PROCESS action
    result = await game_engine.process(game_id, data)

    # 3. RESPOND to requester
    await manager.emit_to_user(sid, "action_accepted", {"status": "success"})

    # 4. BROADCAST to game room
    await manager.broadcast_to_game(game_id, "state_update", result)
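The ordering in this flow can be checked with a recording stand-in for the manager. This is a self-contained sketch: RecordingManager, the placeholder processing step, and run_demo are assumptions, and no Socket.io server is involved.

```python
import asyncio
from typing import List, Tuple


class RecordingManager:
    """Stand-in for ConnectionManager that records emits instead of sending."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str, str]] = []

    async def emit_to_user(self, sid: str, event: str, data: dict) -> None:
        self.sent.append(("user", sid, event))

    async def broadcast_to_game(self, game_id: str, event: str, data: dict) -> None:
        self.sent.append(("room", game_id, event))


async def some_action(manager: RecordingManager, sid: str, data: dict) -> None:
    game_id = data.get("game_id")
    if not game_id:  # 1. VALIDATE
        await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
        return
    result = {"game_id": game_id, "status": "processed"}  # 2. PROCESS (placeholder)
    await manager.emit_to_user(sid, "action_accepted", {"status": "success"})  # 3. RESPOND
    await manager.broadcast_to_game(game_id, "state_update", result)  # 4. BROADCAST


def run_demo() -> List[Tuple[str, str, str]]:
    manager = RecordingManager()
    asyncio.run(some_action(manager, "sid-1", {"game_id": "g1"}))
    asyncio.run(some_action(manager, "sid-2", {}))
    return manager.sent
```

The requester is answered before the room is updated, and a failed validation never reaches the broadcast step.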

8. Async Best Practices

  • All handlers are async def
  • Use await for I/O operations (database, game engine)
  • Non-blocking: multiple events can be processed concurrently
  • Game engine operations are async for database writes
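A small asyncio sketch shows what non-blocking means in practice: while one handler awaits I/O, another can start. The handler names and the sleep standing in for a database call are illustrative assumptions.

```python
import asyncio
from typing import List


async def slow_handler(name: str, log: List[str]) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # Stands in for a database or game engine call
    log.append(f"{name} end")


async def run_two_handlers() -> List[str]:
    log: List[str] = []
    # Both handlers run on the same event loop; the second starts while the first awaits
    await asyncio.gather(slow_handler("a", log), slow_handler("b", log))
    return log
```

Both "start" entries are logged before either "end" entry, which would not happen if the first handler blocked the loop.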

Integration Points

With Game Engine

Event handlers coordinate with game engine for play resolution:

from app.core.game_engine import game_engine

# Roll dice
ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)

# Store in state (pending)
state.pending_manual_roll = ab_roll
state_manager.update_state(game_id, state)

# Resolve manual play
result = await game_engine.resolve_manual_play(
    game_id=game_id,
    ab_roll=ab_roll,
    outcome=PlayOutcome.SINGLE,
    hit_location="CF"
)

With State Manager

Real-time state access and updates:

from app.core.state_manager import state_manager

# Get current state (O(1) lookup)
state = state_manager.get_state(game_id)

# Update state (in-memory)
state.pending_manual_roll = ab_roll
state_manager.update_state(game_id, state)

# Clear pending roll after use
state.pending_manual_roll = None
state_manager.update_state(game_id, state)

With Database

Async database writes happen in game engine (non-blocking):

from app.core.game_engine import game_engine

# Game engine handles async DB operations
result = await game_engine.resolve_manual_play(...)
# - Updates in-memory state (immediate)
# - Writes to database (async, non-blocking)
# - Returns result for broadcasting

With Clients

Client-side Socket.io integration:

// Connect with JWT
const socket = io('ws://localhost:8000', {
    auth: {
        token: localStorage.getItem('jwt')
    }
});

// Connection confirmed
socket.on('connected', (data) => {
    console.log('Connected as user', data.user_id);
});

// Join game room
socket.emit('join_game', {
    game_id: '123e4567-e89b-12d3-a456-426614174000',
    role: 'player'
});

// Roll dice
socket.emit('roll_dice', {
    game_id: '123e4567-e89b-12d3-a456-426614174000'
});

// Receive dice results
socket.on('dice_rolled', (data) => {
    console.log('Dice:', data.d6_one, data.d6_two_total);
    // Show UI for outcome selection
});

// Submit outcome
socket.emit('submit_manual_outcome', {
    game_id: '123e4567-e89b-12d3-a456-426614174000',
    outcome: 'single',
    hit_location: 'CF'
});

// Receive play result
socket.on('play_resolved', (data) => {
    console.log('Play:', data.description);
    console.log('Runs scored:', data.runs_scored);
    // Update game UI
});

// Handle errors
socket.on('error', (data) => {
    console.error('Error:', data.message);
});

socket.on('outcome_rejected', (data) => {
    console.error('Rejected:', data.message, 'Field:', data.field);
});

Common Tasks

Adding a New Event Handler

  1. Define handler function in handlers.py:
@sio.event
async def new_event(sid, data):
    """
    Description of what this event does.

    Event data:
        field1: type - description
        field2: type - description

    Emits:
        success_event: To requester/room on success
        error: To requester on failure
    """
    try:
        # 1. Extract and validate inputs
        game_id = UUID(data.get("game_id"))

        # 2. Get game state
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(sid, "error", {"message": "Game not found"})
            return

        # 3. Process action
        result = await game_engine.some_action(game_id, data)

        # 4. Emit success
        await manager.emit_to_user(sid, "success_event", result)

        # 5. Broadcast to game room
        await manager.broadcast_to_game(game_id, "state_update", result)

    except Exception as e:
        logger.error(f"New event error: {e}", exc_info=True)
        await manager.emit_to_user(sid, "error", {"message": str(e)})
  2. Register automatically: the @sio.event decorator registers the handler, so no extra step is needed

  3. Add client-side handler:

socket.emit('new_event', { game_id: '...', field1: 'value' });
socket.on('success_event', (data) => { /* handle */ });

Modifying Broadcast Logic

To broadcast to specific users:

# Get participants
sids = manager.get_game_participants(game_id)

# Emit to each with custom logic
for sid in sids:
    user_id = manager.user_sessions.get(sid)

    # Custom data per user
    custom_data = build_user_specific_data(user_id)

    await manager.emit_to_user(sid, "custom_event", custom_data)

To broadcast to teams separately:

# Get participants
sids = manager.get_game_participants(game_id)

for sid in sids:
    user_id = manager.user_sessions.get(sid)

    # Determine team
    if is_home_team(user_id, game_id):
        await manager.emit_to_user(sid, "home_event", data)
    else:
        await manager.emit_to_user(sid, "away_event", data)
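The team split can also be computed up front and tested without a server. In this sketch, split_by_team is a hypothetical helper and is_home_team is passed in as a callable because the real check is not shown in this document.

```python
from typing import Callable, Dict, Iterable, List


def split_by_team(
    sids: Iterable[str],
    user_sessions: Dict[str, str],
    is_home_team: Callable[[str], bool],
) -> Dict[str, List[str]]:
    """Group session IDs into 'home' and 'away' lists, skipping unknown sessions."""
    groups: Dict[str, List[str]] = {"home": [], "away": []}
    for sid in sorted(sids):  # Sorted only to make the output deterministic
        user_id = user_sessions.get(sid)
        if user_id is None:
            continue  # Session no longer registered; do not guess a team
        groups["home" if is_home_team(user_id) else "away"].append(sid)
    return groups
```

Skipping sessions with no user_id avoids sending away-team data to a session that has already been cleaned up.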

Adding Authorization Checks

TODO: Implement authorization service and add checks:

from app.utils.auth import verify_game_access, verify_active_player

@sio.event
async def protected_event(sid, data):
    game_id = UUID(data.get("game_id"))
    user_id = manager.user_sessions.get(sid)

    # Verify user has access to game
    if not verify_game_access(user_id, game_id):
        await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
        return

    # Verify user is active player
    if not verify_active_player(user_id, game_id):
        await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
        return

    # Process action
    ...

Testing Event Handlers

Unit tests (using pytest-asyncio):

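import pytest
from unittest.mock import AsyncMock, MagicMock
from app.websocket.handlers import register_handlers

@pytest.mark.asyncio
async def test_roll_dice():
    # Mock Socket.io server; the manager's emit methods are awaited, so use AsyncMock
    sio = MagicMock()
    manager = MagicMock()
    manager.emit_to_user = AsyncMock()
    manager.broadcast_to_game = AsyncMock()

    # Register handlers
    register_handlers(sio, manager)

    # Look up the roll_dice handler by name (each @sio.event call records the function)
    handlers = {call.args[0].__name__: call.args[0] for call in sio.event.call_args_list}
    roll_dice_handler = handlers["roll_dice"]

    # Mock data (the game must already exist in state_manager, e.g. via a fixture or patch)
    sid = "test-sid"
    data = {"game_id": "123e4567-e89b-12d3-a456-426614174000"}

    # Call handler
    await roll_dice_handler(sid, data)

    # Verify broadcast
    manager.broadcast_to_game.assert_called_once()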

Integration tests (with test database):

import pytest
from socketio import AsyncSimpleClient

@pytest.mark.asyncio
async def test_roll_dice_integration(test_jwt, test_game_id):
    # test_jwt and test_game_id are assumed to be provided by fixtures.
    # AsyncSimpleClient (not AsyncClient) provides receive().
    client = AsyncSimpleClient()
    await client.connect('http://localhost:8000', auth={'token': test_jwt})

    # Join game
    await client.emit('join_game', {'game_id': test_game_id})

    # Roll dice
    await client.emit('roll_dice', {'game_id': test_game_id})

    # Skip earlier events (connected, game_joined, user_connected) until dice_rolled arrives
    while True:
        result = await client.receive(timeout=5)
        if result[0] == 'dice_rolled':
            break
    assert 'roll_id' in result[1]

    await client.disconnect()

Troubleshooting

Connection Issues

Problem: Client can't connect to WebSocket

Checklist:

  1. Verify JWT token is valid and not expired
  2. Check CORS settings in app/config.py
  3. Ensure Socket.io versions match (client and server)
  4. Check server logs for connection rejection reasons
  5. Verify network/firewall allows WebSocket connections
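  6. Test that the endpoint is reachable with curl: curl "http://localhost:8000/socket.io/?EIO=4&transport=polling" (EIO=4 assumes Engine.IO protocol 4; the JWT travels in the Socket.io auth payload rather than an HTTP header, so this only checks the handshake endpoint, not authentication)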

Debug logs:

# Enable Socket.io debug logging
import socketio
sio = socketio.AsyncServer(logger=True, engineio_logger=True)

Events Not Received

Problem: Client emits event but no response

Checklist:

  1. Verify event name matches exactly (case-sensitive)
  2. Check server logs for handler errors
  3. Ensure game_id exists and is valid UUID
  4. Verify user is in game room (call join_game first)
  5. Check data format matches expected structure

Debug:

// Client-side logging
socket.onAny((event, data) => {
    console.log('Received:', event, data);
});

socket.on('error', (data) => {
    console.error('Error:', data);
});

Game State Desynchronization

Problem: Client UI doesn't match server state

Common Causes:

  1. Client missed broadcast due to disconnection
  2. Event handler error prevented broadcast
  3. Client state update logic has bug

Solutions:

  1. Add state synchronization event:
@sio.event
async def request_game_state(sid, data):
    """Client requests full game state (recovery after disconnect)"""
    game_id = UUID(data.get("game_id"))
    state = state_manager.get_state(game_id)

    if state:
        await manager.emit_to_user(sid, "game_state", state.model_dump())
    else:
        await manager.emit_to_user(sid, "error", {"message": "Game not found"})
  2. Implement reconnection logic:
// With Socket.IO client v3+, 'reconnect' is emitted by the Manager (socket.io), not the socket
socket.io.on('reconnect', () => {
    console.log('Reconnected - requesting state');
    socket.emit('request_game_state', { game_id: currentGameId });
});

Broadcast Not Reaching All Players

Problem: Some users don't receive broadcasts

Checklist:

  1. Verify all users called join_game
  2. Check game_rooms dict in ConnectionManager
  3. Ensure room name matches game_id exactly
  4. Verify users haven't silently disconnected

Debug:

# Add logging to broadcasts
async def broadcast_to_game(self, game_id: str, event: str, data: dict) -> None:
    participants = self.get_game_participants(game_id)
    logger.info(f"Broadcasting {event} to {len(participants)} participants")

    await self.sio.emit(event, data, room=game_id)

    # Log intended recipients (this does not confirm delivery)
    for sid in participants:
        user_id = self.user_sessions.get(sid)
        logger.debug(f"Sent to user {user_id} (sid={sid})")

Pending Roll Not Found

Problem: submit_manual_outcome fails with "No pending dice roll"

Common Causes:

  1. User didn't call roll_dice first
  2. Roll expired due to timeout
  3. Another user already submitted outcome
  4. Server restarted (in-memory state lost)

Solutions:

  1. Enforce UI workflow: disable submit button until dice_rolled received
  2. Add roll expiration check (optional):
# In roll_dice handler
state.pending_manual_roll = ab_roll
state.pending_manual_roll_expires_at = pendulum.now('UTC').add(minutes=5)

# In submit_manual_outcome handler
if state.pending_manual_roll_expires_at < pendulum.now('UTC'):
    state.pending_manual_roll = None
    await manager.emit_to_user(sid, "outcome_rejected", {
        "message": "Roll expired - please roll again",
        "field": "game_state"
    })
    return
  3. Implement roll persistence in database for recovery
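The expiration check in solution 2 can be written with the standard library and an injected clock, which makes it easy to test. This is a sketch under assumptions: ROLL_TTL, expiry_for, and is_expired are hypothetical names, and treating a missing expiry as expired is a design choice, not documented behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

ROLL_TTL = timedelta(minutes=5)


def expiry_for(rolled_at: datetime) -> datetime:
    """Compute when a roll made at rolled_at stops being valid."""
    return rolled_at + ROLL_TTL


def is_expired(expires_at: Optional[datetime], now: datetime) -> bool:
    """A missing expiry counts as expired so unknown state is never trusted."""
    return expires_at is None or now >= expires_at
```

Passing now as an argument, for example datetime.now(timezone.utc) in the handler, keeps the comparison free of hidden clock reads.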

Authorization Not Enforced

Problem: Users can perform actions they shouldn't be able to

Current Status: Authorization checks are stubbed out with TODO comments

Implementation Plan:

  1. Create authorization service:
# app/utils/authorization.py
async def is_game_participant(game_id: UUID, user_id: str) -> bool:
    """Check if user is a participant in this game"""
    # Query database for game participants
    pass

async def is_active_batter(game_id: UUID, user_id: str) -> bool:
    """Check if user is the active batter"""
    state = state_manager.get_state(game_id)
    # Check current batter lineup ID against user's lineup assignments
    pass

async def is_game_admin(game_id: UUID, user_id: str) -> bool:
    """Check if user is game creator or admin"""
    pass
  2. Add checks to handlers:
from app.utils.authorization import is_game_participant, is_active_batter

@sio.event
async def submit_manual_outcome(sid, data):
    game_id = UUID(data.get("game_id"))
    user_id = manager.user_sessions.get(sid)

    # Verify participation
    if not await is_game_participant(game_id, user_id):
        await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
        return

    # Verify active player
    if not await is_active_batter(game_id, user_id):
        await manager.emit_to_user(sid, "error", {"message": "Not your turn"})
        return

    # Process action
    ...

Memory Leaks in ConnectionManager

Problem: user_sessions or game_rooms grows indefinitely

Prevention:

  1. disconnect() handler automatically cleans up sessions
  2. Monitor dict sizes:
import random

@sio.event
async def heartbeat(sid):
    await sio.emit("heartbeat_ack", {}, room=sid)

    # Sample dict sizes on roughly 1 in 100 heartbeats (logging only, no cleanup)
    if random.randint(1, 100) == 1:
        logger.info(f"Active sessions: {len(manager.user_sessions)}")
        logger.info(f"Active games: {len(manager.game_rooms)}")
  3. Add periodic cleanup task:
async def cleanup_stale_sessions():
    """Remove sessions that haven't sent heartbeat in 5 minutes"""
    while True:
        await asyncio.sleep(300)  # 5 minutes

        stale_sids = []
        for sid, user_id in manager.user_sessions.items():
            # is_stale() is a placeholder (not yet implemented): check last heartbeat timestamp
            if is_stale(sid):
                stale_sids.append(sid)

        for sid in stale_sids:
            await manager.disconnect(sid)

        if stale_sids:
            logger.info(f"Cleaned up {len(stale_sids)} stale sessions")
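The cleanup task above depends on knowing when each session last sent a heartbeat. One way to track that is sketched below; HeartbeatTracker and STALE_AFTER_SECONDS are assumptions, not existing project code.

```python
import time
from typing import Dict, List, Optional

STALE_AFTER_SECONDS = 300.0  # 5 minutes, matching the cleanup interval above


class HeartbeatTracker:
    """Remembers the last heartbeat time per session ID."""

    def __init__(self) -> None:
        self.last_seen: Dict[str, float] = {}

    def touch(self, sid: str, now: Optional[float] = None) -> None:
        # Call from the heartbeat handler; monotonic time is immune to clock changes
        self.last_seen[sid] = time.monotonic() if now is None else now

    def forget(self, sid: str) -> None:
        self.last_seen.pop(sid, None)  # Call on disconnect

    def stale_sids(self, now: Optional[float] = None) -> List[str]:
        current = time.monotonic() if now is None else now
        return sorted(
            sid for sid, seen in self.last_seen.items()
            if current - seen > STALE_AFTER_SECONDS
        )
```

The cleanup loop could then disconnect every sid returned by stale_sids() and call forget for each.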

Examples

Example 1: Complete Dice Roll → Outcome Flow

Server-side handlers:

# handlers.py
@sio.event
async def roll_dice(sid, data):
    game_id = UUID(data.get("game_id"))
    state = state_manager.get_state(game_id)

    # Roll dice
    ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id)

    # Store pending roll
    state.pending_manual_roll = ab_roll
    state_manager.update_state(game_id, state)

    # Broadcast results
    await manager.broadcast_to_game(
        str(game_id),
        "dice_rolled",
        {
            "roll_id": ab_roll.roll_id,
            "d6_one": ab_roll.d6_one,
            "d6_two_total": ab_roll.d6_two_total,
            "chaos_d20": ab_roll.chaos_d20,
            "message": "Read your card and submit outcome"
        }
    )

@sio.event
async def submit_manual_outcome(sid, data):
    game_id = UUID(data.get("game_id"))
    outcome_str = data.get("outcome")
    hit_location = data.get("hit_location")

    # Validate
    submission = ManualOutcomeSubmission(
        outcome=outcome_str,
        hit_location=hit_location
    )
    outcome = PlayOutcome(submission.outcome)

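    # Get pending roll (roll_dice must have been called first)
    state = state_manager.get_state(game_id)
    ab_roll = state.pending_manual_roll
    if ab_roll is None:
        await manager.emit_to_user(sid, "outcome_rejected", {
            "message": "No pending dice roll - call roll_dice first",
            "field": "game_state"
        })
        return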

    # Confirm acceptance
    await manager.emit_to_user(sid, "outcome_accepted", {
        "outcome": outcome.value,
        "hit_location": submission.hit_location
    })

    # Clear pending roll
    state.pending_manual_roll = None
    state_manager.update_state(game_id, state)

    # Resolve play
    result = await game_engine.resolve_manual_play(
        game_id=game_id,
        ab_roll=ab_roll,
        outcome=outcome,
        hit_location=submission.hit_location
    )

    # Broadcast result
    await manager.broadcast_to_game(
        str(game_id),
        "play_resolved",
        {
            "description": result.description,
            "runs_scored": result.runs_scored,
            "outs_recorded": result.outs_recorded
        }
    )

Client-side flow:

// 1. Roll dice button clicked
document.getElementById('roll-btn').addEventListener('click', () => {
    socket.emit('roll_dice', { game_id: currentGameId });
    setButtonState('rolling');
});

// 2. Receive dice results
socket.on('dice_rolled', (data) => {
    console.log('Dice:', data.d6_one, data.d6_two_total, data.chaos_d20);

    // Show dice animation
    displayDiceResults(data);

    // Enable outcome selection
    showOutcomeSelector();
});

// 3. User selects outcome from UI
document.getElementById('submit-outcome-btn').addEventListener('click', () => {
    const outcome = document.getElementById('outcome-select').value;
    const hitLocation = document.getElementById('hit-location-select').value;

    socket.emit('submit_manual_outcome', {
        game_id: currentGameId,
        outcome: outcome,
        hit_location: hitLocation
    });

    setButtonState('submitting');
});

// 4. Receive confirmation
socket.on('outcome_accepted', (data) => {
    console.log('Outcome accepted:', data.outcome);
    showSuccessMessage('Outcome accepted - resolving play...');
});

// 5. Receive play result
socket.on('play_resolved', (data) => {
    console.log('Play result:', data.description);

    // Update game state UI
    updateScore(data.runs_scored);
    updateOuts(data.outs_recorded);
    addPlayToLog(data.description);

    // Reset for next play
    resetDiceRoller();
    setButtonState('ready');
});

// 6. Handle errors
socket.on('outcome_rejected', (data) => {
    console.error('Outcome rejected:', data.message, data.field);
    showErrorMessage(`Error: ${data.message}`);
    setButtonState('ready');
});

Example 2: Broadcasting Team-Specific Data

@sio.event
async def request_hand_cards(sid, data):
    """Send player's hand to them (but not opponents)"""
    game_id = UUID(data.get("game_id"))
    user_id = manager.user_sessions.get(sid)

    # Get user's team
    team_id = get_user_team(user_id, game_id)

    # Get hand for that team
    hand = get_team_hand(game_id, team_id)

    # Send ONLY to requesting user (private data)
    await manager.emit_to_user(sid, "hand_cards", {
        "cards": hand,
        "team_id": team_id
    })

    # Broadcast to game that player viewed hand (no details)
    await manager.broadcast_to_game(str(game_id), "player_action", {
        "user_id": user_id,
        "action": "viewed_hand"
    })

Example 3: Handling Spectators vs Players

@sio.event
async def join_game(sid, data):
    game_id = data.get("game_id")
    role = data.get("role", "player")  # "player" or "spectator"

    await manager.join_game(sid, game_id, role)

    # Store role in session data (extend ConnectionManager)
    manager.user_roles[sid] = role

    # Fetch state once, then send the view that matches the role
    state = state_manager.get_state(UUID(game_id))

    if role == "spectator":
        # Send spectator-specific state (no hidden info)
        await manager.emit_to_user(sid, "game_state", state.to_spectator_view())
    else:
        # Send full player state
        await manager.emit_to_user(sid, "game_state", state.model_dump())

# When broadcasting, respect roles
async def broadcast_play_result(game_id: str, result: PlayResult):
    sids = manager.get_game_participants(game_id)

    for sid in sids:
        role = manager.user_roles.get(sid, "player")

        if role == "spectator":
            # Send spectator-safe data (no hidden information)
            await manager.emit_to_user(sid, "play_resolved", result.to_spectator_view())
        else:
            # Send full data
            await manager.emit_to_user(sid, "play_resolved", result.model_dump())

Example 4: Reconnection Recovery

@sio.event
async def request_game_state(sid, data):
    """
    Client requests full game state after reconnection.

    Use this to recover from disconnections without reloading page.
    """
    game_id = UUID(data.get("game_id"))
    user_id = manager.user_sessions.get(sid)

    # Verify user is participant
    if not await is_game_participant(game_id, user_id):
        await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
        return

    # Get current state
    state = state_manager.get_state(game_id)
    if not state:
        # Try to recover from database
        state = await state_manager.recover_game(game_id)

    if not state:
        await manager.emit_to_user(sid, "error", {"message": "Game not found"})
        return

    # Get recent plays for context
    plays = await db_ops.get_plays(game_id, limit=10)

    # Send full state
    await manager.emit_to_user(sid, "game_state_sync", {
        "state": state.model_dump(),
        "recent_plays": [p.to_dict() for p in plays],
        "timestamp": pendulum.now('UTC').to_iso8601_string()
    })

    logger.info(f"Game state synced for user {user_id} in game {game_id}")

Client-side:

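// Note: Socket.IO client v3+ fires 'reconnect' on the Manager
// (socket.io.on('reconnect', ...)); v2 fires it on the socket, as written here.
socket.on('reconnect', () => {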
    console.log('Reconnected - syncing state');
    socket.emit('request_game_state', { game_id: currentGameId });
});

socket.on('game_state_sync', (data) => {
    console.log('State synced:', data.timestamp);

    // Rebuild UI from full state
    rebuildGameUI(data.state);

    // Show recent plays
    displayRecentPlays(data.recent_plays);

    // Resume normal operation
    enableGameControls();
});

Performance Considerations

Broadcasting Efficiency

  • Socket.io's room-based broadcasting is O(n) where n = room size
  • Keep room sizes reasonable (players + spectators, not entire league)
  • Use targeted emit_to_user() for private data
  • Serialize Pydantic models once, broadcast same dict to all users
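
The last point can be sketched as follows. This is a minimal illustration, not the project's code: `PlayResultDemo` is a dataclass standing in for a Pydantic model, and `emit` is a hypothetical stand-in for a per-user emit such as `manager.emit_to_user`. The payload is built once and the same dict is reused for every recipient.

```python
import asyncio
from dataclasses import asdict, dataclass
from typing import Awaitable, Callable, List


@dataclass
class PlayResultDemo:
    """Stand-in for a Pydantic model (fields mirror the play_resolved payload)."""
    description: str
    runs_scored: int
    outs_recorded: int


# Hypothetical emit signature: (sid, event, payload) -> awaitable
Emit = Callable[[str, str, dict], Awaitable[None]]


async def send_to_all(emit: Emit, sids: List[str], event: str,
                      result: PlayResultDemo) -> int:
    """Serialize the result once, then send the same dict to every sid."""
    payload = asdict(result)  # serialized once, outside the loop
    for sid in sids:
        await emit(sid, event, payload)
    return len(sids)
```

With a real Pydantic model, `asdict(result)` would be replaced by `result.model_dump()`; the point is only that serialization happens outside the loop.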

Connection Scalability

  • Each connection consumes one socket + memory for session tracking
  • Target: Support 100+ concurrent games (1000+ connections)
  • Consider horizontal scaling with Redis pub/sub for multi-server deployments:

# Future: Redis-backed Socket.io manager
import socketio

sio = socketio.AsyncServer(
    client_manager=socketio.AsyncRedisManager('redis://localhost:6379')
)

Event Loop Blocking

  • Never use blocking I/O in event handlers (always async/await)
  • Database writes are async (non-blocking)
  • Heavy computation should be offloaded to a thread pool executor so it does not stall the event loop
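
One way to offload heavy work, assuming Python 3.9+ where `asyncio.to_thread` is available. The function `simulate_season` is a hypothetical placeholder for any blocking computation; it is not part of this codebase.

```python
import asyncio


def simulate_season(n_games: int) -> int:
    """Hypothetical blocking computation: sums squares to mimic heavy work."""
    return sum(i * i for i in range(n_games))


async def handle_heavy_request(n_games: int) -> int:
    """Run the blocking function in a worker thread and await its result.

    The event loop stays free to service other handlers while the
    worker thread runs.
    """
    return await asyncio.to_thread(simulate_season, n_games)
```

For pure-Python CPU-bound work a process pool may be a better fit, since threads share the interpreter lock; a thread is usually enough for blocking library calls.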

Memory Management

  • ConnectionManager dicts are bounded by active connections
  • StateManager handles game state eviction (idle timeout)
  • No leaks as long as disconnect() removes every entry for the sid; sessions that miss a disconnect are caught by the stale-session cleanup
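
A minimal sketch of the bookkeeping that keeps these dicts bounded. `SessionRegistry` and `game_rooms` are hypothetical names used for illustration; only `user_sessions` appears in the examples above, and the real ConnectionManager may be structured differently.

```python
from typing import Dict, Set


class SessionRegistry:
    """Hypothetical stand-in for ConnectionManager's session tracking."""

    def __init__(self) -> None:
        self.user_sessions: Dict[str, str] = {}    # sid -> user_id
        self.game_rooms: Dict[str, Set[str]] = {}  # game_id -> set of sids

    def connect(self, sid: str, user_id: str) -> None:
        self.user_sessions[sid] = user_id

    def join(self, sid: str, game_id: str) -> None:
        self.game_rooms.setdefault(game_id, set()).add(sid)

    def disconnect(self, sid: str) -> None:
        """Remove every trace of the sid so memory tracks live connections."""
        self.user_sessions.pop(sid, None)
        for game_id in list(self.game_rooms):
            room = self.game_rooms[game_id]
            room.discard(sid)
            if not room:
                # Drop empty rooms so finished games do not accumulate
                del self.game_rooms[game_id]
```

Calling `disconnect()` for an unknown sid is a no-op here, which makes it safe to call from both the disconnect handler and a cleanup task.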

Security Considerations

Authentication

  • All connections require valid JWT token
  • Token verified in connect() handler before accepting
  • TODO: Token expiration handling (refresh mechanism)
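
For the expiration TODO, a small sketch of the check itself, assuming the token has already been verified and decoded into a claims dict with the standard JWT `exp` claim (seconds since the epoch). The function name and the policy of rejecting tokens without `exp` are assumptions, not existing behavior.

```python
import time
from typing import Optional


def is_token_expired(claims: dict, now: Optional[float] = None,
                     leeway: float = 0.0) -> bool:
    """Return True if the decoded claims are past their expiry time.

    `leeway` tolerates small clock differences between servers.
    """
    exp = claims.get("exp")
    if exp is None:
        # Policy assumption: a token without an expiry is treated as expired
        return True
    current = time.time() if now is None else now
    return current > exp + leeway
```

A long-lived socket could run this check on each incoming event and ask the client to refresh its token when it returns True.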

Authorization

  • TODO: Verify user is participant before allowing actions
  • TODO: Verify user is active player for turn-based actions
  • TODO: Prevent spectators from performing player actions
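
The three TODOs above could be combined into one check along these lines. This is a sketch under assumptions: the set of player-only events, the role strings, and the idea of a single "active user" per turn are illustrative and not taken from the existing handlers.

```python
# Events that change game state (assumed list for illustration)
PLAYER_ONLY_EVENTS = frozenset({"roll_dice", "submit_manual_outcome"})


def is_action_allowed(role: str, event: str, user_id: str,
                      active_user_id: str) -> bool:
    """Decide whether a user may trigger an event.

    - Events outside PLAYER_ONLY_EVENTS are open to any participant.
    - Player-only events require the "player" role.
    - Player-only events also require that it is the user's turn.
    """
    if event not in PLAYER_ONLY_EVENTS:
        return True
    if role != "player":
        return False
    return user_id == active_user_id
```

A handler would call this before touching game state and emit an `error` event when it returns False.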

Input Validation

  • Pydantic models validate all inputs
  • UUID validation for game_id
  • Enum validation for outcomes
  • Required field checks
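
The checks listed above can be illustrated with the standard library alone. `DemoOutcome` is a hypothetical two-member stand-in for `PlayOutcome`, and `parse_submission` is an illustrative helper; in the handlers, Pydantic performs the equivalent enum and required-field checks.

```python
from enum import Enum
from typing import Tuple
from uuid import UUID


class DemoOutcome(str, Enum):
    """Hypothetical stand-in for PlayOutcome (member values are made up)."""
    SINGLE = "single"
    STRIKEOUT = "strikeout"


def parse_submission(data: dict) -> Tuple[UUID, DemoOutcome]:
    """Validate required fields, the game_id UUID, and the outcome enum."""
    missing = [key for key in ("game_id", "outcome") if key not in data]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    try:
        game_id = UUID(str(data["game_id"]))
    except ValueError as exc:
        raise ValueError("game_id is not a valid UUID") from exc
    try:
        outcome = DemoOutcome(data["outcome"])
    except ValueError as exc:
        raise ValueError(f"unknown outcome: {data['outcome']!r}") from exc
    return game_id, outcome
```

Raising one exception type with a readable message keeps the handler simple: it can catch `ValueError` and forward the message in an `outcome_rejected` event.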

Anti-Cheating

  • Dice rolls are cryptographically secure (server-side)
  • Pending roll is one-time use (cleared after submission)
  • TODO: Rate limiting on dice rolls (prevent spam)
  • TODO: Verify outcome matches roll (if cards are digitized)
  • TODO: Track submission history for audit trail
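
For the rate-limiting TODO, one possible approach is a sliding-window limiter keyed by session ID. The class below is a sketch and not part of the codebase; the limits and the choice of key are assumptions.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowLimiter:
    """Allow at most `max_events` per key within `window_seconds`."""

    def __init__(self, max_events: int, window_seconds: float) -> None:
        self.max_events = max_events
        self.window_seconds = window_seconds
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        current = time.monotonic() if now is None else now
        events = self._events[key]
        # Drop timestamps that have aged out of the window
        while events and current - events[0] >= self.window_seconds:
            events.popleft()
        if len(events) >= self.max_events:
            return False
        events.append(current)
        return True
```

A `roll_dice` handler could call `limiter.allow(sid)` first and emit an `error` event when it returns False. Entries for a sid should be deleted on disconnect so the limiter's dict stays bounded.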

Data Privacy

  • Emit private data only to authorized users
  • Use emit_to_user() for sensitive information
  • Broadcasts should only contain public game state
  • TODO: Implement spectator-safe data filtering
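
Spectator-safe filtering could start from a deny-list applied to the serialized state. The key names in `PRIVATE_KEYS` are hypothetical; which fields count as private depends on the actual game state model.

```python
from typing import FrozenSet

# Hypothetical private field names, for illustration only
PRIVATE_KEYS: FrozenSet[str] = frozenset({"hand_cards", "private_notes"})


def to_public_view(state: dict,
                   private_keys: FrozenSet[str] = PRIVATE_KEYS) -> dict:
    """Return a copy of `state` without private keys, recursing into dicts.

    Lists are copied through unchanged, so dicts nested inside lists
    are not filtered by this sketch.
    """
    public = {}
    for key, value in state.items():
        if key in private_keys:
            continue
        if isinstance(value, dict):
            public[key] = to_public_view(value, private_keys)
        else:
            public[key] = value
    return public
```

An allow-list (copy only known-public fields) is the stricter alternative: a newly added private field is then hidden by default rather than exposed by default.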

Future Enhancements

Planned Features

  1. Authorization System

    • User-game participant mapping
    • Role-based permissions (player, spectator, admin)
    • Turn-based action validation
  2. Reconnection Improvements

    • Automatic state synchronization on reconnect
    • Missed event replay
    • Persistent pending actions
  3. Spectator Mode

    • Separate spectator rooms
    • Filtered game state (no hidden information)
    • Spectator chat
  4. Rate Limiting

    • Prevent event spam
    • Configurable limits per event type
    • IP-based blocking for abuse
  5. Analytics Events

    • Track user actions for analytics
    • Performance monitoring
    • Error rate tracking
  6. Advanced Broadcasting

    • Team-specific channels
    • Private player-to-player messaging
    • Game admin announcements
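
As an illustration of the "missed event replay" item, a per-game buffer could tag each broadcast with a sequence number and keep only the most recent events. Everything here (class name, buffer size, tuple layout) is an assumption about a feature that does not exist yet.

```python
from collections import deque
from typing import Deque, List, Tuple

Event = Tuple[int, str, dict]  # (sequence number, event name, payload)


class EventReplayBuffer:
    """Keep the last `max_events` broadcasts so clients can catch up."""

    def __init__(self, max_events: int = 100) -> None:
        self._events: Deque[Event] = deque(maxlen=max_events)
        self._next_seq = 1

    def record(self, event: str, payload: dict) -> int:
        """Store an event and return the sequence number assigned to it."""
        seq = self._next_seq
        self._next_seq += 1
        self._events.append((seq, event, payload))
        return seq

    def since(self, last_seen_seq: int) -> List[Event]:
        """Return buffered events newer than the client's last seen number."""
        return [e for e in self._events if e[0] > last_seen_seq]
```

On reconnect, a client would send the last sequence number it saw. If that number is older than the oldest buffered event, the server would fall back to a full state sync.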

Related Documentation

  • Game Engine: ../core/CLAUDE.md - Play resolution logic
  • State Manager: ../core/CLAUDE.md - In-memory state management
  • Database: ../database/CLAUDE.md - Persistence layer
  • Models: ../models/CLAUDE.md - Pydantic game state models
  • WebSocket Protocol: ../../../.claude/implementation/websocket-protocol.md - Event specifications

Quick Reference

Event Summary

| Event | Direction | Purpose | Authentication |
|-------|-----------|---------|----------------|
| connect | Client → Server | Establish connection | JWT required |
| disconnect | Client → Server | End connection | Automatic |
| join_game | Client → Server | Join game room | Token |
| leave_game | Client → Server | Leave game room | Token |
| heartbeat | Client → Server | Keep-alive ping | Token |
| roll_dice | Client → Server | Roll dice for play | Token |
| submit_manual_outcome | Client → Server | Submit card outcome | Token |
| connected | Server → Client | Connection confirmed | - |
| dice_rolled | Server → Room | Dice results | - |
| outcome_accepted | Server → Client | Outcome confirmed | - |
| play_resolved | Server → Room | Play result | - |
| outcome_rejected | Server → Client | Validation error | - |
| error | Server → Client | Generic error | - |

Common Imports

# WebSocket
from socketio import AsyncServer
from app.websocket.connection_manager import ConnectionManager

# Game Logic
from app.core.state_manager import state_manager
from app.core.game_engine import game_engine
from app.core.dice import dice_system

# Models
from app.models.game_models import ManualOutcomeSubmission
from app.config.result_charts import PlayOutcome

# Validation
from uuid import UUID
from pydantic import ValidationError

# Logging
import logging
logger = logging.getLogger(f'{__name__}.handlers')

Last Updated: 2025-10-31
Module Version: Week 5
Implementation Status: Production-ready for manual outcome gameplay