paper-dynasty-card-creation/pd_cards/commands/live_series.py
Cal Corum 5b75a3d38f Implement CLI wrappers for live-series, retrosheet, scouting, upload
Migrated all major card creation workflows to pd-cards CLI:

live-series:
- update: Full FanGraphs/BBRef card generation with CLI options
- status: Show cardset status from database

retrosheet:
- process: Historical Retrosheet data processing
- arms: Generate outfield arm ratings from play-by-play
- validate: Check for position anomalies in cardsets
- defense: Fetch defensive stats from Baseball Reference

scouting:
- batters: Generate batting scouting reports
- pitchers: Generate pitching scouting reports
- all: Generate all reports at once

upload:
- s3: Upload card images to AWS S3
- check: Validate cards without uploading
- refresh: Re-generate and re-upload card images

Updated CLAUDE.md with comprehensive CLI documentation.
Legacy scripts remain available but CLI is now the primary interface.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 16:39:38 -06:00

282 lines
11 KiB
Python

"""
Live series card update commands.
Commands for generating cards from current season FanGraphs/Baseball Reference data.
"""
import asyncio
import datetime
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
# Typer application that the live-series commands below register on;
# invoking it with no arguments prints help (no_args_is_help=True).
app = typer.Typer(no_args_is_help=True)
# Shared rich console used by the commands in this module for all output.
console = Console()
def _season_from_cardset(cardset_name: str, season: Optional[int]) -> int:
    """Return the season year to use for a live-series run.

    An explicit ``season`` always wins. Otherwise the first standalone
    four-digit year beginning with 19 or 20 found in ``cardset_name`` is used,
    and when the name contains no such year the current calendar year is
    returned.
    """
    if season is not None:
        return season

    import re  # imported lazily, as the original inline lookup did

    year_match = re.search(r'\b(19|20)\d{2}\b', cardset_name)
    if year_match:
        return int(year_match.group())
    return datetime.datetime.now().year


def _position_updates(positions, player_row, pitching_stats, is_missing):
    """Build the ``('pos_N', value)`` pairs to patch onto one player record.

    Args:
        positions: Iterable of position codes recorded for the player.
        player_row: The player's row; ``player_row['bbref_id']`` is read only
            when a ``'P'`` position is encountered.
        pitching_stats: Table indexed so that ``.loc[bbref_id]`` yields a row
            with ``starter_rating``, ``relief_rating`` and ``closer_rating``.
        is_missing: Predicate used to detect an absent closer rating
            (the caller passes ``pandas.isna``).

    Returns:
        A list of ``('pos_1', ...), ('pos_2', ...)`` tuples numbered
        consecutively from 1. A player with no positions gets ``'DH'``; unused
        slots up to ``pos_9`` are filled with the string ``'False'``.
    """
    slots = []
    for this_pos in positions:
        if this_pos != 'P':
            slots.append(this_pos)
            continue

        try:
            this_pitcher = pitching_stats.loc[player_row['bbref_id']]
        except KeyError:
            # No pitching row for this player: fall back to a lone RP slot.
            # NOTE(review): `break` also discards any positions that come after
            # 'P' for this player; `continue` may have been intended - confirm
            # before changing, since it alters what is written to the database.
            slots.append('RP')
            break

        if this_pitcher['starter_rating'] > 3:
            slots.append('SP')
            # A starter is also listed as RP when he has a relief rating above 1
            # or any closer rating at all.
            if this_pitcher['relief_rating'] > 1 or not is_missing(this_pitcher['closer_rating']):
                slots.append('RP')
        else:
            slots.append('RP')

        if not is_missing(this_pitcher['closer_rating']):
            slots.append('CP')

    if not slots:
        slots.append('DH')

    # Pad to nine slots; a non-positive multiplier leaves longer lists untouched.
    slots.extend(['False'] * (9 - len(slots)))
    return [(f'pos_{number}', value) for number, value in enumerate(slots, start=1)]


