strat-gameplay-webapp/backend/app/database/operations.py
Cal Corum 874e24dc75 CLAUDE: Implement comprehensive dice roll system with persistence
Core Implementation:
- Created roll_types.py with AbRoll, JumpRoll, FieldingRoll, D20Roll dataclasses
- Implemented DiceSystem singleton with cryptographically secure random generation
- Added Roll model to db_models.py with JSONB storage for roll history
- Implemented save_rolls_batch() and get_rolls_for_game() in database operations

Testing:
- 27 unit tests for roll type dataclasses (100% passing)
- 35 unit tests for dice system (34/35 passing, 1 timing issue)
- 16 integration tests for database persistence (uses production DiceSystem)

Features:
- Unique roll IDs using secrets.token_hex()
- League-specific logic (SBA d100 rare plays, PD error-based rare plays)
- Automatic derived value calculation (d6_two_total, jump_total, error_total)
- Full audit trail with context metadata
- Support for batch saving rolls per inning

Technical Details:
- Fixed dataclass inheritance with kw_only=True for Python 3.13
- Roll data stored as JSONB for flexible querying
- Indexed on game_id, roll_type, league_id, team_id for efficient retrieval
- Supports filtering by roll type, team, and timestamp ordering

Note: Integration tests have async connection pool issue when run together
(tests work individually, fixture cleanup needed in follow-up branch)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-24 08:29:02 -05:00

744 lines
24 KiB
Python

