- Add db_helpers.py with cross-database upsert functions for SQLite/PostgreSQL
- Replace 12 on_conflict_replace() calls with PostgreSQL-compatible upserts
- Add unique indexes: StratPlay(game, play_num), Decision(game, pitcher)
- Add max_length to Team model fields (abbrev, sname, lname)
- Fix boolean comparison in teams.py (== 0/1 to == False/True)
- Create migrate_to_postgres.py with ID-preserving migration logic
- Create audit_sqlite.py for pre-migration data integrity checks
- Add PROJECT_PLAN.json for migration tracking
- Add .secrets/ to .gitignore for credentials

Audit results: 658,963 records across 29 tables, 2,390 orphaned stats (expected)

Based on Major Domo migration lessons learned (33 issues resolved there)
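A minimal sketch of the cross-database upsert pattern db_helpers.py introduces, assuming the project uses Peewee (on_conflict_replace() is a Peewee call); the helper name and signature here are illustrative, not the actual db_helpers.py API:

    from peewee import PostgresqlDatabase

    def upsert(model, conflict_fields, **values):
        # Hypothetical helper; the real db_helpers.py may differ.
        # SQLite keeps REPLACE semantics; PostgreSQL needs an explicit
        # ON CONFLICT target backed by a unique index (hence the new indexes).
        query = model.insert(**values)
        if isinstance(model._meta.database, PostgresqlDatabase):
            update = {
                getattr(model, k): v
                for k, v in values.items()
                if k not in conflict_fields
            }
            return query.on_conflict(
                conflict_target=[getattr(model, f) for f in conflict_fields],
                update=update,
            ).execute()
        return query.on_conflict_replace().execute()

The ID-preserving logic in migrate_to_postgres.py also implies resetting each serial sequence after the copy (PostgreSQL's setval with pg_get_serial_sequence), since rows arrive with explicit primary keys.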
565 lines · 18 KiB · Python · Executable File
#!/usr/bin/env python3
"""
Paper Dynasty SQLite Data Integrity Audit

Pre-migration script to identify potential issues before migrating to PostgreSQL.
Based on issues discovered during Major Domo migration (August 2025).

Checks for:
1. NULL values in fields that will be NOT NULL in PostgreSQL
2. Orphaned foreign key records
3. VARCHAR field max lengths (PostgreSQL is stricter)
4. Record counts for baseline comparison
5. Primary key gaps or duplicates

Usage:
    python scripts/audit_sqlite.py
    python scripts/audit_sqlite.py --fix  # Apply safe fixes
"""

import argparse
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path


def connect_db(db_path: str) -> sqlite3.Connection:
    """Connect to SQLite database."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn


def get_table_record_counts(conn: sqlite3.Connection) -> dict:
    """Get record counts for all tables."""
    counts = {}
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    for row in cursor:
        table_name = row["name"]
        count_cursor = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
        counts[table_name] = count_cursor.fetchone()[0]
    return counts


def check_null_values(conn: sqlite3.Connection) -> list:
    """
    Check for NULL values in fields that should not be null.

    These are the fields that Major Domo found issues with.
    """
    issues = []

    # Fields to check - based on Major Domo experience
    null_checks = [
        # (table, field, description)
        ("team", "abbrev", "Team abbreviation"),
        ("team", "sname", "Team short name"),
        ("team", "lname", "Team long name"),
        ("player", "p_name", "Player name"),
        ("player", "image", "Player image URL"),
        ("card", "player_id", "Card player reference"),
        ("stratplay", "game_id", "Play game reference"),
        ("stratplay", "pitcher_id", "Play pitcher reference"),
        ("decision", "game_id", "Decision game reference"),
        ("decision", "pitcher_id", "Decision pitcher reference"),
    ]

    for table, field, description in null_checks:
        try:
            cursor = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} IS NULL")
            null_count = cursor.fetchone()[0]
            if null_count > 0:
                cursor = conn.execute(f"SELECT COUNT(*) FROM {table}")
                total_count = cursor.fetchone()[0]
                issues.append(
                    {
                        "type": "NULL_VALUE",
                        "severity": "HIGH"
                        if null_count > total_count * 0.1
                        else "MEDIUM",
                        "table": table,
                        "field": field,
                        "description": description,
                        "null_count": null_count,
                        "total_count": total_count,
                        "percentage": round(null_count / total_count * 100, 2)
                        if total_count > 0
                        else 0,
                    }
                )
        except sqlite3.OperationalError:
            # Table or column doesn't exist
            pass

    return issues
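

# Sketch (not part of the original script): the --fix flag advertised in the
# docstring is currently a no-op. A conservative implementation for the NULL
# issues above might backfill neutral defaults before migration; the
# table/field/default rows below are illustrative, not an audited list.
def apply_safe_null_fixes(conn: sqlite3.Connection) -> int:
    """Backfill NULLs so the matching PostgreSQL NOT NULL columns will load."""
    safe_defaults = [
        # (table, field, default) - hypothetical choices for illustration
        ("team", "abbrev", "UNK"),
        ("player", "image", ""),
    ]
    fixed = 0
    for table, field, default in safe_defaults:
        cursor = conn.execute(
            f"UPDATE {table} SET {field} = ? WHERE {field} IS NULL", (default,)
        )
        fixed += cursor.rowcount
    conn.commit()
    return fixed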


def check_orphaned_foreign_keys(conn: sqlite3.Connection) -> list:
    """
    Check for orphaned foreign key records.

    These will fail with foreign key constraint violations in PostgreSQL.
    """
    issues = []

    # Foreign key relationships to check
    # (child_table, child_field, parent_table, parent_field, description)
    fk_checks = [
        ("card", "player_id", "player", "player_id", "Cards referencing non-existent players"),
        ("card", "team_id", "team", "id", "Cards referencing non-existent teams"),
        ("stratplay", "game_id", "stratgame", "id", "Plays referencing non-existent games"),
        ("stratplay", "batter_id", "player", "player_id", "Plays referencing non-existent batters"),
        ("stratplay", "pitcher_id", "player", "player_id", "Plays referencing non-existent pitchers"),
        ("decision", "game_id", "stratgame", "id", "Decisions referencing non-existent games"),
        ("decision", "pitcher_id", "player", "player_id", "Decisions referencing non-existent pitchers"),
        ("battingstat", "card_id", "card", "id", "Batting stats referencing non-existent cards"),
        ("pitchingstat", "card_id", "card", "id", "Pitching stats referencing non-existent cards"),
        ("battingcard", "player_id", "player", "player_id", "Batting cards referencing non-existent players"),
        ("pitchingcard", "player_id", "player", "player_id", "Pitching cards referencing non-existent players"),
        ("cardposition", "player_id", "player", "player_id", "Card positions referencing non-existent players"),
        ("paperdex", "player_id", "player", "player_id", "Paperdex entries referencing non-existent players"),
        ("paperdex", "team_id", "team", "id", "Paperdex entries referencing non-existent teams"),
        ("gauntletrun", "team_id", "team", "id", "Gauntlet runs referencing non-existent teams"),
    ]

    for child_table, child_field, parent_table, parent_field, description in fk_checks:
        try:
            # Use explicit column names to avoid ambiguity
            query = f"""
                SELECT COUNT(*)
                FROM {child_table} c
                LEFT JOIN {parent_table} p ON c.{child_field} = p.{parent_field}
                WHERE c.{child_field} IS NOT NULL AND p.{parent_field} IS NULL
            """
            cursor = conn.execute(query)
            orphan_count = cursor.fetchone()[0]

            if orphan_count > 0:
                # Get sample orphaned IDs
                sample_query = f"""
                    SELECT c.{child_field}
                    FROM {child_table} c
                    LEFT JOIN {parent_table} p ON c.{child_field} = p.{parent_field}
                    WHERE c.{child_field} IS NOT NULL AND p.{parent_field} IS NULL
                    LIMIT 5
                """
                sample_cursor = conn.execute(sample_query)
                sample_ids = [row[0] for row in sample_cursor.fetchall()]

                issues.append(
                    {
                        "type": "ORPHANED_FK",
                        "severity": "HIGH",
                        "child_table": child_table,
                        "child_field": child_field,
                        "parent_table": parent_table,
                        "parent_field": parent_field,
                        "description": description,
                        "orphan_count": orphan_count,
                        "sample_orphan_ids": sample_ids,
                    }
                )
        except sqlite3.OperationalError as e:
            # Table or column doesn't exist
            print(
                f"Warning: Could not check {child_table}.{child_field} -> {parent_table}.{parent_field}: {e}"
            )

    return issues


def check_varchar_lengths(conn: sqlite3.Connection) -> list:
    """
    Check max lengths of string fields.

    PostgreSQL VARCHAR fields have stricter length limits than SQLite.
    """
    issues = []

    # Fields to check with expected max lengths
    varchar_checks = [
        # (table, field, expected_max_length, description)
        ("player", "p_name", 255, "Player name"),
        ("player", "image", 1000, "Player image URL"),
        ("player", "image2", 1000, "Player image2 URL"),
        ("player", "headshot", 500, "Player headshot URL"),
        ("player", "vanity_card", 500, "Player vanity card"),
        ("player", "strat_code", 100, "Strat code"),
        ("player", "bbref_id", 50, "Baseball Reference ID"),
        ("player", "description", 1000, "Player description"),
        ("team", "abbrev", 10, "Team abbreviation"),
        ("team", "sname", 100, "Team short name"),
        ("team", "lname", 255, "Team long name"),
        ("notification", "title", 255, "Notification title"),
        ("notification", "message", 2000, "Notification message"),
    ]

    for table, field, expected_max, description in varchar_checks:
        try:
            cursor = conn.execute(f"SELECT MAX(LENGTH({field})) FROM {table}")
            max_length = cursor.fetchone()[0]

            if max_length and max_length > expected_max:
                # Get sample of long values
                sample_cursor = conn.execute(
                    f"SELECT {field} FROM {table} WHERE LENGTH({field}) > {expected_max} LIMIT 3"
                )
                samples = [
                    row[0][:100] + "..." if row[0] else None
                    for row in sample_cursor.fetchall()
                ]

                issues.append(
                    {
                        "type": "VARCHAR_TOO_LONG",
                        "severity": "HIGH",
                        "table": table,
                        "field": field,
                        "description": description,
                        "max_found": max_length,
                        "expected_max": expected_max,
                        "sample_values": samples,
                    }
                )
            elif max_length:
                # Info: report actual max for reference
                pass
        except sqlite3.OperationalError:
            pass

    return issues
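

# Note (sketch, not in the original script): this commit pairs the checks above
# with explicit max_length values on the Team model. Assuming the project's
# Peewee models, the change would look roughly like:
#
#     class Team(BaseModel):
#         abbrev = CharField(max_length=10)
#         sname = CharField(max_length=100)
#         lname = CharField(max_length=255)
#
# SQLite ignores VARCHAR length limits, but PostgreSQL enforces VARCHAR(n),
# so the expected_max values above deliberately mirror the model definitions.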


def check_duplicate_primary_keys(conn: sqlite3.Connection) -> list:
    """
    Check for duplicate primary keys (shouldn't happen but good to verify).
    """
    issues = []

    pk_checks = [
        ("player", "player_id"),
        ("team", "id"),
        ("card", "id"),
        ("stratgame", "id"),
        ("stratplay", "id"),
    ]

    for table, pk_field in pk_checks:
        try:
            cursor = conn.execute(f"""
                SELECT {pk_field}, COUNT(*) as cnt
                FROM {table}
                GROUP BY {pk_field}
                HAVING COUNT(*) > 1
            """)
            duplicates = cursor.fetchall()

            if duplicates:
                issues.append(
                    {
                        "type": "DUPLICATE_PK",
                        "severity": "CRITICAL",
                        "table": table,
                        "pk_field": pk_field,
                        "duplicate_ids": [row[0] for row in duplicates[:10]],
                        "duplicate_count": len(duplicates),
                    }
                )
        except sqlite3.OperationalError:
            pass

    return issues


def check_unique_constraints(conn: sqlite3.Connection) -> list:
    """
    Check that composite unique constraints would be satisfied.

    These are the indexes that on_conflict_replace() depends on.
    """
    issues = []

    unique_checks = [
        # (table, fields, description)
        ("battingcard", ["player_id", "variant"], "Batting card unique constraint"),
        ("pitchingcard", ["player_id", "variant"], "Pitching card unique constraint"),
        ("cardposition", ["player_id", "variant", "position"], "Card position unique constraint"),
        ("battingcardratings", ["battingcard_id", "vs_hand"], "Batting card ratings unique constraint"),
        ("pitchingcardratings", ["pitchingcard_id", "vs_hand"], "Pitching card ratings unique constraint"),
    ]

    for table, fields, description in unique_checks:
        try:
            fields_str = ", ".join(fields)
            cursor = conn.execute(f"""
                SELECT {fields_str}, COUNT(*) as cnt
                FROM {table}
                GROUP BY {fields_str}
                HAVING COUNT(*) > 1
            """)
            duplicates = cursor.fetchall()

            if duplicates:
                issues.append(
                    {
                        "type": "DUPLICATE_UNIQUE",
                        "severity": "HIGH",
                        "table": table,
                        "fields": fields,
                        "description": description,
                        "duplicate_count": len(duplicates),
                        "sample_duplicates": [
                            dict(zip(fields + ["count"], row)) for row in duplicates[:5]
                        ],
                    }
                )
        except sqlite3.OperationalError as e:
            print(f"Warning: Could not check unique constraint on {table}: {e}")

    return issues
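

# Note (sketch, not in the original script): PostgreSQL's INSERT ... ON CONFLICT
# requires a unique index or constraint matching the conflict target, which is
# why duplicates found here must be resolved before migration. In Peewee, the
# composite unique indexes named in the commit message would be declared as:
#
#     class StratPlay(BaseModel):
#         class Meta:
#             indexes = ((("game", "play_num"), True),)  # True marks it unique
#
#     class Decision(BaseModel):
#         class Meta:
#             indexes = ((("game", "pitcher"), True),)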


def generate_report(counts: dict, issues: list, output_path: str = None) -> str:
    """Generate audit report."""
    report = {
        "generated_at": datetime.now().isoformat(),
        "summary": {
            "total_tables": len(counts),
            "total_records": sum(counts.values()),
            "total_issues": len(issues),
            "critical_issues": len(
                [i for i in issues if i.get("severity") == "CRITICAL"]
            ),
            "high_issues": len([i for i in issues if i.get("severity") == "HIGH"]),
            "medium_issues": len([i for i in issues if i.get("severity") == "MEDIUM"]),
        },
        "table_counts": counts,
        "issues": issues,
    }

    if output_path:
        with open(output_path, "w") as f:
            json.dump(report, f, indent=2)

    return json.dumps(report, indent=2)


def main():
    parser = argparse.ArgumentParser(
        description="Audit SQLite database before PostgreSQL migration"
    )
    parser.add_argument(
        "--db-path",
        type=str,
        default="storage/pd_master.db",
        help="Path to SQLite database",
    )
    parser.add_argument("--output", type=str, help="Output JSON file for report")
    parser.add_argument(
        "--fix", action="store_true", help="Apply safe fixes (not implemented)"
    )
    args = parser.parse_args()

    print("=" * 60)
    print("Paper Dynasty SQLite Data Integrity Audit")
    print("=" * 60)

    if not Path(args.db_path).exists():
        print(f"ERROR: Database not found: {args.db_path}")
        sys.exit(1)

    conn = connect_db(args.db_path)

    # Run checks
    print("\n1. Getting table record counts...")
    counts = get_table_record_counts(conn)
    print(f"   Found {len(counts)} tables with {sum(counts.values()):,} total records")

    print("\n2. Checking for NULL values...")
    null_issues = check_null_values(conn)
    print(f"   Found {len(null_issues)} NULL value issues")

    print("\n3. Checking for orphaned foreign keys...")
    fk_issues = check_orphaned_foreign_keys(conn)
    print(f"   Found {len(fk_issues)} orphaned FK issues")

    print("\n4. Checking VARCHAR lengths...")
    varchar_issues = check_varchar_lengths(conn)
    print(f"   Found {len(varchar_issues)} VARCHAR length issues")

    print("\n5. Checking for duplicate primary keys...")
    pk_issues = check_duplicate_primary_keys(conn)
    print(f"   Found {len(pk_issues)} duplicate PK issues")

    print("\n6. Checking unique constraints...")
    unique_issues = check_unique_constraints(conn)
    print(f"   Found {len(unique_issues)} unique constraint issues")

    # Combine all issues
    all_issues = null_issues + fk_issues + varchar_issues + pk_issues + unique_issues

    # Generate report
    print("\n" + "=" * 60)
    print("AUDIT RESULTS")
    print("=" * 60)

    if args.output:
        report = generate_report(counts, all_issues, args.output)
        print(f"Full report saved to: {args.output}")
    else:
        report = generate_report(counts, all_issues)

    # Print summary
    print(f"\nTotal Issues: {len(all_issues)}")
    critical = [i for i in all_issues if i.get("severity") == "CRITICAL"]
    high = [i for i in all_issues if i.get("severity") == "HIGH"]
    medium = [i for i in all_issues if i.get("severity") == "MEDIUM"]

    if critical:
        print(f"\n CRITICAL ({len(critical)}):")
        for issue in critical:
            print(
                f"  - {issue['type']}: {issue.get('description', issue.get('table', 'Unknown'))}"
            )

    if high:
        print(f"\n HIGH ({len(high)}):")
        for issue in high:
            desc = issue.get(
                "description",
                f"{issue.get('table', 'Unknown')}.{issue.get('field', 'Unknown')}",
            )
            print(f"  - {issue['type']}: {desc}")

    if medium:
        print(f"\n MEDIUM ({len(medium)}):")
        for issue in medium:
            desc = issue.get(
                "description",
                f"{issue.get('table', 'Unknown')}.{issue.get('field', 'Unknown')}",
            )
            print(f"  - {issue['type']}: {desc}")

    # Table counts
    print("\n" + "-" * 60)
    print("TABLE RECORD COUNTS (for baseline comparison)")
    print("-" * 60)
    for table, count in sorted(counts.items()):
        print(f"  {table:30} {count:>10,}")

    conn.close()

    # Exit code based on issues
    if critical:
        print("\n CRITICAL ISSUES FOUND - Migration may fail!")
        sys.exit(2)
    elif high:
        print("\n HIGH PRIORITY ISSUES FOUND - Review before migration")
        sys.exit(1)
    else:
        print("\n No critical issues found - Ready for migration")
        sys.exit(0)


if __name__ == "__main__":
    main()