paper-dynasty-discord/api_calls.py
Cal Corum 740ea93b34
All checks were successful
Build Docker Image / build (pull_request) Successful in 1m26s
fix: batch cleanup — dead code, bare excepts, empty stubs (#25, #32, #37, #38)
Fixes #25, Fixes #32, Fixes #37, Fixes #38

- Remove unused PLAYER_CACHE = {} from api_calls.py (issue #37)
- Remove dead select_speed_testing() and select_all_testing() functions
  with their debug print() statements from gameplay_models.py (issue #32)
- Remove empty if-pass stubs after db_post calls in logic_gameplay.py (issue #38)
- Replace 10 bare except: clauses with except Exception: in gameplay_queries.py (issue #25)
- Add ruff.toml to configure pre-commit hook for existing codebase patterns
  (F403/F405 from intentional star imports, F541/F401/F841/E712 cosmetic)
- Fix E713 in logic_gameplay.py (not x in [...] -> x not in [...]) required
  by the pre-commit hook on the file already being touched

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 23:23:09 -05:00

313 lines
10 KiB
Python

import asyncio
import datetime
import logging
import os
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote

import aiohttp
from aiohttp import ClientTimeout

from exceptions import DatabaseError, APITimeoutError
# Header dict sent with every API request. The token is read from the
# API_TOKEN environment variable once, at import time; when the variable is
# unset the header value is the literal string "Bearer None".
AUTH_TOKEN = {"Authorization": f"Bearer {os.environ.get('API_TOKEN')}"}

# Lower-cased value of the DATABASE environment variable ("dev" when unset).
ENV_DATABASE = os.environ.get("DATABASE", "dev").lower()

# Base URL of the API: the production host when ENV_DATABASE contains "prod",
# the dev host for every other value.
if "prod" in ENV_DATABASE:
    DB_URL = "https://pd.manticorum.com/api"
else:
    DB_URL = "https://pddev.manticorum.com/api"

# Logger shared by every helper in this module.
logger = logging.getLogger("discord_app")
def param_char(other_params):
    """
    Pick the separator that introduces the next query parameter.

    Args:
        other_params: Truthy when at least one parameter is already on the URL
    Returns:
        "&" when other_params is truthy, otherwise "?"
    """
    return "&" if other_params else "?"
def get_req_url(
    endpoint: str,
    api_ver: int = 2,
    object_id: Optional[int] = None,
    params: Optional[list] = None,
):
    """
    Build the full request URL for an API call.

    The URL has the form {DB_URL}/v{api_ver}/{endpoint}[/{object_id}][?k=v&k=v].
    Query keys and values are converted with str() and percent-encoded, so
    characters such as "&", "=", "#" or spaces inside a value cannot split or
    truncate the query string.

    Args:
        endpoint: API endpoint path
        api_ver: API version (default 2)
        object_id: Optional object ID appended as a path segment (0 is kept)
        params: Optional list of (key, value) pairs for the query string;
            values should be passed raw, not already percent-encoded
    Returns:
        The assembled URL string
    """
    req_url = f"{DB_URL}/v{api_ver}/{endpoint}"
    # Compare against None explicitly so an ID of 0 still gets its segment.
    if object_id is not None:
        req_url += f"/{object_id}"
    if params:
        encoded_pairs = []
        for pair in params:
            # safe="" also escapes "/" so every reserved character is encoded.
            key = quote(str(pair[0]), safe="")
            value = quote(str(pair[1]), safe="")
            encoded_pairs.append(f"{key}={value}")
        req_url += "?" + "&".join(encoded_pairs)
    return req_url
def log_return_value(log_string: str):
    """
    Write a (possibly very long) string to the debug log in fixed-size chunks.

    The text is emitted 3000 characters at a time. The first chunk is prefixed
    with two newlines and the label "return: ". At most 99 chunks (297000
    characters) are logged; if text remains beyond that, a "[ S N I P P E D ]"
    warning is logged. An empty string produces no log output.

    Args:
        log_string: Text to log
    """
    chunk_size = 3000
    max_logged = 297000  # 99 chunks, the same window the original loop covered

    for start in range(0, max_logged, chunk_size):
        chunk = log_string[start:start + chunk_size]
        if not chunk:
            return
        # Built outside the f-string: a backslash inside a replacement field
        # is a SyntaxError on Python versions before 3.12.
        prefix = "\n\nreturn: " if start == 0 else ""
        logger.debug(f"{prefix}{chunk}")

    # Only report truncation when something was actually left out.
    if len(log_string) > max_logged:
        logger.warning("[ S N I P P E D ]")
async def db_get(
    endpoint: str,
    api_ver: int = 2,
    object_id: Optional[int] = None,
    params: Optional[list] = None,
    none_okay: bool = True,
    timeout: int = 5,
    retries: int = 3,
):
    """
    GET request to the API with timeout and retry logic.

    Args:
        endpoint: API endpoint path
        api_ver: API version (default 2)
        object_id: Optional object ID to append to URL
        params: Optional list of (key, value) tuples for query params
        none_okay: If True, return None on non-200 response; if False, raise DatabaseError
        timeout: Request timeout in seconds (default 5)
        retries: Total number of attempts made when requests time out
            (default 3). Values below 1 are treated as 1 so that a request
            is always sent.
    Returns:
        JSON response or None if none_okay and request failed
    Raises:
        APITimeoutError: If all attempts fail due to timeout
        DatabaseError: If response is non-200 and none_okay is False, or on
            an aiohttp client/connection error
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    log_string = f"db_get - get: {endpoint} id: {object_id} params: {params}"
    logger.debug(log_string)

    # With retries <= 0 the loop below would never run and the function would
    # silently return None without contacting the API, even when
    # none_okay=False. Always make at least one attempt.
    attempts = max(1, retries)
    client_timeout = ClientTimeout(total=timeout)

    for attempt in range(attempts):
        try:
            async with aiohttp.ClientSession(
                headers=AUTH_TOKEN, timeout=client_timeout
            ) as session:
                async with session.get(req_url) as r:
                    if r.status == 200:
                        js = await r.json()
                        log_return_value(f"{js}")
                        return js
                    # Non-200: the response body is logged either way; only
                    # the outcome (None vs. exception) depends on none_okay.
                    error_text = await r.text()
                    logger.error(error_text)
                    if none_okay:
                        return None
                    raise DatabaseError(error_text)
        except asyncio.TimeoutError:
            if attempt < attempts - 1:
                # Exponential backoff: 1s after the first timeout, then 2s,
                # 4s, ... doubling after each further timeout.
                wait_time = 2**attempt
                logger.warning(
                    f"Timeout on GET {endpoint}, retry {attempt + 1}/{attempts} in {wait_time}s"
                )
                await asyncio.sleep(wait_time)
            else:
                logger.error(
                    f"Connection timeout to host {req_url} after {attempts} attempts"
                )
                raise APITimeoutError(f"Connection timeout to host {req_url}")
        except aiohttp.ClientError as e:
            # Connection-level failures are not retried.
            logger.error(f"Connection error on GET {req_url}: {e}")
            raise DatabaseError(f"Connection error: {e}")
async def db_patch(
    endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 5
):
    """
    Send one PATCH request to the API (never retried - mutations are not safe to repeat).

    Args:
        endpoint: API endpoint path
        object_id: ID of the object to patch
        params: List of (key, value) tuples sent as query params
        api_ver: API version (default 2)
        timeout: Request timeout in seconds (default 5)
    Returns:
        Parsed JSON body of a 200 response
    Raises:
        APITimeoutError: If the request times out
        DatabaseError: If the response is non-200 or an aiohttp client error occurs
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    logger.debug(f"db_patch - patch: {endpoint} {params}")
    try:
        async with aiohttp.ClientSession(
            headers=AUTH_TOKEN, timeout=ClientTimeout(total=timeout)
        ) as session, session.patch(req_url) as response:
            # Guard clause: anything other than 200 is logged and raised.
            if response.status != 200:
                error_body = await response.text()
                logger.error(error_body)
                raise DatabaseError(error_body)
            result = await response.json()
            log_return_value(f"{result}")
            return result
    except asyncio.TimeoutError:
        logger.error(f"Connection timeout to host {req_url}")
        raise APITimeoutError(f"Connection timeout to host {req_url}")
    except aiohttp.ClientError as err:
        logger.error(f"Connection error on PATCH {req_url}: {err}")
        raise DatabaseError(f"Connection error: {err}")
async def db_post(
    endpoint: str, api_ver: int = 2, payload: Optional[dict] = None, timeout: int = 5
):
    """
    Send one POST request to the API (never retried - mutations are not safe to repeat).

    Args:
        endpoint: API endpoint path
        api_ver: API version (default 2)
        payload: Optional dict sent as the JSON request body
        timeout: Request timeout in seconds (default 5)
    Returns:
        Parsed JSON body of a 200 response
    Raises:
        APITimeoutError: If the request times out
        DatabaseError: If the response is non-200 or an aiohttp client error occurs
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    logger.debug(f"db_post - post: {endpoint} payload: {payload}\ntype: {type(payload)}")
    try:
        async with aiohttp.ClientSession(
            headers=AUTH_TOKEN, timeout=ClientTimeout(total=timeout)
        ) as session, session.post(req_url, json=payload) as response:
            # Guard clause: anything other than 200 is logged and raised.
            if response.status != 200:
                error_body = await response.text()
                logger.error(error_body)
                raise DatabaseError(error_body)
            result = await response.json()
            log_return_value(f"{result}")
            return result
    except asyncio.TimeoutError:
        logger.error(f"Connection timeout to host {req_url}")
        raise APITimeoutError(f"Connection timeout to host {req_url}")
    except aiohttp.ClientError as err:
        logger.error(f"Connection error on POST {req_url}: {err}")
        raise DatabaseError(f"Connection error: {err}")
async def db_put(
    endpoint: str, api_ver: int = 2, payload: Optional[dict] = None, timeout: int = 5
):
    """
    Send one PUT request to the API (never retried - mutations are not safe to repeat).

    Args:
        endpoint: API endpoint path
        api_ver: API version (default 2)
        payload: Optional dict sent as the JSON request body
        timeout: Request timeout in seconds (default 5)
    Returns:
        Parsed JSON body of a 200 response
    Raises:
        APITimeoutError: If the request times out
        DatabaseError: If the response is non-200 or an aiohttp client error occurs
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    logger.debug(f"db_put - put: {endpoint} payload: {payload}\ntype: {type(payload)}")
    try:
        async with aiohttp.ClientSession(
            headers=AUTH_TOKEN, timeout=ClientTimeout(total=timeout)
        ) as session, session.put(req_url, json=payload) as response:
            # Guard clause: anything other than 200 is logged and raised.
            if response.status != 200:
                error_body = await response.text()
                logger.error(error_body)
                raise DatabaseError(error_body)
            result = await response.json()
            log_return_value(f"{result}")
            return result
    except asyncio.TimeoutError:
        logger.error(f"Connection timeout to host {req_url}")
        raise APITimeoutError(f"Connection timeout to host {req_url}")
    except aiohttp.ClientError as err:
        logger.error(f"Connection error on PUT {req_url}: {err}")
        raise DatabaseError(f"Connection error: {err}")
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout: int = 5):
    """
    Send one DELETE request to the API (never retried - mutations are not safe to repeat).

    Args:
        endpoint: API endpoint path
        object_id: ID of the object to delete
        api_ver: API version (default 2)
        timeout: Request timeout in seconds (default 5)
    Returns:
        Parsed JSON body of a 200 response
    Raises:
        APITimeoutError: If the request times out
        DatabaseError: If the response is non-200 or an aiohttp client error occurs
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
    logger.debug(f"db_delete - delete: {endpoint} {object_id}")
    try:
        async with aiohttp.ClientSession(
            headers=AUTH_TOKEN, timeout=ClientTimeout(total=timeout)
        ) as session, session.delete(req_url) as response:
            # Guard clause: anything other than 200 is logged and raised.
            if response.status != 200:
                error_body = await response.text()
                logger.error(error_body)
                raise DatabaseError(error_body)
            result = await response.json()
            log_return_value(f"{result}")
            return result
    except asyncio.TimeoutError:
        logger.error(f"Connection timeout to host {req_url}")
        raise APITimeoutError(f"Connection timeout to host {req_url}")
    except aiohttp.ClientError as err:
        logger.error(f"Connection error on DELETE {req_url}: {err}")
        raise DatabaseError(f"Connection error: {err}")
async def get_team_by_abbrev(abbrev: str):
    """
    Fetch the first team returned by the "teams" endpoint for an abbreviation.

    Args:
        abbrev: Value sent as the "abbrev" query parameter
    Returns:
        The first entry of the response's "teams" list, or None when the
        response is empty/None or its "count" is falsy
    """
    response = await db_get("teams", params=[("abbrev", abbrev)])
    if response and response["count"]:
        return response["teams"][0]
    return None
async def post_to_dex(player, team):
    """
    POST a record to the "paperdex" endpoint pairing a team with a player.

    Args:
        player: Mapping with an "id" key
        team: Mapping with an "id" key
    Returns:
        Whatever db_post returns for the request
    """
    dex_entry = {"team_id": team["id"], "player_id": player["id"]}
    return await db_post("paperdex", payload=dex_entry)
def team_hash(team):
    """
    Derive a short string from a team's "sname" and "gmid" fields.

    The result is, in order: the last character of sname, gmid / 6950123
    rounded to zero decimals, the second-to-last character of sname, and
    gmid / 42069123 rounded to zero decimals.

    Args:
        team: Mapping with "sname" (at least two characters) and numeric "gmid"
    Returns:
        The concatenated hash string
    """
    short_name = team["sname"]
    last_char = short_name[-1]
    gm_id = team["gmid"]
    first_quotient = f"{gm_id / 6950123:.0f}"
    penultimate_char = short_name[-2]
    second_quotient = f"{gm_id / 42069123:.0f}"
    return f"{last_char}{first_quotient}{penultimate_char}{second_quotient}"