paper-dynasty-database/main.py
2023-02-23 23:49:38 -06:00

4460 lines
156 KiB
Python

from datetime import datetime
import logging
import os
from db_engine import *
from typing import Optional, List, Union
from fastapi import FastAPI, HTTPException, Depends, Response, Query
from fastapi.security import OAuth2PasswordBearer
import pydantic
import pygsheets
import sheets
from playhouse.shortcuts import model_to_dict
from pandas import DataFrame
# Map the LOG_LEVEL environment variable onto a logging level. Anything other
# than 'INFO' or 'WARN' (including an unset variable) falls back to ERROR.
raw_log_level = os.getenv('LOG_LEVEL')
log_level = {'INFO': logging.INFO, 'WARN': logging.WARN}.get(raw_log_level, logging.ERROR)

# Read the clock once so year, month and day cannot come from different days
# when the process starts right at midnight. The un-padded format is kept so
# existing log file names do not change.
date = '{0.year}-{0.month}-{0.day}'.format(datetime.now())
logging.basicConfig(
    filename=f'logs/database/{date}.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=log_level
)

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
DEFAULT_SEASON = 4
SHEETS_AUTH = pygsheets.authorize(service_file='storage/paper-dynasty-service-creds.json', retries=1)
def valid_token(token):
    """Return True when *token* matches the API_TOKEN environment variable.

    An unset API_TOKEN never authorizes anything (previously a ``None`` token
    compared equal to the missing variable). The comparison is constant-time
    so the response time does not reveal how much of the token matched.
    """
    import hmac

    expected = os.environ.get('API_TOKEN')
    if expected is None or not isinstance(token, str):
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(token.encode('utf-8'), expected.encode('utf-8'))
def int_timestamp(datetime_obj: datetime) -> int:
    """Return *datetime_obj* as whole milliseconds since the Unix epoch."""
    seconds = datetime_obj.timestamp()
    millis = seconds * 1000
    return int(millis)
"""
CURRENT ENDPOINTS
"""
class CurrentModel(pydantic.BaseModel):
    """Request body accepted by POST /api/v1/current."""
    season: int  # the POST handler rejects a season that already has a row
    week: int
    gsheet_template: str
    gsheet_version: str
@app.get('/api/v1/current')
async def v1_current_get(season: Optional[int] = None, csv: Optional[bool] = False):
    """Return the ``Current`` row for *season*, or the latest row when no season is given.

    Responds with ``text/csv`` (columns id, season, week) when ``csv`` is
    true, otherwise with the row as a dict.

    Raises:
        HTTPException: 404 when no matching row exists.
    """
    try:
        if season:
            current = Current.get_or_none(season=season)
        else:
            current = Current.latest()

        # get_or_none returns None for an unknown season; without this guard
        # the attribute access below failed with an unhandled error.
        if current is None:
            detail = f'No current found for season {season}' if season else 'No current found'
            raise HTTPException(status_code=404, detail=detail)

        if csv:
            current_list = [
                ['id', 'season', 'week'],
                [current.id, current.season, current.week]
            ]
            return_val = DataFrame(current_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return model_to_dict(current)
    finally:
        db.close()
@app.get('/api/v1/current/{current_id}')
async def v1_current_get_one(current_id, csv: Optional[bool] = False):
    """Fetch one ``Current`` row by primary key as a dict, or as CSV when ``csv`` is true.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        current = Current.get_by_id(current_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No current found with id {current_id}')

    if not csv:
        payload = model_to_dict(current)
        db.close()
        return payload

    header = ['id', 'season', 'week']
    row = [current.id, current.season, current.week]
    csv_text = DataFrame([header, row]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/current')
async def v1_current_post(current: CurrentModel, token: str = Depends(oauth2_scheme)):
    """Create a ``Current`` row for a season that does not have one yet.

    Returns the saved row as a dict.

    Raises:
        HTTPException: 401 for a bad token, 400 when the season already has a
            row, 418 when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post current. This event has been logged.'
            )

        dupe_curr = Current.get_or_none(Current.season == current.season)
        if dupe_curr:
            raise HTTPException(status_code=400, detail=f'There is already a current for season {current.season}')

        this_curr = Current(
            season=current.season,
            week=current.week,
            gsheet_template=current.gsheet_template,
            gsheet_version=current.gsheet_version
        )
        if this_curr.save() == 1:
            return model_to_dict(this_curr)

        # The message previously said "team" (copied from the teams endpoint).
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that current'
        )
    finally:
        # Runs on every exit path, including the failure path that used to skip it.
        db.close()
@app.patch('/api/v1/current/{current_id}')
async def v1_current_patch(
        current_id: int, season: Optional[int] = None, week: Optional[int] = None,
        gsheet_template: Optional[str] = None, gsheet_version: Optional[str] = None,
        live_scoreboard: Optional[int] = None, token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one ``Current`` row; omitted fields are left alone.

    Returns the updated row as a dict.

    Raises:
        HTTPException: 401 for a bad token, 404 when the row is not found,
            418 when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch current. This event has been logged.'
            )

        try:
            current = Current.get_by_id(current_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No current found with id {current_id}')

        updates = {
            'season': season,
            'week': week,
            'gsheet_template': gsheet_template,
            'gsheet_version': gsheet_version,
            'live_scoreboard': live_scoreboard,
        }
        for field_name, new_value in updates.items():
            if new_value is not None:
                setattr(current, field_name, new_value)

        if current.save() == 1:
            return model_to_dict(current)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that current'
        )
    finally:
        # Previously the 418 path returned without closing the connection.
        db.close()
@app.delete('/api/v1/current/{current_id}')
async def v1_current_delete(current_id, token: str = Depends(oauth2_scheme)):
    """Delete one ``Current`` row.

    The outcome is reported by raising ``HTTPException``: 200 on success,
    500 when the delete did not affect exactly one row, 401 for a bad token
    and 404 when the row is not found.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete current. This event has been logged.'
        )

    try:
        this_curr = Current.get_by_id(current_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No current found with id {current_id}')

    deleted_rows = this_curr.delete_instance()
    db.close()

    if deleted_rows != 1:
        raise HTTPException(status_code=500, detail=f'Current {current_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Current {current_id} has been deleted')
"""
TEAMS ENDPOINTS
"""
class TeamModel(pydantic.BaseModel):
    """Request body accepted by POST /api/v1/teams."""
    abbrev: str  # the POST handler rejects a duplicate (season, abbrev) pair
    sname: str
    lname: str
    gmid: int
    gmname: str
    wallet: int = 0
    gsheet: str
    team_value: int = 0
    collection_value: int = 0
    logo: Optional[str] = None
    color: Optional[str] = None
    season: int
    ps_shiny: Optional[int] = 0  # stored on the Team row as `career` by the POST handler
    ranking: Optional[int] = 1000
    has_guide: Optional[bool] = False
    is_ai: Optional[bool] = False
@app.get('/api/v1/teams')
async def v1_teams_get(
        season: Optional[int] = None, gm_id: Optional[int] = None, abbrev: Optional[str] = None,
        tv_min: Optional[int] = None, tv_max: Optional[int] = None, cv_min: Optional[int] = None,
        cv_max: Optional[int] = None, ps_shiny_min: Optional[int] = None, ps_shiny_max: Optional[int] = None,
        ranking_min: Optional[int] = None, ranking_max: Optional[int] = None, has_guide: Optional[bool] = None,
        sname: Optional[str] = None, lname: Optional[str] = None, is_ai: Optional[bool] = None,
        limit: Optional[int] = None, csv: Optional[bool] = False):
    """List teams, optionally filtered, as JSON or CSV.

    Every filter is optional and the filters combine with AND. ``abbrev``,
    ``sname`` and ``lname`` match case-insensitively. The ``*_min`` and
    ``*_max`` parameters are inclusive bounds; ``ps_shiny_min`` and
    ``ps_shiny_max`` are applied to ``Team.career``. ``limit`` caps the
    number of rows returned.

    Returns a ``text/csv`` response when ``csv`` is true, otherwise
    ``{'count': int, 'teams': [dict, ...]}``.
    """
    try:
        all_teams = Team.select_season(season) if season else Team.select()

        if gm_id is not None:
            all_teams = all_teams.where(Team.gmid == gm_id)
        if abbrev is not None:
            all_teams = all_teams.where(fn.Lower(Team.abbrev) == abbrev.lower())
        if sname is not None:
            all_teams = all_teams.where(fn.Lower(Team.sname) == sname.lower())
        if lname is not None:
            all_teams = all_teams.where(fn.Lower(Team.lname) == lname.lower())
        if tv_min is not None:
            all_teams = all_teams.where(Team.team_value >= tv_min)
        if tv_max is not None:
            all_teams = all_teams.where(Team.team_value <= tv_max)
        if cv_min is not None:
            all_teams = all_teams.where(Team.collection_value >= cv_min)
        if cv_max is not None:
            all_teams = all_teams.where(Team.collection_value <= cv_max)
        if ps_shiny_min is not None:
            all_teams = all_teams.where(Team.career >= ps_shiny_min)
        if ps_shiny_max is not None:
            all_teams = all_teams.where(Team.career <= ps_shiny_max)
        if ranking_min is not None:
            all_teams = all_teams.where(Team.ranking >= ranking_min)
        if ranking_max is not None:
            all_teams = all_teams.where(Team.ranking <= ranking_max)
        if has_guide is not None:
            all_teams = all_teams.where(Team.has_guide == (1 if has_guide else 0))
        if is_ai is not None:
            # Compare against the requested value: the old filter ignored it,
            # so is_ai=False still returned only AI teams.
            all_teams = all_teams.where(Team.is_ai == (1 if is_ai else 0))
        if limit is not None:
            all_teams = all_teams.limit(limit)

        if csv:
            data_list = [[
                'id', 'abbrev', 'sname', 'lname', 'gmid', 'gmname', 'wallet', 'gsheet', 'team_value',
                'collection_value', 'logo', 'color', 'season', 'ranking'
            ]]
            for line in all_teams:
                data_list.append(
                    [
                        line.id, line.abbrev, line.sname, line.lname, line.gmid, line.gmname, line.wallet,
                        line.gsheet, line.team_value, line.collection_value, line.logo, f'\'{line.color}',
                        line.season, line.ranking
                    ]
                )
            return_val = DataFrame(data_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return_teams = {'count': all_teams.count(), 'teams': []}
        for x in all_teams:
            return_teams['teams'].append(model_to_dict(x))
        return return_teams
    finally:
        db.close()
@app.get('/api/v1/teams/{team_id}')
async def v1_teams_get_one(team_id, csv: Optional[bool] = False):
    """Fetch one team by primary key as a dict, or as CSV when ``csv`` is true.

    The CSV form adds a ``sealed_packs`` column: the number of the team's
    packs whose ``open_time`` is null.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if not csv:
        payload = model_to_dict(this_team)
        db.close()
        return payload

    sealed = Pack.select().where((Pack.team == this_team) & (Pack.open_time.is_null(True)))
    header = [
        'id', 'abbrev', 'sname', 'lname', 'gmid', 'gmname', 'wallet', 'ranking', 'gsheet', 'sealed_packs',
        'collection_value', 'logo', 'color', 'season'
    ]
    row = [
        this_team.id, this_team.abbrev, this_team.sname, this_team.lname, this_team.gmid, this_team.gmname,
        this_team.wallet, this_team.ranking, this_team.gsheet, sealed.count(), this_team.collection_value,
        this_team.logo, this_team.color, this_team.season
    ]
    csv_text = DataFrame([header, row]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/teams/{team_id}/buy/players')
async def v1_team_cards_buy(team_id: int, ids: str, ts: int):
    """Buy one card per player id in *ids* for a team, paying from its wallet.

    *ids* is a comma-separated list of player ids; empty entries are skipped.
    *ts* must equal ``this_team.team_hash()``. Players are processed in order
    and each purchase is saved as it happens, so a failure part-way through
    leaves the earlier purchases in place (the error detail lists them).

    The result is reported by raising ``HTTPException``: 200 with a summary
    on success or when the wallet cannot cover a player, 401 for a bad team
    secret, 404 for an unknown team or player.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if ts != this_team.team_hash():
        logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
        db.close()
        raise HTTPException(
            status_code=401,
            detail=f'You are not authorized to buy {this_team.abbrev} cards. This event has been logged.'
        )

    # The "last card id" lookup that used to sit here only fed a call that is
    # commented out, and it raised IndexError when the Card table was empty.
    conf_message = ''
    total_cost = 0

    for player_id in ids.split(','):
        if player_id == '':
            continue

        try:
            this_player = Player.get_by_id(player_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No player found with id {player_id} /// '
                                                        f'{conf_message} purchased')

        # check wallet balance
        if this_team.wallet < this_player.cost:
            logging.info(f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but '
                         f'{this_player} costs {this_player.cost}₼.')
            db.close()
            raise HTTPException(
                200,
                detail=f'{this_player} was not purchased. {this_team.lname} only has {this_team.wallet}₼, but '
                       f'{this_player} costs {this_player.cost}₼. /// {conf_message} purchased'
            )

        # Create player card and update cost. buy_price is captured first
        # because change_on_buy() is followed by a re-read of this_player.cost.
        buy_price = this_player.cost
        total_cost += buy_price
        this_card = Card(
            player_id=this_player.player_id,
            team_id=this_team.id,
            value=buy_price
        )
        Paperdex.get_or_create(team_id=team_id, player_id=this_player.player_id)
        this_card.save()
        this_player.change_on_buy()

        # Deduct card cost from team
        logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}')
        this_team.wallet -= buy_price
        this_team.save()
        logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}')

        # Post a notification
        if this_player.rarity.value >= 2:
            new_notif = Notification(
                created=int_timestamp(datetime.now()),
                title='Price Change',
                desc='Modified by buying and selling',
                field_name=f'{this_player.description}',
                message=f'From {buy_price}₼ 📈 to **{this_player.cost}**₼',
                about=f'Player-{this_player.player_id}'
            )
            new_notif.save()

        conf_message += f'{buy_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \
                        f'({this_player.cardset.name}), '

    # sheets.post_new_cards(SHEETS_AUTH, lc_id)

    # The success path previously left the connection open.
    db.close()
    raise HTTPException(status_code=200, detail=f'{conf_message} purchased. /// Total Cost: {total_cost}₼ /// '
                                                f'Final Wallet: {this_team.wallet}')
@app.get('/api/v1/teams/{team_id}/buy/pack/{packtype_id}')
async def v1_team_pack_buy(team_id: int, packtype_id: int, ts: int, quantity: Optional[int] = 1):
    """Buy *quantity* packs of one pack type for a team, paying from its wallet.

    *ts* must equal ``this_team.team_hash()``. The wallet debit and the pack
    rows are written in one transaction.

    The result is reported by raising ``HTTPException``: 200 on success or
    when the wallet cannot cover the total cost, 400 for a quantity below 1,
    401 for a bad team secret, 404 for an unknown pack type or team.
    """
    try:
        this_packtype = PackType.get_by_id(packtype_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No pack type found with id {packtype_id}')

    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if ts != this_team.team_hash():
        logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
        logging.warning(f'team: {this_team} / pack_type: {this_packtype} / secret: {ts} / '
                        f'actual: {this_team.team_hash()}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail=f'You are not authorized to buy {this_team.abbrev} packs. This event has been logged.'
        )

    # A zero or negative quantity created no packs but still ran
    # `wallet -= cost * quantity`, i.e. it credited the wallet for free.
    if quantity is None or quantity < 1:
        db.close()
        raise HTTPException(status_code=400, detail=f'Quantity must be at least 1 (got {quantity})')

    # check wallet balance
    total_cost = this_packtype.cost * quantity
    if this_team.wallet < total_cost:
        db.close()
        raise HTTPException(
            200,
            detail=f'{this_packtype} was not purchased. {this_team.lname} only has {this_team.wallet} bucks, but '
                   f'{this_packtype} costs {this_packtype.cost}.'
        )

    all_packs = [Pack(team_id=this_team.id, pack_type_id=this_packtype.id) for _ in range(quantity)]

    # Debit and pack creation share one transaction so a failed insert cannot
    # leave the team charged for packs it never received.
    logging.info(f'{this_team.abbrev} starting wallet: {this_team.wallet}')
    with db.atomic():
        this_team.wallet -= total_cost
        this_team.save()
        Pack.bulk_create(all_packs, batch_size=15)
    logging.info(f'{this_team.abbrev} ending wallet: {this_team.wallet}')
    db.close()

    raise HTTPException(
        status_code=200,
        detail=f'Quantity {quantity} {this_packtype.name} pack{"s" if quantity > 1 else ""} have been purchased by '
               f'{this_team.lname} for {total_cost} bucks. You may close this window.'
    )
@app.get('/api/v1/teams/{team_id}/sell/cards')
async def v1_team_cards_sell(team_id: int, ids: str, ts: int):
    """Sell the cards listed in *ids* back for half of each player's current cost.

    *ids* is a comma-separated list of card ids; empty entries are skipped.
    *ts* must equal ``this_team.team_hash()``. Cards are processed in order
    and each sale is saved as it happens, so a failure part-way through
    leaves the earlier sales in place.

    The result is reported by raising ``HTTPException``: 200 with a summary
    on success, 401 for a bad team secret or a card owned by another team,
    404 for an unknown team or card.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if ts != this_team.team_hash():
        logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
        db.close()
        raise HTTPException(
            status_code=401,
            detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.'
        )

    conf_message = ''
    total_cost = 0

    for card_id in ids.split(','):
        if card_id == '':
            continue

        try:
            this_card = Card.get_by_id(card_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No card found with id {card_id}')

        this_player = this_card.player

        if this_card.team != this_team:
            # This path previously left the connection open.
            db.close()
            raise HTTPException(status_code=401,
                                detail=f'Card id {card_id} ({this_player.p_name}) belongs to '
                                       f'{this_card.team.abbrev} and cannot be sold. /// {conf_message} sold')

        orig_price = this_player.cost
        sell_price = round(this_player.cost * .5)
        total_cost += sell_price

        # credit selling team's wallet
        if this_team.wallet is None:
            this_team.wallet = sell_price
        else:
            this_team.wallet += sell_price
        this_team.save()

        # decrease price of player
        this_player.change_on_sell()
        this_card.delete_instance()

        # post a notification
        if this_player.rarity.value >= 2:
            new_notif = Notification(
                created=int_timestamp(datetime.now()),
                title='Price Change',
                desc='Modified by buying and selling',
                field_name=f'{this_player.description}',
                message=f'From {orig_price}₼ 📉 to **{this_player.cost}**₼',
                # player_id matches the key the buy endpoint writes here.
                # NOTE(review): assumes Player is keyed by player_id; confirm in db_engine.
                about=f'Player-{this_player.player_id}'
            )
            new_notif.save()

        conf_message += f'{sell_price}₼ for {this_player.rarity.name} {this_player.p_name} ' \
                        f'({this_player.cardset.name}), '

    # sheets.post_deletion(SHEETS_AUTH, del_ids)

    # The success path previously left the connection open.
    db.close()
    raise HTTPException(status_code=200, detail=f'{conf_message} sold. /// Total Earned: {total_cost}₼ /// '
                                                f'Final Wallet: {this_team.wallet}')
@app.get('/api/v1/teams/{team_id}/cards')
async def v1_teams_cards_get(team_id, csv: Optional[bool] = True):
    """
    CSV export of a team's cards, aimed at the team roster sheet.

    Rows are ordered by rarity value (highest first), then player name.

    Parameters
    ----------
    team_id
        Primary key of the team whose cards are listed.
    csv
        Must be true; any other value is rejected with a 400.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if not csv:
        db.close()
        raise HTTPException(
            status_code=400,
            detail='The /teams/{team_id}/cards endpoint only supports csv output.'
        )

    team_cards = (
        Card
        .select()
        .join(Player)
        .join(Rarity)
        .where(Card.team == this_team)
        .order_by(-Card.player.rarity.value, Card.player.p_name)
    )
    if team_cards.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='No cards found')

    rows = [[
        'cardset', 'player', 'rarity', 'image', 'image2', 'pos_1', 'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6',
        'pos_7', 'pos_8', 'cost', 'mlbclub', 'franchise', 'set_num', 'bbref_id', 'player_id', 'card_id'
    ]]
    for card in team_cards:
        owner = card.player
        rows.append([
            owner.cardset, owner.p_name, owner.rarity, owner.image, owner.image2,
            owner.pos_1, owner.pos_2, owner.pos_3, owner.pos_4, owner.pos_5,
            owner.pos_6, owner.pos_7, owner.pos_8, owner.cost, owner.mlbclub,
            owner.franchise, owner.set_num, owner.bbref_id, owner.player_id, card.id
        ])

    csv_text = DataFrame(rows).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/teams')
async def v1_teams_post(team: TeamModel, token: str = Depends(oauth2_scheme)):
    """Create a team; the (season, abbrev) pair must not already exist.

    Returns the saved team as a dict. ``team.ps_shiny`` is stored in the
    ``career`` column.

    Raises:
        HTTPException: 401 for a bad token, 400 for a duplicate
            season/abbrev, 418 when the save does not report exactly one
            row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post teams. This event has been logged.'
            )

        dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev)
        if dupe_team:
            raise HTTPException(
                status_code=400,
                detail=f'There is already a season {team.season} team using {team.abbrev}'
            )

        this_team = Team(
            abbrev=team.abbrev,
            sname=team.sname,
            lname=team.lname,
            gmid=team.gmid,
            gmname=team.gmname,
            wallet=team.wallet,
            gsheet=team.gsheet,
            team_value=team.team_value,
            collection_value=team.collection_value,
            logo=team.logo,
            color=team.color,
            ranking=team.ranking,
            season=team.season,
            career=team.ps_shiny,
            has_guide=team.has_guide,
            is_ai=team.is_ai
        )
        if this_team.save() == 1:
            return model_to_dict(this_team)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that team'
        )
    finally:
        # Previously the 418 path returned without closing the connection.
        db.close()
