"""Database test fixtures for Mantimon TCG. This module provides fixtures for database integration testing using the running dev containers (docker-compose). Key insight: pytest-asyncio runs fixture teardown in a DIFFERENT event loop than the test body. This causes "Future attached to different loop" errors when trying to do async cleanup on connections created during the test. Solution: Use SYNC psycopg2 for all fixture setup/teardown operations, and only use asyncpg within the test body itself. The session is created fresh per test with no cleanup needed (NullPool + TRUNCATE after). Prerequisites: docker compose up -d # Start Postgres (5433) and Redis (6380) """ import contextlib import os from collections.abc import AsyncGenerator from typing import Any import psycopg2 import pytest import pytest_asyncio from alembic import command from alembic.config import Config from sqlalchemy import pool from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) # ============================================================================= # Configuration # ============================================================================= # Dev container URLs (matches docker-compose.yml) TEST_DATABASE_URL = os.getenv( "TEST_DATABASE_URL", "postgresql+asyncpg://mantimon:mantimon@localhost:5433/mantimon", ) SYNC_DATABASE_URL = os.getenv( "SYNC_DATABASE_URL", "postgresql+psycopg2://mantimon:mantimon@localhost:5433/mantimon", ) TEST_REDIS_URL = os.getenv( "TEST_REDIS_URL", "redis://localhost:6380/1", # Use DB 1 for tests, DB 0 for dev ) # Connection params for sync psycopg2 (used in fixtures) DB_PARAMS = { "host": "localhost", "port": 5433, "user": "mantimon", "password": "mantimon", "dbname": "mantimon", } # ============================================================================= # Tables to Truncate (ordered for FK constraints - children first) # ============================================================================= TABLES_TO_TRUNCATE = [ "game_history", "active_games", "campaign_progress", "collections", "decks", "oauth_linked_accounts", "users", ] # ============================================================================= # Sync Helper Functions (for fixture setup/teardown) # ============================================================================= def truncate_all_tables() -> None: """Truncate all tables using sync psycopg2. This runs in fixture teardown which may be in a different event loop, so we use sync operations to avoid event loop issues. """ conn = psycopg2.connect(**DB_PARAMS) try: conn.autocommit = True with conn.cursor() as cur: for table in TABLES_TO_TRUNCATE: cur.execute(f"TRUNCATE TABLE {table} CASCADE") finally: conn.close() # ============================================================================= # Migration Fixture (Session-Scoped, Sync) # ============================================================================= @pytest.fixture(scope="session", autouse=True) def _run_migrations() -> None: """Run Alembic migrations once per session. Uses sync psycopg2 to avoid async event loop issues. autouse=True ensures migrations run before any tests. """ alembic_cfg = Config("alembic.ini") alembic_cfg.set_main_option("sqlalchemy.url", SYNC_DATABASE_URL) command.upgrade(alembic_cfg, "head") # ============================================================================= # Database Session Fixture (Function-Scoped) # ============================================================================= @pytest_asyncio.fixture async def db_session() -> AsyncGenerator[AsyncSession, None]: """Provide a fresh async session for each test. The session uses NullPool (no connection reuse) and expires on commit=False. Tables are truncated AFTER each test using sync psycopg2 to avoid event loop issues in fixture teardown. Example: async def test_create_user(db_session): user = User(email="test@example.com", ...) db_session.add(user) await db_session.flush() assert user.id is not None # Data is truncated after test via sync cleanup """ # Create fresh engine with NullPool engine = create_async_engine( TEST_DATABASE_URL, echo=False, poolclass=pool.NullPool, ) # Create session factory session_factory = async_sessionmaker( bind=engine, class_=AsyncSession, expire_on_commit=False, ) # Create session session = session_factory() try: yield session finally: # Close session without rollback (may fail on different event loop) # The truncate below will clean up any uncommitted data anyway with contextlib.suppress(RuntimeError): await session.close() # Dispose engine (may fail on different event loop) with contextlib.suppress(RuntimeError): await engine.dispose() # SYNC truncate - this always works regardless of event loop truncate_all_tables() # ============================================================================= # Redis Client Fixture # ============================================================================= @pytest_asyncio.fixture async def redis_client() -> AsyncGenerator[Any, None]: """Provide a Redis client for testing. Uses DB 1 (test database) to avoid conflicts with dev data in DB 0. Flushes the database after each test. """ import redis.asyncio as aioredis client = aioredis.from_url(TEST_REDIS_URL, decode_responses=True) try: yield client finally: try: await client.flushdb() await client.aclose() except RuntimeError: # Ignore event loop errors during cleanup pass # ============================================================================= # Utility Fixtures # ============================================================================= @pytest.fixture def anyio_backend() -> str: """Specify asyncio as the async backend.""" return "asyncio"