paper-dynasty-card-creation/db_calls.py
Cal Corum fe91de905a Add card generation pipeline agents and refresh scouting data
- Add retrosheet-card-update agent (8-step pipeline with validation gates)
- Add live-series-card-update agent (7-step pipeline with PotM support)
- Both agents: dev/prod S3 guard, environment verification, groundball_b validation
- Restore db_calls.py to production (alt_database = None)
- Refresh scouting reports (6303 batters, 7164 pitchers)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 14:16:50 -06:00

187 lines
5.9 KiB
Python

import os
from typing import Literal, Optional

import aiohttp
import pybaseball as pb

from exceptions import logger
# Authorization header attached to every authenticated API session below.
# NOTE(security): a bearer token is committed in this file. Set the
# PD_API_TOKEN environment variable to override it; the literal is kept only
# as a backward-compatible fallback and should be rotated and removed.
AUTH_TOKEN = {
    "Authorization": "Bearer "
    + (os.environ.get("PD_API_TOKEN") or "Tp3aO3jhYve5NJF1IqOmJTmk")
}

# Base URL of the API; get_req_url() appends "/v<api_ver>/<endpoint>" to it.
DB_URL = "https://pd.manticorum.com/api"

# When True, request/response log lines are emitted at INFO instead of DEBUG.
master_debug = True

# Optional alternate backend selector: None keeps the default DB_URL above,
# "dev" and "sba" switch DB_URL to the corresponding hosts below.
alt_database = None
if alt_database == "dev":
    DB_URL = "https://pddev.manticorum.com/api"
elif alt_database == "sba":
    DB_URL = "https://sba.manticorum.com/api"
def param_char(other_params):
    """Return the separator that introduces the next query parameter.

    Returns "&" when ``other_params`` is truthy (a parameter has already been
    written) and "?" otherwise (this is the first parameter).
    """
    return "&" if other_params else "?"
def get_req_url(
    endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None
):
    """Build the request URL for an API endpoint.

    The result has the form ``{DB_URL}/v{api_ver}/{endpoint}``, followed by
    ``/{object_id}`` when ``object_id`` is not None, followed by a query
    string built from ``params``.

    Args:
        endpoint: Path segment placed after the version prefix.
        api_ver: API version number used in the ``/v<n>/`` prefix.
        object_id: Optional identifier appended as a trailing path segment.
        params: Optional iterable of indexable pairs; element 0 is used as
            the key and element 1 as the value. A falsy value adds nothing.

    Returns:
        The assembled URL string.

    Note:
        Keys and values are interpolated verbatim; no percent-encoding is
        applied here.
    """
    pieces = [f"{DB_URL}/v{api_ver}/{endpoint}"]
    if object_id is not None:
        pieces.append(f"/{object_id}")

    # The first pair is introduced with "?", every later pair with "&".
    for position, pair in enumerate(params or ()):
        separator = "&" if position else "?"
        pieces.append(f"{separator}{pair[0]}={pair[1]}")

    return "".join(pieces)
def log_return_value(log_string: str):
    """Log a returned value, truncated to its first 1200 characters.

    When ``log_string`` is longer than 1200 characters a snip marker is
    appended after the truncated text. The message is logged at INFO when
    ``master_debug`` is truthy and at DEBUG otherwise.
    """
    snip_marker = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
    message = f"return: {log_string[:1200]}{snip_marker}\n"

    emit = logger.info if master_debug else logger.debug
    emit(message)
async def db_get(
    endpoint: str,
    api_ver: int = 2,
    object_id: int = None,
    params: list = None,
    none_okay: bool = True,
    timeout: int = 3,
):
    """Send an authenticated GET request and return the decoded JSON body.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        api_ver: API version passed to ``get_req_url``.
        object_id: Optional object identifier passed to ``get_req_url``.
        params: Optional query parameter pairs passed to ``get_req_url``.
        none_okay: When True, a non-200 response yields ``None`` instead of
            raising.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The parsed JSON on a 200 response, or ``None`` on a non-200 response
        when ``none_okay`` is True.

    Raises:
        ValueError: On a non-200 response when ``none_okay`` is False; the
            message carries the response text.
    """
    url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)

    request_summary = f"get:\n{endpoint} id: {object_id} params: {params}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session, session.get(
        url
    ) as resp:
        logger.info(f"session info: {resp}")

        if resp.status != 200:
            # The error body is logged in both failure modes; only the
            # outcome (None vs. exception) depends on none_okay.
            error_body = await resp.text()
            logger.error(error_body)
            if none_okay:
                return None
            raise ValueError(f"DB: {error_body}")

        data = await resp.json()
        log_return_value(f"{data}")
        return data