@app.post('/api/v1/teams/{team_id}/money/{delta}')
async def v1_teams_money_delta(team_id: int, delta: int, token: str = Depends(oauth2_scheme)):
    """Add *delta* (which may be negative) to a team's wallet and return the team as a dict.

    Raises:
        HTTPException: 401 for a bad token, 404 for an unknown team, 418 when
            the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to adjust wallets. This event has been logged.'
            )

        try:
            this_team = Team.get_by_id(team_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

        # The sell endpoint allows for a null wallet; treat it as zero here
        # too instead of failing on None + int.
        this_team.wallet = (this_team.wallet or 0) + delta

        if this_team.save() == 1:
            return model_to_dict(this_team)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that team'
        )
    finally:
        db.close()
@app.patch('/api/v1/teams/{team_id}')
async def v1_teams_patch(
        team_id, sname: Optional[str] = None, lname: Optional[str] = None, gmid: Optional[int] = None,
        gmname: Optional[str] = None, gsheet: Optional[str] = None, team_value: Optional[int] = None,
        collection_value: Optional[int] = None, logo: Optional[str] = None, color: Optional[str] = None,
        season: Optional[int] = None, ps_shiny: Optional[int] = None, wallet_delta: Optional[int] = None,
        has_guide: Optional[bool] = None, is_ai: Optional[bool] = None, ranking: Optional[int] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one team; omitted fields are left alone.

    ``ps_shiny`` is written to the ``career`` column. ``wallet_delta`` is
    added to the current wallet rather than replacing it. ``has_guide`` and
    ``is_ai`` are stored as 1/0.

    Returns the updated team as a dict.

    Raises:
        HTTPException: 401 for a bad token, 404 for an unknown team, 418 when
            the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            # The message previously said "delete teams" (copied from DELETE).
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch teams. This event has been logged.'
            )

        try:
            this_team = Team.get_by_id(team_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

        # Request parameter -> model attribute, for plain overwrites.
        overwrites = {
            'sname': sname,
            'lname': lname,
            'gmid': gmid,
            'gmname': gmname,
            'gsheet': gsheet,
            'team_value': team_value,
            'collection_value': collection_value,
            'logo': logo,
            'color': color,
            'season': season,
            'career': ps_shiny,
            'ranking': ranking,
        }
        for attr_name, new_value in overwrites.items():
            if new_value is not None:
                setattr(this_team, attr_name, new_value)

        if wallet_delta is not None:
            # Treat a null wallet as zero instead of failing on None + int.
            this_team.wallet = (this_team.wallet or 0) + wallet_delta
        if has_guide is not None:
            this_team.has_guide = 1 if has_guide else 0
        if is_ai is not None:
            this_team.is_ai = 1 if is_ai else 0

        if this_team.save() == 1:
            return model_to_dict(this_team)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that team'
        )
    finally:
        db.close()
@app.delete('/api/v1/teams/{team_id}')
async def v1_teams_delete(team_id, token: str = Depends(oauth2_scheme)):
    """Delete one team.

    The outcome is reported by raising ``HTTPException``: 200 on success,
    500 when the delete did not affect exactly one row, 401 for a bad token
    and 404 when the team is not found.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete teams. This event has been logged.'
        )

    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    deleted_rows = this_team.delete_instance()
    db.close()

    if deleted_rows != 1:
        raise HTTPException(status_code=500, detail=f'Team {team_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Team {team_id} has been deleted')
"""
RARITY ENDPOINTS
"""
class RarityModel(pydantic.BaseModel):
    """Request body accepted by POST /api/v1/rarities."""
    value: int  # numeric rank; the list endpoint filters on it with value/min_value/max_value
    name: str
    color: str
@app.get('/api/v1/rarities')
async def v1_rarities_get(value: Optional[int] = None, name: Optional[str] = None, min_value: Optional[int] = None,
                          max_value: Optional[int] = None, csv: Optional[bool] = None):
    """List rarities, optionally filtered by exact value, name (case-insensitive) or value range.

    Returns a ``text/csv`` response when ``csv`` is true, otherwise
    ``{'count': int, 'rarities': [dict, ...]}``.

    Raises:
        HTTPException: 404 when the table is empty or nothing matches.
    """
    query = Rarity.select()
    if query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no rarities to filter')

    if value is not None:
        query = query.where(Rarity.value == value)
    if name is not None:
        query = query.where(fn.Lower(Rarity.name) == name.lower())
    if min_value is not None:
        query = query.where(Rarity.value >= min_value)
    if max_value is not None:
        query = query.where(Rarity.value <= max_value)

    if query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='No rarities found')

    if csv:
        rows = [['id', 'value', 'name']]
        rows.extend([rarity.id, rarity.value, rarity.name] for rarity in query)
        csv_text = DataFrame(rows).to_csv(header=False, index=False)
        db.close()
        return Response(content=csv_text, media_type='text/csv')

    matched = query.count()
    payload = {'count': matched, 'rarities': [model_to_dict(rarity) for rarity in query]}
    db.close()
    return payload
@app.get('/api/v1/rarities/{rarity_id}')
async def v1_rarities_get_one(rarity_id, csv: Optional[bool] = False):
    """Fetch one rarity by primary key as a dict, or as CSV (id, value, name) when ``csv`` is true.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        try:
            this_rarity = Rarity.get_by_id(rarity_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')

        if csv:
            # this_rarity is a single model instance, not a query; the old
            # code tried to loop over it and failed.
            data_list = [
                ['id', 'value', 'name'],
                [this_rarity.id, this_rarity.value, this_rarity.name]
            ]
            return_val = DataFrame(data_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return model_to_dict(this_rarity)
    finally:
        db.close()
@app.post('/api/v1/rarities')
async def v1_rarities_post(rarity: RarityModel, token: str = Depends(oauth2_scheme)):
    """Create a rarity whose name is not already in use and return it as a dict.

    Raises:
        HTTPException: 401 for a bad token, 400 for a duplicate name, 418
            when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post rarities. This event has been logged.'
            )

        # Compare against the submitted name. The old check passed the bare
        # column as the condition, so it never looked at rarity.name.
        dupe_rarity = Rarity.get_or_none(Rarity.name == rarity.name)
        if dupe_rarity:
            raise HTTPException(status_code=400, detail=f'There is already a rarity using {rarity.name}')

        this_rarity = Rarity(
            value=rarity.value,
            name=rarity.name,
            color=rarity.color
        )
        if this_rarity.save() == 1:
            return model_to_dict(this_rarity)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that rarity'
        )
    finally:
        db.close()
@app.patch('/api/v1/rarities/{rarity_id}')
async def v1_rarities_patch(
        rarity_id, value: Optional[int] = None, name: Optional[str] = None, color: Optional[str] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one rarity; omitted fields are left alone.

    Returns the updated rarity as a dict.

    Raises:
        HTTPException: 401 for a bad token, 404 for an unknown rarity, 418
            when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch rarities. This event has been logged.'
            )

        try:
            this_rarity = Rarity.get_by_id(rarity_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')

        for attr_name, new_value in (('value', value), ('name', name), ('color', color)):
            if new_value is not None:
                setattr(this_rarity, attr_name, new_value)

        if this_rarity.save() == 1:
            return model_to_dict(this_rarity)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that rarity'
        )
    finally:
        # Previously the 418 path returned without closing the connection.
        db.close()
@app.delete('/api/v1/rarities/{rarity_id}')
async def v1_rarities_delete(rarity_id, token: str = Depends(oauth2_scheme)):
    """Delete one rarity.

    The outcome is reported by raising ``HTTPException``: 200 on success,
    500 when the delete did not affect exactly one row, 401 for a bad token
    and 404 when the rarity is not found.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete rarities. This event has been logged.'
        )

    try:
        this_rarity = Rarity.get_by_id(rarity_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')

    deleted_rows = this_rarity.delete_instance()
    db.close()

    if deleted_rows != 1:
        raise HTTPException(status_code=500, detail=f'Rarity {rarity_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Rarity {rarity_id} has been deleted')
"""
CARDSET ENDPOINTS
"""
class CardsetModel(pydantic.BaseModel):
    """Request body accepted by POST /api/v1/cardsets.

    The POST handler passes these fields directly to the ``Cardset``
    constructor as keyword arguments.
    """
    name: str  # the POST handler rejects a name that is already in use
    description: str
    event_id: Optional[int] = None
    in_packs: Optional[bool] = True
    total_cards: int = 0
    for_purchase: Optional[bool] = True
    ranked_legal: Optional[bool] = True
@app.get('/api/v1/cardsets')
async def v1_cardsets_get(
        name: Optional[str] = None, in_desc: Optional[str] = None, event_id: Optional[int] = None,
        in_packs: Optional[bool] = None, ranked_legal: Optional[bool] = None, csv: Optional[bool] = None):
    """List cardsets, optionally filtered, as JSON or CSV.

    ``name`` matches case-insensitively, ``in_desc`` is a case-insensitive
    substring match on the description, and ``event_id`` restricts results to
    one event.

    Returns a ``text/csv`` response when ``csv`` is true, otherwise
    ``{'count': int, 'cardsets': [dict, ...]}``.

    Raises:
        HTTPException: 404 when the table is empty, the event id is unknown,
            or nothing matches.
    """
    try:
        all_cardsets = Cardset.select()
        if all_cardsets.count() == 0:
            raise HTTPException(status_code=404, detail='There are no cardsets to filter')

        if name is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.name) == name.lower())
        if in_desc is not None:
            all_cardsets = all_cardsets.where(fn.Lower(Cardset.description).contains(in_desc.lower()))
        if event_id is not None:
            try:
                this_event = Event.get_by_id(event_id)
            except Exception as e:
                logging.error(f'Failed to find event {event_id}: {e}')
                raise HTTPException(status_code=404, detail=f'Event id {event_id} not found')
            all_cardsets = all_cardsets.where(Cardset.event == this_event)
        if in_packs is not None:
            all_cardsets = all_cardsets.where(Cardset.in_packs == in_packs)
        if ranked_legal is not None:
            all_cardsets = all_cardsets.where(Cardset.ranked_legal == ranked_legal)

        if all_cardsets.count() == 0:
            raise HTTPException(status_code=404, detail='No cardsets found')

        if csv:
            data_list = [[
                'id', 'name', 'description', 'event_id', 'in_packs', 'for_purchase', 'total_cards', 'ranked_legal'
            ]]
            for line in all_cardsets:
                data_list.append(
                    [
                        line.id, line.name, line.description, line.event.id if line.event else '', line.in_packs,
                        line.for_purchase, line.total_cards, line.ranked_legal
                    ]
                )
            return_val = DataFrame(data_list).to_csv(header=False, index=False)
            return Response(content=return_val, media_type='text/csv')

        return_val = {'count': all_cardsets.count(), 'cardsets': []}
        for x in all_cardsets:
            return_val['cardsets'].append(model_to_dict(x))
        return return_val
    finally:
        # Also covers the unknown-event 404, which used to leave the connection open.
        db.close()
@app.get('/api/v1/cardsets/{cardset_id}')
async def v1_cardsets_get_one(cardset_id, csv: Optional[bool] = False):
    """Fetch one cardset by primary key as a dict, or as CSV (id, name, description) when ``csv`` is true.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    if not csv:
        payload = model_to_dict(this_cardset)
        db.close()
        return payload

    header = ['id', 'name', 'description']
    row = [this_cardset.id, this_cardset.name, this_cardset.description]
    csv_text = DataFrame([header, row]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/cardsets')
async def v1_cardsets_post(cardset: CardsetModel, token: str = Depends(oauth2_scheme)):
    """Create a cardset whose name is not already in use and return it as a dict.

    The request model's fields are passed to ``Cardset`` as keyword arguments.

    Raises:
        HTTPException: 401 for a bad token, 400 for a duplicate name, 418
            when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to post cardsets. This event has been logged.'
            )

        dupe_set = Cardset.get_or_none(Cardset.name == cardset.name)
        if dupe_set:
            raise HTTPException(status_code=400, detail=f'There is already a cardset using {cardset.name}')

        this_cardset = Cardset(**cardset.__dict__)
        if this_cardset.save() == 1:
            return model_to_dict(this_cardset)

        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that cardset'
        )
    finally:
        # Previously the 418 path returned without closing the connection.
        db.close()
@app.patch('/api/v1/cardsets/{cardset_id}')
async def v1_cardsets_patch(
        cardset_id, name: Optional[str] = None, description: Optional[str] = None, in_packs: Optional[bool] = None,
        for_purchase: Optional[bool] = None, total_cards: Optional[int] = None, ranked_legal: Optional[bool] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one cardset; omitted fields are left alone.

    Returns the updated cardset as a dict.

    Raises:
        HTTPException: 401 for a bad token, 404 for an unknown cardset, 418
            when the save does not report exactly one row written.
    """
    try:
        if not valid_token(token):
            logging.warning(f'Bad Token: {token}')
            raise HTTPException(
                status_code=401,
                detail='You are not authorized to patch cardsets. This event has been logged.'
            )

        try:
            this_cardset = Cardset.get_by_id(cardset_id)
        except Exception:
            raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

        updates = {
            'name': name,
            'description': description,
            'in_packs': in_packs,
            'for_purchase': for_purchase,
            'total_cards': total_cards,
            'ranked_legal': ranked_legal,
        }
        for attr_name, new_value in updates.items():
            if new_value is not None:
                setattr(this_cardset, attr_name, new_value)

        if this_cardset.save() == 1:
            return model_to_dict(this_cardset)

        # The message previously said "rarity" (copied from the rarity endpoint).
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that cardset'
        )
    finally:
        db.close()
