import datetime from dataclasses import dataclass from typing import Optional import requests import logging import aiohttp import os AUTH_TOKEN = {'Authorization': f'Bearer {os.environ.get("API_TOKEN")}'} DB_URL = 'https://pd.manticorum.com/api' master_debug = True alt_database = True PLAYER_CACHE = {} if alt_database == 'dev': DB_URL = 'https://pddev.manticorum.com/api' @dataclass class Player: player_id: int p_name: str cost: int image: str mlbclub: str franchise: str cardset: dict set_num: int rarity: dict pos_1: str description: str quantity: Optional[int] = 999 image2: Optional[str] = None pos_2: Optional[str] = None pos_3: Optional[str] = None pos_4: Optional[str] = None pos_5: Optional[str] = None pos_6: Optional[str] = None pos_7: Optional[str] = None pos_8: Optional[str] = None headshot: Optional[str] = None vanity_card: Optional[str] = None strat_code: Optional[str] = None bbref_id: Optional[str] = None fangr_id: Optional[str] = None mlbplayer: Optional[dict] = None created: datetime.datetime = datetime.datetime.now() def to_dict(self): return { 'player_id': self.player_id, 'p_name': self.p_name, 'cost': self.cost, 'image': self.image, 'mlbclub': self.mlbclub, 'franchise': self.franchise, 'cardset': self.cardset, 'set_num': self.set_num, 'rarity': self.rarity, 'pos_1': self.pos_1, 'description': self.description, 'quantity': self.quantity, 'image2': self.image2, 'pos_2': self.pos_2, 'pos_3': self.pos_3, 'pos_4': self.pos_4, 'pos_5': self.pos_5, 'pos_6': self.pos_6, 'pos_7': self.pos_7, 'pos_8': self.pos_8, 'headshot': self.headshot, 'vanity_card': self.vanity_card, 'strat_code': self.strat_code, 'bbref_id': self.bbref_id, 'fangr_id': self.fangr_id, 'mlbplayer': self.mlbplayer } def param_char(other_params): if other_params: return '&' else: return '?' def get_req_url(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None): req_url = f'{DB_URL}/v{api_ver}/{endpoint}{"/" if object_id is not None else ""}{object_id if object_id is not None else ""}' if params: other_params = False for x in params: req_url += f'{param_char(other_params)}{x[0]}={x[1]}' other_params = True return req_url def log_return_value(log_string: str): if master_debug: logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n') else: logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n') async def db_get(endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True, timeout: int = 3): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f'db_get - get: {endpoint} id: {object_id} params: {params}' logging.info(log_string) if master_debug else logging.debug(log_string) async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session: async with session.get(req_url) as r: if r.status == 200: js = await r.json() log_return_value(f'{js}') return js elif none_okay: e = await r.text() logging.error(e) return None else: e = await r.text() logging.error(e) raise ValueError(f'DB: {e}') async def db_patch(endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f'db_patch - patch: {endpoint} {params}' logging.info(log_string) if master_debug else logging.debug(log_string) async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session: async with session.patch(req_url) as r: if r.status == 200: js = await r.json() log_return_value(f'{js}') return js else: e = await r.text() logging.error(e) raise ValueError(f'DB: {e}') async def db_post(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3): req_url = get_req_url(endpoint, api_ver=api_ver) log_string = f'db_post - post: {endpoint} payload: {payload}\ntype: {type(payload)}' logging.info(log_string) if master_debug else logging.debug(log_string) async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session: async with session.post(req_url, json=payload) as r: if r.status == 200: js = await r.json() log_return_value(f'{js}') return js else: e = await r.text() logging.error(e) raise ValueError(f'DB: {e}') async def db_put(endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3): req_url = get_req_url(endpoint, api_ver=api_ver) log_string = f'post:\n{endpoint} payload: {payload}\ntype: {type(payload)}' logging.info(log_string) if master_debug else logging.debug(log_string) async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session: async with session.put(req_url, json=payload) as r: if r.status == 200: js = await r.json() log_return_value(f'{js}') return js else: e = await r.text() logging.error(e) raise ValueError(f'DB: {e}') # retries = 0 # while True: # try: # resp = requests.put(req_url, json=payload, headers=AUTH_TOKEN, timeout=timeout) # break # except requests.Timeout as e: # logging.error(f'Post Timeout: {req_url} / retries: {retries} / timeout: {timeout}') # if retries > 1: # raise ConnectionError(f'DB: The internet was a bit too slow for me to grab the data I needed. Please ' # f'hang on a few extra seconds and try again.') # timeout += [min(3, timeout), min(5, timeout)][retries] # retries += 1 # # if resp.status_code == 200: # data = resp.json() # log_string = f'{data}' # if master_debug: # logging.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}') # else: # logging.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}') # return data # else: # logging.warning(resp.text) # raise ValueError(f'DB: {resp.text}') async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3): req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id) log_string = f'db_delete - delete: {endpoint} {object_id}' logging.info(log_string) if master_debug else logging.debug(log_string) async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session: async with session.delete(req_url) as r: if r.status == 200: js = await r.json() log_return_value(f'{js}') return js else: e = await r.text() logging.error(e) raise ValueError(f'DB: {e}') async def get_team_by_abbrev(abbrev: str): all_teams = await db_get('teams', params=[('abbrev', abbrev)]) if not all_teams or not all_teams['count']: return None return all_teams['teams'][0] async def post_to_dex(player, team): return await db_post('paperdex', payload={'team_id': team['id'], 'player_id': player['id']}) def team_hash(team): hash_string = f'{team["sname"][-1]}{team["gmid"] / 6950123:.0f}{team["sname"][-2]}{team["gmid"] / 42069123:.0f}' return hash_string async def get_pd_player(player_id, as_dict: Optional[bool] = True): if player_id in PLAYER_CACHE: tdelta = datetime.datetime.now() - PLAYER_CACHE[player_id].created if tdelta.total_seconds() < 1209600: logging.debug(f'this_player: {PLAYER_CACHE[player_id]}') if as_dict: return PLAYER_CACHE[player_id].to_dict() else: return PLAYER_CACHE[player_id] else: logging.error(f'Refreshing player {player_id} in cache...') this_player = await db_get('players', object_id=player_id) for bad_key in ['mlbplayer', 'paperdex']: if bad_key in this_player: del this_player[bad_key] logging.debug(f'this_player: {this_player}') PLAYER_CACHE[player_id] = Player(**this_player) if as_dict: return this_player return PLAYER_CACHE[player_id]