import os import aiohttp import pybaseball as pb from dotenv import load_dotenv from typing import Literal, Optional from exceptions import logger load_dotenv() _token = os.environ.get("PD_API_TOKEN") if not _token: raise EnvironmentError("PD_API_TOKEN environment variable is required") AUTH_TOKEN = {"Authorization": f"Bearer {_token}"} DB_URL = "https://pd.manticorum.com/api" master_debug = True alt_database = None if alt_database == "dev": DB_URL = "https://pddev.manticorum.com/api" elif alt_database == "sba": DB_URL = "https://sba.manticorum.com/api" def param_char(other_params): if other_params: return "&" else: return "?" def get_req_url( endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None ): req_url = f"{DB_URL}/v{api_ver}/{endpoint}{'/' if object_id is not None else ''}{object_id if object_id is not None else ''}" if params: other_params = False for x in params: req_url += f"{param_char(other_params)}{x[0]}={x[1]}" other_params = True return req_url def log_return_value(log_string: str): if master_debug: logger.info( f"return: {log_string[:1200]}{' [ S N I P P E D ]' if len(log_string) > 1200 else ''}\n" ) else: logger.debug( f"return: {log_string[:1200]}{' [ S N I P P E D ]' if len(log_string) > 1200 else ''}\n" ) async def db_get( endpoint: str, api_ver: int = 2, object_id: int = None, params: list = None, none_okay: bool = True, timeout: int = 3, ) -> Optional[dict]: req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f"get:\n{endpoint} id: {object_id} params: {params}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.get(req_url) as r: logger.info(f"session info: {r}") if r.status == 200: js = await r.json() log_return_value(f"{js}") return js elif none_okay: e = await r.text() logger.error(e) return None else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") async def url_get(url: str, timeout: int = 3) -> dict: log_string = f"get:\n{url}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.get(url) as r: if r.status == 200: log_string = "200 received" log_return_value(log_string) return r else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") async def db_patch( endpoint: str, object_id: int, params: list, api_ver: int = 2, timeout: int = 3 ) -> dict: req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id, params=params) log_string = f"patch:\n{endpoint} {params}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.patch(req_url) as r: if r.status == 200: js = await r.json() log_return_value(f"{js}") return js else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") async def db_post( endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3 ) -> dict: req_url = get_req_url(endpoint, api_ver=api_ver) log_string = f"post:\n{endpoint} payload: {payload}\ntype: {type(payload)}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.post(req_url, json=payload) as r: if r.status == 200: js = await r.json() log_return_value(f"{js}") return js else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") async def db_put( endpoint: str, api_ver: int = 2, payload: dict = None, timeout: int = 3 ) -> dict: req_url = get_req_url(endpoint, api_ver=api_ver) log_string = f"put:\n{endpoint} payload: {payload}\ntype: {type(payload)}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.put(req_url, json=payload) as r: if r.status == 200: js = await r.json() log_return_value(f"{js}") return js else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") async def db_delete(endpoint: str, object_id: int, api_ver: int = 2, timeout=3) -> dict: req_url = get_req_url(endpoint, api_ver=api_ver, object_id=object_id) log_string = f"delete:\n{endpoint} {object_id}" logger.info(log_string) if master_debug else logger.debug(log_string) async with aiohttp.ClientSession( headers=AUTH_TOKEN, timeout=aiohttp.ClientTimeout(total=timeout) ) as session: async with session.delete(req_url) as r: if r.status == 200: js = await r.json() log_return_value(f"{js}") return js else: e = await r.text() logger.error(e) raise ValueError(f"DB: {e}") def get_player_data( player_id: str, id_type: Literal["bbref", "fangraphs"], return_type: Literal["dict", "Series"] = "dict", ): q = pb.playerid_reverse_lookup([player_id], key_type=id_type) if len(q.values) == 0: return None elif return_type == "Series": return q.loc[0] else: return q.loc[0].to_dict() def player_desc(this_player) -> str: if this_player["p_name"] in this_player["description"]: return this_player["description"] return f"{this_player['description']} {this_player['p_name']}"