Replace 22 instances of semantically incorrect raise HTTPException(status_code=200, detail=...)
with plain return {"message": ...} dicts across 16 router files.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
300 lines
9.5 KiB
Python
300 lines
9.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Response, Query
|
|
from typing import Optional
|
|
import logging
|
|
import pydantic
|
|
from pandas import DataFrame
|
|
|
|
from ..db_engine import Cardset, model_to_dict, fn, Event
|
|
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
|
|
|
|
# Configure logging for this module from the shared LOG_DATA settings
# (destination file, record format, and minimum level).
logging.basicConfig(
    level=LOG_DATA["log_level"],
    format=LOG_DATA["format"],
    filename=LOG_DATA["filename"],
)

# Every endpoint declared on this router is served under /api/v2/cardsets
# and grouped under the "cardsets" tag.
router = APIRouter(tags=["cardsets"], prefix="/api/v2/cardsets")
|
|
|
|
|
|
class CardsetModel(pydantic.BaseModel):
    """Request-body schema accepted when creating a cardset.

    ``name`` and ``description`` are required; every other field falls back
    to the default declared below when omitted.
    """

    # Required. post_cardsets rejects a name that another cardset already uses.
    name: str
    # Required free-text description.
    description: str
    # Optional id of an associated event; None when omitted.
    event_id: Optional[int] = None
    # Defaults to True when omitted.
    in_packs: Optional[bool] = True
    # Defaults to 0 when omitted.
    total_cards: int = 0
    # Defaults to True when omitted.
    for_purchase: Optional[bool] = True
    # Defaults to True when omitted.
    ranked_legal: Optional[bool] = True
|
|
|
|
|
|
@router.get("")
async def get_cardsets(
    name: Optional[str] = None,
    in_desc: Optional[str] = None,
    event_id: Optional[int] = None,
    in_packs: Optional[bool] = None,
    ranked_legal: Optional[bool] = None,
    csv: Optional[bool] = None,
):
    """List cardsets ordered by id, optionally filtered, as JSON or CSV.

    Args:
        name: Case-insensitive exact match on the cardset name.
        in_desc: Case-insensitive substring to look for in the description.
        event_id: Keep only cardsets linked to the event with this id.
        in_packs: Keep only cardsets whose ``in_packs`` flag equals this value.
        ranked_legal: Keep only cardsets whose ``ranked_legal`` flag equals
            this value.
        csv: When truthy, respond with ``text/csv`` instead of JSON.

    Returns:
        When ``csv`` is truthy, a ``text/csv`` ``Response`` holding a header
        row followed by one row per cardset. Otherwise a dict with the keys
        ``count`` and ``cardsets``.

    Raises:
        HTTPException: 404 when there are no cardsets at all, when the event
            lookup for ``event_id`` fails, or when no cardset matches the
            filters.
    """
    all_cardsets = Cardset.select().order_by(Cardset.id)

    if all_cardsets.count() == 0:
        raise HTTPException(status_code=404, detail="There are no cardsets to filter")

    if name is not None:
        all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
    if in_desc is not None:
        all_cardsets = all_cardsets.where(
            fn.Lower(Cardset.description).contains(in_desc.lower())
        )
    if event_id is not None:
        # Only the lookup can fail, so it is the only statement guarded.
        try:
            this_event = Event.get_by_id(event_id)
        except Exception as e:
            logging.error(f"Failed to find event {event_id}: {e}")
            # Chain the original error so it stays visible in tracebacks.
            raise HTTPException(
                status_code=404, detail=f"Event id {event_id} not found"
            ) from e
        all_cardsets = all_cardsets.where(Cardset.event == this_event)
    if in_packs is not None:
        all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
    if ranked_legal is not None:
        all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

    if all_cardsets.count() == 0:
        raise HTTPException(status_code=404, detail="No cardsets found")

    if csv:
        header = [
            "id",
            "name",
            "description",
            "event_id",
            "in_packs",
            "for_purchase",
            "total_cards",
            "ranked_legal",
        ]
        rows = [
            [
                line.id,
                line.name,
                line.description,
                # Cardsets without an event get an empty cell.
                line.event.id if line.event else "",
                line.in_packs,
                line.for_purchase,
                line.total_cards,
                line.ranked_legal,
            ]
            for line in all_cardsets
        ]
        # The header is passed as the first data row, so pandas' own header
        # and index output are both switched off.
        return_val = DataFrame([header, *rows]).to_csv(header=False, index=False)
        return Response(content=return_val, media_type="text/csv")

    # Materialise once and derive the count from the list rather than
    # issuing another COUNT query.
    cardsets = [model_to_dict(x) for x in all_cardsets]
    return {"count": len(cardsets), "cardsets": cardsets}
