Closes #57 - Add RARITY_LADDER and rarity_is_downgrade() to rarity_thresholds.py - Add get_fully_evolved_players() to db_calls.py — queries a to-be-created database endpoint; returns empty set safely if endpoint is unavailable - In batters/creation.py post_player_updates(): pre-flight check identifies players where OPS rarity would downgrade, then guards the rarity write to skip any downgrade for fully-evolved (T4) cards - Same guard added to pitchers/creation.py post_player_updates() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
232 lines · 7.5 KiB · Python
import os
|
|
|
|
import aiohttp
|
|
import pybaseball as pb
|
|
from dotenv import load_dotenv
|
|
|
|
from typing import Literal, Optional
|
|
from exceptions import logger
|
|
|
|
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()

# The API token is mandatory: fail fast at import time rather than on the
# first request with a confusing 401.
_token = os.environ.get("PD_API_TOKEN")
if not _token:
    raise EnvironmentError("PD_API_TOKEN environment variable is required")

# Default auth header sent with every authenticated request.
AUTH_TOKEN = {"Authorization": f"Bearer {_token}"}
# Base URL of the production database API.
DB_URL = "https://pd.manticorum.com/api"

# When True, request/response logging is emitted at INFO level instead of DEBUG.
master_debug = True
# Set to "dev" or "sba" to point this module at an alternate database instance.
alt_database = None

if alt_database == "dev":
    DB_URL = "https://pddev.manticorum.com/api"
elif alt_database == "sba":
    DB_URL = "https://sba.manticorum.com/api"
|
|
|
|
|
|
def param_char(other_params):
    """Return the separator to place before the next query parameter.

    "?" opens the query string; once any parameter has already been
    appended (``other_params`` is truthy), later ones join with "&".
    """
    return "&" if other_params else "?"
|
|
|
|
|
|
def get_req_url(
    endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None
):
    """Build a full request URL against the configured database API.

    Args:
        endpoint: endpoint path, e.g. "players".
        api_ver: version segment inserted as ``/v{api_ver}/``.
        object_id: optional id appended as a trailing path segment.
        params: optional list of (name, value) pairs appended as the query
            string. Values are interpolated verbatim (not URL-encoded).
    """
    url = f"{DB_URL}/v{api_ver}/{endpoint}"
    if object_id is not None:
        url += f"/{object_id}"
    if params:
        query = "&".join(f"{pair[0]}={pair[1]}" for pair in params)
        url = f"{url}?{query}"
    return url
|
|
|
|
|
|
def log_return_value(log_string: str):
    """Log an API response payload, truncating anything past 1200 characters.

    Emits at INFO level when ``master_debug`` is on, DEBUG otherwise.

    Args:
        log_string: stringified response payload to record.
    """
    # Build the message once instead of duplicating the same f-string in
    # both logging branches (the two branches previously differed only in
    # which logger method they called).
    truncated = log_string[:1200]
    suffix = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
    log = logger.info if master_debug else logger.debug
    log(f"return: {truncated}{suffix}\n")
