mantimon-tcg/backend/tests/conftest.py
Cal Corum c00ee87f25 Switch to testcontainers for automatic test container management
- Create tests/conftest.py with testcontainers for Postgres and Redis
- Auto-detect Docker Desktop socket and disable Ryuk for compatibility
- Update tests/db/conftest.py and tests/services/conftest.py to use shared fixtures
- Fix test_resolve_effect_logs_exceptions: logger was disabled by pytest
- Fix test_save_and_load_with_real_redis: use redis_url fixture
- Minor lint fix in engine_validation.py

Tests now auto-start containers on run - no need for `docker compose up`
All 1199 tests passing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 16:49:11 -06:00

292 lines
8.9 KiB
Python

"""Shared test fixtures for Mantimon TCG using testcontainers.

This module provides database and Redis fixtures that automatically start
containers when tests run - no need for `docker compose up` beforehand.

Key Features:
    - Testcontainers auto-starts Postgres and Redis for the test session
    - Containers are shared across all tests (session scope)
    - Tables are truncated between tests for isolation
    - Sync psycopg2 is used for fixture teardown (avoids event loop issues)

Usage:
    @pytest.mark.asyncio
    async def test_something(db_session, redis_client):
        # db_session is an AsyncSession connected to testcontainer Postgres
        # redis_client is an async Redis client connected to testcontainer Redis
        pass

Environment:
    The following environment variables are auto-configured when this module
    is imported (values that are already set are left untouched):
    - DOCKER_HOST: Set to the Docker Desktop socket if the default socket is
      not found
    - TESTCONTAINERS_RYUK_DISABLED: Set to "true" so the Ryuk cleanup
      container is not started (avoids Ryuk startup issues)
"""
import contextlib
import os
from pathlib import Path
# =============================================================================
# Docker Environment Configuration
# =============================================================================
# Point testcontainers at the Docker Desktop socket, but only when the
# standard system socket is missing and the Desktop one is present.
# setdefault() keeps any DOCKER_HOST the caller exported explicitly.
_default_socket = Path("/var/run/docker.sock")
_desktop_socket = Path.home() / ".docker/desktop/docker.sock"
if _desktop_socket.exists() and not _default_socket.exists():
    os.environ.setdefault("DOCKER_HOST", f"unix://{_desktop_socket}")

# Turn off Ryuk (the testcontainers cleanup container) unless the caller has
# already chosen a value; it is disabled to avoid startup issues with
# Docker Desktop.
os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")
# ruff: noqa: E402
# Imports must come after environment setup for testcontainers to work correctly
from collections.abc import AsyncGenerator, Generator
from typing import Any

import psycopg2
import pytest
import pytest_asyncio
import redis.asyncio as aioredis
from alembic import command
from alembic.config import Config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
# =============================================================================
# Module-level container state (set by session fixtures)
# =============================================================================
# --- Postgres -----------------------------------------------------------------
# Handle to the running Postgres container; None until postgres_container runs.
_postgres_container: PostgresContainer | None = None
# Keyword arguments passed to psycopg2.connect() by truncate_all_tables().
# An empty dict means the container has not been started.
_db_params: dict[str, Any] = {}
# asyncpg URL consumed by create_async_engine() in db_session.
_async_db_url: str = ""
# psycopg2 URL handed to Alembic in _run_migrations.
_sync_db_url: str = ""

# --- Redis --------------------------------------------------------------------
# Handle to the running Redis container; None until redis_container runs.
_redis_container: RedisContainer | None = None
# Connection URL used by the redis_url and redis_client fixtures.
_redis_url: str = ""
# =============================================================================
# Tables to Truncate (ordered for FK constraints - children first)
# =============================================================================
# Every table wiped by truncate_all_tables() after each test.
# Listed children-first; "users" comes last
# (presumably because the other tables reference it - confirm against schema).
TABLES_TO_TRUNCATE: list[str] = [
    "game_history",
    "active_games",
    "campaign_progress",
    "collections",
    "decks",
    "oauth_linked_accounts",
    "users",
]
# =============================================================================
# Container Fixtures (Session-Scoped)
# =============================================================================
@pytest.fixture(scope="session")
def postgres_container() -> Generator[PostgresContainer, None, None]:
    """Start a Postgres container for the test session.

    The container is started once and shared across all tests. Its connection
    details are published through the module-level ``_db_params``,
    ``_async_db_url`` and ``_sync_db_url`` variables so that other fixtures
    and the sync truncate helper can reach the database.

    Yields:
        The running ``PostgresContainer``.

    The container is stopped when the session ends, and also when anything
    fails after ``start()`` but before the fixture yields. Ryuk is disabled
    by default in this module, so nothing else would clean it up.
    """
    global _postgres_container, _db_params, _async_db_url, _sync_db_url

    # Single source of truth for the credentials, so the container settings,
    # the psycopg2 params and both URLs cannot drift apart.
    user = "mantimon"
    password = "mantimon"
    dbname = "mantimon"

    container = PostgresContainer(
        image="postgres:15-alpine",
        username=user,
        password=password,
        dbname=dbname,
    )
    container.start()
    try:
        # Extract connection info (the host port is mapped dynamically).
        host = container.get_container_host_ip()
        port = container.get_exposed_port(5432)

        _db_params = {
            "host": host,
            "port": int(port),
            "user": user,
            "password": password,
            "dbname": dbname,
        }
        _async_db_url = f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{dbname}"
        _sync_db_url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
        _postgres_container = container

        yield container
    finally:
        container.stop()
        # Drop the stale connection info so truncate_all_tables() becomes a
        # no-op instead of trying to reach a stopped container.
        _postgres_container = None
        _db_params = {}
        _async_db_url = ""
        _sync_db_url = ""
