"""Tests for the evolution tables SQL migration (WP-04). Unit tests read the SQL migration file and verify that all required DDL statements are present — no database connection needed. Integration tests (marked with pytest.mark.integration) execute the migration against a live PostgreSQL instance and verify structure via information_schema and pg_indexes. They are skipped automatically when POSTGRES_HOST is not set, and are designed to be idempotent so they can be run against the dev database without damage. To run integration tests locally: POSTGRES_HOST=localhost POSTGRES_USER=pd_admin POSTGRES_PASSWORD=... \\ pytest tests/test_evolution_migration.py -m integration """ import os import re import pytest # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- MIGRATION_PATH = os.path.join( os.path.dirname(__file__), "..", "migrations", "2026-03-12_add_evolution_tables.sql", ) def _load_sql(): """Return the migration file text, lowercased and with SQL comments stripped. Single-line comments (-- ...) are removed before pattern matching so that the verification regexes do not accidentally match against comment text. """ with open(MIGRATION_PATH) as fh: content = fh.read() content = re.sub(r"--[^\n]*", "", content) return content.lower() # --------------------------------------------------------------------------- # Unit tests — SQL file content, no DB required # --------------------------------------------------------------------------- def test_migration_file_exists(): """The migration file must exist at the expected path.""" assert os.path.isfile(MIGRATION_PATH), f"Migration file not found: {MIGRATION_PATH}" def test_migration_wrapped_in_transaction(): """Migration must be wrapped in BEGIN / COMMIT for atomicity.""" sql = _load_sql() assert "begin;" in sql assert "commit;" in sql def test_all_five_tables_created(): """Each of the five new tables must have a CREATE TABLE IF NOT EXISTS statement.""" sql = _load_sql() expected = [ "player_season_stats", "evolution_track", "evolution_card_state", "evolution_tier_boost", "evolution_cosmetic", ] for table in expected: pattern = rf"create table if not exists {table}" assert re.search( pattern, sql ), f"Missing CREATE TABLE IF NOT EXISTS for {table}" def test_player_season_stats_columns(): """player_season_stats must contain all required batting, pitching and meta columns.""" sql = _load_sql() required_columns = [ "player_id", "team_id", "season", "games_batting", "pa", "ab", "hits", "hr", "doubles", "triples", "bb", "hbp", "so", "rbi", "runs", "sb", "cs", "games_pitching", "outs", r"\bk\b", # pitcher Ks — match as whole word to avoid false positives "bb_allowed", "hits_allowed", "hr_allowed", "wins", "losses", "saves", "holds", "blown_saves", "last_game_id", "last_updated_at", ] # Extract just the player_season_stats block for targeted matching match = re.search( r"create table if not exists player_season_stats\s*\((.+?)\);", sql, re.DOTALL, ) assert match, "Could not locate player_season_stats table body" block = match.group(1) for col in required_columns: assert re.search( col, block ), f"Missing column pattern '{col}' in player_season_stats" def test_evolution_track_columns(): """evolution_track must have name, card_type, formula, and t1–t4 threshold columns.""" sql = _load_sql() match = re.search( r"create table if not exists evolution_track\s*\((.+?)\);", sql, re.DOTALL, ) assert match, "Could not locate evolution_track table body" block = match.group(1) for col in [ "name", "card_type", "formula", "t1_threshold", "t2_threshold", "t3_threshold", "t4_threshold", ]: assert col in block, f"Missing column '{col}' in evolution_track" def test_evolution_card_state_columns(): """evolution_card_state must have player_id, team_id, track_id, tier, value, and flags.""" sql = _load_sql() match = re.search( r"create table if not exists evolution_card_state\s*\((.+?)\);", sql, re.DOTALL, ) assert match, "Could not locate evolution_card_state table body" block = match.group(1) for col in [ "player_id", "team_id", "track_id", "current_tier", "current_value", "fully_evolved", "last_evaluated_at", ]: assert col in block, f"Missing column '{col}' in evolution_card_state" def test_phase2_stubs_have_card_state_fk(): """evolution_tier_boost and evolution_cosmetic must reference evolution_card_state.""" sql = _load_sql() for table in ["evolution_tier_boost", "evolution_cosmetic"]: match = re.search( rf"create table if not exists {table}\s*\((.+?)\);", sql, re.DOTALL, ) assert match, f"Could not locate {table} table body" block = match.group(1) assert "card_state_id" in block, f"Missing card_state_id FK in {table}" assert ( "evolution_card_state" in block ), f"Missing FK reference to evolution_card_state in {table}" def test_indexes_use_if_not_exists(): """All CREATE INDEX statements must use IF NOT EXISTS for idempotency.""" sql = _load_sql() raw_indexes = re.findall(r"create(?:\s+unique)?\s+index\s+(?!if not exists)", sql) assert not raw_indexes, ( f"Found CREATE INDEX without IF NOT EXISTS — migration is not idempotent. " f"Matches: {raw_indexes}" ) def test_required_indexes_present(): """The migration must create all required indexes.""" sql = _load_sql() expected_indexes = [ "player_season_stats_player_team_season_uniq", "player_season_stats_team_season_idx", "player_season_stats_player_season_idx", "evolution_card_state_player_team_uniq", ] for idx in expected_indexes: assert idx in sql, f"Missing index '{idx}' in migration" def test_player_season_stats_unique_index_covers_correct_columns(): """The UNIQUE index on player_season_stats must cover (player_id, team_id, season).""" sql = _load_sql() match = re.search( r"player_season_stats_player_team_season_uniq[^\n]*\n\s+on player_season_stats\s*\(([^)]+)\)", sql, ) assert match, "Could not locate UNIQUE index definition for player_season_stats" cols = match.group(1) for col in ["player_id", "team_id", "season"]: assert ( col in cols ), f"Column '{col}' missing from UNIQUE index on player_season_stats" def test_evolution_card_state_unique_index_covers_correct_columns(): """The UNIQUE index on evolution_card_state must cover (player_id, team_id).""" sql = _load_sql() match = re.search( r"evolution_card_state_player_team_uniq[^\n]*\n\s+on evolution_card_state\s*\(([^)]+)\)", sql, ) assert match, "Could not locate UNIQUE index definition for evolution_card_state" cols = match.group(1) for col in ["player_id", "team_id"]: assert ( col in cols ), f"Column '{col}' missing from UNIQUE index on evolution_card_state" def test_card_variant_column_added(): """card.variant must be added as INTEGER NULL DEFAULT NULL.""" sql = _load_sql() assert "alter table card" in sql assert "add column if not exists variant" in sql match = re.search( r"add column if not exists variant\s+(\w+)\s+null\s+default null", sql ) assert match, "card.variant must be INTEGER NULL DEFAULT NULL" assert ( match.group(1) == "integer" ), f"card.variant type must be INTEGER, got {match.group(1)}" def test_battingcard_image_url_column_added(): """battingcard.image_url must be added as VARCHAR(500) NULL.""" sql = _load_sql() assert "alter table battingcard" in sql match = re.search( r"alter table battingcard\s+add column if not exists image_url\s+varchar\(500\)\s+null", sql, ) assert match, "battingcard.image_url must be VARCHAR(500) NULL with IF NOT EXISTS" def test_pitchingcard_image_url_column_added(): """pitchingcard.image_url must be added as VARCHAR(500) NULL.""" sql = _load_sql() assert "alter table pitchingcard" in sql match = re.search( r"alter table pitchingcard\s+add column if not exists image_url\s+varchar\(500\)\s+null", sql, ) assert match, "pitchingcard.image_url must be VARCHAR(500) NULL with IF NOT EXISTS" def test_add_column_uses_if_not_exists(): """All ALTER TABLE ADD COLUMN statements must use IF NOT EXISTS for idempotency.""" sql = _load_sql() # Find any ADD COLUMN that is NOT followed by IF NOT EXISTS bad_adds = re.findall(r"add column(?!\s+if not exists)", sql) assert not bad_adds, ( f"Found ADD COLUMN without IF NOT EXISTS — migration is not idempotent. " f"Count: {len(bad_adds)}" ) def test_rollback_section_present(): """Migration must include rollback DROP statements for documentation.""" with open(MIGRATION_PATH) as fh: raw = fh.read().lower() assert "rollback" in raw assert "drop table if exists" in raw # --------------------------------------------------------------------------- # Integration tests — require live PostgreSQL # --------------------------------------------------------------------------- _pg_available = bool(os.environ.get("POSTGRES_HOST")) pytestmark_integration = pytest.mark.skipif( not _pg_available, reason="POSTGRES_HOST not set — skipping integration tests", ) @pytestmark_integration def test_integration_fresh_migration_creates_all_tables(pg_conn): """Running the migration on a clean schema creates all 5 expected tables.""" cur = pg_conn.cursor() cur.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name IN ( 'player_season_stats', 'evolution_track', 'evolution_card_state', 'evolution_tier_boost', 'evolution_cosmetic' ) """) found = {row[0] for row in cur.fetchall()} cur.close() expected = { "player_season_stats", "evolution_track", "evolution_card_state", "evolution_tier_boost", "evolution_cosmetic", } assert found == expected, f"Missing tables: {expected - found}" @pytestmark_integration def test_integration_idempotent_rerun(pg_conn): """Running the migration a second time must not raise any errors.""" with open(MIGRATION_PATH) as fh: sql = fh.read() cur = pg_conn.cursor() cur.execute(sql) pg_conn.commit() cur.close() # If we reach here, the second run succeeded @pytestmark_integration def test_integration_required_indexes_exist(pg_conn): """All required indexes must be present in pg_indexes after migration.""" cur = pg_conn.cursor() cur.execute(""" SELECT indexname FROM pg_indexes WHERE tablename IN ('player_season_stats', 'evolution_card_state') """) found = {row[0] for row in cur.fetchall()} cur.close() expected = { "player_season_stats_player_team_season_uniq", "player_season_stats_team_season_idx", "player_season_stats_player_season_idx", "evolution_card_state_player_team_uniq", } assert expected.issubset(found), f"Missing indexes: {expected - found}" @pytestmark_integration def test_integration_fk_constraints_exist(pg_conn): """Foreign key constraints from evolution_card_state to player and team must exist.""" cur = pg_conn.cursor() cur.execute(""" SELECT tc.constraint_name, tc.table_name, kcu.column_name, ccu.table_name AS foreign_table FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = 'evolution_card_state' """) fks = {(row[2], row[3]) for row in cur.fetchall()} cur.close() assert ( "player_id", "player", ) in fks, "FK from evolution_card_state.player_id to player missing" assert ( "team_id", "team", ) in fks, "FK from evolution_card_state.team_id to team missing" assert ( "track_id", "evolution_track", ) in fks, "FK from evolution_card_state.track_id to evolution_track missing" @pytestmark_integration def test_integration_existing_data_preserved(pg_conn): """Adding columns to card/battingcard/pitchingcard must not destroy existing rows.""" cur = pg_conn.cursor() # Row counts should be non-negative and unchanged after migration for table in ("card", "battingcard", "pitchingcard"): cur.execute(f"SELECT COUNT(*) FROM {table}") count = cur.fetchone()[0] assert count >= 0, f"Unexpected row count for {table}: {count}" cur.close() @pytestmark_integration def test_integration_new_columns_nullable(pg_conn): """card.variant and battingcard/pitchingcard.image_url must be nullable.""" cur = pg_conn.cursor() cur.execute(""" SELECT table_name, column_name, is_nullable, column_default FROM information_schema.columns WHERE (table_name = 'card' AND column_name = 'variant') OR (table_name IN ('battingcard', 'pitchingcard') AND column_name = 'image_url') ORDER BY table_name, column_name """) rows = cur.fetchall() cur.close() assert len(rows) == 3, f"Expected 3 new columns, found {len(rows)}" for table_name, col_name, is_nullable, col_default in rows: assert is_nullable == "YES", f"{table_name}.{col_name} must be nullable"