import asyncio import logging from uuid import UUID import pendulum from pydantic import ValidationError from socketio import AsyncServer from sqlalchemy.exc import SQLAlchemyError from app.config.result_charts import PlayOutcome from app.core.dice import dice_system from app.core.exceptions import ( DatabaseError, GameNotFoundError, InvalidGameStateError, ) from app.core.game_engine import game_engine from app.core.state_manager import state_manager from app.core.substitution_manager import SubstitutionManager from app.core.validators import ValidationError as GameValidationError from app.database.operations import DatabaseOperations from app.middleware.rate_limit import rate_limiter from app.models.game_models import ManualOutcomeSubmission from app.services.lineup_service import lineup_service from app.utils.auth import verify_token from app.websocket.connection_manager import ConnectionManager logger = logging.getLogger(f"{__name__}.handlers") def register_handlers(sio: AsyncServer, manager: ConnectionManager) -> None: """Register all WebSocket event handlers""" @sio.event async def connect(sid, environ, auth): """ Handle new connection with cookie, query param, or auth object support. Tries in order: 1. Cookie-based auth (from HttpOnly cookies) 2. Query parameter auth (for Safari/iOS fallback) 3. Auth object (for direct JS clients) """ try: token = None # Try cookie first (from HTTP headers in environ) cookie_header = environ.get("HTTP_COOKIE", "") if "pd_access_token=" in cookie_header: from http.cookies import SimpleCookie cookie = SimpleCookie() cookie.load(cookie_header) if "pd_access_token" in cookie: token = cookie["pd_access_token"].value logger.debug(f"Connection {sid} using cookie auth") # Try query parameter (Safari/iOS fallback) if not token: query_string = environ.get("QUERY_STRING", "") if "token=" in query_string: from urllib.parse import parse_qs params = parse_qs(query_string) if "token" in params: token = params["token"][0] logger.debug(f"Connection {sid} using query param auth (Safari fallback)") # Fall back to auth object (for direct JS clients) if not token and auth: token = auth.get("token") if token: logger.debug(f"Connection {sid} using auth object") if not token: logger.warning(f"Connection {sid} rejected: no token") return False user_data = verify_token(token) user_id = user_data.get("user_id") if not user_id: logger.warning(f"Connection {sid} rejected: invalid token") return False # Extract IP address for logging ip_address = environ.get("REMOTE_ADDR") await manager.connect(sid, user_id, ip_address=ip_address) await sio.emit("connected", {"user_id": user_id}, room=sid) logger.info(f"Connection {sid} accepted for user {user_id}") return True except (ValueError, KeyError) as e: # Token parsing or missing data error logger.warning(f"Connection {sid} auth error: {e}") return False except (ConnectionError, OSError) as e: # Network/socket error during connection logger.error(f"Connection {sid} network error: {e}") return False except Exception as e: # Unexpected error - log and reject connection logger.error(f"Connection {sid} unexpected error: {e}", exc_info=True) return False @sio.event async def disconnect(sid): """Handle disconnection""" # Clean up rate limiter buckets for this connection rate_limiter.remove_connection(sid) await manager.disconnect(sid) @sio.event async def join_game(sid, data): """Handle join game request""" # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: game_id = data.get("game_id") role = data.get("role", "player") if not game_id: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return # TODO: Verify user has access to game await manager.join_game(sid, game_id, role) await manager.emit_to_user( sid, "game_joined", {"game_id": game_id, "role": role} ) except (ValueError, TypeError) as e: # Invalid data format logger.warning(f"Join game validation error for {sid}: {e}") await manager.emit_to_user(sid, "error", {"message": "Invalid game data"}) except (ConnectionError, OSError) as e: # Network error during room join logger.error(f"Join game network error for {sid}: {e}") await manager.emit_to_user(sid, "error", {"message": "Connection error"}) @sio.event async def leave_game(sid, data): """Handle leave game request""" try: game_id = data.get("game_id") if game_id: await manager.leave_game(sid, game_id) except (ValueError, TypeError) as e: # Invalid data - log but don't error to client (leave is cleanup) logger.warning(f"Leave game data error for {sid}: {e}") except (ConnectionError, OSError) as e: # Network error - log but don't propagate (connection may already be gone) logger.debug(f"Leave game network error for {sid}: {e}") @sio.event async def heartbeat(sid): """ Handle client-initiated heartbeat ping. Updates session activity timestamp to prevent expiration. Socket.io has its own ping/pong mechanism, but clients can send explicit heartbeats for application-level keepalive. """ await manager.update_activity(sid) await sio.emit("heartbeat_ack", {}, room=sid) @sio.event async def request_game_state(sid, data): """ Client requests full game state (recovery after disconnect or initial load). Recovers game from database if not in memory. """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Try to get from memory first state = state_manager.get_state(game_id) # If not in memory, recover from database if not state: logger.info(f"Game {game_id} not in memory, recovering from database") state = await state_manager.recover_game(game_id) if state: # Fetch play history from database for mid-game joins db_ops = DatabaseOperations() plays = await db_ops.get_plays(game_id) # Convert Play DB models to frontend-compatible PlayResult dicts # Play model uses: hit_type (outcome), result_description (description), batter_id recent_plays = [] for p in plays: outcome = p.hit_type or "" # Parse outcome for categorization try: outcome_enum = PlayOutcome(outcome) is_hit = outcome_enum.is_hit() except ValueError: is_hit = False recent_plays.append({ "play_number": p.play_number, "inning": p.inning, "half": p.half, "outcome": outcome, "description": p.result_description or "", "runs_scored": p.runs_scored or 0, "outs_recorded": p.outs_recorded or 0, "batter_lineup_id": p.batter_id, "runners_advanced": [], # Not stored in Play model "batter_result": p.batter_final, "is_hit": is_hit, "is_out": (p.outs_recorded or 0) > 0, "is_walk": outcome and "walk" in outcome.lower(), "is_strikeout": outcome and "strikeout" in outcome.lower(), }) # Emit game_state_sync with state and play history await manager.emit_to_user( sid, "game_state_sync", { "state": state.model_dump(mode="json"), "recent_plays": recent_plays, "timestamp": pendulum.now("UTC").isoformat(), } ) logger.info(f"Sent game state sync with {len(recent_plays)} plays for {game_id} to {sid}") else: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) logger.warning(f"Game {game_id} not found in memory or database") except GameNotFoundError as e: logger.warning(f"Game not found: {e.game_id}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error fetching game state: {e}") await manager.emit_to_user(sid, "error", {"message": "Database error - please retry"}) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in request_game_state: {e}") await manager.emit_to_user(sid, "error", {"message": "Invalid request data"}) @sio.event async def roll_dice(sid, data): """ Roll dice for manual outcome selection. Server rolls dice and broadcasts to all players in game room. Players then read their physical cards and submit outcomes. Event data: game_id: UUID of the game Emits: dice_rolled: Broadcast to game room with dice results error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for rolls if not await rate_limiter.check_game_limit(str(game_id), "roll"): await manager.emit_to_user( sid, "error", {"message": "Too many roll requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Acquire lock before modifying state async with state_manager.game_lock(game_id): # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is participant in this game # user_id = manager.user_sessions.get(sid) # if not is_game_participant(game_id, user_id): # await manager.emit_to_user(sid, "error", {"message": "Not authorized"}) # return # Roll dice ab_roll = dice_system.roll_ab(league_id=state.league_id, game_id=game_id) logger.info( f"Dice rolled for game {game_id}: " f"d6={ab_roll.d6_one}, 2d6={ab_roll.d6_two_total}, " f"chaos={ab_roll.chaos_d20}, resolution={ab_roll.resolution_d20}" ) # Store roll in game state for manual outcome validation state.pending_manual_roll = ab_roll state_manager.update_state(game_id, state) # Broadcast dice results to all players in game (outside lock) await manager.broadcast_to_game( str(game_id), "dice_rolled", { "game_id": str(game_id), "roll_id": ab_roll.roll_id, "d6_one": ab_roll.d6_one, "d6_two_a": ab_roll.d6_two_a, "d6_two_b": ab_roll.d6_two_b, "d6_two_total": ab_roll.d6_two_total, "chaos_d20": ab_roll.chaos_d20, "resolution_d20": ab_roll.resolution_d20, "check_wild_pitch": ab_roll.check_wild_pitch, "check_passed_ball": ab_roll.check_passed_ball, "timestamp": ab_roll.timestamp.to_iso8601_string(), "message": "Dice rolled - read your card and submit outcome", }, ) except asyncio.TimeoutError: logger.error(f"Lock timeout while rolling dice for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except GameValidationError as e: logger.warning(f"Validation error in roll_dice: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error during roll_dice: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in roll_dice: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid request data"} ) except Exception as e: # Unexpected error - ensure we log and report logger.error(f"Unexpected error in roll_dice: {e}", exc_info=True) await manager.emit_to_user( sid, "error", {"message": "An unexpected error occurred"} ) @sio.event async def submit_manual_outcome(sid, data): """ Submit manually-selected play outcome. After dice are rolled, players read their physical cards and submit the outcome they see. System validates and processes. Event data: game_id: UUID of the game outcome: PlayOutcome enum value (e.g., "groundball_c") hit_location: Optional position string (e.g., "SS") Emits: outcome_accepted: To requester if valid play_resolved: Broadcast to game room with play result outcome_rejected: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "outcome_rejected", {"message": "Missing game_id", "field": "game_id"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "outcome_rejected", {"message": "Invalid game_id format", "field": "game_id"}, ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many outcome submissions. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # TODO: Verify user is active batter or authorized to submit # user_id = manager.user_sessions.get(sid) # Extract outcome data outcome_str = data.get("outcome") hit_location = data.get("hit_location") if not outcome_str: await manager.emit_to_user( sid, "outcome_rejected", {"message": "Missing outcome", "field": "outcome"}, ) return # Validate using ManualOutcomeSubmission model try: submission = ManualOutcomeSubmission( outcome=outcome_str, hit_location=hit_location ) except ValidationError as e: # Extract first error for user-friendly message first_error = e.errors()[0] field = first_error["loc"][0] if first_error["loc"] else "unknown" message = first_error["msg"] await manager.emit_to_user( sid, "outcome_rejected", {"message": message, "field": field, "errors": e.errors()}, ) logger.warning( f"Manual outcome validation failed for game {game_id}: {message}" ) return # Convert to PlayOutcome enum outcome = PlayOutcome(submission.outcome) # NOTE: Business rule validation (e.g., when hit_location is required based on # game state) is handled in PlayResolver, not here. This layer only validates # basic input format and type checking. # Acquire lock for state modifications async with state_manager.game_lock(game_id): # Re-fetch state inside lock to ensure consistency state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Check for pending roll BEFORE accepting outcome if not state.pending_manual_roll: await manager.emit_to_user( sid, "outcome_rejected", { "message": "No pending dice roll - call roll_dice first", "field": "game_state", }, ) return ab_roll = state.pending_manual_roll logger.info( f"Manual outcome submitted for game {game_id}: " f"{outcome.value}" + (f" to {submission.hit_location}" if submission.hit_location else "") ) logger.info( f"Processing manual outcome with roll {ab_roll.roll_id}: " f"d6={ab_roll.d6_one}, 2d6={ab_roll.d6_two_total}, " f"chaos={ab_roll.chaos_d20}" ) # Process manual outcome through game engine try: result = await game_engine.resolve_manual_play( game_id=game_id, ab_roll=ab_roll, outcome=outcome, hit_location=submission.hit_location, ) # Clear pending roll only AFTER successful validation (one-time use) state.pending_manual_roll = None state_manager.update_state(game_id, state) except GameValidationError as e: # Game engine validation error (e.g., missing hit location) await manager.emit_to_user( sid, "outcome_rejected", {"message": str(e), "field": "validation"} ) logger.warning(f"Manual play validation failed: {e}") return except ValueError as e: # Business logic validation error from PlayResolver await manager.emit_to_user( sid, "outcome_rejected", {"message": str(e), "field": "validation"} ) logger.warning(f"Manual play business logic validation failed: {e}") return except DatabaseError as e: # Database error during play resolution logger.error(f"Database error resolving manual play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error during play resolution - please retry"} ) return except SQLAlchemyError as e: # SQLAlchemy error during play resolution logger.error(f"SQLAlchemy error resolving manual play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) return # Broadcasting happens outside lock to avoid holding it too long # Confirm acceptance to submitter AFTER successful validation await manager.emit_to_user( sid, "outcome_accepted", { "game_id": str(game_id), "outcome": outcome.value, "hit_location": submission.hit_location, }, ) # Build play result data play_result_data = { "game_id": str(game_id), "play_number": state.play_count, "inning": state.inning, "half": state.half, "outcome": result.outcome.value, # Use resolved outcome, not submitted "hit_location": submission.hit_location, "description": result.description, "outs_recorded": result.outs_recorded, "runs_scored": result.runs_scored, "batter_result": result.batter_result, "batter_lineup_id": state.current_batter.lineup_id if state.current_batter else None, "runners_advanced": [ { "from": adv.from_base, "to": adv.to_base, "lineup_id": adv.lineup_id, "is_out": adv.is_out, } for adv in result.runners_advanced ], "is_hit": result.is_hit, "is_out": result.is_out, "is_walk": result.is_walk, "roll_id": ab_roll.roll_id, } # Include X-Check details if present (Phase 3E-Final) if result.x_check_details: xcheck = result.x_check_details play_result_data["x_check_details"] = { "position": xcheck.position, "d20_roll": xcheck.d20_roll, "d6_roll": xcheck.d6_roll, "defender_range": xcheck.defender_range, "defender_error_rating": xcheck.defender_error_rating, "defender_id": xcheck.defender_id, "base_result": xcheck.base_result, "converted_result": xcheck.converted_result, "error_result": xcheck.error_result, "final_outcome": xcheck.final_outcome.value, "hit_type": xcheck.hit_type, # Optional SPD test details "spd_test_roll": xcheck.spd_test_roll, "spd_test_target": xcheck.spd_test_target, "spd_test_passed": xcheck.spd_test_passed, } # Broadcast play result to game room await manager.broadcast_to_game( str(game_id), "play_resolved", play_result_data ) logger.info( f"Manual play resolved for game {game_id}: {result.description}" ) # Broadcast updated game state so frontend sees new batter, outs, etc. updated_state = state_manager.get_state(game_id) if updated_state: batter_info = f"lineup_id={updated_state.current_batter.lineup_id}, batting_order={updated_state.current_batter.batting_order}" if updated_state.current_batter else "None" logger.info(f"Broadcasting game_state_update with current_batter: {batter_info}") await manager.broadcast_to_game( str(game_id), "game_state_update", updated_state.model_dump(mode="json"), ) except asyncio.TimeoutError: logger.error(f"Lock timeout while submitting manual outcome for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except DatabaseError as e: logger.error(f"Database error in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except SQLAlchemyError as e: logger.error(f"SQLAlchemy error in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in submit_manual_outcome: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid outcome data"} ) # ===== SUBSTITUTION EVENTS ===== @sio.event async def request_pinch_hitter(sid, data): """ Request pinch hitter substitution. Replaces current batter with a player from the bench. The substitute takes the batting order position of the replaced player. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of player being removed player_in_card_id: int - card/player ID of substitute team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Pinch hitter request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute pinch hitter substitution result = await sub_manager.pinch_hit( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "pinch_hitter", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, "batting_order": result.new_batting_order, "team_id": team_id, "message": f"Pinch hitter: #{result.new_batting_order} now batting", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "pinch_hitter", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Pinch hitter successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "pinch_hitter", }, ) logger.warning( f"Pinch hitter failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing pinch hitter for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in pinch hitter: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in pinch hitter request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def request_defensive_replacement(sid, data): """ Request defensive replacement substitution. Replaces a defensive player with a better fielder. Player can be swapped at any position. If player is in batting order, substitute takes their batting order spot. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of player being removed player_in_card_id: int - card/player ID of substitute new_position: str - defensive position for substitute (e.g., "SS") team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") new_position = data.get("new_position") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if not new_position: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing new_position", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Defensive replacement request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id} at {new_position}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute defensive replacement result = await sub_manager.defensive_replace( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, new_position=new_position, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "defensive_replacement", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, "batting_order": result.new_batting_order, "team_id": team_id, "message": f"Defensive replacement: {result.new_position}", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "defensive_replacement", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Defensive replacement successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "defensive_replacement", }, ) logger.warning( f"Defensive replacement failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing defensive replacement for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in defensive replacement: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in defensive replacement request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def request_pitching_change(sid, data): """ Request pitching change substitution. Replaces current pitcher with a reliever. Pitcher must have faced at least 1 batter unless injury. New pitcher takes mound immediately. Event data: game_id: UUID of the game player_out_lineup_id: int - lineup ID of pitcher being removed player_in_card_id: int - card/player ID of relief pitcher team_id: int - team making substitution Emits: player_substituted: Broadcast to game room on success substitution_confirmed: To requester with new lineup_id substitution_error: To requester if validation fails error: To requester if processing fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing game_id", "code": "MISSING_FIELD"}, ) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "substitution_error", {"message": "Invalid game_id format", "code": "INVALID_FORMAT"}, ) return # Rate limit check - game level for substitutions if not await rate_limiter.check_game_limit(str(game_id), "substitution"): await manager.emit_to_user( sid, "error", {"message": "Too many substitution requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # Extract substitution data player_out_lineup_id = data.get("player_out_lineup_id") player_in_card_id = data.get("player_in_card_id") team_id = data.get("team_id") if player_out_lineup_id is None: await manager.emit_to_user( sid, "substitution_error", { "message": "Missing player_out_lineup_id", "code": "MISSING_FIELD", }, ) return if player_in_card_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing player_in_card_id", "code": "MISSING_FIELD"}, ) return if team_id is None: await manager.emit_to_user( sid, "substitution_error", {"message": "Missing team_id", "code": "MISSING_FIELD"}, ) return # TODO: Verify user is authorized to make substitution for this team # user_id = manager.user_sessions.get(sid) logger.info( f"Pitching change request for game {game_id}: " f"Replacing {player_out_lineup_id} with card {player_in_card_id}" ) # Acquire lock before substitution to prevent concurrent lineup modifications async with state_manager.game_lock(game_id): # Create SubstitutionManager instance db_ops = DatabaseOperations() sub_manager = SubstitutionManager(db_ops) # Execute pitching change result = await sub_manager.change_pitcher( game_id=game_id, player_out_lineup_id=player_out_lineup_id, player_in_card_id=player_in_card_id, team_id=team_id, ) # Broadcasting happens outside lock if result.success: # Broadcast to all clients in game await manager.broadcast_to_game( str(game_id), "player_substituted", { "type": "pitching_change", "player_out_lineup_id": result.player_out_lineup_id, "player_in_card_id": result.player_in_card_id, "new_lineup_id": result.new_lineup_id, "position": result.new_position, # Should be "P" "batting_order": result.new_batting_order, "team_id": team_id, "message": "Pitching change: New pitcher entering", }, ) # Send confirmation to requester await manager.emit_to_user( sid, "substitution_confirmed", { "type": "pitching_change", "new_lineup_id": result.new_lineup_id, "success": True, }, ) logger.info( f"Pitching change successful for game {game_id}: " f"New lineup ID {result.new_lineup_id}" ) else: # Send error to requester with error code await manager.emit_to_user( sid, "substitution_error", { "message": result.error_message, "code": result.error_code, "type": "pitching_change", }, ) logger.warning( f"Pitching change failed for game {game_id}: {result.error_message}" ) except asyncio.TimeoutError: logger.error(f"Lock timeout while processing pitching change for game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except SQLAlchemyError as e: logger.error(f"Database error in pitching change: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in pitching change request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid substitution data"} ) @sio.event async def get_lineup(sid, data): """ Get current active lineup for a team. Returns all active players in the lineup with their positions and batting orders. Used by UI to refresh lineup display. Event data: game_id: UUID of the game team_id: int - team to get lineup for Emits: lineup_data: To requester with active lineup error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Extract team_id team_id = data.get("team_id") if team_id is None: await manager.emit_to_user(sid, "error", {"message": "Missing team_id"}) return # TODO: Verify user has access to view this lineup # user_id = manager.user_sessions.get(sid) # Get lineup from state manager cache (fast O(1) lookup) lineup = state_manager.get_lineup(game_id, team_id) if lineup: # Send lineup data with player info await manager.emit_to_user( sid, "lineup_data", { "game_id": str(game_id), "team_id": team_id, "players": [ { "lineup_id": p.lineup_id, "card_id": p.card_id, "position": p.position, "batting_order": p.batting_order, "is_active": p.is_active, "is_starter": p.is_starter, "player": { "id": p.card_id, "name": p.player_name or f"Player #{p.card_id}", "image": p.player_image or "", "headshot": p.player_headshot or "", }, } for p in lineup.players if p.is_active ], }, ) logger.info(f"Lineup data sent for game {game_id}, team {team_id}") else: # Lineup not in cache - try to load from database with player data # Get league_id from game state or database state = state_manager.get_state(game_id) league_id = state.league_id if state else "sba" lineup_state = await lineup_service.load_team_lineup_with_player_data( game_id=game_id, team_id=team_id, league_id=league_id ) if lineup_state: # Cache the lineup for future requests state_manager.set_lineup(game_id, team_id, lineup_state) # Send lineup data with player info await manager.emit_to_user( sid, "lineup_data", { "game_id": str(game_id), "team_id": team_id, "players": [ { "lineup_id": p.lineup_id, "card_id": p.card_id, "position": p.position, "batting_order": p.batting_order, "is_active": p.is_active, "is_starter": p.is_starter, "player": { "id": p.card_id, "name": p.player_name or f"Player #{p.card_id}", "image": p.player_image or "", }, } for p in lineup_state.players if p.is_active ], }, ) logger.info( f"Lineup data loaded from DB with player data for game {game_id}, team {team_id}" ) else: await manager.emit_to_user( sid, "error", {"message": f"Lineup not found for team {team_id}"}, ) except SQLAlchemyError as e: logger.error(f"Database error in get_lineup: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in get_lineup request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid lineup request"} ) @sio.event async def submit_defensive_decision(sid, data): """ Submit defensive team decision. Event data: game_id: UUID of the game alignment: Defensive alignment (normal, shifted_left, shifted_right, extreme_shift) infield_depth: Infield positioning (in, normal, back, double_play) outfield_depth: Outfield positioning (in, normal, back) hold_runners: List of bases to hold runners (e.g., [1, 3]) Emits: defensive_decision_submitted: To requester and broadcast to game room error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is authorized (fielding team manager) # user_id = manager.user_sessions.get(sid) # Extract decision data alignment = data.get("alignment", "normal") infield_depth = data.get("infield_depth", "normal") outfield_depth = data.get("outfield_depth", "normal") hold_runners = data.get("hold_runners", []) # Create defensive decision from app.models.game_models import DefensiveDecision decision = DefensiveDecision( alignment=alignment, infield_depth=infield_depth, outfield_depth=outfield_depth, hold_runners=hold_runners, ) # Submit decision through game engine updated_state = await game_engine.submit_defensive_decision( game_id, decision ) logger.info( f"Defensive decision submitted for game {game_id}: " f"alignment={alignment}, infield={infield_depth}, outfield={outfield_depth}" ) # Broadcast to game room await manager.broadcast_to_game( str(game_id), "defensive_decision_submitted", { "game_id": str(game_id), "decision": { "alignment": alignment, "infield_depth": infield_depth, "outfield_depth": outfield_depth, "hold_runners": hold_runners, }, "pending_decision": updated_state.pending_decision, }, ) except ValidationError as e: logger.warning(f"Validation error in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision data"} ) except GameValidationError as e: logger.warning(f"Game validation error in defensive decision: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in defensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision format"} ) @sio.event async def submit_offensive_decision(sid, data): """ Submit offensive team decision. Event data: game_id: UUID of the game action: String - offensive action (swing_away, steal, check_jump, hit_and_run, sac_bunt, squeeze_bunt) steal_attempts: List of bases for steal attempts - REQUIRED when action="steal" (e.g., [2, 3]) Emits: offensive_decision_submitted: To requester and broadcast to game room error: To requester if validation fails Session 2 Update (2025-01-14): Replaced approach with action field. Stealing is now an action choice. """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Rate limit check - game level for decisions if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many decision requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # Get game state state = state_manager.get_state(game_id) if not state: await manager.emit_to_user( sid, "error", {"message": f"Game {game_id} not found"} ) return # TODO: Verify user is authorized (batting team manager) # user_id = manager.user_sessions.get(sid) # Extract decision data action = data.get("action", "swing_away") # Default: swing_away steal_attempts = data.get("steal_attempts", []) # Create offensive decision from app.models.game_models import OffensiveDecision decision = OffensiveDecision(action=action, steal_attempts=steal_attempts) # Submit decision through game engine updated_state = await game_engine.submit_offensive_decision( game_id, decision ) logger.info( f"Offensive decision submitted for game {game_id}: " f"action={action}, steal={steal_attempts}" ) # Broadcast to game room await manager.broadcast_to_game( str(game_id), "offensive_decision_submitted", { "game_id": str(game_id), "decision": {"action": action, "steal_attempts": steal_attempts}, "pending_decision": updated_state.pending_decision, }, ) except ValidationError as e: logger.warning(f"Validation error in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision data"} ) except GameValidationError as e: logger.warning(f"Game validation error in offensive decision: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except SQLAlchemyError as e: logger.error(f"Database error in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in offensive decision: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid decision format"} ) @sio.event async def get_box_score(sid, data): """ Get box score using materialized views. Event data: game_id: UUID of the game Emits: box_score_data: To requester with box score error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # TODO: Verify user has access to view this game's box score # user_id = manager.user_sessions.get(sid) # Get box score from materialized views from app.services import box_score_service box_score = await box_score_service.get_box_score(game_id) if box_score: # Send box score data to requester await manager.emit_to_user( sid, "box_score_data", {"game_id": str(game_id), "box_score": box_score}, ) logger.info(f"Box score data sent for game {game_id}") else: await manager.emit_to_user( sid, "error", { "message": "No box score found for game", "hint": "Run migration (alembic upgrade head) and refresh views", }, ) except SQLAlchemyError as e: logger.error(f"Database error in get_box_score: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error fetching box score - please retry"} ) except (ValueError, TypeError) as e: logger.warning(f"Invalid data in get_box_score request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid box score request"} ) @sio.event async def rollback_play(sid, data): """ Roll back the last N plays. Deletes plays from the database and reconstructs game state by replaying remaining plays. Also removes any substitutions that occurred during the rolled-back plays. Event data: game_id: UUID of the game num_plays: int - Number of plays to roll back (default: 1) Emits: play_rolled_back: Broadcast to game room with new state game_state_update: Broadcast to game room with updated state error: To requester if validation fails """ # Update activity timestamp await manager.update_activity(sid) # Rate limit check - connection level if not await rate_limiter.check_websocket_limit(sid): await manager.emit_to_user( sid, "error", {"message": "Rate limited. Please slow down.", "code": "RATE_LIMITED"} ) return try: # Extract and validate game_id game_id_str = data.get("game_id") if not game_id_str: await manager.emit_to_user(sid, "error", {"message": "Missing game_id"}) return try: game_id = UUID(game_id_str) except (ValueError, AttributeError): await manager.emit_to_user( sid, "error", {"message": "Invalid game_id format"} ) return # Extract num_plays (default to 1) num_plays = data.get("num_plays", 1) if not isinstance(num_plays, int) or num_plays < 1: await manager.emit_to_user( sid, "error", {"message": "num_plays must be a positive integer"} ) return # Rate limit check - game level for rollback (same as decisions) if not await rate_limiter.check_game_limit(str(game_id), "decision"): await manager.emit_to_user( sid, "error", {"message": "Too many requests. Please wait.", "code": "GAME_RATE_LIMITED"} ) return # TODO: Verify user is authorized (game manager/owner only) # user_id = manager.user_sessions.get(sid) logger.info(f"Rollback request for game {game_id}: {num_plays} play(s)") # Acquire lock before modifying state async with state_manager.game_lock(game_id): # Execute rollback new_state = await game_engine.rollback_plays(game_id, num_plays) # Broadcast rollback notification to all players in game (outside lock) await manager.broadcast_to_game( str(game_id), "play_rolled_back", { "game_id": str(game_id), "num_plays": num_plays, "new_play_count": new_state.play_count, "inning": new_state.inning, "half": new_state.half, "message": f"Rolled back {num_plays} play(s)", }, ) # Broadcast updated game state to all players await manager.broadcast_to_game( str(game_id), "game_state_update", new_state.model_dump(mode="json"), ) logger.info( f"Rollback successful for game {game_id}: " f"Now at play {new_state.play_count}, inning {new_state.inning} {new_state.half}" ) except ValueError as e: # Validation errors from game_engine.rollback_plays logger.warning(f"Rollback validation error for game {game_id}: {e}") await manager.emit_to_user(sid, "error", {"message": str(e)}) except asyncio.TimeoutError: logger.error(f"Lock timeout while rolling back game {game_id}") await manager.emit_to_user( sid, "error", {"message": "Server busy - please try again"} ) except DatabaseError as e: logger.error(f"Database error in rollback_play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except SQLAlchemyError as e: logger.error(f"Database error in rollback_play: {e}") await manager.emit_to_user( sid, "error", {"message": "Database error - please retry"} ) except (TypeError, AttributeError) as e: logger.warning(f"Invalid data in rollback_play request: {e}") await manager.emit_to_user( sid, "error", {"message": "Invalid rollback request"} )