strat-gameplay-webapp/backend/app/websocket/handlers.py
Cal Corum 920d1c599c CLAUDE: Add Undo Last Play feature for game rollback
- Added rollback_play WebSocket handler (handlers.py:1632)
  - Accepts game_id and num_plays (default: 1)
  - Validates game state and play count
  - Broadcasts play_rolled_back and game_state_update events
  - Full error handling with rate limiting

- Added undoLastPlay action to useGameActions composable
  - Emits rollback_play event to backend

- Added Undo button to game page ([id].vue)
  - Amber floating action button with undo arrow icon
  - Positioned above substitutions button
  - Only visible when game is active and has plays

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-27 21:34:48 -06:00

1750 lines
69 KiB
Python

import asyncio
import logging
from uuid import UUID
from pydantic import ValidationError
from socketio import AsyncServer
from sqlalchemy.exc import SQLAlchemyError
from app.config.result_charts import PlayOutcome
from app.core.dice import dice_system
from app.core.exceptions import (
DatabaseError,
GameNotFoundError,
InvalidGameStateError,
)
from app.core.game_engine import game_engine
from app.core.state_manager import state_manager
from app.core.substitution_manager import SubstitutionManager
from app.core.validators import ValidationError as GameValidationError
from app.database.operations import DatabaseOperations
from app.middleware.rate_limit import rate_limiter
from app.models.game_models import ManualOutcomeSubmission
from app.services.lineup_service import lineup_service
from app.utils.auth import verify_token
from app.websocket.connection_manager import ConnectionManager
# Module-level logger shared by every handler registered below; its name is
# this module's dotted path with a ".handlers" suffix appended.
logger = logging.getLogger(f"{__name__}.handlers")
def register_handlers(sio: AsyncServer, manager: ConnectionManager) -> None:
"""Register all WebSocket event handlers"""
@sio.event
async def connect(sid, environ, auth):
    """
    Authenticate an incoming Socket.IO connection and register its session.

    The access token is looked up in the ``pd_access_token`` cookie first.
    When no cookie token is found, the ``token`` entry of the client-supplied
    auth object is used instead (direct JS clients).

    Args:
        sid: Socket.IO session id of the connecting client.
        environ: Request environ; ``HTTP_COOKIE`` and ``REMOTE_ADDR`` are read.
        auth: Optional auth object sent by the client.

    Returns:
        True when the connection is accepted; False when it is rejected
        (no token, verified token without a user_id, or any error).
    """
    try:
        access_token = None

        # Preferred source: cookie carried in the HTTP request headers.
        raw_cookies = environ.get("HTTP_COOKIE", "")
        if "pd_access_token=" in raw_cookies:
            from http.cookies import SimpleCookie

            jar = SimpleCookie()
            jar.load(raw_cookies)
            if "pd_access_token" in jar:
                access_token = jar["pd_access_token"].value
                logger.debug(f"Connection {sid} using cookie auth")

        # Secondary source: auth object, consulted only without a cookie token.
        if auth and not access_token:
            access_token = auth.get("token")
            if access_token:
                logger.debug(f"Connection {sid} using auth object")

        if not access_token:
            logger.warning(f"Connection {sid} rejected: no token")
            return False

        user_id = verify_token(access_token).get("user_id")
        if not user_id:
            logger.warning(f"Connection {sid} rejected: invalid token")
            return False

        # The remote address is handed to the connection manager for logging.
        await manager.connect(
            sid, user_id, ip_address=environ.get("REMOTE_ADDR")
        )
        await sio.emit("connected", {"user_id": user_id}, room=sid)
        logger.info(f"Connection {sid} accepted for user {user_id}")
        return True

    except (ValueError, KeyError) as exc:
        # Token could not be parsed or lacked required data.
        logger.warning(f"Connection {sid} auth error: {exc}")
        return False
    except (ConnectionError, OSError) as exc:
        # Network/socket failure while establishing the connection.
        logger.error(f"Connection {sid} network error: {exc}")
        return False
    except Exception as exc:
        # Anything else: log with traceback and refuse the connection.
        logger.error(f"Connection {sid} unexpected error: {exc}", exc_info=True)
        return False
@sio.event
async def disconnect(sid):
    """
    Release per-connection resources when a client disconnects.

    Args:
        sid: Socket.IO session id of the client that went away.
    """
    # Drop this connection's rate-limiter buckets before the connection
    # manager forgets the session.
    rate_limiter.remove_connection(sid)
    await manager.disconnect(sid)
@sio.event
async def join_game(sid, data):
    """
    Handle a request to join a game room.

    Event data:
        game_id: identifier of the game to join (required)
        role: role to join with (defaults to "player")

    Emits:
        game_joined: To requester on success
        error: To requester when rate limited, when the payload is not an
            object, when game_id is missing, or on failure
    """
    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A client may send null, a string or a list instead of an object.
    # Calling .get() on those raises AttributeError, which the handlers
    # below do not catch, so reject such payloads explicitly.
    if not isinstance(data, dict):
        logger.warning(
            f"Join game validation error for {sid}: payload is not an object"
        )
        await manager.emit_to_user(sid, "error", {"message": "Invalid game data"})
        return

    try:
        game_id = data.get("game_id")
        role = data.get("role", "player")

        if not game_id:
            await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
            return

        # TODO: Verify user has access to game
        await manager.join_game(sid, game_id, role)
        await manager.emit_to_user(
            sid, "game_joined", {"game_id": game_id, "role": role}
        )

    except (ValueError, TypeError) as e:
        # Invalid data format
        logger.warning(f"Join game validation error for {sid}: {e}")
        await manager.emit_to_user(sid, "error", {"message": "Invalid game data"})
    except (ConnectionError, OSError) as e:
        # Network error during room join
        logger.error(f"Join game network error for {sid}: {e}")
        await manager.emit_to_user(sid, "error", {"message": "Connection error"})