@app.delete('/api/v1/cardsets/{cardset_id}')
async def v1_cardsets_delete(cardset_id, token: str = Depends(oauth2_scheme)):
    """Delete one cardset.

    The outcome is reported by raising ``HTTPException``: 200 on success,
    500 when the delete did not affect exactly one row, 401 for a bad token
    and 404 when the cardset is not found.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete cardsets. This event has been logged.'
        )

    try:
        this_cardset = Cardset.get_by_id(cardset_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')

    deleted_rows = this_cardset.delete_instance()
    db.close()

    if deleted_rows != 1:
        raise HTTPException(status_code=500, detail=f'Cardset {cardset_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Cardset {cardset_id} has been deleted')
"""
PLAYER ENDPOINTS
"""
class PlayerPydantic(pydantic.BaseModel):
    """Schema for a single player record; ``PlayerModel`` wraps a list of these."""
    player_id: int
    p_name: str
    cost: int
    image: str
    image2: Optional[str] = None
    mlbclub: str
    franchise: str
    cardset_id: int
    set_num: int
    rarity_id: int
    # pos_1 is required; pos_2 through pos_8 are optional.
    pos_1: str
    pos_2: Optional[str] = None
    pos_3: Optional[str] = None
    pos_4: Optional[str] = None
    pos_5: Optional[str] = None
    pos_6: Optional[str] = None
    pos_7: Optional[str] = None
    pos_8: Optional[str] = None
    headshot: Optional[str] = None
    vanity_card: Optional[str] = None
    strat_code: Optional[str] = None
    bbref_id: Optional[str] = None
    fangr_id: Optional[str] = None
    description: str
    quantity: Optional[int] = 999
class PlayerModel(pydantic.BaseModel):
    """Envelope for the bulk player POST body: a list of player payloads."""
    players: List[PlayerPydantic]
# NOT A TEMPLATE - BROKE MOLD FOR pos_exclude
@app.get('/api/v1/players')
async def v1_players_get(
        name: Optional[str] = None, value: Optional[int] = None, min_cost: Optional[int] = None,
        max_cost: Optional[int] = None, has_image2: Optional[bool] = None, mlbclub: Optional[str] = None,
        franchise: Optional[str] = None, cardset_id: list = Query(default=None), rarity_id: list = Query(default=None),
        pos_include: list = Query(default=None), pos_exclude: list = Query(default=None), has_headshot: Optional[bool] = None,
        has_vanity_card: Optional[bool] = None, strat_code: Optional[str] = None, bbref_id: Optional[str] = None,
        fangr_id: Optional[str] = None, inc_dex: Optional[bool] = True, in_desc: Optional[str] = None,
        flat: Optional[bool] = False, sort_by: Optional[str] = False, cardset_id_exclude: list = Query(default=None),
        limit: Optional[int] = None, csv: Optional[bool] = None):
    """List players matching the supplied filters, as JSON or CSV.

    Every filter is optional and filters combine with AND. `pos_include` is
    applied in SQL; `pos_exclude` is applied in Python against each row's
    `get_all_pos()`, which is why `limit` is enforced while iterating instead
    of in the query.

    sort_by accepts 'cost-desc', 'cost-asc', 'name-asc', 'name-desc',
    'rarity-desc' and 'rarity-asc'; any other value leaves the order alone.

    Returns a `text/csv` Response when `csv` is truthy, otherwise a dict
    `{'count': int, 'players': [...]}`. Each player carries a `paperdex`
    sub-dict when `inc_dex` is truthy, and `flat` disables recursion into
    related models.

    Raises HTTPException 404 when the player table is empty or when nothing
    survives the filters.
    """
    all_players = Player.select()
    if all_players.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail=f'There are no players to filter')

    if name is not None:
        all_players = all_players.where(fn.Lower(Player.p_name) == name.lower())
    if value is not None:
        all_players = all_players.where(Player.cost == value)
    if min_cost is not None:
        all_players = all_players.where(Player.cost >= min_cost)
    if max_cost is not None:
        all_players = all_players.where(Player.cost <= max_cost)
    if has_image2 is not None:
        all_players = all_players.where(Player.image2.is_null(not has_image2))
    if mlbclub is not None:
        all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())
    if franchise is not None:
        all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
    if cardset_id is not None:
        all_players = all_players.where(Player.cardset_id << cardset_id)
    if cardset_id_exclude is not None:
        all_players = all_players.where(Player.cardset_id.not_in(cardset_id_exclude))
    if rarity_id is not None:
        all_players = all_players.where(Player.rarity_id << rarity_id)
    if pos_include is not None:
        p_list = [x.upper() for x in pos_include]
        all_players = all_players.where(
            (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) |
            (Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list)
        )
    if has_headshot is not None:
        all_players = all_players.where(Player.headshot.is_null(not has_headshot))
    if has_vanity_card is not None:
        all_players = all_players.where(Player.vanity_card.is_null(not has_vanity_card))
    if strat_code is not None:
        all_players = all_players.where(Player.strat_code == strat_code)
    if bbref_id is not None:
        all_players = all_players.where(Player.bbref_id == bbref_id)
    if fangr_id is not None:
        all_players = all_players.where(Player.fangr_id == fangr_id)
    if in_desc is not None:
        all_players = all_players.where(fn.Lower(Player.description).contains(in_desc.lower()))

    # NOTE(review): 'rarity-desc' sorts ascending on Player.rarity and
    # 'rarity-asc' descending. Kept exactly as before; confirm this is intended.
    sort_orders = {
        'cost-desc': -Player.cost,
        'cost-asc': Player.cost,
        'name-asc': Player.p_name,
        'name-desc': -Player.p_name,
        'rarity-desc': Player.rarity,
        'rarity-asc': -Player.rarity,
    }
    # sort_by defaults to False (not None); anything not in the table is ignored.
    if sort_by in sort_orders:
        all_players = all_players.order_by(sort_orders[sort_by])

    # Build the exclusion set once instead of once per row.
    excluded_pos = {pos.upper() for pos in pos_exclude} if pos_exclude is not None else None

    final_players = []
    for player in all_players:
        if excluded_pos is None or not excluded_pos.intersection(player.get_all_pos()):
            final_players.append(player)

        if limit is not None and len(final_players) >= limit:
            break

    if len(final_players) == 0:
        db.close()
        raise HTTPException(status_code=404, detail=f'No players found')

    if csv:
        # The rows come from final_players, so no re-ordering of the query is
        # needed here (the earlier order_by() call discarded its result).
        data_list = [['id', 'name', 'value', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
                      'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
                      'strat_code', 'bbref_id', 'description', 'for_purchase', 'ranked_legal']]
        for line in final_players:
            data_list.append(
                [
                    line.player_id, line.p_name, line.cost, line.image, line.image2, line.mlbclub, line.franchise,
                    line.cardset, line.rarity, line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5, line.pos_6,
                    line.pos_7, line.pos_8, line.headshot, line.vanity_card, line.strat_code, line.bbref_id,
                    line.description, line.cardset.for_purchase, line.cardset.ranked_legal
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {'count': len(final_players), 'players': []}
    for player in final_players:
        this_record = model_to_dict(player, recurse=not flat)

        if inc_dex:
            # One query per player: fetch the rows and count them in Python.
            dex_rows = [
                model_to_dict(entry, recurse=False)
                for entry in Paperdex.select().where(Paperdex.player == player)
            ]
            this_record['paperdex'] = {'count': len(dex_rows), 'paperdex': dex_rows}

        return_val['players'].append(this_record)

    db.close()
    return return_val
@app.get('/api/v1/players/random')
async def v1_players_get_random(
        min_cost: Optional[int] = None, max_cost: Optional[int] = None, in_packs: Optional[bool] = True,
        min_rarity: Optional[int] = None, max_rarity: Optional[int] = None, limit: Optional[int] = None,
        pos_include: Optional[str] = None, pos_exclude: Optional[str] = None, franchise: Optional[str] = None,
        mlbclub: Optional[str] = None, csv: Optional[bool] = None):
    """Return players in random order, optionally filtered, as JSON or CSV.

    The query joins Cardset and Rarity so their columns can be filtered on.
    `pos_include` is matched case-insensitively in SQL against pos_1..pos_8.
    `pos_exclude` is checked in Python (case-sensitively) against each row's
    `get_all_pos()`, and `limit` is applied after that check.

    Returns a `text/csv` Response when `csv` is truthy, otherwise
    `{'count': int, 'players': [...]}` with a `paperdex` sub-dict per player.
    An empty result is returned as-is; no 404 is raised.
    """
    all_players = (Player
                   .select()
                   .join(Cardset)
                   .switch(Player)
                   .join(Rarity)
                   .order_by(fn.Random()))

    if min_cost is not None:
        all_players = all_players.where(Player.cost >= min_cost)
    if max_cost is not None:
        all_players = all_players.where(Player.cost <= max_cost)
    if in_packs:
        all_players = all_players.where(Player.cardset.in_packs)
    if min_rarity is not None:
        all_players = all_players.where(Player.rarity.value >= min_rarity)
    if max_rarity is not None:
        all_players = all_players.where(Player.rarity.value <= max_rarity)
    if pos_include is not None:
        wanted_pos = pos_include.lower()
        all_players = all_players.where(
            (fn.lower(Player.pos_1) == wanted_pos) | (fn.lower(Player.pos_2) == wanted_pos) |
            (fn.lower(Player.pos_3) == wanted_pos) | (fn.lower(Player.pos_4) == wanted_pos) |
            (fn.lower(Player.pos_5) == wanted_pos) | (fn.lower(Player.pos_6) == wanted_pos) |
            (fn.lower(Player.pos_7) == wanted_pos) | (fn.lower(Player.pos_8) == wanted_pos)
        )
    if franchise is not None:
        all_players = all_players.where(fn.Lower(Player.franchise) == franchise.lower())
    if mlbclub is not None:
        all_players = all_players.where(fn.Lower(Player.mlbclub) == mlbclub.lower())

    if pos_exclude:
        final_players = [x for x in all_players if pos_exclude not in x.get_all_pos()]
    else:
        final_players = list(all_players)

    if limit is not None:
        final_players = final_players[:limit]

    if csv:
        data_list = [['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
                      'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
                      'strat_code', 'bbref_id', 'description']]
        for line in final_players:
            data_list.append(
                [
                    # The rest of this file addresses the player key as
                    # player_id, so the CSV uses it too rather than `.id`.
                    line.player_id, line.p_name, line.cost, line.image, line.image2,
                    line.mlbclub, line.franchise, line.cardset.name, line.rarity.name,
                    line.pos_1, line.pos_2, line.pos_3, line.pos_4, line.pos_5,
                    line.pos_6, line.pos_7, line.pos_8, line.headshot, line.vanity_card,
                    line.strat_code, line.bbref_id, line.description
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {'count': len(final_players), 'players': []}
    for player in final_players:
        this_record = model_to_dict(player)

        # One query per player: fetch the rows and count them in Python.
        dex_rows = [
            model_to_dict(entry, recurse=False)
            for entry in Paperdex.select().where(Paperdex.player == player)
        ]
        this_record['paperdex'] = {'count': len(dex_rows), 'paperdex': dex_rows}

        return_val['players'].append(this_record)

    db.close()
    return return_val
@app.get('/api/v1/players/{player_id}')
async def v1_players_get_one(player_id, csv: Optional[bool] = False):
    """Fetch one player by id, as JSON (with paperdex) or CSV.

    Returns a `text/csv` Response holding a header row and the player row when
    `csv` is truthy; otherwise the player as a dict with a `paperdex`
    sub-dict `{'count': int, 'paperdex': [...]}`.

    Raises HTTPException 404 when the lookup fails.
    """
    try:
        this_player = Player.get_by_id(player_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

    if csv:
        data_list = [
            ['id', 'name', 'cost', 'image', 'image2', 'mlbclub', 'franchise', 'cardset', 'rarity', 'pos_1',
             'pos_2', 'pos_3', 'pos_4', 'pos_5', 'pos_6', 'pos_7', 'pos_8', 'headshot', 'vanity_card',
             'strat_code', 'bbref_id', 'description'],
            [
                # The rest of this file addresses the player key as player_id.
                this_player.player_id, this_player.p_name, this_player.cost, this_player.image, this_player.image2,
                this_player.mlbclub, this_player.franchise, this_player.cardset.name, this_player.rarity.name,
                this_player.pos_1, this_player.pos_2, this_player.pos_3, this_player.pos_4, this_player.pos_5,
                this_player.pos_6, this_player.pos_7, this_player.pos_8, this_player.headshot, this_player.vanity_card,
                this_player.strat_code, this_player.bbref_id, this_player.description
            ]
        ]
        # Serialise only after the data row is in place; doing it earlier
        # produced a header-only CSV.
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = model_to_dict(this_player)
    # One query: fetch the paperdex rows and count them in Python.
    dex_rows = [
        model_to_dict(entry, recurse=False)
        for entry in Paperdex.select().where(Paperdex.player == this_player)
    ]
    return_val['paperdex'] = {'count': len(dex_rows), 'paperdex': dex_rows}
    db.close()
    return return_val
@app.patch('/api/v1/players/{player_id}')
async def v1_players_patch(
        player_id, name: Optional[str] = None, image: Optional[str] = None, image2: Optional[str] = None,
        mlbclub: Optional[str] = None, franchise: Optional[str] = None, cardset_id: Optional[int] = None,
        rarity_id: Optional[int] = None, pos_1: Optional[str] = None, pos_2: Optional[str] = None,
        pos_3: Optional[str] = None, pos_4: Optional[str] = None, pos_5: Optional[str] = None,
        pos_6: Optional[str] = None, pos_7: Optional[str] = None, pos_8: Optional[str] = None,
        headshot: Optional[str] = None, vanity_card: Optional[str] = None, strat_code: Optional[str] = None,
        bbref_id: Optional[str] = None, description: Optional[str] = None, cost: Optional[int] = None,
        fangr_id: Optional[str] = None, token: str = Depends(oauth2_scheme)):
    """Partially update one player; only supplied parameters are changed.

    Clearing values: `image2` is set to NULL when it equals 'false'
    (case-insensitive); each `pos_N` is set to NULL when it is exactly
    'False'. `cardset_id` and `rarity_id` are resolved to rows first.

    Returns the updated player as a dict.

    Raises HTTPException 401 for a bad token, 404 when the player, cardset or
    rarity lookup fails, and 418 when the save does not report one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch players. This event has been logged.'
        )

    try:
        this_player = Player.get_by_id(player_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

    if cost is not None:
        this_player.cost = cost
    if name is not None:
        this_player.p_name = name
    if image is not None:
        this_player.image = image
    if image2 is not None:
        this_player.image2 = None if image2.lower() == 'false' else image2
    if mlbclub is not None:
        this_player.mlbclub = mlbclub
    if franchise is not None:
        this_player.franchise = franchise
    if cardset_id is not None:
        try:
            this_cardset = Cardset.get_by_id(cardset_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No cardset found with id {cardset_id}')
        this_player.cardset = this_cardset
    if rarity_id is not None:
        try:
            this_rarity = Rarity.get_by_id(rarity_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No rarity found with id {rarity_id}')
        this_player.rarity = this_rarity

    # All eight position slots share one rule: the literal 'False' clears the
    # slot, any other non-None value overwrites it.
    position_updates = (
        ('pos_1', pos_1), ('pos_2', pos_2), ('pos_3', pos_3), ('pos_4', pos_4),
        ('pos_5', pos_5), ('pos_6', pos_6), ('pos_7', pos_7), ('pos_8', pos_8),
    )
    for field_name, new_pos in position_updates:
        if new_pos is not None:
            setattr(this_player, field_name, None if new_pos == 'False' else new_pos)

    if headshot is not None:
        this_player.headshot = headshot
    if vanity_card is not None:
        this_player.vanity_card = vanity_card
    if strat_code is not None:
        this_player.strat_code = strat_code
    if bbref_id is not None:
        this_player.bbref_id = bbref_id
    if fangr_id is not None:
        this_player.fangr_id = fangr_id
    if description is not None:
        this_player.description = description

    if this_player.save() != 1:
        # Close the connection on the failure path too, like every other
        # error path in this handler.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that player'
        )

    return_val = model_to_dict(this_player)
    db.close()
    return return_val
@app.post('/api/v1/players')
async def v1_players_post(players: PlayerModel, token: str = Depends(oauth2_scheme)):
    """Bulk insert-or-replace players from the request body.

    Rows are written in batches of 15 inside one `db.atomic()` block using
    `on_conflict_replace()`. `mlbclub` and `franchise` are title-cased before
    insertion.

    Raises HTTPException 401 for a bad token. Success is also reported by
    raising an HTTPException, with status 200 and the number of rows submitted.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post players. This event has been logged.'
        )

    # Plain dicts (not model instances) because insert_many() takes row mappings.
    new_players = [
        {
            'player_id': entry.player_id,
            'p_name': entry.p_name,
            'cost': entry.cost,
            'image': entry.image,
            'image2': entry.image2,
            'mlbclub': entry.mlbclub.title(),
            'franchise': entry.franchise.title(),
            'cardset_id': entry.cardset_id,
            'rarity_id': entry.rarity_id,
            'set_num': entry.set_num,
            'pos_1': entry.pos_1,
            'pos_2': entry.pos_2,
            'pos_3': entry.pos_3,
            'pos_4': entry.pos_4,
            'pos_5': entry.pos_5,
            'pos_6': entry.pos_6,
            'pos_7': entry.pos_7,
            'pos_8': entry.pos_8,
            'headshot': entry.headshot,
            'vanity_card': entry.vanity_card,
            'strat_code': entry.strat_code,
            'fangr_id': entry.fangr_id,
            'bbref_id': entry.bbref_id,
            'description': entry.description
        }
        for entry in players.players
    ]

    logging.info(f'new_players: {new_players}')

    with db.atomic():
        for batch in chunked(new_players, 15):
            logging.info(f'batch: {batch}')
            upsert = Player.insert_many(batch).on_conflict_replace()
            upsert.execute()
    db.close()

    raise HTTPException(status_code=200, detail=f'{len(new_players)} players have been added')
# @app.put('/api/v1/players')
# async def v1_players_put(players: PlayerModel, token: str = Depends(oauth2_scheme)):
# if not valid_token(token):
# logging.warning(f'Bad Token: {token}')
# db.close()
# raise HTTPException(
# status_code=401,
# detail='You are not authorized to post players. This event has been logged.'
# )
#
# new_players = []
# for x in players.players:
# try:
# this_player = Player.get_by_id(x.player_id)
# except Exception as e:
# new_players.append({
# 'player_id': x.player_id,
# 'p_name': x.p_name,
# 'cost': x.cost,
# 'image': x.image,
# 'image2': x.image2,
# 'mlbclub': x.mlbclub.title(),
# 'franchise': x.franchise.title(),
# 'cardset_id': x.cardset_id,
# 'rarity_id': x.rarity_id,
# 'set_num': x.set_num,
# 'pos_1': x.pos_1,
# 'pos_2': x.pos_2,
# 'pos_3': x.pos_3,
# 'pos_4': x.pos_4,
# 'pos_5': x.pos_5,
# 'pos_6': x.pos_6,
# 'pos_7': x.pos_7,
# 'pos_8': x.pos_8,
# 'headshot': x.headshot,
# 'vanity_card': x.vanity_card,
# 'strat_code': x.strat_code,
# 'fangr_id': x.fangr_id,
# 'bbref_id': x.bbref_id,
# 'description': x.description
# })
# finally:
#
# @app.patch('/api/v1/players')
# async def v1_players_put(players: PlayerModel, token: str = Depends(oauth2_scheme)):
# if not valid_token(token):
# logging.warning(f'Bad Token: {token}')
# db.close()
# raise HTTPException(
# status_code=401,
# detail='You are not authorized to post players. This event has been logged.'
# )
#
# new_players = []
# for x in players.players:
@app.delete('/api/v1/players/{player_id}')
async def v1_players_delete(player_id, token: str = Depends(oauth2_scheme)):
    """Delete a single player by id; requires a valid API token.

    Raises HTTPException 401 for a bad token, 404 when the lookup fails and
    500 when the delete does not report exactly one row. Success is also
    signalled by raising an HTTPException, with status 200.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete players. This event has been logged.'
        )

    try:
        doomed_player = Player.get_by_id(player_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')

    rows_deleted = doomed_player.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Player {player_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Player {player_id} has been deleted')
"""
PACKTYPE ENDPOINTS
"""
class PacktypeModel(pydantic.BaseModel):
    """Request-body schema for creating a pack type."""
    name: str
    card_count: int
    description: str
    cost: int
    # Defaults to True when the caller omits it.
    available: Optional[bool] = True
@app.get('/api/v1/packtypes')
async def v1_packtypes_get(
        name: Optional[str] = None, card_count: Optional[int] = None, in_desc: Optional[str] = None,
        available: Optional[bool] = None, csv: Optional[bool] = None):
    """List pack types, optionally filtered, as JSON or CSV.

    `name` matches case-insensitively on the whole name; `in_desc` is a
    case-insensitive substring match on the description.

    Returns a `text/csv` Response (id, name, card_count, description) when
    `csv` is truthy, otherwise `{'count': int, 'packtypes': [...]}`.

    Raises HTTPException 404 only when the table itself is empty; a filter
    that matches nothing yields an empty result.
    """
    all_packtypes = PackType.select()
    if all_packtypes.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail=f'There are no packtypes to filter')

    if name is not None:
        all_packtypes = all_packtypes.where(fn.Lower(PackType.name) == name.lower())
    if card_count is not None:
        all_packtypes = all_packtypes.where(PackType.card_count == card_count)
    if in_desc is not None:
        all_packtypes = all_packtypes.where(fn.Lower(PackType.description).contains(in_desc.lower()))
    if available is not None:
        all_packtypes = all_packtypes.where(PackType.available == available)

    if not csv:
        # The count is evaluated before the rows, as dict entries evaluate in order.
        return_val = {
            'count': all_packtypes.count(),
            'packtypes': [model_to_dict(row) for row in all_packtypes]
        }
        db.close()
        return return_val

    data_list = [['id', 'name', 'card_count', 'description']]
    data_list.extend(
        [row.id, row.name, row.card_count, row.description] for row in all_packtypes
    )
    return_val = DataFrame(data_list).to_csv(header=False, index=False)

    db.close()
    return Response(content=return_val, media_type='text/csv')
@app.get('/api/v1/packtypes/{packtype_id}')
async def v1_packtypes_get_one(packtype_id, csv: Optional[bool] = False):
    """Fetch one pack type by id, as JSON or CSV.

    Returns a `text/csv` Response (header row plus one data row) when `csv`
    is truthy, otherwise the pack type as a dict.

    Raises HTTPException 404 when the lookup fails.
    """
    try:
        this_packtype = PackType.get_by_id(packtype_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')

    if not csv:
        return_val = model_to_dict(this_packtype)
        db.close()
        return return_val

    header_row = ['id', 'name', 'card_count', 'description']
    value_row = [this_packtype.id, this_packtype.name, this_packtype.card_count, this_packtype.description]
    return_val = DataFrame([header_row, value_row]).to_csv(header=False, index=False)

    db.close()
    return Response(content=return_val, media_type='text/csv')
@app.post('/api/v1/packtypes')
async def v1_packtypes_post(packtype: PacktypeModel, token: str = Depends(oauth2_scheme)):
    """Create a pack type; names must be unique.

    Returns the created pack type as a dict.

    Raises HTTPException 401 for a bad token, 400 when a pack type with the
    same name already exists, and 418 when the save does not report one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post packtypes. This event has been logged.'
        )

    dupe_packtype = PackType.get_or_none(PackType.name == packtype.name)
    if dupe_packtype:
        db.close()
        raise HTTPException(status_code=400, detail=f'There is already a packtype using {packtype.name}')

    this_packtype = PackType(
        name=packtype.name,
        card_count=packtype.card_count,
        description=packtype.description,
        cost=packtype.cost,
        available=packtype.available
    )

    if this_packtype.save() != 1:
        # Close the connection on the failure path too, and name the right
        # entity in the error message.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that packtype'
        )

    return_val = model_to_dict(this_packtype)
    db.close()
    return return_val
@app.patch('/api/v1/packtypes/{packtype_id}')
async def v1_packtypes_patch(
        packtype_id, name: Optional[str] = None, card_count: Optional[int] = None, description: Optional[str] = None,
        cost: Optional[int] = None, available: Optional[bool] = None, token: str = Depends(oauth2_scheme)):
    """Partially update one pack type; only supplied parameters are changed.

    Returns the updated pack type as a dict.

    Raises HTTPException 401 for a bad token, 404 when the lookup fails, and
    418 when the save does not report one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch packtypes. This event has been logged.'
        )

    try:
        this_packtype = PackType.get_by_id(packtype_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')

    if name is not None:
        this_packtype.name = name
    if card_count is not None:
        this_packtype.card_count = card_count
    if description is not None:
        this_packtype.description = description
    if cost is not None:
        this_packtype.cost = cost
    if available is not None:
        this_packtype.available = available

    if this_packtype.save() != 1:
        # Close the connection on the failure path too, and name the right
        # entity in the error message.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that packtype'
        )

    return_val = model_to_dict(this_packtype)
    db.close()
    return return_val
@app.delete('/api/v1/packtypes/{packtype_id}')
async def v1_packtypes_delete(packtype_id, token: str = Depends(oauth2_scheme)):
    """Delete a single pack type by id; requires a valid API token.

    Raises HTTPException 401 for a bad token, 404 when the lookup fails and
    500 when the delete does not report exactly one row. Success is also
    signalled by raising an HTTPException, with status 200.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete packtypes. This event has been logged.'
        )

    try:
        doomed_packtype = PackType.get_by_id(packtype_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No packtype found with id {packtype_id}')

    rows_deleted = doomed_packtype.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Packtype {packtype_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Packtype {packtype_id} has been deleted')
"""
PACK ENDPOINTS
"""
class PackPydantic(pydantic.BaseModel):
    """Request-body schema for one pack."""
    team_id: int
    pack_type_id: int
    # NOTE(review): typed as str here, while v1_packs_patch takes open_time as int - confirm intended type.
    open_time: Optional[str] = None
class PackModel(pydantic.BaseModel):
    """Envelope for the bulk pack POST body: a list of pack payloads."""
    packs: List[PackPydantic]
@app.get('/api/v1/packs')
async def v1_packs_get(
        team_id: Optional[int] = None, pack_type_id: Optional[int] = None, opened: Optional[bool] = None,
        limit: Optional[int] = None, new_to_old: Optional[bool] = None, csv: Optional[bool] = None):
    """List packs, optionally filtered, as JSON or CSV.

    `opened=True` keeps packs whose open_time is set and `opened=False` keeps
    those where it is NULL. `new_to_old=True` sorts by descending id.

    Returns a `text/csv` Response (id, team, pack_type, open_time) when `csv`
    is truthy, otherwise `{'count': int, 'packs': [...]}`.

    Raises HTTPException 404 when the pack table is empty or when the given
    team / pack type id does not resolve. A filter that matches nothing
    yields an empty result.
    """
    all_packs = Pack.select()
    if all_packs.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail=f'There are no packs to filter')

    if team_id is not None:
        try:
            this_team = Team.get_by_id(team_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
        all_packs = all_packs.where(Pack.team == this_team)
    if pack_type_id is not None:
        try:
            this_pack_type = PackType.get_by_id(pack_type_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No pack type found with id {pack_type_id}')
        all_packs = all_packs.where(Pack.pack_type == this_pack_type)
    if opened is not None:
        all_packs = all_packs.where(Pack.open_time.is_null(not opened))
    if limit is not None:
        all_packs = all_packs.limit(limit)
    # Test truthiness, not "is not None": an explicit new_to_old=false must
    # not flip the ordering.
    if new_to_old:
        all_packs = all_packs.order_by(-Pack.id)

    if csv:
        data_list = [['id', 'team', 'pack_type', 'open_time']]
        for line in all_packs:
            data_list.append(
                [
                    line.id, line.team.abbrev, line.pack_type.name,
                    # NOTE(review): fromtimestamp() expects seconds, while
                    # int_timestamp() in this file yields milliseconds -
                    # confirm which unit open_time stores.
                    datetime.fromtimestamp(line.open_time) if line.open_time else None
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {'count': all_packs.count(), 'packs': []}
    for x in all_packs:
        return_val['packs'].append(model_to_dict(x))

    db.close()
    return return_val
@app.get('/api/v1/packs/{pack_id}')
async def v1_packs_get_one(pack_id, csv: Optional[bool] = False):
    """Fetch one pack by id, as JSON or CSV.

    Returns a `text/csv` Response (header row plus one data row) when `csv`
    is truthy, otherwise the pack as a dict.

    Raises HTTPException 404 when the lookup fails.
    """
    try:
        this_pack = Pack.get_by_id(pack_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}')

    if not csv:
        return_val = model_to_dict(this_pack)
        db.close()
        return return_val

    header_row = ['id', 'team', 'pack_type', 'open_time']
    value_row = [
        this_pack.id, this_pack.team.abbrev, this_pack.pack_type.name,
        # A falsy open_time is exported as an empty cell.
        datetime.fromtimestamp(this_pack.open_time) if this_pack.open_time else None
    ]
    return_val = DataFrame([header_row, value_row]).to_csv(header=False, index=False)

    db.close()
    return Response(content=return_val, media_type='text/csv')
@app.post('/api/v1/packs')
async def v1_packs_post(packs: PackModel, token: str = Depends(oauth2_scheme)):
    """Bulk-create packs from the request body.

    An empty-string `open_time` is stored as NULL. Rows are written with
    `bulk_create` in batches of 15 inside one `db.atomic()` block.

    Raises HTTPException 401 for a bad token. Success is also reported by
    raising an HTTPException, with status 200 and the number of packs created.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post packs. This event has been logged.'
        )

    new_packs = [
        Pack(
            team_id=entry.team_id,
            pack_type_id=entry.pack_type_id,
            open_time=None if entry.open_time == "" else entry.open_time
        )
        for entry in packs.packs
    ]

    with db.atomic():
        Pack.bulk_create(new_packs, batch_size=15)
    db.close()

    raise HTTPException(status_code=200, detail=f'{len(new_packs)} packs have been added')
@app.post('/api/v1/packs/one')
async def v1_packs_post_one(pack: PackPydantic, token: str = Depends(oauth2_scheme)):
    """Create a single pack and return it as a dict.

    An empty-string `open_time` is stored as NULL, matching the bulk
    endpoint, so the pack is not treated as opened by the `opened` filter
    (which tests open_time for NULL).

    Raises HTTPException 401 for a bad token and 418 when the save does not
    report one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post packs. This event has been logged.'
        )

    this_pack = Pack(
        team_id=pack.team_id,
        pack_type_id=pack.pack_type_id,
        open_time=pack.open_time if pack.open_time != "" else None
    )

    if this_pack.save() != 1:
        # Close the connection on the failure path too, and name the right
        # entity in the error message.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that pack'
        )

    return_val = model_to_dict(this_pack)
    db.close()
    return return_val
@app.patch('/api/v1/packs/{pack_id}')
async def v1_packs_patch(
        pack_id, team_id: Optional[int] = None, pack_type_id: Optional[int] = None, open_time: Optional[int] = None,
        token: str = Depends(oauth2_scheme)):
    """Partially update one pack; only supplied parameters are changed.

    Returns the updated pack as a dict.

    Raises HTTPException 401 for a bad token, 404 when the lookup fails, and
    418 when the save does not report one row.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch packs. This event has been logged.'
        )

    try:
        this_pack = Pack.get_by_id(pack_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}')

    if team_id is not None:
        this_pack.team_id = team_id
    if pack_type_id is not None:
        this_pack.pack_type_id = pack_type_id
    if open_time is not None:
        this_pack.open_time = open_time

    if this_pack.save() != 1:
        # Close the connection on the failure path too, and name the right
        # entity in the error message.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that pack'
        )

    return_val = model_to_dict(this_pack)
    db.close()
    return return_val
