paper-dynasty-card-creation/scripts/upload_scouting.py
Cal Corum 0a17745389 Run black and ruff across entire codebase
Standardize formatting with black and apply ruff auto-fixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 14:24:33 -05:00

170 lines
5.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Upload scouting reports to remote server.
This script:
1. Runs scouting_batters.py to generate batting scouting reports
2. Runs scouting_pitchers.py to generate pitching scouting reports
3. Uploads generated CSV files to akamai server via SSH
"""
import asyncio
import datetime
import logging
import subprocess
import sys
from pathlib import Path
# Configure logging once at import time: INFO and above, with each record
# carrying a timestamp, the logger name and the severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
class ScoutingUploader:
    """Generate the scouting CSV reports and copy them to the remote server.

    The workflow (see :meth:`run`) runs the batting and pitching generator
    scripts as child processes, checks that every expected CSV file exists,
    and then sends all of them to ``REMOTE_HOST:REMOTE_DIR`` with a single
    ``scp`` invocation.

    Every filesystem path used here is relative, so it is resolved against
    the current working directory of the process, not against this file.
    """

    # Directory the generated CSV files are read from (relative to the CWD).
    SCOUTING_DIR = Path("../scouting")
    # Host part of the scp destination.
    # NOTE(review): presumably an SSH config alias -- confirm.
    REMOTE_HOST = "akamai"
    # Directory on the remote host that receives the files.
    REMOTE_DIR = "/root/container-data/paper-dynasty/storage"
    # File names (inside SCOUTING_DIR) that must exist and get uploaded.
    FILES_TO_UPLOAD = [
        "batting-basic.csv",
        "batting-ratings.csv",
        "pitching-basic.csv",
        "pitching-ratings.csv",
    ]

    def __init__(self):
        """Record the creation time so later log lines can report elapsed time."""
        self.start_time = datetime.datetime.now()

    def log_time(self, message: str):
        """Log *message* at INFO level, suffixed with the seconds since creation."""
        elapsed = (datetime.datetime.now() - self.start_time).total_seconds()
        logger.info(f"{message} (elapsed: {elapsed:.2f}s)")

    def _run_generator(self, script_path: str, label: str) -> bool:
        """Run one report generator script with the current interpreter.

        Args:
            script_path: Path of the script to execute (relative to the CWD).
            label: Lower-case report kind (e.g. ``"batting"``) used in log text.

        Returns:
            True when the script exits with status 0; False when it exits
            non-zero or the child process cannot be started at all.
        """
        logger.info(f"Starting {label} scouting report generation...")
        try:
            completed = subprocess.run(
                [sys.executable, script_path],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to generate {label} scouting reports: {e}")
            logger.error(f"Stderr: {e.stderr}")
            return False
        except OSError as e:
            # The child could not even be launched, so there is no stderr
            # to report -- log the OS error and fail like any other error.
            logger.error(f"Failed to generate {label} scouting reports: {e}")
            return False

        self.log_time(f"{label.capitalize()} scouting reports generated")
        logger.debug(f"Output: {completed.stdout}")
        return True

    async def run_scouting_batters(self):
        """Run scouting_batters.py to generate batting reports.

        Returns:
            True on success, False on failure (the failure is logged).
        """
        return self._run_generator("../scouting_batters.py", "batting")

    async def run_scouting_pitchers(self):
        """Run scouting_pitchers.py to generate pitching reports.

        Returns:
            True on success, False on failure (the failure is logged).
        """
        return self._run_generator("../scouting_pitchers.py", "pitching")

    def verify_files_exist(self) -> bool:
        """Check that every file in FILES_TO_UPLOAD exists under SCOUTING_DIR.

        Returns:
            True when all files are present; otherwise False, after logging
            the paths that are missing.
        """
        logger.info("Verifying generated files...")
        candidates = (self.SCOUTING_DIR / name for name in self.FILES_TO_UPLOAD)
        missing_files = [str(path) for path in candidates if not path.exists()]
        if missing_files:
            logger.error(f"Missing files: {', '.join(missing_files)}")
            return False
        logger.info(f"All {len(self.FILES_TO_UPLOAD)} files verified")
        return True

    def upload_files(self) -> bool:
        """Upload the scouting files to the remote server via SCP.

        All files are sent in one ``scp`` call.

        Returns:
            True when scp exits with status 0; False when it exits non-zero
            or cannot be started (for example when scp is not installed).
        """
        logger.info(f"Uploading files to {self.REMOTE_HOST}:{self.REMOTE_DIR}...")
        local_files = [str(self.SCOUTING_DIR / f) for f in self.FILES_TO_UPLOAD]
        scp_command = [
            "scp",
            *local_files,
            f"{self.REMOTE_HOST}:{self.REMOTE_DIR}/",
        ]
        logger.debug(f"Running: {' '.join(scp_command)}")
        try:
            subprocess.run(scp_command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to upload files: {e}")
            logger.error(f"Stderr: {e.stderr}")
            return False
        except OSError as e:
            # Raised when the scp executable itself cannot be launched;
            # report it as a failed upload instead of crashing the script.
            logger.error(f"Failed to upload files: {e}")
            return False

        self.log_time(f"Successfully uploaded {len(self.FILES_TO_UPLOAD)} files")
        return True

    async def run(self):
        """Execute the full scouting and upload workflow.

        Steps run in order and the workflow stops at the first failure:
        generate batting reports, generate pitching reports, verify the
        expected files exist, upload them.

        Returns:
            True when every step succeeds, False otherwise.
        """
        banner = "=" * 60
        logger.info(banner)
        logger.info("Starting scouting report generation and upload")
        logger.info(banner)

        # Step 1: Generate batting scouting reports
        if not await self.run_scouting_batters():
            logger.error("Batting scouting generation failed. Aborting.")
            return False

        # Step 2: Generate pitching scouting reports
        if not await self.run_scouting_pitchers():
            logger.error("Pitching scouting generation failed. Aborting.")
            return False

        # Step 3: Verify files exist
        if not self.verify_files_exist():
            logger.error("File verification failed. Aborting.")
            return False

        # Step 4: Upload files to remote server
        if not self.upload_files():
            logger.error("File upload failed.")
            return False

        logger.info(banner)
        logger.info("Scouting report generation and upload completed successfully!")
        self.log_time("Total time")
        logger.info(banner)
        return True
async def main():
    """Run the whole workflow and exit with status 0 on success, 1 on failure."""
    succeeded = await ScoutingUploader().run()
    exit_code = 0 if succeeded else 1
    sys.exit(exit_code)
if __name__ == "__main__":
    # Only start the workflow when executed as a script, not on import;
    # main() ends the process itself through sys.exit().
    asyncio.run(main())