New Position exception Pull scouting data with lineups More bunt types String validation on gameplay models AI Defensive alignment
189 lines
6.8 KiB
Python
189 lines
6.8 KiB
Python
import datetime
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import logging
|
|
import aiohttp
|
|
import os
|
|
|
|
from exceptions import DatabaseError
|
|
|
|
AUTH_TOKEN = {'Authorization': f'Bearer {os.environ.get("API_TOKEN")}'}
|
|
try:
|
|
ENV_DATABASE = os.environ.get('DATABASE').lower()
|
|
except Exception as e:
|
|
ENV_DATABASE = 'dev'
|
|
DB_URL = 'https://pd.manticorum.com/api' if 'prod' in ENV_DATABASE else 'https://pddev.manticorum.com/api'
|
|
master_debug = True
|
|
PLAYER_CACHE = {}
|
|
logger = logging.getLogger('discord_app')
|
|
|
|
|
|
def param_char(other_params):
|
|
if other_params:
|
|
return '&'
|
|
else:
|
|
return '?'
|
|
|
|
|
|
def get_req_url(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None):
|
|
req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}'
|
|
|
|
if params:
|
|
other_params = False
|
|
for x in params:
|
|
req_url += f'{param_char(other_params)}{x[0]}={x[1]}'
|
|
other_params = True
|
|
|
|
return req_url
|
|
|
|
|
|
def log_return_value(log_string: str):
|
|
start = 0
|
|
end = 3000
|
|
while end < 300000:
|
|
line = log_string[start:end]
|
|
if len(line) == 0:
|
|
return
|
|
logger.info(f'{"\n\nreturn: " if start == 0 else ""}{log_string[start:end]}')
|
|
start += 3000
|
|
end += 3000
|
|
logger.warning('[ S N I P P E D ]')
|
|
# if master_debug:
|
|
# logger.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
|
|
# else:
|
|
# logger.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
|
|
|
|
|
|
async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True, timeout: int = 3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
|
|
log_string = f'db_get - get: {endpoint} id: {object_id} params: {params}'
|
|
logger.info(log_string) if master_debug else logger.debug(log_string)
|
|
|
|
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
async with session.get(req_url) as r:
|
|
if r.status == 200:
|
|
js = await r.json()
|
|
log_return_value(f'{js}')
|
|
return js
|
|
elif none_okay:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
return None
|
|
else:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
raise DatabaseError(e)
|
|
|
|
|
|
async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
|
|
log_string = f'db_patch - patch: {endpoint} {params}'
|
|
logger.info(log_string) if master_debug else logger.debug(log_string)
|
|
|
|
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
async with session.patch(req_url) as r:
|
|
if r.status == 200:
|
|
js = await r.json()
|
|
log_return_value(f'{js}')
|
|
return js
|
|
else:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
raise DatabaseError(e)
|
|
|
|
|
|
async def db_post(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver)
|
|
log_string = f'db_post - post: {endpoint} payload: {payload}\ntype: {type(payload)}'
|
|
logger.info(log_string) if master_debug else logger.debug(log_string)
|
|
|
|
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
async with session.post(req_url, json=payload) as r:
|
|
if r.status == 200:
|
|
js = await r.json()
|
|
log_return_value(f'{js}')
|
|
return js
|
|
else:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
raise DatabaseError(e)
|
|
|
|
|
|
async def db_put(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver)
|
|
log_string = f'post:\n{endpoint} payload: {payload}\ntype: {type(payload)}'
|
|
logger.info(log_string) if master_debug else logger.debug(log_string)
|
|
|
|
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
async with session.put(req_url, json=payload) as r:
|
|
if r.status == 200:
|
|
js = await r.json()
|
|
log_return_value(f'{js}')
|
|
return js
|
|
else:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
raise DatabaseError(e)
|
|
|
|
# retries = 0
|
|
# while True:
|
|
# try:
|
|
# resp = requests.put(req_url, json=payload, headers=AUTH_TOKEN, timeout=timeout)
|
|
# break
|
|
# except requests.Timeout as e:
|
|
# logger.error(f'Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
|
|
# if retries > 1:
|
|
# raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
|
|
# f'hang on a few extra seconds and try again.')
|
|
# timeout += [min(3, timeout), min(5, timeout)][retries]
|
|
# retries += 1
|
|
#
|
|
# if resp.status_code == 200:
|
|
# data = resp.json()
|
|
# log_string = f'{data}'
|
|
# if master_debug:
|
|
# logger.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
|
|
# else:
|
|
# logger.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
|
|
# return data
|
|
# else:
|
|
# logger.warning(resp.text)
|
|
# raise ValueError(f'DB: {resp.text}')
|
|
|
|
|
|
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
|
|
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
|
|
log_string = f'db_delete - delete: {endpoint} {object_id}'
|
|
logger.info(log_string) if master_debug else logger.debug(log_string)
|
|
|
|
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
async with session.delete(req_url) as r:
|
|
if r.status == 200:
|
|
js = await r.json()
|
|
log_return_value(f'{js}')
|
|
return js
|
|
else:
|
|
e = await r.text()
|
|
logger.error(e)
|
|
raise DatabaseError(e)
|
|
|
|
|
|
async def get_team_by_abbrev(abbrev: str):
|
|
all_teams = await db_get('teams', params=[('abbrev', abbrev)])
|
|
|
|
if not all_teams or not all_teams['count']:
|
|
return None
|
|
|
|
return all_teams['teams'][0]
|
|
|
|
|
|
async def post_to_dex(player, team):
|
|
return await db_post('paperdex', payload={'team_id': team['id'], 'player_id': player['id']})
|
|
|
|
|
|
def team_hash(team):
|
|
hash_string = f'{team["sname"][-1]}{team["gmid"] / 6950123:.0f}{team["sname"][-2]}{team["gmid"] / 42069123:.0f}'
|
|
return hash_string
|
|
|