import random

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import Literal, Optional, List
import logging
import pydantic

from ..db_engine import db, BattingCard, model_to_dict, fn, chunked, Player, MlbPlayer
from ..db_helpers import upsert_batting_cards
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA

logging.basicConfig(
    filename=LOG_DATA["filename"],
    format=LOG_DATA["format"],
    level=LOG_DATA["log_level"],
)

router = APIRouter(prefix="/api/v2/battingcards", tags=["battingcards"])


class BattingCardModel(pydantic.BaseModel):
    """Request payload describing one batting card.

    A card is identified by the pair (player_id, variant); every other field
    carries a default so callers may send partial payloads.
    """

    player_id: int
    variant: int = 0
    steal_low: int = 3
    steal_high: int = 20
    steal_auto: bool = False
    steal_jump: float = 0
    bunting: str = "C"
    hit_and_run: str = "C"
    running: int = 10
    # None means "not supplied": put_cards then fills the value in from the
    # existing card, the matching MlbPlayer, or a random column.
    offense_col: Optional[int] = None
    hand: Literal["R", "L", "S"] = "R"


class BattingCardList(pydantic.BaseModel):
    """Request body for the bulk PUT endpoint: a list of batting cards."""

    cards: List[BattingCardModel]


@router.get("")
async def get_batting_cards(
    player_id: list = Query(default=None),
    player_name: list = Query(default=None),
    cardset_id: list = Query(default=None),
    short_output: bool = False,
    limit: Optional[int] = None,
    variant: list = Query(default=None),
):
    """Return batting cards ordered by id, narrowed by the optional filters.

    Each filter that is supplied is AND-ed onto the query. Player names are
    compared case-insensitively. When short_output is true, related models
    are not expanded in the serialized cards.

    Returns a dict with "count" and "cards" keys.
    """
    all_cards = BattingCard.select().order_by(BattingCard.id)

    if player_id is not None:
        all_cards = all_cards.where(BattingCard.player_id << player_id)
    if cardset_id is not None:
        all_players = Player.select().where(Player.cardset_id << cardset_id)
        all_cards = all_cards.where(BattingCard.player << all_players)
    if player_name is not None:
        # Lower-case both sides so the name match ignores case.
        name_list = [x.lower() for x in player_name]
        all_players = Player.select().where(fn.lower(Player.p_name) << name_list)
        all_cards = all_cards.where(BattingCard.player << all_players)
    if variant is not None:
        all_cards = all_cards.where(BattingCard.variant << variant)
    if limit is not None:
        all_cards = all_cards.limit(limit)

    return_val = {
        "count": all_cards.count(),
        "cards": [model_to_dict(x, recurse=not short_output) for x in all_cards],
    }
    db.close()
    return return_val


@router.get("/{card_id}")
async def get_one_card(card_id: int):
    """Return the batting card with the given id, or respond 404 if absent."""
    this_card = BattingCard.get_or_none(BattingCard.id == card_id)
    if this_card is None:
        db.close()
        raise HTTPException(
            status_code=404, detail=f"BattingCard id {card_id} not found"
        )

    r_card = model_to_dict(this_card)
    db.close()
    return r_card


@router.get("/player/{player_id}")
async def get_player_cards(
    player_id: int, variant: list = Query(default=None), short_output: bool = False
):
    """Return one player's batting cards ordered by variant.

    The optional variant list restricts the result to those variants. When
    short_output is true, related models are not expanded.

    Returns a dict with "count" and "cards" keys.
    """
    all_cards = (
        BattingCard.select()
        .where(BattingCard.player_id == player_id)
        .order_by(BattingCard.variant)
    )
    if variant is not None:
        all_cards = all_cards.where(BattingCard.variant << variant)

    return_val = {
        "count": all_cards.count(),
        "cards": [model_to_dict(x, recurse=not short_output) for x in all_cards],
    }
    db.close()
    return return_val