@app.delete('/api/v1/packs/{pack_id}')
async def v1_packs_delete(pack_id, token: str = Depends(oauth2_scheme)):
    """Delete a single pack by id; requires a valid API token.

    Raises HTTPException 401 for a bad token, 404 when the lookup fails and
    500 when the delete does not report exactly one row. Success is also
    signalled by raising an HTTPException, with status 200.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete packs. This event has been logged.'
        )

    try:
        doomed_pack = Pack.get_by_id(pack_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No packs found with id {pack_id}')

    rows_deleted = doomed_pack.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Pack {pack_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Pack {pack_id} has been deleted')
"""
CARD ENDPOINTS
"""
class CardPydantic(pydantic.BaseModel):
    """Request-body schema for one card inside a bulk card POST."""
    player_id: int
    team_id: int
    pack_id: int
    # Defaults to 0 when the caller omits it.
    value: Optional[int] = 0
class CardModel(pydantic.BaseModel):
    """Envelope for the bulk card POST body: a list of card payloads."""
    cards: List[CardPydantic]
@app.get('/api/v1/cards')
async def v1_cards_get(
        player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None,
        value: Optional[int] = None, min_value: Optional[int] = None, max_value: Optional[int] = None,
        order_by: Optional[str] = None, limit: Optional[int] = None, dupes: Optional[bool] = None,
        csv: Optional[bool] = None):
    """List cards, optionally filtered, as JSON or CSV.

    `order_by='new'` (case-insensitive) sorts by descending card id; other
    values are ignored. `dupes` is accepted but not used by this handler.

    Returns a `text/csv` Response when `csv` is truthy, otherwise
    `{'count': int, 'cards': [...]}` where each card's `player` dict carries a
    `paperdex` sub-dict for that player.

    Raises HTTPException 404 when a supplied team, player or pack id does not
    resolve. A filter that matches nothing yields an empty result.
    """
    all_cards = Card.select()

    if team_id is not None:
        try:
            this_team = Team.get_by_id(team_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')
        all_cards = all_cards.where(Card.team == this_team)
    if player_id is not None:
        try:
            this_player = Player.get_by_id(player_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No player found with id {player_id}')
        all_cards = all_cards.where(Card.player == this_player)
    if pack_id is not None:
        try:
            this_pack = Pack.get_by_id(pack_id)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No pack found with id {pack_id}')
        all_cards = all_cards.where(Card.pack == this_pack)
    if value is not None:
        all_cards = all_cards.where(Card.value == value)
    if min_value is not None:
        all_cards = all_cards.where(Card.value >= min_value)
    if max_value is not None:
        all_cards = all_cards.where(Card.value <= max_value)
    if order_by is not None:
        if order_by.lower() == 'new':
            all_cards = all_cards.order_by(-Card.id)
    if limit is not None:
        all_cards = all_cards.limit(limit)

    if csv:
        data_list = [['id', 'player', 'cardset', 'rarity', 'team', 'pack', 'value']]
        for line in all_cards:
            data_list.append(
                [
                    line.id, line.player.p_name, line.player.cardset, line.player.rarity, line.team.abbrev, line.pack,
                    line.value
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)

        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {'count': all_cards.count(), 'cards': []}
    for card in all_cards:
        this_record = model_to_dict(card)
        logging.debug(f'this_record: {this_record}')

        # Look the paperdex up by the card's PLAYER. Comparing against the
        # Card row itself matched on the card's id instead of the player's.
        dex_rows = [
            model_to_dict(entry, recurse=False)
            for entry in Paperdex.select().where(Paperdex.player == card.player)
        ]
        this_record['player']['paperdex'] = {'count': len(dex_rows), 'paperdex': dex_rows}

        return_val['cards'].append(this_record)

    db.close()
    return return_val
@app.get('/api/v1/cards/{card_id}')
async def v1_cards_get_one(card_id, csv: Optional[bool] = False):
    """Fetch one card by id, as JSON or CSV.

    Returns a `text/csv` Response (header row plus one data row) when `csv`
    is truthy, otherwise the card as a dict.

    Raises HTTPException 404 when the lookup fails.
    """
    try:
        this_card = Card.get_by_id(card_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No card found with id {card_id}')

    if not csv:
        return_val = model_to_dict(this_card)
        db.close()
        return return_val

    header_row = ['id', 'player', 'team', 'pack', 'value', 'roster1', 'roster2', 'roster3']
    # NOTE(review): roster1..roster3 are read off the card here but appear in
    # no other card handler or payload in this section - confirm they exist.
    value_row = [
        this_card.id, this_card.player, this_card.team.abbrev, this_card.pack, this_card.value,
        this_card.roster1.name, this_card.roster2.name, this_card.roster3.name
    ]
    return_val = DataFrame([header_row, value_row]).to_csv(header=False, index=False)

    db.close()
    return Response(content=return_val, media_type='text/csv')
@app.post('/api/v1/cards')
async def v1_cards_post(cards: CardModel, token: str = Depends(oauth2_scheme)):
    """Bulk-create cards and update related bookkeeping.

    For every submitted card a Paperdex row for (team, player) is fetched or
    created. The cards are then bulk-inserted in batches of 15, and the cost
    of every player whose id appears in the payload is raised by 1.

    Raises HTTPException 401 for a bad token. Success is also reported by
    raising an HTTPException, with status 200 and the number of cards created.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post cards. This event has been logged.'
        )

    # The former "last card id" lookup only fed the disabled sheets export and
    # raised IndexError on an empty Card table, so it is gone.
    new_cards = []
    player_ids = []
    for x in cards.cards:
        Paperdex.get_or_create(team_id=x.team_id, player_id=x.player_id)
        player_ids.append(x.player_id)
        new_cards.append(Card(
            player_id=x.player_id,
            team_id=x.team_id,
            pack_id=x.pack_id,
            value=x.value
        ))

    with db.atomic():
        Card.bulk_create(new_cards, batch_size=15)
        # A single UPDATE ... WHERE player_id IN (...): a player listed several
        # times in the payload is still bumped only once.
        cost_query = Player.update(cost=Player.cost + 1).where(Player.player_id << player_ids)
        cost_query.execute()
    db.close()

    raise HTTPException(status_code=200, detail=f'{len(new_cards)} cards have been added')
