# Database Layer - Async Persistence for Game Data ## Purpose The database layer provides async PostgreSQL persistence for all game data using SQLAlchemy 2.0 with asyncpg. It handles: - **Session Management**: Connection pooling, lifecycle management, automatic commit/rollback - **Database Operations**: CRUD operations for games, plays, lineups, rosters, dice rolls - **State Persistence**: Async writes that don't block game logic - **State Recovery**: Complete game state reconstruction from database - **Transaction Safety**: Proper error handling and rollback on failures **Architecture Pattern**: Write-through cache - update in-memory state immediately, persist to database asynchronously. ## Structure ``` app/database/ ├── __init__.py # Empty package marker ├── session.py # Session factory, engine, Base declarative └── operations.py # DatabaseOperations class with all CRUD methods ``` ### Module Breakdown #### `session.py` (55 lines) - **Purpose**: Database connection and session management - **Exports**: `engine`, `AsyncSessionLocal`, `Base`, `init_db()`, `get_session()` - **Key Pattern**: Async context managers with automatic commit/rollback #### `operations.py` (882 lines) - **Purpose**: All database operations for game persistence - **Exports**: `DatabaseOperations` class with 20+ async methods - **Key Pattern**: Each operation uses its own session context manager ## Key Components ### 1. AsyncSessionLocal (Session Factory) **Location**: `session.py:21-27` Factory for creating async database sessions. Configured with optimal settings for game engine. ```python from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, # Don't expire objects after commit (allows access after commit) autocommit=False, # Explicit commit control autoflush=False, # Manual flush control ) ``` **Configuration Notes**: - `expire_on_commit=False`: Critical for accessing object attributes after commit without refetching - `autocommit=False`: Requires explicit `await session.commit()` - `autoflush=False`: Manual control over when SQL is flushed to database ### 2. Engine Configuration **Location**: `session.py:13-18` ```python from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine( settings.database_url, # postgresql+asyncpg://... echo=settings.debug, # Log SQL in debug mode pool_size=settings.db_pool_size, # Default: 10 connections max_overflow=settings.db_max_overflow, # Default: 20 overflow connections ) ``` **Connection Pool**: - Base pool: 10 connections (configured in `.env`) - Max overflow: 20 additional connections under load - Total max: 30 concurrent connections **Environment Variables**: ```bash DATABASE_URL=postgresql+asyncpg://paperdynasty:PASSWORD@10.10.0.42:5432/paperdynasty_dev DB_POOL_SIZE=10 DB_MAX_OVERFLOW=20 ``` ### 3. Base Declarative Class **Location**: `session.py:30` ```python from sqlalchemy.orm import declarative_base Base = declarative_base() ``` All ORM models inherit from this Base class. Used in `app/models/db_models.py`. ### 4. DatabaseOperations Class **Location**: `operations.py:26-882` Singleton class providing all database operations. Instantiate once and reuse. **Categories**: - **Game Operations**: `create_game()`, `get_game()`, `update_game_state()` - **Lineup Operations**: `add_pd_lineup_card()`, `add_sba_lineup_player()`, `get_active_lineup()` - **Play Operations**: `save_play()`, `get_plays()` - **Roster Operations**: `add_pd_roster_card()`, `add_sba_roster_player()`, `get_pd_roster()`, `get_sba_roster()`, `remove_roster_entry()` - **Session Operations**: `create_game_session()`, `update_session_snapshot()` - **Dice Roll Operations**: `save_rolls_batch()`, `get_rolls_for_game()` - **Recovery Operations**: `load_game_state()` - **Rollback Operations**: `delete_plays_after()`, `delete_substitutions_after()`, `delete_rolls_after()` **Usage Pattern**: ```python from app.database.operations import DatabaseOperations db_ops = DatabaseOperations() # Use methods game = await db_ops.create_game(...) plays = await db_ops.get_plays(game_id) ``` ## Patterns & Conventions ### 1. Async Session Context Manager Pattern **Every database operation follows this pattern:** ```python async def some_operation(self, game_id: UUID) -> SomeModel: """ Operation description. Args: game_id: Description Returns: Description Raises: SQLAlchemyError: If database operation fails """ async with AsyncSessionLocal() as session: try: # 1. Query or create model result = await session.execute(select(Model).where(...)) model = result.scalar_one_or_none() # 2. Modify or create if not model: model = Model(...) session.add(model) # 3. Commit transaction await session.commit() # 4. Refresh if needed (loads relationships) await session.refresh(model) # 5. Log success logger.info(f"Operation completed for {game_id}") return model except Exception as e: # Automatic rollback on exception await session.rollback() logger.error(f"Operation failed: {e}") raise ``` **Key Points**: - Context manager handles session cleanup automatically - Explicit `commit()` required (autocommit=False) - `rollback()` on any exception - Always log errors with context - Session closes automatically when exiting context ### 2. Query Patterns #### Simple SELECT ```python async with AsyncSessionLocal() as session: result = await session.execute( select(Game).where(Game.id == game_id) ) game = result.scalar_one_or_none() ``` #### SELECT with Ordering ```python result = await session.execute( select(Play) .where(Play.game_id == game_id) .order_by(Play.play_number) ) plays = list(result.scalars().all()) ``` #### SELECT with Multiple Filters ```python result = await session.execute( select(Lineup) .where( Lineup.game_id == game_id, Lineup.team_id == team_id, Lineup.is_active == True ) .order_by(Lineup.batting_order) ) lineups = list(result.scalars().all()) ``` #### Direct UPDATE (No SELECT) ```python from sqlalchemy import update result = await session.execute( update(Game) .where(Game.id == game_id) .values( current_inning=inning, current_half=half, home_score=home_score, away_score=away_score ) ) await session.commit() # Check if row was found if result.rowcount == 0: raise ValueError(f"Game {game_id} not found") ``` #### DELETE ```python from sqlalchemy import delete stmt = delete(Play).where( Play.game_id == game_id, Play.play_number > after_play_number ) result = await session.execute(stmt) await session.commit() deleted_count = result.rowcount ``` ### 3. Polymorphic Operations (League-Specific) **Pattern**: Separate methods for PD vs SBA leagues using same underlying table. #### Roster Links (PD vs SBA) ```python # PD league - uses card_id async def add_pd_roster_card(self, game_id: UUID, card_id: int, team_id: int): roster_link = RosterLink( game_id=game_id, card_id=card_id, # PD: card_id populated player_id=None, # SBA: player_id is None team_id=team_id ) # ... persist and return PdRosterLinkData # SBA league - uses player_id async def add_sba_roster_player(self, game_id: UUID, player_id: int, team_id: int): roster_link = RosterLink( game_id=game_id, card_id=None, # PD: card_id is None player_id=player_id, # SBA: player_id populated team_id=team_id ) # ... persist and return SbaRosterLinkData ``` **Benefits**: - Type safety at application layer (PdRosterLinkData vs SbaRosterLinkData) - Database enforces XOR constraint (exactly one ID populated) - Single table avoids complex joins #### Lineup Operations (PD vs SBA) Same pattern - `add_pd_lineup_card()` vs `add_sba_lineup_player()`. ### 4. Batch Operations **Pattern**: Add multiple records in single transaction for performance. ```python async def save_rolls_batch(self, rolls: List) -> None: """Save multiple dice rolls in a single transaction.""" if not rolls: return async with AsyncSessionLocal() as session: try: roll_records = [ Roll( roll_id=roll.roll_id, game_id=roll.game_id, roll_type=roll.roll_type.value, # ... other fields ) for roll in rolls ] session.add_all(roll_records) # Batch insert await session.commit() except Exception as e: await session.rollback() raise ``` **Usage**: Dice rolls are batched at end of inning for efficiency. ### 5. State Recovery Pattern **Location**: `operations.py:338-424` Load complete game state in single transaction for efficient recovery. ```python async def load_game_state(self, game_id: UUID) -> Optional[Dict]: """Load complete game state for recovery.""" async with AsyncSessionLocal() as session: # 1. Load game game_result = await session.execute( select(Game).where(Game.id == game_id) ) game = game_result.scalar_one_or_none() if not game: return None # 2. Load lineups lineup_result = await session.execute( select(Lineup).where(Lineup.game_id == game_id, Lineup.is_active == True) ) lineups = list(lineup_result.scalars().all()) # 3. Load plays play_result = await session.execute( select(Play).where(Play.game_id == game_id).order_by(Play.play_number) ) plays = list(play_result.scalars().all()) # 4. Return normalized dictionary return { 'game': {...}, # Game data as dict 'lineups': [...], # Lineup data as list of dicts 'plays': [...] # Play data as list of dicts } ``` **Used By**: `StateManager.recover_game()` to rebuild in-memory state. ## Integration Points ### 1. With ORM Models (`app/models/db_models.py`) Database operations directly use SQLAlchemy ORM models: ```python from app.models.db_models import Game, Play, Lineup, RosterLink, Roll, GameSession ``` **Critical**: Models are defined in `db_models.py`, operations use them in `operations.py`. ### 2. With StateManager (`app/core/state_manager.py`) StateManager uses DatabaseOperations for all persistence: ```python from app.database.operations import DatabaseOperations class StateManager: def __init__(self): self.db_ops = DatabaseOperations() async def create_game(self, ...): # 1. Persist to database first db_game = await self.db_ops.create_game(...) # 2. Create in-memory state state = GameState(...) # 3. Cache in memory self._states[game_id] = state return state ``` **Pattern**: Database is source of truth, in-memory is fast cache. ### 3. With GameEngine (`app/core/game_engine.py`) GameEngine calls StateManager, which uses DatabaseOperations: ```python async def resolve_play(self, game_id: UUID) -> dict: # 1. Get in-memory state (fast) state = self.state_manager.get_state(game_id) # 2. Resolve play logic result = self._resolve_outcome(state) # 3. Persist play to database (async, non-blocking) play_id = await self.state_manager.db_ops.save_play(play_data) # 4. Update game state in database await self.state_manager.db_ops.update_game_state( game_id, state.inning, state.half, state.home_score, state.away_score ) ``` ### 4. With Pydantic Models (`app/models/roster_models.py`) Polymorphic operations return Pydantic models for type safety: ```python from app.models.roster_models import PdRosterLinkData, SbaRosterLinkData # Returns typed Pydantic model roster_data: PdRosterLinkData = await db_ops.add_pd_roster_card(...) ``` ## Common Tasks ### Adding a New Database Operation **Steps**: 1. Add method to `DatabaseOperations` class in `operations.py` 2. Follow async session context manager pattern 3. Add comprehensive docstring 4. Add logging (info on success, error on failure) 5. Return typed result (model or primitive) 6. Handle errors with rollback **Example**: ```python async def get_pitcher_stats(self, game_id: UUID, lineup_id: int) -> dict: """ Get pitching statistics for a pitcher in a game. Args: game_id: Game identifier lineup_id: Pitcher's lineup ID Returns: Dictionary with pitching statistics Raises: ValueError: If pitcher not found """ async with AsyncSessionLocal() as session: try: result = await session.execute( select( func.sum(Play.outs_recorded).label('outs'), func.sum(Play.hit).label('hits_allowed'), func.sum(Play.bb).label('walks'), func.sum(Play.so).label('strikeouts') ) .where( Play.game_id == game_id, Play.pitcher_id == lineup_id ) ) stats = result.one() logger.debug(f"Retrieved pitcher stats for lineup {lineup_id}") return { 'outs': stats.outs or 0, 'hits_allowed': stats.hits_allowed or 0, 'walks': stats.walks or 0, 'strikeouts': stats.strikeouts or 0 } except Exception as e: logger.error(f"Failed to get pitcher stats: {e}") raise ValueError(f"Could not retrieve pitcher stats: {e}") ``` ### Common Query Patterns #### Aggregate Statistics ```python from sqlalchemy import func result = await session.execute( select( func.sum(Play.ab).label('at_bats'), func.sum(Play.hit).label('hits'), func.sum(Play.homerun).label('homeruns') ) .where(Play.batter_id == batter_lineup_id) ) stats = result.one() ``` #### Conditional Queries ```python query = select(RosterLink).where( RosterLink.game_id == game_id, RosterLink.card_id.is_not(None) # PD only ) if team_id is not None: query = query.where(RosterLink.team_id == team_id) result = await session.execute(query) ``` #### Filtering with IN Clause ```python lineup_ids = [1, 2, 3, 4, 5] result = await session.execute( select(Lineup).where( Lineup.game_id == game_id, Lineup.id.in_(lineup_ids) ) ) lineups = list(result.scalars().all()) ``` ### Transaction Management #### Single Operation Transaction ```python async with AsyncSessionLocal() as session: # Automatic transaction session.add(model) await session.commit() # Auto-rollback on exception ``` #### Multi-Step Transaction ```python async with AsyncSessionLocal() as session: try: # Step 1 game = Game(...) session.add(game) # Step 2 for lineup_data in lineup_list: lineup = Lineup(game_id=game.id, ...) session.add(lineup) # Step 3 - all or nothing await session.commit() except Exception as e: await session.rollback() # Rolls back all steps raise ``` ### Handling Optional Results ```python # May return None game = result.scalar_one_or_none() if not game: logger.warning(f"Game {game_id} not found") return None # Do something with game ``` ## Troubleshooting ### Connection Issues **Symptom**: `asyncpg.exceptions.InvalidCatalogNameError: database "paperdynasty_dev" does not exist` **Solution**: 1. Verify database exists: `psql -h 10.10.0.42 -U paperdynasty -l` 2. Create if needed: `createdb -h 10.10.0.42 -U paperdynasty paperdynasty_dev` 3. Check `DATABASE_URL` in `.env` **Symptom**: `asyncpg.exceptions.InvalidPasswordError` **Solution**: 1. Verify password in `.env` matches database 2. Test connection: `psql postgresql://paperdynasty:PASSWORD@10.10.0.42:5432/paperdynasty_dev` ### Pool Exhaustion **Symptom**: `asyncio.TimeoutError` or hanging on database operations **Cause**: All pool connections in use, new operations waiting for available connection. **Solutions**: 1. Increase pool size: `DB_POOL_SIZE=20` in `.env` 2. Increase overflow: `DB_MAX_OVERFLOW=30` in `.env` 3. Check for unclosed sessions (should be impossible with context managers) 4. Review long-running queries ### Async Session Errors **Symptom**: `AttributeError: 'NoneType' object has no attribute 'id'` after commit **Cause**: `expire_on_commit=True` (default) expires objects after commit. **Solution**: Already configured with `expire_on_commit=False` in `AsyncSessionLocal`. **Symptom**: `sqlalchemy.exc.InvalidRequestError: Object is already attached to session` **Cause**: Trying to add same object to multiple sessions. **Solution**: Use separate session for each operation. Don't share objects across sessions. ### SQLAlchemy Column Type Errors **Symptom**: Type checker warns about `Column[int]` not assignable to `int` **Explanation**: SQLAlchemy model attributes are typed as `Column[T]` for type checkers but are `T` at runtime. **Solution**: Use `# type: ignore[assignment]` on known false positives: ```python state.current_batter_id = lineup.id # type: ignore[assignment] ``` See backend CLAUDE.md section "Type Checking & Common False Positives" for full guide. ### Deadlocks **Symptom**: `asyncpg.exceptions.DeadlockDetectedError` **Cause**: Two transactions waiting on each other's locks. **Solution**: 1. Keep transactions short 2. Access tables in consistent order across operations 3. Use `FOR UPDATE` sparingly 4. Retry transaction on deadlock ### Migration Issues **Symptom**: `AttributeError: 'Game' object has no attribute 'some_field'` **Cause**: Database schema doesn't match ORM models. **Solution**: 1. Create migration: `alembic revision --autogenerate -m "Add some_field"` 2. Apply migration: `alembic upgrade head` 3. Verify: `alembic current` ## Examples ### Example 1: Creating a Complete Game ```python from uuid import uuid4 from app.database.operations import DatabaseOperations async def create_complete_game(): db_ops = DatabaseOperations() game_id = uuid4() # 1. Create game game = await db_ops.create_game( game_id=game_id, league_id="sba", home_team_id=1, away_team_id=2, game_mode="friendly", visibility="public" ) # 2. Add home team lineup (SBA) home_lineup = [] for i in range(1, 10): lineup = await db_ops.add_sba_lineup_player( game_id=game_id, team_id=1, player_id=100 + i, position="P" if i == 1 else f"{i}B", batting_order=i, is_starter=True ) home_lineup.append(lineup) # 3. Add away team lineup away_lineup = [] for i in range(1, 10): lineup = await db_ops.add_sba_lineup_player( game_id=game_id, team_id=2, player_id=200 + i, position="P" if i == 1 else f"{i}B", batting_order=i, is_starter=True ) away_lineup.append(lineup) # 4. Create game session session = await db_ops.create_game_session(game_id) return game_id ``` ### Example 2: Recording a Complete Play ```python async def record_play(game_id: UUID, play_data: dict): db_ops = DatabaseOperations() # Save play play_id = await db_ops.save_play({ 'game_id': game_id, 'play_number': play_data['play_number'], 'inning': play_data['inning'], 'half': play_data['half'], 'outs_before': play_data['outs_before'], 'batter_id': play_data['batter_lineup_id'], 'pitcher_id': play_data['pitcher_lineup_id'], 'dice_roll': play_data['dice_roll'], 'result_description': play_data['description'], 'pa': 1, 'ab': 1, 'hit': 1 if play_data['outcome'] in ['single', 'double', 'triple', 'homerun'] else 0, 'homerun': 1 if play_data['outcome'] == 'homerun' else 0, 'complete': True }) # Update game state await db_ops.update_game_state( game_id=game_id, inning=play_data['inning'], half=play_data['half'], home_score=play_data['home_score'], away_score=play_data['away_score'] ) return play_id ``` ### Example 3: Game State Recovery ```python async def recover_game(game_id: UUID): db_ops = DatabaseOperations() # Load complete state in single transaction game_data = await db_ops.load_game_state(game_id) if not game_data: print(f"Game {game_id} not found") return None # Access loaded data game = game_data['game'] lineups = game_data['lineups'] plays = game_data['plays'] print(f"Game: {game['league_id']}") print(f"Score: {game['away_score']} - {game['home_score']}") print(f"Inning: {game['current_inning']} {game['current_half']}") print(f"Lineups: {len(lineups)} players") print(f"Plays: {len(plays)} recorded") return game_data ``` ### Example 4: Batch Saving Dice Rolls ```python from app.models.dice_models import AbRoll, RollType async def save_inning_rolls(game_id: UUID, rolls: List[AbRoll]): db_ops = DatabaseOperations() # Batch save all rolls from inning await db_ops.save_rolls_batch(rolls) print(f"Saved {len(rolls)} dice rolls for game {game_id}") ``` ### Example 5: Rollback to Previous Play ```python async def rollback_to_play(game_id: UUID, play_number: int): """Rollback game to a specific play number.""" db_ops = DatabaseOperations() # Delete all data after target play plays_deleted = await db_ops.delete_plays_after(game_id, play_number) subs_deleted = await db_ops.delete_substitutions_after(game_id, play_number) rolls_deleted = await db_ops.delete_rolls_after(game_id, play_number) print(f"Rolled back game {game_id} to play {play_number}") print(f"Deleted: {plays_deleted} plays, {subs_deleted} subs, {rolls_deleted} rolls") # Recover state from remaining plays # (StateManager will rebuild from database) ``` ## Performance Notes ### Optimizations Applied 1. **Direct UPDATE Statements** (`update_game_state`) - Uses direct UPDATE without SELECT - Faster than fetch-modify-commit pattern 2. **Conditional Updates** (Used by GameEngine) - Only UPDATE when state actually changes - ~40-60% fewer writes in low-scoring games 3. **Batch Operations** (`save_rolls_batch`) - Single transaction for multiple inserts - Reduces network round-trips 4. **Minimal Refreshes** (`save_play`) - Returns ID only, doesn't refresh with relationships - Avoids expensive JOINs when not needed 5. **Expire on Commit Disabled** - Objects remain accessible after commit - No automatic refetch when accessing attributes ### Connection Pool Tuning **Default Settings** (for 10 concurrent games): - Pool size: 10 - Max overflow: 20 - Total capacity: 30 connections **High Load Settings** (for 20+ concurrent games): ```bash DB_POOL_SIZE=20 DB_MAX_OVERFLOW=40 ``` ### Query Performance **Expected Latency** (on local network): - Simple SELECT: < 10ms - INSERT with index updates: < 20ms - UPDATE with WHERE: < 15ms - Complex JOIN query: < 50ms - Batch INSERT (10 records): < 30ms **Performance Targets**: - Database write: < 100ms (async, non-blocking) - State recovery: < 2 seconds (loads 100+ plays) ## Key Files Reference ``` app/database/ ├── session.py (55 lines) │ ├── engine # SQLAlchemy async engine │ ├── AsyncSessionLocal # Session factory │ ├── Base # ORM base class │ ├── init_db() # Create all tables │ └── get_session() # FastAPI dependency │ └── operations.py (882 lines) └── DatabaseOperations class ├── Game Operations (3 methods) │ ├── create_game() │ ├── get_game() │ └── update_game_state() │ ├── Lineup Operations (3 methods) │ ├── add_pd_lineup_card() │ ├── add_sba_lineup_player() │ └── get_active_lineup() │ ├── Play Operations (2 methods) │ ├── save_play() │ └── get_plays() │ ├── Roster Operations (6 methods) │ ├── add_pd_roster_card() │ ├── add_sba_roster_player() │ ├── get_pd_roster() │ ├── get_sba_roster() │ └── remove_roster_entry() │ ├── Session Operations (2 methods) │ ├── create_game_session() │ └── update_session_snapshot() │ ├── Dice Roll Operations (2 methods) │ ├── save_rolls_batch() │ └── get_rolls_for_game() │ ├── Recovery Operations (1 method) │ └── load_game_state() │ └── Rollback Operations (3 methods) ├── delete_plays_after() ├── delete_substitutions_after() └── delete_rolls_after() ``` ## Testing **Unit Tests**: Not applicable (database operations are integration by nature) **Integration Tests**: - `tests/integration/database/test_operations.py` (21 tests) - `tests/integration/test_state_persistence.py` (8 tests) **Running Tests**: ```bash # All database integration tests uv run pytest tests/integration/database/ -v # Specific operation test uv run pytest tests/integration/database/test_operations.py::TestGameOperations::test_create_game -v # State persistence tests uv run pytest tests/integration/test_state_persistence.py -v ``` **Test Requirements**: - PostgreSQL database running at `10.10.0.42:5432` - Database `paperdynasty_dev` exists - User `paperdynasty` has permissions - Environment variables configured in `.env` ## Related Documentation - **Backend CLAUDE.md**: `../CLAUDE.md` - Overall backend architecture - **Database Models**: `../models/db_models.py` - SQLAlchemy ORM models - **State Manager**: `../core/state_manager.py` - In-memory state management - **Game Engine**: `../core/game_engine.py` - Game logic using database operations - **Type Checking Guide**: `../../.claude/type-checking-guide.md` - SQLAlchemy type issues --- **Last Updated**: 2025-10-31 **Author**: Claude **Status**: Production-ready, optimized for performance