import asyncio import logging from uuid import UUID import pendulum from pydantic import ValidationError from socketio import AsyncServer from sqlalchemy.exc import SQLAlchemyError from app.config.result_charts import PlayOutcome from app.core.dice import dice_system from app.core.exceptions import ( DatabaseError, GameNotFoundError, InvalidGameStateError, ) from app.core.game_engine import game_engine from app.core.state_manager import state_manager from app.core.substitution_manager import SubstitutionManager from app.core.validators import ValidationError as GameValidationError from app.database.operations import DatabaseOperations from app.middleware.rate_limit import rate_limiter from app.models.game_models import ManualOutcomeSubmission from app.services.lineup_service import lineup_service from app.utils.auth import verify_token from app.websocket.connection_manager import ConnectionManager logger = logging.getLogger(f"{__name__}.handlers") def register_handlers(sio: AsyncServer, manager: ConnectionManager) -> None: """Register all WebSocket event handlers""" @sio.event async def connect(sid, environ, auth): """ Handle new connection with cookie, query param, or auth object support. Tries in order: 1. Cookie-based auth (from HttpOnly cookies) 2. Query parameter auth (for Safari/iOS fallback) 3. Auth object (for direct JS clients) """ try: token = None # Try cookie first (from HTTP headers in environ) cookie_header = environ.get("HTTP_COOKIE", "") if "pd_access_token=" in cookie_header: from http.cookies import SimpleCookie cookie = SimpleCookie() cookie.load(cookie_header) if "pd_access_token" in cookie: token = cookie["pd_access_token"].value logger.debug(f"Connection {sid} using cookie auth") # Try query parameter (Safari/iOS fallback) if not token: query_string = environ.get("QUERY_STRING", "") if "token=" in query_string: from urllib.parse import parse_qs params = parse_qs(query_string) if "token" in params: token = params["token"][0] logger.debug(f"Connection {sid} using query param auth (Safari fallback)") # Fall back to auth object (for direct JS clients) if not token and auth: token = auth.get("token") if token: logger.debug(f"Connection {sid} using auth object") if not token: logger.warning(f"Connection {sid} rejected: no token") return False user_data = verify_token(token) user_id = user_data.get("user_id") if not user_id: logger.warning(f"Connection {sid} rejected: invalid token") return False # Extract IP address for logging ip_address = environ.get("REMOTE_ADDR") await manager.connect(sid, user_id, ip_address=ip_address) await sio.emit("connected", {"user_id": user_id}, room=sid) logger.info(f"Connection {sid} accepted for user {user_id}") return True except (ValueError, KeyError) as e: # Token parsing or missing data error logger.warning(f"Connection {sid} auth error: {e}") return False except (ConnectionError, OSError) as e: # Network/socket error during connection logger.error(f"Connection {sid} network error: {e}") return False except Exception as e: # Unexpected error - log and reject connection logger.error(f"Connection {sid} unexpected error: {e}", exc_info=True) return False @sio.event async def disconnect(sid): """Handle disconnection""" # Clean up rate limiter buckets for this connection rate_limiter.remove_connection(sid) await manager.disconnect(sid) @sio.event async def join_game(sid, data): """Handle join game request""" # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: game_id = data.get("game_id") role = data.get("role", "player") if not game_id: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return # TODO: Verify user has access to game await manager.join_game(sid, game_id, role) await manager.emit_to_user( sid, "game_joined", {"game_id": game_id, "role": role} ) except (ValueError, TypeError) as e: # Invalid data format logger.warning(f"Join game validation error for {sid}: {e}") await manager.emit_to_user(sid, "error", {"message": "Invalid game data"}) except (ConnectionError, OSError) as e: # Network error during room join logger.error(f"Join game network error for {sid}: {e}") await manager.emit_to_user(sid, "error", {"message": "Connection error"}) @sio.event async def leave_game(sid, data): """Handle leave game request""" try: game_id = data.get("game_id") if game_id: await manager.leave_game(sid, game_id) except (ValueError, TypeError) as e: # Invalid data - log but don't error to client (leave is cleanup) logger.warning(f"Leave game data error for {sid}: {e}") except (ConnectionError, OSError) as e: # Network error - log but don't propagate (connection may already be gone) logger.debug(f"Leave game network error for {sid}: {e}") @sio.event async def heartbeat(sid): """ Handle client-initiated heartbeat ping. Updates session activity timestamp to prevent expiration. Socket.io has its own ping/pong mechanism, but clients can send explicit heartbeats for application-level keepalive. """ await manager.update_activity(sid) await sio.emit("heartbeat_ack", {}, room=sid) @sio.event async def request_game_state(sid, data): """ Client requests full game state (recovery after disconnect or initial load). Recovers game from database if not in memory. """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Try to get from memory first state = state_manager.get_state(game_id) # If not in memory, recover from database if not state: logger.info(f"Game {game_id} not in memory, recovering from database") state = await state_manager.recover_game(game_id) if state: # Fetch play history from database for mid-game joins db_ops = DatabaseOperations() plays = await db_ops.get_plays(game_id) # Convert Play DB models to frontend-compatible PlayResult dicts # Play model uses: hit_type (outcome), result_description (description), batter_id recent_plays = [] for p in plays: outcome = p.hit_type or "" # Parse outcome for categorization try: outcome_enum = PlayOutcome(outcome) is_hit = outcome_enum.is_hit() except ValueError: is_hit = False recent_plays.append({ "play_number": p.play_number, "inning": p.inning, "half": p.half, "outcome": outcome, "description": p.result_description or "", "runs_scored": p.runs_scored or 0, "outs_recorded": p.outs_recorded or 0, "batter_lineup_id": p.batter_id, "runners_advanced": [], # Not stored in Play model "batter_result": p.batter_final, "is_hit": is_hit, "is_out": (p.outs_recorded or 0) > 0, "is_walk": outcome and "walk" in outcome.lower(), "is_strikeout": outcome and "strikeout" in outcome.lower(), }) # Emit game_state_sync with state and play history await manager.emit_to_user( sid, "game_state_sync", { "state": state.model_dump(mode="json"), "recent_plays": recent_plays, "timestamp": pendulum.now("UTC").isoformat(), } ) logger.info(f"Sent game state sync with {len(recent_plays)} plays for {game_id} to {sid}") else: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) logger.warning(f"Game {game_id} not found in memory or database") except GameNotFoundError as e: logger.warning(f"Game not found: {e.game_id}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error fetching game state: {e}") await manager.emit_to_user(sid, "error", {"message": "Database error - please retry"}) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in request_game_state: {e}") await manager.emit_to_user(sid, "error", {"message": "Invalid request data"}) @sio.event async def roll_dice(sid, data): """ Roll dice for manual outcome selection. Server rolls dice and broadcasts to all players in game room. Players then read their physical cards and submit outcomes. Event data: game_id: UUID of the game Emits: dice_rolled: Broadcast to game room with dice results error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for rolls if not await rate_limiter.check_game_limit(str(game_id), "roll"): await manager.emit_to_user( sid, "error", {"message": "Too many roll requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Acquire lock before modifying state async with state_manager.game_lock(game_id): # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is participant in this game # user_id = manager.user_sessions.get(sid) # if not is_game_participant(game_id, user_id): # await manager.emit_to_user(sid, "error", {"message": "Not authorized"}) # return # Check if there are runners on base (affects chaos check) runners_on_base = bool(state.on_first or state.on_second or state.on_third) # Roll dice ab_roll = dice_system.roll_ab( league_id=state.league_id, game_id=game_id, runners_on_base=runners_on_base, ) logger.info( f"Dice rolled for game {game_id}: " f"d6={ab_roll.d6_one}, 2d6={ab_roll.d6_two_total}, " f"chaos={ab_roll.chaos_d20}, resolution={ab_roll.resolution_d20}, " f"chaos_skipped={ab_roll.chaos_check_skipped}" ) # Store roll in game state for manual outcome validation state.pending_manual_roll = ab_roll state_manager.update_state(game_id, state) # Broadcast dice results to all players in game (outside lock) await manager.broadcast_to_game( str(game_id), "dice_rolled", { "game_id": str(game_id), "roll_id": ab_roll.roll_id, "d6_one": ab_roll.d6_one, "d6_two_a": ab_roll.d6_two_a, "d6_two_b": ab_roll.d6_two_b, "d6_two_total": ab_roll.d6_two_total, "chaos_d20": ab_roll.chaos_d20, "resolution_d20": ab_roll.resolution_d20, "check_wild_pitch": ab_roll.check_wild_pitch, "check_passed_ball": ab_roll.check_passed_ball, "chaos_check_skipped": ab_roll.chaos_check_skipped, "timestamp": ab_roll.timestamp.to_iso8601_string(), "message": "Dice rolled - read your card and submit outcome", }, ) except asyncio.TimeoutError: logger.error(f"Lock timeout while rolling dice for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except GameValidationError as e: logger.warning(f"Validation error in roll_dice: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error during roll_dice: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in roll_dice: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid request data"} ) except Exception as e: # Unexpected error - ensure we log and report logger.error(f"Unexpected error in roll_dice: {e}", exc_info=True) await manager.emit_to_user( sid, "error", {"message": "An unexpected error occurred"} ) @sio.event async def submit_manual_outcome(sid, data): """ Submit manually-selected play outcome. After dice are rolled, players read their physical cards and submit the outcome they see. System validates and processes. Event data: game_id: UUID of the game outcome: PlayOutcome enum value (e.g., "groundball_c") hit_location: Optional position string (e.g., "SS") Emits: outcome_accepted: To requester if valid play_resolved: Broadcast to game room with play result outcome_rejected: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "outcome_rejected", {"message": "Missing game_id", "field": "game_id"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "outcome_rejected", {"message": "Invalid game_id format", "field": "game_id"}, ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many outcome submissions. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # TODO: Verify user is active batter or authorized to submit # user_id = manager.user_sessions.get(sid) # Extract outcome data outcome_str = data.get("outcome") hit_location = data.get("hit_location") if not outcome_str: await manager.emit_to_user( sid, "outcome_rejected", {"message": "Missing outcome", "field": "outcome"}, ) return # Validate using ManualOutcomeSubmission model try: submission = ManualOutcomeSubmission( outcome=outcome_str, hit_location=hit_location ) except ValidationError as e: # Extract first error for user-friendly message first_error = e.errors()[0] field = first_error["loc"][0] if first_error["loc"] else "unknown" message = first_error["msg"] await manager.emit_to_user( sid, "outcome_rejected", {"message": message, "field": field, "errors": e.errors()}, ) logger.warning( f"Manual outcome validation failed for game {game_id}: {message}" ) return # Convert to PlayOutcome enum outcome = PlayOutcome(submission.outcome) # NOTE: Business rule validation (e.g., when hit_location is required based on # game state) is handled in PlayResolver, not here. This layer only validates # basic input format and type checking. # Acquire lock for state modifications async with state_manager.game_lock(game_id): # Re-fetch state inside lock to ensure consistency state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Check for pending roll BEFORE accepting outcome if not state.pending_manual_roll: await manager.emit_to_user( sid, "outcome_rejected", { "message": "No pending dice roll - call roll_dice first", "field": "game_state", }, ) return ab_roll = state.pending_manual_roll logger.info( f"Manual outcome submitted for game {game_id}: " f"{outcome.value}" + (f" to {submission.hit_location}" if submission.hit_location else "") ) logger.info( f"Processing manual outcome with roll {ab_roll.roll_id}: " f"d6={ab_roll.d6_one}, 2d6={ab_roll.d6_two_total}, " f"chaos={ab_roll.chaos_d20}" ) # Process manual outcome through game engine try: result = await game_engine.resolve_manual_play( game_id=game_id, ab_roll=ab_roll, outcome=outcome, hit_location=submission.hit_location, ) # CRITICAL: Re-fetch state AFTER resolve_manual_play completes # The game engine updates state (advances batter, etc.) during resolution. # Using the old state reference would overwrite those updates! state = state_manager.get_state(game_id) if state: # Clear pending roll only AFTER successful validation (one-time use) state.pending_manual_roll = None state_manager.update_state(game_id, state) except GameValidationError as e: # Game engine validation error (e.g., missing hit location) await manager.emit_to_user( sid, "outcome_rejected", {"message": str(e), "field": "validation"} ) logger.warning(f"Manual play validation failed: {e}") return except ValueError as e: # Business logic validation error from PlayResolver await manager.emit_to_user( sid, "outcome_rejected", {"message": str(e), "field": "validation"} ) logger.warning(f"Manual play business logic validation failed: {e}") return except DatabaseError as e: # Database error during play resolution logger.error(f"Database error resolving manual play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error during play resolution - please retry"} ) return except SQLAlchemyError as e: # SQLAlchemy error during play resolution logger.error(f"SQLAlchemy error resolving manual play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) return # Broadcasting happens outside lock to avoid holding it too long # Confirm acceptance to submitter AFTER successful validation await manager.emit_to_user( sid, "outcome_accepted", { "game_id": str(game_id), "outcome": outcome.value, "hit_location": submission.hit_location, }, ) # Build play result data play_result_data = { "game_id": str(game_id), "play_number": state.play_count, "inning": state.inning, "half": state.half, "outcome": result.outcome.value, # Use resolved outcome, not submitted "hit_location": submission.hit_location, "description": result.description, "outs_recorded": result.outs_recorded, "runs_scored": result.runs_scored, "batter_result": result.batter_result, "batter_lineup_id": state.current_batter.lineup_id if state.current_batter else None, "runners_advanced": [ { "from": adv.from_base, "to": adv.to_base, "lineup_id": adv.lineup_id, "is_out": adv.is_out, } for adv in result.runners_advanced ], "is_hit": result.is_hit, "is_out": result.is_out, "is_walk": result.is_walk, "roll_id": ab_roll.roll_id, } # Include X-Check details if present (Phase 3E-Final) if result.x_check_details: xcheck = result.x_check_details play_result_data["x_check_details"] = { "position": xcheck.position, "d20_roll": xcheck.d20_roll, "d6_roll": xcheck.d6_roll, "defender_range": xcheck.defender_range, "defender_error_rating": xcheck.defender_error_rating, "defender_id": xcheck.defender_id, "base_result": xcheck.base_result, "converted_result": xcheck.converted_result, "error_result": xcheck.error_result, "final_outcome": xcheck.final_outcome.value, "hit_type": xcheck.hit_type, # Optional SPD test details "spd_test_roll": xcheck.spd_test_roll, "spd_test_target": xcheck.spd_test_target, "spd_test_passed": xcheck.spd_test_passed, } # Broadcast play result to game room await manager.broadcast_to_game( str(game_id), "play_resolved", play_result_data ) logger.info( f"Manual play resolved for game {game_id}: {result.description}" ) # Broadcast updated game state so frontend sees new batter, outs, etc. updated_state = state_manager.get_state(game_id) if updated_state: batter_info = f"lineup_id={updated_state.current_batter.lineup_id}, batting_order={updated_state.current_batter.batting_order}" if updated_state.current_batter else "None" logger.info(f"Broadcasting game_state_update with current_batter: {batter_info}") await manager.broadcast_to_game( str(game_id), "game_state_update", updated_state.model_dump(mode="json"), ) except asyncio.TimeoutError: logger.error(f"Lock timeout while submitting manual outcome for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except DatabaseError as e: logger.error(f"Database error in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except SQLAlchemyError as e: logger.error(f"SQLAlchemy error in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid outcome data"} ) # ===== SUBSTITUTION EVENTS ===== @sio.event async def request_pinch_hitter(sid, data): """ Request pinch hitter substitution. Replaces current batter with a player from the bench. The substitute takes the batting order position of the replaced player. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of player being removed player_in_card_id: int - card/player ID of substitute team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Pinch hitter request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute pinch hitter substitution result = await sub_manager.pinch_hit( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "pinch_hitter", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, "batting_order": result.new_batting_order, "team_id": team_id, "message": f"Pinch hitter: #{result.new_batting_order} now batting", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "pinch_hitter", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Pinch hitter successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "pinch_hitter", }, ) logger.warning( f"Pinch hitter failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing pinch hitter for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in pinch hitter: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in pinch hitter request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def request_defensive_replacement(sid, data): """ Request defensive replacement substitution. Replaces a defensive player with a better fielder. Player can be swapped at any position. If player is in batting order, substitute takes their batting order spot. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of player being removed player_in_card_id: int - card/player ID of substitute new_position: str - defensive position for substitute (e.g., "SS") team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") new_position = data.get("new_position") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if not new_position: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing new_position", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Defensive replacement request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id} at {new_position}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute defensive replacement result = await sub_manager.defensive_replace( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, new_position=new_position, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "defensive_replacement", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, "batting_order": result.new_batting_order, "team_id": team_id, "message": f"Defensive replacement: {result.new_position}", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "defensive_replacement", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Defensive replacement successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "defensive_replacement", }, ) logger.warning( f"Defensive replacement failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing defensive replacement for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in defensive replacement: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in defensive replacement request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def request_pitching_change(sid, data): """ Request pitching change substitution. Replaces current pitcher with a reliever. Pitcher must have faced at least 1 batter unless injury. New pitcher takes mound immediately. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of pitcher being removed player_in_card_id: int - card/player ID of relief pitcher team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Pitching change request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute pitching change result = await sub_manager.change_pitcher( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "pitching_change", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, # Should be "P" "batting_order": result.new_batting_order, "team_id": team_id, "message": "Pitching change: New pitcher entering", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "pitching_change", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Pitching change successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "pitching_change", }, ) logger.warning( f"Pitching change failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing pitching change for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in pitching change: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in pitching change request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def get_lineup(sid, data): """ Get current active lineup for a team. Returns all active players in the lineup with their positions and batting orders. Used by UI to refresh lineup display. Event data: game_id: UUID of the game team_id: int - team to get lineup for Emits: lineup_data: To requester with active lineup error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Extract team_id team_id = data.get("team_id") if team_id is None: await manager.emit_to_user(sid, "error", {"message": "Missing team_id"}) return # TODO: Verify user has access to view this lineup # user_id = manager.user_sessions.get(sid) # Get lineup from state manager cache (fast O(1) lookup) lineup = state_manager.get_lineup(game_id, team_id) if lineup: # Send lineup data with player info await manager.emit_to_user( sid, "lineup_data", { "game_id": str(game_id), "team_id": team_id, "players": [ { "lineup_id": p.lineup_id, "card_id": p.card_id, "position": p.position, "batting_order": p.batting_order, "is_active": p.is_active, "is_starter": p.is_starter, "player": { "id": p.card_id, "name": p.player_name or f"Player #{p.card_id}", "image": p.player_image or "", "headshot": p.player_headshot or "", }, } for p in lineup.players if p.is_active ], }, ) logger.info(f"Lineup data sent for game {game_id}, team {team_id}") else: # Lineup not in cache - try to load from database with player data # Get league_id from game state or database state = state_manager.get_state(game_id) league_id = state.league_id if state else "sba" lineup_state = await lineup_service.load_team_lineup_with_player_data( game_id=game_id, team_id=team_id, league_id=league_id ) if lineup_state: # Cache the lineup for future requests state_manager.set_lineup(game_id, team_id, lineup_state) # Send lineup data with player info await manager.emit_to_user( sid, "lineup_data", { "game_id": str(game_id), "team_id": team_id, "players": [ { "lineup_id": p.lineup_id, "card_id": p.card_id, "position": p.position, "batting_order": p.batting_order, "is_active": p.is_active, "is_starter": p.is_starter, "player": { "id": p.card_id, "name": p.player_name or f"Player #{p.card_id}", "image": p.player_image or "", }, } for p in lineup_state.players if p.is_active ], }, ) logger.info( f"Lineup data loaded from DB with player data for game {game_id}, team {team_id}" ) else: await manager.emit_to_user( sid, "error", {"message": f"Lineup not found for team {team_id}"}, ) except SQLAlchemyError as e: logger.error(f"Database error in get_lineup: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in get_lineup request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid lineup request"} ) @sio.event async def get_bench(sid, data): """ Get bench players for a team (roster players not in active lineup). This queries RosterLink for players that are NOT in the active Lineup, including their natural positions for UI filtering (batters vs pitchers). Supports two-way players with is_pitcher and is_batter computed properties. Event data: game_id: UUID of the game team_id: int - team to get bench for Emits: bench_data: To requester with bench players including: - player_positions: list of natural positions (e.g., ["SS", "2B"]) - is_pitcher: true if player has pitching positions - is_batter: true if player has batting positions error: To requester if validation fails """ await manager.update_activity(sid) if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return team_id = data.get("team_id") if team_id is None: await manager.emit_to_user(sid, "error", {"message": "Missing team_id"}) return # Query bench players from RosterLink (roster players not in active lineup) # Player data (name, image, headshot) is cached in RosterLink - no API call needed db_ops = DatabaseOperations() bench_roster = await db_ops.get_bench_players(game_id, team_id) if bench_roster: bench_players = [] for roster in bench_roster: # Use cached player_data from RosterLink (populated at lineup submission) pdata = roster.player_data or {} player_name = pdata.get("name", f"Player #{roster.player_id}") player_image = pdata.get("image", "") player_headshot = pdata.get("headshot", "") bench_players.append({ "roster_id": roster.id, "player_id": roster.player_id, "player_positions": roster.player_positions, "is_pitcher": roster.is_pitcher, "is_batter": roster.is_batter, "player": { "id": roster.player_id, "name": player_name, "image": player_image, "headshot": player_headshot, # Include positions for frontend filtering (legacy support) "pos_1": roster.player_positions[0] if roster.player_positions else None, "pos_2": roster.player_positions[1] if len(roster.player_positions) > 1 else None, "pos_3": roster.player_positions[2] if len(roster.player_positions) > 2 else None, }, }) await manager.emit_to_user( sid, "bench_data", { "game_id": str(game_id), "team_id": team_id, "players": bench_players, }, ) logger.info( f"Bench data sent for game {game_id}, team {team_id}: {len(bench_players)} players" ) else: await manager.emit_to_user( sid, "bench_data", {"game_id": str(game_id), "team_id": team_id, "players": []}, ) logger.info(f"No bench players found for game {game_id}, team {team_id}") except SQLAlchemyError as e: logger.error(f"Database error in get_bench: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in get_bench request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid bench request"} ) @sio.event async def submit_defensive_decision(sid, data): """ Submit defensive team decision. Event data: game_id: UUID of the game alignment: Defensive alignment (normal, shifted_left, shifted_right, extreme_shift) infield_depth: Infield positioning (in, normal, back, double_play) outfield_depth: Outfield positioning (in, normal, back) hold_runners: List of bases to hold runners (e.g., [1, 3]) Emits: defensive_decision_submitted: To requester and broadcast to game room error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is authorized (fielding team manager) # user_id = manager.user_sessions.get(sid) # Extract decision data alignment = data.get("alignment", "normal") infield_depth = data.get("infield_depth", "normal") outfield_depth = data.get("outfield_depth", "normal") hold_runners = data.get("hold_runners", []) # Create defensive decision from app.models.game_models import DefensiveDecision decision = DefensiveDecision( alignment=alignment, infield_depth=infield_depth, outfield_depth=outfield_depth, hold_runners=hold_runners, ) # Submit decision through game engine updated_state = await game_engine.submit_defensive_decision( game_id, decision ) logger.info( f"Defensive decision submitted for game {game_id}: " f"alignment={alignment}, infield={infield_depth}, outfield={outfield_depth}" ) # Broadcast to game room await manager.broadcast_to_game( str(game_id), "defensive_decision_submitted", { "game_id": str(game_id), "decision": { "alignment": alignment, "infield_depth": infield_depth, "outfield_depth": outfield_depth, "hold_runners": hold_runners, }, "pending_decision": updated_state.pending_decision, }, ) except ValidationError as e: logger.warning(f"Validation error in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision data"} ) except GameValidationError as e: logger.warning(f"Game validation error in defensive decision: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision format"} ) @sio.event async def submit_offensive_decision(sid, data): """ Submit offensive team decision. Event data: game_id: UUID of the game action: String - offensive action (swing_away, steal, check_jump, hit_and_run, sac_bunt, squeeze_bunt) steal_attempts: List of bases for steal attempts - REQUIRED when action="steal" (e.g., [2, 3]) Emits: offensive_decision_submitted: To requester and broadcast to game room error: To requester if validation fails Session 2 Update (2025-01-14): Replaced approach with action field. Stealing is now an action choice. """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is authorized (batting team manager) # user_id = manager.user_sessions.get(sid) # Extract decision data action = data.get("action", "swing_away") # Default: swing_away steal_attempts = data.get("steal_attempts", []) # Create offensive decision from app.models.game_models import OffensiveDecision decision = OffensiveDecision(action=action, steal_attempts=steal_attempts) # Submit decision through game engine updated_state = await game_engine.submit_offensive_decision( game_id, decision ) logger.info( f"Offensive decision submitted for game {game_id}: " f"action={action}, steal={steal_attempts}" ) # Broadcast to game room await manager.broadcast_to_game( str(game_id), "offensive_decision_submitted", { "game_id": str(game_id), "decision": {"action": action, "steal_attempts": steal_attempts}, "pending_decision": updated_state.pending_decision, }, ) except ValidationError as e: logger.warning(f"Validation error in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision data"} ) except GameValidationError as e: logger.warning(f"Game validation error in offensive decision: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision format"} ) @sio.event async def get_box_score(sid, data): """ Get box score using materialized views. Event data: game_id: UUID of the game Emits: box_score_data: To requester with box score error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # TODO: Verify user has access to view this game's box score # user_id = manager.user_sessions.get(sid) # Get box score from materialized views from app.services import box_score_service box_score = await box_score_service.get_box_score(game_id) if box_score: # Send box score data to requester await manager.emit_to_user( sid, "box_score_data", {"game_id": str(game_id), "box_score": box_score}, ) logger.info(f"Box score data sent for game {game_id}") else: await manager.emit_to_user( sid, "error", { "message": "No box score found for game", "hint": "Run migration (alembic upgrade head) and refresh views", }, ) except SQLAlchemyError as e: logger.error(f"Database error in get_box_score: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error fetching box score - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in get_box_score request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid box score request"} ) @sio.event async def rollback_play(sid, data): """ Roll back the last N plays. Deletes plays from the database and reconstructs game state by replaying remaining plays. Also removes any substitutions that occurred during the rolled-back plays. Event data: game_id: UUID of the game num_plays: int - Number of plays to roll back (default: 1) Emits: play_rolled_back: Broadcast to game room with new state game_state_update: Broadcast to game room with updated state error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Extract num_plays (default to 1) num_plays = data.get("num_plays", 1) if not isinstance(num_plays, int) or num_plays < 1: await manager.emit_to_user( sid, "error", {"message": "num_plays must be a positive integer"} ) return # Rate limit check - game level for rollback (same as decisions) if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # TODO: Verify user is authorized (game manager/owner only) # user_id = manager.user_sessions.get(sid) logger.info(f"Rollback request for game {game_id}: {num_plays} play(s)") # Acquire lock before modifying state async with state_manager.game_lock(game_id): # Execute rollback new_state = await game_engine.rollback_plays(game_id, num_plays) # Broadcast rollback notification to all players in game (outside lock) await manager.broadcast_to_game( str(game_id), "play_rolled_back", { "game_id": str(game_id), "num_plays": num_plays, "new_play_count": new_state.play_count, "inning": new_state.inning, "half": new_state.half, "message": f"Rolled back {num_plays} play(s)", }, ) # Broadcast updated game state to all players await manager.broadcast_to_game( str(game_id), "game_state_update", new_state.model_dump(mode="json"), ) logger.info( f"Rollback successful for game {game_id}: " f"Now at play {new_state.play_count}, inning {new_state.inning} {new_state.half}" ) except ValueError as e: # Validation errors from game_engine.rollback_plays logger.warning(f"Rollback validation error for game {game_id}: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except asyncio.TimeoutError: logger.error(f"Lock timeout while rolling back game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except DatabaseError as e: logger.error(f"Database error in rollback_play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except SQLAlchemyError as e: logger.error(f"Database error in rollback_play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (TypeError, AttributeError) as e: logger.warning(f"Invalid data in rollback_play request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid rollback request"} )