paper-dynasty-database/app/routers_v2/players.py
Cal Corum a105d5412a fix: pass diamond tier colors to card template
The tier_style.html template references {{ filled_bg }} for diamond
quad backgrounds but it was never set in the rendering code, making
the tier indicator invisible.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 00:12:26 -05:00

1399 lines
48 KiB
Python

import datetime
import os.path
import pandas as pd
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
HTTPException,
Request,
Response,
Query,
)
from fastapi.responses import FileResponse
from fastapi.templating import Jinja2Templates
from typing import Optional, List, Literal
import logging
import pydantic
from pandas import DataFrame
import asyncio as _asyncio
from playwright.async_api import async_playwright, Browser, Playwright
from ..card_creation import get_batter_card_data, get_pitcher_card_data
from ..db_engine import (
db,
Player,
model_to_dict,
fn,
Paperdex,
Cardset,
Rarity,
BattingCard,
BattingCardRatings,
PitchingCard,
PitchingCardRatings,
CardPosition,
MlbPlayer,
DoesNotExist,
)
from ..db_helpers import upsert_players
from ..dependencies import oauth2_scheme, valid_token
from ..services.card_storage import backfill_variant_image_url
from ..services.refractor_boost import compute_variant_hash
from ..services.apng_generator import apng_cache_path, generate_animated_card
# ---------------------------------------------------------------------------
# Persistent browser instance (WP-02)
# ---------------------------------------------------------------------------
_browser: Browser | None = None
_playwright: Playwright | None = None
_browser_lock = _asyncio.Lock()
async def get_browser() -> Browser:
"""Get or create persistent Chromium browser instance.
Reuses a single browser across all card renders, eliminating the ~1-1.5s
per-request launch/teardown overhead. Automatically reconnects if the
browser process has died.
Uses an asyncio.Lock to prevent concurrent requests from racing to
launch multiple Chromium processes.
"""
global _browser, _playwright
async with _browser_lock:
if _browser is None or not _browser.is_connected():
if _playwright is not None:
try:
await _playwright.stop()
except Exception:
pass
_playwright = await async_playwright().start()
_browser = await _playwright.chromium.launch(
args=["--no-sandbox", "--disable-dev-shm-usage"]
)
return _browser
async def shutdown_browser():
"""Clean shutdown of the persistent browser.
Called by the FastAPI lifespan handler on application exit so the
Chromium process is not left orphaned.
"""
global _browser, _playwright
if _browser:
try:
await _browser.close()
except Exception:
pass
_browser = None
if _playwright:
try:
await _playwright.stop()
except Exception:
pass
_playwright = None
# ---------------------------------------------------------------------------
# Franchise normalization: Convert city+team names to city-agnostic team names
# This enables cross-era player matching (e.g., 'Oakland Athletics' -> 'Athletics')
FRANCHISE_NORMALIZE = {
"Arizona Diamondbacks": "Diamondbacks",
"Atlanta Braves": "Braves",
"Baltimore Orioles": "Orioles",
"Boston Red Sox": "Red Sox",
"Chicago Cubs": "Cubs",
"Chicago White Sox": "White Sox",
"Cincinnati Reds": "Reds",
"Cleveland Guardians": "Guardians",
"Colorado Rockies": "Rockies",
"Detroit Tigers": "Tigers",
"Houston Astros": "Astros",
"Kansas City Royals": "Royals",
"Los Angeles Angels": "Angels",
"Los Angeles Dodgers": "Dodgers",
"Miami Marlins": "Marlins",
"Milwaukee Brewers": "Brewers",
"Minnesota Twins": "Twins",
"New York Mets": "Mets",
"New York Yankees": "Yankees",
"Oakland Athletics": "Athletics",
"Philadelphia Phillies": "Phillies",
"Pittsburgh Pirates": "Pirates",
"San Diego Padres": "Padres",
"San Francisco Giants": "Giants",
"Seattle Mariners": "Mariners",
"St Louis Cardinals": "Cardinals",
"St. Louis Cardinals": "Cardinals",
"Tampa Bay Rays": "Rays",
"Texas Rangers": "Rangers",
"Toronto Blue Jays": "Blue Jays",
"Washington Nationals": "Nationals",
}
def normalize_franchise(franchise: str) -> str:
"""Convert city+team name to team-only (e.g., 'Oakland Athletics' -> 'Athletics')"""
titled = franchise.title()
return FRANCHISE_NORMALIZE.get(titled, titled)
TIER_DIAMOND_COLORS = {
1: "linear-gradient(135deg, #40b040 0%, #1a6b1a 50%, #145214 100%)",
2: "linear-gradient(135deg, #50a0e8 0%, #2070b0 50%, #185488 100%)",
3: "linear-gradient(135deg, #e85050 0%, #a82020 50%, #7e1818 100%)",
4: "linear-gradient(135deg, #a060d0 0%, #6b2d8e 50%, #50226a 100%)",
}
def resolve_refractor_tier(player_id: int, variant: int) -> int:
"""Determine the refractor tier (0-4) from a player's variant hash.
Pure math — no DB query needed. Returns 0 for base cards or unknown variants.
"""
if variant == 0:
return 0
for tier in range(1, 5):
if compute_variant_hash(player_id, tier) == variant:
return tier
return 0
router = APIRouter(prefix="/api/v2/players", tags=["players"])
templates = Jinja2Templates(directory="storage/templates")
class PlayerPydantic(pydantic.BaseModel):
player_id: int = None
p_name: str
cost: int
image: str
image2: Optional[str] = None
mlbclub: str
franchise: str
cardset_id: int
set_num: int
rarity_id: int
pos_1: str
pos_2: Optional[str] = None
pos_3: Optional[str] = None
pos_4: Optional[str] = None
pos_5: Optional[str] = None
pos_6: Optional[str] = None
pos_7: Optional[str] = None
pos_8: Optional[str] = None
headshot: Optional[str] = None
vanity_card: Optional[str] = None
strat_code: Optional[str] = None
bbref_id: Optional[str] = None
fangr_id: Optional[str] = None
description: str
quantity: Optional[int] = 999
mlbplayer_id: Optional[int] = None
class PlayerModel(pydantic.BaseModel):
players: List[PlayerPydantic]
@router.get("")
async def get_players(
name: Optional[str] = None,
value: Optional[int] = None,
min_cost: Optional[int] = None,
max_cost: Optional[int] = None,
has_image2: Optional[bool] = None,
mlbclub: Optional[str] = None,
franchise: Optional[str] = None,
cardset_id: list = Query(default=None),
rarity_id: list = Query(default=None),
pos_include: list = Query(default=None),
pos_exclude: list = Query(default=None),
has_headshot: Optional[bool] = None,
has_vanity_card: Optional[bool] = None,
strat_code: Optional[str] = None,
bbref_id: Optional[str] = None,
fangr_id: Optional[str] = None,
inc_dex: Optional[bool] = True,
in_desc: Optional[str] = None,
flat: Optional[bool] = False,
sort_by: Optional[str] = False,
cardset_id_exclude: list = Query(default=None),
limit: Optional[int] = None,
csv: Optional[bool] = None,
short_output: Optional[bool] = False,
mlbplayer_id: Optional[int] = None,
inc_keys: Optional[bool] = False,
):
all_players = Player.select()
if all_players.count() == 0:
raise HTTPException(status_code=404, detail="There are no players to filter")
if name is not None:
all_players = all_players.where(fn.Lower(Player.p_name) == name.lower())
if value is not None:
all_players = all_players.where(Player.cost == value)
if min_cost is not None:
all_players = all_players.where(Player.cost >= min_cost)
if max_cost is not None:
all_players = all_players.where(Player.cost <= max_cost)
if has_image2 is not None:
all_players = all_players.where(Player.image2.is_null(not has_image2))
if mlbclub is not None:
all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
if franchise is not None:
all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
if cardset_id is not None:
all_players = all_players.where(Player.cardset_id << cardset_id)
if cardset_id_exclude is not None:
all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude))
if rarity_id is not None:
all_players = all_players.where(Player.rarity_id << rarity_id)
if pos_include is not None:
p_list = [x.upper() for x in pos_include]
all_players = all_players.where(
(Player.pos_1 << p_list)
| (Player.pos_2 << p_list)
| (Player.pos_3 << p_list)
| (Player.pos_4 << p_list)
| (Player.pos_5 << p_list)
| (Player.pos_6 << p_list)
| (Player.pos_7 << p_list)
| (Player.pos_8 << p_list)
)
if has_headshot is not None:
all_players = all_players.where(Player.headshot.is_null(not has_headshot))
if has_vanity_card is not None:
all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card))
if strat_code is not None:
all_players = all_players.where(Player.strat_code == strat_code)
if bbref_id is not None:
all_players = all_players.where(Player.bbref_id == bbref_id)
if fangr_id is not None:
all_players = all_players.where(Player.fangr_id == fangr_id)
if mlbplayer_id is not None:
all_players = all_players.where(Player.mlbplayer_id == mlbplayer_id)
if in_desc is not None:
all_players = all_players.where(
fn.Lower(Player.description).contains(in_desc.lower())
)
if sort_by is not None:
if sort_by == "cost-desc":
all_players = all_players.order_by(-Player.cost)
elif sort_by == "cost-asc":
all_players = all_players.order_by(Player.cost)
elif sort_by == "name-asc":
all_players = all_players.order_by(Player.p_name)
elif sort_by == "name-desc":
all_players = all_players.order_by(-Player.p_name)
elif sort_by == "rarity-desc":
all_players = all_players.order_by(Player.rarity)
elif sort_by == "rarity-asc":
all_players = all_players.order_by(-Player.rarity)
else:
all_players = all_players.order_by(Player.player_id)
final_players = []
# logging.info(f'pos_exclude: {type(pos_exclude)} - {pos_exclude} - is None: {pos_exclude is None}')
for x in all_players:
if pos_exclude is not None and set(
[x.upper() for x in pos_exclude]
).intersection(x.get_all_pos()):
pass
else:
final_players.append(x)
if limit is not None and len(final_players) >= limit:
break
# if len(final_players) == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No players found')
if csv:
card_vals = [model_to_dict(x) for x in all_players]
for x in card_vals:
x["player_name"] = x["p_name"]
x["cardset_name"] = x["cardset"]["name"]
x["rarity"] = x["rarity"]["name"]
x["for_purchase"] = x["cardset"]["for_purchase"]
x["ranked_legal"] = x["cardset"]["ranked_legal"]
if x["player_name"] not in x["description"]:
x["description"] = f"{x['description']} {x['player_name']}"
card_df = pd.DataFrame(card_vals)
output = card_df[
[
"player_id",
"player_name",
"cost",
"image",
"image2",
"mlbclub",
"franchise",
"cardset_name",
"rarity",
"pos_1",
"pos_2",
"pos_3",
"pos_4",
"pos_5",
"pos_6",
"pos_7",
"pos_8",
"headshot",
"vanity_card",
"fangr_id",
"bbref_id",
"description",
"for_purchase",
"ranked_legal",
]
]
return Response(
content=pd.DataFrame(output).to_csv(index=False), media_type="text/csv"
)
# all_players.order_by(-Player.rarity.value, Player.p_name)
# data_list = [['id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
# 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
# 'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal']]
# for line in final_players:
# data_list.append(
# [
# line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise,
# line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6,
# line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id,
# line.description, line.cardset.for_purchase, line.cardset.ranked_legal
# # line.description, line.cardset.in_packs, line.quantity
# ]
# )
# return_val = DataFrame(data_list).to_csv(header=False, index=False)
#
# db.close()
# return Response(content=return_val, media_type='text/csv')
else:
return_val = {"count": len(final_players), "players": []}
for x in final_players:
this_record = model_to_dict(x, recurse=not (flat or short_output))
if inc_dex:
this_dex = Paperdex.select().where(Paperdex.player == x)
this_record["paperdex"] = {"count": this_dex.count(), "paperdex": []}
for y in this_dex:
this_record["paperdex"]["paperdex"].append(
model_to_dict(y, recurse=False)
)
if inc_keys and (flat or short_output):
if this_record["mlbplayer"] is not None:
this_mlb = MlbPlayer.get_by_id(this_record["mlbplayer"])
this_record["key_mlbam"] = this_mlb.key_mlbam
this_record["key_fangraphs"] = this_mlb.key_fangraphs
this_record["key_bbref"] = this_mlb.key_bbref
this_record["key_retro"] = this_mlb.key_retro
this_record["offense_col"] = this_mlb.offense_col
return_val["players"].append(this_record)
# return_val['players'].append(model_to_dict(x, recurse=not flat))
return return_val
@router.get("/random")
async def get_random_player(
min_cost: Optional[int] = None,
max_cost: Optional[int] = None,
in_packs: Optional[bool] = None,
min_rarity: Optional[int] = None,
max_rarity: Optional[int] = None,
limit: Optional[int] = None,
pos_include: Optional[str] = None,
pos_exclude: Optional[str] = None,
franchise: Optional[str] = None,
mlbclub: Optional[str] = None,
cardset_id: list = Query(default=None),
pos_inc: list = Query(default=None),
pos_exc: list = Query(default=None),
csv: Optional[bool] = None,
):
all_players = (
Player.select().join(Cardset).switch(Player).join(Rarity).order_by(fn.Random())
)
if min_cost is not None:
all_players = all_players.where(Player.cost >= min_cost)
if max_cost is not None:
all_players = all_players.where(Player.cost <= max_cost)
if in_packs is not None:
if in_packs:
all_players = all_players.where(Player.cardset.in_packs)
if min_rarity is not None:
all_players = all_players.where(Player.rarity.value >= min_rarity)
if max_rarity is not None:
all_players = all_players.where(Player.rarity.value <= max_rarity)
if pos_include is not None:
all_players = all_players.where(
(fn.lower(Player.pos_1) == pos_include.lower())
| (fn.lower(Player.pos_2) == pos_include.lower())
| (fn.lower(Player.pos_3) == pos_include.lower())
| (fn.lower(Player.pos_4) == pos_include.lower())
| (fn.lower(Player.pos_5) == pos_include.lower())
| (fn.lower(Player.pos_6) == pos_include.lower())
| (fn.lower(Player.pos_7) == pos_include.lower())
| (fn.lower(Player.pos_8) == pos_include.lower())
)
if franchise is not None:
all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
if mlbclub is not None:
all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
if cardset_id is not None:
all_players = all_players.where(Player.cardset_id << cardset_id)
if pos_inc is not None:
p_list = [x.upper() for x in pos_inc]
all_players = all_players.where(
(Player.pos_1 << p_list)
| (Player.pos_2 << p_list)
| (Player.pos_3 << p_list)
| (Player.pos_4 << p_list)
| (Player.pos_5 << p_list)
| (Player.pos_6 << p_list)
| (Player.pos_7 << p_list)
| (Player.pos_8 << p_list)
)
# if pos_exc is not None:
# p_list = [x.upper() for x in pos_exc]
# logging.info(f'starting query: {all_players}\n\np_list: {p_list}\n\n')
# all_players = all_players.where(
# Player.pos_1.not_in(p_list) & Player.pos_2.not_in(p_list) & Player.pos_3.not_in(p_list) &
# Player.pos_4.not_in(p_list) & Player.pos_5.not_in(p_list) & Player.pos_6.not_in(p_list) &
# Player.pos_7.not_in(p_list) & Player.pos_8.not_in(p_list)
# )
# logging.info(f'post pos query: {all_players}')
if pos_exclude is not None and pos_exc is None:
final_players = [x for x in all_players if pos_exclude not in x.get_all_pos()]
elif pos_exc is not None and pos_exclude is None:
final_players = []
p_list = [x.upper() for x in pos_exc]
for x in all_players:
if limit is not None and len(final_players) >= limit:
break
if not set(p_list).intersection(x.get_all_pos()):
final_players.append(x)
else:
final_players = all_players
if limit is not None:
final_players = final_players[:limit]
# if len(final_players) == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No players found')
if csv:
data_list = [
[
"id",
"name",
"cost",
"image",
"image2",
"mlbclub",
"franchise",
"cardset",
"rarity",
"pos_1",
"pos_2",
"pos_3",
"pos_4",
"pos_5",
"pos_6",
"pos_7",
"pos_8",
"headshot",
"vanity_card",
"strat_code",
"bbref_id",
"description",
]
]
for line in final_players:
data_list.append(
[
line.id,
line.p_name,
line.cost,
line.image,
line.image2,
line.mlbclub,
line.franchise,
line.cardset.name,
line.rarity.name,
line.pos_1,
line.pos_2,
line.pos_3,
line.pos_4,
line.pos_5,
line.pos_6,
line.pos_7,
line.pos_8,
line.headshot,
line.vanity_card,
line.strat_code,
line.bbref_id,
line.description,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type="text/csv")
else:
return_val = {"count": len(final_players), "players": []}
for x in final_players:
this_record = model_to_dict(x)
this_dex = Paperdex.select().where(Paperdex.player == x)
this_record["paperdex"] = {"count": this_dex.count(), "paperdex": []}
for y in this_dex:
this_record["paperdex"]["paperdex"].append(
model_to_dict(y, recurse=False)
)
return_val["players"].append(this_record)
# return_val['players'].append(model_to_dict(x))
return return_val
@router.get("/search")
async def search_players(
q: str = Query(..., description="Search query for player name"),
cardset_id: list = Query(default=None),
rarity_id: list = Query(default=None),
limit: int = Query(
default=25, ge=1, le=100, description="Maximum number of results to return"
),
unique_names: bool = Query(
default=False, description="Return only unique player names (highest player_id)"
),
short_output: bool = False,
):
"""
Real-time fuzzy search for players by name.
Returns players matching the query with exact matches prioritized over partial matches.
When unique_names=True, only returns one player per unique name (the one with highest player_id).
"""
# Start with all players
all_players = Player.select()
# Apply name filter (partial match)
all_players = all_players.where(fn.Lower(Player.p_name).contains(q.lower()))
# Apply optional filters
if cardset_id is not None:
all_players = all_players.where(Player.cardset_id << cardset_id)
if rarity_id is not None:
all_players = all_players.where(Player.rarity_id << rarity_id)
# Convert to list for sorting
players_list = list(all_players)
# Sort by relevance (exact matches first, then name starts, then partial)
query_lower = q.lower()
exact_matches = []
name_start_matches = []
partial_matches = []
for player in players_list:
name_lower = player.p_name.lower()
if name_lower == query_lower:
exact_matches.append(player)
else:
# Check if query matches the start of first or last name
name_parts = name_lower.split()
starts_with_match = any(part.startswith(query_lower) for part in name_parts)
if starts_with_match:
name_start_matches.append(player)
elif query_lower in name_lower:
partial_matches.append(player)
# Combine results (exact, then name starts, then partial)
results = exact_matches + name_start_matches + partial_matches
# Deduplicate by name if requested (keeping highest player_id)
if unique_names:
seen_names = {}
for player in results:
name_lower = player.p_name.lower()
if (
name_lower not in seen_names
or player.player_id > seen_names[name_lower].player_id
):
seen_names[name_lower] = player
results = list(seen_names.values())
total_matches = len(results)
limited_results = results[:limit]
# Build response
return_val = {
"count": len(limited_results),
"total_matches": total_matches,
"players": [],
}
for x in limited_results:
this_record = model_to_dict(x, recurse=not short_output)
# this_dex = Paperdex.select().where(Paperdex.player == x)
# this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []}
# for y in this_dex:
# this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False))
return_val["players"].append(this_record)
return return_val
@router.get("/{player_id}")
async def get_one_player(player_id: int, csv: Optional[bool] = False):
try:
this_player = Player.get_by_id(player_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No player found with id {player_id}"
)
if csv:
data_list = [
[
"id",
"name",
"cost",
"image",
"image2",
"mlbclub",
"franchise",
"cardset",
"rarity",
"pos_1",
"pos_2",
"pos_3",
"pos_4",
"pos_5",
"pos_6",
"pos_7",
"pos_8",
"headshot",
"vanity_card",
"strat_code",
"bbref_id",
"description",
]
]
data_list.append(
[
this_player.id,
this_player.p_name,
this_player.cost,
this_player.image,
this_player.image2,
this_player.mlbclub,
this_player.franchise,
this_player.cardset.name,
this_player.rarity.name,
this_player.pos_1,
this_player.pos_2,
this_player.pos_3,
this_player.pos_4,
this_player.pos_5,
this_player.pos_6,
this_player.pos_7,
this_player.pos_8,
this_player.headshot,
this_player.vanity_card,
this_player.strat_code,
this_player.bbref_id,
this_player.description,
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
return Response(content=return_val, media_type="text/csv")
else:
return_val = model_to_dict(this_player)
this_dex = Paperdex.select().where(Paperdex.player == this_player)
return_val["paperdex"] = {"count": this_dex.count(), "paperdex": []}
for x in this_dex:
return_val["paperdex"]["paperdex"].append(model_to_dict(x, recurse=False))
return return_val
@router.get("/{player_id}/{card_type}card/{d}/{variant}/animated")
async def get_animated_card(
request: Request,
player_id: int,
card_type: Literal["batting", "pitching"],
variant: int,
d: str,
tier: Optional[int] = Query(
None, ge=0, le=4, description="Override refractor tier for preview (dev only)"
),
):
try:
this_player = Player.get_by_id(player_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No player found with id {player_id}"
)
refractor_tier = (
tier if tier is not None else resolve_refractor_tier(player_id, variant)
)
if refractor_tier < 3:
raise HTTPException(
status_code=404,
detail=f"No animation for tier {refractor_tier}; animated cards require T3 or T4",
)
cache_path = apng_cache_path(
this_player.cardset.id, card_type, player_id, d, variant
)
headers = {"Cache-Control": "public, max-age=86400"}
if os.path.isfile(cache_path) and tier is None:
return FileResponse(path=cache_path, media_type="image/png", headers=headers)
all_pos = (
CardPosition.select()
.where(CardPosition.player == this_player)
.order_by(CardPosition.innings.desc())
)
if card_type == "batting":
this_bc = BattingCard.get_or_none(
BattingCard.player == this_player, BattingCard.variant == variant
)
if this_bc is None:
raise HTTPException(
status_code=404,
detail=f"Batting card not found for id {player_id}, variant {variant}",
)
rating_vl = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == "L"
)
rating_vr = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == "R"
)
if None in [rating_vr, rating_vl]:
raise HTTPException(
status_code=404,
detail=f"Ratings not found for batting card {this_bc.id}",
)
card_data = get_batter_card_data(
this_player, this_bc, rating_vl, rating_vr, all_pos
)
if (
this_player.description in this_player.cardset.name
and this_player.cardset.id not in [23]
):
card_data["cardset_name"] = this_player.cardset.name
else:
card_data["cardset_name"] = this_player.description
card_data["refractor_tier"] = refractor_tier
card_data["filled_bg"] = TIER_DIAMOND_COLORS.get(refractor_tier, "")
card_data["request"] = request
html_response = templates.TemplateResponse("player_card.html", card_data)
else:
this_pc = PitchingCard.get_or_none(
PitchingCard.player == this_player, PitchingCard.variant == variant
)
if this_pc is None:
raise HTTPException(
status_code=404,
detail=f"Pitching card not found for id {player_id}, variant {variant}",
)
rating_vl = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard == this_pc,
PitchingCardRatings.vs_hand == "L",
)
rating_vr = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard == this_pc,
PitchingCardRatings.vs_hand == "R",
)
if None in [rating_vr, rating_vl]:
raise HTTPException(
status_code=404,
detail=f"Ratings not found for pitching card {this_pc.id}",
)
card_data = get_pitcher_card_data(
this_player, this_pc, rating_vl, rating_vr, all_pos
)
if (
this_player.description in this_player.cardset.name
and this_player.cardset.id not in [23]
):
card_data["cardset_name"] = this_player.cardset.name
else:
card_data["cardset_name"] = this_player.description
card_data["refractor_tier"] = refractor_tier
card_data["filled_bg"] = TIER_DIAMOND_COLORS.get(refractor_tier, "")
card_data["request"] = request
html_response = templates.TemplateResponse("player_card.html", card_data)
browser = await get_browser()
page = await browser.new_page(viewport={"width": 1280, "height": 720})
try:
await generate_animated_card(
page,
html_response.body.decode("UTF-8"),
cache_path,
refractor_tier,
)
finally:
await page.close()
return FileResponse(path=cache_path, media_type="image/png", headers=headers)
@router.get("/{player_id}/{card_type}card")
@router.get("/{player_id}/{card_type}card/{d}")
@router.get("/{player_id}/{card_type}card/{d}/{variant}")
async def get_batter_card(
request: Request,
background_tasks: BackgroundTasks,
player_id: int,
card_type: Literal["batting", "pitching"],
variant: int = 0,
d: str = None,
html: Optional[bool] = False,
tier: Optional[int] = Query(
None, ge=0, le=4, description="Override refractor tier for preview (dev only)"
),
):
try:
this_player = Player.get_by_id(player_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No player found with id {player_id}"
)
headers = {"Cache-Control": "public, max-age=86400"}
_filename = (
f"{this_player.description} {this_player.p_name} {card_type} {d}-v{variant}"
)
if (
os.path.isfile(
f"storage/cards/cardset-{this_player.cardset.id}/{card_type}/{player_id}-{d}-v{variant}.png"
)
and html is False
and tier is None
):
return FileResponse(
path=f"storage/cards/cardset-{this_player.cardset.id}/{card_type}/{player_id}-{d}-v{variant}.png",
media_type="image/png",
headers=headers,
)
all_pos = (
CardPosition.select()
.where(CardPosition.player == this_player)
.order_by(CardPosition.innings.desc())
)
if card_type == "batting":
this_bc = BattingCard.get_or_none(
BattingCard.player == this_player, BattingCard.variant == variant
)
if this_bc is None:
raise HTTPException(
status_code=404,
detail=f"Batting card not found for id {player_id}, variant {variant}",
)
rating_vl = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == "L"
)
rating_vr = BattingCardRatings.get_or_none(
BattingCardRatings.battingcard == this_bc, BattingCardRatings.vs_hand == "R"
)
if None in [rating_vr, rating_vl]:
raise HTTPException(
status_code=404,
detail=f"Ratings not found for batting card {this_bc.id}",
)
card_data = get_batter_card_data(
this_player, this_bc, rating_vl, rating_vr, all_pos
)
# Include Pokemon cardsets here to remove "Pokemon" from cardset name on card
if (
this_player.description in this_player.cardset.name
and this_player.cardset.id not in [23]
):
card_data["cardset_name"] = this_player.cardset.name
else:
card_data["cardset_name"] = this_player.description
card_data["refractor_tier"] = (
tier if tier is not None else resolve_refractor_tier(player_id, variant)
)
card_data["filled_bg"] = TIER_DIAMOND_COLORS.get(
card_data["refractor_tier"], ""
)
card_data["request"] = request
html_response = templates.TemplateResponse("player_card.html", card_data)
else:
this_pc = PitchingCard.get_or_none(
PitchingCard.player == this_player, PitchingCard.variant == variant
)
if this_pc is None:
raise HTTPException(
status_code=404,
detail=f"Pitching card not found for id {player_id}, variant {variant}",
)
rating_vl = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard == this_pc,
PitchingCardRatings.vs_hand == "L",
)
rating_vr = PitchingCardRatings.get_or_none(
PitchingCardRatings.pitchingcard == this_pc,
PitchingCardRatings.vs_hand == "R",
)
if None in [rating_vr, rating_vl]:
raise HTTPException(
status_code=404,
detail=f"Ratings not found for pitching card {this_pc.id}",
)
card_data = get_pitcher_card_data(
this_player, this_pc, rating_vl, rating_vr, all_pos
)
if (
this_player.description in this_player.cardset.name
and this_player.cardset.id not in [23]
):
card_data["cardset_name"] = this_player.cardset.name
else:
card_data["cardset_name"] = this_player.description
card_data["refractor_tier"] = (
tier if tier is not None else resolve_refractor_tier(player_id, variant)
)
card_data["filled_bg"] = TIER_DIAMOND_COLORS.get(
card_data["refractor_tier"], ""
)
card_data["request"] = request
html_response = templates.TemplateResponse("player_card.html", card_data)
if html:
return html_response
updates = 0
if card_type == "batting":
updates += (
BattingCardRatings.update(card_data["new_ratings_vl"].dict())
.where((BattingCardRatings.id == rating_vl.id))
.execute()
)
updates += (
BattingCardRatings.update(card_data["new_ratings_vr"].dict())
.where((BattingCardRatings.id == rating_vr.id))
.execute()
)
else:
updates += (
PitchingCardRatings.update(card_data["new_ratings_vl"].dict())
.where((PitchingCardRatings.id == rating_vl.id))
.execute()
)
updates += (
PitchingCardRatings.update(card_data["new_ratings_vr"].dict())
.where((PitchingCardRatings.id == rating_vr.id))
.execute()
)
logging.debug(f"Rating updates: {updates}")
logging.debug(f"body:\n{html_response.body.decode('UTF-8')}")
file_path = f"storage/cards/cardset-{this_player.cardset.id}/{card_type}/{player_id}-{d}-v{variant}.png"
browser = await get_browser()
page = await browser.new_page(viewport={"width": 1280, "height": 720})
try:
await page.set_content(html_response.body.decode("UTF-8"))
await page.screenshot(
path=file_path,
type="png",
clip={"x": 0.0, "y": 0, "width": 1200, "height": 600},
)
finally:
await page.close()
# hti = Html2Image(
# browser='chrome',
# size=(1200, 600),
# output_path=f'storage/cards/cardset-{this_player.cardset.id}/{card_type}/',
# custom_flags=['--no-sandbox', '--disable-remote-debugging', '--headless', '--disable-gpu',
# '--disable-software-rasterizer', '--disable-dev-shm-usage']
# )
# x = hti.screenshot(
# html_str=str(html_response.body.decode("UTF-8")),
# save_as=f'{player_id}-{d}-v{variant}.png'
# )
# Schedule S3 upload for variant cards that don't have an image_url yet.
# Skip when tier is overridden (?tier= dev preview) — those renders don't
# correspond to real variant card rows.
if variant > 0 and tier is None:
CardModel = BattingCard if card_type == "batting" else PitchingCard
try:
card_row = CardModel.get(
(CardModel.player_id == player_id) & (CardModel.variant == variant)
)
if card_row.image_url is None:
background_tasks.add_task(
backfill_variant_image_url,
player_id=player_id,
variant=variant,
card_type=card_type,
cardset_id=this_player.cardset.id,
png_path=file_path,
)
except CardModel.DoesNotExist:
pass
return FileResponse(path=file_path, media_type="image/png", headers=headers)
# @router.get('/{player_id}/pitchingcard')
# async def get_pitcher_card(
# request: Request, player_id: int, variant: int = 0, d: str = None, html: Optional[bool] = False)
@router.patch("/{player_id}")
async def v1_players_patch(
player_id,
name: Optional[str] = None,
image: Optional[str] = None,
image2: Optional[str] = None,
mlbclub: Optional[str] = None,
franchise: Optional[str] = None,
cardset_id: Optional[int] = None,
rarity_id: Optional[int] = None,
pos_1: Optional[str] = None,
pos_2: Optional[str] = None,
pos_3: Optional[str] = None,
pos_4: Optional[str] = None,
pos_5: Optional[str] = None,
mlbplayer_id: Optional[int] = None,
pos_6: Optional[str] = None,
pos_7: Optional[str] = None,
pos_8: Optional[str] = None,
headshot: Optional[str] = None,
vanity_card: Optional[str] = None,
strat_code: Optional[str] = None,
bbref_id: Optional[str] = None,
description: Optional[str] = None,
cost: Optional[int] = None,
fangr_id: Optional[str] = None,
token: str = Depends(oauth2_scheme),
):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail="You are not authorized to patch players. This event has been logged.",
)
try:
this_player = Player.get_by_id(player_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No player found with id {player_id}"
)
if cost is not None:
this_player.cost = cost
if name is not None:
this_player.p_name = name
if image is not None:
this_player.image = image
if image2 is not None:
if image2.lower() == "false":
this_player.image2 = None
else:
this_player.image2 = image2
if mlbclub is not None:
this_player.mlbclub = mlbclub
if franchise is not None:
this_player.franchise = franchise
if cardset_id is not None:
try:
this_cardset = Cardset.get_by_id(cardset_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No cardset found with id {cardset_id}"
)
this_player.cardset = this_cardset
if rarity_id is not None:
try:
this_rarity = Rarity.get_by_id(rarity_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No rarity found with id {rarity_id}"
)
this_player.rarity = this_rarity
if pos_1 is not None:
if pos_1 in ["None", "False", ""]:
this_player.pos_1 = None
else:
this_player.pos_1 = pos_1
if pos_2 is not None:
if pos_2 in ["None", "False", ""]:
this_player.pos_2 = None
else:
this_player.pos_2 = pos_2
if pos_3 is not None:
if pos_3 in ["None", "False", ""]:
this_player.pos_3 = None
else:
this_player.pos_3 = pos_3
if pos_4 is not None:
if pos_4 in ["None", "False", ""]:
this_player.pos_4 = None
else:
this_player.pos_4 = pos_4
if pos_5 is not None:
if pos_5 in ["None", "False", ""]:
this_player.pos_5 = None
else:
this_player.pos_5 = pos_5
if pos_6 is not None:
if pos_6 in ["None", "False", ""]:
this_player.pos_6 = None
else:
this_player.pos_6 = pos_6
if pos_7 is not None:
if pos_7 in ["None", "False", ""]:
this_player.pos_7 = None
else:
this_player.pos_7 = pos_7
if pos_8 is not None:
if pos_8 in ["None", "False", ""]:
this_player.pos_8 = None
else:
this_player.pos_8 = pos_8
if headshot is not None:
this_player.headshot = headshot
if vanity_card is not None:
this_player.vanity_card = vanity_card
if strat_code is not None:
this_player.strat_code = strat_code
if bbref_id is not None:
this_player.bbref_id = bbref_id
if fangr_id is not None:
this_player.fangr_id = fangr_id
if description is not None:
this_player.description = description
if mlbplayer_id is not None:
this_player.mlbplayer_id = mlbplayer_id
if this_player.save() == 1:
return_val = model_to_dict(this_player)
return return_val
else:
raise HTTPException(
status_code=418,
detail="Well slap my ass and call me a teapot; I could not save that rarity",
)
@router.put("")
async def put_players(players: PlayerModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail="You are not authorized to post players. This event has been logged.",
)
new_players = []
for x in players.players:
# this_player = Player(
# player_id=x.player_id,
# p_name=x.p_name,
# cost=x.cost,
# image=x.image,
# image2=x.image2,
# mlbclub=x.mlbclub,
# franchise=x.franchise,
# cardset_id=x.cardset_id,
# rarity_id=x.rarity_id,
# set_num=x.set_num,
# pos_1=x.pos_1,
# pos_2=x.pos_2,
# pos_3=x.pos_3,
# pos_4=x.pos_4,
# pos_5=x.pos_5,
# pos_6=x.pos_6,
# pos_7=x.pos_7,
# pos_8=x.pos_8,
# headshot=x.headshot,
# vanity_card=x.vanity_card,
# strat_code=x.strat_code,
# fangr_id=x.fangr_id,
# bbref_id=x.bbref_id,
# description=x.description
# )
# new_players.append(this_player)
new_players.append(
{
"player_id": x.player_id,
"p_name": x.p_name,
"cost": x.cost,
"image": x.image,
"image2": x.image2,
"mlbclub": x.mlbclub.title(),
"franchise": normalize_franchise(x.franchise),
"cardset_id": x.cardset_id,
"rarity_id": x.rarity_id,
"set_num": x.set_num,
"pos_1": x.pos_1,
"pos_2": x.pos_2,
"pos_3": x.pos_3,
"pos_4": x.pos_4,
"pos_5": x.pos_5,
"pos_6": x.pos_6,
"pos_7": x.pos_7,
"pos_8": x.pos_8,
"headshot": x.headshot,
"vanity_card": x.vanity_card,
"strat_code": x.strat_code,
"fangr_id": x.fangr_id,
"bbref_id": x.bbref_id,
"description": x.description,
}
)
logging.debug(f"new_players: {new_players}")
with db.atomic():
# Use PostgreSQL-compatible upsert helper (preserves SQLite compatibility)
upsert_players(new_players, batch_size=15)
# sheets.update_all_players(SHEETS_AUTH)
raise HTTPException(
status_code=200, detail=f"{len(new_players)} players have been added"
)
@router.post("")
async def post_players(new_player: PlayerPydantic, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail="You are not authorized to post players. This event has been logged.",
)
dupe_query = Player.select().where(
(Player.bbref_id == new_player.bbref_id)
& (Player.cardset_id == new_player.cardset_id)
)
if dupe_query.count() != 0:
raise HTTPException(
status_code=400,
detail=f"This appears to be a duplicate with player {dupe_query[0].player_id}",
)
p_query = Player.select(Player.player_id).order_by(-Player.player_id).limit(1)
new_id = p_query[0].player_id + 1
new_player.player_id = new_id
p_id = Player.insert(new_player.dict()).execute()
return_val = model_to_dict(Player.get_by_id(p_id))
return return_val
@router.post("/{player_id}/image-reset")
async def post_image_reset(
player_id: int, dev: bool = False, token: str = Depends(oauth2_scheme)
):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail="You are not authorized to modify players. This event has been logged.",
)
this_player = Player.get_or_none(Player.player_id == player_id)
if this_player is None:
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
now = datetime.datetime.now()
today_url = (
f"https://pd{'dev' if dev else ''}.manticorum.com/api/v2/players/{player_id}/"
f"{'pitch' if 'pitch' in this_player.image else 'batt'}ingcard?d={now.year}-{now.month}-{now.day}"
)
logging.debug(f"image1 url: {today_url}")
this_player.image = today_url
if this_player.image2 is not None:
today_url = (
f"https://pd{'dev' if dev else ''}.manticorum.com/api/v2/players/{player_id}/"
f"{'pitch' if 'pitch' in this_player.image2 else 'batt'}ingcard?d={now.year}-{now.month}-{now.day}"
)
logging.debug(f"image2 url: {today_url}")
this_player.image2 = today_url
this_player.save()
r_player = model_to_dict(this_player)
return r_player
@router.delete("/{player_id}")
async def delete_player(player_id: int, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning("Bad Token: [REDACTED]")
raise HTTPException(
status_code=401,
detail="You are not authorized to delete players. This event has been logged.",
)
try:
this_player = Player.get_by_id(player_id)
except DoesNotExist:
raise HTTPException(
status_code=404, detail=f"No player found with id {player_id}"
)
count = this_player.delete_instance()
if count == 1:
raise HTTPException(
status_code=200, detail=f"Player {player_id} has been deleted"
)
else:
raise HTTPException(
status_code=500, detail=f"Player {player_id} was not deleted"
)