import datetime
import hmac
import logging
import os

import requests
from fastapi.security import OAuth2PasswordBearer

# Read the clock once so year/month/day cannot straddle midnight.
_now = datetime.datetime.now()
# Components are not zero-padded (e.g. "2024-3-7"); this keeps the log file
# naming identical to what earlier runs produced.
date = f'{_now.year}-{_now.month}-{_now.day}'

LOG_DATA = {
    'filename': f'logs/database/{date}.log',
    'format': '%(asctime)s - database - %(levelname)s - %(message)s',
    # Only LOG_LEVEL=INFO enables INFO; anything else falls back to the
    # 'WARN' level name, which logging.basicConfig accepts as a string.
    'log_level': logging.INFO if os.environ.get('LOG_LEVEL') == 'INFO' else 'WARN'
}

# basicConfig opens the log file immediately and fails if its directory is
# missing, so make sure the directory exists first.
_log_dir = os.path.dirname(LOG_DATA['filename'])
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)

logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level']
)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# When True, request/response traces are logged at INFO instead of DEBUG.
master_debug = False

DB_URL = 'https://pd.manticorum.com/api/'

# Raw environment value, kept so an unset token can be told apart from a
# token whose text happens to be "None".
_CONFIGURED_API_TOKEN = os.environ.get("API_TOKEN")
# Kept as a string (the text "None" when unset) so AUTH_TOKEN / AUTH_HEADER
# hold the same values as before.
AUTH_TOKEN = f'{_CONFIGURED_API_TOKEN}'
AUTH_HEADER = {'Authorization': f'Bearer {AUTH_TOKEN}'}

# NOTE(review): the dev URL is selected when TESTING == 'False', which reads
# as inverted - confirm this is intended before changing it.
if os.environ.get('TESTING') == 'False':
    DB_URL = 'https://pddev.manticorum.com/api/'


def valid_token(token):
    """Return True when *token* matches the configured API token.

    Returns False when the API_TOKEN environment variable is unset or empty,
    so the placeholder text "None" can never authenticate, and when *token*
    is not a string. The comparison is constant-time to avoid leaking how
    many leading characters matched.
    """
    if not _CONFIGURED_API_TOKEN or not isinstance(token, str):
        return False
    # Compare bytes: hmac.compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(token.encode('utf-8'), AUTH_TOKEN.encode('utf-8'))


def int_timestamp(datetime_obj: datetime.datetime) -> int:
    """Return *datetime_obj* as a POSIX timestamp in whole milliseconds."""
    return int(datetime_obj.timestamp() * 1000)


def mround(x, prec=2, base=.05):
    """Round *x* to the nearest multiple of *base*, then to *prec* decimals.

    The final round() trims floating-point noise left by the multiplication.
    """
    nearest_multiple = base * round(float(x) / base)
    return round(nearest_multiple, prec)


def param_char(other_params):
    """Return the query-string separator: '&' if parameters already exist, else '?'."""
    return '&' if other_params else '?'
# User-facing error raised once every retry of a request has timed out.
_TIMEOUT_MESSAGE = (
    'DB: The internet was a bit too slow for me to grab the data I needed. Please '
    'hang on a few extra seconds and try again.'
)


def get_req_url(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None):
    """Build the request URL for *endpoint*.

    Args:
        endpoint: Path segment placed after the API version.
        api_ver: API version number, rendered as ``v{api_ver}``.
        object_id: Optional id appended as a trailing path segment.
        params: Optional sequence of ``(key, value)`` pairs appended as the
            query string.

    Returns:
        The assembled URL string.
    """
    # DB_URL ends with "/"; strip it so the path does not contain "//".
    req_url = f'{DB_URL.rstrip("/")}/v{api_ver}/{endpoint}'
    if object_id is not None:
        req_url += f'/{object_id}'
    if params:
        # NOTE(review): keys/values are inserted verbatim (not percent-encoded),
        # as before; callers must not pass values containing '&' or '#'.
        req_url += '?' + '&'.join(f'{x[0]}={x[1]}' for x in params)
    return req_url


def _log_verbose(message: str) -> None:
    """Log *message* at INFO when master_debug is set, otherwise at DEBUG."""
    if master_debug:
        logging.info(message)
    else:
        logging.debug(message)


def _log_return(payload) -> None:
    """Log a response payload, truncated to its first 1200 characters."""
    text = f'{payload}'
    _log_verbose(f'return: {text[:1200]}{" [ S N I P P E D ]" if len(text) > 1200 else ""}')


