major-domo-database/.claude/sqlite-to-postgres/botched-sbaplayer-matching/find_high_risk_matches.py
Cal Corum 7130a1fd43 Postgres Migration
Migration documentation and scripts
2025-08-25 07:18:31 -05:00

440 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Find high-risk player name matches that could lead to incorrect linking
Identifies cases like "Mike Trout" vs "Michael Trout" or "Luis V Garcia" vs "Luis H Garcia"
"""
import csv
import difflib
import json
import logging
import re

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Set, Optional, Tuple
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/tmp/high_risk_matches.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('HighRiskMatches')
@dataclass
class PlayerRecord:
id: int
name: str
season: int
bbref_id: Optional[str] = None
sbaplayer_id: Optional[int] = None
@dataclass
class SbaPlayerRecord:
id: int
first_name: str
last_name: str
key_bbref: Optional[str] = None
key_fangraphs: Optional[int] = None
key_mlbam: Optional[int] = None
key_retro: Optional[str] = None
def normalize_name(name: str) -> str:
"""Normalize a name for comparison (remove punctuation, lowercase, etc.)"""
import re
# Remove periods, apostrophes, hyphens
normalized = re.sub(r"['\.\-]", "", name.lower())
# Replace multiple spaces with single space
normalized = re.sub(r'\s+', ' ', normalized)
# Strip whitespace
normalized = normalized.strip()
return normalized
def get_name_variants(name: str) -> Set[str]:
"""Generate common variants of a name"""
variants = set()
normalized = normalize_name(name)
variants.add(normalized)
# Split into parts
parts = normalized.split()
if len(parts) >= 2:
first_part = parts[0]
remaining = ' '.join(parts[1:])
# Common nickname patterns
nickname_map = {
'michael': ['mike', 'micky', 'mickey'],
'mike': ['michael'],
'william': ['will', 'bill', 'billy'],
'will': ['william'],
'bill': ['william'],
'robert': ['rob', 'bob', 'bobby'],
'rob': ['robert'],
'bob': ['robert'],
'james': ['jim', 'jimmy'],
'jim': ['james'],
'thomas': ['tom', 'tommy'],
'tom': ['thomas'],
'joseph': ['joe', 'joey'],
'joe': ['joseph'],
'christopher': ['chris'],
'chris': ['christopher'],
'anthony': ['tony'],
'tony': ['anthony'],
'andrew': ['andy', 'drew'],
'andy': ['andrew'],
'drew': ['andrew'],
'jonathan': ['jon'],
'jon': ['jonathan'],
'matthew': ['matt'],
'matt': ['matthew'],
'nicholas': ['nick'],
'nick': ['nicholas'],
'alexander': ['alex'],
'alex': ['alexander'],
'benjamin': ['ben'],
'ben': ['benjamin'],
'samuel': ['sam'],
'sam': ['samuel'],
'daniel': ['dan', 'danny'],
'dan': ['daniel'],
'danny': ['daniel'],
'david': ['dave'],
'dave': ['david'],
'edward': ['ed', 'eddie'],
'ed': ['edward'],
'eddie': ['edward']
}
# Add nickname variants
if first_part in nickname_map:
for nickname in nickname_map[first_part]:
variants.add(f"{nickname} {remaining}")
# Handle middle initial variations (e.g., "Luis V Garcia" vs "Luis H Garcia")
if len(parts) == 3 and len(parts[1]) == 1:
# Middle initial pattern
first_name = parts[0]
last_name = parts[2]
# Add version without middle initial
variants.add(f"{first_name} {last_name}")
# Add pattern to catch other middle initials
variants.add(f"{first_name} _ {last_name}")
return variants
def calculate_name_similarity(name1: str, name2: str) -> float:
"""Calculate similarity between two names using difflib"""
norm1 = normalize_name(name1)
norm2 = normalize_name(name2)
return difflib.SequenceMatcher(None, norm1, norm2).ratio()
def load_cached_data():
"""Load all cached data"""
# Load SbaPlayers
logger.info("Loading cached SbaPlayer data...")
with open('/tmp/sbaplayers.json', 'r') as f:
sbaplayer_data = json.load(f)
sbaplayers = []
for data in sbaplayer_data:
sbaplayers.append(SbaPlayerRecord(**data))
# Load all player seasons
logger.info("Loading cached player data...")
all_players = []
for season in range(1, 13):
cache_file = f"/tmp/players_season_{season}.json"
try:
with open(cache_file, 'r') as f:
season_data = json.load(f)
for data in season_data:
all_players.append(PlayerRecord(**data))
except FileNotFoundError:
logger.error(f"Cache file for season {season} not found.")
return None, None
return all_players, sbaplayers
def find_high_risk_matches():
"""Find potentially problematic name matches"""
all_players, sbaplayers = load_cached_data()
if not all_players or not sbaplayers:
logger.error("Failed to load cached data.")
return
logger.info(f"Analyzing {len(all_players)} player records and {len(sbaplayers)} SbaPlayers")
# Create name maps
sbaplayer_names = {} # normalized name -> SbaPlayerRecord
player_names = defaultdict(list) # normalized name -> list of PlayerRecords
for sba in sbaplayers:
full_name = f"{sba.first_name} {sba.last_name}"
normalized = normalize_name(full_name)
sbaplayer_names[normalized] = sba
# Group unique players by name (using most recent season as representative)
unique_players = {}
players_by_bbref = defaultdict(list)
players_without_bbref = defaultdict(list)
for player in all_players:
if player.bbref_id:
players_by_bbref[player.bbref_id].append(player)
else:
players_without_bbref[player.name].append(player)
# Get representative players
for bbref_id, players in players_by_bbref.items():
representative = max(players, key=lambda p: p.season)
unique_players[representative.name] = representative
for name, players in players_without_bbref.items():
representative = max(players, key=lambda p: p.season)
unique_players[name] = representative
logger.info(f"Found {len(unique_players)} unique players")
# Find high-risk matches
high_risk_matches = []
# 1. Multiple SbaPlayers with very similar names
sba_name_groups = defaultdict(list)
for sba in sbaplayers:
full_name = f"{sba.first_name} {sba.last_name}"
# Group by last name + first initial for initial clustering
key = f"{sba.last_name.lower()} {sba.first_name[0].lower()}"
sba_name_groups[key].append((sba, full_name))
logger.info("Finding similar SbaPlayer names...")
sba_conflicts = []
for key, sba_list in sba_name_groups.items():
if len(sba_list) > 1:
# Check if any are very similar
for i, (sba1, name1) in enumerate(sba_list):
for j, (sba2, name2) in enumerate(sba_list[i+1:], i+1):
similarity = calculate_name_similarity(name1, name2)
if similarity > 0.8: # Very similar names
sba_conflicts.append({
'type': 'sbaplayer_conflict',
'sba1_id': sba1.id,
'sba1_name': name1,
'sba1_bbref': sba1.key_bbref,
'sba2_id': sba2.id,
'sba2_name': name2,
'sba2_bbref': sba2.key_bbref,
'similarity': similarity,
'risk_reason': f'Very similar SbaPlayer names (similarity: {similarity:.3f})'
})
logger.info(f"Found {len(sba_conflicts)} SbaPlayer name conflicts")
# 2. Players that could match multiple SbaPlayers
logger.info("Finding players with ambiguous SbaPlayer matches...")
player_conflicts = []
for player_name, player in unique_players.items():
# Skip players with bbref_id (they have definitive matching)
if player.bbref_id:
continue
normalized_player = normalize_name(player_name)
potential_matches = []
# Find all potential SbaPlayer matches
for sba in sbaplayers:
sba_name = f"{sba.first_name} {sba.last_name}"
similarity = calculate_name_similarity(player_name, sba_name)
if similarity > 0.7: # Potential match threshold
potential_matches.append((sba, sba_name, similarity))
# If multiple potential matches, this is high risk
if len(potential_matches) > 1:
# Sort by similarity
potential_matches.sort(key=lambda x: x[2], reverse=True)
player_conflicts.append({
'type': 'player_ambiguous_match',
'player_id': player.id,
'player_name': player_name,
'player_seasons': [player.season], # We only have representative
'potential_matches': [
{
'sba_id': sba.id,
'sba_name': sba_name,
'sba_bbref': sba.key_bbref,
'similarity': sim
}
for sba, sba_name, sim in potential_matches[:5] # Top 5 matches
],
'risk_reason': f'Player could match {len(potential_matches)} different SbaPlayers'
})
logger.info(f"Found {len(player_conflicts)} players with ambiguous matches")
# 3. Middle initial conflicts (Luis V Garcia vs Luis H Garcia type issues)
logger.info("Finding middle initial conflicts...")
middle_initial_conflicts = []
# Group players by "FirstName LastName" pattern (ignoring middle initial)
name_groups = defaultdict(list)
for player_name, player in unique_players.items():
if player.bbref_id:
continue
parts = player_name.split()
if len(parts) == 3 and len(parts[1]) == 1:
# Has middle initial pattern
key = f"{parts[0]} {parts[2]}".lower()
name_groups[key].append((player, player_name))
elif len(parts) == 2:
# No middle initial
key = f"{parts[0]} {parts[1]}".lower()
name_groups[key].append((player, player_name))
for key, player_list in name_groups.items():
if len(player_list) > 1:
# Multiple players with same first/last but different middles
middle_initial_conflicts.append({
'type': 'middle_initial_conflict',
'base_name': key.title(),
'players': [
{
'player_id': player.id,
'player_name': name,
'seasons': [player.season],
'bbref_id': player.bbref_id
}
for player, name in player_list
],
'risk_reason': f'{len(player_list)} players with similar first/last names but different middle initials'
})
logger.info(f"Found {len(middle_initial_conflicts)} middle initial conflicts")
return sba_conflicts, player_conflicts, middle_initial_conflicts
def generate_high_risk_csv(sba_conflicts, player_conflicts, middle_initial_conflicts):
"""Generate CSV file with all high-risk matches"""
output_file = '/mnt/NV2/Development/major-domo/database/high_risk_player_matches.csv'
with open(output_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
# Header
writer.writerow([
'risk_type', 'risk_reason', 'player_id', 'player_name', 'player_seasons',
'sba1_id', 'sba1_name', 'sba1_bbref', 'sba2_id', 'sba2_name', 'sba2_bbref',
'similarity_score', 'action_needed'
])
# SbaPlayer conflicts
for conflict in sba_conflicts:
writer.writerow([
conflict['type'],
conflict['risk_reason'],
'', # No player_id for SbaPlayer conflicts
'', # No player_name
'', # No seasons
conflict['sba1_id'],
conflict['sba1_name'],
conflict['sba1_bbref'] or '',
conflict['sba2_id'],
conflict['sba2_name'],
conflict['sba2_bbref'] or '',
f"{conflict['similarity']:.3f}",
'Verify these are different people, not duplicates'
])
# Player ambiguous matches
for conflict in player_conflicts:
# Show top 2 potential matches
matches = conflict['potential_matches'][:2]
if len(matches) >= 2:
writer.writerow([
conflict['type'],
conflict['risk_reason'],
conflict['player_id'],
conflict['player_name'],
','.join(map(str, conflict['player_seasons'])),
matches[0]['sba_id'],
matches[0]['sba_name'],
matches[0]['sba_bbref'] or '',
matches[1]['sba_id'] if len(matches) > 1 else '',
matches[1]['sba_name'] if len(matches) > 1 else '',
matches[1]['sba_bbref'] if len(matches) > 1 else '',
f"{matches[0]['similarity']:.3f}",
f'Choose correct match from {len(conflict["potential_matches"])} options'
])
# Middle initial conflicts
for conflict in middle_initial_conflicts:
players = conflict['players']
if len(players) >= 2:
writer.writerow([
conflict['type'],
conflict['risk_reason'],
players[0]['player_id'],
players[0]['player_name'],
','.join(map(str, players[0]['seasons'])),
'', # No SbaPlayer info yet
'',
'',
players[1]['player_id'] if len(players) > 1 else '',
players[1]['player_name'] if len(players) > 1 else '',
'',
'N/A',
f'Verify these are different people: {", ".join([p["player_name"] for p in players])}'
])
logger.info(f"Generated high-risk matches CSV: {output_file}")
# Generate summary
summary_file = '/mnt/NV2/Development/major-domo/database/high_risk_matches_summary.txt'
with open(summary_file, 'w') as f:
f.write("HIGH-RISK PLAYER MATCHES SUMMARY\n")
f.write("=" * 50 + "\n\n")
f.write(f"SbaPlayer name conflicts: {len(sba_conflicts)}\n")
f.write(f"Players with ambiguous matches: {len(player_conflicts)}\n")
f.write(f"Middle initial conflicts: {len(middle_initial_conflicts)}\n")
f.write(f"Total high-risk situations: {len(sba_conflicts) + len(player_conflicts) + len(middle_initial_conflicts)}\n\n")
f.write("RISK TYPES:\n")
f.write("1. sbaplayer_conflict: Multiple SbaPlayers with very similar names\n")
f.write("2. player_ambiguous_match: Player could match multiple SbaPlayers\n")
f.write("3. middle_initial_conflict: Players with same first/last but different middle initials\n\n")
f.write("ACTION REQUIRED:\n")
f.write("Review the CSV file to ensure correct matching and avoid linking wrong players.\n")
logger.info(f"Generated summary: {summary_file}")
return output_file, summary_file
def main():
"""Main execution"""
logger.info("Starting high-risk match analysis...")
sba_conflicts, player_conflicts, middle_initial_conflicts = find_high_risk_matches()
csv_file, summary_file = generate_high_risk_csv(sba_conflicts, player_conflicts, middle_initial_conflicts)
logger.info(f"\n=== HIGH-RISK ANALYSIS COMPLETE ===")
logger.info(f"CSV file: {csv_file}")
logger.info(f"Summary: {summary_file}")
total_risks = len(sba_conflicts) + len(player_conflicts) + len(middle_initial_conflicts)
logger.info(f"Total high-risk situations found: {total_risks}")
if __name__ == "__main__":
main()