"""
Database Operations - Async persistence layer for game data.

Provides async operations for persisting and retrieving game data.
Used by StateManager for database persistence and recovery.

Author: Claude
Date: 2025-10-22
"""

# pyright: reportAssignmentType=false, reportArgumentType=false, reportAttributeAccessIssue=false
# Note: SQLAlchemy Column descriptors cause false positives in Pylance/Pyright

import logging
from uuid import UUID

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.database.session import AsyncSessionLocal
from app.models.db_models import Game, GameSession, Lineup, Play, Roll, RosterLink
from app.models.roster_models import PdRosterLinkData, SbaRosterLinkData

logger = logging.getLogger(f"{__name__}.DatabaseOperations")


class DatabaseOperations:
    """
    Async database operations for game persistence.

    Provides methods for creating, reading, updating and deleting game data.
    Unless a caller supplies its own session (see ``update_game_state`` and
    ``save_play``), every method opens a session from ``AsyncSessionLocal``,
    commits on success and rolls back before re-raising on failure.
    """

    # ========================================================================
    # GAME OPERATIONS
    # ========================================================================

    async def create_game(
        self,
        game_id: UUID,
        league_id: str,
        home_team_id: int,
        away_team_id: int,
        game_mode: str,
        visibility: str,
        home_team_is_ai: bool = False,
        away_team_is_ai: bool = False,
        ai_difficulty: str | None = None,
    ) -> Game:
        """
        Create new game in database.

        The row is inserted with ``status="pending"`` and refreshed after the
        commit so the returned instance is fully loaded.

        Args:
            game_id: Unique game identifier
            league_id: League identifier ('sba' or 'pd')
            home_team_id: Home team ID
            away_team_id: Away team ID
            game_mode: Game mode ('ranked', 'friendly', 'practice')
            visibility: Visibility ('public', 'private')
            home_team_is_ai: Whether home team is AI
            away_team_is_ai: Whether away team is AI
            ai_difficulty: AI difficulty if applicable

        Returns:
            Created Game model

        Raises:
            Exception: Any error from the insert/commit is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                game = Game(
                    id=game_id,
                    league_id=league_id,
                    home_team_id=home_team_id,
                    away_team_id=away_team_id,
                    game_mode=game_mode,
                    visibility=visibility,
                    home_team_is_ai=home_team_is_ai,
                    away_team_is_ai=away_team_is_ai,
                    ai_difficulty=ai_difficulty,
                    status="pending",
                )
                session.add(game)
                await session.commit()
                await session.refresh(game)
                logger.info(f"Created game {game_id} in database ({league_id})")
                return game
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to create game {game_id}: {e}")
                raise

    async def get_game(self, game_id: UUID) -> Game | None:
        """
        Get game by ID.

        Args:
            game_id: Game identifier

        Returns:
            Game model if found, None otherwise
        """
        async with AsyncSessionLocal() as session:
            result = await session.execute(select(Game).where(Game.id == game_id))
            game = result.scalar_one_or_none()
            if game:
                logger.debug(f"Retrieved game {game_id} from database")
            return game

    async def update_game_state(
        self,
        game_id: UUID,
        inning: int,
        half: str,
        home_score: int,
        away_score: int,
        status: str | None = None,
        session: AsyncSession | None = None,
    ) -> None:
        """
        Update game state fields using direct UPDATE (no SELECT).

        Args:
            game_id: Game identifier
            inning: Current inning
            half: Current half ('top' or 'bottom')
            home_score: Home team score
            away_score: Away team score
            status: Game status if updating; a falsy value (None or "")
                leaves the stored status untouched
            session: Optional external session for transaction grouping.
                When given, this method neither commits nor rolls back.

        Raises:
            ValueError: If game not found (UPDATE matched zero rows)
        """

        async def _do_update(sess: AsyncSession) -> None:
            # Build update values
            update_values = {
                "current_inning": inning,
                "current_half": half,
                "home_score": home_score,
                "away_score": away_score,
            }
            if status:
                update_values["status"] = status

            # Direct UPDATE statement (no SELECT needed)
            result = await sess.execute(
                update(Game).where(Game.id == game_id).values(**update_values)
            )
            if result.rowcount == 0:
                raise ValueError(f"Game {game_id} not found for update")

        # Use provided session or create new one. Compare against None
        # explicitly so the decision never depends on session truthiness.
        if session is not None:
            await _do_update(session)
            # Don't commit - caller controls transaction
            logger.debug(f"Updated game {game_id} state in external transaction")
            return

        async with AsyncSessionLocal() as new_session:
            try:
                await _do_update(new_session)
                await new_session.commit()
                logger.debug(
                    f"Updated game {game_id} state (inning {inning}, {half})"
                )
            except Exception as e:
                await new_session.rollback()
                logger.error(f"Failed to update game {game_id} state: {e}")
                raise

    # ========================================================================
    # LINEUP OPERATIONS
    # ========================================================================

    async def add_pd_lineup_card(
        self,
        game_id: UUID,
        team_id: int,
        card_id: int,
        position: str,
        batting_order: int | None = None,
        is_starter: bool = True,
    ) -> Lineup:
        """
        Add PD card to lineup.

        The entry is stored with ``player_id=None`` and ``is_active=True``.

        Args:
            game_id: Game identifier
            team_id: Team identifier
            card_id: Player card ID
            position: Player position
            batting_order: Batting order (1-9) if applicable
            is_starter: Whether player is starting lineup

        Returns:
            Created Lineup model

        Raises:
            Exception: Any error from the insert/commit is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                lineup = Lineup(
                    game_id=game_id,
                    team_id=team_id,
                    card_id=card_id,
                    player_id=None,
                    position=position,
                    batting_order=batting_order,
                    is_starter=is_starter,
                    is_active=True,
                )
                session.add(lineup)
                await session.commit()
                await session.refresh(lineup)
                logger.debug(f"Added PD card {card_id} to lineup in game {game_id}")
                return lineup
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to add PD lineup card: {e}")
                raise

    async def add_sba_lineup_player(
        self,
        game_id: UUID,
        team_id: int,
        player_id: int,
        position: str,
        batting_order: int | None = None,
        is_starter: bool = True,
    ) -> Lineup:
        """
        Add SBA player to lineup.

        The entry is stored with ``card_id=None`` and ``is_active=True``.

        Args:
            game_id: Game identifier
            team_id: Team identifier
            player_id: Player ID
            position: Player position
            batting_order: Batting order (1-9) if applicable
            is_starter: Whether player is starting lineup

        Returns:
            Created Lineup model

        Raises:
            Exception: Any error from the insert/commit is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                lineup = Lineup(
                    game_id=game_id,
                    team_id=team_id,
                    card_id=None,
                    player_id=player_id,
                    position=position,
                    batting_order=batting_order,
                    is_starter=is_starter,
                    is_active=True,
                )
                session.add(lineup)
                await session.commit()
                await session.refresh(lineup)
                logger.debug(
                    f"Added SBA player {player_id} to lineup in game {game_id}"
                )
                return lineup
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to add SBA lineup player: {e}")
                raise

    async def get_active_lineup(self, game_id: UUID, team_id: int) -> list[Lineup]:
        """
        Get active lineup for team.

        Args:
            game_id: Game identifier
            team_id: Team identifier

        Returns:
            List of active Lineup models, sorted by batting order
        """
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Lineup)
                .where(
                    Lineup.game_id == game_id,
                    Lineup.team_id == team_id,
                    Lineup.is_active.is_(True),
                )
                .order_by(Lineup.batting_order)
            )
            lineups = list(result.scalars().all())
            logger.debug(
                f"Retrieved {len(lineups)} active lineup entries for team {team_id}"
            )
            return lineups

    async def create_substitution(
        self,
        game_id: UUID,
        team_id: int,
        player_out_lineup_id: int,
        player_in_card_id: int,
        position: str,
        batting_order: int | None,
        inning: int,
        play_number: int,
    ) -> int:
        """
        Create substitution in database (DB-first pattern).

        Process:
        1. Mark old player inactive (is_active = False)
        2. Create new lineup entry with:
           - is_starter = False
           - is_active = True
           - entered_inning = current inning
           - replacing_id = old player's lineup_id
           - after_play = current play number
        3. Return new lineup_id

        Both steps share one transaction, so a failure leaves the outgoing
        player active.

        Args:
            game_id: Game identifier
            team_id: Team identifier
            player_out_lineup_id: Lineup ID of player being replaced
            player_in_card_id: Card/Player ID of incoming player
            position: Position for incoming player
            batting_order: Batting order for incoming player (can be None for
                non-batting positions)
            inning: Current inning
            play_number: Current play number

        Returns:
            New lineup_id for substituted player

        Raises:
            ValueError: If player_out not found
            Exception: Any other error is logged with traceback, the
                transaction is rolled back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                # STEP 1: Mark old player inactive
                result = await session.execute(
                    select(Lineup).where(Lineup.id == player_out_lineup_id)
                )
                player_out = result.scalar_one_or_none()
                if player_out is None:
                    raise ValueError(f"Lineup entry {player_out_lineup_id} not found")
                player_out.is_active = False

                # STEP 2: Create new lineup entry
                # NOTE(review): the incoming ID is always written to card_id
                # (PD layout); SBA callers would need player_id instead.
                new_lineup = Lineup(
                    game_id=game_id,
                    team_id=team_id,
                    card_id=player_in_card_id,  # For PD, will use card_id
                    player_id=None,  # For SBA, swap these
                    position=position,
                    batting_order=batting_order,
                    is_starter=False,  # Substitutes are never starters
                    is_active=True,  # New player is active
                    entered_inning=inning,
                    replacing_id=player_out_lineup_id,
                    after_play=play_number,
                )
                session.add(new_lineup)

                # Flush so the primary key is assigned, and capture it BEFORE
                # committing. If the session factory expires attributes on
                # commit (SQLAlchemy's default), reading new_lineup.id after
                # the commit would require an implicit reload, which an async
                # session cannot perform.
                await session.flush()
                new_lineup_id = new_lineup.id  # type: ignore[assignment]

                await session.commit()
                logger.info(
                    f"Substitution created: lineup {player_out_lineup_id} → {new_lineup_id} "
                    f"(card {player_in_card_id}, {position}, inning {inning})"
                )
                return new_lineup_id
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to create substitution: {e}", exc_info=True)
                raise

    async def get_eligible_substitutes(
        self, game_id: UUID, team_id: int
    ) -> list[Lineup]:
        """
        Get all inactive players (potential substitutes).

        Args:
            game_id: Game identifier
            team_id: Team identifier

        Returns:
            List of inactive Lineup models, sorted by batting order
        """
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Lineup)
                .where(
                    Lineup.game_id == game_id,
                    Lineup.team_id == team_id,
                    Lineup.is_active.is_(False),
                )
                .order_by(Lineup.batting_order)
            )
            subs = list(result.scalars().all())
            logger.debug(
                f"Retrieved {len(subs)} eligible substitutes for team {team_id}"
            )
            return subs

    # ========================================================================
    # PLAY OPERATIONS
    # ========================================================================

    async def save_play(
        self, play_data: dict, session: AsyncSession | None = None
    ) -> int:
        """
        Save play to database.

        Args:
            play_data: Dictionary with play data matching Play model fields
            session: Optional external session for transaction grouping.
                When given, the play is flushed (so its ID exists) but not
                committed.

        Returns:
            Play ID (primary key)

        Raises:
            Exception: With an internally created session, any error is
                logged, the transaction is rolled back, and the error is
                re-raised. With an external session errors propagate as-is.
        """

        async def _do_save(sess: AsyncSession) -> int:
            play = Play(**play_data)
            sess.add(play)
            await sess.flush()  # Get ID without committing
            play_id = play.id
            logger.info(f"Saved play {play.play_number} for game {play.game_id}")
            return play_id  # type: ignore

        # Use provided session or create new one
        if session is not None:
            # Don't commit - caller controls transaction
            return await _do_save(session)

        async with AsyncSessionLocal() as new_session:
            try:
                play_id = await _do_save(new_session)
                await new_session.commit()
                return play_id
            except Exception as e:
                await new_session.rollback()
                logger.error(f"Failed to save play: {e}")
                raise

    async def get_plays(self, game_id: UUID) -> list[Play]:
        """
        Get all plays for game.

        Args:
            game_id: Game identifier

        Returns:
            List of Play models, ordered by play_number
        """
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Play).where(Play.game_id == game_id).order_by(Play.play_number)
            )
            plays = list(result.scalars().all())
            logger.debug(f"Retrieved {len(plays)} plays for game {game_id}")
            return plays

    async def load_game_state(self, game_id: UUID) -> dict | None:
        """
        Load complete game state for recovery.

        Loads the game, its active lineup entries, and its plays using a
        single session, and returns them as plain dictionaries.

        Args:
            game_id: Game identifier

        Returns:
            Dictionary with 'game', 'lineups', and 'plays' keys, or None if
            game not found. 'lineups' contains active entries only; 'plays'
            is ordered by play_number.
        """
        async with AsyncSessionLocal() as session:
            # Get game
            game_result = await session.execute(select(Game).where(Game.id == game_id))
            game = game_result.scalar_one_or_none()
            if not game:
                logger.warning(f"Game {game_id} not found for recovery")
                return None

            # Get lineups
            lineup_result = await session.execute(
                select(Lineup).where(
                    Lineup.game_id == game_id, Lineup.is_active.is_(True)
                )
            )
            lineups = list(lineup_result.scalars().all())

            # Get plays
            play_result = await session.execute(
                select(Play).where(Play.game_id == game_id).order_by(Play.play_number)
            )
            plays = list(play_result.scalars().all())

            logger.info(
                f"Loaded game state for {game_id}: {len(lineups)} lineups, {len(plays)} plays"
            )

            return {
                "game": {
                    "id": game.id,
                    "league_id": game.league_id,
                    "home_team_id": game.home_team_id,
                    "away_team_id": game.away_team_id,
                    "home_team_is_ai": game.home_team_is_ai,
                    "away_team_is_ai": game.away_team_is_ai,
                    "status": game.status,
                    "current_inning": game.current_inning,
                    "current_half": game.current_half,
                    "home_score": game.home_score,
                    "away_score": game.away_score,
                },
                "lineups": [
                    {
                        "id": entry.id,
                        "team_id": entry.team_id,
                        "card_id": entry.card_id,
                        "player_id": entry.player_id,
                        "position": entry.position,
                        "batting_order": entry.batting_order,
                        "is_active": entry.is_active,
                    }
                    for entry in lineups
                ],
                "plays": [
                    {
                        "play_number": p.play_number,
                        "inning": p.inning,
                        "half": p.half,
                        "outs_before": p.outs_before,
                        "result_description": p.result_description,
                        "complete": p.complete,
                        # Runner tracking for state recovery
                        "batter_id": p.batter_id,
                        "on_first_id": p.on_first_id,
                        "on_second_id": p.on_second_id,
                        "on_third_id": p.on_third_id,
                        "batter_final": p.batter_final,
                        "on_first_final": p.on_first_final,
                        "on_second_final": p.on_second_final,
                        "on_third_final": p.on_third_final,
                    }
                    for p in plays
                ],
            }

    # ========================================================================
    # GAME SESSION OPERATIONS
    # ========================================================================

    async def create_game_session(self, game_id: UUID) -> GameSession:
        """
        Create game session record for WebSocket tracking.

        Args:
            game_id: Game identifier

        Returns:
            Created GameSession model

        Raises:
            Exception: Any error from the insert/commit is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                game_session = GameSession(game_id=game_id)
                session.add(game_session)
                await session.commit()
                await session.refresh(game_session)
                logger.info(f"Created game session for {game_id}")
                return game_session
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to create game session: {e}")
                raise

    async def update_session_snapshot(
        self, game_id: UUID, state_snapshot: dict
    ) -> None:
        """
        Update session state snapshot.

        Args:
            game_id: Game identifier
            state_snapshot: JSON-serializable state snapshot

        Raises:
            ValueError: If game session not found
            Exception: Any other error is logged, the transaction is rolled
                back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                result = await session.execute(
                    select(GameSession).where(GameSession.game_id == game_id)
                )
                game_session = result.scalar_one_or_none()
                if not game_session:
                    raise ValueError(f"Game session {game_id} not found")

                game_session.state_snapshot = state_snapshot
                await session.commit()
                logger.debug(f"Updated session snapshot for {game_id}")
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to update session snapshot: {e}")
                raise

    # ========================================================================
    # ROSTER OPERATIONS
    # ========================================================================

    async def add_pd_roster_card(
        self, game_id: UUID, card_id: int, team_id: int
    ) -> PdRosterLinkData:
        """
        Add a PD card to game roster.

        Args:
            game_id: Game identifier
            card_id: Card identifier
            team_id: Team identifier

        Returns:
            PdRosterLinkData with populated id

        Raises:
            ValueError: On any failure (e.g. card already rostered or other
                constraint violation). The original error is attached as
                ``__cause__``.
        """
        async with AsyncSessionLocal() as session:
            try:
                roster_link = RosterLink(
                    game_id=game_id, card_id=card_id, team_id=team_id
                )
                session.add(roster_link)
                await session.commit()
                await session.refresh(roster_link)
                logger.info(f"Added PD card {card_id} to roster for game {game_id}")
                return PdRosterLinkData(
                    id=roster_link.id,
                    game_id=roster_link.game_id,
                    card_id=roster_link.card_id,
                    team_id=roster_link.team_id,
                )
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to add PD roster card: {e}")
                # Chain explicitly so the underlying DB error stays visible.
                raise ValueError(f"Could not add card to roster: {e}") from e

    async def add_sba_roster_player(
        self, game_id: UUID, player_id: int, team_id: int
    ) -> SbaRosterLinkData:
        """
        Add an SBA player to game roster.

        Args:
            game_id: Game identifier
            player_id: Player identifier
            team_id: Team identifier

        Returns:
            SbaRosterLinkData with populated id

        Raises:
            ValueError: On any failure (e.g. player already rostered or other
                constraint violation). The original error is attached as
                ``__cause__``.
        """
        async with AsyncSessionLocal() as session:
            try:
                roster_link = RosterLink(
                    game_id=game_id, player_id=player_id, team_id=team_id
                )
                session.add(roster_link)
                await session.commit()
                await session.refresh(roster_link)
                logger.info(
                    f"Added SBA player {player_id} to roster for game {game_id}"
                )
                return SbaRosterLinkData(
                    id=roster_link.id,
                    game_id=roster_link.game_id,
                    player_id=roster_link.player_id,
                    team_id=roster_link.team_id,
                )
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to add SBA roster player: {e}")
                # Chain explicitly so the underlying DB error stays visible.
                raise ValueError(f"Could not add player to roster: {e}") from e

    async def get_pd_roster(
        self, game_id: UUID, team_id: int | None = None
    ) -> list[PdRosterLinkData]:
        """
        Get PD cards for a game, optionally filtered by team.

        Only roster links whose ``card_id`` is not NULL are returned.

        Args:
            game_id: Game identifier
            team_id: Optional team filter

        Returns:
            List of PdRosterLinkData

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                query = select(RosterLink).where(
                    RosterLink.game_id == game_id, RosterLink.card_id.is_not(None)
                )
                if team_id is not None:
                    query = query.where(RosterLink.team_id == team_id)

                result = await session.execute(query)
                roster_links = result.scalars().all()
                return [
                    PdRosterLinkData(
                        id=link.id,
                        game_id=link.game_id,
                        card_id=link.card_id,
                        team_id=link.team_id,
                    )
                    for link in roster_links
                ]
            except Exception as e:
                logger.error(f"Failed to get PD roster: {e}")
                raise

    async def get_sba_roster(
        self, game_id: UUID, team_id: int | None = None
    ) -> list[SbaRosterLinkData]:
        """
        Get SBA players for a game, optionally filtered by team.

        Only roster links whose ``player_id`` is not NULL are returned.

        Args:
            game_id: Game identifier
            team_id: Optional team filter

        Returns:
            List of SbaRosterLinkData

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                query = select(RosterLink).where(
                    RosterLink.game_id == game_id, RosterLink.player_id.is_not(None)
                )
                if team_id is not None:
                    query = query.where(RosterLink.team_id == team_id)

                result = await session.execute(query)
                roster_links = result.scalars().all()
                return [
                    SbaRosterLinkData(
                        id=link.id,
                        game_id=link.game_id,
                        player_id=link.player_id,
                        team_id=link.team_id,
                    )
                    for link in roster_links
                ]
            except Exception as e:
                logger.error(f"Failed to get SBA roster: {e}")
                raise

    async def remove_roster_entry(self, roster_id: int) -> None:
        """
        Remove a roster entry by ID.

        Args:
            roster_id: RosterLink ID

        Raises:
            ValueError: If roster entry not found
            Exception: Any other error is logged, the transaction is rolled
                back, and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                result = await session.execute(
                    select(RosterLink).where(RosterLink.id == roster_id)
                )
                roster_link = result.scalar_one_or_none()
                if not roster_link:
                    raise ValueError(f"Roster entry {roster_id} not found")

                await session.delete(roster_link)
                await session.commit()
                logger.info(f"Removed roster entry {roster_id}")
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to remove roster entry: {e}")
                raise

    # ========================================================================
    # ROLL OPERATIONS
    # ========================================================================

    async def save_rolls_batch(self, rolls: list) -> None:
        """
        Save multiple dice rolls in a single transaction.

        Used for batch persistence at end of innings. An empty list is a
        no-op and opens no session.

        Args:
            rolls: List of DiceRoll objects (AbRoll, JumpRoll, FieldingRoll,
                D20Roll). Each must expose roll_id, game_id, roll_type (with
                a ``.value``), league_id, team_id, player_id, context,
                timestamp and ``to_dict()``.

        Raises:
            Exception: If batch save fails (after rollback and logging)
        """
        if not rolls:
            logger.debug("No rolls to save")
            return

        async with AsyncSessionLocal() as session:
            try:
                roll_records = [
                    Roll(
                        roll_id=roll.roll_id,
                        game_id=roll.game_id,
                        roll_type=roll.roll_type.value,
                        league_id=roll.league_id,
                        team_id=roll.team_id,
                        player_id=roll.player_id,
                        roll_data=roll.to_dict(),  # Store full roll as JSONB
                        context=roll.context,
                        timestamp=roll.timestamp,
                    )
                    for roll in rolls
                ]
                session.add_all(roll_records)
                await session.commit()
                logger.info(f"Batch saved {len(rolls)} rolls")
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to batch save rolls: {e}")
                raise

    async def get_rolls_for_game(
        self,
        game_id: UUID,
        roll_type: str | None = None,
        team_id: int | None = None,
        limit: int = 100,
    ) -> list[Roll]:
        """
        Get roll history for a game with optional filtering.

        Args:
            game_id: Game identifier
            roll_type: Optional filter by roll type ('ab', 'jump', 'fielding', 'd20')
            team_id: Optional filter by team
            limit: Maximum rolls to return

        Returns:
            List of Roll objects, newest first (timestamp descending)

        Raises:
            Exception: Any query error is logged and re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                query = select(Roll).where(Roll.game_id == game_id)
                if roll_type:
                    query = query.where(Roll.roll_type == roll_type)
                if team_id is not None:
                    query = query.where(Roll.team_id == team_id)
                query = query.order_by(Roll.timestamp.desc()).limit(limit)

                result = await session.execute(query)
                return list(result.scalars().all())
            except Exception as e:
                logger.error(f"Failed to get rolls for game: {e}")
                raise

    # ============================================================================
    # ROLLBACK OPERATIONS
    # ============================================================================

    async def delete_plays_after(self, game_id: UUID, after_play_number: int) -> int:
        """
        Delete all plays after a specific play number.

        Used for rolling back plays when a mistake is made.

        Args:
            game_id: Game to delete plays from
            after_play_number: Delete plays with play_number > this value

        Returns:
            Number of plays deleted

        Raises:
            Exception: Any error is logged, the transaction is rolled back,
                and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                stmt = delete(Play).where(
                    Play.game_id == game_id, Play.play_number > after_play_number
                )
                result = await session.execute(stmt)
                await session.commit()

                deleted_count = result.rowcount
                logger.info(
                    f"Deleted {deleted_count} plays after play {after_play_number} for game {game_id}"
                )
                return deleted_count
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to delete plays: {e}")
                raise

    async def delete_substitutions_after(
        self, game_id: UUID, after_play_number: int
    ) -> int:
        """
        Delete all substitutions recorded at or after a specific play number.

        Used for rolling back lineups when plays are deleted. Note the
        inclusive bound: unlike ``delete_plays_after`` (strictly greater),
        this removes lineup entries whose after_play equals the given value.

        NOTE(review): only the substitute rows are deleted; the players they
        replaced are not re-activated here - confirm the caller restores
        ``is_active`` on those entries.

        Args:
            game_id: Game to delete substitutions from
            after_play_number: Delete lineup entries with after_play >= this value

        Returns:
            Number of lineup entries deleted

        Raises:
            Exception: Any error is logged, the transaction is rolled back,
                and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                stmt = delete(Lineup).where(
                    Lineup.game_id == game_id, Lineup.after_play >= after_play_number
                )
                result = await session.execute(stmt)
                await session.commit()

                deleted_count = result.rowcount
                logger.info(
                    f"Deleted {deleted_count} substitutions after play {after_play_number} for game {game_id}"
                )
                return deleted_count
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to delete substitutions: {e}")
                raise

    async def delete_rolls_after(self, game_id: UUID, after_play_number: int) -> int:
        """
        Delete all dice rolls after a specific play number.

        Used for rolling back dice roll history when plays are deleted.

        NOTE(review): this filters on ``Roll.play_number``, but
        ``save_rolls_batch`` in this module never sets that field - confirm
        the column exists on the Roll model and is populated elsewhere,
        otherwise this will match no rows (or fail at attribute access).

        Args:
            game_id: Game to delete rolls from
            after_play_number: Delete rolls with play_number > this value

        Returns:
            Number of rolls deleted

        Raises:
            Exception: Any error is logged, the transaction is rolled back,
                and the error is re-raised.
        """
        async with AsyncSessionLocal() as session:
            try:
                stmt = delete(Roll).where(
                    Roll.game_id == game_id, Roll.play_number > after_play_number
                )
                result = await session.execute(stmt)
                await session.commit()

                deleted_count = result.rowcount
                logger.info(
                    f"Deleted {deleted_count} rolls after play {after_play_number} for game {game_id}"
                )
                return deleted_count
            except Exception as e:
                await session.rollback()
                logger.error(f"Failed to delete rolls: {e}")
                raise