strat-gameplay-webapp/backend/app/database/operations.py
Cal Corum 9d0d29ef18 CLAUDE: Add Alembic migrations and database session injection
Database Infrastructure:
- Added Alembic migration system (alembic.ini, env.py)
- Migration 001: Initial schema
- Migration 004: Stat materialized views (enhanced)
- Migration 005: Composite indexes for performance
- operations.py: Session injection support for test isolation
- session.py: Enhanced session management

Application Updates:
- main.py: Integration with new database infrastructure
- health.py: Enhanced health checks with pool monitoring

Integration Tests:
- conftest.py: Session injection pattern for reliable tests
- test_operations.py: Database operations tests
- test_migrations.py: Migration verification tests

Session injection pattern enables:
- Production: Auto-commit per operation
- Testing: Shared session with automatic rollback
- Transactions: Multiple ops, single commit

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 12:09:09 -06:00

916 lines
30 KiB
Python

"""
Database Operations - Async persistence layer for game data.
Provides async operations for persisting and retrieving game data.
Used by StateManager for database persistence and recovery.
Supports session injection for efficient batching and easy testing:
# Production - auto-manages sessions
db_ops = DatabaseOperations()
await db_ops.create_game(...) # Creates and commits its own session
# Testing or batching - inject shared session
async with AsyncSessionLocal() as session:
db_ops = DatabaseOperations(session)
await db_ops.create_game(...) # Uses injected session
await db_ops.add_lineup(...) # Same session
await session.commit() # Caller commits
Author: Claude
Date: 2025-10-22
"""
# pyright: reportAssignmentType=false, reportArgumentType=false, reportAttributeAccessIssue=false
# Note: SQLAlchemy Column descriptors cause false positives in Pylance/Pyright
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import AsyncSessionLocal
from app.models.db_models import Game, GameSession, Lineup, Play, Roll, RosterLink
from app.models.roster_models import PdRosterLinkData, SbaRosterLinkData
logger = logging.getLogger(f"{__name__}.DatabaseOperations")
class DatabaseOperations:
"""
Async database operations for game persistence.
Provides methods for creating, reading, and updating game data in PostgreSQL.
Session Management:
- If constructed without a session, each method creates and commits its own session
- If constructed with a session, all methods use that session (caller commits)
Args:
session: Optional AsyncSession for dependency injection.
If provided, caller is responsible for commit/rollback.
"""
def __init__(self, session: AsyncSession | None = None):
"""
Initialize DatabaseOperations.
Args:
session: Optional AsyncSession. If provided, all operations use this
session and caller is responsible for committing.
If None, each operation creates its own auto-committing session.
"""
self._session = session
@asynccontextmanager
async def _get_session(self) -> AsyncGenerator[AsyncSession]:
"""
Get database session for operations.
If a session was injected at construction:
- Yields that session
- Does NOT commit (caller controls transaction)
If no session was injected:
- Creates a new session
- Auto-commits on success
- Rolls back on exception
"""
if self._session:
# Use injected session - caller controls transaction
yield self._session
else:
# Create new session with auto-commit/rollback
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def create_game(
self,
game_id: UUID,
league_id: str,
home_team_id: int,
away_team_id: int,
game_mode: str,
visibility: str,
home_team_is_ai: bool = False,
away_team_is_ai: bool = False,
ai_difficulty: str | None = None,
) -> Game:
"""
Create new game in database.
Args:
game_id: Unique game identifier
league_id: League identifier ('sba' or 'pd')
home_team_id: Home team ID
away_team_id: Away team ID
game_mode: Game mode ('ranked', 'friendly', 'practice')
visibility: Visibility ('public', 'private')
home_team_is_ai: Whether home team is AI
away_team_is_ai: Whether away team is AI
ai_difficulty: AI difficulty if applicable
Returns:
Created Game model
Raises:
SQLAlchemyError: If database operation fails
"""
async with self._get_session() as session:
game = Game(
id=game_id,
league_id=league_id,
home_team_id=home_team_id,
away_team_id=away_team_id,
game_mode=game_mode,
visibility=visibility,
home_team_is_ai=home_team_is_ai,
away_team_is_ai=away_team_is_ai,
ai_difficulty=ai_difficulty,
status="pending",
)
session.add(game)
await session.flush()
await session.refresh(game)
logger.info(f"Created game {game_id} in database ({league_id})")
return game
async def get_game(self, game_id: UUID) -> Game | None:
"""
Get game by ID.
Args:
game_id: Game identifier
Returns:
Game model if found, None otherwise
"""
async with self._get_session() as session:
result = await session.execute(select(Game).where(Game.id == game_id))
game = result.scalar_one_or_none()
if game:
logger.debug(f"Retrieved game {game_id} from database")
return game
async def update_game_state(
self,
game_id: UUID,
inning: int,
half: str,
home_score: int,
away_score: int,
status: str | None = None,
) -> None:
"""
Update game state fields using direct UPDATE (no SELECT).
Args:
game_id: Game identifier
inning: Current inning
half: Current half ('top' or 'bottom')
home_score: Home team score
away_score: Away team score
status: Game status if updating
Raises:
ValueError: If game not found
"""
from sqlalchemy import update
async with self._get_session() as session:
# Build update values
update_values = {
"current_inning": inning,
"current_half": half,
"home_score": home_score,
"away_score": away_score,
}
if status:
update_values["status"] = status
# Direct UPDATE statement (no SELECT needed)
result = await session.execute(
update(Game).where(Game.id == game_id).values(**update_values)
)
if result.rowcount == 0:
raise ValueError(f"Game {game_id} not found for update")
logger.debug(f"Updated game {game_id} state (inning {inning}, {half})")
async def add_pd_lineup_card(
self,
game_id: UUID,
team_id: int,
card_id: int,
position: str,
batting_order: int | None = None,
is_starter: bool = True,
) -> Lineup:
"""
Add PD card to lineup.
Args:
game_id: Game identifier
team_id: Team identifier
card_id: Player card ID
position: Player position
batting_order: Batting order (1-9) if applicable
is_starter: Whether player is starting lineup
Returns:
Created Lineup model
Raises:
SQLAlchemyError: If database operation fails
"""
async with self._get_session() as session:
lineup = Lineup(
game_id=game_id,
team_id=team_id,
card_id=card_id,
player_id=None,
position=position,
batting_order=batting_order,
is_starter=is_starter,
is_active=True,
)
session.add(lineup)
await session.flush()
await session.refresh(lineup)
logger.debug(f"Added PD card {card_id} to lineup in game {game_id}")
return lineup
async def add_sba_lineup_player(
self,
game_id: UUID,
team_id: int,
player_id: int,
position: str,
batting_order: int | None = None,
is_starter: bool = True,
) -> Lineup:
"""
Add SBA player to lineup.
Args:
game_id: Game identifier
team_id: Team identifier
player_id: Player ID
position: Player position
batting_order: Batting order (1-9) if applicable
is_starter: Whether player is starting lineup
Returns:
Created Lineup model
Raises:
SQLAlchemyError: If database operation fails
"""
async with self._get_session() as session:
lineup = Lineup(
game_id=game_id,
team_id=team_id,
card_id=None,
player_id=player_id,
position=position,
batting_order=batting_order,
is_starter=is_starter,
is_active=True,
)
session.add(lineup)
await session.flush()
await session.refresh(lineup)
logger.debug(f"Added SBA player {player_id} to lineup in game {game_id}")
return lineup
async def get_active_lineup(self, game_id: UUID, team_id: int) -> list[Lineup]:
"""
Get active lineup for team.
Args:
game_id: Game identifier
team_id: Team identifier
Returns:
List of active Lineup models, sorted by batting order
"""
async with self._get_session() as session:
result = await session.execute(
select(Lineup)
.where(
Lineup.game_id == game_id,
Lineup.team_id == team_id,
Lineup.is_active == True,
)
.order_by(Lineup.batting_order)
)
lineups = list(result.scalars().all())
logger.debug(
f"Retrieved {len(lineups)} active lineup entries for team {team_id}"
)
return lineups
async def create_substitution(
self,
game_id: UUID,
team_id: int,
player_out_lineup_id: int,
player_in_card_id: int,
position: str,
batting_order: int | None,
inning: int,
play_number: int,
) -> int:
"""
Create substitution in database (DB-first pattern).
Process:
1. Mark old player inactive (is_active = False)
2. Create new lineup entry with:
- is_starter = False
- is_active = True
- entered_inning = current inning
- replacing_id = old player's lineup_id
- after_play = current play number
3. Return new lineup_id
Args:
game_id: Game identifier
team_id: Team identifier
player_out_lineup_id: Lineup ID of player being replaced
player_in_card_id: Card/Player ID of incoming player
position: Position for incoming player
batting_order: Batting order for incoming player (can be None for non-batting positions)
inning: Current inning
play_number: Current play number
Returns:
New lineup_id for substituted player
Raises:
ValueError: If player_out not found
SQLAlchemyError: If database operation fails
"""
async with self._get_session() as session:
# STEP 1: Mark old player inactive
result = await session.execute(
select(Lineup).where(Lineup.id == player_out_lineup_id)
)
player_out = result.scalar_one_or_none()
if not player_out:
raise ValueError(f"Lineup entry {player_out_lineup_id} not found")
player_out.is_active = False
# STEP 2: Create new lineup entry
new_lineup = Lineup(
game_id=game_id,
team_id=team_id,
card_id=player_in_card_id, # For PD, will use card_id
player_id=None, # For SBA, swap these
position=position,
batting_order=batting_order,
is_starter=False, # Substitutes are never starters
is_active=True, # New player is active
entered_inning=inning,
replacing_id=player_out_lineup_id,
after_play=play_number,
)
session.add(new_lineup)
await session.flush()
new_lineup_id = new_lineup.id # type: ignore[assignment]
logger.info(
f"Substitution created: lineup {player_out_lineup_id}{new_lineup_id} "
f"(card {player_in_card_id}, {position}, inning {inning})"
)
return new_lineup_id
async def get_eligible_substitutes(
self, game_id: UUID, team_id: int
) -> list[Lineup]:
"""
Get all inactive players (potential substitutes).
Args:
game_id: Game identifier
team_id: Team identifier
Returns:
List of inactive Lineup models
"""
async with self._get_session() as session:
result = await session.execute(
select(Lineup)
.where(
Lineup.game_id == game_id,
Lineup.team_id == team_id,
Lineup.is_active == False,
)
.order_by(Lineup.batting_order)
)
subs = list(result.scalars().all())
logger.debug(
f"Retrieved {len(subs)} eligible substitutes for team {team_id}"
)
return subs
async def save_play(self, play_data: dict) -> int:
"""
Save play to database.
Args:
play_data: Dictionary with play data matching Play model fields
Returns:
Play ID (primary key)
Raises:
SQLAlchemyError: If database operation fails
"""
async with self._get_session() as session:
play = Play(**play_data)
session.add(play)
await session.flush() # Get ID without committing
play_id = play.id
logger.info(f"Saved play {play.play_number} for game {play.game_id}")
return play_id # type: ignore
async def get_plays(self, game_id: UUID) -> list[Play]:
"""
Get all plays for game.
Args:
game_id: Game identifier
Returns:
List of Play models, ordered by play_number
"""
async with self._get_session() as session:
result = await session.execute(
select(Play).where(Play.game_id == game_id).order_by(Play.play_number)
)
plays = list(result.scalars().all())
logger.debug(f"Retrieved {len(plays)} plays for game {game_id}")
return plays
async def load_game_state(self, game_id: UUID) -> dict | None:
"""
Load complete game state for recovery.
Loads game, lineups, and plays in a single transaction.
Args:
game_id: Game identifier
Returns:
Dictionary with 'game', 'lineups', and 'plays' keys, or None if game not found
"""
async with self._get_session() as session:
# Get game
game_result = await session.execute(select(Game).where(Game.id == game_id))
game = game_result.scalar_one_or_none()
if not game:
logger.warning(f"Game {game_id} not found for recovery")
return None
# Get lineups
lineup_result = await session.execute(
select(Lineup).where(
Lineup.game_id == game_id, Lineup.is_active == True
)
)
lineups = list(lineup_result.scalars().all())
# Get plays
play_result = await session.execute(
select(Play).where(Play.game_id == game_id).order_by(Play.play_number)
)
plays = list(play_result.scalars().all())
logger.info(
f"Loaded game state for {game_id}: {len(lineups)} lineups, {len(plays)} plays"
)
return {
"game": {
"id": game.id,
"league_id": game.league_id,
"home_team_id": game.home_team_id,
"away_team_id": game.away_team_id,
"home_team_is_ai": game.home_team_is_ai,
"away_team_is_ai": game.away_team_is_ai,
"status": game.status,
"current_inning": game.current_inning,
"current_half": game.current_half,
"home_score": game.home_score,
"away_score": game.away_score,
},
"lineups": [
{
"id": l.id,
"team_id": l.team_id,
"card_id": l.card_id,
"player_id": l.player_id,
"position": l.position,
"batting_order": l.batting_order,
"is_active": l.is_active,
}
for l in lineups
],
"plays": [
{
"play_number": p.play_number,
"inning": p.inning,
"half": p.half,
"outs_before": p.outs_before,
"batting_order": p.batting_order,
"outs_recorded": p.outs_recorded,
"result_description": p.result_description,
"complete": p.complete,
# Runner tracking for state recovery
"batter_id": p.batter_id,
"on_first_id": p.on_first_id,
"on_second_id": p.on_second_id,
"on_third_id": p.on_third_id,
"batter_final": p.batter_final,
"on_first_final": p.on_first_final,
"on_second_final": p.on_second_final,
"on_third_final": p.on_third_final,
}
for p in plays
],
}
async def create_game_session(self, game_id: UUID) -> GameSession:
"""
Create game session record for WebSocket tracking.
Args:
game_id: Game identifier
Returns:
Created GameSession model
"""
async with self._get_session() as session:
game_session = GameSession(game_id=game_id)
session.add(game_session)
await session.flush()
await session.refresh(game_session)
logger.info(f"Created game session for {game_id}")
return game_session
async def update_session_snapshot(
self, game_id: UUID, state_snapshot: dict
) -> None:
"""
Update session state snapshot.
Args:
game_id: Game identifier
state_snapshot: JSON-serializable state snapshot
Raises:
ValueError: If game session not found
"""
async with self._get_session() as session:
result = await session.execute(
select(GameSession).where(GameSession.game_id == game_id)
)
game_session = result.scalar_one_or_none()
if not game_session:
raise ValueError(f"Game session {game_id} not found")
game_session.state_snapshot = state_snapshot
logger.debug(f"Updated session snapshot for {game_id}")
async def add_pd_roster_card(
self, game_id: UUID, card_id: int, team_id: int
) -> PdRosterLinkData:
"""
Add a PD card to game roster.
Args:
game_id: Game identifier
card_id: Card identifier
team_id: Team identifier
Returns:
PdRosterLinkData with populated id
Raises:
ValueError: If card already rostered or constraint violation
"""
async with self._get_session() as session:
roster_link = RosterLink(
game_id=game_id, card_id=card_id, team_id=team_id
)
session.add(roster_link)
await session.flush()
await session.refresh(roster_link)
logger.info(f"Added PD card {card_id} to roster for game {game_id}")
return PdRosterLinkData(
id=roster_link.id,
game_id=roster_link.game_id,
card_id=roster_link.card_id,
team_id=roster_link.team_id,
)
async def add_sba_roster_player(
self, game_id: UUID, player_id: int, team_id: int
) -> SbaRosterLinkData:
"""
Add an SBA player to game roster.
Args:
game_id: Game identifier
player_id: Player identifier
team_id: Team identifier
Returns:
SbaRosterLinkData with populated id
Raises:
ValueError: If player already rostered or constraint violation
"""
async with self._get_session() as session:
roster_link = RosterLink(
game_id=game_id, player_id=player_id, team_id=team_id
)
session.add(roster_link)
await session.flush()
await session.refresh(roster_link)
logger.info(f"Added SBA player {player_id} to roster for game {game_id}")
return SbaRosterLinkData(
id=roster_link.id,
game_id=roster_link.game_id,
player_id=roster_link.player_id,
team_id=roster_link.team_id,
)
async def get_pd_roster(
self, game_id: UUID, team_id: int | None = None
) -> list[PdRosterLinkData]:
"""
Get PD cards for a game, optionally filtered by team.
Args:
game_id: Game identifier
team_id: Optional team filter
Returns:
List of PdRosterLinkData
"""
async with self._get_session() as session:
query = select(RosterLink).where(
RosterLink.game_id == game_id, RosterLink.card_id.is_not(None)
)
if team_id is not None:
query = query.where(RosterLink.team_id == team_id)
result = await session.execute(query)
roster_links = result.scalars().all()
return [
PdRosterLinkData(
id=link.id,
game_id=link.game_id,
card_id=link.card_id,
team_id=link.team_id,
)
for link in roster_links
]
async def get_sba_roster(
self, game_id: UUID, team_id: int | None = None
) -> list[SbaRosterLinkData]:
"""
Get SBA players for a game, optionally filtered by team.
Args:
game_id: Game identifier
team_id: Optional team filter
Returns:
List of SbaRosterLinkData
"""
async with self._get_session() as session:
query = select(RosterLink).where(
RosterLink.game_id == game_id, RosterLink.player_id.is_not(None)
)
if team_id is not None:
query = query.where(RosterLink.team_id == team_id)
result = await session.execute(query)
roster_links = result.scalars().all()
return [
SbaRosterLinkData(
id=link.id,
game_id=link.game_id,
player_id=link.player_id,
team_id=link.team_id,
)
for link in roster_links
]
async def remove_roster_entry(self, roster_id: int) -> None:
"""
Remove a roster entry by ID.
Args:
roster_id: RosterLink ID
Raises:
ValueError: If roster entry not found
"""
async with self._get_session() as session:
result = await session.execute(
select(RosterLink).where(RosterLink.id == roster_id)
)
roster_link = result.scalar_one_or_none()
if not roster_link:
raise ValueError(f"Roster entry {roster_id} not found")
await session.delete(roster_link)
logger.info(f"Removed roster entry {roster_id}")
async def save_rolls_batch(self, rolls: list) -> None:
"""
Save multiple dice rolls in a single transaction.
Used for batch persistence at end of innings.
Args:
rolls: List of DiceRoll objects (AbRoll, JumpRoll, FieldingRoll, D20Roll)
Raises:
Exception: If batch save fails
"""
if not rolls:
logger.debug("No rolls to save")
return
async with self._get_session() as session:
roll_records = [
Roll(
roll_id=roll.roll_id,
game_id=roll.game_id,
roll_type=roll.roll_type.value,
league_id=roll.league_id,
team_id=roll.team_id,
player_id=roll.player_id,
roll_data=roll.to_dict(), # Store full roll as JSONB
context=roll.context,
timestamp=roll.timestamp,
)
for roll in rolls
]
session.add_all(roll_records)
await session.flush()
logger.info(f"Batch saved {len(rolls)} rolls")
async def get_rolls_for_game(
self,
game_id: UUID,
roll_type: str | None = None,
team_id: int | None = None,
limit: int = 100,
) -> list[Roll]:
"""
Get roll history for a game with optional filtering.
Args:
game_id: Game identifier
roll_type: Optional filter by roll type ('ab', 'jump', 'fielding', 'd20')
team_id: Optional filter by team
limit: Maximum rolls to return
Returns:
List of Roll objects
"""
async with self._get_session() as session:
query = select(Roll).where(Roll.game_id == game_id)
if roll_type:
query = query.where(Roll.roll_type == roll_type)
if team_id is not None:
query = query.where(Roll.team_id == team_id)
query = query.order_by(Roll.timestamp.desc()).limit(limit)
result = await session.execute(query)
return list(result.scalars().all())
# ============================================================================
# ROLLBACK OPERATIONS
# ============================================================================
async def delete_plays_after(self, game_id: UUID, after_play_number: int) -> int:
"""
Delete all plays after a specific play number.
Used for rolling back plays when a mistake is made.
Args:
game_id: Game to delete plays from
after_play_number: Delete plays with play_number > this value
Returns:
Number of plays deleted
"""
from sqlalchemy import delete
async with self._get_session() as session:
stmt = delete(Play).where(
Play.game_id == game_id, Play.play_number > after_play_number
)
result = await session.execute(stmt)
deleted_count = result.rowcount
logger.info(
f"Deleted {deleted_count} plays after play {after_play_number} for game {game_id}"
)
return deleted_count
async def delete_substitutions_after(
self, game_id: UUID, after_play_number: int
) -> int:
"""
Delete all substitutions that occurred after a specific play number.
Used for rolling back lineups when plays are deleted.
Args:
game_id: Game to delete substitutions from
after_play_number: Delete lineup entries with after_play >= this value
Returns:
Number of lineup entries deleted
"""
from sqlalchemy import delete
async with self._get_session() as session:
stmt = delete(Lineup).where(
Lineup.game_id == game_id, Lineup.after_play >= after_play_number
)
result = await session.execute(stmt)
deleted_count = result.rowcount
logger.info(
f"Deleted {deleted_count} substitutions after play {after_play_number} for game {game_id}"
)
return deleted_count
async def delete_rolls_after(self, game_id: UUID, after_play_number: int) -> int:
"""
Delete all dice rolls after a specific play number.
Used for rolling back dice roll history when plays are deleted.
Args:
game_id: Game to delete rolls from
after_play_number: Delete rolls with play_number > this value
Returns:
Number of rolls deleted
"""
from sqlalchemy import delete
async with self._get_session() as session:
stmt = delete(Roll).where(
Roll.game_id == game_id, Roll.play_number > after_play_number
)
result = await session.execute(stmt)
deleted_count = result.rowcount
logger.info(
f"Deleted {deleted_count} rolls after play {after_play_number} for game {game_id}"
)
return deleted_count