async def url_get(url: str, timeout: int = 3):
    """Send an unauthenticated GET request to ``url`` and return the response.

    Args:
        url: Full URL to request.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The response object when the status is 200.

    Raises:
        ValueError: On any non-200 status; the message carries the response
            text.

    NOTE(review): the response is returned after the session and request
    context managers have exited, so its body may no longer be readable by
    the caller - confirm callers only rely on status/headers.
    """
    request_summary = f"get:\n{url}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession() as session, session.get(url) as resp:
        if resp.status != 200:
            error_body = await resp.text()
            logger.error(error_body)
            raise ValueError(f"DB: {error_body}")

        log_return_value("200 received")
        return resp
async def db_patch(
    endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3
):
    """Send an authenticated PATCH request and return the decoded JSON body.

    The changes are carried in the URL query string built from ``params``;
    no request body is sent.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        object_id: Identifier of the object, appended to the URL path.
        params: Query parameter pairs passed to ``get_req_url``.
        api_ver: API version passed to ``get_req_url``.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The parsed JSON of a 200 response.

    Raises:
        ValueError: On any non-200 status; the message carries the response
            text.
    """
    url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)

    request_summary = f"patch:\n{endpoint} {params}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session, session.patch(
        url
    ) as resp:
        if resp.status != 200:
            error_body = await resp.text()
            logger.error(error_body)
            raise ValueError(f"DB: {error_body}")

        data = await resp.json()
        log_return_value(f"{data}")
        return data
async def db_post(
    endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3
):
    """Send an authenticated POST request with a JSON body.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        api_ver: API version passed to ``get_req_url``.
        payload: Object sent as the JSON request body.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The parsed JSON of a 200 response.

    Raises:
        ValueError: On any non-200 status; the message carries the response
            text.
    """
    url = get_req_url(endpoint, api_ver=api_ver)

    request_summary = f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session, session.post(
        url, json=payload
    ) as resp:
        if resp.status != 200:
            error_body = await resp.text()
            logger.error(error_body)
            raise ValueError(f"DB: {error_body}")

        data = await resp.json()
        log_return_value(f"{data}")
        return data
async def db_put(
    endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3
):
    """Send an authenticated PUT request with a JSON body.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        api_ver: API version passed to ``get_req_url``.
        payload: Object sent as the JSON request body.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The parsed JSON of a 200 response.

    Raises:
        ValueError: On any non-200 status; the message carries the response
            text.
    """
    url = get_req_url(endpoint, api_ver=api_ver)

    request_summary = f"put:\n{endpoint} payload: {payload}\ntype: {type(payload)}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session, session.put(
        url, json=payload
    ) as resp:
        if resp.status != 200:
            error_body = await resp.text()
            logger.error(error_body)
            raise ValueError(f"DB: {error_body}")

        data = await resp.json()
        log_return_value(f"{data}")
        return data
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
    """Send an authenticated DELETE request for a single object.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        object_id: Identifier of the object, appended to the URL path.
        api_ver: API version passed to ``get_req_url``.
        timeout: Accepted for interface compatibility; not used by this
            function.

    Returns:
        The parsed JSON of a 200 response.

    Raises:
        ValueError: On any non-200 status; the message carries the response
            text.
    """
    url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)

    request_summary = f"delete:\n{endpoint} {object_id}"
    if master_debug:
        logger.info(request_summary)
    else:
        logger.debug(request_summary)

    async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session, session.delete(
        url
    ) as resp:
        if resp.status != 200:
            error_body = await resp.text()
            logger.error(error_body)
            raise ValueError(f"DB: {error_body}")

        data = await resp.json()
        log_return_value(f"{data}")
        return data
def get_player_data(
    player_id: str,
    id_type: Literal["bbref", "fangraphs"],
    return_type: Literal["dict", "Series"] = "dict",
):
    """Look up one player through ``pb.playerid_reverse_lookup``.

    Args:
        player_id: The identifier to look up.
        id_type: Passed to the lookup as ``key_type``.
        return_type: "Series" returns the matched row as-is; any other value
            returns that row converted with ``to_dict()``.

    Returns:
        ``None`` when the lookup result has no rows; otherwise the row
        labelled 0, as a Series or a dict depending on ``return_type``.
    """
    matches = pb.playerid_reverse_lookup([player_id], key_type=id_type)
    if not len(matches.values):
        return None

    first_match = matches.loc[0]
    if return_type == "Series":
        return first_match
    return first_match.to_dict()
def player_desc(this_player) -> str:
    """Return a display string containing the player's description and name.

    Reads the "p_name" and "description" keys of ``this_player``. When the
    name already occurs inside the description, the description is returned
    unchanged; otherwise the name is appended after a single space.
    """
    name = this_player["p_name"]
    description = this_player["description"]
    if name in description:
        return description
    return f"{description} {name}"