fix: correct inverted TESTING env check and leading space in .env (#23) #40

Merged
cal merged 1 commits from ai/paper-dynasty-database#23 into next-release 2026-03-03 21:44:26 +00:00
2 changed files with 113 additions and 59 deletions

4
.env
View File

@ -59,9 +59,9 @@ API_TOKEN=Tp3aO3jhYve5NJF1IqOmJTmk
# PRIVATE_IN_SCHEMA=true
# Testing mode
# Set to 'False' to use development database URL (pddev.manticorum.com)
# Set to 'True' to use development database URL (pddev.manticorum.com)
# Leave unset or set to any other value for production
TESTING=TRUE
TESTING=True
# =============================================================================
# EXAMPLE CONFIGURATIONS

View File

@ -5,33 +5,37 @@ import os
import requests
from fastapi.security import OAuth2PasswordBearer
date = f'{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}'
date = f"{datetime.datetime.now().year}-{datetime.datetime.now().month}-{datetime.datetime.now().day}"
LOG_DATA = {
'filename': f'logs/database/{date}.log',
'format': '%(asctime)s - database - %(levelname)s - %(message)s',
'log_level': logging.INFO if os.environ.get('LOG_LEVEL') == 'INFO' else 'WARN'
"filename": f"logs/database/{date}.log",
"format": "%(asctime)s - database - %(levelname)s - %(message)s",
"log_level": logging.INFO if os.environ.get("LOG_LEVEL") == "INFO" else "WARN",
}
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
filename=LOG_DATA["filename"],
format=LOG_DATA["format"],
level=LOG_DATA["log_level"],
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
master_debug = False
DB_URL = 'https://pd.manticorum.com/api/'
DB_URL = "https://pd.manticorum.com/api/"
AUTH_TOKEN = f'{os.environ.get("API_TOKEN")}'
AUTH_HEADER = {'Authorization': f'Bearer {AUTH_TOKEN}'}
AUTH_HEADER = {"Authorization": f"Bearer {AUTH_TOKEN}"}
priv_help = False if not os.environ.get('PRIVATE_IN_SCHEMA') else os.environ.get('PRIVATE_IN_SCHEMA').upper()
priv_help = (
False
if not os.environ.get("PRIVATE_IN_SCHEMA")
else os.environ.get("PRIVATE_IN_SCHEMA").upper()
)
PRIVATE_IN_SCHEMA = True if priv_help else False
if os.environ.get('TESTING') == 'False':
DB_URL = 'https://pddev.manticorum.com/api/'
if os.environ.get("TESTING") == "True":
DB_URL = "https://pddev.manticorum.com/api/"
def valid_token(token):
@ -42,33 +46,41 @@ def int_timestamp(datetime_obj: datetime) -> int:
return int(datetime.datetime.timestamp(datetime_obj) * 1000)
def mround(x, prec=2, base=.05):
def mround(x, prec=2, base=0.05):
return round(base * round(float(x) / base), prec)
def param_char(other_params):
if other_params:
return '&'
return "&"
else:
return '?'
return "?"
def get_req_url(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None):
def get_req_url(
endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None
):
req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}'
if params:
other_params = False
for x in params:
req_url += f'{param_char(other_params)}{x[0]}={x[1]}'
req_url += f"{param_char(other_params)}{x[0]}={x[1]}"
other_params = True
return req_url
async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True,
timeout: int = 3):
async def db_get(
endpoint: str,
api_ver: int = 2,
object_id: int = None,
params: list = None,
none_okay: bool = True,
timeout: int = 3,
):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
log_string = f'get:\n{endpoint} id: {object_id} params: {params}'
log_string = f"get:\n{endpoint} id: {object_id} params: {params}"
logging.info(log_string) if master_debug else logging.debug(log_string)
retries = 0
@ -77,37 +89,51 @@ async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params:
resp = requests.get(req_url, timeout=timeout)
break
except requests.ReadTimeout as e:
logging.error(f'Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
logging.error(
f"Get Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
)
if retries > 1:
raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
f'hang on a few extra seconds and try again.')
raise ConnectionError(
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
f"hang on a few extra seconds and try again."
)
timeout += [2, 5][retries]
retries += 1
if resp.status_code == 200:
data = resp.json()
log_string = f'{data}'
log_string = f"{data}"
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
return data
elif none_okay:
data = resp.json()
log_string = f'{data}'
log_string = f"{data}"
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
return None
else:
logging.warning(resp.text)
raise ValueError(f'DB: {resp.text}')
raise ValueError(f"DB: {resp.text}")
async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3):
async def db_patch(
endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3
):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
log_string = f'patch:\n{endpoint} {params}'
log_string = f"patch:\n{endpoint} {params}"
logging.info(log_string) if master_debug else logging.debug(log_string)
retries = 0
@ -116,60 +142,80 @@ async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2
resp = requests.patch(req_url, headers=AUTH_HEADER, timeout=timeout)
break
except requests.Timeout as e:
logging.error(f'Patch Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
logging.error(
f"Patch Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
)
if retries > 1:
raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
f'hang on a few extra seconds and try again.')
raise ConnectionError(
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
f"hang on a few extra seconds and try again."
)
timeout += [min(3, timeout), min(5, timeout)][retries]
retries += 1
if resp.status_code == 200:
data = resp.json()
log_string = f'{data}'
log_string = f"{data}"
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
return data
else:
logging.warning(resp.text)
raise ValueError(f'DB: {resp.text}')
raise ValueError(f"DB: {resp.text}")
async def db_post(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
async def db_post(
endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3
):
req_url = get_req_url(endpoint, api_ver=api_ver)
log_string = f'post:\n{endpoint} payload: {payload}\ntype: {type(payload)}'
log_string = f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}"
logging.info(log_string) if master_debug else logging.debug(log_string)
retries = 0
while True:
try:
resp = requests.post(req_url, json=payload, headers=AUTH_HEADER, timeout=timeout)
resp = requests.post(
req_url, json=payload, headers=AUTH_HEADER, timeout=timeout
)
break
except requests.Timeout as e:
logging.error(f'Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
logging.error(
f"Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
)
if retries > 1:
raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
f'hang on a few extra seconds and try again.')
raise ConnectionError(
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
f"hang on a few extra seconds and try again."
)
timeout += [min(3, timeout), min(5, timeout)][retries]
retries += 1
if resp.status_code == 200:
data = resp.json()
log_string = f'{data}'
log_string = f"{data}"
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
return data
else:
logging.warning(resp.text)
raise ValueError(f'DB: {resp.text}')
raise ValueError(f"DB: {resp.text}")
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
log_string = f'delete:\n{endpoint} {object_id}'
log_string = f"delete:\n{endpoint} {object_id}"
logging.info(log_string) if master_debug else logging.debug(log_string)
retries = 0
@ -178,21 +224,29 @@ async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
resp = requests.delete(req_url, headers=AUTH_HEADER, timeout=timeout)
break
except requests.ReadTimeout as e:
logging.error(f'Delete Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
logging.error(
f"Delete Timeout: {req_url} / retries: {retries} / timeout: {timeout}"
)
if retries > 1:
raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
f'hang on a few extra seconds and try again.')
raise ConnectionError(
f"DB: The internet was a bit too slow for me to grab the data I needed. Please "
f"hang on a few extra seconds and try again."
)
timeout += [min(3, timeout), min(5, timeout)][retries]
retries += 1
if resp.status_code == 200:
data = resp.json()
log_string = f'{data}'
log_string = f"{data}"
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
logging.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}'
)
return True
else:
logging.warning(resp.text)
raise ValueError(f'DB: {resp.text}')
raise ValueError(f"DB: {resp.text}")