paper-dynasty-database/app/dependencies.py
Cal Corum 19ac5ffd0a fix: use constant-time comparison for bearer token validation (#8)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 03:43:59 +00:00

239 lines
7.7 KiB
Python

import asyncio
import datetime
import hmac
import logging
import os

import requests
from fastapi.security import OAuth2PasswordBearer
# Bearer-token security scheme handed to FastAPI route dependencies.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# When True, the db_* helpers log request/response chatter at INFO instead of DEBUG.
master_debug = False

# Base URL of the API: the dev host when TESTING is exactly "True", prod otherwise.
DB_URL = (
    "https://pddev.manticorum.com/api/"
    if os.environ.get("TESTING") == "True"
    else "https://pd.manticorum.com/api/"
)

# The environment lookup is stringified, so an unset API_TOKEN becomes the text "None".
AUTH_TOKEN = str(os.environ.get("API_TOKEN"))
AUTH_HEADER = {"Authorization": "Bearer " + AUTH_TOKEN}

# priv_help is the upper-cased PRIVATE_IN_SCHEMA value, or False when the
# variable is unset/empty; any non-empty value therefore enables the flag.
priv_help = (os.environ.get("PRIVATE_IN_SCHEMA") or "").upper() or False
PRIVATE_IN_SCHEMA = bool(priv_help)
def valid_token(token):
    """Return True when *token* matches the configured API token.

    The comparison is constant-time with respect to the token contents.

    Args:
        token: The bearer token presented by the caller.

    Returns:
        True if the token equals ``AUTH_TOKEN``; False otherwise, including
        when the token is missing, empty, or not a string, or when no API
        token has been configured.
    """
    if not isinstance(token, str) or not token:
        return False
    # AUTH_TOKEN is the stringified environment lookup, so an unset API_TOKEN
    # shows up as the literal text "None". Fail closed rather than accept
    # "None" (or an empty string) as a valid credential.
    if not AUTH_TOKEN or AUTH_TOKEN == "None":
        return False
    # compare_digest only accepts ASCII-only str; comparing the UTF-8 bytes
    # keeps a non-ASCII token from raising TypeError.
    return hmac.compare_digest(token.encode("utf-8"), AUTH_TOKEN.encode("utf-8"))
def int_timestamp(datetime_obj: datetime.datetime) -> int:
    """Return *datetime_obj* as whole milliseconds since the Unix epoch.

    Args:
        datetime_obj: The moment to convert. Per ``datetime.timestamp()``,
            a naive value is interpreted as local time.

    Returns:
        The POSIX timestamp multiplied by 1000, truncated to an int.
    """
    return int(datetime_obj.timestamp() * 1000)
def mround(x, prec=2, base=0.05):
    """Round *x* to the nearest multiple of *base*, then to *prec* decimals.

    Args:
        x: Value to round; anything ``float()`` accepts.
        prec: Number of decimal places kept in the result.
        base: Step size the value is snapped to.

    Returns:
        The multiple of *base* nearest to *x*, rounded to *prec* places.
    """
    # How many whole steps of `base` best approximate x.
    step_count = round(float(x) / base)
    snapped = base * step_count
    # The outer rounding trims floating-point noise left by the multiplication.
    return round(snapped, prec)
def param_char(other_params):
    """Return the separator that introduces the next query-string parameter.

    Args:
        other_params: Truthy when the URL already carries a parameter.

    Returns:
        ``"&"`` when a parameter is already present, ``"?"`` for the first one.
    """
    return "&" if other_params else "?"
def get_req_url(
    endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None
):
    """Build the full request URL for an API endpoint.

    Args:
        endpoint: Path segment placed after the version prefix.
        api_ver: API version number, rendered as ``v{api_ver}``.
        object_id: Optional identifier appended as a trailing path segment.
        params: Optional sequence of pairs; element 0 is the parameter name
            and element 1 its value.

    Returns:
        The URL string ``{DB_URL}/v{api_ver}/{endpoint}[/{object_id}][?k=v&...]``.
    """
    # DB_URL is configured with a trailing slash; strip it so the join below
    # does not produce a double slash in the path.
    base_url = DB_URL.rstrip("/")
    req_url = f"{base_url}/v{api_ver}/{endpoint}"
    if object_id is not None:
        req_url += f"/{object_id}"
    if params:
        # NOTE(review): names and values are interpolated as-is, without
        # percent-encoding, so values containing "&", "#" or "+" will not
        # survive the round trip. Confirm callers never pass such values.
        req_url += "?" + "&".join(f"{x[0]}={x[1]}" for x in params)
    return req_url
async def db_get(
    endpoint: str,
    api_ver: int = 2,
    object_id: int = None,
    params: list = None,
    none_okay: bool = True,
    timeout: int = 3,
):
    """GET a resource from the API, retrying on timeouts.

    Up to three attempts are made; the timeout grows by 2 and then 5 seconds
    between attempts.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        api_ver: API version number.
        object_id: Optional object identifier appended to the path.
        params: Optional sequence of (name, value) pairs for the query string.
        none_okay: When True, a non-200 response yields None instead of an error.
        timeout: Timeout in seconds for the first attempt.

    Returns:
        The decoded JSON body on HTTP 200, or None on any other status when
        *none_okay* is True.

    Raises:
        ConnectionError: Every attempt timed out.
        ValueError: The response was not 200 and *none_okay* is False.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    # master_debug promotes the request/response chatter from DEBUG to INFO.
    log_level = logging.INFO if master_debug else logging.DEBUG
    logging.log(log_level, f"get:\n{endpoint} id: {object_id} params: {params}")

    loop = asyncio.get_running_loop()
    retries = 0
    while True:
        try:
            # requests is blocking; run it on the default executor so the
            # event loop keeps serving other tasks while we wait.
            resp = await loop.run_in_executor(
                None, lambda t=timeout: requests.get(req_url, timeout=t)
            )
            break
        except requests.Timeout as e:
            # Timeout covers both connect and read timeouts, matching the
            # patch/post helpers.
            logging.error(
                f"Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
            )
            if retries > 1:
                raise ConnectionError(
                    "DB: The internet was a bit too slow for me to grab the data I needed. Please "
                    "hang on a few extra seconds and try again."
                ) from e
            timeout += [2, 5][retries]
            retries += 1

    if resp.status_code == 200:
        data = resp.json()
        log_string = f"{data}"
        snipped = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
        logging.log(log_level, f"return: {log_string[:1200]}{snipped}")
        return data

    if none_okay:
        # The body is only logged here, and error responses are not
        # guaranteed to be JSON; fall back to the raw text so a decode
        # failure cannot turn "return None" into an exception.
        try:
            log_string = f"{resp.json()}"
        except ValueError:
            log_string = resp.text
        snipped = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
        logging.log(log_level, f"return: {log_string[:1200]}{snipped}")
        return None

    logging.warning(resp.text)
    raise ValueError(f"DB: {resp.text}")
async def db_patch(
    endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3
):
    """PATCH an object through the API, retrying on timeouts.

    The changes travel in the query string and the request carries
    ``AUTH_HEADER``. Up to three attempts are made, with the timeout
    increased between attempts.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        object_id: Identifier of the object to modify.
        params: Sequence of (name, value) pairs for the query string.
        api_ver: API version number.
        timeout: Timeout in seconds for the first attempt.

    Returns:
        The decoded JSON body of the HTTP 200 response.

    Raises:
        ConnectionError: Every attempt timed out.
        ValueError: The response status was not 200.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    # master_debug promotes the request/response chatter from DEBUG to INFO.
    log_level = logging.INFO if master_debug else logging.DEBUG
    logging.log(log_level, f"patch:\n{endpoint} {params}")

    loop = asyncio.get_running_loop()
    retries = 0
    while True:
        try:
            # requests is blocking; run it on the default executor so the
            # event loop keeps serving other tasks while we wait.
            resp = await loop.run_in_executor(
                None,
                lambda t=timeout: requests.patch(
                    req_url, headers=AUTH_HEADER, timeout=t
                ),
            )
            break
        except requests.Timeout as e:
            logging.error(
                f"Patch Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
            )
            if retries > 1:
                raise ConnectionError(
                    "DB: The internet was a bit too slow for me to grab the data I needed. Please "
                    "hang on a few extra seconds and try again."
                ) from e
            # Grow the timeout by at most 3s, then at most 5s, never more
            # than doubling the current value.
            timeout += [min(3, timeout), min(5, timeout)][retries]
            retries += 1

    if resp.status_code != 200:
        logging.warning(resp.text)
        raise ValueError(f"DB: {resp.text}")

    data = resp.json()
    log_string = f"{data}"
    snipped = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
    logging.log(log_level, f"return: {log_string[:1200]}{snipped}")
    return data
async def db_post(
    endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3
):
    """POST a JSON payload to the API, retrying on timeouts.

    The request carries ``AUTH_HEADER``. Up to three attempts are made, with
    the timeout increased between attempts.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        api_ver: API version number.
        payload: Object sent as the JSON request body.
        timeout: Timeout in seconds for the first attempt.

    Returns:
        The decoded JSON body of the HTTP 200 response.

    Raises:
        ConnectionError: Every attempt timed out.
        ValueError: The response status was not 200.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    # master_debug promotes the request/response chatter from DEBUG to INFO.
    log_level = logging.INFO if master_debug else logging.DEBUG
    logging.log(
        log_level, f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}"
    )

    loop = asyncio.get_running_loop()
    retries = 0
    while True:
        try:
            # requests is blocking; run it on the default executor so the
            # event loop keeps serving other tasks while we wait.
            resp = await loop.run_in_executor(
                None,
                lambda t=timeout: requests.post(
                    req_url, json=payload, headers=AUTH_HEADER, timeout=t
                ),
            )
            break
        except requests.Timeout as e:
            logging.error(
                f"Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
            )
            if retries > 1:
                raise ConnectionError(
                    "DB: The internet was a bit too slow for me to grab the data I needed. Please "
                    "hang on a few extra seconds and try again."
                ) from e
            # Grow the timeout by at most 3s, then at most 5s, never more
            # than doubling the current value.
            timeout += [min(3, timeout), min(5, timeout)][retries]
            retries += 1

    if resp.status_code != 200:
        logging.warning(resp.text)
        raise ValueError(f"DB: {resp.text}")

    data = resp.json()
    log_string = f"{data}"
    snipped = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
    logging.log(log_level, f"return: {log_string[:1200]}{snipped}")
    return data
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout: int = 3):
    """DELETE an object through the API, retrying on timeouts.

    The request carries ``AUTH_HEADER``. Up to three attempts are made, with
    the timeout increased between attempts.

    Args:
        endpoint: Endpoint path passed to ``get_req_url``.
        object_id: Identifier of the object to delete.
        api_ver: API version number.
        timeout: Timeout in seconds for the first attempt.

    Returns:
        True when the API answers with HTTP 200.

    Raises:
        ConnectionError: Every attempt timed out.
        ValueError: The response status was not 200.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
    # master_debug promotes the request/response chatter from DEBUG to INFO.
    log_level = logging.INFO if master_debug else logging.DEBUG
    logging.log(log_level, f"delete:\n{endpoint} {object_id}")

    loop = asyncio.get_running_loop()
    retries = 0
    while True:
        try:
            # requests is blocking; run it on the default executor so the
            # event loop keeps serving other tasks while we wait.
            resp = await loop.run_in_executor(
                None,
                lambda t=timeout: requests.delete(
                    req_url, headers=AUTH_HEADER, timeout=t
                ),
            )
            break
        except requests.Timeout as e:
            # Timeout covers both connect and read timeouts, matching the
            # patch/post helpers.
            logging.error(
                f"Delete Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
            )
            if retries > 1:
                raise ConnectionError(
                    "DB: The internet was a bit too slow for me to grab the data I needed. Please "
                    "hang on a few extra seconds and try again."
                ) from e
            # Grow the timeout by at most 3s, then at most 5s, never more
            # than doubling the current value.
            timeout += [min(3, timeout), min(5, timeout)][retries]
            retries += 1

    if resp.status_code != 200:
        logging.warning(resp.text)
        raise ValueError(f"DB: {resp.text}")

    # The body is only logged; fall back to the raw text so a 200 response
    # without a JSON body still counts as a successful delete.
    try:
        log_string = f"{resp.json()}"
    except ValueError:
        log_string = resp.text
    snipped = " [ S N I P P E D ]" if len(log_string) > 1200 else ""
    logging.log(log_level, f"return: {log_string[:1200]}{snipped}")
    return True