- Add explicit ORDER BY id to all queries for consistent results across SQLite and PostgreSQL
- PostgreSQL does not guarantee row order without ORDER BY, unlike SQLite
- Skip table creation when DATABASE_TYPE=postgresql (production tables already exist)
- Fix datetime handling in notifications (PostgreSQL native datetime vs SQLite timestamp)
- Fix grouped query count() calls that don't work in PostgreSQL
- Update .gitignore to include storage/templates/ directory

This completes the PostgreSQL migration compatibility layer while maintaining backwards compatibility with SQLite for local development.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
278 lines
9.4 KiB
Python
278 lines
9.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Response, Query
|
|
from typing import Optional
|
|
import logging
|
|
import pydantic
|
|
from pandas import DataFrame
|
|
|
|
from ..db_engine import db, Cardset, model_to_dict, fn, Event
|
|
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
|
|
|
|
# Logging destination, line format and level all come from the shared
# LOG_DATA settings imported from ..dependencies.
logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level'],
)

# Every route declared in this module is served under /api/v2/cardsets and
# tagged 'cardsets'.
router = APIRouter(prefix='/api/v2/cardsets', tags=['cardsets'])
|
|
|
|
|
|
class CardsetModel(pydantic.BaseModel):
    """Request body used by the POST endpoint to create a cardset.

    The POST handler passes these fields straight to the ``Cardset``
    constructor as keyword arguments, so the field names must keep matching
    what that model accepts.
    """

    # Required text fields; the POST handler rejects a name already in use.
    name: str
    description: str

    # Optional link to an event; None means no event is supplied.
    event_id: Optional[int] = None

    # Flags default to True when the client omits them.
    in_packs: Optional[bool] = True

    total_cards: int = 0

    for_purchase: Optional[bool] = True
    ranked_legal: Optional[bool] = True
|
|
|
|
|
|
@router.get('')
async def get_cardsets(
        name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None,
        in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None):
    """List cardsets ordered by id, optionally filtered, as JSON or CSV.

    Args:
        name: Case-insensitive exact match on the cardset name.
        in_desc: Case-insensitive substring that must appear in the description.
        event_id: Only cardsets linked to this event.
        in_packs: Only cardsets whose ``in_packs`` flag equals this value.
        ranked_legal: Only cardsets whose ``ranked_legal`` flag equals this value.
        csv: When truthy, respond with ``text/csv`` instead of JSON.

    Returns:
        ``{'count': int, 'cardsets': [dict, ...]}``, or a ``text/csv``
        ``Response`` with one header row followed by one row per cardset.

    Raises:
        HTTPException: 404 when the table is empty, when the event lookup
            fails, or when no cardset survives the filters.
    """
    # try/finally guarantees the connection is released on every exit path,
    # including the event-lookup 404 which previously skipped db.close().
    try:
        all_cardsets = Cardset.select().order_by(Cardset.id)

        if all_cardsets.count() == 0:
            raise HTTPException(status_code=404, detail='There are no cardsets to filter')

        if name is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
        if in_desc is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower()))
        if event_id is not None:
            try:
                this_event = Event.get_by_id(event_id)
            except Exception as e:
                logging.error(f'Failed to find event {event_id}: {e}')
                raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')
            all_cardsets = all_cardsets.where(Cardset.event == this_event)
        if in_packs is not None:
            all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
        if ranked_legal is not None:
            all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

        # Fetch the rows once; the emptiness check, the count and the payload
        # all reuse this list instead of issuing extra COUNT queries.
        matches = list(all_cardsets)
        if not matches:
            raise HTTPException(status_code=404, detail='No cardsets found')

        if csv:
            data_list = [[
                'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal'
            ]]
            data_list.extend(
                [
                    line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs,
                    line.for_purchase, line.total_cards, line.ranked_legal
                ]
                for line in matches
            )
            # The header is already the first data row, so pandas must not add its own.
            return_val = DataFrame(data_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return {'count': len(matches), 'cardsets': [model_to_dict(x) for x in matches]}
    finally:
        db.close()
|
|
|
|
|
|
@router.get('/search')
async def search_cardsets(
        q: str = Query(..., description="Search query for cardset name"),
        in_packs: Optional[bool] = None,
        ranked_legal: Optional[bool] = None,
        event_id: Optional[int] = None,
        limit: int = Query(default=25, ge=1, le=100, description="Maximum number of results to return")):
    """
    Real-time fuzzy search for cardsets by name.

    Returns cardsets matching the query with exact matches prioritized over partial matches.
    """
    needle = q.lower()

    # Base query: id-ordered cardsets whose lowercased name contains the needle.
    query = Cardset.select().order_by(Cardset.id)
    query = query.where(fn.Lower(Cardset.name).contains(needle))

    # Optional flag filters are only applied when the caller supplied them.
    if in_packs is not None:
        query = query.where(Cardset.in_packs == in_packs)
    if ranked_legal is not None:
        query = query.where(Cardset.ranked_legal == ranked_legal)

    if event_id is not None:
        try:
            query = query.where(Cardset.event == Event.get_by_id(event_id))
        except Exception as e:
            logging.error(f'Failed to find event {event_id}: {e}')
            db.close()
            raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')

    # Relevance tiers, best first: whole-name match, a word in the name that
    # starts with the needle, then any other substring hit. Rows keep their
    # id order inside each tier.
    exact, word_start, substring = [], [], []
    for candidate in list(query):
        lowered = candidate.name.lower()
        if lowered == needle:
            exact.append(candidate)
        elif any(word.startswith(needle) for word in lowered.split()):
            word_start.append(candidate)
        elif needle in lowered:
            substring.append(candidate)

    ranked = exact + word_start + substring
    page = ranked[:limit]

    # 'count' is the size of the returned page; 'total_matches' is the number
    # of hits before the limit was applied.
    payload = {
        'count': len(page),
        'total_matches': len(ranked),
        'cardsets': [model_to_dict(item) for item in page]
    }

    db.close()
    return payload
|
|
|
|
|
|
@router.get('/{cardset_id}')
async def get_one_cardset(cardset_id, csv: Optional[bool] = False):
    """Fetch a single cardset by id, as JSON or as a two-row CSV.

    The CSV form carries only the id, name and description columns.
    Raises a 404 HTTPException when the lookup fails.
    """
    try:
        found = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    if not csv:
        payload = model_to_dict(found)
        db.close()
        return payload

    # The header is passed as the first data row, so pandas' own header and
    # index are both switched off.
    header_row = ['id', 'name', 'description']
    value_row = [found.id, found.name, found.description]
    csv_text = DataFrame([header_row, value_row]).to_csv(header=False, index=False)

    db.close()
    return Response(content=csv_text, media_type='text/csv')
|
|
|
|
|
|
@router.post('')
async def post_cardsets(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
    """Create a new cardset from the request body.

    Args:
        cardset: Field values for the new cardset.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The saved cardset as a dict.

    Raises:
        HTTPException: 401 for an invalid token, 400 when a cardset with the
            same name already exists, 418 when the save does not report
            exactly one row written.
    """
    # try/finally releases the connection on every exit, including the
    # failed-save path that previously raised without closing it.
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post cardsets. This event has been logged.'
            )

        # Names must be unique; this is an exact, case-sensitive comparison.
        dupe_set = Cardset.get_or_none(Cardset.name == cardset.name)
        if dupe_set is not None:
            raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}')

        this_cardset = Cardset(**cardset.__dict__)

        if this_cardset.save() != 1:
            raise HTTPException(
                status_code=418,
                detail='Well slap my ass and call me a teapot; I could not save that cardset'
            )

        return model_to_dict(this_cardset)
    finally:
        db.close()