@app.post('/api/v1/cards/ai-update')
async def v1_cards_ai_update(token: str = Depends(oauth2_scheme)):
    """Push the AI teams' cards to Google Sheets via ``sheets.send_ai_cards``.

    Raises:
        HTTPException: 401 on a bad token; 200 (raised, per file convention) once the
            sheets call has returned.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to update AI cards. This event has been logged.',
        )

    sheets.send_ai_cards(SHEETS_AUTH)
    success_message = 'Just sent AI cards to sheets'
    raise HTTPException(status_code=200, detail=success_message)
@app.post('/api/v1/cards/post-update/{starting_id}')
async def v1_cards_post_update(starting_id: int, token: str = Depends(oauth2_scheme)):
    """Acknowledge a request to sync cards to sheets from ``starting_id`` onward.

    The sheets call itself is currently disabled, so after the token check this only
    closes the database connection and reports success.

    Raises:
        HTTPException: 401 on a bad token; otherwise 200 with a confirmation message.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to update card lists. This event has been logged.',
        )

    # Disabled: sheets.post_new_cards(SHEETS_AUTH, starting_id)
    db.close()
    confirmation = f'Just sent cards to sheets starting at ID {starting_id}'
    raise HTTPException(status_code=200, detail=confirmation)
@app.post('/api/v1/cards/post-delete')
async def v1_cards_post_delete(del_ids: str, token: str = Depends(oauth2_scheme)):
    """Log a request to remove card ids from sheets.

    The sheets deletion call is currently disabled, so an authorized request only
    logs ``del_ids`` and returns ``None``.

    Raises:
        HTTPException: 401 on a bad token.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete card lists. This event has been logged.',
        )

    logging.info(f'del_ids: {del_ids} / type: {type(del_ids)}')
    # Disabled: sheets.post_deletion(SHEETS_AUTH, del_ids.split(','))
# @app.get('/api/v1/cards/{card_id}/sell')
# async def v1_cards_sell(card_id, ts: int = None):
# try:
# this_card = Card.get_by_id(card_id)
# except Exception:
# db.close()
# raise HTTPException(status_code=404, detail=f'No card found with id {card_id}')
#
# this_team = this_card.team
# if ts != this_team.team_hash():
# logging.warning(f'Bad Team Secret: {ts} ({this_team.team_hash()})')
# db.close()
# raise HTTPException(
# status_code=401,
# detail=f'You are not authorized to sell {this_team.abbrev} cards. This event has been logged.'
# )
#
# this_player = this_card.player
# orig_price = this_player.cost
# sell_price = round(this_player.cost * .5)
#
# # credit selling team's wallet
# if this_team.wallet is None:
# this_team.wallet = sell_price
# else:
# this_team.wallet += sell_price
# this_team.save()
#
# # decrease price of player
# this_player.change_on_sell()
# this_card.delete_instance()
#
# # post a notification
# new_notif = Notification(
# created=int_timestamp(datetime.now()),
# title=f'Price Change',
# desc='Modified by buying and selling',
# field_name=f'{this_player.p_name}',
# message=f'From {orig_price}₼ to **{this_player.cost}**₼',
# about=f'Player-{this_player.id}'
# )
# new_notif.save()
#
# raise HTTPException(status_code=200, detail=f'Card {card_id} has been sold for {sell_price} bucks')
@app.patch('/api/v1/cards/{card_id}')
async def v1_cards_patch(
        card_id, player_id: Optional[int] = None, team_id: Optional[int] = None, pack_id: Optional[int] = None,
        value: Optional[int] = None, roster1_id: Optional[int] = None, roster2_id: Optional[int] = None,
        roster3_id: Optional[int] = None, token: str = Depends(oauth2_scheme)):
    """Update selected columns of one card; parameters left as ``None`` are not touched.

    Returns:
        The saved card as a dict.

    Raises:
        HTTPException: 401 on a bad token, 404 when the card lookup fails, 418 when
            ``save()`` does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch cards. This event has been logged.',
        )

    try:
        this_card = Card.get_by_id(card_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No card found with id {card_id}')

    # Column name -> requested value, applied in the same order as the signature.
    requested = {
        'player_id': player_id,
        'team_id': team_id,
        'pack_id': pack_id,
        'value': value,
        'roster1_id': roster1_id,
        'roster2_id': roster2_id,
        'roster3_id': roster3_id,
    }
    for column, new_value in requested.items():
        if new_value is not None:
            setattr(this_card, column, new_value)

    rows_written = this_card.save()
    if rows_written != 1:
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that rarity',
        )

    return_val = model_to_dict(this_card)
    db.close()
    return return_val