@pytest.fixture(scope="session")
def redis_container() -> Generator[RedisContainer, None, None]:
    """Start a Redis container for the test session.

    The container is started once and shared across all tests. Its URL
    (database 0) is published through the module-level ``_redis_url``.

    Yields:
        The running ``RedisContainer``.

    The container is stopped when the session ends, and also when anything
    fails after ``start()`` but before the fixture yields. Ryuk is disabled
    by default in this module, so nothing else would clean it up.
    """
    global _redis_container, _redis_url

    container = RedisContainer(image="redis:7-alpine")
    container.start()
    try:
        # The host port is mapped dynamically, so it must be looked up.
        host = container.get_container_host_ip()
        port = container.get_exposed_port(6379)

        _redis_url = f"redis://{host}:{port}/0"
        _redis_container = container

        yield container
    finally:
        container.stop()
        # Clear the stale handle/URL once the container is gone.
        _redis_container = None
        _redis_url = ""
@pytest.fixture(scope="session")
def redis_url(redis_container: RedisContainer) -> str:
    """Return the connection URL of the testcontainer Redis instance.

    Tests that need to build their own Redis client inside their own event
    loop (to avoid the 'attached to different loop' issue) can request this
    fixture instead of ``redis_client``.
    """
    # redis_container is requested only so the container is running and the
    # module-level URL has been filled in before it is read.
    url = _redis_url
    return url
# =============================================================================
# Migration Fixture (Session-Scoped)
# =============================================================================
@pytest.fixture(scope="session", autouse=True)
def _run_migrations(postgres_container: PostgresContainer) -> None:
    """Apply all Alembic migrations once per test session.

    Requesting ``postgres_container`` guarantees the database is running,
    and its sync URL is known, before the upgrade to ``head`` is executed.
    """
    # NOTE(review): "alembic.ini" is resolved relative to the current working
    # directory, so this presumably expects pytest to be launched from the
    # directory that contains alembic.ini - confirm.
    config = Config("alembic.ini")
    # Override the configured URL so migrations target the testcontainer.
    config.set_main_option("sqlalchemy.url", _sync_db_url)
    command.upgrade(config, revision="head")
# =============================================================================
# Sync Helper Functions (for fixture teardown)
# =============================================================================
def truncate_all_tables() -> None:
    """Truncate every table in ``TABLES_TO_TRUNCATE`` using sync psycopg2.

    This runs in fixture teardown, which may be in a different event loop,
    so sync operations are used to avoid event loop issues.

    All tables are cleared with a single ``TRUNCATE ... CASCADE`` statement,
    so each test teardown costs one round trip instead of one per table.

    Does nothing when the Postgres container has not been started
    (``_db_params`` is empty) or when there are no tables to truncate.
    """
    if not _db_params or not TABLES_TO_TRUNCATE:
        return  # Container not started yet, or nothing to clear

    # Table names come from the module constant above, never from user input.
    statement = f"TRUNCATE TABLE {', '.join(TABLES_TO_TRUNCATE)} CASCADE"

    conn = psycopg2.connect(**_db_params)
    try:
        # Autocommit so the truncate takes effect without an explicit commit.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(statement)
    finally:
        conn.close()
# =============================================================================
# Database Session Fixture (Function-Scoped)
# =============================================================================
@pytest_asyncio.fixture
async def db_session(
    postgres_container: PostgresContainer,
) -> AsyncGenerator[AsyncSession, None]:
    """Provide a fresh async session for each test.

    The session's engine uses NullPool (no connection reuse) and the session
    is created with ``expire_on_commit=False``.

    Tables are truncated AFTER each test using sync psycopg2 to avoid event
    loop issues in fixture teardown. The truncate runs even if closing the
    session or disposing the engine raises, so a failed cleanup cannot leak
    rows into the next test.

    Yields:
        An ``AsyncSession`` bound to the testcontainer Postgres database.

    Example:
        async def test_create_user(db_session):
            user = User(email="test@example.com", ...)
            db_session.add(user)
            await db_session.flush()
            assert user.id is not None
    """
    engine = create_async_engine(
        _async_db_url,
        echo=False,
        poolclass=pool.NullPool,
    )
    session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    session = session_factory()

    try:
        yield session
    finally:
        try:
            # RuntimeError is suppressed on each step separately, so a failed
            # close does not prevent the engine from being disposed.
            with contextlib.suppress(RuntimeError):
                await session.close()
            with contextlib.suppress(RuntimeError):
                await engine.dispose()
        finally:
            # SYNC truncate - works regardless of event loop, and must run
            # even when the async cleanup above raised something unexpected.
            truncate_all_tables()
# =============================================================================
# Redis Client Fixture (Function-Scoped)
# =============================================================================
@pytest_asyncio.fixture
async def redis_client(
    redis_container: RedisContainer,
) -> AsyncGenerator[Any, None]:
    """Provide a Redis client for testing.

    Flushes the database before and after each test for isolation.

    Yields:
        An async Redis client (``decode_responses=True``) connected to the
        testcontainer Redis instance.

    During teardown the flush and the close are attempted independently, so
    a ``RuntimeError`` from the flush no longer leaves the client's
    connection open.
    """
    client = aioredis.from_url(_redis_url, decode_responses=True)
    try:
        await client.flushdb()
        yield client
    finally:
        # Best-effort cleanup, mirroring db_session: suppress RuntimeError on
        # each step separately so one failure does not skip the other.
        with contextlib.suppress(RuntimeError):
            await client.flushdb()
        with contextlib.suppress(RuntimeError):
            await client.aclose()
# =============================================================================
# Utility Fixtures
# =============================================================================
@pytest.fixture
def anyio_backend() -> str:
    """Return the name of the async backend to use: asyncio."""
    backend = "asyncio"
    return backend