|
|
|
|
|
|
@router.get("/search")
async def search_cardsets(
    q: str = Query(..., description="Search query for cardset name"),
    in_packs: Optional[bool] = None,
    ranked_legal: Optional[bool] = None,
    event_id: Optional[int] = None,
    limit: int = Query(
        default=25, ge=1, le=100, description="Maximum number of results to return"
    ),
):
    """Search cardsets whose name contains ``q``, ranked by relevance.

    Matching is a case-insensitive substring test on the name. Results are
    ordered in three tiers: names equal to the query, then names in which
    some whitespace-separated word starts with the query, then remaining
    substring matches. Within a tier, cardsets keep their id order.

    Returns a dict with ``count`` (number of cardsets returned after applying
    ``limit``), ``total_matches`` (number before the limit), and ``cardsets``.

    Raises a 404 HTTPException when the event lookup for ``event_id`` fails.
    """
    needle = q.lower()

    # Database-side filtering: name substring first, then the optional flags.
    matches = Cardset.select().order_by(Cardset.id)
    matches = matches.where(fn.Lower(Cardset.name).contains(needle))

    if in_packs is not None:
        matches = matches.where(Cardset.in_packs == in_packs)

    if ranked_legal is not None:
        matches = matches.where(Cardset.ranked_legal == ranked_legal)

    if event_id is not None:
        try:
            this_event = Event.get_by_id(event_id)
            matches = matches.where(Cardset.event == this_event)
        except Exception as e:
            logging.error(f"Failed to find event {event_id}: {e}")
            raise HTTPException(
                status_code=404, detail=f"Event id {event_id} not found"
            )

    # Bucket each row by relevance tier; a row that fits no tier is dropped.
    exact, word_prefix, substring = [], [], []
    for candidate in matches:
        lowered = candidate.name.lower()
        if lowered == needle:
            exact.append(candidate)
        elif any(word.startswith(needle) for word in lowered.split()):
            word_prefix.append(candidate)
        elif needle in lowered:
            substring.append(candidate)

    ordered = [*exact, *word_prefix, *substring]
    page = ordered[:limit]

    return {
        "count": len(page),
        "total_matches": len(ordered),
        "cardsets": [model_to_dict(entry) for entry in page],
    }
|
|
|
|
|
|
@router.get("/{cardset_id}")
async def get_one_cardset(cardset_id, csv: Optional[bool] = False):
    """Fetch a single cardset by id, as JSON or CSV.

    When ``csv`` is truthy the response is ``text/csv`` with a header row and
    one data row covering only id, name, and description; otherwise the
    cardset is returned as a dict.

    Raises a 404 HTTPException when the lookup by ``cardset_id`` fails.
    """
    try:
        found = Cardset.get_by_id(cardset_id)
    except Exception:
        raise HTTPException(
            status_code=404, detail=f"No cardset found with id {cardset_id}"
        )

    if not csv:
        return model_to_dict(found)

    header = ["id", "name", "description"]
    values = [found.id, found.name, found.description]
    # The header is supplied as a data row, so pandas' own header and index
    # output are both switched off.
    csv_text = DataFrame([header, values]).to_csv(header=False, index=False)
    return Response(content=csv_text, media_type="text/csv")
|
|
|
|
|
|
@router.post("")
async def post_cardsets(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
    """Create a new cardset from the request body.

    Returns the saved cardset as a dict.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token (the token
            is logged at warning level), 400 when a cardset with the same
            name already exists, 418 when the save does not report exactly
            one row.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post cardsets. This event has been logged.",
        )

    # Names must be unique: refuse the request if one is already taken.
    if Cardset.get_or_none(Cardset.name == cardset.name):
        raise HTTPException(
            status_code=400, detail=f"There is already a cardset using {cardset.name}"
        )

    new_cardset = Cardset(**cardset.__dict__)

    if new_cardset.save() != 1:
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that cardset",
        )

    return model_to_dict(new_cardset)
|
|
|
|
|
|
@router.patch("/{cardset_id}")
async def patch_cardsets(
    cardset_id,
    name: Optional[str] = None,
    description: Optional[str] = None,
    in_packs: Optional[bool] = None,
    for_purchase: Optional[bool] = None,
    total_cards: Optional[int] = None,
    ranked_legal: Optional[bool] = None,
    token: str = Depends(oauth2_scheme),
):
    """Update selected fields of an existing cardset.

    Only parameters that are not ``None`` are applied, so a field cannot be
    reset to ``None`` through this endpoint. The event link is not editable
    here.

    Args:
        cardset_id: Id of the cardset to update.
        name: New name, if provided.
        description: New description, if provided.
        in_packs: New ``in_packs`` flag, if provided.
        for_purchase: New ``for_purchase`` flag, if provided.
        total_cards: New ``total_cards`` value, if provided.
        ranked_legal: New ``ranked_legal`` flag, if provided.
        token: Bearer token checked with ``valid_token``.

    Returns:
        The updated cardset as a dict.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token (the token
            is logged at warning level), 404 when the lookup by
            ``cardset_id`` fails, 418 when the save does not report exactly
            one row.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to patch cardsets. This event has been logged.",
        )
    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        raise HTTPException(
            status_code=404, detail=f"No cardset found with id {cardset_id}"
        )

    # None means "leave unchanged"; everything else overwrites the field.
    requested_changes = {
        "name": name,
        "description": description,
        "in_packs": in_packs,
        "for_purchase": for_purchase,
        "total_cards": total_cards,
        "ranked_legal": ranked_legal,
    }
    for field_name, new_value in requested_changes.items():
        if new_value is not None:
            setattr(this_cardset, field_name, new_value)

    if this_cardset.save() != 1:
        # The message previously said "rarity", a leftover from another router.
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that cardset",
        )

    return model_to_dict(this_cardset)
|
|
|
|
|
|
@router.delete("/{cardset_id}")
async def delete_cardsets(cardset_id, token: str = Depends(oauth2_scheme)):
    """Delete the cardset with the given id.

    Returns a dict with a confirmation ``message`` on success.

    Raises:
        HTTPException: 401 when ``valid_token`` rejects the token (the token
            is logged at warning level), 404 when the lookup by
            ``cardset_id`` fails, 500 when the delete does not report exactly
            one row.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete cardsets. This event has been logged.",
        )

    try:
        doomed = Cardset.get_by_id(cardset_id)
    except Exception:
        raise HTTPException(
            status_code=404, detail=f"No cardset found with id {cardset_id}"
        )

    if doomed.delete_instance() != 1:
        raise HTTPException(
            status_code=500, detail=f"Cardset {cardset_id} was not deleted"
        )

    return {"message": f"Cardset {cardset_id} has been deleted"}
|