@app.delete('/api/v1/cards/{card_id}')
async def v1_cards_delete(card_id, token: str = Depends(oauth2_scheme)):
    """Delete one card by primary key.

    Raises:
        HTTPException: 401 on a bad token, 404 when the card lookup fails, 200 (raised,
            per file convention) when exactly one row was deleted, 500 otherwise.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            # Message previously said "packs" (copy-paste from the packs endpoint).
            detail='You are not authorized to delete cards. This event has been logged.'
        )

    try:
        this_card = Card.get_by_id(card_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No cards found with id {card_id}')

    count = this_card.delete_instance()
    db.close()

    if count == 1:
        raise HTTPException(status_code=200, detail=f'Card {card_id} has been deleted')
    raise HTTPException(status_code=500, detail=f'Card {card_id} was not deleted')
"""
EVENTS ENDPOINTS
"""
class EventModel(pydantic.BaseModel):
    """Request body accepted by ``POST /api/v1/events``."""
    # Event name; the POST handler rejects a name that is already in use.
    name: str
    # Both descriptions are searched by the ``in_desc`` filter of GET /api/v1/events.
    short_desc: str
    long_desc: str
    # Optional; stored as None when omitted.
    url: Optional[str] = None
    thumbnail: Optional[str] = None
    # Defaults to False when omitted.
    active: Optional[bool] = False
@app.get('/api/v1/events')
async def v1_events_get(
        name: Optional[str] = None, in_desc: Optional[str] = None, active: Optional[bool] = None,
        csv: Optional[bool] = None):
    """List events, optionally filtered.

    Args:
        name: Case-insensitive exact match on the event name.
        in_desc: Case-insensitive substring looked up in the short or long description.
        active: Match on the ``active`` flag.
        csv: When truthy, respond with CSV instead of JSON.

    Returns:
        ``{'count': ..., 'events': [...]}`` or a ``text/csv`` ``Response``.

    Raises:
        HTTPException: 404 when the table is empty or when no event matches the filters.
    """
    event_query = Event.select()
    if event_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no events to filter')

    if name is not None:
        event_query = event_query.where(fn.Lower(Event.name) == name.lower())
    if in_desc is not None:
        in_short = fn.Lower(Event.short_desc).contains(in_desc.lower())
        in_long = fn.Lower(Event.long_desc).contains(in_desc.lower())
        event_query = event_query.where(in_short | in_long)
    if active is not None:
        event_query = event_query.where(Event.active == active)

    if event_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='No events found')

    if not csv:
        return_val = {
            'count': event_query.count(),
            'events': [model_to_dict(event) for event in event_query],
        }
        db.close()
        return return_val

    table = [['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active']]
    table.extend(
        [event.id, event.name, event.short_desc, event.long_desc, event.url, event.thumbnail, event.active]
        for event in event_query
    )
    csv_text = DataFrame(table).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/events/{event_id}')
async def v1_events_get_one(event_id, csv: Optional[bool] = False):
    """Return a single event by primary key, as JSON or as a two-row CSV.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        event = Event.get_by_id(event_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')

    if not csv:
        return_val = model_to_dict(event)
        db.close()
        return return_val

    header = ['id', 'name', 'short_desc', 'long_desc', 'url', 'thumbnail', 'active']
    values = [event.id, event.name, event.short_desc, event.long_desc, event.url, event.thumbnail, event.active]
    csv_text = DataFrame([header, values]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/events')
async def v1_events_post(event: EventModel, token: str = Depends(oauth2_scheme)):
    """Create a new event; event names must be unique.

    Returns:
        The saved event as a dict.

    Raises:
        HTTPException: 401 on a bad token, 400 when an event with the same name exists,
            418 when ``save()`` does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post events. This event has been logged.'
        )

    dupe_event = Event.get_or_none(Event.name == event.name)
    if dupe_event:
        db.close()
        raise HTTPException(status_code=400, detail=f'There is already an event using {event.name}')

    this_event = Event(
        name=event.name,
        short_desc=event.short_desc,
        long_desc=event.long_desc,
        url=event.url,
        thumbnail=event.thumbnail,
        active=event.active
    )

    saved = this_event.save()
    if saved == 1:
        return_val = model_to_dict(this_event)
        db.close()
        return return_val

    db.close()
    raise HTTPException(
        status_code=418,
        # Message previously said "cardset" (copy-paste from the cardsets endpoint).
        detail='Well slap my ass and call me a teapot; I could not save that event'
    )
@app.patch('/api/v1/events/{event_id}')
async def v1_events_patch(
        event_id, name: Optional[str] = None, short_desc: Optional[str] = None, long_desc: Optional[str] = None,
        url: Optional[str] = None, thumbnail: Optional[str] = None, active: Optional[bool] = None,
        token: str = Depends(oauth2_scheme)):
    """Update selected columns of one event; parameters left as ``None`` are not touched.

    Returns:
        The saved event as a dict.

    Raises:
        HTTPException: 401 on a bad token, 404 when the event lookup fails, 418 when
            ``save()`` does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch events. This event has been logged.',
        )

    try:
        event = Event.get_by_id(event_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')

    # Column name -> requested value, applied in the same order as the signature.
    requested = {
        'name': name,
        'short_desc': short_desc,
        'long_desc': long_desc,
        'url': url,
        'thumbnail': thumbnail,
        'active': active,
    }
    for column, new_value in requested.items():
        if new_value is not None:
            setattr(event, column, new_value)

    rows_written = event.save()
    if rows_written != 1:
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that event',
        )

    return_val = model_to_dict(event)
    db.close()
    return return_val
@app.delete('/api/v1/events/{event_id}')
async def v1_events_delete(event_id, token: str = Depends(oauth2_scheme)):
    """Delete one event by primary key.

    Raises:
        HTTPException: 401 on a bad token, 404 when the event lookup fails, 200 (raised,
            per file convention) when exactly one row was deleted, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete events. This event has been logged.',
        )

    try:
        doomed_event = Event.get_by_id(event_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No event found with id {event_id}')

    rows_deleted = doomed_event.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Event {event_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Event {event_id} has been deleted')
"""
ROSTERS ENDPOINTS
"""
class RosterModel(pydantic.BaseModel):
    """Request body accepted by ``POST /api/v1/rosters``."""
    # Owning team; every submitted card must belong to this team.
    team_id: int
    name: Optional[str] = 'My Roster'
    # Together with team_id identifies the roster that the POST handler replaces.
    roster_num: int
    # The POST handler reads indices 0..25, i.e. it expects at least 26 card ids.
    card_ids: list
@app.get('/api/v1/rosters')
async def v1_rosters_get(team_id: Optional[int] = None, csv: Optional[bool] = None):
    """List rosters, optionally restricted to one team.

    Args:
        team_id: When truthy, only rosters of this team are returned.
        csv: When truthy, respond with CSV instead of JSON.

    Returns:
        ``{'count': ..., 'rosters': [...]}`` or a ``text/csv`` ``Response``.

    Raises:
        HTTPException: 404 when ``team_id`` is given but the team lookup fails.
    """
    roster_query = Roster.select()

    if team_id:
        try:
            owner = Team.get_by_id(team_id)
            roster_query = roster_query.where(Roster.team == owner)
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No team found with id {team_id}')

    if not csv:
        return_val = {
            'count': roster_query.count(),
            'rosters': [model_to_dict(entry) for entry in roster_query],
        }
        db.close()
        return return_val

    table = [['id', 'roster', 'team_id', 'team_abbrev']]
    table.extend([entry.id, entry.name, entry.team, entry.team.abbrev] for entry in roster_query)
    csv_text = DataFrame(table).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/rosters/{roster_id}')
async def v1_rosters_get_one(roster_id, csv: Optional[bool] = None):
    """Return a single roster by primary key, as JSON or as a two-row CSV.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        found = Roster.get_by_id(roster_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}')

    if not csv:
        return_val = model_to_dict(found)
        db.close()
        return return_val

    header = ['id', 'roster', 'team_id', 'team_abbrev']
    values = [found.id, found.name, found.team, found.team.abbrev]
    csv_text = DataFrame([header, values]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/rosters')
async def v1_rosters_post(roster: RosterModel, token: str = Depends(oauth2_scheme)):
    """Replace a team's roster slot with a new 26-card roster.

    Any existing roster with the same ``team_id`` and ``roster_num`` is deleted and the
    new one inserted inside one transaction, so a failed insert no longer loses the old
    roster. The payload is validated *before* anything is deleted.

    Args:
        roster: Payload; ``card_ids`` must hold at least 26 ids (extras are ignored,
            as before) and every referenced card must belong to ``team_id``.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The saved roster as a flat dict (``recurse=False``).

    Raises:
        HTTPException: 401 on a bad token or when a card belongs to another team,
            400 when fewer than 26 card ids are supplied, 418 when ``save()`` does not
            report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post rosters. This event has been logged.'
        )

    roster_size = 26  # the Roster model is populated through card_1_id .. card_26_id
    if len(roster.card_ids) < roster_size:
        # Previously this surfaced as an IndexError *after* the old roster was deleted.
        db.close()
        raise HTTPException(
            status_code=400,
            detail=f'A roster requires {roster_size} card ids; received {len(roster.card_ids)}.'
        )

    c_query = Card.select().where(Card.id << roster.card_ids)
    logging.debug(f'c_query: {c_query}')
    for card in c_query:
        if card.team_id != roster.team_id:
            # Build the message first: it lazily loads related rows from the database.
            detail = (f'Card ID {card.id} ({card.player.rarity.name} {card.player.p_name}) belongs to '
                      f'{card.team.abbrev} and cannot be added to your roster.')
            db.close()
            raise HTTPException(status_code=401, detail=detail)

    card_slots = {
        f'card_{slot}_id': card_id
        for slot, card_id in enumerate(roster.card_ids[:roster_size], start=1)
    }

    try:
        with db.atomic():
            r_query = Roster.delete().where(
                Roster.team_id == roster.team_id, Roster.roster_num == roster.roster_num
            )
            logging.debug(f'r_query: {r_query}')
            r_query.execute()

            this_roster = Roster(
                team_id=roster.team_id,
                name=roster.name,
                roster_num=roster.roster_num,
                **card_slots
            )
            if this_roster.save() != 1:
                # Raising inside the atomic block rolls the delete back as well.
                raise HTTPException(
                    status_code=418,
                    detail='Well slap my ass and call me a teapot; I could not save that roster'
                )
        return model_to_dict(this_roster, recurse=False)
    finally:
        db.close()
@app.patch('/api/v1/rosters/{roster_id}')
async def v1_rosters_patch(
        roster_id, team_id: Optional[int] = None, name: Optional[str] = None, roster_num: Optional[int] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the team, name and/or number of one roster; ``None`` parameters are skipped.

    Returns:
        The saved roster as a dict.

    Raises:
        HTTPException: 401 on a bad token, 404 when the roster lookup fails, 418 when
            ``save()`` does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch rosters. This event has been logged.'
        )

    try:
        this_roster = Roster.get_by_id(roster_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}')

    if team_id is not None:
        this_roster.team_id = team_id
    if name is not None:
        this_roster.name = name
    if roster_num is not None:
        this_roster.roster_num = roster_num

    if this_roster.save() == 1:
        return_val = model_to_dict(this_roster)
        db.close()
        return return_val

    db.close()
    raise HTTPException(
        status_code=418,
        # Message previously said "event" (copy-paste from the events endpoint).
        detail='Well slap my ass and call me a teapot; I could not save that roster'
    )
@app.delete('/api/v1/rosters/{roster_id}')
async def v1_rosters_delete(roster_id, token: str = Depends(oauth2_scheme)):
    """Delete one roster by primary key.

    Raises:
        HTTPException: 401 on a bad token, 404 when the roster lookup fails, 200 (raised,
            per file convention) when exactly one row was deleted, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete rosters. This event has been logged.',
        )

    try:
        doomed_roster = Roster.get_by_id(roster_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No roster found with id {roster_id}')

    rows_deleted = doomed_roster.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Roster {roster_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Roster {roster_id} has been deleted')
"""
RESULTS ENDPOINTS
"""
class ResultModel(pydantic.BaseModel):
    """Request body accepted by ``POST /api/v1/results``."""
    away_team_id: int
    home_team_id: int
    away_score: int
    home_score: int
    away_team_value: Optional[int] = None
    home_team_value: Optional[int] = None
    # Both rankings must be supplied (and truthy) when ``ranked`` is True; the POST
    # handler uses them to compute each team's ranking change.
    away_team_ranking: Optional[int] = None
    home_team_ranking: Optional[int] = None
    scorecard: str
    week: int
    season: int
    # True triggers the ranking update in the POST handler.
    ranked: bool
    # Selects the k-value of the ranking update: 20 for short games, 60 otherwise.
    short_game: bool
    game_type: str
@app.get('/api/v1/results')
async def v1_results_get(
        away_team_id: Optional[int] = None, home_team_id: Optional[int] = None, team_one_id: Optional[int] = None,
        team_two_id: Optional[int] = None, away_score_min: Optional[int] = None, away_score_max: Optional[int] = None,
        home_score_min: Optional[int] = None, home_score_max: Optional[int] = None, bothscore_min: Optional[int] = None,
        bothscore_max: Optional[int] = None, season: Optional[int] = None, week: Optional[int] = None,
        week_start: Optional[int] = None, week_end: Optional[int] = None, ranked: Optional[bool] = None,
        short_game: Optional[bool] = None, game_type: Optional[str] = None, vs_ai: Optional[bool] = None,
        csv: Optional[bool] = None):
    """List game results ordered by id, narrowed by any combination of filters.

    ``team_one_id`` / ``team_two_id`` match a team on either side of the game, while
    ``away_team_id`` / ``home_team_id`` pin the side. ``bothscore_*`` bounds apply to
    both scores at once. ``vs_ai`` is accepted but currently has no effect.

    Returns:
        ``{'count': ..., 'results': [...]}`` or, when ``csv`` is truthy, a ``text/csv``
        ``Response``.

    Raises:
        HTTPException: 404 when any supplied team id cannot be resolved.
    """
    query = Result.select()

    def either_side(team):
        return (Result.home_team == team) | (Result.away_team == team)

    # (requested id, clause builder) pairs, evaluated in the order listed.
    team_filters = (
        (away_team_id, lambda team: Result.away_team == team),
        (home_team_id, lambda team: Result.home_team == team),
        (team_one_id, either_side),
        (team_two_id, either_side),
    )
    for requested_id, build_clause in team_filters:
        if requested_id is None:
            continue
        try:
            matched_team = Team.get_by_id(requested_id)
            query = query.where(build_clause(matched_team))
        except Exception:
            db.close()
            raise HTTPException(status_code=404, detail=f'No team found with id {requested_id}')

    # (supplied value, clause builder) pairs; a filter is applied only when its value is given.
    value_filters = (
        (away_score_min, lambda v: Result.away_score >= v),
        (away_score_max, lambda v: Result.away_score <= v),
        (home_score_min, lambda v: Result.home_score >= v),
        (home_score_max, lambda v: Result.home_score <= v),
        (bothscore_min, lambda v: (Result.home_score >= v) & (Result.away_score >= v)),
        (bothscore_max, lambda v: (Result.home_score <= v) & (Result.away_score <= v)),
        (season, lambda v: Result.season == v),
        (week, lambda v: Result.week == v),
        (ranked, lambda v: Result.ranked == v),
        (short_game, lambda v: Result.short_game == v),
        (week_start, lambda v: Result.week >= v),
        (week_end, lambda v: Result.week <= v),
        (game_type, lambda v: Result.game_type == v),
    )
    for supplied, build_clause in value_filters:
        if supplied is not None:
            query = query.where(build_clause(supplied))

    query = query.order_by(Result.id)

    if not csv:
        return_val = {
            'count': query.count(),
            'results': [model_to_dict(game) for game in query],
        }
        db.close()
        return return_val

    table = [['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv',
              'game_type', 'season', 'week', 'short_game', 'ranked']]
    table.extend(
        [
            game.id, game.away_team.abbrev, game.home_team.abbrev, game.away_score, game.home_score,
            game.away_team_value, game.home_team_value, game.game_type or 'minor-league',
            game.season, game.week, game.short_game, game.ranked,
        ]
        for game in query
    )
    csv_text = DataFrame(table).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/results/{result_id}')
async def v1_results_get_one(result_id, csv: Optional[bool] = None):
    """Return a single game result by primary key, as JSON or as a two-row CSV.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        this_result = Result.get_by_id(result_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')

    if csv:
        # NOTE(review): the trailing column repeats game_type, whereas the list endpoint
        # emits short_game and ranked there. Left as-is to keep the column layout stable
        # for existing consumers - confirm whether it should be aligned.
        data_list = [
            ['id', 'away_abbrev', 'home_abbrev', 'away_score', 'home_score', 'away_tv', 'home_tv', 'game_type',
             'season', 'week', 'game_type'],
            # Fix: the home_abbrev cell used to repeat the away team's abbreviation.
            [this_result.id, this_result.away_team.abbrev, this_result.home_team.abbrev, this_result.away_score,
             this_result.home_score, this_result.away_team_value, this_result.home_team_value,
             this_result.game_type if this_result.game_type else 'minor-league',
             this_result.season, this_result.week, this_result.game_type]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = model_to_dict(this_result)
    db.close()
    return return_val
@app.get('/api/v1/results/team/{team_id}')
async def v1_results_team_get(
        team_id: int, season: Optional[int] = None, week: Optional[int] = None, csv: Optional[bool] = False):
    """Summarise one team's ranked and casual win/loss record.

    A game counts as a win for the away team only when ``away_score > home_score``;
    every other outcome (including equal scores) is credited to the home team.

    Args:
        team_id: Team whose games are tallied.
        season: Optional season filter.
        week: Optional week filter.
        csv: When truthy, respond with a two-row CSV that also carries the team ranking.

    Returns:
        A dict with the team and its four counters, or a ``text/csv`` ``Response``.

    Raises:
        HTTPException: 404 when the team lookup fails.
    """
    try:
        this_team = Team.get_by_id(team_id)
    except Exception:
        logging.error(f'Unknown team id {team_id} trying to pull team results')
        db.close()  # this error path previously left the connection open
        raise HTTPException(404, f'Team id {team_id} not found')

    all_results = Result.select().where((Result.away_team_id == team_id) | (Result.home_team_id == team_id))
    if season is not None:
        all_results = all_results.where(Result.season == season)
    if week is not None:
        all_results = all_results.where(Result.week == week)

    r_wins, r_loss, c_wins, c_loss = 0, 0, 0, 0
    for x in all_results:
        away_won = x.away_score > x.home_score
        if x.away_team_id == team_id:
            team_won = away_won
        elif x.home_team_id == team_id:
            team_won = not away_won
        else:
            continue

        if x.ranked:
            if team_won:
                r_wins += 1
            else:
                r_loss += 1
        elif team_won:
            c_wins += 1
        else:
            c_loss += 1

    if csv:
        data_list = [
            ['team_id', 'ranked_wins', 'ranked_losses', 'casual_wins', 'casual_losses', 'team_ranking'],
            [team_id, r_wins, r_loss, c_wins, c_loss, this_team.ranking]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')

    return_val = {
        'team': model_to_dict(this_team),
        'ranked_wins': r_wins,
        'ranked_losses': r_loss,
        'casual_wins': c_wins,
        'casual_losses': c_loss,
    }
    db.close()
    return return_val
def _ranking_deltas(away_ranking, home_ranking, away_win, short_game):
    """Return ``(away_delta, home_delta)`` ranking changes for one ranked game.

    ``exp_score`` is the away team's expected score derived from the ranking gap
    (a value above .5 means the away team is the favorite). When the favorite wins the
    smaller ``low_delta`` is used, on an upset the larger ``high_delta``; the winner
    gains three times that amount and the loser gives up one times it.
    """
    k_value = 20 if short_game else 60
    ratio = (home_ranking - away_ranking) / 400
    exp_score = 1 / (1 + (10 ** ratio))

    total_delta = k_value * exp_score
    high_delta = total_delta * exp_score if exp_score > .5 else total_delta * (1 - exp_score)
    low_delta = total_delta - high_delta

    away_favored = exp_score > .5
    final_delta = low_delta if away_favored == away_win else high_delta

    if away_win:
        away_delta, home_delta = final_delta * 3, -final_delta
    else:
        away_delta, home_delta = -final_delta, final_delta * 3

    logging.debug(f'/results ranking deltas\n\nk_value: {k_value} / ratio: {ratio} / '
                  f'exp_score: {exp_score} / away_win: {away_win} / total_delta: {total_delta} / '
                  f'high_delta: {high_delta} / low_delta: {low_delta} / final_delta: {final_delta} / ')
    return away_delta, home_delta


@app.post('/api/v1/results')
async def v1_results_post(result: ResultModel, token: str = Depends(oauth2_scheme)):
    """Record a game result and, for ranked games, update both teams' rankings.

    Ranked submissions are validated before anything is written, and the result row
    plus both ranking updates share one transaction, so a rejected or failed request
    no longer leaves a stored result without the matching ranking change.

    Args:
        result: Game payload; ``away_team_ranking`` and ``home_team_ranking`` are
            required (truthy) when ``ranked`` is True.
        token: Bearer token; must pass ``valid_token``.

    Returns:
        The saved result as a dict.

    Raises:
        HTTPException: 401 on a bad token, 418 when ``save()`` does not report exactly
            one row written.
        DataError: when a ranked game omits either team's ranking (unchanged from the
            previous behaviour, but now raised before the result is stored).
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post results. This event has been logged.'
        )

    if result.ranked:
        if not result.away_team_ranking:
            db.close()
            error = f'Ranked game did not include away team ({result.away_team_id}) ranking.'
            logging.error(error)
            raise DataError(error)
        if not result.home_team_ranking:
            db.close()
            error = f'Ranked game did not include home team ({result.home_team_id}) ranking.'
            logging.error(error)
            raise DataError(error)

    with db.atomic():
        this_result = Result(**result.__dict__)
        saved = this_result.save()

        if result.ranked:
            away_delta, home_delta = _ranking_deltas(
                result.away_team_ranking,
                result.home_team_ranking,
                result.away_score > result.home_score,
                result.short_game,
            )

            away_team = Team.get_by_id(result.away_team_id)
            away_team.ranking += away_delta
            away_team.save()
            logging.info(f'Just updated {away_team.abbrev} ranking to {away_team.ranking}')

            home_team = Team.get_by_id(result.home_team_id)
            home_team.ranking += home_delta
            home_team.save()
            logging.info(f'Just updated {home_team.abbrev} ranking to {home_team.ranking}')

    if saved == 1:
        return_val = model_to_dict(this_result)
        db.close()
        return return_val

    db.close()
    raise HTTPException(
        status_code=418,
        # Message previously said "roster" (copy-paste from the rosters endpoint).
        detail='Well slap my ass and call me a teapot; I could not save that result'
    )
@app.patch('/api/v1/results/{result_id}')
async def v1_results_patch(
        result_id, away_team_id: Optional[int] = None, home_team_id: Optional[int] = None,
        away_score: Optional[int] = None, home_score: Optional[int] = None, away_team_value: Optional[int] = None,
        home_team_value: Optional[int] = None, scorecard: Optional[str] = None, week: Optional[int] = None,
        season: Optional[int] = None, short_game: Optional[bool] = None, game_type: Optional[str] = None,
        token: str = Depends(oauth2_scheme)):
    """Update selected columns of one result; parameters left as ``None`` are not touched.

    Passing ``short_game=False`` stores ``None`` in the column rather than ``False``.

    Returns:
        The saved result as a dict.

    Raises:
        HTTPException: 401 on a bad token, 404 when the result lookup fails, 418 when
            ``save()`` does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch results. This event has been logged.'
        )

    try:
        this_result = Result.get_by_id(result_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')

    # Column name -> requested value, applied in the original assignment order.
    requested = {
        'away_team_id': away_team_id,
        'home_team_id': home_team_id,
        'away_score': away_score,
        'home_score': home_score,
        'away_team_value': away_team_value,
        'home_team_value': home_team_value,
        'scorecard': scorecard,
        'week': week,
        'season': season,
        'game_type': game_type,
    }
    for column, new_value in requested.items():
        if new_value is not None:
            setattr(this_result, column, new_value)

    if short_game is not None:
        # A falsy short_game is persisted as NULL, matching the existing behaviour.
        this_result.short_game = short_game if short_game else None

    if this_result.save() == 1:
        return_val = model_to_dict(this_result)
        db.close()
        return return_val

    db.close()
    raise HTTPException(
        status_code=418,
        # Message previously said "event" (copy-paste from the events endpoint).
        detail='Well slap my ass and call me a teapot; I could not save that result'
    )
@app.delete('/api/v1/results/{result_id}')
async def v1_results_delete(result_id, token: str = Depends(oauth2_scheme)):
    """Delete one game result by primary key.

    Raises:
        HTTPException: 401 on a bad token, 404 when the result lookup fails, 200 (raised,
            per file convention) when exactly one row was deleted, 500 otherwise.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            # Message previously said "post results" on this delete endpoint.
            detail='You are not authorized to delete results. This event has been logged.'
        )

    try:
        this_result = Result.get_by_id(result_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No result found with id {result_id}')

    count = this_result.delete_instance()
    db.close()

    if count == 1:
        raise HTTPException(status_code=200, detail=f'Result {result_id} has been deleted')
    raise HTTPException(status_code=500, detail=f'Result {result_id} was not deleted')
"""
AWARDS ENDPOINTS
"""
class AwardModel(pydantic.BaseModel):
    """Request body accepted by ``POST /api/v1/awards``."""
    name: str
    season: int
    # Defaults to 'In-Season' when omitted.
    timing: str = 'In-Season'
    # Optional foreign keys; each is stored as None when omitted.
    card_id: Optional[int] = None
    team_id: Optional[int] = None
    image: Optional[str] = None
@app.get('/api/v1/awards')
async def v1_awards_get(
        name: Optional[str] = None, season: Optional[int] = None, timing: Optional[str] = None,
        card_id: Optional[int] = None, team_id: Optional[int] = None, image: Optional[str] = None,
        csv: Optional[bool] = None):
    """List awards, narrowed by exact-match filters on any supplied parameter.

    Returns:
        ``{'count': ..., 'awards': [...]}`` or, when ``csv`` is truthy, a ``text/csv``
        ``Response``.

    Raises:
        HTTPException: 404 when the awards table is empty.
    """
    award_query = Award.select()
    if award_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no awards to filter')

    # (column, supplied value) pairs; a filter is applied only when its value is given.
    exact_filters = (
        (Award.name, name),
        (Award.season, season),
        (Award.timing, timing),
        (Award.card_id, card_id),
        (Award.team_id, team_id),
        (Award.image, image),
    )
    for column, wanted in exact_filters:
        if wanted is not None:
            award_query = award_query.where(column == wanted)

    if not csv:
        return_val = {
            'count': award_query.count(),
            'awards': [model_to_dict(award) for award in award_query],
        }
        db.close()
        return return_val

    table = [['id', 'name', 'season', 'timing', 'card', 'team', 'image']]
    table.extend(
        [award.id, award.name, award.season, award.timing, award.card, award.team, award.image]
        for award in award_query
    )
    csv_text = DataFrame(table).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/awards/{award_id}')
async def v1_awards_get_one(award_id, csv: Optional[bool] = None):
    """Return a single award by primary key, as JSON or as a two-row CSV.

    Raises:
        HTTPException: 404 when the lookup fails.
    """
    try:
        award = Award.get_by_id(award_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')

    if not csv:
        return_val = model_to_dict(award)
        db.close()
        return return_val

    header = ['id', 'name', 'season', 'timing', 'card', 'team', 'image']
    values = [award.id, award.name, award.season, award.timing, award.card, award.team, award.image]
    csv_text = DataFrame([header, values]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/awards')
async def v1_awards_post(award: AwardModel, token: str = Depends(oauth2_scheme)):
    """Create a new award.

    Returns:
        The saved award as a dict.

    Raises:
        HTTPException: 401 on a bad token, 418 when ``save()`` does not report exactly
            one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post awards. This event has been logged.'
        )

    this_award = Award(
        name=award.name,
        season=award.season,
        # Fix: this used to store award.season in the timing column.
        timing=award.timing,
        card_id=award.card_id,
        team_id=award.team_id,
        image=award.image
    )

    saved = this_award.save()
    if saved == 1:
        return_val = model_to_dict(this_award)
        db.close()
        return return_val

    db.close()
    raise HTTPException(
        status_code=418,
        # Message previously said "roster" (copy-paste from the rosters endpoint).
        detail='Well slap my ass and call me a teapot; I could not save that award'
    )
@app.delete('/api/v1/awards/{award_id}')
async def v1_awards_delete(award_id, token: str = Depends(oauth2_scheme)):
    """Delete one award by primary key.

    Raises:
        HTTPException: 401 on a bad token, 404 when the award lookup fails, 200 (raised,
            per file convention) when exactly one row was deleted, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete awards. This event has been logged.',
        )

    try:
        doomed_award = Award.get_by_id(award_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No award found with id {award_id}')

    rows_deleted = doomed_award.delete_instance()
    db.close()

    if rows_deleted != 1:
        raise HTTPException(status_code=500, detail=f'Award {award_id} was not deleted')
    raise HTTPException(status_code=200, detail=f'Award {award_id} has been deleted')
"""
REWARD ENDPOINTS
"""
class RewardModel(pydantic.BaseModel):
    """Request body describing a reward granted to a team."""
    name: str
    season: int
    week: int
    team_id: int
    # Creation time in epoch milliseconds. A default_factory is used so the timestamp is
    # taken when each request is parsed; a plain default expression would be evaluated
    # once at import time and stamp every reward with the process start time.
    created: Optional[int] = pydantic.Field(
        default_factory=lambda: int(datetime.timestamp(datetime.now()) * 1000)
    )
@app.get('/api/v1/rewards')
async def v1_rewards_get(
        name: Optional[str] = None, in_name: Optional[str] = None, team_id: Optional[int] = None,
        season: Optional[int] = None, week: Optional[int] = None, created_after: Optional[int] = None,
        flat: Optional[bool] = False, csv: Optional[bool] = None):
    """List rewards, optionally filtered.

    Args:
        name: Case-insensitive exact match on the reward name.
        in_name: Case-insensitive substring match on the reward name.
        team_id: Match on the owning team.
        season: Match on the season.
        week: Match on the week.
        created_after: Keep rewards whose ``created`` value is at or after this one.
        flat: When truthy, related rows are not expanded in the JSON output.
        csv: When truthy, respond with CSV instead of JSON.

    Returns:
        ``{'count': ..., 'rewards': [...]}`` or a ``text/csv`` ``Response``.

    Raises:
        HTTPException: 404 when the table is empty or when no reward matches the filters.
    """
    reward_query = Reward.select()
    if reward_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no rewards to filter')

    # (supplied value, clause builder) pairs; a filter is applied only when its value is given.
    filters = (
        (name, lambda v: fn.Lower(Reward.name) == v.lower()),
        (team_id, lambda v: Reward.team_id == v),
        (created_after, lambda v: Reward.created >= v),
        (in_name, lambda v: fn.Lower(Reward.name).contains(v.lower())),
        (season, lambda v: Reward.season == v),
        (week, lambda v: Reward.week == v),
    )
    for supplied, build_clause in filters:
        if supplied is not None:
            reward_query = reward_query.where(build_clause(supplied))

    if reward_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='No rewards found')

    if not csv:
        return_val = {
            'count': reward_query.count(),
            'rewards': [model_to_dict(reward, recurse=not flat) for reward in reward_query],
        }
        db.close()
        return return_val

    # NOTE(review): the CSV reads ``daily`` although the request model exposes
    # season/week instead - confirm the Reward table still has that column.
    table = [['id', 'name', 'team', 'daily', 'created']]
    table.extend(
        [reward.id, reward.name, reward.team.id, reward.daily, reward.created]
        for reward in reward_query
    )
    csv_text = DataFrame(table).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.get('/api/v1/rewards/{reward_id}')
async def v1_rewards_get_one(reward_id, csv: Optional[bool] = False):
    """Return one reward by id, as CSV text when ``csv`` is truthy or as a dict otherwise.

    Raises:
        HTTPException: 404 when the lookup by id fails.
    """
    try:
        this_reward = Reward.get_by_id(reward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')

    if csv:
        data_list = [
            # Header now names the five values actually emitted below; it used to be
            # a four-column header ('card_count', 'description') that did not match
            # the row. Same columns as the list endpoint's CSV.
            ['id', 'name', 'team', 'daily', 'created'],
            # NOTE(review): `daily` is not part of RewardModel; confirm the Reward table still has it.
            [this_reward.id, this_reward.name, this_reward.team.id, this_reward.daily, this_reward.created]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')
    else:
        return_val = model_to_dict(this_reward)
        db.close()
        return return_val
@app.post('/api/v1/rewards')
async def v1_rewards_post(reward: RewardModel, token: str = Depends(oauth2_scheme)):
    """Create a reward from the request body and return the saved row as a dict.

    Raises:
        HTTPException: 401 when the token is rejected; 418 when the save does
            not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post rewards. This event has been logged.'
        )

    this_reward = Reward(**reward.dict())

    saved = this_reward.save()
    if saved != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that reward'
        )

    return_val = model_to_dict(this_reward)
    db.close()
    return return_val
@app.patch('/api/v1/rewards/{reward_id}')
async def v1_rewards_patch(
        reward_id, name: Optional[str] = None, team_id: Optional[int] = None, created: Optional[int] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one reward and return the saved row as a dict.

    Only parameters that are not ``None`` are written.

    Raises:
        HTTPException: 401 when the token is rejected; 404 when the lookup by id
            fails; 418 when the save does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch rewards. This event has been logged.'
        )
    try:
        this_reward = Reward.get_by_id(reward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')

    if name is not None:
        this_reward.name = name
    if team_id is not None:
        this_reward.team_id = team_id
    if created is not None:
        this_reward.created = created

    if this_reward.save() != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that reward'
        )

    return_val = model_to_dict(this_reward)
    db.close()
    return return_val
@app.delete('/api/v1/rewards/{reward_id}')
async def v1_rewards_delete(reward_id, token: str = Depends(oauth2_scheme)):
    """Delete the reward identified by ``reward_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete rewards. This event has been logged.'
        )

    try:
        doomed_reward = Reward.get_by_id(reward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No reward found with id {reward_id}')

    rows_removed = doomed_reward.delete_instance()
    db.close()

    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Reward {reward_id} was not deleted')
    # Success is also signalled through an exception, carrying a 200 status.
    raise HTTPException(status_code=200, detail=f'Reward {reward_id} has been deleted')
"""
BATTING STATS ENDPOINTS
"""
class BatStat(pydantic.BaseModel):
    """One batting line as submitted to the batting stats POST endpoint.

    Counting stats default to 0 when omitted.
    """
    card_id: int
    team_id: int
    roster_num: int
    vs_team_id: int
    pos: str
    pa: Optional[int] = 0
    ab: Optional[int] = 0
    run: Optional[int] = 0
    hit: Optional[int] = 0
    rbi: Optional[int] = 0
    double: Optional[int] = 0
    triple: Optional[int] = 0
    hr: Optional[int] = 0
    bb: Optional[int] = 0
    so: Optional[int] = 0
    hbp: Optional[int] = 0
    sac: Optional[int] = 0
    ibb: Optional[int] = 0
    gidp: Optional[int] = 0
    sb: Optional[int] = 0
    cs: Optional[int] = 0
    bphr: Optional[int] = 0
    bpfo: Optional[int] = 0
    bp1b: Optional[int] = 0
    bplo: Optional[int] = 0
    xch: Optional[int] = 0
    xhit: Optional[int] = 0
    error: Optional[int] = 0
    pb: Optional[int] = 0
    sbc: Optional[int] = 0
    csc: Optional[int] = 0
    week: int
    season: int
    # A plain default expression is evaluated once, when the class is defined, so
    # every line posted without `created` would share the server start time.
    # default_factory computes the value per instance instead.
    # NOTE(review): the multiplier is 100000 here while RewardModel/PaperdexModel
    # use 1000; kept unchanged - confirm the intended unit.
    created: Optional[int] = pydantic.Field(
        default_factory=lambda: int(datetime.timestamp(datetime.now()) * 100000)
    )
    game_id: int
class BattingStatModel(pydantic.BaseModel):
    """Request body for the batting stats POST endpoint: a batch of batting lines."""
    # Each entry becomes one BattingStat row when posted.
    stats: List[BatStat]
@app.get('/api/v1/batstats')
async def v1_batstats_get(
        card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
        season: int = None, week_start: int = None, week_end: int = None, created: int = None, csv: bool = None):
    """List batting stat lines, narrowed by whichever filters were supplied.

    Responds with CSV text when ``csv`` is truthy, otherwise with a dict holding
    ``count`` and ``stats`` (non-recursive model dicts). An empty result is
    returned as-is rather than as a 404.
    """
    stat_query = BattingStat.select().join(Card).join(Player)

    # Collect one expression per supplied filter, then chain them onto the query.
    conditions = []
    if card_id is not None:
        conditions.append(BattingStat.card_id == card_id)
    if player_id is not None:
        conditions.append(BattingStat.card.player.player_id == player_id)
    if team_id is not None:
        conditions.append(BattingStat.team_id == team_id)
    if vs_team_id is not None:
        conditions.append(BattingStat.vs_team_id == vs_team_id)
    if week is not None:
        conditions.append(BattingStat.week == week)
    if season is not None:
        conditions.append(BattingStat.season == season)
    if week_start is not None:
        conditions.append(BattingStat.week >= week_start)
    if week_end is not None:
        conditions.append(BattingStat.week <= week_end)
    if created is not None:
        conditions.append(BattingStat.created == created)
    for condition in conditions:
        stat_query = stat_query.where(condition)

    if csv:
        # Columns whose CSV header is exactly the attribute name on the row.
        direct_columns = [
            'pos', 'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb',
            'gidp', 'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc',
            'week', 'season', 'created', 'game_id', 'roster_num',
        ]
        rows = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team'] + direct_columns]
        for stat in stat_query:
            related = [
                stat.id, stat.card.id, stat.card.player.player_id, stat.card.player.cardset.name,
                stat.team.abbrev, stat.vs_team.abbrev,
            ]
            rows.append(related + [getattr(stat, column) for column in direct_columns])
        csv_text = DataFrame(rows).to_csv(header=False, index=False)
        db.close()
        return Response(content=csv_text, media_type='text/csv')

    total = stat_query.count()
    payload = {
        'count': total,
        'stats': [model_to_dict(stat, recurse=False) for stat in stat_query],
    }
    db.close()
    return payload
@app.get('/api/v1/batstats/player/{player_id}')
async def v1_batstats_get_card(
        player_id: int, team_id: int = None, vs_team_id: int = None, week_start: int = None, week_end: int = None,
        csv: bool = None):
    """Return summed batting totals for one player across the matching stat lines.

    Responds with a two-row CSV (stat names, totals) when ``csv`` is truthy,
    otherwise with a dict holding ``game_count`` and one ``<stat>_sum`` entry per
    counting stat. Totals are 0 when no line matches.
    """
    # The previous version called .scalar() while building the query, which ran it
    # immediately and left a plain value behind; the later .where() calls, the
    # `*_sum` attribute reads and the iteration could therefore never work, and the
    # sums were never selected in the first place.
    # NOTE(review): the old query grouped by card; this sums across every card of
    # the player so exactly one row comes back - confirm that is the intent.
    stat_names = [
        'pa', 'ab', 'run', 'hit', 'rbi', 'double', 'triple', 'hr', 'bb', 'so', 'hbp', 'sac', 'ibb', 'gidp',
        'sb', 'cs', 'bphr', 'bpfo', 'bp1b', 'bplo', 'xch', 'xhit', 'error', 'pb', 'sbc', 'csc',
    ]
    totals = [fn.SUM(getattr(BattingStat, stat)).alias(f'{stat}_sum') for stat in stat_names]

    all_stats = (BattingStat
                 .select(fn.COUNT(BattingStat.created).alias('game_count'), *totals)
                 .join(Card)
                 .where(Card.player == player_id))

    if team_id is not None:
        all_stats = all_stats.where(BattingStat.team_id == team_id)
    if vs_team_id is not None:
        all_stats = all_stats.where(BattingStat.vs_team_id == vs_team_id)
    if week_start is not None:
        all_stats = all_stats.where(BattingStat.week >= week_start)
    if week_end is not None:
        all_stats = all_stats.where(BattingStat.week <= week_end)

    logging.debug(f'stat pull query: {all_stats}\n')

    # An aggregate without GROUP BY yields a single row; SUM() is NULL when nothing matched.
    totals_row = all_stats.dicts().first() or {}
    return_val = {'game_count': totals_row.get('game_count') or 0}
    for stat in stat_names:
        return_val[f'{stat}_sum'] = totals_row.get(f'{stat}_sum') or 0
    db.close()

    if csv:
        data_list = [stat_names, [return_val[f'{stat}_sum'] for stat in stat_names]]
        csv_text = DataFrame(data_list).to_csv(header=False, index=False)
        return Response(content=csv_text, media_type='text/csv')

    return return_val
@app.post('/api/v1/batstats')
async def v1_batstats_post(stats: BattingStatModel, token: str = Depends(oauth2_scheme)):
    """Bulk-insert the submitted batting lines inside one transaction.

    The outcome is reported by raising ``HTTPException``: 401 for a bad token,
    200 with the number of lines added on success.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post stats. This event has been logged.'
        )

    # Every BatStat field is passed to BattingStat under the same keyword name,
    # so the model's dict form can be unpacked directly.
    new_stats = [BattingStat(**line.dict()) for line in stats.stats]

    with db.atomic():
        BattingStat.bulk_create(new_stats, batch_size=15)
    db.close()

    added = len(new_stats)
    raise HTTPException(status_code=200, detail=f'{added} batting lines have been added')
@app.delete('/api/v1/batstats/{stat_id}')
async def v1_rewards_delete(stat_id, token: str = Depends(oauth2_scheme)):
    """Delete the batting stat line identified by ``stat_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    # NOTE(review): this handler reuses the name `v1_rewards_delete` from the
    # rewards section. The route is registered at decoration time so it still
    # works; the name is kept here to leave the module namespace unchanged.
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete stats. This event has been logged.'
        )
    try:
        # Was Reward.get_by_id, which deleted whichever reward shared the id
        # instead of the batting stat named in the URL.
        this_stat = BattingStat.get_by_id(stat_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')

    count = this_stat.delete_instance()
    db.close()

    if count == 1:
        raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
    else:
        raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')
"""
PITCHING STATS ENDPOINTS
"""
class PitStat(pydantic.BaseModel):
    """One pitching line as submitted to the pitching stats POST endpoint.

    Counting stats default to 0 when omitted; ``ip`` is required.
    """
    card_id: int
    team_id: int
    vs_team_id: int
    roster_num: int
    ip: float
    hit: Optional[int] = 0
    run: Optional[int] = 0
    erun: Optional[int] = 0
    so: Optional[int] = 0
    bb: Optional[int] = 0
    hbp: Optional[int] = 0
    wp: Optional[int] = 0
    balk: Optional[int] = 0
    hr: Optional[int] = 0
    ir: Optional[int] = 0
    irs: Optional[int] = 0
    gs: Optional[int] = 0
    win: Optional[int] = 0
    loss: Optional[int] = 0
    hold: Optional[int] = 0
    sv: Optional[int] = 0
    bsv: Optional[int] = 0
    week: int
    season: int
    # A plain default expression is evaluated once, when the class is defined, so
    # every line posted without `created` would share the server start time.
    # default_factory computes the value per instance instead.
    # NOTE(review): the multiplier is 100000 here while RewardModel/PaperdexModel
    # use 1000; kept unchanged - confirm the intended unit.
    created: Optional[int] = pydantic.Field(
        default_factory=lambda: int(datetime.timestamp(datetime.now()) * 100000)
    )
    game_id: int
class PitchingStatModel(pydantic.BaseModel):
    """Request body for the pitching stats POST endpoint: a batch of pitching lines."""
    # Each entry becomes one PitchingStat row when posted.
    stats: List[PitStat]
@app.get('/api/v1/pitstats')
async def v1_pitstats_get(
        card_id: int = None, player_id: int = None, team_id: int = None, vs_team_id: int = None, week: int = None,
        season: int = None, week_start: int = None, week_end: int = None, created: int = None, gs: bool = None,
        csv: bool = None):
    """List pitching stat lines, narrowed by whichever filters were supplied.

    Responds with CSV text when ``csv`` is truthy, otherwise with a dict holding
    ``count`` and ``stats`` (non-recursive model dicts). ``gs`` selects lines
    whose ``gs`` column is 1 (true) or 0 (false).
    """
    all_stats = PitchingStat.select().join(Card).join(Player)
    logging.debug(f'pit query:\n\n{all_stats}')

    if card_id is not None:
        all_stats = all_stats.where(PitchingStat.card_id == card_id)
    if player_id is not None:
        all_stats = all_stats.where(PitchingStat.card.player.player_id == player_id)
    if team_id is not None:
        all_stats = all_stats.where(PitchingStat.team_id == team_id)
    if vs_team_id is not None:
        all_stats = all_stats.where(PitchingStat.vs_team_id == vs_team_id)
    if week is not None:
        all_stats = all_stats.where(PitchingStat.week == week)
    if season is not None:
        all_stats = all_stats.where(PitchingStat.season == season)
    if week_start is not None:
        all_stats = all_stats.where(PitchingStat.week >= week_start)
    if week_end is not None:
        all_stats = all_stats.where(PitchingStat.week <= week_end)
    if created is not None:
        # NOTE(review): the batting endpoint matches `created` with ==; this one uses <= - confirm.
        all_stats = all_stats.where(PitchingStat.created <= created)
    if gs is not None:
        # Parenthesised on purpose: `PitchingStat.gs == 1 if gs else 0` parsed as
        # `(PitchingStat.gs == 1) if gs else 0`, so gs=false filtered on the
        # constant 0 instead of on the column.
        all_stats = all_stats.where(PitchingStat.gs == (1 if gs else 0))

    if csv:
        data_list = [['id', 'card_id', 'player_id', 'cardset', 'team', 'vs_team', 'ip', 'hit', 'run', 'erun', 'so',
                      'bb', 'hbp', 'wp', 'balk', 'hr', 'ir', 'irs', 'gs', 'win', 'loss', 'hold', 'sv', 'bsv', 'week',
                      'season', 'created', 'game_id', 'roster_num']]
        for line in all_stats:
            data_list.append(
                [
                    line.id, line.card.id, line.card.player.player_id, line.card.player.cardset.name,
                    line.team.abbrev, line.vs_team.abbrev, line.ip, line.hit,
                    line.run, line.erun, line.so, line.bb, line.hbp, line.wp, line.balk, line.hr, line.ir, line.irs,
                    line.gs, line.win, line.loss, line.hold, line.sv, line.bsv, line.week, line.season, line.created,
                    line.game_id, line.roster_num
                ]
            )
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')
    else:
        return_val = {'count': all_stats.count(), 'stats': []}
        for x in all_stats:
            return_val['stats'].append(model_to_dict(x, recurse=False))
        db.close()
        return return_val
@app.post('/api/v1/pitstats')
async def v1_batstats_post(stats: PitchingStatModel, token: str = Depends(oauth2_scheme)):
    """Bulk-insert the submitted pitching lines inside one transaction.

    The outcome is reported by raising ``HTTPException``: 401 for a bad token,
    200 with the number of lines added on success.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post stats. This event has been logged.'
        )

    # Every PitStat field is passed to PitchingStat under the same keyword name,
    # so the model's dict form can be unpacked directly.
    new_stats = [PitchingStat(**line.dict()) for line in stats.stats]

    with db.atomic():
        PitchingStat.bulk_create(new_stats, batch_size=15)
    db.close()

    added = len(new_stats)
    raise HTTPException(status_code=200, detail=f'{added} pitching lines have been added')
@app.delete('/api/v1/pitstats/{stat_id}')
async def v1_rewards_delete(stat_id, token: str = Depends(oauth2_scheme)):
    """Delete the pitching stat line identified by ``stat_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    # NOTE(review): this handler reuses the name `v1_rewards_delete` from the
    # rewards section. The route is registered at decoration time so it still
    # works; the name is kept here to leave the module namespace unchanged.
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete stats. This event has been logged.'
        )
    try:
        # Was Reward.get_by_id, which deleted whichever reward shared the id
        # instead of the pitching stat named in the URL.
        this_stat = PitchingStat.get_by_id(stat_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No stat found with id {stat_id}')

    count = this_stat.delete_instance()
    db.close()

    if count == 1:
        raise HTTPException(status_code=200, detail=f'Stat {stat_id} has been deleted')
    else:
        raise HTTPException(status_code=500, detail=f'Stat {stat_id} was not deleted')
"""
NOTIFICATIONS ENDPOINTS
"""
class NotifModel(pydantic.BaseModel):
    """Request body accepted by the notification POST endpoint."""
    # Required; unlike the other models in this file there is no server-side default.
    created: int
    title: str
    desc: Optional[str] = None
    field_name: str
    message: str
    about: Optional[str] = 'blank'
    ack: Optional[bool] = False
@app.get('/api/v1/notifs')
async def v1_notifs_get(
        created_after: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None,
        field_name: Optional[str] = None, in_desc: Optional[str] = None, about: Optional[str] = None,
        ack: Optional[bool] = None, csv: Optional[bool] = None):
    """List notifications, narrowed by whichever filters were supplied.

    Responds with CSV text when ``csv`` is truthy, otherwise with a dict holding
    ``count`` and ``notifs``.

    Raises:
        HTTPException: 404 when the notification table is empty.
    """
    all_notif = Notification.select()
    if all_notif.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail=f'There are no notifications to filter')

    if created_after is not None:
        # Was `<`, which returned notifications created BEFORE the timestamp.
        # `>=` matches the parameter name and the rewards/paperdex endpoints.
        all_notif = all_notif.where(Notification.created >= created_after)
    if title is not None:
        all_notif = all_notif.where(Notification.title == title)
    if desc is not None:
        all_notif = all_notif.where(Notification.desc == desc)
    if field_name is not None:
        all_notif = all_notif.where(Notification.field_name == field_name)
    if in_desc is not None:
        all_notif = all_notif.where(fn.Lower(Notification.desc).contains(in_desc.lower()))
    if about is not None:
        all_notif = all_notif.where(Notification.about == about)
    if ack is not None:
        all_notif = all_notif.where(Notification.ack == ack)

    if csv:
        data_list = [['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack']]
        for line in all_notif:
            data_list.append([
                line.id, line.created, line.title, line.desc, line.field_name, line.message, line.about, line.ack
            ])
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')
    else:
        return_val = {'count': all_notif.count(), 'notifs': []}
        for x in all_notif:
            return_val['notifs'].append(model_to_dict(x))
        db.close()
        return return_val
@app.get('/api/v1/notifs/{notif_id}')
async def v1_notifs_get_one(notif_id, csv: Optional[bool] = None):
    """Return one notification by id, as CSV text when ``csv`` is truthy or as a dict otherwise.

    Raises:
        HTTPException: 404 when the lookup by id fails.
    """
    try:
        notif = Notification.get_by_id(notif_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')

    if not csv:
        payload = model_to_dict(notif)
        db.close()
        return payload

    header = ['id', 'created', 'title', 'desc', 'field_name', 'message', 'about', 'ack']
    values = [
        notif.id, notif.created, notif.title, notif.desc, notif.field_name,
        notif.message, notif.about, notif.ack,
    ]
    csv_text = DataFrame([header, values]).to_csv(header=False, index=False)
    db.close()
    return Response(content=csv_text, media_type='text/csv')
@app.post('/api/v1/notifs')
async def v1_notifs_post(notif: NotifModel, token: str = Depends(oauth2_scheme)):
    """Create a notification from the request body and return the saved row as a dict.

    Raises:
        HTTPException: 401 when the token is rejected; 418 when the save does
            not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post notifications. This event has been logged.'
        )

    logging.info(f'new notif: {notif}')
    notif_fields = {
        'created': notif.created,
        'title': notif.title,
        'desc': notif.desc,
        'field_name': notif.field_name,
        'message': notif.message,
        'about': notif.about,
    }
    # `ack` is part of NotifModel but was never forwarded, so a posted value was
    # silently dropped. An explicit null is skipped so the column keeps its own default.
    if notif.ack is not None:
        notif_fields['ack'] = notif.ack
    this_notif = Notification(**notif_fields)

    saved = this_notif.save()
    if saved != 1:
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that notification'
        )

    return_val = model_to_dict(this_notif)
    db.close()
    return return_val
@app.patch('/api/v1/notifs/{notif_id}')
async def v1_rewards_patch(
        notif_id, created: Optional[int] = None, title: Optional[str] = None, desc: Optional[str] = None,
        field_name: Optional[str] = None, message: Optional[str] = None, about: Optional[str] = None,
        ack: Optional[bool] = None, token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one notification and return the saved row as a dict.

    Only parameters that are not ``None`` are written.

    Raises:
        HTTPException: 401 when the token is rejected; 404 when the lookup by id
            fails; 418 when the save does not report exactly one row written.
    """
    # NOTE(review): this handler reuses the name `v1_rewards_patch` from the
    # rewards section. The route is registered at decoration time so it still
    # works; the name is kept here to leave the module namespace unchanged.
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch notifications. This event has been logged.'
        )
    try:
        this_notif = Notification.get_by_id(notif_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')

    if title is not None:
        this_notif.title = title
    if desc is not None:
        this_notif.desc = desc
    if field_name is not None:
        this_notif.field_name = field_name
    if message is not None:
        this_notif.message = message
    if about is not None:
        this_notif.about = about
    if ack is not None:
        this_notif.ack = ack
    if created is not None:
        this_notif.created = created

    if this_notif.save() != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that notification'
        )

    return_val = model_to_dict(this_notif)
    db.close()
    return return_val
@app.delete('/api/v1/notifs/{notif_id}')
async def v1_notifs_delete(notif_id, token: str = Depends(oauth2_scheme)):
    """Delete the notification identified by ``notif_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete notifications. This event has been logged.'
        )

    try:
        doomed_notif = Notification.get_by_id(notif_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No notification found with id {notif_id}')

    rows_removed = doomed_notif.delete_instance()
    db.close()

    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Notification {notif_id} was not deleted')
    # Success is also signalled through an exception, carrying a 200 status.
    raise HTTPException(status_code=200, detail=f'Notification {notif_id} has been deleted')
"""
PAPERDEX ENDPOINTS
"""
class PaperdexModel(pydantic.BaseModel):
    """Request body accepted by the paperdex POST endpoint."""
    team_id: int
    player_id: int
    # A plain default expression is evaluated once, when the class is defined, so
    # every entry posted without `created` would share the server start time.
    # default_factory computes the epoch-millisecond timestamp per instance instead.
    created: Optional[int] = pydantic.Field(
        default_factory=lambda: int(datetime.timestamp(datetime.now()) * 1000)
    )
@app.get('/api/v1/paperdex')
async def v1_paperdex_get(
        team_id: Optional[int] = None, player_id: Optional[int] = None, created_after: Optional[int] = None,
        cardset_id: Optional[int] = None, created_before: Optional[int] = None, flat: Optional[bool] = False,
        csv: Optional[bool] = None):
    """List paperdex entries, narrowed by whichever filters were supplied.

    Responds with CSV text when ``csv`` is truthy, otherwise with a dict holding
    ``count`` and ``paperdex``. ``flat`` turns off recursion into related models
    for the dict form.

    Raises:
        HTTPException: 404 when the unfiltered query has no rows. A filtered
            query that matches nothing is returned as an empty result.
    """
    dex_query = Paperdex.select().join(Player).join(Cardset)
    if dex_query.count() == 0:
        db.close()
        raise HTTPException(status_code=404, detail='There are no paperdex to filter')

    # Collect one expression per supplied filter, then chain them onto the query.
    conditions = []
    if team_id is not None:
        conditions.append(Paperdex.team_id == team_id)
    if player_id is not None:
        conditions.append(Paperdex.player_id == player_id)
    if cardset_id is not None:
        conditions.append(Paperdex.player.cardset.id == cardset_id)
    if created_after is not None:
        conditions.append(Paperdex.created >= created_after)
    if created_before is not None:
        conditions.append(Paperdex.created <= created_before)
    for condition in conditions:
        dex_query = dex_query.where(condition)

    if csv:
        rows = [['id', 'team_id', 'player_id', 'created']]
        rows.extend(
            [entry.id, entry.team.id, entry.player.player_id, entry.created]
            for entry in dex_query
        )
        csv_text = DataFrame(rows).to_csv(header=False, index=False)
        db.close()
        return Response(content=csv_text, media_type='text/csv')

    total = dex_query.count()
    payload = {
        'count': total,
        'paperdex': [model_to_dict(entry, recurse=not flat) for entry in dex_query],
    }
    db.close()
    return payload
@app.get('/api/v1/paperdex/{paperdex_id}')
async def v1_paperdex_get_one(paperdex_id, csv: Optional[bool] = False):
    """Return one paperdex entry by id, as CSV text when ``csv`` is truthy or as a dict otherwise.

    Raises:
        HTTPException: 404 when the lookup by id fails.
    """
    try:
        this_dex = Paperdex.get_by_id(paperdex_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}')

    if csv:
        data_list = [
            ['id', 'team_id', 'player_id', 'created'],
            # Player rows are addressed through `player_id` in the paperdex list
            # endpoint and the stats endpoints; `this_dex.player.id` was the one
            # place reading `.id` instead.
            [this_dex.id, this_dex.team.id, this_dex.player.player_id, this_dex.created]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')
    else:
        return_val = model_to_dict(this_dex)
        db.close()
        return return_val
@app.post('/api/v1/paperdex')
async def v1_paperdex_post(paperdex: PaperdexModel, token: str = Depends(oauth2_scheme)):
    """Create a paperdex entry, or return the existing one for the same team and player.

    Raises:
        HTTPException: 401 when the token is rejected; 418 when the save does
            not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post paperdex. This event has been logged.'
        )

    # An existing (team, player) pair is returned instead of being inserted again.
    dupe_dex = Paperdex.get_or_none(Paperdex.team_id == paperdex.team_id, Paperdex.player_id == paperdex.player_id)
    if dupe_dex:
        return_val = model_to_dict(dupe_dex)
        db.close()
        return return_val

    this_dex = Paperdex(
        team_id=paperdex.team_id,
        player_id=paperdex.player_id,
        created=paperdex.created
    )

    saved = this_dex.save()
    if saved != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that dex'
        )

    return_val = model_to_dict(this_dex)
    db.close()
    return return_val
@app.patch('/api/v1/paperdex/{paperdex_id}')
async def v1_paperdex_patch(
        paperdex_id, team_id: Optional[int] = None, player_id: Optional[int] = None, created: Optional[int] = None,
        token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one paperdex entry and return the saved row as a dict.

    Only parameters that are not ``None`` are written.

    Raises:
        HTTPException: 401 when the token is rejected; 404 when the lookup by id
            fails; 418 when the save does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch paperdex. This event has been logged.'
        )
    try:
        this_dex = Paperdex.get_by_id(paperdex_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}')

    if team_id is not None:
        this_dex.team_id = team_id
    if player_id is not None:
        this_dex.player_id = player_id
    if created is not None:
        this_dex.created = created

    if this_dex.save() != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that dex'
        )

    return_val = model_to_dict(this_dex)
    db.close()
    return return_val