@router.put("")
async def put_cards(cards: BattingCardList, token: str = Depends(oauth2_scheme)):
    """Update existing batting cards and insert the ones that do not exist.

    Cards are matched on (player_id, variant). A matched card is updated in
    place, keeping its stored offense_col when the payload omits one. An
    unmatched card is queued for insertion; if it has no offense_col, the
    value is taken from the MlbPlayer whose key_bbref equals the player's
    bbref_id, or chosen randomly from 1-3 when no such MlbPlayer exists.

    Raises:
        HTTPException 401: valid_token(token) is falsy.
        HTTPException 404: a new card omits offense_col and its player cannot
            be found. Updates executed for earlier cards in the same request
            are not rolled back.

    Returns a summary string with the update and insert counts.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to post batting cards. This event has been logged.",
        )

    new_cards = []
    updates = 0

    for x in cards.cards:
        same_card = (BattingCard.player_id == x.player_id) & (
            BattingCard.variant == x.variant
        )
        old = BattingCard.get_or_none(same_card)

        if old is not None:
            # Preserve the stored column rather than overwriting it with None.
            if x.offense_col is None:
                x.offense_col = old.offense_col
            updates += BattingCard.update(x.dict()).where(same_card).execute()
            continue

        if x.offense_col is None:
            this_player = Player.get_or_none(Player.player_id == x.player_id)
            if this_player is None:
                # Without the player there is no bbref_id to look up; report
                # the bad reference instead of failing on a None attribute.
                db.close()
                raise HTTPException(
                    status_code=404,
                    detail=f"Player id {x.player_id} not found",
                )

            mlb_player = MlbPlayer.get_or_none(
                MlbPlayer.key_bbref == this_player.bbref_id
            )
            if mlb_player is not None:
                logging.info(
                    f"setting offense_col to {mlb_player.offense_col} for {this_player.p_name}"
                )
                x.offense_col = mlb_player.offense_col
            else:
                logging.info(f"randomly setting offense_col for {this_player.p_name}")
                x.offense_col = random.randint(1, 3)

        logging.debug(f"x.dict(): {x.dict()}")
        new_cards.append(x.dict())

    with db.atomic():
        # Use PostgreSQL-compatible upsert helper
        upsert_batting_cards(new_cards, batch_size=30)

    db.close()
    return f"Updated cards: {updates}; new cards: {len(new_cards)}"


@router.patch("/{card_id}")
async def patch_card(
    card_id: int,
    steal_low: Optional[int] = None,
    steal_high: Optional[int] = None,
    steal_auto: Optional[bool] = None,
    steal_jump: Optional[float] = None,
    bunting: Optional[str] = None,
    hit_and_run: Optional[str] = None,
    running: Optional[int] = None,
    offense_col: Optional[int] = None,
    hand: Optional[Literal["R", "L", "S"]] = None,
    token: str = Depends(oauth2_scheme),
):
    """Partially update one batting card.

    Only parameters that are not None are written to the card; everything
    else keeps its stored value.

    Raises:
        HTTPException 401: valid_token(token) is falsy.
        HTTPException 404: no card has the given id.
        HTTPException 418: save() did not report exactly one row written.

    Returns the updated card as a dict.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to patch batting cards. This event has been logged.",
        )

    this_card = BattingCard.get_or_none(BattingCard.id == card_id)
    if this_card is None:
        db.close()
        raise HTTPException(
            status_code=404, detail=f"BattingCard id {card_id} not found"
        )

    if steal_low is not None:
        this_card.steal_low = steal_low
    if steal_high is not None:
        this_card.steal_high = steal_high
    if steal_auto is not None:
        this_card.steal_auto = steal_auto
    if steal_jump is not None:
        this_card.steal_jump = steal_jump
    if bunting is not None:
        this_card.bunting = bunting
    if hit_and_run is not None:
        this_card.hit_and_run = hit_and_run
    if running is not None:
        this_card.running = running
    if offense_col is not None:
        this_card.offense_col = offense_col
    if hand is not None:
        this_card.hand = hand

    if this_card.save() == 1:
        return_val = model_to_dict(this_card)
        db.close()
        return return_val
    else:
        db.close()
        raise HTTPException(
            status_code=418,
            detail="Well slap my ass and call me a teapot; I could not save that card",
        )


@router.delete("/{card_id}")
async def delete_card(card_id: int, token: str = Depends(oauth2_scheme)):
    """Delete one batting card by id.

    Raises:
        HTTPException 401: valid_token(token) is falsy.
        HTTPException 404: no card has the given id.
        HTTPException 500: the delete did not report exactly one row removed.

    Returns a confirmation string.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete batting cards. This event has been logged.",
        )

    this_card = BattingCard.get_or_none(BattingCard.id == card_id)
    if this_card is None:
        db.close()
        raise HTTPException(
            status_code=404, detail=f"BattingCard id {card_id} not found"
        )

    count = this_card.delete_instance()
    db.close()

    if count == 1:
        return f"Card {this_card} has been deleted"
    else:
        raise HTTPException(
            status_code=500, detail=f"Card {this_card} could not be deleted"
        )


@router.delete("")
async def delete_all_cards(token: str = Depends(oauth2_scheme)):
    """Delete every batting card.

    Raises:
        HTTPException 401: valid_token(token) is falsy.

    Returns a string reporting how many cards were deleted.
    """
    if not valid_token(token):
        logging.warning(f"Bad Token: {token}")
        db.close()
        raise HTTPException(
            status_code=401,
            detail="You are not authorized to delete batting cards. This event has been logged.",
        )

    # execute() on a delete query returns the number of rows removed; the
    # query object itself offers no count() to call afterwards.
    deleted = BattingCard.delete().execute()
    db.close()

    return f"Deleted {deleted} batting cards"