"""
Database Operations - Async persistence layer for game data.
Provides async operations for persisting and retrieving game data.
Used by StateManager for database persistence and recovery.
Author: Claude
Date: 2025-10-22
"""
# pyright: reportAssignmentType=false, reportArgumentType=false, reportAttributeAccessIssue=false
# Note: SQLAlchemy Column descriptors cause false positives in Pylance/Pyright
import logging
from typing import Optional, List, Dict
from uuid import UUID
from sqlalchemy import select
from app.database.session import AsyncSessionLocal
from app.models.db_models import Game, Play, Lineup, GameSession, RosterLink, Roll
from app.models.roster_models import PdRosterLinkData, SbaRosterLinkData
logger = logging.getLogger(f'{__name__}.DatabaseOperations')
class DatabaseOperations:
"""
Async database operations for game persistence.
Provides methods for creating, reading, and updating game data in PostgreSQL.
All operations are async and use the AsyncSessionLocal for session management.
"""
async def create_game(
self,
game_id: UUID,
league_id: str,
home_team_id: int,
away_team_id: int,
game_mode: str,
visibility: str,
home_team_is_ai: bool = False,
away_team_is_ai: bool = False,
ai_difficulty: Optional[str] = None
) -> Game:
"""
Create new game in database.
Args:
game_id: Unique game identifier
league_id: League identifier ('sba' or 'pd')
home_team_id: Home team ID
away_team_id: Away team ID
game_mode: Game mode ('ranked', 'friendly', 'practice')
visibility: Visibility ('public', 'private')
home_team_is_ai: Whether home team is AI
away_team_is_ai: Whether away team is AI
ai_difficulty: AI difficulty if applicable
Returns:
Created Game model
Raises:
SQLAlchemyError: If database operation fails
"""
async with AsyncSessionLocal() as session:
try:
game = Game(
id=game_id,
league_id=league_id,
home_team_id=home_team_id,
away_team_id=away_team_id,
game_mode=game_mode,
visibility=visibility,
home_team_is_ai=home_team_is_ai,
away_team_is_ai=away_team_is_ai,
ai_difficulty=ai_difficulty,
status="pending"
)
session.add(game)
await session.commit()
await session.refresh(game)
logger.info(f"Created game {game_id} in database ({league_id})")
return game
except Exception as e:
await session.rollback()
logger.error(f"Failed to create game {game_id}: {e}")
raise
async def get_game(self, game_id: UUID) -> Optional[Game]:
"""
Get game by ID.
Args:
game_id: Game identifier
Returns:
Game model if found, None otherwise
"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Game).where(Game.id == game_id)
)
game = result.scalar_one_or_none()
if game:
logger.debug(f"Retrieved game {game_id} from database")
return game
async def update_game_state(
self,
game_id: UUID,
inning: int,
half: str,
home_score: int,
away_score: int,
status: Optional[str] = None
) -> None:
"""
Update game state fields.
Args:
game_id: Game identifier
inning: Current inning
half: Current half ('top' or 'bottom')
home_score: Home team score
away_score: Away team score
status: Game status if updating
Raises:
ValueError: If game not found
"""
async with AsyncSessionLocal() as session:
try:
result = await session.execute(
select(Game).where(Game.id == game_id)
)
game = result.scalar_one_or_none()
if not game:
raise ValueError(f"Game {game_id} not found")
game.current_inning = inning
game.current_half = half
game.home_score = home_score
game.away_score = away_score
if status:
game.status = status
await session.commit()
logger.debug(f"Updated game {game_id} state (inning {inning}, {half})")
except Exception as e:
await session.rollback()
logger.error(f"Failed to update game {game_id} state: {e}")
raise
async def add_pd_lineup_card(
self,
game_id: UUID,
team_id: int,
card_id: int,
position: str,
batting_order: Optional[int] = None,
is_starter: bool = True
) -> Lineup:
"""
Add PD card to lineup.
Args:
game_id: Game identifier
team_id: Team identifier
card_id: Player card ID
position: Player position
batting_order: Batting order (1-9) if applicable
is_starter: Whether player is starting lineup
Returns:
Created Lineup model
Raises:
SQLAlchemyError: If database operation fails
"""
async with AsyncSessionLocal() as session:
try:
lineup = Lineup(
game_id=game_id,
team_id=team_id,
card_id=card_id,
player_id=None,
position=position,
batting_order=batting_order,
is_starter=is_starter,
is_active=True
)
session.add(lineup)
await session.commit()
await session.refresh(lineup)
logger.debug(f"Added PD card {card_id} to lineup in game {game_id}")
return lineup
except Exception as e:
await session.rollback()
logger.error(f"Failed to add PD lineup card: {e}")
raise
async def add_sba_lineup_player(
self,
game_id: UUID,
team_id: int,
player_id: int,
position: str,
batting_order: Optional[int] = None,
is_starter: bool = True
) -> Lineup:
"""
Add SBA player to lineup.
Args:
game_id: Game identifier
team_id: Team identifier
player_id: Player ID
position: Player position
batting_order: Batting order (1-9) if applicable
is_starter: Whether player is starting lineup
Returns:
Created Lineup model
Raises:
SQLAlchemyError: If database operation fails
"""
async with AsyncSessionLocal() as session:
try:
lineup = Lineup(
game_id=game_id,
team_id=team_id,
card_id=None,
player_id=player_id,
position=position,
batting_order=batting_order,
is_starter=is_starter,
is_active=True
)
session.add(lineup)
await session.commit()
await session.refresh(lineup)
logger.debug(f"Added SBA player {player_id} to lineup in game {game_id}")
return lineup
except Exception as e:
await session.rollback()
logger.error(f"Failed to add SBA lineup player: {e}")
raise
async def get_active_lineup(self, game_id: UUID, team_id: int) -> List[Lineup]:
"""
Get active lineup for team.
Args:
game_id: Game identifier
team_id: Team identifier
Returns:
List of active Lineup models, sorted by batting order
"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Lineup)
.where(
Lineup.game_id == game_id,
Lineup.team_id == team_id,
Lineup.is_active == True
)
.order_by(Lineup.batting_order)
)
lineups = list(result.scalars().all())
logger.debug(f"Retrieved {len(lineups)} active lineup entries for team {team_id}")
return lineups
async def save_play(self, play_data: dict) -> Play:
"""
Save play to database.
Args:
play_data: Dictionary with play data matching Play model fields
Returns:
Created Play model
Raises:
SQLAlchemyError: If database operation fails
"""
async with AsyncSessionLocal() as session:
try:
play = Play(**play_data)
session.add(play)
await session.commit()
await session.refresh(play)
logger.info(f"Saved play {play.play_number} for game {play.game_id}")
return play
except Exception as e:
await session.rollback()
logger.error(f"Failed to save play: {e}")
raise
async def get_plays(self, game_id: UUID) -> List[Play]:
"""
Get all plays for game.
Args:
game_id: Game identifier
Returns:
List of Play models, ordered by play_number
"""
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Play)
.where(Play.game_id == game_id)
.order_by(Play.play_number)
)
plays = list(result.scalars().all())
logger.debug(f"Retrieved {len(plays)} plays for game {game_id}")
return plays
async def load_game_state(self, game_id: UUID) -> Optional[Dict]:
"""
Load complete game state for recovery.
Loads game, lineups, and plays in a single transaction.
Args:
game_id: Game identifier
Returns:
Dictionary with 'game', 'lineups', and 'plays' keys, or None if game not found
"""
async with AsyncSessionLocal() as session:
# Get game
game_result = await session.execute(
select(Game).where(Game.id == game_id)
)
game = game_result.scalar_one_or_none()
if not game:
logger.warning(f"Game {game_id} not found for recovery")
return None
# Get lineups
lineup_result = await session.execute(
select(Lineup)
.where(Lineup.game_id == game_id, Lineup.is_active == True)
)
lineups = list(lineup_result.scalars().all())
# Get plays
play_result = await session.execute(
select(Play)
.where(Play.game_id == game_id)
.order_by(Play.play_number)
)
plays = list(play_result.scalars().all())
logger.info(f"Loaded game state for {game_id}: {len(lineups)} lineups, {len(plays)} plays")
return {
'game': {
'id': game.id,
'league_id': game.league_id,
'home_team_id': game.home_team_id,
'away_team_id': game.away_team_id,
'home_team_is_ai': game.home_team_is_ai,
'away_team_is_ai': game.away_team_is_ai,
'status': game.status,
'current_inning': game.current_inning,
'current_half': game.current_half,
'home_score': game.home_score,
'away_score': game.away_score
},
'lineups': [
{
'id': l.id,
'team_id': l.team_id,
'card_id': l.card_id,
'player_id': l.player_id,
'position': l.position,
'batting_order': l.batting_order,
'is_active': l.is_active
}
for l in lineups
],
'plays': [
{
'play_number': p.play_number,
'inning': p.inning,
'half': p.half,
'outs_before': p.outs_before,
'result_description': p.result_description
}
for p in plays
]
}
async def create_game_session(self, game_id: UUID) -> GameSession:
"""
Create game session record for WebSocket tracking.
Args:
game_id: Game identifier
Returns:
Created GameSession model
"""
async with AsyncSessionLocal() as session:
try:
game_session = GameSession(game_id=game_id)
session.add(game_session)
await session.commit()
await session.refresh(game_session)
logger.info(f"Created game session for {game_id}")
return game_session
except Exception as e:
await session.rollback()
logger.error(f"Failed to create game session: {e}")
raise
async def update_session_snapshot(
self,
game_id: UUID,
state_snapshot: dict
) -> None:
"""
Update session state snapshot.
Args:
game_id: Game identifier
state_snapshot: JSON-serializable state snapshot
Raises:
ValueError: If game session not found
"""
async with AsyncSessionLocal() as session:
try:
result = await session.execute(
select(GameSession).where(GameSession.game_id == game_id)
)
game_session = result.scalar_one_or_none()
if not game_session:
raise ValueError(f"Game session {game_id} not found")
game_session.state_snapshot = state_snapshot
await session.commit()
logger.debug(f"Updated session snapshot for {game_id}")
except Exception as e:
await session.rollback()
logger.error(f"Failed to update session snapshot: {e}")
raise
async def add_pd_roster_card(
self,
game_id: UUID,
card_id: int,
team_id: int
) -> PdRosterLinkData:
"""
Add a PD card to game roster.
Args:
game_id: Game identifier
card_id: Card identifier
team_id: Team identifier
Returns:
PdRosterLinkData with populated id
Raises:
ValueError: If card already rostered or constraint violation
"""
async with AsyncSessionLocal() as session:
try:
roster_link = RosterLink(
game_id=game_id,
card_id=card_id,
team_id=team_id
)
session.add(roster_link)
await session.commit()
await session.refresh(roster_link)
logger.info(f"Added PD card {card_id} to roster for game {game_id}")
return PdRosterLinkData(
id=roster_link.id,
game_id=roster_link.game_id,
card_id=roster_link.card_id,
team_id=roster_link.team_id
)
except Exception as e:
await session.rollback()
logger.error(f"Failed to add PD roster card: {e}")
raise ValueError(f"Could not add card to roster: {e}")
async def add_sba_roster_player(
self,
game_id: UUID,
player_id: int,
team_id: int
) -> SbaRosterLinkData:
"""
Add an SBA player to game roster.
Args:
game_id: Game identifier
player_id: Player identifier
team_id: Team identifier
Returns:
SbaRosterLinkData with populated id
Raises:
ValueError: If player already rostered or constraint violation
"""
async with AsyncSessionLocal() as session:
try:
roster_link = RosterLink(
game_id=game_id,
player_id=player_id,
team_id=team_id
)
session.add(roster_link)
await session.commit()
await session.refresh(roster_link)
logger.info(f"Added SBA player {player_id} to roster for game {game_id}")
return SbaRosterLinkData(
id=roster_link.id,
game_id=roster_link.game_id,
player_id=roster_link.player_id,
team_id=roster_link.team_id
)
except Exception as e:
await session.rollback()
logger.error(f"Failed to add SBA roster player: {e}")
raise ValueError(f"Could not add player to roster: {e}")
async def get_pd_roster(
self,
game_id: UUID,
team_id: Optional[int] = None
) -> List[PdRosterLinkData]:
"""
Get PD cards for a game, optionally filtered by team.
Args:
game_id: Game identifier
team_id: Optional team filter
Returns:
List of PdRosterLinkData
"""
async with AsyncSessionLocal() as session:
try:
query = select(RosterLink).where(
RosterLink.game_id == game_id,
RosterLink.card_id.is_not(None)
)
if team_id is not None:
query = query.where(RosterLink.team_id == team_id)
result = await session.execute(query)
roster_links = result.scalars().all()
return [
PdRosterLinkData(
id=link.id,
game_id=link.game_id,
card_id=link.card_id,
team_id=link.team_id
)
for link in roster_links
]
except Exception as e:
logger.error(f"Failed to get PD roster: {e}")
raise
async def get_sba_roster(
self,
game_id: UUID,
team_id: Optional[int] = None
) -> List[SbaRosterLinkData]:
"""
Get SBA players for a game, optionally filtered by team.
Args:
game_id: Game identifier
team_id: Optional team filter
Returns:
List of SbaRosterLinkData
"""
async with AsyncSessionLocal() as session:
try:
query = select(RosterLink).where(
RosterLink.game_id == game_id,
RosterLink.player_id.is_not(None)
)
if team_id is not None:
query = query.where(RosterLink.team_id == team_id)
result = await session.execute(query)
roster_links = result.scalars().all()
return [
SbaRosterLinkData(
id=link.id,
game_id=link.game_id,
player_id=link.player_id,
team_id=link.team_id
)
for link in roster_links
]
except Exception as e:
logger.error(f"Failed to get SBA roster: {e}")
raise
async def remove_roster_entry(self, roster_id: int) -> None:
"""
Remove a roster entry by ID.
Args:
roster_id: RosterLink ID
Raises:
ValueError: If roster entry not found
"""
async with AsyncSessionLocal() as session:
try:
result = await session.execute(
select(RosterLink).where(RosterLink.id == roster_id)
)
roster_link = result.scalar_one_or_none()
if not roster_link:
raise ValueError(f"Roster entry {roster_id} not found")
await session.delete(roster_link)
await session.commit()
logger.info(f"Removed roster entry {roster_id}")
except Exception as e:
await session.rollback()
logger.error(f"Failed to remove roster entry: {e}")
raise
async def save_rolls_batch(self, rolls: List) -> None:
"""
Save multiple dice rolls in a single transaction.
Used for batch persistence at end of innings.
Args:
rolls: List of DiceRoll objects (AbRoll, JumpRoll, FieldingRoll, D20Roll)
Raises:
Exception: If batch save fails
"""
if not rolls:
logger.debug("No rolls to save")
return
async with AsyncSessionLocal() as session:
try:
roll_records = [
Roll(
roll_id=roll.roll_id,
game_id=roll.game_id,
roll_type=roll.roll_type.value,
league_id=roll.league_id,
team_id=roll.team_id,
player_id=roll.player_id,
roll_data=roll.to_dict(), # Store full roll as JSONB
context=roll.context,
timestamp=roll.timestamp
)
for roll in rolls
]
session.add_all(roll_records)
await session.commit()
logger.info(f"Batch saved {len(rolls)} rolls")
except Exception as e:
await session.rollback()
logger.error(f"Failed to batch save rolls: {e}")
raise
async def get_rolls_for_game(
self,
game_id: UUID,
roll_type: Optional[str] = None,
team_id: Optional[int] = None,
limit: int = 100
) -> List[Roll]:
"""
Get roll history for a game with optional filtering.
Args:
game_id: Game identifier
roll_type: Optional filter by roll type ('ab', 'jump', 'fielding', 'd20')
team_id: Optional filter by team
limit: Maximum rolls to return
Returns:
List of Roll objects
"""
async with AsyncSessionLocal() as session:
try:
query = select(Roll).where(Roll.game_id == game_id)
if roll_type:
query = query.where(Roll.roll_type == roll_type)
if team_id is not None:
query = query.where(Roll.team_id == team_id)
query = query.order_by(Roll.timestamp.desc()).limit(limit)
result = await session.execute(query)
return list(result.scalars().all())
except Exception as e:
logger.error(f"Failed to get rolls for game: {e}")
raise