paper-dynasty-database/app/routers_v2/cardsets.py
Cal Corum 40c512c665 Add PostgreSQL compatibility fixes for query ordering
- Add explicit ORDER BY id to all queries for consistent results across SQLite and PostgreSQL
- PostgreSQL does not guarantee row order without ORDER BY, unlike SQLite
- Skip table creation when DATABASE_TYPE=postgresql (production tables already exist)
- Fix datetime handling in notifications (PostgreSQL native datetime vs SQLite timestamp)
- Fix grouped query count() calls that don't work in PostgreSQL
- Update .gitignore to include storage/templates/ directory

This completes the PostgreSQL migration compatibility layer while maintaining
backwards compatibility with SQLite for local development.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 10:39:14 -06:00

278 lines
9.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Response, Query
from typing import Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Cardset, model_to_dict, fn, Event
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
# Configure the root logger from the shared LOG_DATA settings (output file,
# record format, level). Note that logging.basicConfig does nothing if the
# root logger already has handlers attached.
logging.basicConfig(
    level=LOG_DATA['log_level'],
    format=LOG_DATA['format'],
    filename=LOG_DATA['filename'],
)
# Router that every cardset endpoint in this module registers on; all routes
# are served under the /api/v2/cardsets prefix and carry the 'cardsets' tag.
router = APIRouter(tags=['cardsets'], prefix='/api/v2/cardsets')
class CardsetModel(pydantic.BaseModel):
    # Request body accepted by POST /api/v2/cardsets. post_cardsets unpacks
    # these fields straight into Cardset(...), so the field names here are
    # expected to line up with what the Cardset model accepts.

    # Required; post_cardsets rejects a name that an existing cardset uses.
    name: str
    # Required free-text description.
    description: str
    # Optional event reference; omitted/None means no event is supplied.
    event_id: Optional[int] = None
    # Flag defaulting to True; get_cardsets and search_cardsets filter on it.
    in_packs: Optional[bool] = True
    # Card count for the set; defaults to 0.
    total_cards: int = 0
    # Flag defaulting to True.
    for_purchase: Optional[bool] = True
    # Flag defaulting to True; get_cardsets and search_cardsets filter on it.
    ranked_legal: Optional[bool] = True
