from fastapi import APIRouter, Depends, HTTPException, Response, Query
from typing import Optional
import logging
import pydantic
from pandas import DataFrame

from ..db_engine import db, Cardset, model_to_dict, fn, Event
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA

logging.basicConfig(
    filename=LOG_DATA['filename'],
    format=LOG_DATA['format'],
    level=LOG_DATA['log_level']
)

router = APIRouter(
    prefix='/api/v2/cardsets',
    tags=['cardsets']
)


class CardsetModel(pydantic.BaseModel):
    """Request body accepted by the POST endpoint when creating a cardset."""
    name: str
    description: str
    event_id: Optional[int] = None
    in_packs: Optional[bool] = True
    total_cards: int = 0
    for_purchase: Optional[bool] = True
    ranked_legal: Optional[bool] = True


@router.get('')
async def get_cardsets(
        name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None,
        in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None):
    """
    List cardsets ordered by id, optionally filtered.

    Args:
        name: Case-insensitive exact match on the cardset name.
        in_desc: Case-insensitive substring match on the description.
        event_id: Only cardsets linked to this event.
        in_packs: Only cardsets with this in_packs value.
        ranked_legal: Only cardsets with this ranked_legal value.
        csv: When truthy, respond with text/csv instead of JSON.

    Returns:
        A CSV response, or a dict with 'count' and 'cardsets' keys.

    Raises:
        HTTPException: 404 when no cardsets exist at all, when event_id
            cannot be loaded, or when the filters match nothing.
    """
    all_cardsets = Cardset.select().order_by(Cardset.id)

    if all_cardsets.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no cardsets to filter')

    if name is not None:
        all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
    if in_desc is not None:
        all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower()))
    if event_id is not None:
        try:
            this_event = Event.get_by_id(event_id)
            all_cardsets = all_cardsets.where(Cardset.event == this_event)
        except Exception as e:
            logging.error(f'Failed to find event {event_id}: {e}')
            # Close the connection on this exit path too, as every other
            # early exit in this module does.
            db.close()
            raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')
    if in_packs is not None:
        all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
    if ranked_legal is not None:
        all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

    if all_cardsets.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='No cardsets found')

    if csv:
        # First row is the header; to_csv is told not to emit its own.
        data_list = [[
            'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal'
        ]]
        for line in all_cardsets:
            data_list.append(
                [
                    line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs,
                    line.for_purchase, line.total_cards, line.ranked_legal
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {
        'count': all_cardsets.count(),
        'cardsets': [model_to_dict(x) for x in all_cardsets]
    }
    db.close()
    return return_val


@router.get('/search')
async def search_cardsets(
        q: str = Query(..., description="Search query for cardset name"),
        in_packs: Optional[bool] = None,
        ranked_legal: Optional[bool] = None,
        event_id: Optional[int] = None,
        limit: int = Query(default=25, ge=1, le=100, description="Maximum number of results to return")):
    """
    Real-time fuzzy search for cardsets by name.

    Returns cardsets matching the query with exact matches prioritized over partial matches.

    Args:
        q: Text that must appear (case-insensitively) in the cardset name.
        in_packs: Only cardsets with this in_packs value.
        ranked_legal: Only cardsets with this ranked_legal value.
        event_id: Only cardsets linked to this event.
        limit: Maximum number of cardsets returned (1-100).

    Returns:
        A dict with 'count' (number returned), 'total_matches' (number
        ranked before the limit was applied) and 'cardsets'.

    Raises:
        HTTPException: 404 when event_id cannot be loaded.
    """
    # Start with all cardsets
    all_cardsets = Cardset.select().order_by(Cardset.id)

    # Apply name filter (partial match)
    all_cardsets = all_cardsets.where(fn.Lower(Cardset.name).contains(q.lower()))

    # Apply optional filters
    if in_packs is not None:
        all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)

    if ranked_legal is not None:
        all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

    if event_id is not None:
        try:
            this_event = Event.get_by_id(event_id)
            all_cardsets = all_cardsets.where(Cardset.event == this_event)
        except Exception as e:
            logging.error(f'Failed to find event {event_id}: {e}')
            db.close()
            raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')

    # Sort by relevance (exact matches first, then name starts, then partial)
    query_lower = q.lower()
    exact_matches = []
    name_start_matches = []
    partial_matches = []

    for cardset in all_cardsets:
        name_lower = cardset.name.lower()
        if name_lower == query_lower:
            exact_matches.append(cardset)
        elif any(part.startswith(query_lower) for part in name_lower.split()):
            # Query matches the start of at least one word in the name
            name_start_matches.append(cardset)
        elif query_lower in name_lower:
            partial_matches.append(cardset)

    # Combine and limit results (exact, then name starts, then partial)
    results = exact_matches + name_start_matches + partial_matches
    total_matches = len(results)
    limited_results = results[:limit]

    # Build response
    return_val = {
        'count': len(limited_results),
        'total_matches': total_matches,
        'cardsets': [model_to_dict(x) for x in limited_results]
    }

    db.close()
    return return_val


@router.get('/{cardset_id}')
async def get_one_cardset(cardset_id, csv: Optional[bool] = False):
    """
    Fetch a single cardset by id.

    Args:
        cardset_id: Id of the cardset to load.
        csv: When truthy, respond with text/csv containing only the id,
            name and description columns.

    Returns:
        A CSV response, or the cardset as a dict.

    Raises:
        HTTPException: 404 when the cardset cannot be loaded.
    """
    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    if csv:
        data_list = [
            ['id', 'name', 'description'],
            [this_cardset.id, this_cardset.name, this_cardset.description]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = model_to_dict(this_cardset)
    db.close()
    return return_val


@router.post('')
async def post_cardsets(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
    """
    Create a new cardset.

    Args:
        cardset: Fields for the new cardset.
        token: Bearer token; must pass valid_token().

    Returns:
        The saved cardset as a dict.

    Raises:
        HTTPException: 401 when the token is rejected, 400 when a cardset
            with the same name already exists, 418 when the save does not
            report exactly one affected row.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected token itself to the log file —
        # confirm that is intended.
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post cardsets. This event has been logged.'
        )

    dupe_set = Cardset.get_or_none(Cardset.name == cardset.name)
    if dupe_set:
        db.close()
        raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}')

    this_cardset = Cardset(**cardset.__dict__)

    saved = this_cardset.save()
    if saved == 1:
        return_val = model_to_dict(this_cardset)
        db.close()
        return return_val

    # Close the connection on the failure path as well.
    db.close()
    raise HTTPException(
        status_code=418,
        detail='Well slap my ass and call me a teapot; I could not save that cardset'
    )


@router.patch('/{cardset_id}')
async def patch_cardsets(
        cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None,
        for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None,
        token: str = Depends(oauth2_scheme)):
    """
    Update selected fields of an existing cardset.

    Only parameters that are not None are written to the cardset.

    Args:
        cardset_id: Id of the cardset to update.
        name: New name.
        description: New description.
        in_packs: New in_packs value.
        for_purchase: New for_purchase value.
        total_cards: New total_cards value.
        ranked_legal: New ranked_legal value.
        token: Bearer token; must pass valid_token().

    Returns:
        The updated cardset as a dict.

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the cardset
            cannot be loaded, 418 when the save does not report exactly one
            affected row.
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected token itself to the log file —
        # confirm that is intended.
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch cardsets. This event has been logged.'
        )
    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    if name is not None:
        this_cardset.name = name
    if description is not None:
        this_cardset.description = description
    if in_packs is not None:
        this_cardset.in_packs = in_packs
    if for_purchase is not None:
        this_cardset.for_purchase = for_purchase
    if total_cards is not None:
        this_cardset.total_cards = total_cards
    if ranked_legal is not None:
        this_cardset.ranked_legal = ranked_legal

    if this_cardset.save() == 1:
        return_val = model_to_dict(this_cardset)
        db.close()
        return return_val

    # Close the connection on the failure path as well.
    db.close()
    raise HTTPException(
        status_code=418,
        detail='Well slap my ass and call me a teapot; I could not save that cardset'
    )


@router.delete('/{cardset_id}')
async def delete_cardsets(cardset_id, token: str = Depends(oauth2_scheme)):
    """
    Delete a cardset by id.

    Args:
        cardset_id: Id of the cardset to delete.
        token: Bearer token; must pass valid_token().

    Raises:
        HTTPException: 401 when the token is rejected, 404 when the cardset
            cannot be loaded, 500 when the delete does not report exactly one
            affected row, and 200 on success (the success message is also
            delivered by raising).
    """
    if not valid_token(token):
        # NOTE(review): this writes the rejected token itself to the log file —
        # confirm that is intended.
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete cardsets. This event has been logged.'
        )
    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    count = this_cardset.delete_instance()
    db.close()

    if count == 1:
        # Success is signalled through HTTPException with status 200; kept
        # as-is so the response clients receive does not change.
        raise HTTPException(status_code=200, detail=f'Cardset {cardset_id} has been deleted')
    else:
        raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted')