major-domo-database/migrations.py
Cal Corum eccf4d1441
All checks were successful
Build Docker Image / build (pull_request) Successful in 2m11s
feat: add migration tracking system (#81)
Adds schema_versions table and migrations.py runner to prevent
double-application and missed migrations across dev/prod environments.

- migrations/2026-03-27_add_schema_versions_table.sql: creates tracking table
- migrations.py: applies pending .sql files in sorted order, records each in schema_versions
- .gitignore: untrack migrations.py (was incorrectly ignored as legacy root file)

First run on an existing DB will apply all migrations (safe — all use IF NOT EXISTS).

Closes #81

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 05:34:13 -05:00

89 lines
2.5 KiB
Python

#!/usr/bin/env python3
"""Apply pending SQL migrations and record them in schema_versions.
Usage:
python migrations.py
Connects to PostgreSQL using the same environment variables as the API:
POSTGRES_DB (default: sba_master)
POSTGRES_USER (default: sba_admin)
POSTGRES_PASSWORD (required)
POSTGRES_HOST (default: sba_postgres)
POSTGRES_PORT (default: 5432)
On first run against an existing database, all migrations will be applied.
All migration files use IF NOT EXISTS guards so re-applying is safe.
"""
import os
import sys
from pathlib import Path
import psycopg2
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
_CREATE_SCHEMA_VERSIONS = """
CREATE TABLE IF NOT EXISTS schema_versions (
filename VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT NOW()
);
"""
def _get_connection():
password = os.environ.get("POSTGRES_PASSWORD")
if password is None:
raise RuntimeError("POSTGRES_PASSWORD environment variable is not set")
return psycopg2.connect(
dbname=os.environ.get("POSTGRES_DB", "sba_master"),
user=os.environ.get("POSTGRES_USER", "sba_admin"),
password=password,
host=os.environ.get("POSTGRES_HOST", "sba_postgres"),
port=int(os.environ.get("POSTGRES_PORT", "5432")),
)
def main():
conn = _get_connection()
try:
with conn:
with conn.cursor() as cur:
cur.execute(_CREATE_SCHEMA_VERSIONS)
with conn.cursor() as cur:
cur.execute("SELECT filename FROM schema_versions")
applied = {row[0] for row in cur.fetchall()}
migration_files = sorted(MIGRATIONS_DIR.glob("*.sql"))
pending = [f for f in migration_files if f.name not in applied]
if not pending:
print("No pending migrations.")
return
for migration_file in pending:
print(f"Applying {migration_file.name} ...", end=" ", flush=True)
sql = migration_file.read_text()
with conn:
with conn.cursor() as cur:
cur.execute(sql)
cur.execute(
"INSERT INTO schema_versions (filename) VALUES (%s)",
(migration_file.name,),
)
print("done")
print(f"\nApplied {len(pending)} migration(s).")
finally:
conn.close()
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)