|
|
|
|
|
|
@router.patch('/{cardset_id}')
async def patch_cardsets(
        cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None,
        for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None,
        token: str = Depends(oauth2_scheme)):
    """Update selected fields of an existing cardset.

    Only parameters that are not None are written; the rest of the cardset
    is left untouched.

    Args:
        cardset_id: Id of the cardset to modify.
        name, description, in_packs, for_purchase, total_cards, ranked_legal:
            New values for the fields of the same name.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The updated cardset as a dict.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the cardset lookup
            fails, 418 when the save does not report exactly one row written.
    """
    # try/finally releases the connection on every exit, including the
    # failed-save path that previously raised without closing it.
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch cardsets. This event has been logged.'
            )

        try:
            this_cardset = Cardset.get_by_id(cardset_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

        # None means "not supplied", so only explicitly provided values are applied.
        updates = {
            'name': name,
            'description': description,
            'in_packs': in_packs,
            'for_purchase': for_purchase,
            'total_cards': total_cards,
            'ranked_legal': ranked_legal,
        }
        for field_name, new_value in updates.items():
            if new_value is not None:
                setattr(this_cardset, field_name, new_value)

        if this_cardset.save() != 1:
            # The message used to say "rarity", copied from another router.
            raise HTTPException(
                status_code=418,
                detail='Well slap my ass and call me a teapot; I could not save that cardset'
            )

        return model_to_dict(this_cardset)
    finally:
        db.close()
|
|
|
|
|
|
@router.delete('/{cardset_id}')
async def delete_cardsets(cardset_id, token: str = Depends(oauth2_scheme)):
    """Delete a cardset by id.

    Args:
        cardset_id: Id of the cardset to remove.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        ``{'detail': 'Cardset <id> has been deleted'}`` with status 200.

    Raises:
        HTTPException: 401 for an invalid token, 404 when the cardset lookup
            fails, 500 when the delete does not report exactly one row removed.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to delete cardsets. This event has been logged.'
            )

        try:
            this_cardset = Cardset.get_by_id(cardset_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

        count = this_cardset.delete_instance()
    finally:
        db.close()

    if count != 1:
        raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted')

    # Success is returned normally rather than raised as HTTPException(200).
    # The body matches what FastAPI's default HTTPException handler would
    # have produced, so HTTP clients see the same response.
    return {'detail': f'Cardset {cardset_id} has been deleted'}
|
|
|
|
|