@router.get('')
async def get_cardsets(
        name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None,
        in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None):
    """
    List cardsets ordered by id, optionally narrowed by the supplied filters.

    Filters are ANDed together; a filter left as None is not applied.

    Args:
        name: Case-insensitive exact match on the cardset name.
        in_desc: Case-insensitive substring match on the cardset description.
        event_id: Only cardsets linked to this event; the event must exist.
        in_packs: Match on the in_packs flag.
        ranked_legal: Match on the ranked_legal flag.
        csv: When truthy, respond with text/csv instead of JSON.

    Returns:
        A text/csv Response (one header row plus one row per cardset) when csv
        is truthy, otherwise {'count': <int>, 'cardsets': [<dict>, ...]}.

    Raises:
        HTTPException: 404 if there are no cardsets at all, if event_id does
            not resolve to an event, or if no cardset matches the filters.
    """
    # try/finally guarantees the connection is released on every exit path,
    # including the event-not-found 404 and any unexpected database error.
    try:
        all_cardsets = Cardset.select().order_by(Cardset.id)

        if all_cardsets.count() == 0:
            raise HTTPException(status_code=404, detail='There are no cardsets to filter')

        if name is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
        if in_desc is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower()))
        if event_id is not None:
            try:
                this_event = Event.get_by_id(event_id)
                all_cardsets = all_cardsets.where(Cardset.event == this_event)
            except Exception as e:
                logging.error(f'Failed to find event {event_id}: {e}')
                raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')
        if in_packs is not None:
            all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
        if ranked_legal is not None:
            all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

        # Run the filtered query once; emptiness and the reported count are
        # both derived from this list instead of issuing extra COUNT queries.
        cardsets = list(all_cardsets)
        if not cardsets:
            raise HTTPException(status_code=404, detail='No cardsets found')

        if csv:
            data_list = [[
                'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal'
            ]]
            data_list.extend(
                [
                    line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs,
                    line.for_purchase, line.total_cards, line.ranked_legal
                ]
                for line in cardsets
            )
            # The header is already the first data row, so pandas must not add its own.
            return_val = DataFrame(data_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return {'count': len(cardsets), 'cardsets': [model_to_dict(x) for x in cardsets]}
    finally:
        db.close()
@router.get('/search')
async def search_cardsets(
        q: str = Query(..., description="Search query for cardset name"),
        in_packs: Optional[bool] = None,
        ranked_legal: Optional[bool] = None,
        event_id: Optional[int] = None,
        limit: int = Query(default=25, ge=1, le=100, description="Maximum number of results to return")):
    """
    Search cardsets whose name contains the query, ranked by match quality.

    Matching is case-insensitive. Results are ordered in three tiers: names
    equal to the query, then names in which some whitespace-separated word
    starts with the query, then any other name containing the query. Within a
    tier, cardsets keep their id order.

    Returns a dict with 'count' (number of cardsets returned, at most limit),
    'total_matches' (number ranked before the limit was applied) and
    'cardsets' (the returned cardsets as dicts).

    Raises HTTPException 404 when event_id is given but no such event is found.
    """
    needle = q.lower()

    # Database-side narrowing: substring match on the name, in id order.
    candidates = Cardset.select().where(fn.Lower(Cardset.name).contains(needle)).order_by(Cardset.id)

    if in_packs is not None:
        candidates = candidates.where(Cardset.in_packs == in_packs)
    if ranked_legal is not None:
        candidates = candidates.where(Cardset.ranked_legal == ranked_legal)
    if event_id is not None:
        try:
            this_event = Event.get_by_id(event_id)
            candidates = candidates.where(Cardset.event == this_event)
        except Exception as e:
            logging.error(f'Failed to find event {event_id}: {e}')
            db.close()
            raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')

    # Bucket each row by how well its name matches; rows are visited in id
    # order, so each bucket stays id-ordered.
    exact, word_start, substring = [], [], []
    for candidate in list(candidates):
        haystack = candidate.name.lower()
        if haystack == needle:
            exact.append(candidate)
            continue
        if any(word.startswith(needle) for word in haystack.split()):
            word_start.append(candidate)
            continue
        if needle in haystack:
            substring.append(candidate)

    ranked = [*exact, *word_start, *substring]
    page = ranked[:limit]

    payload = {
        'count': len(page),
        'total_matches': len(ranked),
        'cardsets': [model_to_dict(hit) for hit in page]
    }
    db.close()
    return payload
@router.get('/{cardset_id}')
async def get_one_cardset(cardset_id, csv: Optional[bool] = False):
    """
    Fetch a single cardset by id.

    Returns the cardset as a dict, or, when csv is truthy, a text/csv Response
    holding a header row and one data row with id, name and description.

    Raises HTTPException 404 when the lookup by id fails.
    """
    try:
        found = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    if not csv:
        payload = model_to_dict(found)
        db.close()
        return payload

    header_row = ['id', 'name', 'description']
    value_row = [found.id, found.name, found.description]
    # The header is supplied as the first data row, so pandas' own header is disabled.
    csv_text = DataFrame([header_row, value_row]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@router.post('')
async def post_cardsets(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
    """
    Create a new cardset from the request body.

    Args:
        cardset: Field values for the new cardset.
        token: Bearer token supplied by the oauth2_scheme dependency.

    Returns:
        The newly saved cardset as a dict.

    Raises:
        HTTPException: 401 if valid_token rejects the token, 400 if a cardset
            with the same name already exists, 418 if the save does not
            report exactly one row written.
    """
    # try/finally releases the connection on every exit path, including the
    # save-failure branch, which previously raised without closing it.
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post cardsets. This event has been logged.'
            )

        dupe_set = Cardset.get_or_none(Cardset.name == cardset.name)
        if dupe_set:
            raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}')

        this_cardset = Cardset(**cardset.__dict__)
        if this_cardset.save() != 1:
            raise HTTPException(
                status_code=418,
                detail='Well slap my ass and call me a teapot; I could not save that cardset'
            )

        return model_to_dict(this_cardset)
    finally:
        db.close()
@router.patch('/{cardset_id}')
async def patch_cardsets(
        cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None,
        for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None,
        token: str = Depends(oauth2_scheme)):
    """
    Update selected fields of an existing cardset.

    Only parameters that are not None are written; everything else on the
    cardset is left as it was.

    Args:
        cardset_id: Id of the cardset to update (path parameter).
        name, description, in_packs, for_purchase, total_cards, ranked_legal:
            New values for the same-named cardset fields.
        token: Bearer token supplied by the oauth2_scheme dependency.

    Returns:
        The updated cardset as a dict.

    Raises:
        HTTPException: 401 if valid_token rejects the token, 404 if the
            lookup by id fails, 418 if the save does not report exactly one
            row written.
    """
    # try/finally releases the connection on every exit path, including the
    # save-failure branch, which previously raised without closing it.
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch cardsets. This event has been logged.'
            )

        try:
            this_cardset = Cardset.get_by_id(cardset_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

        requested_changes = {
            'name': name,
            'description': description,
            'in_packs': in_packs,
            'for_purchase': for_purchase,
            'total_cards': total_cards,
            'ranked_legal': ranked_legal,
        }
        for field_name, new_value in requested_changes.items():
            # None means "not supplied", so that field keeps its current value.
            if new_value is not None:
                setattr(this_cardset, field_name, new_value)

        if this_cardset.save() != 1:
            # Message previously said "rarity"; this endpoint saves a cardset.
            raise HTTPException(
                status_code=418,
                detail='Well slap my ass and call me a teapot; I could not save that cardset'
            )

        return model_to_dict(this_cardset)
    finally:
        db.close()
@router.delete('/{cardset_id}')
async def delete_cardsets(cardset_id, token: str = Depends(oauth2_scheme)):
    """
    Delete the cardset with the given id.

    This handler never returns normally: every outcome, including success, is
    reported by raising HTTPException.

    Raises HTTPException with status 401 if valid_token rejects the token,
    404 if the lookup by id fails, 200 if exactly one row was deleted, and
    500 otherwise.
    """
    token_ok = valid_token(token)
    if not token_ok:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete cardsets. This event has been logged.'
        )

    try:
        target = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    rows_deleted = target.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted')

    # Success is deliberately signalled through an HTTPException carrying status 200.
    raise HTTPException(status_code=200, detail=f'Cardset {cardset_id} has been deleted')