def _body_for_log(resp):
    """Return the decoded JSON body of *resp*, or its raw text if it is not JSON."""
    try:
        return resp.json()
    except ValueError:
        # Error pages and empty bodies are not JSON; they are only logged,
        # so fall back to the raw text instead of failing the call.
        return resp.text


def _fixed_backoff(retries, timeout):
    """Timeout increase used by db_get: +2 after the first failure, +5 after the second."""
    return (2, 5)[retries]


def _capped_backoff(retries, timeout):
    """Timeout increase used by write calls: 3 then 5, never more than the current timeout."""
    return min((3, 5)[retries], timeout)


def _send_with_retry(send, verb: str, req_url: str, timeout, next_increment, **kwargs):
    """Call ``send(req_url, timeout=..., **kwargs)``, retrying on timeouts.

    Up to three attempts are made; after each of the first two timeouts the
    timeout is increased by ``next_increment(retries, timeout)``.

    Raises:
        ConnectionError: when the third attempt also times out.
    """
    retries = 0
    while True:
        try:
            return send(req_url, timeout=timeout, **kwargs)
        except requests.Timeout as e:
            # requests.Timeout covers both connect and read timeouts.
            logging.error(f'{verb} Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
            if retries > 1:
                raise ConnectionError(_TIMEOUT_MESSAGE) from e
            timeout += next_increment(retries, timeout)
            retries += 1


# NOTE(review): the db_* coroutines below call the synchronous `requests`
# library directly, so the event loop is blocked while a request is in flight.


async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None,
                 none_okay: bool = True, timeout: int = 3):
    """GET an endpoint and return its decoded JSON body.

    Args:
        endpoint: Path segment placed after the API version.
        api_ver: API version number.
        object_id: Optional id appended to the path.
        params: Optional sequence of ``(key, value)`` query pairs.
        none_okay: When True, a non-200 response yields None instead of raising.
        timeout: Initial request timeout in seconds; grows on each retry.

    Returns:
        The decoded JSON on HTTP 200, or None on any other status when
        *none_okay* is True.

    Raises:
        ConnectionError: if every attempt times out.
        ValueError: on a non-200 response when *none_okay* is False.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    _log_verbose(f'get:\n{endpoint} id: {object_id} params: {params}')

    resp = _send_with_retry(requests.get, 'Get', req_url, timeout, _fixed_backoff)

    if resp.status_code == 200:
        data = resp.json()
        _log_return(data)
        return data
    if none_okay:
        # The body is only logged here, so a non-JSON error body must not raise.
        _log_return(_body_for_log(resp))
        return None
    logging.warning(resp.text)
    raise ValueError(f'DB: {resp.text}')


async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3):
    """PATCH an object, sending the changes as query parameters.

    Returns:
        The decoded JSON body on HTTP 200.

    Raises:
        ConnectionError: if every attempt times out.
        ValueError: on any non-200 response.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
    _log_verbose(f'patch:\n{endpoint} {params}')

    resp = _send_with_retry(requests.patch, 'Patch', req_url, timeout, _capped_backoff,
                            headers=AUTH_HEADER)

    if resp.status_code == 200:
        data = resp.json()
        _log_return(data)
        return data
    logging.warning(resp.text)
    raise ValueError(f'DB: {resp.text}')


async def db_post(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
    """POST *payload* as JSON to an endpoint.

    Returns:
        The decoded JSON body on HTTP 200.

    Raises:
        ConnectionError: if every attempt times out.
        ValueError: on any non-200 response.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver)
    _log_verbose(f'post:\n{endpoint} payload: {payload}\ntype: {type(payload)}')

    resp = _send_with_retry(requests.post, 'Post', req_url, timeout, _capped_backoff,
                            json=payload, headers=AUTH_HEADER)

    if resp.status_code == 200:
        data = resp.json()
        _log_return(data)
        return data
    logging.warning(resp.text)
    raise ValueError(f'DB: {resp.text}')


async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
    """DELETE an object by id.

    Returns:
        True on HTTP 200.

    Raises:
        ConnectionError: if every attempt times out.
        ValueError: on any non-200 response.
    """
    req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
    _log_verbose(f'delete:\n{endpoint} {object_id}')

    resp = _send_with_retry(requests.delete, 'Delete', req_url, timeout, _capped_backoff,
                            headers=AUTH_HEADER)

    if resp.status_code == 200:
        # The body is only logged; an empty or non-JSON body must not turn a
        # successful delete into an error.
        _log_return(_body_for_log(resp))
        return True
    logging.warning(resp.text)
    raise ValueError(f'DB: {resp.text}')