@sio.event
async def leave_game(sid, data):
    """
    Handle a request to leave a game room.

    Leaving is treated as cleanup: problems are logged and never reported
    back to the client.

    Event data:
        game_id: identifier of the game to leave; nothing happens if absent
    """
    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which the handlers below do not catch. Keep the
    # "log but don't error" contract for that case too.
    if not isinstance(data, dict):
        logger.warning(
            f"Leave game data error for {sid}: payload is not an object"
        )
        return

    try:
        game_id = data.get("game_id")
        if game_id:
            await manager.leave_game(sid, game_id)

    except (ValueError, TypeError) as e:
        # Invalid data - log but don't error to client (leave is cleanup)
        logger.warning(f"Leave game data error for {sid}: {e}")
    except (ConnectionError, OSError) as e:
        # Network error - log but don't propagate (connection may already be gone)
        logger.debug(f"Leave game network error for {sid}: {e}")
@sio.event
async def heartbeat(sid):
    """
    Answer an application-level keepalive ping from a client.

    Refreshes the session's activity through the connection manager, then
    acknowledges with an empty ``heartbeat_ack`` event sent to the same
    session. This is separate from Socket.IO's own ping/pong mechanism.

    Args:
        sid: Socket.IO session id of the pinging client.
    """
    await manager.update_activity(sid)

    ack_payload = {}
    await sio.emit("heartbeat_ack", ack_payload, room=sid)
