import asyncio
import datetime
import os
import pandas as pd
from typing import List, Dict, Optional
from pathlib import Path

try:
    import pybaseball as pb
except ImportError:
    # Allow module to be imported for testing even without pybaseball
    pb = None

from exceptions import logger

try:
    from creation_helpers import get_all_pybaseball_ids
except ImportError:
    # Fallback for testing
    def get_all_pybaseball_ids(season):
        return []


class DataFetcher:
    """Automated data fetching for Baseball Reference and FanGraphs data"""

    def __init__(self, season: int, cardset_type: str = "Season"):
        self.season = season
        self.cardset_type = cardset_type  # "Season", "Live", "Promos"
        self.output_dir = Path(f"data-input/{season} {cardset_type} Cardset")
        self.cache_enabled = True

        # Enable pybaseball caching
        if self.cache_enabled and pb is not None:
            pb.cache.enable()

    def ensure_output_dir(self):
        """Create output directory if it doesn't exist"""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Output directory: {self.output_dir}")

    async def fetch_baseball_reference_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch all Baseball Reference data that can be automated"""
        if pb is None:
            raise ImportError("pybaseball is required for data fetching")

        logger.info(f"Fetching Baseball Reference data for {self.season}")
        data = {}

        try:
            # Standard pitching stats
            logger.info("Fetching standard pitching stats...")
            data['pitching'] = pb.pitching_stats_bref(self.season)

            # Baserunning stats
            logger.info("Fetching baserunning stats...")
            batting_data = pb.batting_stats_bref(self.season)
            required_running_cols = ['Name', 'SB', 'CS', 'SB%', 'GDP', 'R', 'H', 'BB', 'SO']
            available_cols = [col for col in required_running_cols if col in batting_data.columns]
            data['running'] = batting_data[available_cols]

            # Player list for splits
            player_ids = await self._get_active_players()

            # Fetch splits for subset of players (limit for performance)
            logger.info(f"Fetching batting splits for {len(player_ids)} players...")
            splits_data = await self._fetch_player_splits(player_ids[:50])  # Limit for initial implementation
            data['batting_splits'] = splits_data['batting']
            data['pitching_splits'] = splits_data['pitching']
        except Exception as e:
            logger.error(f"Error fetching Baseball Reference data: {e}")
            raise

        return data

    async def fetch_fangraphs_data(self, start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> Dict[str, pd.DataFrame]:
        """Fetch FanGraphs data (limited functionality)"""
        if pb is None:
            raise ImportError("pybaseball is required for data fetching")

        logger.info(f"Fetching FanGraphs data for {self.season}")
        data = {}

        try:
            if start_date and end_date:
                # Date range queries for live series
                data['batting_basic'] = pb.batting_stats_range(start_date, end_date)
                data['pitching_basic'] = pb.pitching_stats_range(start_date, end_date)
            else:
                # Full season queries
                data['batting_basic'] = pb.batting_stats(self.season, self.season)
                data['pitching_basic'] = pb.pitching_stats(self.season, self.season)
        except Exception as e:
            logger.error(f"Error fetching FanGraphs data: {e}")
            raise

        return data

    async def _get_active_players(self) -> List[str]:
        """Get list of active player IDs for the season"""
        try:
            # Try to use existing function first
            try:
                player_ids = get_all_pybaseball_ids(self.season)
                logger.info(f"Found {len(player_ids)} active players from existing function")
                return player_ids
            except Exception:
                logger.info("Falling back to FanGraphs player ID extraction")

            # Fallback to getting IDs from basic stats
            batting_data = pb.batting_stats(self.season, self.season)
            player_ids = batting_data['IDfg'].dropna().unique().tolist()
            logger.info(f"Found {len(player_ids)} players from FanGraphs data")
            return player_ids
        except Exception as e:
            logger.error(f"Error getting player IDs: {e}")
            # Return empty list if all else fails
            return []

    async def _fetch_player_splits(self, player_ids: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch batting/pitching splits for all players"""
        batting_splits = []
        pitching_splits = []

        for i, player_id in enumerate(player_ids):
            if i % 10 == 0:
                logger.info(f"Processing player {i+1}/{len(player_ids)}")

            try:
                # Batting splits
                bat_splits = pb.get_splits(player_id, year=self.season, pitching_splits=False)
                if not bat_splits.empty:
                    bat_splits['player_id'] = player_id
                    batting_splits.append(bat_splits)

                # Pitching splits
                pitch_splits = pb.get_splits(player_id, year=self.season, pitching_splits=True)
                if not pitch_splits.empty:
                    pitch_splits['player_id'] = player_id
                    pitching_splits.append(pitch_splits)
            except Exception as e:
                logger.warning(f"Error fetching splits for {player_id}: {e}")
                continue

        return {
            'batting': pd.concat(batting_splits, ignore_index=True) if batting_splits else pd.DataFrame(),
            'pitching': pd.concat(pitching_splits, ignore_index=True) if pitching_splits else pd.DataFrame()
        }

    def save_data_to_csv(self, data: Dict[str, pd.DataFrame]):
        """Save fetched data to CSV files in the expected format"""
        self.ensure_output_dir()

        for name, df in data.items():
            if df.empty:
                logger.warning(f"Skipping empty dataset: {name}")
                continue

            filename = self._get_csv_filename(name)
            filepath = self.output_dir / filename

            # Apply any necessary transformations
            df = self._transform_for_card_creation(df, name)

            df.to_csv(filepath, index=False)
            logger.info(f"Saved {len(df)} records to {filepath}")

    def _get_csv_filename(self, data_type: str) -> str:
        """Map data types to expected CSV filenames"""
        filename_map = {
            'pitching': 'pitching.csv',
            'running': 'running.csv',
            'batting_basic': 'batter-stats.csv',
            'pitching_basic': 'pitcher-stats.csv',
            'batting_splits': 'batting-splits.csv',
            'pitching_splits': 'pitching-splits.csv'
        }
        return filename_map.get(data_type, f'{data_type}.csv')

    def _transform_for_card_creation(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
        """Apply transformations to match card creation expectations"""
        if data_type == 'batting_splits':
            # Filter for handedness splits if available
            if 'Split' in df.columns:
                handedness_splits = df[df['Split'].isin(['vs LHP', 'vs RHP'])]
                if not handedness_splits.empty:
                    return handedness_splits
        elif data_type == 'running':
            # Ensure expected columns for baserunning
            required_cols = ['Name', 'SB', 'CS', 'SB%', 'GDP']
            available_cols = [col for col in required_cols if col in df.columns]
            if available_cols:
                df = df[available_cols]

        return df


class LiveSeriesDataFetcher(DataFetcher):
    """Specialized fetcher for live series updates"""

    def __init__(self, season: int, games_played: int):
        super().__init__(season, "Live")
        self.games_played = games_played
        self.start_date = f"{season}-03-01"  # Spring training start
        self.end_date = self._calculate_end_date(games_played)

    def _calculate_end_date(self, games_played: int) -> str:
        """Calculate end date based on games played"""
        # Rough estimate: 162 games over ~180 days
        days_elapsed = int(games_played * (180 / 162))
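        # e.g. games_played=74 -> int(74 * 180/162) = 82 days after March 1,
        # i.e. an end date of roughly May 22 of that season. This is a pacing
        # heuristic, not a lookup of the team's actual schedule.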
        start = datetime.datetime(self.season, 3, 1)
        end_date = start + datetime.timedelta(days=days_elapsed)
        return end_date.strftime("%Y-%m-%d")

    async def fetch_live_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch live series data with date filtering"""
        logger.info(f"Fetching live data through {self.end_date} ({self.games_played} games)")

        # Combine Baseball Reference and FanGraphs data
        bref_data = await self.fetch_baseball_reference_data()
        fg_data = await self.fetch_fangraphs_data(self.start_date, self.end_date)

        return {**bref_data, **fg_data}


# Utility functions for CLI usage
async def fetch_season_data(season: int):
    """Fetch complete season data"""
    fetcher = DataFetcher(season, "Season")

    # Fetch all available data
    bref_data = await fetcher.fetch_baseball_reference_data()
    fg_data = await fetcher.fetch_fangraphs_data()

    # Combine and save
    all_data = {**bref_data, **fg_data}
    fetcher.save_data_to_csv(all_data)

    # Report what still needs manual download
    manual_files = [
        "vlhp-basic.csv", "vlhp-rate.csv", "vrhp-basic.csv", "vrhp-rate.csv",
        "vlhh-basic.csv", "vlhh-rate.csv", "vrhh-basic.csv", "vrhh-rate.csv"
    ]

    print(f"\n{'='*50}")
    print("AUTOMATED DOWNLOAD COMPLETE")
    print(f"{'='*50}")
    print(f"✅ Saved to: {fetcher.output_dir}")
    print(f"✅ Baseball Reference data: AUTOMATED")
    print(f"⚠️ Still need manual download from FanGraphs:")
    for file in manual_files:
        print(f" - {file}")
    print(f"{'='*50}")


async def fetch_live_series_data(season: int, games_played: int):
    """Fetch live series data"""
    fetcher = LiveSeriesDataFetcher(season, games_played)
    live_data = await fetcher.fetch_live_data()
    fetcher.save_data_to_csv(live_data)


# CLI integration
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage: python automated_data_fetcher.py <season> [games_played]")
        print("Examples:")
        print("  python automated_data_fetcher.py 2024      # Full season")
        print("  python automated_data_fetcher.py 2025 74   # Live series (74 games)")
        sys.exit(1)

    season = int(sys.argv[1])

    if len(sys.argv) == 3:
        # Live series mode
        games_played = int(sys.argv[2])
        asyncio.run(fetch_live_series_data(season, games_played))
    else:
        # Full season mode
        asyncio.run(fetch_season_data(season))
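
# Programmatic use (a sketch, not part of the CLI): the coroutines above can
# also be awaited from another script. The module name "automated_data_fetcher"
# is taken from the usage text in the CLI block above.
#
#     import asyncio
#     from automated_data_fetcher import fetch_season_data, fetch_live_series_data
#
#     asyncio.run(fetch_season_data(2024))           # full season
#     asyncio.run(fetch_live_series_data(2025, 74))  # live series after 74 games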