paper-dynasty-database/scripts/audit_sqlite.py
Cal Corum 0cba52cea5 PostgreSQL migration: Complete code preparation phase
- Add db_helpers.py with cross-database upsert functions for SQLite/PostgreSQL
- Replace 12 on_conflict_replace() calls with PostgreSQL-compatible upserts
- Add unique indexes: StratPlay(game, play_num), Decision(game, pitcher)
- Add max_length to Team model fields (abbrev, sname, lname)
- Fix boolean comparison in teams.py (== 0/1 to == False/True)
- Create migrate_to_postgres.py with ID-preserving migration logic
- Create audit_sqlite.py for pre-migration data integrity checks
- Add PROJECT_PLAN.json for migration tracking
- Add .secrets/ to .gitignore for credentials

Audit results: 658,963 records across 29 tables, 2,390 orphaned stats (expected)

Based on lessons learned from the Major Domo migration (33 issues resolved there)
2026-01-25 23:05:54 -06:00
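For context, the cross-database upsert helpers mentioned in the commit message live in db_helpers.py. As a rough illustration only (the function and argument names below are assumptions, not the actual contents of that file), a peewee-style upsert that behaves the same on SQLite 3.24+ and PostgreSQL might look like this:

# Hypothetical sketch; the real db_helpers.py may be structured differently.
def upsert(model, conflict_fields, **data):
    """Insert `data`, updating the existing row when `conflict_fields` collide.

    INSERT ... ON CONFLICT DO UPDATE works on both SQLite (3.24+) and
    PostgreSQL, whereas on_conflict_replace() is not PostgreSQL-compatible.
    """
    update = {key: value for key, value in data.items() if key not in conflict_fields}
    return (
        model.insert(**data)
        .on_conflict(
            conflict_target=[getattr(model, field) for field in conflict_fields],
            update=update,
        )
        .execute()
    )

If db_helpers.py follows this shape, the unique indexes added in this commit on StratPlay(game, play_num) and Decision(game, pitcher) are the kind of constraint such a conflict_target would reference.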

#!/usr/bin/env python3
"""
Paper Dynasty SQLite Data Integrity Audit
Pre-migration script to identify potential issues before migrating to PostgreSQL.
Based on issues discovered during the Major Domo migration (August 2025).
Checks for:
1. NULL values in fields that will be NOT NULL in PostgreSQL
2. Orphaned foreign key records
3. VARCHAR field max lengths (PostgreSQL is stricter)
4. Record counts for baseline comparison
5. Primary key gaps or duplicates
Usage:
python scripts/audit_sqlite.py
python scripts/audit_sqlite.py --fix # Apply safe fixes (not yet implemented)
Exit codes: 0 = no blocking issues, 1 = high-priority issues found, 2 = critical issues found.
"""
import argparse
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
def connect_db(db_path: str) -> sqlite3.Connection:
"""Connect to SQLite database."""
conn = sqlite3.connect(db_path)
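    # sqlite3.Row lets the check functions read columns by name, e.g. row["name"].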
conn.row_factory = sqlite3.Row
return conn
def get_table_record_counts(conn: sqlite3.Connection) -> dict:
"""Get record counts for all tables."""
counts = {}
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
)
for row in cursor:
table_name = row["name"]
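        # Table names come straight from sqlite_master, so f-string interpolation is safe here.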
count_cursor = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
counts[table_name] = count_cursor.fetchone()[0]
return counts
def check_null_values(conn: sqlite3.Connection) -> list:
"""
Check for NULL values in fields that should not be null.
These are the fields that caused issues during the Major Domo migration.
"""
issues = []
# Fields to check - based on Major Domo experience
null_checks = [
# (table, field, description)
("team", "abbrev", "Team abbreviation"),
("team", "sname", "Team short name"),
("team", "lname", "Team long name"),
("player", "p_name", "Player name"),
("player", "image", "Player image URL"),
("card", "player_id", "Card player reference"),
("stratplay", "game_id", "Play game reference"),
("stratplay", "pitcher_id", "Play pitcher reference"),
("decision", "game_id", "Decision game reference"),
("decision", "pitcher_id", "Decision pitcher reference"),
]
for table, field, description in null_checks:
try:
cursor = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} IS NULL")
null_count = cursor.fetchone()[0]
if null_count > 0:
cursor = conn.execute(f"SELECT COUNT(*) FROM {table}")
total_count = cursor.fetchone()[0]
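                # More than 10% NULLs in a table is treated as HIGH severity, otherwise MEDIUM.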
issues.append(
{
"type": "NULL_VALUE",
"severity": "HIGH"
if null_count > total_count * 0.1
else "MEDIUM",
"table": table,
"field": field,
"description": description,
"null_count": null_count,
"total_count": total_count,
"percentage": round(null_count / total_count * 100, 2)
if total_count > 0
else 0,
}
)
except sqlite3.OperationalError:
# Table or column doesn't exist
pass
return issues
def check_orphaned_foreign_keys(conn: sqlite3.Connection) -> list:
"""
Check for orphaned foreign key records.
These will fail with foreign key constraint violations in PostgreSQL.
"""
issues = []
# Foreign key relationships to check
fk_checks = [
# (child_table, child_field, parent_table, parent_field, description)
(
"card",
"player_id",
"player",
"player_id",
"Cards referencing non-existent players",
),
("card", "team_id", "team", "id", "Cards referencing non-existent teams"),
(
"stratplay",
"game_id",
"stratgame",
"id",
"Plays referencing non-existent games",
),
(
"stratplay",
"batter_id",
"player",
"player_id",
"Plays referencing non-existent batters",
),
(
"stratplay",
"pitcher_id",
"player",
"player_id",
"Plays referencing non-existent pitchers",
),
(
"decision",
"game_id",
"stratgame",
"id",
"Decisions referencing non-existent games",
),
(
"decision",
"pitcher_id",
"player",
"player_id",
"Decisions referencing non-existent pitchers",
),
(
"battingstat",
"card_id",
"card",
"id",
"Batting stats referencing non-existent cards",
),
(
"pitchingstat",
"card_id",
"card",
"id",
"Pitching stats referencing non-existent cards",
),
(
"battingcard",
"player_id",
"player",
"player_id",
"Batting cards referencing non-existent players",
),
(
"pitchingcard",
"player_id",
"player",
"player_id",
"Pitching cards referencing non-existent players",
),
(
"cardposition",
"player_id",
"player",
"player_id",
"Card positions referencing non-existent players",
),
(
"paperdex",
"player_id",
"player",
"player_id",
"Paperdex entries referencing non-existent players",
),
(
"paperdex",
"team_id",
"team",
"id",
"Paperdex entries referencing non-existent teams",
),
(
"gauntletrun",
"team_id",
"team",
"id",
"Gauntlet runs referencing non-existent teams",
),
]
for child_table, child_field, parent_table, parent_field, description in fk_checks:
try:
# Use explicit column names to avoid ambiguity
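            # A child row is orphaned when its FK value is non-NULL but matches no parent row.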
query = f"""
SELECT COUNT(*)
FROM {child_table} c
LEFT JOIN {parent_table} p ON c.{child_field} = p.{parent_field}
WHERE c.{child_field} IS NOT NULL AND p.{parent_field} IS NULL
"""
cursor = conn.execute(query)
orphan_count = cursor.fetchone()[0]
if orphan_count > 0:
# Get sample orphaned IDs
sample_query = f"""
SELECT c.{child_field}
FROM {child_table} c
LEFT JOIN {parent_table} p ON c.{child_field} = p.{parent_field}
WHERE c.{child_field} IS NOT NULL AND p.{parent_field} IS NULL
LIMIT 5
"""
sample_cursor = conn.execute(sample_query)
sample_ids = [row[0] for row in sample_cursor.fetchall()]
issues.append(
{
"type": "ORPHANED_FK",
"severity": "HIGH",
"child_table": child_table,
"child_field": child_field,
"parent_table": parent_table,
"parent_field": parent_field,
"description": description,
"orphan_count": orphan_count,
"sample_orphan_ids": sample_ids,
}
)
except sqlite3.OperationalError as e:
# Table or column doesn't exist
print(
f"Warning: Could not check {child_table}.{child_field} -> {parent_table}.{parent_field}: {e}"
)
return issues
def check_varchar_lengths(conn: sqlite3.Connection) -> list:
"""
Check max lengths of string fields.
PostgreSQL enforces declared VARCHAR lengths, whereas SQLite ignores them.
"""
issues = []
# Fields to check with expected max lengths
varchar_checks = [
# (table, field, expected_max_length, description)
("player", "p_name", 255, "Player name"),
("player", "image", 1000, "Player image URL"),
("player", "image2", 1000, "Player image2 URL"),
("player", "headshot", 500, "Player headshot URL"),
("player", "vanity_card", 500, "Player vanity card"),
("player", "strat_code", 100, "Strat code"),
("player", "bbref_id", 50, "Baseball Reference ID"),
("player", "description", 1000, "Player description"),
("team", "abbrev", 10, "Team abbreviation"),
("team", "sname", 100, "Team short name"),
("team", "lname", 255, "Team long name"),
("notification", "title", 255, "Notification title"),
("notification", "message", 2000, "Notification message"),
]
for table, field, expected_max, description in varchar_checks:
try:
cursor = conn.execute(f"SELECT MAX(LENGTH({field})) FROM {table}")
max_length = cursor.fetchone()[0]
if max_length and max_length > expected_max:
# Get sample of long values
sample_cursor = conn.execute(
f"SELECT {field} FROM {table} WHERE LENGTH({field}) > {expected_max} LIMIT 3"
)
                samples = [
                    row[0][:100] + "..." if row[0] and len(row[0]) > 100 else row[0]
                    for row in sample_cursor.fetchall()
                ]
issues.append(
{
"type": "VARCHAR_TOO_LONG",
"severity": "HIGH",
"table": table,
"field": field,
"description": description,
"max_found": max_length,
"expected_max": expected_max,
"sample_values": samples,
}
)
elif max_length:
# Info: report actual max for reference
pass
except sqlite3.OperationalError:
pass
return issues
def check_duplicate_primary_keys(conn: sqlite3.Connection) -> list:
"""
Check for duplicate primary keys (shouldn't happen but good to verify).
"""
issues = []
pk_checks = [
("player", "player_id"),
("team", "id"),
("card", "id"),
("stratgame", "id"),
("stratplay", "id"),
]
for table, pk_field in pk_checks:
try:
cursor = conn.execute(f"""
SELECT {pk_field}, COUNT(*) as cnt
FROM {table}
GROUP BY {pk_field}
HAVING COUNT(*) > 1
""")
duplicates = cursor.fetchall()
if duplicates:
issues.append(
{
"type": "DUPLICATE_PK",
"severity": "CRITICAL",
"table": table,
"pk_field": pk_field,
"duplicate_ids": [row[0] for row in duplicates[:10]],
"duplicate_count": len(duplicates),
}
)
except sqlite3.OperationalError:
pass
return issues
def check_unique_constraints(conn: sqlite3.Connection) -> list:
"""
Check that composite unique constraints would be satisfied.
These are the indexes that on_conflict_replace() depends on.
"""
issues = []
unique_checks = [
# (table, fields, description)
("battingcard", ["player_id", "variant"], "Batting card unique constraint"),
("pitchingcard", ["player_id", "variant"], "Pitching card unique constraint"),
(
"cardposition",
["player_id", "variant", "position"],
"Card position unique constraint",
),
(
"battingcardratings",
["battingcard_id", "vs_hand"],
"Batting card ratings unique constraint",
),
(
"pitchingcardratings",
["pitchingcard_id", "vs_hand"],
"Pitching card ratings unique constraint",
),
]
for table, fields, description in unique_checks:
try:
fields_str = ", ".join(fields)
cursor = conn.execute(f"""
SELECT {fields_str}, COUNT(*) as cnt
FROM {table}
GROUP BY {fields_str}
HAVING COUNT(*) > 1
""")
duplicates = cursor.fetchall()
if duplicates:
issues.append(
{
"type": "DUPLICATE_UNIQUE",
"severity": "HIGH",
"table": table,
"fields": fields,
"description": description,
"duplicate_count": len(duplicates),
"sample_duplicates": [
dict(zip(fields + ["count"], row)) for row in duplicates[:5]
],
}
)
except sqlite3.OperationalError as e:
print(f"Warning: Could not check unique constraint on {table}: {e}")
return issues
def generate_report(counts: dict, issues: list, output_path: str = None) -> str:
"""Generate audit report."""
report = {
"generated_at": datetime.now().isoformat(),
"summary": {
"total_tables": len(counts),
"total_records": sum(counts.values()),
"total_issues": len(issues),
"critical_issues": len(
[i for i in issues if i.get("severity") == "CRITICAL"]
),
"high_issues": len([i for i in issues if i.get("severity") == "HIGH"]),
"medium_issues": len([i for i in issues if i.get("severity") == "MEDIUM"]),
},
"table_counts": counts,
"issues": issues,
}
if output_path:
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
return json.dumps(report, indent=2)
def main():
parser = argparse.ArgumentParser(
description="Audit SQLite database before PostgreSQL migration"
)
parser.add_argument(
"--db-path",
type=str,
default="storage/pd_master.db",
help="Path to SQLite database",
)
parser.add_argument("--output", type=str, help="Output JSON file for report")
parser.add_argument(
"--fix", action="store_true", help="Apply safe fixes (not implemented)"
)
args = parser.parse_args()
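    # --fix is accepted but no automated fixes are implemented yet; say so instead of
    # silently ignoring the flag.
    if args.fix:
        print("NOTE: --fix is not implemented yet; running audit checks only.")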
print("=" * 60)
print("Paper Dynasty SQLite Data Integrity Audit")
print("=" * 60)
if not Path(args.db_path).exists():
print(f"ERROR: Database not found: {args.db_path}")
sys.exit(1)
conn = connect_db(args.db_path)
# Run checks
print("\n1. Getting table record counts...")
counts = get_table_record_counts(conn)
print(f" Found {len(counts)} tables with {sum(counts.values()):,} total records")
print("\n2. Checking for NULL values...")
null_issues = check_null_values(conn)
print(f" Found {len(null_issues)} NULL value issues")
print("\n3. Checking for orphaned foreign keys...")
fk_issues = check_orphaned_foreign_keys(conn)
print(f" Found {len(fk_issues)} orphaned FK issues")
print("\n4. Checking VARCHAR lengths...")
varchar_issues = check_varchar_lengths(conn)
print(f" Found {len(varchar_issues)} VARCHAR length issues")
print("\n5. Checking for duplicate primary keys...")
pk_issues = check_duplicate_primary_keys(conn)
print(f" Found {len(pk_issues)} duplicate PK issues")
print("\n6. Checking unique constraints...")
unique_issues = check_unique_constraints(conn)
print(f" Found {len(unique_issues)} unique constraint issues")
# Combine all issues
all_issues = null_issues + fk_issues + varchar_issues + pk_issues + unique_issues
# Generate report
print("\n" + "=" * 60)
print("AUDIT RESULTS")
print("=" * 60)
if args.output:
report = generate_report(counts, all_issues, args.output)
print(f"Full report saved to: {args.output}")
else:
report = generate_report(counts, all_issues)
# Print summary
print(f"\nTotal Issues: {len(all_issues)}")
critical = [i for i in all_issues if i.get("severity") == "CRITICAL"]
high = [i for i in all_issues if i.get("severity") == "HIGH"]
medium = [i for i in all_issues if i.get("severity") == "MEDIUM"]
if critical:
print(f"\n CRITICAL ({len(critical)}):")
for issue in critical:
print(
f" - {issue['type']}: {issue.get('description', issue.get('table', 'Unknown'))}"
)
if high:
print(f"\n HIGH ({len(high)}):")
for issue in high:
desc = issue.get(
"description",
f"{issue.get('table', 'Unknown')}.{issue.get('field', 'Unknown')}",
)
print(f" - {issue['type']}: {desc}")
if medium:
print(f"\n MEDIUM ({len(medium)}):")
for issue in medium:
desc = issue.get(
"description",
f"{issue.get('table', 'Unknown')}.{issue.get('field', 'Unknown')}",
)
print(f" - {issue['type']}: {desc}")
# Table counts
print("\n" + "-" * 60)
print("TABLE RECORD COUNTS (for baseline comparison)")
print("-" * 60)
for table, count in sorted(counts.items()):
print(f" {table:30} {count:>10,}")
conn.close()
# Exit code based on issues
if critical:
print("\n CRITICAL ISSUES FOUND - Migration may fail!")
sys.exit(2)
elif high:
print("\n HIGH PRIORITY ISSUES FOUND - Review before migration")
sys.exit(1)
else:
print("\n No critical issues found - Ready for migration")
sys.exit(0)
if __name__ == "__main__":
main()