paper-dynasty-card-creation/scripts/upload_scouting.py
2025-11-08 16:57:35 -06:00

174 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Upload scouting reports to remote server.
This script:
1. Runs scouting_batters.py to generate batting scouting reports
2. Runs scouting_pitchers.py to generate pitching scouting reports
3. Uploads generated CSV files to sba-db server via SSH
"""
import asyncio
import datetime
import logging
import subprocess
import sys
from pathlib import Path
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ScoutingUploader:
"""Handles scouting report generation and upload."""
SCOUTING_DIR = Path('../scouting')
REMOTE_HOST = 'sba-db'
REMOTE_DIR = '/home/cal/container-data/pd-database/storage'
FILES_TO_UPLOAD = [
'batting-basic.csv',
'batting-ratings.csv',
'pitching-basic.csv',
'pitching-ratings.csv'
]
def __init__(self):
self.start_time = datetime.datetime.now()
def log_time(self, message: str):
"""Log elapsed time for a message."""
elapsed = (datetime.datetime.now() - self.start_time).total_seconds()
logger.info(f"{message} (elapsed: {elapsed:.2f}s)")
async def run_scouting_batters(self):
"""Run scouting_batters.py to generate batting reports."""
logger.info("Starting batting scouting report generation...")
try:
result = subprocess.run(
[sys.executable, '../scouting_batters.py'],
check=True,
capture_output=True,
text=True
)
self.log_time("Batting scouting reports generated")
logger.debug(f"Output: {result.stdout}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to generate batting scouting reports: {e}")
logger.error(f"Stderr: {e.stderr}")
return False
async def run_scouting_pitchers(self):
"""Run scouting_pitchers.py to generate pitching reports."""
logger.info("Starting pitching scouting report generation...")
try:
result = subprocess.run(
[sys.executable, '../scouting_pitchers.py'],
check=True,
capture_output=True,
text=True
)
self.log_time("Pitching scouting reports generated")
logger.debug(f"Output: {result.stdout}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to generate pitching scouting reports: {e}")
logger.error(f"Stderr: {e.stderr}")
return False
def verify_files_exist(self) -> bool:
"""Verify all required files exist before upload."""
logger.info("Verifying generated files...")
missing_files = []
for filename in self.FILES_TO_UPLOAD:
filepath = self.SCOUTING_DIR / filename
if not filepath.exists():
missing_files.append(str(filepath))
if missing_files:
logger.error(f"Missing files: {', '.join(missing_files)}")
return False
logger.info(f"All {len(self.FILES_TO_UPLOAD)} files verified")
return True
def upload_files(self) -> bool:
"""Upload scouting files to remote server via SCP."""
logger.info(f"Uploading files to {self.REMOTE_HOST}:{self.REMOTE_DIR}...")
try:
# Build list of local file paths
local_files = [str(self.SCOUTING_DIR / f) for f in self.FILES_TO_UPLOAD]
# Use SCP to upload all files at once
scp_command = [
'scp',
*local_files,
f"{self.REMOTE_HOST}:{self.REMOTE_DIR}/"
]
logger.debug(f"Running: {' '.join(scp_command)}")
result = subprocess.run(
scp_command,
check=True,
capture_output=True,
text=True
)
self.log_time(f"Successfully uploaded {len(self.FILES_TO_UPLOAD)} files")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to upload files: {e}")
logger.error(f"Stderr: {e.stderr}")
return False
async def run(self):
"""Execute the full scouting and upload workflow."""
logger.info("=" * 60)
logger.info("Starting scouting report generation and upload")
logger.info("=" * 60)
# Step 1: Generate batting scouting reports
if not await self.run_scouting_batters():
logger.error("Batting scouting generation failed. Aborting.")
return False
# Step 2: Generate pitching scouting reports
if not await self.run_scouting_pitchers():
logger.error("Pitching scouting generation failed. Aborting.")
return False
# Step 3: Verify files exist
if not self.verify_files_exist():
logger.error("File verification failed. Aborting.")
return False
# Step 4: Upload files to remote server
if not self.upload_files():
logger.error("File upload failed.")
return False
logger.info("=" * 60)
logger.info("Scouting report generation and upload completed successfully!")
self.log_time("Total time")
logger.info("=" * 60)
return True
async def main():
"""Main entry point."""
uploader = ScoutingUploader()
success = await uploader.run()
sys.exit(0 if success else 1)
if __name__ == '__main__':
asyncio.run(main())