@app.command()
def update(
    cardset: str = typer.Option(..., "--cardset", "-c", help="Target cardset name (e.g., '2025 Season')"),
    season: Optional[int] = typer.Option(None, "--season", "-s", help="Season year (defaults to cardset year)"),
    games_played: int = typer.Option(162, "--games", "-g", help="Number of games played (1-162)"),
    description: Optional[str] = typer.Option(None, "--description", "-d", help="Player description (defaults to year)"),
    pull_fielding: bool = typer.Option(True, "--pull-fielding/--no-pull-fielding", help="Pull fielding stats from Baseball Reference"),
    post_batters: bool = typer.Option(True, "--post-batters/--skip-batters", help="Post batting cards and ratings"),
    post_pitchers: bool = typer.Option(True, "--post-pitchers/--skip-pitchers", help="Post pitching cards and ratings"),
    post_fielders: bool = typer.Option(True, "--post-fielders/--skip-fielders", help="Post card positions"),
    post_players: bool = typer.Option(True, "--post-players/--skip-players", help="Post player updates"),
    is_live: bool = typer.Option(True, "--live/--not-live", help="Look up current MLB clubs from statsapi"),
    ignore_limits: bool = typer.Option(False, "--ignore-limits", help="Ignore minimum PA/TBF requirements"),
    dry_run: bool = typer.Option(False, "--dry-run", "-n", help="Preview without saving to database"),
):
    """
    Update live series cards from FanGraphs/Baseball Reference data.

    Reads CSV files from data-input/{cardset} Cardset/ and generates batting/pitching cards.

    Example:
        pd-cards live-series update --cardset "2025 Season" --games 81
    """
    # Developer notes (kept out of the docstring, which Typer shows as --help):
    #   * Exits with code 1 when games_played is out of range, when the project
    #     modules cannot be imported, or when the cardset is not found.
    #   * With --dry-run the cardset is still looked up, but the batter,
    #     pitcher and position steps are only listed, never executed.
    #   * post_fielders is only reported in the dry-run preview; it is not
    #     passed to any of the processing calls below.
    console.print()
    console.print("=" * 70)
    console.print(f"[bold]LIVE SERIES UPDATE - {cardset}[/bold]")
    console.print("=" * 70)
    if dry_run:
        console.print("[yellow]DRY RUN - no changes will be made[/yellow]")
    console.print()

    # Validate games_played
    if not 1 <= games_played <= 162:
        console.print(f"[red]Error: games_played must be between 1 and 162, got {games_played}[/red]")
        raise typer.Exit(1)
    season_pct = games_played / 162

    # Import the project modules lazily: they live three directories above this
    # file, so that directory is put at the front of sys.path first.
    try:
        import sys
        sys.path.insert(0, str(Path(__file__).parents[2]))
        import batters.creation
        import pitchers.creation
        import pandas as pd
        from creation_helpers import pd_players_df, pd_positions_df
        from db_calls import db_get, db_patch, DB_URL
        # logger is not used in this command; the import is retained because
        # importing the module may have side effects - TODO confirm and drop.
        from exceptions import logger  # noqa: F401
    except ImportError as e:
        console.print(f"[red]Error importing modules: {e}[/red]")
        console.print("Make sure you're running from the card-creation directory")
        raise typer.Exit(1)

    CARD_BASE_URL = f'{DB_URL}/v2/players'

    async def run_update():
        """Look up the cardset, then run the batter, pitcher and position steps in order."""
        # Look up cardset
        console.print(f"Searching for cardset: {cardset}")
        c_query = await db_get('cardsets', params=[('name', cardset)])
        if c_query is None or c_query['count'] == 0:
            console.print(f"[red]Cardset '{cardset}' not found[/red]")
            raise typer.Exit(1)
        cardset_data = c_query['cardsets'][0]
        input_path = f'data-input/{cardset_data["name"]} Cardset/'

        # Season falls back to a year in the cardset name, description to the season.
        actual_season = _season_from_cardset(cardset, season)
        actual_description = description or str(actual_season)

        console.print(f"Cardset ID: {cardset_data['id']} / Season: {actual_season}")
        console.print(f"Game count: {games_played} / Season %: {season_pct:.2%}")
        console.print(f"Description: {actual_description}")
        console.print()

        if dry_run:
            console.print("[green]Validation passed - ready to run[/green]")
            console.print()
            console.print("Would execute:")
            console.print(f" - Input path: {input_path}")
            if post_batters:
                console.print(" - Process batting cards")
            if post_pitchers:
                console.print(" - Process pitching cards")
            if post_fielders:
                console.print(" - Process card positions")
            if post_players:
                console.print(" - Update player records")
            return

        start_time = datetime.datetime.now()
        release_directory = f'{start_time.year}-{start_time.month}-{start_time.day}'

        # Run batters
        console.print("[bold]Processing batters...[/bold]")
        batter_data = await batters.creation.run_batters(
            cardset_data, input_path, post_players, CARD_BASE_URL, release_directory,
            actual_description, season_pct, post_batters, pull_fielding, actual_season,
            is_live, ignore_limits
        )
        batter_time = datetime.datetime.now()
        batter_runtime = batter_time - start_time
        console.print("[green]✓ Batter updates complete[/green]")
        console.print(f" Total batting cards: {batter_data['tot_batters']}")
        console.print(f" New cardset batters: {batter_data['new_batters']}")
        console.print(f" Runtime: {round(batter_runtime.total_seconds())} seconds")
        console.print()

        # Run pitchers
        console.print("[bold]Processing pitchers...[/bold]")
        pitcher_data = await pitchers.creation.run_pitchers(
            cardset_data, input_path, CARD_BASE_URL, actual_season, release_directory,
            actual_description, season_pct, post_players, post_pitchers, is_live, ignore_limits
        )
        pitching_stats = pitcher_data['pitching_stats']
        pitcher_time = datetime.datetime.now()
        pitcher_runtime = pitcher_time - batter_time
        console.print("[green]✓ Pitcher updates complete[/green]")
        console.print(f" Total pitching cards: {pitcher_data['tot_pitchers']}")
        console.print(f" New cardset pitchers: {pitcher_data['new_pitchers']}")
        console.print(f" Runtime: {round(pitcher_runtime.total_seconds())} seconds")
        console.print()

        # Run player position updates (skipped when the cardset name contains "promos")
        if 'promos' not in cardset.lower():
            console.print("[bold]Processing player positions...[/bold]")
            all_pos = await pd_positions_df(cardset_data['id'])
            all_players = await pd_players_df(cardset_data['id'])
            player_updates = {}

            def set_all_positions(df_data):
                """Collect the pos_N updates for one player row into player_updates."""
                pos_series = all_pos.query(f'player_id == {df_data["player_id"]}')['position']
                pos_updates = _position_updates(pos_series, df_data, pitching_stats, pd.isna)
                player_updates.setdefault(df_data.player_id, []).extend(pos_updates)

            all_players.apply(set_all_positions, axis=1)

            if post_players:
                console.print(f"Sending {len(player_updates)} player updates to database...")
                for player_id, pos_params in player_updates.items():
                    await db_patch('players', object_id=player_id, params=pos_params)
            else:
                # Previously this path still claimed to be "Sending" updates.
                console.print(
                    f"[yellow]Skipping {len(player_updates)} player updates (--skip-players)[/yellow]"
                )

            position_time = datetime.datetime.now()
            position_runtime = position_time - pitcher_time
            console.print("[green]✓ Player position updates complete[/green]")
            console.print(f" Runtime: {round(position_runtime.total_seconds())} seconds")
            console.print()

        total_runtime = datetime.datetime.now() - start_time
        console.print("=" * 70)
        console.print("[bold green]✓ LIVE SERIES UPDATE COMPLETE[/bold green]")
        console.print(f"Total runtime: {round(total_runtime.total_seconds())} seconds")
        console.print("=" * 70)

    asyncio.run(run_update())
