mantimon-tcg/backend/tests/db/conftest.py
Cal Corum 50684a1b11 Add database infrastructure with SQLAlchemy models and test suite
Phase 1 Database Implementation (DB-001 through DB-012):

Models:
- User: OAuth support (Google/Discord), premium subscriptions
- Collection: Card ownership with CardSource enum
- Deck: JSONB cards/energy_cards, validation state
- CampaignProgress: One-to-one with User, medals/NPCs as JSONB
- ActiveGame: In-progress games with GameType enum
- GameHistory: Completed games with EndReason enum, replay data

Infrastructure:
- Alembic migrations with sync psycopg2 (avoids async issues)
- Docker Compose for Postgres (5433) and Redis (6380)
- App config with Pydantic settings
- Redis client helper

Test Infrastructure:
- 68 database tests (47 model + 21 relationship)
- Async factory pattern for test data creation
- Sync TRUNCATE cleanup (solves pytest-asyncio event loop mismatch)
- Uses dev containers instead of testcontainers for reliability

Key technical decisions:
- passive_deletes=True for ON DELETE SET NULL relationships
- NullPool for test sessions (no connection reuse)
- expire_on_commit=False with manual expire() for relationship tests
2026-01-27 10:17:30 -06:00

205 lines
6.1 KiB
Python

"""Database test fixtures for Mantimon TCG.
This module provides fixtures for database integration testing using the
running dev containers (docker-compose).
Key insight: pytest-asyncio runs fixture teardown in a DIFFERENT event loop
than the test body. This causes "Future attached to different loop" errors
when trying to do async cleanup on connections created during the test.
Solution: Use SYNC psycopg2 for all fixture setup/teardown operations,
and only use asyncpg within the test body itself. The session is created
fresh per test with no cleanup needed (NullPool + TRUNCATE after).
Prerequisites:
docker compose up -d # Start Postgres (5433) and Redis (6380)
"""
import contextlib
import os
from collections.abc import AsyncGenerator
from typing import Any
from urllib.parse import unquote, urlsplit

import psycopg2
import pytest
import pytest_asyncio
from alembic import command
from alembic.config import Config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
# =============================================================================
# Configuration
# =============================================================================
# Dev container URLs (defaults match docker-compose.yml). Each one can be
# overridden through the environment variable of the same name.
TEST_DATABASE_URL = os.getenv(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://mantimon:mantimon@localhost:5433/mantimon",
)
SYNC_DATABASE_URL = os.getenv(
    "SYNC_DATABASE_URL",
    "postgresql+psycopg2://mantimon:mantimon@localhost:5433/mantimon",
)
TEST_REDIS_URL = os.getenv(
    "TEST_REDIS_URL",
    "redis://localhost:6380/1",  # Use DB 1 for tests, DB 0 for dev
)

# Connection params for sync psycopg2 (used in fixtures).
#
# These are derived from SYNC_DATABASE_URL instead of being hard-coded, so
# that overriding the URL via the environment also redirects the sync
# TRUNCATE cleanup. Otherwise migrations would run against one database
# while cleanup wiped another. With the default URL the result is identical
# to the previous literal dict. Only host/port/user/password/dbname are
# carried over; query-string options in the URL are not.
_sync_url = urlsplit(SYNC_DATABASE_URL)
_sync_params = {
    "host": _sync_url.hostname,
    "port": _sync_url.port,
    # Credentials and database name may be percent-encoded inside a URL.
    "user": unquote(_sync_url.username or ""),
    "password": unquote(_sync_url.password or ""),
    "dbname": unquote(_sync_url.path.lstrip("/")),
}
# Drop anything the URL did not specify so psycopg2/libpq defaults apply.
DB_PARAMS = {k: v for k, v in _sync_params.items() if v not in (None, "")}
# =============================================================================
# Tables to Truncate (ordered for FK constraints - children first)
# =============================================================================
# Tables emptied by truncate_all_tables(). Per the section header, the order
# is children first, so "users" comes last.
TABLES_TO_TRUNCATE: list[str] = [
    "game_history",
    "active_games",
    "campaign_progress",
    "collections",
    "decks",
    "users",
]
# =============================================================================
# Sync Helper Functions (for fixture setup/teardown)
# =============================================================================
def truncate_all_tables() -> None:
    """Empty every table listed in ``TABLES_TO_TRUNCATE`` using sync psycopg2.

    This runs in fixture teardown, which may execute on a different event
    loop than the test body, so a blocking psycopg2 connection is used to
    avoid event loop issues.

    All tables are truncated by a single ``TRUNCATE ... CASCADE`` statement,
    which costs one round trip per test instead of one per table. The table
    names come from the module-level constant, not from external input, so
    they are interpolated into the statement directly.
    """
    if not TABLES_TO_TRUNCATE:
        # Nothing to clean, and "TRUNCATE TABLE  CASCADE" would be invalid SQL.
        return

    statement = f"TRUNCATE TABLE {', '.join(TABLES_TO_TRUNCATE)} CASCADE"
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        # Autocommit: the TRUNCATE takes effect without an explicit commit.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
# =============================================================================
# Migration Fixture (Session-Scoped, Sync)
# =============================================================================
@pytest.fixture(scope="session", autouse=True)
def _run_migrations() -> None:
    """Upgrade the test database schema to the latest Alembic revision.

    The fixture is session-scoped, so the upgrade happens once per test
    session, and autouse, so tests do not need to request it explicitly.
    Alembic is pointed at ``SYNC_DATABASE_URL`` so the migration runs
    through the sync driver and avoids async event loop issues.
    """
    # NOTE(review): "alembic.ini" is a relative path, presumably resolved
    # against the directory pytest is launched from. Confirm.
    config = Config("alembic.ini")
    config.set_main_option("sqlalchemy.url", SYNC_DATABASE_URL)
    command.upgrade(config, "head")