@sio.event
async def request_game_state(sid, data):
    """
    Send the full game state to the requesting client.

    Used for recovery after a disconnect or for the initial load. The state
    is read from memory first; if it is not there, recovery from the
    database is attempted through the state manager.

    Event data:
        game_id: UUID of the game (string)

    Emits:
        game_state: To requester with the serialized state
        error: To requester when rate limited, when the payload or game_id
            is invalid, when the game is not found, or on database failure
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which none of the handlers below catch.
    if not isinstance(data, dict):
        logger.warning(
            "Invalid data in request_game_state: payload is not an object"
        )
        await manager.emit_to_user(sid, "error", {"message": "Invalid request data"})
        return

    try:
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid, "error", {"message": "Invalid game_id format"}
            )
            return

        # Try to get from memory first
        state = state_manager.get_state(game_id)

        # If not in memory, recover from database
        if not state:
            logger.info(f"Game {game_id} not in memory, recovering from database")
            state = await state_manager.recover_game(game_id)

        if state:
            # Use mode='json' to serialize UUIDs as strings
            await manager.emit_to_user(
                sid, "game_state", state.model_dump(mode="json")
            )
            logger.info(f"Sent game state for {game_id} to {sid}")
        else:
            await manager.emit_to_user(
                sid, "error", {"message": f"Game {game_id} not found"}
            )
            logger.warning(f"Game {game_id} not found in memory or database")

    except GameNotFoundError as e:
        logger.warning(f"Game not found: {e.game_id}")
        await manager.emit_to_user(sid, "error", {"message": str(e)})
    except SQLAlchemyError as e:
        logger.error(f"Database error fetching game state: {e}")
        await manager.emit_to_user(sid, "error", {"message": "Database error - please retry"})
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in request_game_state: {e}")
        await manager.emit_to_user(sid, "error", {"message": "Invalid request data"})
@sio.event
async def roll_dice(sid, data):
    """
    Roll the dice for a manually-resolved play.

    The server performs the roll, remembers it on the game state as the
    pending manual roll, and announces the result to everyone in the game
    room so players can read their cards and submit an outcome.

    Event data:
        game_id: UUID of the game

    Emits:
        dice_rolled: Broadcast to game room with dice results
        error: To requester if validation fails
    """
    # Any handled event counts as activity for this session.
    await manager.update_activity(sid)

    # Per-connection throttle.
    connection_ok = await rate_limiter.check_websocket_limit(sid)
    if not connection_ok:
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # Pull out the game id and make sure it parses as a UUID.
        raw_game_id = data.get("game_id")
        if not raw_game_id:
            await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
            return

        try:
            game_id = UUID(raw_game_id)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid, "error", {"message": "Invalid game_id format"}
            )
            return

        room = str(game_id)

        # Per-game throttle for roll requests.
        roll_allowed = await rate_limiter.check_game_limit(room, "roll")
        if not roll_allowed:
            await manager.emit_to_user(
                sid, "error", {"message": "Too many roll requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # State is only touched while holding the game's lock.
        async with state_manager.game_lock(game_id):
            game_state = state_manager.get_state(game_id)
            if not game_state:
                await manager.emit_to_user(
                    sid, "error", {"message": f"Game {game_id} not found"}
                )
                return

            # TODO: Verify user is participant in this game
            # user_id = manager.user_sessions.get(sid)
            # if not is_game_participant(game_id, user_id):
            #     await manager.emit_to_user(sid, "error", {"message": "Not authorized"})
            #     return

            roll = dice_system.roll_ab(league_id=game_state.league_id, game_id=game_id)

            logger.info(
                f"Dice rolled for game {game_id}: "
                f"d6={roll.d6_one}, 2d6={roll.d6_two_total}, "
                f"chaos={roll.chaos_d20}, resolution={roll.resolution_d20}"
            )

            # Keep the roll on the state so the later outcome submission
            # can be checked against it.
            game_state.pending_manual_roll = roll
            state_manager.update_state(game_id, game_state)

        # The lock has been released; tell the whole room about the roll.
        dice_payload = {
            "game_id": room,
            "roll_id": roll.roll_id,
            "d6_one": roll.d6_one,
            "d6_two_a": roll.d6_two_a,
            "d6_two_b": roll.d6_two_b,
            "d6_two_total": roll.d6_two_total,
            "chaos_d20": roll.chaos_d20,
            "resolution_d20": roll.resolution_d20,
            "check_wild_pitch": roll.check_wild_pitch,
            "check_passed_ball": roll.check_passed_ball,
            "timestamp": roll.timestamp.to_iso8601_string(),
            "message": "Dice rolled - read your card and submit outcome",
        }
        await manager.broadcast_to_game(room, "dice_rolled", dice_payload)

    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while rolling dice for game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except GameValidationError as exc:
        logger.warning(f"Validation error in roll_dice: {exc}")
        await manager.emit_to_user(sid, "error", {"message": str(exc)})
    except SQLAlchemyError as exc:
        logger.error(f"Database error during roll_dice: {exc}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as exc:
        logger.warning(f"Invalid data in roll_dice: {exc}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid request data"}
        )
    except Exception as exc:
        # Last-resort handler: always log with traceback and answer the client.
        logger.error(f"Unexpected error in roll_dice: {exc}", exc_info=True)
        await manager.emit_to_user(
            sid, "error", {"message": "An unexpected error occurred"}
        )
@sio.event
async def submit_manual_outcome(sid, data):
    """
    Submit manually-selected play outcome.

    After dice are rolled, players read their physical cards and
    submit the outcome they see. System validates and processes.

    Event data:
        game_id: UUID of the game
        outcome: PlayOutcome enum value (e.g., "groundball_c")
        hit_location: Optional position string (e.g., "SS")

    Emits:
        outcome_accepted: To requester if valid
        play_resolved: Broadcast to game room with play result
        game_state_update: Broadcast to game room with the updated state
        outcome_rejected: To requester if validation fails
        error: To requester if processing fails
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which none of the handlers below catch.
    if not isinstance(data, dict):
        logger.warning(
            "Invalid data in submit_manual_outcome: payload is not an object"
        )
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid outcome data"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(
                sid,
                "outcome_rejected",
                {"message": "Missing game_id", "field": "game_id"},
            )
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid,
                "outcome_rejected",
                {"message": "Invalid game_id format", "field": "game_id"},
            )
            return

        # Get game state
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(
                sid, "error", {"message": f"Game {game_id} not found"}
            )
            return

        # Rate limit check - game level for decisions
        if not await rate_limiter.check_game_limit(str(game_id), "decision"):
            await manager.emit_to_user(
                sid, "error", {"message": "Too many outcome submissions. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # TODO: Verify user is active batter or authorized to submit
        # user_id = manager.user_sessions.get(sid)

        # Extract outcome data
        outcome_str = data.get("outcome")
        hit_location = data.get("hit_location")

        if not outcome_str:
            await manager.emit_to_user(
                sid,
                "outcome_rejected",
                {"message": "Missing outcome", "field": "outcome"},
            )
            return

        # Validate using ManualOutcomeSubmission model
        try:
            submission = ManualOutcomeSubmission(
                outcome=outcome_str, hit_location=hit_location
            )
        except ValidationError as e:
            # Drop "ctx"/"url" from the error details: "ctx" can carry the
            # raw exception object raised by a validator, which cannot be
            # JSON-serialized for the socket emit.
            errors = e.errors(include_url=False, include_context=False)

            # Extract first error for user-friendly message
            first_error = errors[0]
            field = first_error["loc"][0] if first_error["loc"] else "unknown"
            message = first_error["msg"]

            await manager.emit_to_user(
                sid,
                "outcome_rejected",
                {"message": message, "field": field, "errors": errors},
            )
            logger.warning(
                f"Manual outcome validation failed for game {game_id}: {message}"
            )
            return

        # Convert to PlayOutcome enum
        outcome = PlayOutcome(submission.outcome)

        # NOTE: Business rule validation (e.g., when hit_location is required based on
        # game state) is handled in PlayResolver, not here. This layer only validates
        # basic input format and type checking.

        # Acquire lock for state modifications
        async with state_manager.game_lock(game_id):
            # Re-fetch state inside lock to ensure consistency
            state = state_manager.get_state(game_id)
            if not state:
                await manager.emit_to_user(
                    sid, "error", {"message": f"Game {game_id} not found"}
                )
                return

            # Check for pending roll BEFORE accepting outcome
            if not state.pending_manual_roll:
                await manager.emit_to_user(
                    sid,
                    "outcome_rejected",
                    {
                        "message": "No pending dice roll - call roll_dice first",
                        "field": "game_state",
                    },
                )
                return

            ab_roll = state.pending_manual_roll

            logger.info(
                f"Manual outcome submitted for game {game_id}: "
                f"{outcome.value}"
                + (f" to {submission.hit_location}" if submission.hit_location else "")
            )
            logger.info(
                f"Processing manual outcome with roll {ab_roll.roll_id}: "
                f"d6={ab_roll.d6_one}, 2d6={ab_roll.d6_two_total}, "
                f"chaos={ab_roll.chaos_d20}"
            )

            # Process manual outcome through game engine
            try:
                result = await game_engine.resolve_manual_play(
                    game_id=game_id,
                    ab_roll=ab_roll,
                    outcome=outcome,
                    hit_location=submission.hit_location,
                )

                # Clear pending roll only AFTER successful validation (one-time use)
                # NOTE(review): this writes back the `state` object fetched
                # before the engine ran - confirm the engine mutates that
                # same object rather than storing a replacement.
                state.pending_manual_roll = None
                state_manager.update_state(game_id, state)

            except GameValidationError as e:
                # Game engine validation error (e.g., missing hit location)
                await manager.emit_to_user(
                    sid, "outcome_rejected", {"message": str(e), "field": "validation"}
                )
                logger.warning(f"Manual play validation failed: {e}")
                return
            except ValueError as e:
                # Business logic validation error from PlayResolver
                await manager.emit_to_user(
                    sid, "outcome_rejected", {"message": str(e), "field": "validation"}
                )
                logger.warning(f"Manual play business logic validation failed: {e}")
                return
            except DatabaseError as e:
                # Database error during play resolution
                logger.error(f"Database error resolving manual play: {e}")
                await manager.emit_to_user(
                    sid, "error", {"message": "Database error during play resolution - please retry"}
                )
                return
            except SQLAlchemyError as e:
                # SQLAlchemy error during play resolution
                logger.error(f"SQLAlchemy error resolving manual play: {e}")
                await manager.emit_to_user(
                    sid, "error", {"message": "Database error - please retry"}
                )
                return

        # Broadcasting happens outside lock to avoid holding it too long

        # Confirm acceptance to submitter AFTER successful validation
        await manager.emit_to_user(
            sid,
            "outcome_accepted",
            {
                "game_id": str(game_id),
                "outcome": outcome.value,
                "hit_location": submission.hit_location,
            },
        )

        # Build play result data
        play_result_data = {
            "game_id": str(game_id),
            "play_number": state.play_count,
            "outcome": result.outcome.value,  # Use resolved outcome, not submitted
            "hit_location": submission.hit_location,
            "description": result.description,
            "outs_recorded": result.outs_recorded,
            "runs_scored": result.runs_scored,
            "batter_result": result.batter_result,
            "runners_advanced": [
                {"from": adv[0], "to": adv[1]} for adv in result.runners_advanced
            ],
            "is_hit": result.is_hit,
            "is_out": result.is_out,
            "is_walk": result.is_walk,
            "roll_id": ab_roll.roll_id,
        }

        # Include X-Check details if present (Phase 3E-Final)
        if result.x_check_details:
            xcheck = result.x_check_details
            play_result_data["x_check_details"] = {
                "position": xcheck.position,
                "d20_roll": xcheck.d20_roll,
                "d6_roll": xcheck.d6_roll,
                "defender_range": xcheck.defender_range,
                "defender_error_rating": xcheck.defender_error_rating,
                "defender_id": xcheck.defender_id,
                "base_result": xcheck.base_result,
                "converted_result": xcheck.converted_result,
                "error_result": xcheck.error_result,
                "final_outcome": xcheck.final_outcome.value,
                "hit_type": xcheck.hit_type,
                # Optional SPD test details
                "spd_test_roll": xcheck.spd_test_roll,
                "spd_test_target": xcheck.spd_test_target,
                "spd_test_passed": xcheck.spd_test_passed,
            }

        # Broadcast play result to game room
        await manager.broadcast_to_game(
            str(game_id), "play_resolved", play_result_data
        )
        logger.info(
            f"Manual play resolved for game {game_id}: {result.description}"
        )

        # Broadcast updated game state so frontend sees new batter, outs, etc.
        updated_state = state_manager.get_state(game_id)
        if updated_state:
            await manager.broadcast_to_game(
                str(game_id),
                "game_state_update",
                updated_state.model_dump(mode="json"),
            )
            logger.debug("Broadcast updated game state after play resolution")

    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while submitting manual outcome for game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except DatabaseError as e:
        logger.error(f"Database error in submit_manual_outcome: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error in submit_manual_outcome: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in submit_manual_outcome: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid outcome data"}
        )
# ===== SUBSTITUTION EVENTS =====
@sio.event
async def request_pinch_hitter(sid, data):
    """
    Request pinch hitter substitution.

    Replaces current batter with a player from the bench. The substitute
    takes the batting order position of the replaced player.

    Event data:
        game_id: UUID of the game
        player_out_lineup_id: int - lineup ID of player being removed
        player_in_card_id: int - card/player ID of substitute
        team_id: int - team making substitution

    Emits:
        player_substituted: Broadcast to game room on success
        substitution_confirmed: To requester with new lineup_id
        substitution_error: To requester if validation fails
        error: To requester if processing fails
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which none of the handlers below catch.
    if not isinstance(data, dict):
        logger.warning(
            "Invalid data in pinch hitter request: payload is not an object"
        )
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Missing game_id", "code": "MISSING_FIELD"},
            )
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Invalid game_id format", "code": "INVALID_FORMAT"},
            )
            return

        # Rate limit check - game level for substitutions
        if not await rate_limiter.check_game_limit(str(game_id), "substitution"):
            await manager.emit_to_user(
                sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # Get game state (only its existence is checked here)
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(
                sid, "error", {"message": f"Game {game_id} not found"}
            )
            return

        # Extract substitution data
        player_out_lineup_id = data.get("player_out_lineup_id")
        player_in_card_id = data.get("player_in_card_id")
        team_id = data.get("team_id")

        # Report the first missing required field, in declaration order.
        required_fields = (
            ("player_out_lineup_id", player_out_lineup_id is None),
            ("player_in_card_id", player_in_card_id is None),
            ("team_id", team_id is None),
        )
        for field_name, is_missing in required_fields:
            if is_missing:
                await manager.emit_to_user(
                    sid,
                    "substitution_error",
                    {"message": f"Missing {field_name}", "code": "MISSING_FIELD"},
                )
                return

        # TODO: Verify user is authorized to make substitution for this team
        # user_id = manager.user_sessions.get(sid)

        logger.info(
            f"Pinch hitter request for game {game_id}: "
            f"Replacing {player_out_lineup_id} with card {player_in_card_id}"
        )

        # Acquire lock before substitution to prevent concurrent lineup modifications
        async with state_manager.game_lock(game_id):
            # Create SubstitutionManager instance
            db_ops = DatabaseOperations()
            sub_manager = SubstitutionManager(db_ops)

            # Execute pinch hitter substitution
            result = await sub_manager.pinch_hit(
                game_id=game_id,
                player_out_lineup_id=player_out_lineup_id,
                player_in_card_id=player_in_card_id,
                team_id=team_id,
            )

        # Broadcasting happens outside lock
        if result.success:
            # Broadcast to all clients in game
            await manager.broadcast_to_game(
                str(game_id),
                "player_substituted",
                {
                    "type": "pinch_hitter",
                    "player_out_lineup_id": result.player_out_lineup_id,
                    "player_in_card_id": result.player_in_card_id,
                    "new_lineup_id": result.new_lineup_id,
                    "position": result.new_position,
                    "batting_order": result.new_batting_order,
                    "team_id": team_id,
                    "message": f"Pinch hitter: #{result.new_batting_order} now batting",
                },
            )

            # Send confirmation to requester
            await manager.emit_to_user(
                sid,
                "substitution_confirmed",
                {
                    "type": "pinch_hitter",
                    "new_lineup_id": result.new_lineup_id,
                    "success": True,
                },
            )
            logger.info(
                f"Pinch hitter successful for game {game_id}: "
                f"New lineup ID {result.new_lineup_id}"
            )
        else:
            # Send error to requester with error code
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {
                    "message": result.error_message,
                    "code": result.error_code,
                    "type": "pinch_hitter",
                },
            )
            logger.warning(
                f"Pinch hitter failed for game {game_id}: {result.error_message}"
            )

    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while processing pinch hitter for game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except SQLAlchemyError as e:
        logger.error(f"Database error in pinch hitter: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in pinch hitter request: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
@sio.event
async def request_defensive_replacement(sid, data):
    """
    Request defensive replacement substitution.

    Replaces a defensive player with a better fielder. Player can be
    swapped at any position. If player is in batting order, substitute
    takes their batting order spot.

    Event data:
        game_id: UUID of the game
        player_out_lineup_id: int - lineup ID of player being removed
        player_in_card_id: int - card/player ID of substitute
        new_position: str - defensive position for substitute (e.g., "SS")
        team_id: int - team making substitution

    Emits:
        player_substituted: Broadcast to game room on success
        substitution_confirmed: To requester with new lineup_id
        substitution_error: To requester if validation fails
        error: To requester if processing fails
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which none of the handlers below catch.
    if not isinstance(data, dict):
        logger.warning(
            "Invalid data in defensive replacement request: payload is not an object"
        )
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Missing game_id", "code": "MISSING_FIELD"},
            )
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Invalid game_id format", "code": "INVALID_FORMAT"},
            )
            return

        # Rate limit check - game level for substitutions
        if not await rate_limiter.check_game_limit(str(game_id), "substitution"):
            await manager.emit_to_user(
                sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # Get game state (only its existence is checked here)
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(
                sid, "error", {"message": f"Game {game_id} not found"}
            )
            return

        # Extract substitution data
        player_out_lineup_id = data.get("player_out_lineup_id")
        player_in_card_id = data.get("player_in_card_id")
        new_position = data.get("new_position")
        team_id = data.get("team_id")

        # Report the first missing required field, in declaration order.
        # new_position is rejected when falsy (e.g. empty string); the
        # numeric ids are rejected only when absent.
        required_fields = (
            ("player_out_lineup_id", player_out_lineup_id is None),
            ("player_in_card_id", player_in_card_id is None),
            ("new_position", not new_position),
            ("team_id", team_id is None),
        )
        for field_name, is_missing in required_fields:
            if is_missing:
                await manager.emit_to_user(
                    sid,
                    "substitution_error",
                    {"message": f"Missing {field_name}", "code": "MISSING_FIELD"},
                )
                return

        # TODO: Verify user is authorized to make substitution for this team
        # user_id = manager.user_sessions.get(sid)

        logger.info(
            f"Defensive replacement request for game {game_id}: "
            f"Replacing {player_out_lineup_id} with card {player_in_card_id} at {new_position}"
        )

        # Acquire lock before substitution to prevent concurrent lineup modifications
        async with state_manager.game_lock(game_id):
            # Create SubstitutionManager instance
            db_ops = DatabaseOperations()
            sub_manager = SubstitutionManager(db_ops)

            # Execute defensive replacement
            result = await sub_manager.defensive_replace(
                game_id=game_id,
                player_out_lineup_id=player_out_lineup_id,
                player_in_card_id=player_in_card_id,
                new_position=new_position,
                team_id=team_id,
            )

        # Broadcasting happens outside lock
        if result.success:
            # Broadcast to all clients in game
            await manager.broadcast_to_game(
                str(game_id),
                "player_substituted",
                {
                    "type": "defensive_replacement",
                    "player_out_lineup_id": result.player_out_lineup_id,
                    "player_in_card_id": result.player_in_card_id,
                    "new_lineup_id": result.new_lineup_id,
                    "position": result.new_position,
                    "batting_order": result.new_batting_order,
                    "team_id": team_id,
                    "message": f"Defensive replacement: {result.new_position}",
                },
            )

            # Send confirmation to requester
            await manager.emit_to_user(
                sid,
                "substitution_confirmed",
                {
                    "type": "defensive_replacement",
                    "new_lineup_id": result.new_lineup_id,
                    "success": True,
                },
            )
            logger.info(
                f"Defensive replacement successful for game {game_id}: "
                f"New lineup ID {result.new_lineup_id}"
            )
        else:
            # Send error to requester with error code
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {
                    "message": result.error_message,
                    "code": result.error_code,
                    "type": "defensive_replacement",
                },
            )
            logger.warning(
                f"Defensive replacement failed for game {game_id}: {result.error_message}"
            )

    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while processing defensive replacement for game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except SQLAlchemyError as e:
        logger.error(f"Database error in defensive replacement: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in defensive replacement request: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
@sio.event
async def request_pitching_change(sid, data):
    """
    Request pitching change substitution.

    Replaces current pitcher with a reliever. Pitcher must have faced
    at least 1 batter unless injury. New pitcher takes mound immediately.

    Event data:
        game_id: UUID of the game
        player_out_lineup_id: int - lineup ID of pitcher being removed
        player_in_card_id: int - card/player ID of relief pitcher
        team_id: int - team making substitution

    Emits:
        player_substituted: Broadcast to game room on success
        substitution_confirmed: To requester with new lineup_id
        substitution_error: To requester if validation fails
        error: To requester if processing fails
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    # A non-object payload (null, string, list) would make .get() raise
    # AttributeError, which none of the handlers below catch.
    if not isinstance(data, dict):
        logger.warning(
            "Invalid data in pitching change request: payload is not an object"
        )
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Missing game_id", "code": "MISSING_FIELD"},
            )
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {"message": "Invalid game_id format", "code": "INVALID_FORMAT"},
            )
            return

        # Rate limit check - game level for substitutions
        if not await rate_limiter.check_game_limit(str(game_id), "substitution"):
            await manager.emit_to_user(
                sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # Get game state (only its existence is checked here)
        state = state_manager.get_state(game_id)
        if not state:
            await manager.emit_to_user(
                sid, "error", {"message": f"Game {game_id} not found"}
            )
            return

        # Extract substitution data
        player_out_lineup_id = data.get("player_out_lineup_id")
        player_in_card_id = data.get("player_in_card_id")
        team_id = data.get("team_id")

        # Report the first missing required field, in declaration order.
        required_fields = (
            ("player_out_lineup_id", player_out_lineup_id is None),
            ("player_in_card_id", player_in_card_id is None),
            ("team_id", team_id is None),
        )
        for field_name, is_missing in required_fields:
            if is_missing:
                await manager.emit_to_user(
                    sid,
                    "substitution_error",
                    {"message": f"Missing {field_name}", "code": "MISSING_FIELD"},
                )
                return

        # TODO: Verify user is authorized to make substitution for this team
        # user_id = manager.user_sessions.get(sid)

        logger.info(
            f"Pitching change request for game {game_id}: "
            f"Replacing {player_out_lineup_id} with card {player_in_card_id}"
        )

        # Acquire lock before substitution to prevent concurrent lineup modifications
        async with state_manager.game_lock(game_id):
            # Create SubstitutionManager instance
            db_ops = DatabaseOperations()
            sub_manager = SubstitutionManager(db_ops)

            # Execute pitching change
            result = await sub_manager.change_pitcher(
                game_id=game_id,
                player_out_lineup_id=player_out_lineup_id,
                player_in_card_id=player_in_card_id,
                team_id=team_id,
            )

        # Broadcasting happens outside lock
        if result.success:
            # Broadcast to all clients in game
            await manager.broadcast_to_game(
                str(game_id),
                "player_substituted",
                {
                    "type": "pitching_change",
                    "player_out_lineup_id": result.player_out_lineup_id,
                    "player_in_card_id": result.player_in_card_id,
                    "new_lineup_id": result.new_lineup_id,
                    "position": result.new_position,  # Should be "P"
                    "batting_order": result.new_batting_order,
                    "team_id": team_id,
                    "message": "Pitching change: New pitcher entering",
                },
            )

            # Send confirmation to requester
            await manager.emit_to_user(
                sid,
                "substitution_confirmed",
                {
                    "type": "pitching_change",
                    "new_lineup_id": result.new_lineup_id,
                    "success": True,
                },
            )
            logger.info(
                f"Pitching change successful for game {game_id}: "
                f"New lineup ID {result.new_lineup_id}"
            )
        else:
            # Send error to requester with error code
            await manager.emit_to_user(
                sid,
                "substitution_error",
                {
                    "message": result.error_message,
                    "code": result.error_code,
                    "type": "pitching_change",
                },
            )
            logger.warning(
                f"Pitching change failed for game {game_id}: {result.error_message}"
            )

    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while processing pitching change for game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except SQLAlchemyError as e:
        logger.error(f"Database error in pitching change: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in pitching change request: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid substitution data"}
        )
@sio.event
async def get_lineup(sid, data):
    """
    Get current active lineup for a team.

    Serves the lineup from the state manager cache when it is present;
    otherwise loads it through lineup_service and stores it in the cache
    before responding. Only players with ``is_active`` set are returned,
    and both paths emit the same payload shape (including ``headshot``).
    Used by UI to refresh lineup display.

    Event data:
        game_id: UUID of the game
        team_id: int - team to get lineup for

    Emits:
        lineup_data: To requester with active lineup
        error: To requester if validation or lookup fails
    """

    def _player_payload(p):
        # Serialize one lineup entry. Shared by the cached and DB-loaded
        # paths so clients always receive the same keys.
        return {
            "lineup_id": p.lineup_id,
            "card_id": p.card_id,
            "position": p.position,
            "batting_order": p.batting_order,
            "is_active": p.is_active,
            "is_starter": p.is_starter,
            "player": {
                "id": p.card_id,
                "name": p.player_name or f"Player #{p.card_id}",
                "image": p.player_image or "",
                "headshot": p.player_headshot or "",
            },
        }

    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid, "error", {"message": "Invalid game_id format"}
            )
            return

        # Extract team_id
        team_id = data.get("team_id")
        if team_id is None:
            await manager.emit_to_user(sid, "error", {"message": "Missing team_id"})
            return

        # TODO: Verify user has access to view this lineup
        # user_id = manager.user_sessions.get(sid)

        # Prefer the lineup already held by the state manager
        lineup = state_manager.get_lineup(game_id, team_id)
        loaded_from_db = not lineup

        if loaded_from_db:
            # Cache miss: load through lineup_service. The league comes from
            # the in-memory game state, falling back to "sba" when the game
            # state is not available.
            state = state_manager.get_state(game_id)
            league_id = state.league_id if state else "sba"

            lineup = await lineup_service.load_team_lineup_with_player_data(
                game_id=game_id, team_id=team_id, league_id=league_id
            )
            if not lineup:
                await manager.emit_to_user(
                    sid,
                    "error",
                    {"message": f"Lineup not found for team {team_id}"},
                )
                return

            # Cache the lineup for future requests
            state_manager.set_lineup(game_id, team_id, lineup)

        # Send lineup data with player info (active players only)
        await manager.emit_to_user(
            sid,
            "lineup_data",
            {
                "game_id": str(game_id),
                "team_id": team_id,
                "players": [
                    _player_payload(p) for p in lineup.players if p.is_active
                ],
            },
        )

        if loaded_from_db:
            logger.info(
                f"Lineup data loaded from DB with player data for game {game_id}, team {team_id}"
            )
        else:
            logger.info(f"Lineup data sent for game {game_id}, team {team_id}")

    except SQLAlchemyError as e:
        logger.error(f"Database error in get_lineup: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in get_lineup request: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid lineup request"}
        )
@sio.event
async def submit_defensive_decision(sid, data):
    """
    Submit defensive team decision.

    Builds a DefensiveDecision from the event payload, hands it to the game
    engine, and announces the result to everyone in the game room.

    Event data:
        game_id: UUID of the game
        alignment: Defensive alignment (normal, shifted_left, shifted_right, extreme_shift)
        infield_depth: Infield positioning (in, normal, back, double_play)
        outfield_depth: Outfield positioning (in, normal, back)
        hold_runners: List of bases to hold runners (e.g., [1, 3])

    Omitted positioning fields default to "normal"; hold_runners defaults
    to an empty list.

    Emits:
        defensive_decision_submitted: Broadcast to game room
        error: To requester if validation fails
    """

    async def reply_error(payload):
        # Every failure in this handler is reported to the requester only
        await manager.emit_to_user(sid, "error", payload)

    # Record activity before any early exit
    await manager.update_activity(sid)

    # Connection-level throttle
    within_limit = await rate_limiter.check_websocket_limit(sid)
    if not within_limit:
        await reply_error(
            {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # game_id must be present and parse as a UUID
        raw_game_id = data.get("game_id")
        if not raw_game_id:
            await reply_error({"message": "Missing game_id"})
            return

        try:
            game_id = UUID(raw_game_id)
        except (ValueError, AttributeError):
            await reply_error({"message": "Invalid game_id format"})
            return

        room = str(game_id)

        # Game-level throttle, counted under the "decision" bucket
        if not await rate_limiter.check_game_limit(room, "decision"):
            await reply_error(
                {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # The game has to be known to the state manager
        if not state_manager.get_state(game_id):
            await reply_error({"message": f"Game {game_id} not found"})
            return

        # TODO: Verify user is authorized (fielding team manager)
        # user_id = manager.user_sessions.get(sid)

        # Gather the decision fields once; the same mapping feeds both the
        # model constructor and the broadcast payload.
        choices = {
            "alignment": data.get("alignment", "normal"),
            "infield_depth": data.get("infield_depth", "normal"),
            "outfield_depth": data.get("outfield_depth", "normal"),
            "hold_runners": data.get("hold_runners", []),
        }

        from app.models.game_models import DefensiveDecision

        decision = DefensiveDecision(**choices)

        # Hand the decision to the game engine
        updated_state = await game_engine.submit_defensive_decision(
            game_id, decision
        )

        logger.info(
            f"Defensive decision submitted for game {game_id}: "
            f"alignment={choices['alignment']}, infield={choices['infield_depth']}, outfield={choices['outfield_depth']}"
        )

        # Tell everyone in the game room
        await manager.broadcast_to_game(
            room,
            "defensive_decision_submitted",
            {
                "game_id": room,
                "decision": choices,
                "pending_decision": updated_state.pending_decision,
            },
        )

    except ValidationError as e:
        logger.warning(f"Validation error in defensive decision: {e}")
        await reply_error({"message": "Invalid decision data"})
    except GameValidationError as e:
        logger.warning(f"Game validation error in defensive decision: {e}")
        await reply_error({"message": str(e)})
    except SQLAlchemyError as e:
        logger.error(f"Database error in defensive decision: {e}")
        await reply_error({"message": "Database error - please retry"})
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in defensive decision: {e}")
        await reply_error({"message": "Invalid decision format"})
@sio.event
async def submit_offensive_decision(sid, data):
    """
    Submit offensive team decision.

    Builds an OffensiveDecision from the event payload, hands it to the game
    engine, and announces the result to everyone in the game room.

    Event data:
        game_id: UUID of the game
        action: String - offensive action (swing_away, steal, check_jump, hit_and_run, sac_bunt, squeeze_bunt)
        steal_attempts: List of bases for steal attempts - REQUIRED when action="steal" (e.g., [2, 3])

    When omitted, action defaults to "swing_away" and steal_attempts to an
    empty list.

    Emits:
        offensive_decision_submitted: Broadcast to game room
        error: To requester if validation fails

    Session 2 Update (2025-01-14): Replaced approach with action field. Stealing is now an action choice.
    """

    async def reply_error(payload):
        # Every failure in this handler is reported to the requester only
        await manager.emit_to_user(sid, "error", payload)

    # Record activity before any early exit
    await manager.update_activity(sid)

    # Connection-level throttle
    within_limit = await rate_limiter.check_websocket_limit(sid)
    if not within_limit:
        await reply_error(
            {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # game_id must be present and parse as a UUID
        raw_game_id = data.get("game_id")
        if not raw_game_id:
            await reply_error({"message": "Missing game_id"})
            return

        try:
            game_id = UUID(raw_game_id)
        except (ValueError, AttributeError):
            await reply_error({"message": "Invalid game_id format"})
            return

        room = str(game_id)

        # Game-level throttle, counted under the "decision" bucket
        if not await rate_limiter.check_game_limit(room, "decision"):
            await reply_error(
                {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # The game has to be known to the state manager
        if not state_manager.get_state(game_id):
            await reply_error({"message": f"Game {game_id} not found"})
            return

        # TODO: Verify user is authorized (batting team manager)
        # user_id = manager.user_sessions.get(sid)

        # Gather the decision fields once; the same mapping feeds both the
        # model constructor and the broadcast payload.
        choice = {
            "action": data.get("action", "swing_away"),
            "steal_attempts": data.get("steal_attempts", []),
        }

        from app.models.game_models import OffensiveDecision

        decision = OffensiveDecision(**choice)

        # Hand the decision to the game engine
        updated_state = await game_engine.submit_offensive_decision(
            game_id, decision
        )

        logger.info(
            f"Offensive decision submitted for game {game_id}: "
            f"action={choice['action']}, steal={choice['steal_attempts']}"
        )

        # Tell everyone in the game room
        await manager.broadcast_to_game(
            room,
            "offensive_decision_submitted",
            {
                "game_id": room,
                "decision": choice,
                "pending_decision": updated_state.pending_decision,
            },
        )

    except ValidationError as e:
        logger.warning(f"Validation error in offensive decision: {e}")
        await reply_error({"message": "Invalid decision data"})
    except GameValidationError as e:
        logger.warning(f"Game validation error in offensive decision: {e}")
        await reply_error({"message": str(e)})
    except SQLAlchemyError as e:
        logger.error(f"Database error in offensive decision: {e}")
        await reply_error({"message": "Database error - please retry"})
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in offensive decision: {e}")
        await reply_error({"message": "Invalid decision format"})
@sio.event
async def get_box_score(sid, data):
    """
    Return the box score for a game to the requesting client.

    The box score is fetched through box_score_service (described in the
    original handler notes as backed by materialized views). A falsy result
    is reported to the requester as an error carrying a setup hint.

    Event data:
        game_id: UUID of the game

    Emits:
        box_score_data: To requester with box score
        error: To requester if validation fails or no box score exists
    """

    async def reply_error(payload):
        # Failures are reported to the requester only
        await manager.emit_to_user(sid, "error", payload)

    # Record activity before any early exit
    await manager.update_activity(sid)

    # Connection-level throttle
    within_limit = await rate_limiter.check_websocket_limit(sid)
    if not within_limit:
        await reply_error(
            {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # game_id must be present and parse as a UUID
        raw_game_id = data.get("game_id")
        if not raw_game_id:
            await reply_error({"message": "Missing game_id"})
            return

        try:
            game_id = UUID(raw_game_id)
        except (ValueError, AttributeError):
            await reply_error({"message": "Invalid game_id format"})
            return

        # TODO: Verify user has access to view this game's box score
        # user_id = manager.user_sessions.get(sid)

        # Imported at call time, as in the rest of this handler module
        from app.services import box_score_service

        box_score = await box_score_service.get_box_score(game_id)

        if not box_score:
            # Nothing to show; the hint is passed through to the client
            await reply_error(
                {
                    "message": "No box score found for game",
                    "hint": "Run migration (alembic upgrade head) and refresh views",
                }
            )
            return

        # Deliver the box score to the requester only
        await manager.emit_to_user(
            sid,
            "box_score_data",
            {"game_id": str(game_id), "box_score": box_score},
        )
        logger.info(f"Box score data sent for game {game_id}")

    except SQLAlchemyError as e:
        logger.error(f"Database error in get_box_score: {e}")
        await reply_error(
            {"message": "Database error fetching box score - please retry"}
        )
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid data in get_box_score request: {e}")
        await reply_error({"message": "Invalid box score request"})
@sio.event
async def rollback_play(sid, data):
    """
    Roll back the last N plays.

    Delegates to game_engine.rollback_plays while holding the per-game lock,
    then broadcasts the resulting state. Per the original handler notes, the
    engine deletes plays from the database, reconstructs game state by
    replaying the remaining plays, and removes substitutions made during the
    rolled-back plays (that logic is not visible in this handler).

    Event data:
        game_id: UUID of the game
        num_plays: int - Number of plays to roll back (default: 1).
            Must be a positive integer; booleans are rejected.

    Emits:
        play_rolled_back: Broadcast to game room with new state
        game_state_update: Broadcast to game room with updated state
        error: To requester if validation fails
    """
    # Update activity timestamp
    await manager.update_activity(sid)

    # Rate limit check - connection level
    if not await rate_limiter.check_websocket_limit(sid):
        await manager.emit_to_user(
            sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"}
        )
        return

    try:
        # Extract and validate game_id
        game_id_str = data.get("game_id")
        if not game_id_str:
            await manager.emit_to_user(sid, "error", {"message": "Missing game_id"})
            return

        try:
            game_id = UUID(game_id_str)
        except (ValueError, AttributeError):
            await manager.emit_to_user(
                sid, "error", {"message": "Invalid game_id format"}
            )
            return

        # Extract num_plays (default to 1).
        # bool is a subclass of int, so a JSON `true` would otherwise pass
        # the isinstance check and be treated as 1; reject it explicitly.
        num_plays = data.get("num_plays", 1)
        if (
            isinstance(num_plays, bool)
            or not isinstance(num_plays, int)
            or num_plays < 1
        ):
            await manager.emit_to_user(
                sid, "error", {"message": "num_plays must be a positive integer"}
            )
            return

        # Rate limit check - game level for rollback (same as decisions)
        if not await rate_limiter.check_game_limit(str(game_id), "decision"):
            await manager.emit_to_user(
                sid, "error", {"message": "Too many requests. Please wait.", "code": "GAME_RATE_LIMITED"}
            )
            return

        # TODO: Verify user is authorized (game manager/owner only)
        # user_id = manager.user_sessions.get(sid)

        logger.info(f"Rollback request for game {game_id}: {num_plays} play(s)")

        # Acquire lock before modifying state
        async with state_manager.game_lock(game_id):
            # Execute rollback
            new_state = await game_engine.rollback_plays(game_id, num_plays)

        # Broadcast rollback notification to all players in game (outside lock)
        await manager.broadcast_to_game(
            str(game_id),
            "play_rolled_back",
            {
                "game_id": str(game_id),
                "num_plays": num_plays,
                "new_play_count": new_state.play_count,
                "inning": new_state.inning,
                "half": new_state.half,
                "message": f"Rolled back {num_plays} play(s)",
            },
        )

        # Broadcast updated game state to all players
        await manager.broadcast_to_game(
            str(game_id),
            "game_state_update",
            new_state.model_dump(mode="json"),
        )

        logger.info(
            f"Rollback successful for game {game_id}: "
            f"Now at play {new_state.play_count}, inning {new_state.inning} {new_state.half}"
        )

    except ValueError as e:
        # Validation errors from game_engine.rollback_plays. game_id is
        # always bound here: the only ValueError raised before it is
        # assigned (UUID parsing) is handled by the inner try above.
        logger.warning(f"Rollback validation error for game {game_id}: {e}")
        await manager.emit_to_user(sid, "error", {"message": str(e)})
    except asyncio.TimeoutError:
        logger.error(f"Lock timeout while rolling back game {game_id}")
        await manager.emit_to_user(
            sid, "error", {"message": "Server busy - please try again"}
        )
    except (DatabaseError, SQLAlchemyError) as e:
        # Application-level and driver-level database failures are reported
        # to the client identically.
        logger.error(f"Database error in rollback_play: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Database error - please retry"}
        )
    except (TypeError, AttributeError) as e:
        # Covers malformed payloads, e.g. `data` not being a dict
        logger.warning(f"Invalid data in rollback_play request: {e}")
        await manager.emit_to_user(
            sid, "error", {"message": "Invalid rollback request"}
        )