@app.command()
def status(
    cardset: Optional[str] = typer.Option(None, "--cardset", "-c", help="Filter by cardset name"),
):
    """Show status of live series cardsets."""
    # Developer notes (kept out of the docstring, which Typer shows as --help):
    #   * Exits with code 1 when the project's db_calls module cannot be imported.
    #   * Issues one extra 'players' query per cardset to obtain its player count.
    console.print()

    # db_calls lives three directories above this file; make it importable.
    try:
        import sys
        sys.path.insert(0, str(Path(__file__).parents[2]))
        from db_calls import db_get
    except ImportError as e:
        console.print(f"[red]Error importing modules: {e}[/red]")
        raise typer.Exit(1)

    async def get_status():
        """Fetch the matching cardsets and print them as a table."""
        # Only send a name filter when one was given; otherwise pass no params.
        params = [('name', cardset)] if cardset else None
        result = await db_get('cardsets', params=params)
        if result is None or result['count'] == 0:
            if cardset:
                console.print(f"[yellow]No cardset found matching '{cardset}'[/yellow]")
            else:
                console.print("[yellow]No cardsets found[/yellow]")
            return

        from rich.table import Table

        table = Table(title="Cardsets")
        table.add_column("ID", justify="right")
        table.add_column("Name")
        table.add_column("Season", justify="right")
        table.add_column("Players", justify="right")

        for cs in result['cardsets']:
            # Get player count for each cardset; a falsy response counts as 0.
            players = await db_get('players', params=[('cardset_id', cs['id'])])
            player_count = players['count'] if players else 0
            table.add_row(
                str(cs['id']),
                cs['name'],
                str(cs.get('season', '-')),
                str(player_count),
            )

        console.print(table)

    asyncio.run(get_status())