239 lines
7.7 KiB
Python
239 lines
7.7 KiB
Python
import datetime
|
|
import hmac
|
|
import logging
|
|
import os
|
|
|
|
import requests
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
master_debug = False
|
|
DB_URL = "https://pd.manticorum.com/api/"
|
|
AUTH_TOKEN = f'{os.environ.get("API_TOKEN")}'
|
|
AUTH_HEADER = {"Authorization": f"Bearer {AUTH_TOKEN}"}
|
|
|
|
priv_help = (
|
|
False
|
|
if not os.environ.get("PRIVATE_IN_SCHEMA")
|
|
else os.environ.get("PRIVATE_IN_SCHEMA").upper()
|
|
)
|
|
PRIVATE_IN_SCHEMA = True if priv_help else False
|
|
|
|
|
|
if os.environ.get("TESTING") == "True":
|
|
DB_URL = "https://pddev.manticorum.com/api/"
|
|
|
|
|
|
def valid_token(token):
|
|
return hmac.compare_digest(token, AUTH_TOKEN)
|
|
|
|
|
|
def int_timestamp(datetime_obj: datetime) -> int:
|
|
return int(datetime.datetime.timestamp(datetime_obj) * 1000)
|
|
|
|
|
|
def mround(x, prec=2, base=0.05):
|
|
return round(base * round(float(x) / base), prec)
|
|
|
|
|
|
def param_char(other_params):
|
|
if other_params:
|
|
return "&"
|
|
else:
|
|
return "?"
|
|
|
|
|
|
def get_req_url(
|
|
endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None
|
|
):
|
|
req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}'
|
|
|
|
if params:
|
|
other_params = False
|
|
for x in params:
|
|
req_url += f"{param_char(other_params)}{x[0]}={x[1]}"
|
|
other_params = True
|
|
|
|
return req_url
|
|
|
|
|
|
async def db_get(
|
|
endpoint: str,
|
|
api_ver: int = 2,
|
|
object_id: int = None,
|
|
params: list = None,
|
|
none_okay: bool = True,
|
|
timeout: int = 3,
|
|
):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
|
|
log_string = f"get:\n{endpoint} id: {object_id} params: {params}"
|
|
logging.info(log_string) if master_debug else logging.debug(log_string)
|
|
|
|
retries = 0
|
|
while True:
|
|
try:
|
|
resp = requests.get(req_url, timeout=timeout)
|
|
break
|
|
except requests.ReadTimeout as e:
|
|
logging.error(
|
|
f"Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
|
|
)
|
|
if retries > 1:
|
|
raise ConnectionError(
|
|
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
|
|
f"hang on a few extra seconds and try again."
|
|
)
|
|
timeout += [2, 5][retries]
|
|
retries += 1
|
|
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
log_string = f"{data}"
|
|
if master_debug:
|
|
logging.info(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
return data
|
|
elif none_okay:
|
|
data = resp.json()
|
|
log_string = f"{data}"
|
|
if master_debug:
|
|
logging.info(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
return None
|
|
else:
|
|
logging.warning(resp.text)
|
|
raise ValueError(f"DB: {resp.text}")
|
|
|
|
|
|
async def db_patch(
|
|
endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3
|
|
):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
|
|
log_string = f"patch:\n{endpoint} {params}"
|
|
logging.info(log_string) if master_debug else logging.debug(log_string)
|
|
|
|
retries = 0
|
|
while True:
|
|
try:
|
|
resp = requests.patch(req_url, headers=AUTH_HEADER, timeout=timeout)
|
|
break
|
|
except requests.Timeout as e:
|
|
logging.error(
|
|
f"Patch Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
|
|
)
|
|
if retries > 1:
|
|
raise ConnectionError(
|
|
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
|
|
f"hang on a few extra seconds and try again."
|
|
)
|
|
timeout += [min(3, timeout), min(5, timeout)][retries]
|
|
retries += 1
|
|
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
log_string = f"{data}"
|
|
if master_debug:
|
|
logging.info(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
return data
|
|
else:
|
|
logging.warning(resp.text)
|
|
raise ValueError(f"DB: {resp.text}")
|
|
|
|
|
|
async def db_post(
|
|
endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3
|
|
):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver)
|
|
log_string = f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}"
|
|
logging.info(log_string) if master_debug else logging.debug(log_string)
|
|
|
|
retries = 0
|
|
while True:
|
|
try:
|
|
resp = requests.post(
|
|
req_url, json=payload, headers=AUTH_HEADER, timeout=timeout
|
|
)
|
|
break
|
|
except requests.Timeout as e:
|
|
logging.error(
|
|
f"Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
|
|
)
|
|
if retries > 1:
|
|
raise ConnectionError(
|
|
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
|
|
f"hang on a few extra seconds and try again."
|
|
)
|
|
timeout += [min(3, timeout), min(5, timeout)][retries]
|
|
retries += 1
|
|
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
log_string = f"{data}"
|
|
if master_debug:
|
|
logging.info(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
return data
|
|
else:
|
|
logging.warning(resp.text)
|
|
raise ValueError(f"DB: {resp.text}")
|
|
|
|
|
|
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
|
|
log_string = f"delete:\n{endpoint} {object_id}"
|
|
logging.info(log_string) if master_debug else logging.debug(log_string)
|
|
|
|
retries = 0
|
|
while True:
|
|
try:
|
|
resp = requests.delete(req_url, headers=AUTH_HEADER, timeout=timeout)
|
|
break
|
|
except requests.ReadTimeout as e:
|
|
logging.error(
|
|
f"Delete Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
|
|
)
|
|
if retries > 1:
|
|
raise ConnectionError(
|
|
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
|
|
f"hang on a few extra seconds and try again."
|
|
)
|
|
timeout += [min(3, timeout), min(5, timeout)][retries]
|
|
retries += 1
|
|
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
log_string = f"{data}"
|
|
if master_debug:
|
|
logging.info(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
else:
|
|
logging.debug(
|
|
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
|
|
)
|
|
return True
|
|
else:
|
|
logging.warning(resp.text)
|
|
raise ValueError(f"DB: {resp.text}")
|