paper-dynasty-discord/db_calls.py
2023-11-19 14:30:17 -06:00

263 lines
9.3 KiB
Python

import datetime
from dataclasses import dataclass
from typing import Optional
import requests
import logging
import aiohttp
import os
AUTH_TOKEN = {'Authorization': f'Bearer {os.environ.get("API_TOKEN")}'}
DB_URL = 'https://pd.manticorum.com/api'
master_debug = True
alt_database = True
PLAYER_CACHE = {}
if alt_database == 'dev':
DB_URL = 'https://pddev.manticorum.com/api'
@dataclass
class Player:
player_id: int
p_name: str
cost: int
image: str
mlbclub: str
franchise: str
cardset: dict
set_num: int
rarity: dict
pos_1: str
description: str
quantity: Optional[int] = 999
image2: Optional[str] = None
pos_2: Optional[str] = None
pos_3: Optional[str] = None
pos_4: Optional[str] = None
pos_5: Optional[str] = None
pos_6: Optional[str] = None
pos_7: Optional[str] = None
pos_8: Optional[str] = None
headshot: Optional[str] = None
vanity_card: Optional[str] = None
strat_code: Optional[str] = None
bbref_id: Optional[str] = None
fangr_id: Optional[str] = None
mlbplayer: Optional[dict] = None
created: datetime.datetime = datetime.datetime.now()
def to_dict(self):
return {
'player_id': self.player_id,
'p_name': self.p_name,
'cost': self.cost,
'image': self.image,
'mlbclub': self.mlbclub,
'franchise': self.franchise,
'cardset': self.cardset,
'set_num': self.set_num,
'rarity': self.rarity,
'pos_1': self.pos_1,
'description': self.description,
'quantity': self.quantity,
'image2': self.image2,
'pos_2': self.pos_2,
'pos_3': self.pos_3,
'pos_4': self.pos_4,
'pos_5': self.pos_5,
'pos_6': self.pos_6,
'pos_7': self.pos_7,
'pos_8': self.pos_8,
'headshot': self.headshot,
'vanity_card': self.vanity_card,
'strat_code': self.strat_code,
'bbref_id': self.bbref_id,
'fangr_id': self.fangr_id,
'mlbplayer': self.mlbplayer
}
def param_char(other_params):
if other_params:
return '&'
else:
return '?'
def get_req_url(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None):
req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}'
if params:
other_params = False
for x in params:
req_url += f'{param_char(other_params)}{x[0]}={x[1]}'
other_params = True
return req_url
def log_return_value(log_string: str):
if master_debug:
logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
else:
logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True,
timeout: int = 3):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
log_string = f'db_get - get: {endpoint} id: {object_id} params: {params}'
logging.info(log_string) if master_debug else logging.debug(log_string)
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
async with session.get(req_url) as r:
if r.status == 200:
js = await r.json()
log_return_value(f'{js}')
return js
elif none_okay:
e = await r.text()
logging.error(e)
return None
else:
e = await r.text()
logging.error(e)
raise ValueError(f'DB: {e}')
async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params)
log_string = f'db_patch - patch: {endpoint} {params}'
logging.info(log_string) if master_debug else logging.debug(log_string)
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
async with session.patch(req_url) as r:
if r.status == 200:
js = await r.json()
log_return_value(f'{js}')
return js
else:
e = await r.text()
logging.error(e)
raise ValueError(f'DB: {e}')
async def db_post(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
req_url = get_req_url(endpoint, api_ver=api_ver)
log_string = f'db_post - post: {endpoint} payload: {payload}\ntype: {type(payload)}'
logging.info(log_string) if master_debug else logging.debug(log_string)
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
async with session.post(req_url, json=payload) as r:
if r.status == 200:
js = await r.json()
log_return_value(f'{js}')
return js
else:
e = await r.text()
logging.error(e)
raise ValueError(f'DB: {e}')
async def db_put(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3):
req_url = get_req_url(endpoint, api_ver=api_ver)
log_string = f'post:\n{endpoint} payload: {payload}\ntype: {type(payload)}'
logging.info(log_string) if master_debug else logging.debug(log_string)
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
async with session.put(req_url, json=payload) as r:
if r.status == 200:
js = await r.json()
log_return_value(f'{js}')
return js
else:
e = await r.text()
logging.error(e)
raise ValueError(f'DB: {e}')
# retries = 0
# while True:
# try:
# resp = requests.put(req_url, json=payload, headers=AUTH_TOKEN, timeout=timeout)
# break
# except requests.Timeout as e:
# logging.error(f'Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}')
# if retries > 1:
# raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please '
# f'hang on a few extra seconds and try again.')
# timeout += [min(3, timeout), min(5, timeout)][retries]
# retries += 1
#
# if resp.status_code == 200:
# data = resp.json()
# log_string = f'{data}'
# if master_debug:
# logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
# else:
# logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}')
# return data
# else:
# logging.warning(resp.text)
# raise ValueError(f'DB: {resp.text}')
async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3):
req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id)
log_string = f'db_delete - delete: {endpoint} {object_id}'
logging.info(log_string) if master_debug else logging.debug(log_string)
async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
async with session.delete(req_url) as r:
if r.status == 200:
js = await r.json()
log_return_value(f'{js}')
return js
else:
e = await r.text()
logging.error(e)
raise ValueError(f'DB: {e}')
async def get_team_by_abbrev(abbrev: str):
all_teams = await db_get('teams', params=[('abbrev', abbrev)])
if not all_teams or not all_teams['count']:
return None
return all_teams['teams'][0]
async def post_to_dex(player, team):
return await db_post('paperdex', payload={'team_id': team['id'], 'player_id': player['id']})
def team_hash(team):
hash_string = f'{team["sname"][-1]}{team["gmid"] / 6950123:.0f}{team["sname"][-2]}{team["gmid"] / 42069123:.0f}'
return hash_string
async def get_pd_player(player_id, as_dict: Optional[bool] = True):
if player_id in PLAYER_CACHE:
tdelta = datetime.datetime.now() - PLAYER_CACHE[player_id].created
if tdelta.total_seconds() < 1209600:
logging.debug(f'this_player: {PLAYER_CACHE[player_id]}')
if as_dict:
return PLAYER_CACHE[player_id].to_dict()
else:
return PLAYER_CACHE[player_id]
else:
logging.error(f'Refreshing player {player_id} in cache...')
this_player = await db_get('players', object_id=player_id)
for bad_key in ['mlbplayer', 'paperdex']:
if bad_key in this_player:
del this_player[bad_key]
logging.debug(f'this_player: {this_player}')
PLAYER_CACHE[player_id] = Player(**this_player)
if as_dict:
return this_player
return PLAYER_CACHE[player_id]