import datetime import logging import os import requests from fastapi.security import OAuth2PasswordBearer date = f"{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}" LOG_DATA = { "filename": f"logs/database/{date}.log", "format": "%(asctime)s - database - %(levelname)s - %(message)s", "log_level": logging.INFO if os.environ.get("LOG_LEVEL") == "INFO" else "WARN", } logging.basicConfig( filename=LOG_DATA["filename"], format=LOG_DATA["format"], level=LOG_DATA["log_level"], ) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") master_debug = False DB_URL = "https://pd.manticorum.com/api/" AUTH_TOKEN = f'{os.environ.get("API_TOKEN")}' AUTH_HEADER = {"Authorization": f"Bearer {AUTH_TOKEN}"} priv_help = ( False if not os.environ.get("PRIVATE_IN_SCHEMA") else os.environ.get("PRIVATE_IN_SCHEMA").upper() ) PRIVATE_IN_SCHEMA = True if priv_help else False if os.environ.get("TESTING") == "True": DB_URL = "https://pddev.manticorum.com/api/" def valid_token(token): return token == AUTH_TOKEN def int_timestamp(datetime_obj: datetime) -> int: return int(datetime.datetime.timestamp(datetime_obj) * 1000) def mround(x, prec=2, base=0.05): return round(base * round(float(x) / base), prec) def param_char(other_params): if other_params: return "&" else: return "?" def get_req_url( endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None ): req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}' if params: other_params = False for x in params: req_url += f"{param_char(other_params)}{x[0]}={x[1]}" other_params = True return req_url async def db_get( endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True, timeout: int = 3, ): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f"get:\n{endpoint} id: {object_id} params: {params}" logging.info(log_string) if master_debug else logging.debug(log_string) retries = 0 while True: try: resp = requests.get(req_url, timeout=timeout) break except requests.ReadTimeout as e: logging.error( f"Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}" ) if retries > 1: raise ConnectionError( f"DB: The internet was a bit too slow for me to grab the data I needed. Please " f"hang on a few extra seconds and try again." ) timeout += [2, 5][retries] retries += 1 if resp.status_code == 200: data = resp.json() log_string = f"{data}" if master_debug: logging.info( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) else: logging.debug( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) return data elif none_okay: data = resp.json() log_string = f"{data}" if master_debug: logging.info( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) else: logging.debug( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) return None else: logging.warning(resp.text) raise ValueError(f"DB: {resp.text}") async def db_patch( endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3 ): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f"patch:\n{endpoint} {params}" logging.info(log_string) if master_debug else logging.debug(log_string) retries = 0 while True: try: resp = requests.patch(req_url, headers=AUTH_HEADER, timeout=timeout) break except requests.Timeout as e: logging.error( f"Patch Timeout: {req_url} / retries: {retries} / timeout: {timeout}" ) if retries > 1: raise ConnectionError( f"DB: The internet was a bit too slow for me to grab the data I needed. Please " f"hang on a few extra seconds and try again." ) timeout += [min(3, timeout), min(5, timeout)][retries] retries += 1 if resp.status_code == 200: data = resp.json() log_string = f"{data}" if master_debug: logging.info( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) else: logging.debug( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) return data else: logging.warning(resp.text) raise ValueError(f"DB: {resp.text}") async def db_post( endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3 ): req_url = get_req_url(endpoint, api_ver=api_ver) log_string = f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}" logging.info(log_string) if master_debug else logging.debug(log_string) retries = 0 while True: try: resp = requests.post( req_url, json=payload, headers=AUTH_HEADER, timeout=timeout ) break except requests.Timeout as e: logging.error( f"Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}" ) if retries > 1: raise ConnectionError( f"DB: The internet was a bit too slow for me to grab the data I needed. Please " f"hang on a few extra seconds and try again." ) timeout += [min(3, timeout), min(5, timeout)][retries] retries += 1 if resp.status_code == 200: data = resp.json() log_string = f"{data}" if master_debug: logging.info( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) else: logging.debug( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) return data else: logging.warning(resp.text) raise ValueError(f"DB: {resp.text}") async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id) log_string = f"delete:\n{endpoint} {object_id}" logging.info(log_string) if master_debug else logging.debug(log_string) retries = 0 while True: try: resp = requests.delete(req_url, headers=AUTH_HEADER, timeout=timeout) break except requests.ReadTimeout as e: logging.error( f"Delete Timeout: {req_url} / retries: {retries} / timeout: {timeout}" ) if retries > 1: raise ConnectionError( f"DB: The internet was a bit too slow for me to grab the data I needed. Please " f"hang on a few extra seconds and try again." ) timeout += [min(3, timeout), min(5, timeout)][retries] retries += 1 if resp.status_code == 200: data = resp.json() log_string = f"{data}" if master_debug: logging.info( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) else: logging.debug( f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}' ) return True else: logging.warning(resp.text) raise ValueError(f"DB: {resp.text}")