64 lines
2.5 KiB
Python
64 lines
2.5 KiB
Python
from typing import Literal
|
|
import requests
|
|
from exceptions import logger, log_exception
|
|
|
|
AUTH_TOKEN = {'Authorization': f'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNucGhwbnV2aGp2cXprY2J3emRrIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImlhdCI6MTc0NTgxMTc4NCwiZXhwIjoyMDYxMzg3Nzg0fQ.7dG_y2zU2PajBwTD8vut5GcWf3CSaZePkYW_hMf0fVg', 'apikey': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNucGhwbnV2aGp2cXprY2J3emRrIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImlhdCI6MTc0NTgxMTc4NCwiZXhwIjoyMDYxMzg3Nzg0fQ.7dG_y2zU2PajBwTD8vut5GcWf3CSaZePkYW_hMf0fVg'}
|
|
DB_URL = 'https://cnphpnuvhjvqzkcbwzdk.supabase.co/rest/v1'
|
|
|
|
|
|
def get_req_url(endpoint: str, params: list = None):
|
|
req_url = f'{DB_URL}/{endpoint}?'
|
|
|
|
if params:
|
|
other_params = False
|
|
for x in params:
|
|
req_url += f'{"&" if other_params else "?"}{x[0]}={x[1]}'
|
|
other_params = True
|
|
|
|
return req_url
|
|
|
|
|
|
def log_return_value(log_string: str, log_type: Literal['info', 'debug']):
|
|
if log_type == 'info':
|
|
logger.info(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
|
|
else:
|
|
logger.debug(f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n')
|
|
|
|
|
|
def db_get(
|
|
endpoint: str, params: dict = None, limit: int = 1000, offset: int = 0, none_okay: bool = True, timeout: int = 3):
|
|
req_url = f'{DB_URL}/{endpoint}?limit={limit}&offset={offset}'
|
|
logger.info(f'HTTP GET: {req_url}, params: {params}')
|
|
|
|
response = requests.request('GET', req_url, params=params, headers=AUTH_TOKEN)
|
|
logger.info(response)
|
|
|
|
if response.status_code != requests.codes.ok:
|
|
log_exception(Exception, response.text)
|
|
|
|
data = response.json()
|
|
if isinstance(data, list) and len(data) == 0:
|
|
if none_okay:
|
|
return None
|
|
else:
|
|
log_exception(Exception, 'Query returned no results and none_okay = False')
|
|
|
|
return data
|
|
|
|
# async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
|
|
# async with session.get(req_url) as r:
|
|
# logger.info(f'session info: {r}')
|
|
# if r.status == 200:
|
|
# js = await r.json()
|
|
# log_return_value(f'{js}')
|
|
# return js
|
|
# elif none_okay:
|
|
# e = await r.text()
|
|
# logger.error(e)
|
|
# return None
|
|
# else:
|
|
# e = await r.text()
|
|
# logger.error(e)
|
|
# raise ValueError(f'DB: {e}')
|
|
|