# =============================================================================
# Database Session Fixture (Function-Scoped)
# =============================================================================
@pytest_asyncio.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
    """Provide a fresh async session for each test.

    The session is bound to a new engine that uses NullPool (no connection
    reuse) and is created with ``expire_on_commit=False``. Tables are
    truncated AFTER each test using sync psycopg2 to avoid event loop issues
    in fixture teardown.

    Teardown is best-effort for the async resources (``RuntimeError`` from a
    mismatched event loop is ignored), but each step runs even if an earlier
    one fails. In particular the truncate always runs, so a failed close
    cannot leave rows behind for the next test.

    Example:
        async def test_create_user(db_session):
            user = User(email="test@example.com", ...)
            db_session.add(user)
            await db_session.flush()
            assert user.id is not None
            # Data is truncated after test via sync cleanup

    Yields:
        An ``AsyncSession`` connected to ``TEST_DATABASE_URL``.
    """
    # Create fresh engine with NullPool
    engine = create_async_engine(
        TEST_DATABASE_URL,
        echo=False,
        poolclass=pool.NullPool,
    )
    # Create session factory
    session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    # Create session
    session = session_factory()
    try:
        yield session
    finally:
        try:
            # Close session without rollback (may fail on different event
            # loop). The truncate below cleans up any leftover data anyway.
            with contextlib.suppress(RuntimeError):
                await session.close()
        finally:
            try:
                # Dispose engine (may fail on different event loop)
                with contextlib.suppress(RuntimeError):
                    await engine.dispose()
            finally:
                # SYNC truncate - independent of the event loop, and run
                # even when the async cleanup above raised something other
                # than RuntimeError.
                truncate_all_tables()
# =============================================================================
# Redis Client Fixture
# =============================================================================
@pytest_asyncio.fixture
async def redis_client() -> AsyncGenerator[Any, None]:
    """Provide a Redis client for testing.

    The client connects to ``TEST_REDIS_URL``, which by default selects
    DB 1 (test database) to avoid conflicts with dev data in DB 0. After
    each test the database is flushed and the client is closed.

    Flush and close are attempted independently, so a failing flush does
    not leave the connection open. ``RuntimeError`` (event loop errors
    during cleanup) is ignored for both steps.

    Yields:
        An async Redis client created with ``decode_responses=True``.
    """
    import redis.asyncio as aioredis

    client = aioredis.from_url(TEST_REDIS_URL, decode_responses=True)
    try:
        yield client
    finally:
        try:
            with contextlib.suppress(RuntimeError):
                await client.flushdb()
        finally:
            # Always attempt the close, even if the flush failed.
            with contextlib.suppress(RuntimeError):
                await client.aclose()
# =============================================================================
# Utility Fixtures
# =============================================================================
@pytest.fixture
def anyio_backend() -> str:
    """Return the name of the async backend to use: asyncio."""
    backend = "asyncio"
    return backend