import asyncio
import datetime
from pathlib import Path
from typing import List, Dict, Optional

import pandas as pd

try:
    import pybaseball as pb
except ImportError:
    # Allow the module to be imported for testing even without pybaseball
    pb = None

from exceptions import logger

try:
    from creation_helpers import get_all_pybaseball_ids
except ImportError:
    # Fallback for testing
    def get_all_pybaseball_ids(season):
        return []


class DataFetcher:
    """Automated data fetching for Baseball Reference and FanGraphs data."""

    def __init__(self, season: int, cardset_type: str = "Season"):
        self.season = season
        self.cardset_type = cardset_type  # "Season", "Live", or "Promos"
        self.output_dir = Path(f"data-input/{season} {cardset_type} Cardset")
        self.cache_enabled = True

        # Enable pybaseball caching
        if self.cache_enabled and pb is not None:
            pb.cache.enable()

    def ensure_output_dir(self):
        """Create the output directory if it doesn't exist."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Output directory: {self.output_dir}")

    async def fetch_baseball_reference_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch all Baseball Reference data that can be automated."""
        if pb is None:
            raise ImportError("pybaseball is required for data fetching")

        logger.info(f"Fetching Baseball Reference data for {self.season}")
        data = {}

        try:
            # Standard pitching stats
            logger.info("Fetching standard pitching stats...")
            data['pitching'] = pb.pitching_stats_bref(self.season)

            # Baserunning stats (taken from the standard batting table)
            logger.info("Fetching baserunning stats...")
            batting_data = pb.batting_stats_bref(self.season)
            required_running_cols = ['Name', 'SB', 'CS', 'SB%', 'GDP', 'R', 'H', 'BB', 'SO']
            available_cols = [col for col in required_running_cols if col in batting_data.columns]
            data['running'] = batting_data[available_cols]

            # Player list for splits
            player_ids = await self._get_active_players()

            # Fetch splits for a subset of players (capped at 50 for
            # performance in this initial implementation)
            splits_players = player_ids[:50]
            logger.info(f"Fetching splits for {len(splits_players)} players...")
            splits_data = await self._fetch_player_splits(splits_players)
            data['batting_splits'] = splits_data['batting']
            data['pitching_splits'] = splits_data['pitching']

        except Exception as e:
            logger.error(f"Error fetching Baseball Reference data: {e}")
            raise

        return data

    async def fetch_fangraphs_data(self, start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> Dict[str, pd.DataFrame]:
        """Fetch FanGraphs data (limited functionality)."""
        if pb is None:
            raise ImportError("pybaseball is required for data fetching")

        logger.info(f"Fetching FanGraphs data for {self.season}")
        data = {}

        try:
            if start_date and end_date:
                # Date-range queries for live series. Note: pybaseball's
                # *_stats_range functions scrape Baseball Reference's daily
                # data rather than FanGraphs.
                data['batting_basic'] = pb.batting_stats_range(start_date, end_date)
                data['pitching_basic'] = pb.pitching_stats_range(start_date, end_date)
            else:
                # Full-season queries
                data['batting_basic'] = pb.batting_stats(self.season, self.season)
                data['pitching_basic'] = pb.pitching_stats(self.season, self.season)

        except Exception as e:
            logger.error(f"Error fetching FanGraphs data: {e}")
            raise

        return data

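    # Illustrative date-windowed call for a live series (dates invented for
    # the example):
    #
    #     data = await fetcher.fetch_fangraphs_data("2025-03-27", "2025-06-15")
    #
    # returns {'batting_basic': ..., 'pitching_basic': ...} limited to that
    # window, whereas omitting the dates returns full-season tables.
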
    async def _get_active_players(self) -> List[str]:
        """Get the list of active player IDs for the season."""
        try:
            # Try the existing helper first
            try:
                player_ids = get_all_pybaseball_ids(self.season)
                logger.info(f"Found {len(player_ids)} active players from existing function")
                return player_ids
            except Exception:
                logger.info("Falling back to FanGraphs player ID extraction")

            # Fallback: pull IDs from the basic FanGraphs stats. Caveat:
            # these are FanGraphs IDs (IDfg), while pb.get_splits() expects
            # Baseball Reference IDs, so this fallback needs an ID crosswalk
            # before it can drive the splits fetch.
            batting_data = pb.batting_stats(self.season, self.season)
            player_ids = batting_data['IDfg'].dropna().unique().tolist()
            logger.info(f"Found {len(player_ids)} players from FanGraphs data")
            return player_ids

        except Exception as e:
            logger.error(f"Error getting player IDs: {e}")
            # Return an empty list if all else fails
            return []

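    # Sketch of the ID crosswalk mentioned above, using pybaseball's
    # playerid_reverse_lookup (not wired into the fallback yet; shown only
    # as a possible approach):
    #
    #     from pybaseball import playerid_reverse_lookup
    #
    #     lookup = playerid_reverse_lookup(player_ids, key_type='fangraphs')
    #     bbref_ids = lookup['key_bbref'].dropna().tolist()
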
    async def _fetch_player_splits(self, player_ids: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch batting/pitching splits for the given players."""
        batting_splits = []
        pitching_splits = []

        for i, player_id in enumerate(player_ids):
            if i % 10 == 0:
                logger.info(f"Processing player {i + 1}/{len(player_ids)}")

            try:
                # Batting splits (get_splits is a blocking scrape, so this
                # loop runs serially despite the async signature)
                bat_splits = pb.get_splits(player_id, year=self.season, pitching_splits=False)
                if not bat_splits.empty:
                    bat_splits['player_id'] = player_id
                    batting_splits.append(bat_splits)

                # Pitching splits
                pitch_splits = pb.get_splits(player_id, year=self.season, pitching_splits=True)
                if not pitch_splits.empty:
                    pitch_splits['player_id'] = player_id
                    pitching_splits.append(pitch_splits)

            except Exception as e:
                logger.warning(f"Error fetching splits for {player_id}: {e}")
                continue

        return {
            'batting': pd.concat(batting_splits, ignore_index=True) if batting_splits else pd.DataFrame(),
            'pitching': pd.concat(pitching_splits, ignore_index=True) if pitching_splits else pd.DataFrame(),
        }

    def save_data_to_csv(self, data: Dict[str, pd.DataFrame]):
        """Save fetched data to CSV files in the expected format."""
        self.ensure_output_dir()

        for name, df in data.items():
            if df.empty:
                logger.warning(f"Skipping empty dataset: {name}")
                continue

            filename = self._get_csv_filename(name)
            filepath = self.output_dir / filename

            # Apply any necessary transformations
            df = self._transform_for_card_creation(df, name)

            df.to_csv(filepath, index=False)
            logger.info(f"Saved {len(df)} records to {filepath}")

    def _get_csv_filename(self, data_type: str) -> str:
        """Map data types to the expected CSV filenames."""
        filename_map = {
            'pitching': 'pitching.csv',
            'running': 'running.csv',
            'batting_basic': 'batter-stats.csv',
            'pitching_basic': 'pitcher-stats.csv',
            'batting_splits': 'batting-splits.csv',
            'pitching_splits': 'pitching-splits.csv',
        }
        return filename_map.get(data_type, f'{data_type}.csv')

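    # Illustrative result: combining output_dir with the map above, a 2024
    # "Season" run of save_data_to_csv() writes files such as
    #
    #     data-input/2024 Season Cardset/pitching.csv
    #     data-input/2024 Season Cardset/batter-stats.csv
    #     data-input/2024 Season Cardset/batting-splits.csv
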
    def _transform_for_card_creation(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
        """Apply transformations to match card-creation expectations."""
        if data_type == 'batting_splits':
            # Keep only the handedness splits if they are present
            if 'Split' in df.columns:
                handedness_splits = df[df['Split'].isin(['vs LHP', 'vs RHP'])]
                if not handedness_splits.empty:
                    return handedness_splits

        elif data_type == 'running':
            # Keep only the expected baserunning columns
            required_cols = ['Name', 'SB', 'CS', 'SB%', 'GDP']
            available_cols = [col for col in required_cols if col in df.columns]
            if available_cols:
                df = df[available_cols]

        return df


class LiveSeriesDataFetcher(DataFetcher):
    """Specialized fetcher for live series updates."""

    def __init__(self, season: int, games_played: int):
        super().__init__(season, "Live")
        self.games_played = games_played
        self.start_date = f"{season}-03-01"  # Approximate spring training start
        self.end_date = self._calculate_end_date(games_played)

    def _calculate_end_date(self, games_played: int) -> str:
        """Estimate the calendar date by which `games_played` games were played."""
        # Rough estimate: 162 games spread over ~180 days
        days_elapsed = int(games_played * (180 / 162))
        start = datetime.datetime(self.season, 3, 1)
        end_date = start + datetime.timedelta(days=days_elapsed)
        return end_date.strftime("%Y-%m-%d")

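    # Worked example of the estimate above: for the 74-game live series in
    # the CLI usage below, days_elapsed = int(74 * 180 / 162) = 82, so a
    # 2025 fetch covers 2025-03-01 through 2025-05-22. This assumes
    # league-average pacing; individual teams may sit a few games either
    # side of it.
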
    async def fetch_live_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch live series data with date filtering."""
        logger.info(f"Fetching live data through {self.end_date} ({self.games_played} games)")

        # Combine Baseball Reference and FanGraphs data. Only the FanGraphs
        # range queries honor the date window; the Baseball Reference fetch
        # is always full-season.
        bref_data = await self.fetch_baseball_reference_data()
        fg_data = await self.fetch_fangraphs_data(self.start_date, self.end_date)

        return {**bref_data, **fg_data}


# Utility functions for CLI usage

async def fetch_season_data(season: int):
    """Fetch and save complete season data."""
    fetcher = DataFetcher(season, "Season")

    # Fetch all available data
    bref_data = await fetcher.fetch_baseball_reference_data()
    fg_data = await fetcher.fetch_fangraphs_data()

    # Combine and save
    all_data = {**bref_data, **fg_data}
    fetcher.save_data_to_csv(all_data)

    # Report what still needs manual download
    manual_files = [
        "vlhp-basic.csv", "vlhp-rate.csv",
        "vrhp-basic.csv", "vrhp-rate.csv",
        "vlhh-basic.csv", "vlhh-rate.csv",
        "vrhh-basic.csv", "vrhh-rate.csv",
    ]

    print(f"\n{'=' * 50}")
    print("AUTOMATED DOWNLOAD COMPLETE")
    print(f"{'=' * 50}")
    print(f"✅ Saved to: {fetcher.output_dir}")
    print("✅ Baseball Reference data: AUTOMATED")
    print("⚠️  Still need manual download from FanGraphs:")
    for file in manual_files:
        print(f"  - {file}")
    print(f"{'=' * 50}")


async def fetch_live_series_data(season: int, games_played: int):
    """Fetch and save live series data."""
    fetcher = LiveSeriesDataFetcher(season, games_played)
    live_data = await fetcher.fetch_live_data()
    fetcher.save_data_to_csv(live_data)


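# Example programmatic usage (a sketch; the module name is assumed to match
# the automated_data_fetcher.py filename in the usage string below):
#
#     import asyncio
#     from automated_data_fetcher import fetch_live_series_data
#
#     asyncio.run(fetch_live_series_data(2025, 74))
#
# Inside an already-running event loop (e.g. a notebook), await the
# coroutine directly instead of calling asyncio.run().
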
# CLI integration
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage: python automated_data_fetcher.py <season> [games_played]")
        print("Examples:")
        print("  python automated_data_fetcher.py 2024      # Full season")
        print("  python automated_data_fetcher.py 2025 74   # Live series (74 games)")
        sys.exit(1)

    season = int(sys.argv[1])

    if len(sys.argv) == 3:
        # Live series mode
        games_played = int(sys.argv[2])
        asyncio.run(fetch_live_series_data(season, games_played))
    else:
        # Full season mode
        asyncio.run(fetch_season_data(season))