@app.delete('/api/v1/paperdex/{paperdex_id}')
async def v1_paperdex_delete(paperdex_id, token: str = Depends(oauth2_scheme)):
    """Delete the paperdex entry identified by ``paperdex_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            # Message used to say "delete rewards", left over from the rewards handler.
            detail='You are not authorized to delete paperdex. This event has been logged.'
        )
    try:
        this_dex = Paperdex.get_by_id(paperdex_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No paperdex found with id {paperdex_id}')

    count = this_dex.delete_instance()
    db.close()

    if count == 1:
        raise HTTPException(status_code=200, detail=f'Paperdex {this_dex} has been deleted')
    else:
        raise HTTPException(status_code=500, detail=f'Paperdex {this_dex} was not deleted')
"""
GAMEREWARDS ENDPOINTS
"""
class GameRewardModel(pydantic.BaseModel):
    """Request body accepted by the game reward POST endpoint."""
    name: str
    # The three payload fields below are all optional and default to None.
    pack_type_id: Optional[int] = None
    player_id: Optional[int] = None
    money: Optional[int] = None
@app.get('/api/v1/gamerewards')
async def v1_gamerewards_get(
        name: Optional[str] = None, pack_type_id: Optional[int] = None, player_id: Optional[int] = None,
        money: Optional[int] = None, csv: Optional[bool] = None):
    """List game rewards, narrowed by whichever filters were supplied.

    Responds with CSV text when ``csv`` is truthy, otherwise with a dict holding
    ``count`` and ``gamerewards``. An empty result is returned as-is rather than
    as a 404.
    """
    reward_query = GameRewards.select()

    # Collect one expression per supplied filter, then chain them onto the query.
    conditions = []
    if name is not None:
        conditions.append(GameRewards.name == name)
    if pack_type_id is not None:
        conditions.append(GameRewards.pack_type_id == pack_type_id)
    if player_id is not None:
        conditions.append(GameRewards.player_id == player_id)
    if money is not None:
        conditions.append(GameRewards.money == money)
    for condition in conditions:
        reward_query = reward_query.where(condition)

    if csv:
        rows = [['id', 'pack_type_id', 'player_id', 'money']]
        for reward in reward_query:
            # The raw foreign-key id is only emitted when the related row resolves.
            pack_type_value = reward.pack_type_id if reward.pack_type else None
            player_value = reward.player_id if reward.player else None
            rows.append([reward.id, pack_type_value, player_value, reward.money])
        csv_text = DataFrame(rows).to_csv(header=False, index=False)
        db.close()
        return Response(content=csv_text, media_type='text/csv')

    total = reward_query.count()
    payload = {
        'count': total,
        'gamerewards': [model_to_dict(reward) for reward in reward_query],
    }
    db.close()
    return payload
# The path placeholder used to be `{gameaward_id}`, which did not match the
# `gamereward_id` parameter; FastAPI then treated `gamereward_id` as a required
# query parameter and ignored the id in the URL.
@app.get('/api/v1/gamerewards/{gamereward_id}')
async def v1_gamerewards_get_one(gamereward_id, csv: Optional[bool] = None):
    """Return one game reward by id, as CSV text when ``csv`` is truthy or as a dict otherwise.

    Raises:
        HTTPException: 404 when the lookup by id fails.
    """
    try:
        this_game_reward = GameRewards.get_by_id(gamereward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No game reward found with id {gamereward_id}')

    if csv:
        data_list = [
            ['id', 'pack_type_id', 'player_id', 'money'],
            [this_game_reward.id, this_game_reward.pack_type_id if this_game_reward.pack_type else None,
             this_game_reward.player_id if this_game_reward.player else None, this_game_reward.money]
        ]
        return_val = DataFrame(data_list).to_csv(header=False, index=False)
        db.close()
        return Response(content=return_val, media_type='text/csv')
    else:
        return_val = model_to_dict(this_game_reward)
        db.close()
        return return_val
@app.post('/api/v1/gamerewards')
async def v1_gamerewards_post(game_reward: GameRewardModel, token: str = Depends(oauth2_scheme)):
    """Create a game reward from the request body and return the saved row as a dict.

    Raises:
        HTTPException: 401 when the token is rejected; 418 when the save does
            not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to post game rewards. This event has been logged.'
        )

    this_award = GameRewards(
        name=game_reward.name,
        pack_type_id=game_reward.pack_type_id,
        player_id=game_reward.player_id,
        money=game_reward.money
    )

    saved = this_award.save()
    if saved != 1:
        db.close()
        raise HTTPException(
            status_code=418,
            # Message used to say "roster", left over from another handler.
            detail='Well slap my ass and call me a teapot; I could not save that game reward'
        )

    return_val = model_to_dict(this_award)
    db.close()
    return return_val
