paper-dynasty-card-creation/scripts/supabase_doodling.py
Cal Corum 0a17745389 Run black and ruff across entire codebase
Standardize formatting with black and apply ruff auto-fixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 14:24:33 -05:00

76 lines
2.6 KiB
Python

from typing import Literal
import requests
from exceptions import logger, log_exception
AUTH_TOKEN = {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNucGhwbnV2aGp2cXprY2J3emRrIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImlhdCI6MTc0NTgxMTc4NCwiZXhwIjoyMDYxMzg3Nzg0fQ.7dG_y2zU2PajBwTD8vut5GcWf3CSaZePkYW_hMf0fVg",
"apikey": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImNucGhwbnV2aGp2cXprY2J3emRrIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImlhdCI6MTc0NTgxMTc4NCwiZXhwIjoyMDYxMzg3Nzg0fQ.7dG_y2zU2PajBwTD8vut5GcWf3CSaZePkYW_hMf0fVg",
}
DB_URL = "https://cnphpnuvhjvqzkcbwzdk.supabase.co/rest/v1"
def get_req_url(endpoint: str, params: list = None):
req_url = f"{DB_URL}/{endpoint}?"
if params:
other_params = False
for x in params:
req_url += f'{"&" if other_params else "?"}{x[0]}={x[1]}'
other_params = True
return req_url
def log_return_value(log_string: str, log_type: Literal["info", "debug"]):
if log_type == "info":
logger.info(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n'
)
else:
logger.debug(
f'return: {log_string[:1200]}{" [ S N I P P E D ]" if len(log_string) > 1200 else ""}\n'
)
def db_get(
endpoint: str,
params: dict = None,
limit: int = 1000,
offset: int = 0,
none_okay: bool = True,
timeout: int = 3,
):
req_url = f"{DB_URL}/{endpoint}?limit={limit}&offset={offset}"
logger.info(f"HTTP GET: {req_url}, params: {params}")
response = requests.request("GET", req_url, params=params, headers=AUTH_TOKEN)
logger.info(response)
if response.status_code != requests.codes.ok:
log_exception(Exception, response.text)
data = response.json()
if isinstance(data, list) and len(data) == 0:
if none_okay:
return None
else:
log_exception(Exception, "Query returned no results and none_okay = False")
return data
# async with aiohttp.ClientSession(headers=AUTH_TOKEN) as session:
# async with session.get(req_url) as r:
# logger.info(f'session info: {r}')
# if r.status == 200:
# js = await r.json()
# log_return_value(f'{js}')
# return js
# elif none_okay:
# e = await r.text()
# logger.error(e)
# return None
# else:
# e = await r.text()
# logger.error(e)
# raise ValueError(f'DB: {e}')