|
|
|
|
|
|
async def db_get(
    endpoint: str,
    api_ver: int = 2,
    object_id: int = None,
    params: list = None,
    none_okay: bool = True,
    timeout: int = 30,
) -> Optional[dict]:
    """GET an object (or collection) from the database API.

    Args:
        endpoint: API endpoint path, e.g. "players".
        api_ver: API version segment of the URL.
        object_id: optional object id appended to the path.
        params: optional list of (name, value) query-parameter pairs.
        none_okay: when True, a non-200 response is logged and None is
            returned; when False it raises instead.
        timeout: total request timeout in seconds.

    Returns:
        The decoded JSON payload, or None on a non-200 response when
        ``none_okay`` is True.

    Raises:
        ValueError: on a non-200 response when ``none_okay`` is False.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    # Route the request log to INFO or DEBUG depending on master_debug.
    log = logger.info if master_debug else logger.debug
    log(f"get:\n{endpoint} id: {object_id} params: {params}")

    async with aiohttp.ClientSession(
        headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.get(req_url) as r:
            logger.info(f"session info: {r}")
            if r.status == 200:
                js = await r.json()
                log_return_value(f"{js}")
                return js
            # Non-200: log the error body once; the previous elif/else
            # branches duplicated this read-and-log and differed only in
            # whether they raised.
            e = await r.text()
            logger.error(e)
            if none_okay:
                return None
            raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def url_get(url: str, timeout: int = 30) -> aiohttp.ClientResponse:
    """GET an arbitrary URL (no auth headers, unlike the db_* helpers).

    Returns the raw aiohttp response object on HTTP 200; raises ValueError
    containing the response body text otherwise.

    NOTE(review): the response object is handed back after both ``async
    with`` contexts exit, so the underlying connection is already released —
    reading the body from the returned object later will likely fail.
    Confirm callers only need status/headers, or refactor to read the body
    here. (Annotation previously said ``dict``; the code returns the
    response object, never parsed JSON.)
    """
    log_string = f"get:\n{url}"
    # Statement-position conditional expression: routes the line to INFO
    # (master_debug on) or DEBUG.
    logger.info(log_string) if master_debug else logger.debug(log_string)

    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.get(url) as r:
            if r.status == 200:
                log_string = "200 received"
                log_return_value(log_string)
                return r
            else:
                e = await r.text()
                logger.error(e)
                raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def db_patch(
    endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 30
) -> dict:
    """PATCH an object in the database API.

    NOTE(review): the fields to change travel in the query string
    (``params``), not a JSON body — confirm the API expects PATCH updates
    as query parameters.

    Args:
        endpoint: API endpoint path.
        object_id: id of the object to update.
        params: list of (name, value) pairs describing the update.
        api_ver: API version segment of the URL.
        timeout: total request timeout in seconds.

    Returns:
        The decoded JSON payload of the 200 response.

    Raises:
        ValueError: on any non-200 response (message includes the body).
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    # Route the request log to INFO or DEBUG depending on master_debug
    # (replaces the statement-position conditional expression).
    log = logger.info if master_debug else logger.debug
    log(f"patch:\n{endpoint} {params}")

    async with aiohttp.ClientSession(
        headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.patch(req_url) as r:
            if r.status == 200:
                js = await r.json()
                log_return_value(f"{js}")
                return js
            e = await r.text()
            logger.error(e)
            raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def db_post(
    endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 30
) -> dict:
    """POST ``payload`` as a JSON body to the database API.

    Args:
        endpoint: API endpoint path.
        api_ver: API version segment of the URL.
        payload: JSON-serializable request body.
        timeout: total request timeout in seconds.

    Returns:
        The decoded JSON payload of the 200 response.

    Raises:
        ValueError: on any non-200 response (message includes the body).
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    # Route the request log to INFO or DEBUG depending on master_debug
    # (replaces the statement-position conditional expression).
    log = logger.info if master_debug else logger.debug
    log(f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}")

    async with aiohttp.ClientSession(
        headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.post(req_url, json=payload) as r:
            if r.status == 200:
                js = await r.json()
                log_return_value(f"{js}")
                return js
            e = await r.text()
            logger.error(e)
            raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def db_put(
    endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 30
) -> dict:
    """PUT ``payload`` as a JSON body to the database API.

    Args:
        endpoint: API endpoint path.
        api_ver: API version segment of the URL.
        payload: JSON-serializable request body.
        timeout: total request timeout in seconds.

    Returns:
        The decoded JSON payload of the 200 response.

    Raises:
        ValueError: on any non-200 response (message includes the body).
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    # Route the request log to INFO or DEBUG depending on master_debug
    # (replaces the statement-position conditional expression).
    log = logger.info if master_debug else logger.debug
    log(f"put:\n{endpoint} payload: {payload}\ntype: {type(payload)}")

    async with aiohttp.ClientSession(
        headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.put(req_url, json=payload) as r:
            if r.status == 200:
                js = await r.json()
                log_return_value(f"{js}")
                return js
            e = await r.text()
            logger.error(e)
            raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def db_delete(
    endpoint: str, object_id: int, api_ver: int = 2, timeout: int = 30
) -> dict:
    """DELETE an object by id from the database API.

    Args:
        endpoint: API endpoint path.
        object_id: id of the object to delete.
        api_ver: API version segment of the URL.
        timeout: total request timeout in seconds. The previous default was
            3 while every other helper in this module uses 30 — presumably a
            dropped zero, so it is aligned to 30 here; pass timeout=3
            explicitly if the short deadline was intentional.

    Returns:
        The decoded JSON payload of the 200 response.

    Raises:
        ValueError: on any non-200 response (message includes the body).
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
    # Route the request log to INFO or DEBUG depending on master_debug
    # (replaces the statement-position conditional expression).
    log = logger.info if master_debug else logger.debug
    log(f"delete:\n{endpoint} {object_id}")

    async with aiohttp.ClientSession(
        headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout)
    ) as session:
        async with session.delete(req_url) as r:
            if r.status == 200:
                js = await r.json()
                log_return_value(f"{js}")
                return js
            e = await r.text()
            logger.error(e)
            raise ValueError(f"DB: {e}")
|
|
|
|
|
|
async def get_fully_evolved_players(player_ids: list) -> set:
    """Return the subset of player_ids holding a fully-evolved (T4) refractor card.

    Issues GET /api/v2/refractor/fully-evolved with the ids joined into a
    comma-separated ``player_ids`` query parameter. Every failure mode —
    endpoint missing (404), error response, or a payload lacking the
    expected field — yields an empty set, so the caller degrades safely
    (no T4 protection applied) instead of blocking the pipeline.

    NOTE: this endpoint does not yet exist in the database API. It must
    respond with {"player_ids": [<int>, ...]} listing player IDs with
    fully_evolved=True. Until it is added, this always returns an empty set.
    """
    if not player_ids:
        return set()

    joined_ids = ",".join(map(str, player_ids))
    response = await db_get(
        "refractor/fully-evolved",
        params=[("player_ids", joined_ids)],
        none_okay=True,
    )
    if response is not None and "player_ids" in response:
        return set(response["player_ids"])
    return set()
|
|
|
|
|
|
def get_player_data(
    player_id: str,
    id_type: Literal["bbref", "fangraphs"],
    return_type: Literal["dict", "Series"] = "dict",
):
    """Reverse-look up a player's id-mapping row via pybaseball.

    Args:
        player_id: the id to look up, in the system named by ``id_type``.
        id_type: which id system ``player_id`` belongs to.
        return_type: "Series" for the raw pandas row, anything else (the
            default "dict") for a plain dict.

    Returns:
        The first matching row, or None when the lookup finds nothing.
    """
    q = pb.playerid_reverse_lookup([player_id], key_type=id_type)
    # q.empty is the idiomatic pandas emptiness check (was len(q.values) == 0).
    if q.empty:
        return None
    row = q.loc[0]
    return row if return_type == "Series" else row.to_dict()
|
|
|
|
|
|
def player_desc(this_player) -> str:
    """Return the player's description, with the player's name appended
    unless the description already contains it."""
    description = this_player["description"]
    name = this_player["p_name"]
    if name in description:
        return description
    return f"{description} {name}"
|