@app.patch('/api/v1/gamerewards/{game_reward_id}')
async def v1_gamerewards_patch(
        game_reward_id: int, name: Optional[str] = None, pack_type_id: Optional[int] = None,
        player_id: Optional[int] = None, money: Optional[int] = None, token: str = Depends(oauth2_scheme)):
    """Update the supplied fields of one game reward and return the saved row as a dict.

    Parameters left as ``None`` are not touched. For ``pack_type_id``,
    ``player_id`` and ``money`` a falsy value such as 0 clears the field to ``None``.

    Raises:
        HTTPException: 401 when the token is rejected; 404 when the lookup by id
            fails; 418 when the save does not report exactly one row written.
    """
    if not valid_token(token):
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to patch gamerewards. This event has been logged.'
        )
    try:
        this_game_reward = GameRewards.get_by_id(game_reward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No game reward found with id {game_reward_id}')

    if name is not None:
        this_game_reward.name = name
    # `value or None` keeps a truthy value and turns 0 into None (clears the field).
    if pack_type_id is not None:
        this_game_reward.pack_type_id = pack_type_id or None
    if player_id is not None:
        this_game_reward.player_id = player_id or None
    if money is not None:
        this_game_reward.money = money or None

    if this_game_reward.save() != 1:
        # The failure path used to leave the connection open, unlike every other exit.
        db.close()
        raise HTTPException(
            status_code=418,
            detail='Well slap my ass and call me a teapot; I could not save that game reward'
        )

    return_val = model_to_dict(this_game_reward)
    db.close()
    return return_val
@app.delete('/api/v1/gamerewards/{gamereward_id}')
async def v1_gamerewards_delete(gamereward_id, token: str = Depends(oauth2_scheme)):
    """Delete the game reward identified by ``gamereward_id``.

    The outcome is always reported by raising ``HTTPException``: 401 for a bad
    token, 404 when the lookup fails, 200 when one row was removed, 500 otherwise.
    """
    authorized = valid_token(token)
    if not authorized:
        logging.warning(f'Bad Token: {token}')
        db.close()
        raise HTTPException(
            status_code=401,
            detail='You are not authorized to delete awards. This event has been logged.'
        )

    try:
        doomed_reward = GameRewards.get_by_id(gamereward_id)
    except Exception:
        db.close()
        raise HTTPException(status_code=404, detail=f'No award found with id {gamereward_id}')

    rows_removed = doomed_reward.delete_instance()
    db.close()

    if rows_removed != 1:
        raise HTTPException(status_code=500, detail=f'Game Reward {gamereward_id} was not deleted')
    # Success is also signalled through an exception, carrying a 200 status.
    raise HTTPException(status_code=200, detail=f'Game Reward {gamereward_id} has been deleted')