Added cardsets, packs, packtypes, players

This commit is contained in:
Cal Corum 2023-09-13 16:19:56 -05:00
parent 0872362869
commit 6325698a10
5 changed files with 1202 additions and 1 deletions

View File

@ -4,7 +4,7 @@ import os
from fastapi import FastAPI
from.routers_v2 import current, teams, rarity
from.routers_v2 import current, teams, rarity, cardsets, players, packtypes, packs
app = FastAPI(
responses={404: {'description': 'Not found'}}
@ -13,3 +13,7 @@ app = FastAPI(
app.include_router(current.router)
app.include_router(teams.router)
app.include_router(rarity.router)
app.include_router(cardsets.router)
app.include_router(players.router)
app.include_router(packtypes.router)
app.include_router(packs.router)

204
app/routers_v2/cardsets.py Normal file
View File

@ -0,0 +1,204 @@
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Cardset, model_to_dict, fn, Event
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/cardsets',
tags=['cardsets']
)
class CardsetModel(pydantic.BaseModel):
name: str
description: str
event_id: Optional[int] = None
in_packs: Optional[bool] = True
total_cards: int = 0
for_purchase: Optional[bool] = True
ranked_legal: Optional[bool] = True
@router.get('')
async def get_cardsets(
name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None,
in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None):
all_cardsets = Cardset.select()
if all_cardsets.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no cardsets to filter')
if name is not None:
all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
if in_desc is not None:
all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower()))
if event_id is not None:
try:
this_event = Event.get_by_id(event_id)
all_cardsets = all_cardsets.where(Cardset.event == this_event)
except Exception as e:
logging.error(f'Failed to find event {event_id}: {e}')
raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')
if in_packs is not None:
all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
if ranked_legal is not None:
all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)
if all_cardsets.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'No cardsets found')
if csv:
data_list = [[
'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal'
]]
for line in all_cardsets:
data_list.append(
[
line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs,
line.for_purchase, line.total_cards, line.ranked_legal
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_cardsets.count(), 'cardsets': []}
for x in all_cardsets:
return_val['cardsets'].append(model_to_dict(x))
db.close()
return return_val
@router.get('/{cardset_id}')
async def get_one_cardset(cardset_id, csv: Optional[bool] = False):
try:
this_cardset = Cardset.get_by_id(cardset_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
if csv:
data_list = [
['id', 'name', 'description'],
[this_cardset.id, this_cardset.name, this_cardset.description]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_cardset)
db.close()
return return_val
@router.post('')
async def post_cardsets(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post cardsets. This event has been logged.'
)
dupe_set = Cardset.get_or_none(Cardset.name == cardset.name)
if dupe_set:
db.close()
raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}')
this_cardset = Cardset(**cardset.__dict__)
saved = this_cardset.save()
if saved == 1:
return_val = model_to_dict(this_cardset)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
)
@router.patch('/{cardset_id}')
async def patch_cardsets(
cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None,
for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None,
token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch cardsets. This event has been logged.'
)
try:
this_cardset = Cardset.get_by_id(cardset_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
if name is not None:
this_cardset.name = name
if description is not None:
this_cardset.description = description
if in_packs is not None:
this_cardset.in_packs = in_packs
if for_purchase is not None:
this_cardset.for_purchase = for_purchase
if total_cards is not None:
this_cardset.total_cards = total_cards
if ranked_legal is not None:
this_cardset.ranked_legal = ranked_legal
if this_cardset.save() == 1:
return_val = model_to_dict(this_cardset)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
)
@router.delete('/{cardset_id}')
async def delete_cardsets(cardset_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete cardsets. This event has been logged.'
)
try:
this_cardset = Cardset.get_by_id(cardset_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
count = this_cardset.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Cardset {cardset_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted')

259
app/routers_v2/packs.py Normal file
View File

@ -0,0 +1,259 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Cardset, model_to_dict, fn, Pack, Team, PackType
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/packs',
tags=['packs']
)
class PackPydantic(pydantic.BaseModel):
team_id: int
pack_type_id: int
pack_team_id: Optional[int] = None
pack_cardset_id: Optional[int] = None
open_time: Optional[str] = None
class PackModel(pydantic.BaseModel):
packs: List[PackPydantic]
@router.get('')
async def get_packs(
team_id: Optional[int] = None, pack_type_id: Optional[int] = None, opened: Optional[bool] = None,
limit: Optional[int] = None, new_to_old: Optional[bool] = None, pack_team_id: Optional[int] = None,
pack_cardset_id: Optional[int] = None, exact_match: Optional[bool] = False, csv: Optional[bool] = None):
all_packs = Pack.select()
if all_packs.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no packs to filter')
if team_id is not None:
try:
this_team = Team.get_by_id(team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
all_packs = all_packs.where(Pack.team == this_team)
if pack_type_id is not None:
try:
this_pack_type = PackType.get_by_id(pack_type_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No pack type found with id {pack_type_id}')
all_packs = all_packs.where(Pack.pack_type == this_pack_type)
if pack_team_id is not None:
try:
this_pack_team = Team.get_by_id(pack_team_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No team found with id {pack_team_id}')
all_packs = all_packs.where(Pack.pack_team == this_pack_team)
elif exact_match:
all_packs = all_packs.where(Pack.pack_team == None)
if pack_cardset_id is not None:
try:
this_pack_cardset = Cardset.get_by_id(pack_cardset_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No cardset found with id {pack_cardset_id}')
all_packs = all_packs.where(Pack.pack_cardset == this_pack_cardset)
elif exact_match:
all_packs = all_packs.where(Pack.pack_cardset == None)
if opened is not None:
all_packs = all_packs.where(Pack.open_time.is_null(not opened))
if limit is not None:
all_packs = all_packs.limit(limit)
if new_to_old is not None:
all_packs = all_packs.order_by(-Pack.id)
# if all_packs.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No packs found')
if csv:
data_list = [['id', 'team', 'pack_type', 'open_time']]
for line in all_packs:
data_list.append(
[
line.id, line.team.abbrev, line.pack_type.name,
datetime.fromtimestamp(line.open_time) if line.open_time else None
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_packs.count(), 'packs': []}
for x in all_packs:
return_val['packs'].append(model_to_dict(x))
db.close()
return return_val
@router.get('/{pack_id}')
async def get_one_pack(pack_id, csv: Optional[bool] = False):
try:
this_pack = Pack.get_by_id(pack_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}')
if csv:
data_list = [
['id', 'team', 'pack_type', 'open_time'],
[this_pack.id, this_pack.team.abbrev, this_pack.pack_type.name,
datetime.fromtimestamp(this_pack.open_time) if this_pack.open_time else None]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_pack)
db.close()
return return_val
@router.post('')
async def post_pack(packs: PackModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post packs. This event has been logged.'
)
new_packs = []
for x in packs.packs:
this_player = Pack(
team_id=x.team_id,
pack_type_id=x.pack_type_id,
pack_team_id=x.pack_team_id,
pack_cardset_id=x.pack_cardset_id,
open_time=x.open_time if x.open_time != "" else None
)
new_packs.append(this_player)
with db.atomic():
Pack.bulk_create(new_packs, batch_size=15)
db.close()
raise HTTPException(status_code=200, detail=f'{len(new_packs)} packs have been added')
@router.post('/one')
async def post_one_pack(pack: PackPydantic, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post packs. This event has been logged.'
)
this_pack = Pack(
team_id=pack.team_id,
pack_type_id=pack.pack_type_id,
pack_team_id=pack.pack_team_id,
pack_cardset_id=pack.pack_cardset_id,
open_time=pack.open_time
)
saved = this_pack.save()
if saved == 1:
return_val = model_to_dict(this_pack)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
)
@router.patch('/{pack_id}')
async def patch_pack(
pack_id, team_id: Optional[int] = None, pack_type_id: Optional[int] = None, open_time: Optional[int] = None,
pack_team_id: Optional[int] = None, pack_cardset_id: Optional[int] = None, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch packs. This event has been logged.'
)
try:
this_pack = Pack.get_by_id(pack_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}')
if team_id is not None:
this_pack.team_id = team_id
if pack_type_id is not None:
this_pack.pack_type_id = pack_type_id
if pack_team_id is not None:
this_pack.pack_team_id = pack_team_id
if pack_cardset_id is not None:
this_pack.pack_cardset_id = pack_cardset_id
if open_time is not None:
this_pack.open_time = open_time
if this_pack.save() == 1:
return_val = model_to_dict(this_pack)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
)
@router.delete('/{pack_id}')
async def delete_pack(pack_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete packs. This event has been logged.'
)
try:
this_pack = Pack.get_by_id(pack_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No packs found with id {pack_id}')
count = this_pack.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Pack {pack_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Pack {pack_id} was not deleted')

194
app/routers_v2/packtypes.py Normal file
View File

@ -0,0 +1,194 @@
from fastapi import APIRouter, Depends, HTTPException, Response
from typing import Optional
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, PackType, model_to_dict, fn
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/packtypes',
tags=['packtypes']
)
class PacktypeModel(pydantic.BaseModel):
name: str
card_count: int
description: str
cost: int
available: Optional[bool] = True
@router.get('')
async def get_packtypes(
name: Optional[str] = None, card_count: Optional[int] = None, in_desc: Optional[str] = None,
available: Optional[bool] = None, csv: Optional[bool] = None):
all_packtypes = PackType.select()
if all_packtypes.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no packtypes to filter')
if name is not None:
all_packtypes = all_packtypes.where(fn.Lower(PackType.name) == name.lower())
if card_count is not None:
all_packtypes = all_packtypes.where(PackType.card_count == card_count)
if in_desc is not None:
all_packtypes = all_packtypes.where(fn.Lower(PackType.description).contains(in_desc.lower()))
if available is not None:
all_packtypes = all_packtypes.where(PackType.available == available)
# if all_packtypes.count() == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No packtypes found')
if csv:
data_list = [['id', 'name', 'card_count', 'description']]
for line in all_packtypes:
data_list.append(
[
line.id, line.name, line.card_count, line.description
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': all_packtypes.count(), 'packtypes': []}
for x in all_packtypes:
return_val['packtypes'].append(model_to_dict(x))
db.close()
return return_val
@router.get('/{packtype_id}')
async def get_one_packtype(packtype_id, csv: Optional[bool] = False):
try:
this_packtype = PackType.get_by_id(packtype_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')
if csv:
data_list = [
['id', 'name', 'card_count', 'description'],
[this_packtype.id, this_packtype.name, this_packtype.card_count, this_packtype.description]
]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_packtype)
db.close()
return return_val
@router.post('')
async def post_packtypes(packtype: PacktypeModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post packtypes. This event has been logged.'
)
dupe_packtype = PackType.get_or_none(PackType.name == packtype.name)
if dupe_packtype:
db.close()
raise HTTPException(status_code=400, detail=f'There is already a packtype using {packtype.name}')
this_packtype = PackType(
name=packtype.name,
card_count=packtype.card_count,
description=packtype.description,
cost=packtype.cost,
available=packtype.available
)
saved = this_packtype.save()
if saved == 1:
return_val = model_to_dict(this_packtype)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that cardset'
)
@router.patch('/{packtype_id}')
async def patch_packtype(
packtype_id, name: Optional[str] = None, card_count: Optional[int] = None, description: Optional[str] = None,
cost: Optional[int] = None, available: Optional[bool] = None, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch packtypes. This event has been logged.'
)
try:
this_packtype = PackType.get_by_id(packtype_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')
if name is not None:
this_packtype.name = name
if card_count is not None:
this_packtype.card_count = card_count
if description is not None:
this_packtype.description = description
if cost is not None:
this_packtype.cost = cost
if available is not None:
this_packtype.available = available
if this_packtype.save() == 1:
return_val = model_to_dict(this_packtype)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
)
@router.delete('/{packtype_id}')
async def delete_packtype(packtype_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete packtypes. This event has been logged.'
)
try:
this_packtype = PackType.get_by_id(packtype_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')
count = this_packtype.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Packtype {packtype_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Packtype {packtype_id} was not deleted')

540
app/routers_v2/players.py Normal file
View File

@ -0,0 +1,540 @@
from fastapi import APIRouter, Depends, HTTPException, Response, Query
from typing import Optional, List
import logging
import pydantic
from pandas import DataFrame
from ..db_engine import db, Player, model_to_dict, fn, chunked, Paperdex, Cardset, Rarity
from ..dependencies import oauth2_scheme, valid_token, LOG_DATA
logging.basicConfig(
filename=LOG_DATA['filename'],
format=LOG_DATA['format'],
level=LOG_DATA['log_level']
)
router = APIRouter(
prefix='/api/v2/players',
tags=['players']
)
class PlayerPydantic(pydantic.BaseModel):
player_id: int
p_name: str
cost: int
image: str
image2: Optional[str] = None
mlbclub: str
franchise: str
cardset_id: int
set_num: int
rarity_id: int
pos_1: str
pos_2: Optional[str] = None
pos_3: Optional[str] = None
pos_4: Optional[str] = None
pos_5: Optional[str] = None
pos_6: Optional[str] = None
pos_7: Optional[str] = None
pos_8: Optional[str] = None
headshot: Optional[str] = None
vanity_card: Optional[str] = None
strat_code: Optional[str] = None
bbref_id: Optional[str] = None
fangr_id: Optional[str] = None
description: str
quantity: Optional[int] = 999
class PlayerModel(pydantic.BaseModel):
players: List[PlayerPydantic]
@router.get('')
async def get_players(
name: Optional[str] = None, value: Optional[int] = None, min_cost: Optional[int] = None,
max_cost: Optional[int] = None, has_image2: Optional[bool] = None, mlbclub: Optional[str] = None,
franchise: Optional[str] = None, cardset_id: list = Query(default=None), rarity_id: list = Query(default=None),
pos_include: list = Query(default=None), pos_exclude: list = Query(default=None), has_headshot: Optional[bool] = None,
has_vanity_card: Optional[bool] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None,
fangr_id: Optional[str] = None, inc_dex: Optional[bool] = True, in_desc: Optional[str] = None,
flat: Optional[bool] = False, sort_by: Optional[str] = False, cardset_id_exclude: list = Query(default=None),
limit: Optional[int] = None, csv: Optional[bool] = None):
all_players = Player.select()
if all_players.count() == 0:
db.close()
raise HTTPException(status_code=404, detail=f'There are no players to filter')
if name is not None:
all_players = all_players.where(fn.Lower(Player.p_name) == name.lower())
if value is not None:
all_players = all_players.where(Player.cost == value)
if min_cost is not None:
all_players = all_players.where(Player.cost >= min_cost)
if max_cost is not None:
all_players = all_players.where(Player.cost <= max_cost)
if has_image2 is not None:
all_players = all_players.where(Player.image2.is_null(not has_image2))
if mlbclub is not None:
all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
if franchise is not None:
all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
if cardset_id is not None:
all_players = all_players.where(Player.cardset_id << cardset_id)
if cardset_id_exclude is not None:
all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude))
if rarity_id is not None:
all_players = all_players.where(Player.rarity_id << rarity_id)
if pos_include is not None:
p_list = [x.upper() for x in pos_include]
all_players = all_players.where(
(Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) |
(Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
)
if has_headshot is not None:
all_players = all_players.where(Player.headshot.is_null(not has_headshot))
if has_vanity_card is not None:
all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card))
if strat_code is not None:
all_players = all_players.where(Player.strat_code == strat_code)
if bbref_id is not None:
all_players = all_players.where(Player.bbref_id == bbref_id)
if fangr_id is not None:
all_players = all_players.where(Player.fangr_id == fangr_id)
if in_desc is not None:
all_players = all_players.where(fn.Lower(Player.description).contains(in_desc.lower()))
if sort_by is not None:
if sort_by == 'cost-desc':
all_players = all_players.order_by(-Player.cost)
elif sort_by == 'cost-asc':
all_players = all_players.order_by(Player.cost)
elif sort_by == 'name-asc':
all_players = all_players.order_by(Player.p_name)
elif sort_by == 'name-desc':
all_players = all_players.order_by(-Player.p_name)
elif sort_by == 'rarity-desc':
all_players = all_players.order_by(Player.rarity)
elif sort_by == 'rarity-asc':
all_players = all_players.order_by(-Player.rarity)
final_players = []
# logging.info(f'pos_exclude: {type(pos_exclude)} - {pos_exclude} - is None: {pos_exclude is None}')
for x in all_players:
if pos_exclude is not None and set([x.upper() for x in pos_exclude]).intersection(x.get_all_pos()):
pass
else:
final_players.append(x)
if limit is not None and len(final_players) >= limit:
break
# if len(final_players) == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No players found')
if csv:
all_players.order_by(-Player.rarity.value, Player.p_name)
data_list = [['id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal']]
for line in final_players:
data_list.append(
[
line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise,
line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6,
line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id,
line.description, line.cardset.for_purchase, line.cardset.ranked_legal
# line.description, line.cardset.in_packs, line.quantity
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': len(final_players), 'players': []}
for x in final_players:
this_record = model_to_dict(x, recurse=not flat)
if inc_dex:
this_dex = Paperdex.select().where(Paperdex.player == x)
this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []}
for y in this_dex:
this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False))
return_val['players'].append(this_record)
# return_val['players'].append(model_to_dict(x, recurse=not flat))
db.close()
return return_val
@router.get('/random')
async def get_random_player(
min_cost: Optional[int] = None, max_cost: Optional[int] = None, in_packs: Optional[bool] = None,
min_rarity: Optional[int] = None, max_rarity: Optional[int] = None, limit: Optional[int] = None,
pos_include: Optional[str] = None, pos_exclude: Optional[str] = None, franchise: Optional[str] = None,
mlbclub: Optional[str] = None, cardset_id: list = Query(default=None), pos_inc: list = Query(default=None),
pos_exc: list = Query(default=None), csv: Optional[bool] = None):
all_players = (Player
.select()
.join(Cardset)
.switch(Player)
.join(Rarity)
.order_by(fn.Random()))
if min_cost is not None:
all_players = all_players.where(Player.cost >= min_cost)
if max_cost is not None:
all_players = all_players.where(Player.cost <= max_cost)
if in_packs is not None:
if in_packs:
all_players = all_players.where(Player.cardset.in_packs)
if min_rarity is not None:
all_players = all_players.where(Player.rarity.value >= min_rarity)
if max_rarity is not None:
all_players = all_players.where(Player.rarity.value <= max_rarity)
if pos_include is not None:
all_players = all_players.where(
(fn.lower(Player.pos_1) == pos_include.lower()) | (fn.lower(Player.pos_2) == pos_include.lower()) |
(fn.lower(Player.pos_3) == pos_include.lower()) | (fn.lower(Player.pos_4) == pos_include.lower()) |
(fn.lower(Player.pos_5) == pos_include.lower()) | (fn.lower(Player.pos_6) == pos_include.lower()) |
(fn.lower(Player.pos_7) == pos_include.lower()) | (fn.lower(Player.pos_8) == pos_include.lower())
)
if franchise is not None:
all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
if mlbclub is not None:
all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
if cardset_id is not None:
all_players = all_players.where(Player.cardset_id << cardset_id)
if pos_inc is not None:
p_list = [x.upper() for x in pos_inc]
all_players = all_players.where(
(Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) |
(Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
)
# if pos_exc is not None:
# p_list = [x.upper() for x in pos_exc]
# logging.info(f'starting query: {all_players}\n\np_list: {p_list}\n\n')
# all_players = all_players.where(
# Player.pos_1.not_in(p_list) & Player.pos_2.not_in(p_list) & Player.pos_3.not_in(p_list) &
# Player.pos_4.not_in(p_list) & Player.pos_5.not_in(p_list) & Player.pos_6.not_in(p_list) &
# Player.pos_7.not_in(p_list) & Player.pos_8.not_in(p_list)
# )
# logging.info(f'post pos query: {all_players}')
if pos_exclude is not None and pos_exc is None:
final_players = [x for x in all_players if pos_exclude not in x.get_all_pos()]
elif pos_exc is not None and pos_exclude is None:
final_players = []
p_list = [x.upper() for x in pos_exc]
for x in all_players:
if limit is not None and len(final_players) >= limit:
break
if not set(p_list).intersection(x.get_all_pos()):
final_players.append(x)
else:
final_players = all_players
if limit is not None:
final_players = final_players[:limit]
# if len(final_players) == 0:
# db.close()
# raise HTTPException(status_code=404, detail=f'No players found')
if csv:
data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
'strat_code', 'bbref_id', 'description']]
for line in final_players:
data_list.append(
[
line.id, line.p_name, line.cost, line.image, line.image2,
line.mlbclub, line.franchise, line.cardset.name, line.rarity.name,
line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5,
line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card,
line.strat_code, line.bbref_id, line.description
]
)
return_val = DataFrame(data_list).to_csv(header=False, index=False)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = {'count': len(final_players), 'players': []}
for x in final_players:
this_record = model_to_dict(x)
this_dex = Paperdex.select().where(Paperdex.player == x)
this_record['paperdex'] = {'count': this_dex.count(), 'paperdex': []}
for y in this_dex:
this_record['paperdex']['paperdex'].append(model_to_dict(y, recurse=False))
return_val['players'].append(this_record)
# return_val['players'].append(model_to_dict(x))
db.close()
return return_val
@router.get('/{player_id}')
async def get_one_player(player_id, csv: Optional[bool] = False):
try:
this_player = Player.get_by_id(player_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')
if csv:
data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
'strat_code', 'bbref_id', 'description']]
return_val = DataFrame(data_list).to_csv(header=False, index=False)
data_list.append(
[
this_player.id, this_player.p_name, this_player.cost, this_player.image, this_player.image2,
this_player.mlbclub, this_player.franchise, this_player.cardset.name, this_player.rarity.name,
this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_4, this_player.pos_5,
this_player.pos_6, this_player.pos_7, this_player.pos_8, this_player.headshot, this_player.vanity_card,
this_player.strat_code, this_player.bbref_id, this_player.description
]
)
db.close()
return Response(content=return_val, media_type='text/csv')
else:
return_val = model_to_dict(this_player)
this_dex = Paperdex.select().where(Paperdex.player == this_player)
return_val['paperdex'] = {'count': this_dex.count(), 'paperdex': []}
for x in this_dex:
return_val['paperdex']['paperdex'].append(model_to_dict(x, recurse=False))
db.close()
return return_val
@router.patch('/{player_id}')
async def v1_players_patch(
player_id, name: Optional[str] = None, image: Optional[str] = None, image2: Optional[str] = None,
mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: Optional[int] = None,
rarity_id: Optional[int] = None, pos_1: Optional[str] = None, pos_2: Optional[str] = None,
pos_3: Optional[str] = None, pos_4: Optional[str] = None, pos_5: Optional[str] = None,
pos_6: Optional[str] = None, pos_7: Optional[str] = None, pos_8: Optional[str] = None,
headshot: Optional[str] = None, vanity_card: Optional[str] = None, strat_code: Optional[str] = None,
bbref_id: Optional[str] = None, description: Optional[str] = None, cost: Optional[int] = None,
fangr_id: Optional[str] = None, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to patch players. This event has been logged.'
)
try:
this_player = Player.get_by_id(player_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')
if cost is not None:
this_player.cost = cost
if name is not None:
this_player.p_name = name
if image is not None:
this_player.image = image
if image2 is not None:
if image2.lower() == 'false':
this_player.image2 = None
else:
this_player.image2 = image2
if mlbclub is not None:
this_player.mlbclub = mlbclub
if franchise is not None:
this_player.franchise = franchise
if cardset_id is not None:
try:
this_cardset = Cardset.get_by_id(cardset_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
this_player.cardset = this_cardset
if rarity_id is not None:
try:
this_rarity = Rarity.get_by_id(rarity_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')
this_player.rarity = this_rarity
if pos_1 is not None:
if pos_1 == 'False':
this_player.pos_1 = None
else:
this_player.pos_1 = pos_1
if pos_2 is not None:
if pos_2 == 'False':
this_player.pos_2 = None
else:
this_player.pos_2 = pos_2
if pos_3 is not None:
if pos_3 == 'False':
this_player.pos_3 = None
else:
this_player.pos_3 = pos_3
if pos_4 is not None:
if pos_4 == 'False':
this_player.pos_4 = None
else:
this_player.pos_4 = pos_4
if pos_5 is not None:
if pos_5 == 'False':
this_player.pos_5 = None
else:
this_player.pos_5 = pos_5
if pos_6 is not None:
if pos_6 == 'False':
this_player.pos_6 = None
else:
this_player.pos_6 = pos_6
if pos_7 is not None:
if pos_7 == 'False':
this_player.pos_7 = None
else:
this_player.pos_7 = pos_7
if pos_8 is not None:
if pos_8 == 'False':
this_player.pos_8 = None
else:
this_player.pos_8 = pos_8
if headshot is not None:
this_player.headshot = headshot
if vanity_card is not None:
this_player.vanity_card = vanity_card
if strat_code is not None:
this_player.strat_code = strat_code
if bbref_id is not None:
this_player.bbref_id = bbref_id
if fangr_id is not None:
this_player.fangr_id = fangr_id
if description is not None:
this_player.description = description
if this_player.save() == 1:
return_val = model_to_dict(this_player)
db.close()
return return_val
else:
raise HTTPException(
status_code=418,
detail='Well slap my ass and call me a teapot; I could not save that rarity'
)
@router.put('')
async def v1_players_put(players: PlayerModel, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to post players. This event has been logged.'
)
new_players = []
for x in players.players:
# this_player = Player(
# player_id=x.player_id,
# p_name=x.p_name,
# cost=x.cost,
# image=x.image,
# image2=x.image2,
# mlbclub=x.mlbclub,
# franchise=x.franchise,
# cardset_id=x.cardset_id,
# rarity_id=x.rarity_id,
# set_num=x.set_num,
# pos_1=x.pos_1,
# pos_2=x.pos_2,
# pos_3=x.pos_3,
# pos_4=x.pos_4,
# pos_5=x.pos_5,
# pos_6=x.pos_6,
# pos_7=x.pos_7,
# pos_8=x.pos_8,
# headshot=x.headshot,
# vanity_card=x.vanity_card,
# strat_code=x.strat_code,
# fangr_id=x.fangr_id,
# bbref_id=x.bbref_id,
# description=x.description
# )
# new_players.append(this_player)
new_players.append({
'player_id': x.player_id,
'p_name': x.p_name,
'cost': x.cost,
'image': x.image,
'image2': x.image2,
'mlbclub': x.mlbclub.title(),
'franchise': x.franchise.title(),
'cardset_id': x.cardset_id,
'rarity_id': x.rarity_id,
'set_num': x.set_num,
'pos_1': x.pos_1,
'pos_2': x.pos_2,
'pos_3': x.pos_3,
'pos_4': x.pos_4,
'pos_5': x.pos_5,
'pos_6': x.pos_6,
'pos_7': x.pos_7,
'pos_8': x.pos_8,
'headshot': x.headshot,
'vanity_card': x.vanity_card,
'strat_code': x.strat_code,
'fangr_id': x.fangr_id,
'bbref_id': x.bbref_id,
'description': x.description
})
logging.info(f'new_players: {new_players}')
with db.atomic():
# Player.bulk_create(new_players, batch_size=15)
for batch in chunked(new_players, 15):
logging.info(f'batch: {batch}')
Player.insert_many(batch).on_conflict_replace().execute()
db.close()
# sheets.update_all_players(SHEETS_AUTH)
raise HTTPException(status_code=200, detail=f'{len(new_players)} players have been added')
@router.delete('/{player_id}')
async def v1_players_delete(player_id, token: str = Depends(oauth2_scheme)):
if not valid_token(token):
logging.warning(f'Bad Token: {token}')
db.close()
raise HTTPException(
status_code=401,
detail='You are not authorized to delete players. This event has been logged.'
)
try:
this_player = Player.get_by_id(player_id)
except Exception:
db.close()
raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')
count = this_player.delete_instance()
db.close()
if count == 1:
raise HTTPException(status_code=200, detail=f'Player {player_id} has been deleted')
else:
raise HTTPException(status_code=500, detail=f'Player {player_id} was not deleted')