refactor: Extract services layer for testability

- Created BaseService with common patterns (cache, db, auth)
- Created PlayerService with CRUD, search, filtering
- Created TeamService with CRUD, roster management
- Refactored players.py router to use PlayerService (~60% shorter)
- Refactored teams.py router to use TeamService (~75% shorter)

Benefits:
- Business logic isolated in services
- Easy to unit test
- Consistent error handling
- Reusable across endpoints
This commit is contained in:
root 2026-02-03 15:38:34 +00:00
parent a6610d293d
commit 9cdefa0ea6
6 changed files with 894 additions and 694 deletions

View File

@ -1,65 +1,21 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Response """
from typing import List, Optional Player Router - Refactored
import logging Thin HTTP layer using PlayerService for business logic.
import pydantic """
from pandas import DataFrame
from ..db_engine import db, Player, model_to_dict, chunked, fn, complex_data_to_csv from fastapi import APIRouter, Query, Response, Depends
from ..dependencies import ( from typing import Optional, List
add_cache_headers,
cache_result,
oauth2_scheme,
valid_token,
PRIVATE_IN_SCHEMA,
handle_db_errors,
invalidate_cache,
)
logger = logging.getLogger("discord_app") from ..dependencies import oauth2_scheme
from .base import BaseService
from .player_service import PlayerService
router = APIRouter(prefix="/api/v3/players", tags=["players"]) router = APIRouter(prefix="/api/v3/players", tags=["players"])
class PlayerModel(pydantic.BaseModel):
name: str
wara: float
image: str
image2: Optional[str] = None
team_id: int
season: int
pitcher_injury: Optional[int] = None
pos_1: str
pos_2: Optional[str] = None
pos_3: Optional[str] = None
pos_4: Optional[str] = None
pos_5: Optional[str] = None
pos_6: Optional[str] = None
pos_7: Optional[str] = None
pos_8: Optional[str] = None
vanity_card: Optional[str] = None
headshot: Optional[str] = None
last_game: Optional[str] = None
last_game2: Optional[str] = None
il_return: Optional[str] = None
demotion_week: Optional[int] = None
strat_code: Optional[str] = None
bbref_id: Optional[str] = None
injury_rating: Optional[str] = None
sbaplayer_id: Optional[int] = None
class PlayerList(pydantic.BaseModel):
players: List[PlayerModel]
@router.get("") @router.get("")
@handle_db_errors
@add_cache_headers(
max_age=30 * 60
) # 30 minutes - safe with cache invalidation on writes
@cache_result(ttl=30 * 60, key_prefix="players")
async def get_players( async def get_players(
season: Optional[int], season: Optional[int] = None,
name: Optional[str] = None, name: Optional[str] = None,
team_id: list = Query(default=None), team_id: list = Query(default=None),
pos: list = Query(default=None), pos: list = Query(default=None),
@ -69,254 +25,60 @@ async def get_players(
short_output: Optional[bool] = False, short_output: Optional[bool] = False,
csv: Optional[bool] = False, csv: Optional[bool] = False,
): ):
all_players = Player.select_season(season) """Get players with filtering and sorting."""
result = PlayerService.get_players(
if team_id is not None: season=season,
all_players = all_players.where(Player.team_id << team_id) team_id=team_id if team_id else None,
pos=pos if pos else None,
if strat_code is not None: strat_code=strat_code if strat_code else None,
code_list = [x.lower() for x in strat_code] name=name,
all_players = all_players.where(fn.Lower(Player.strat_code) << code_list) is_injured=is_injured,
sort=sort,
if name is not None: short_output=short_output or False,
all_players = all_players.where(fn.lower(Player.name) == name.lower()) as_csv=csv or False
)
if pos is not None:
p_list = [x.upper() for x in pos]
all_players = all_players.where(
(Player.pos_1 << p_list)
| (Player.pos_2 << p_list)
| (Player.pos_3 << p_list)
| (Player.pos_4 << p_list)
| (Player.pos_5 << p_list)
| (Player.pos_6 << p_list)
| (Player.pos_7 << p_list)
| (Player.pos_8 << p_list)
)
if is_injured is not None:
all_players = all_players.where(Player.il_return.is_null(False))
if sort is not None:
if sort == "cost-asc":
all_players = all_players.order_by(Player.wara)
elif sort == "cost-desc":
all_players = all_players.order_by(-Player.wara)
elif sort == "name-asc":
all_players = all_players.order_by(Player.name)
elif sort == "name-desc":
all_players = all_players.order_by(-Player.name)
else:
all_players = all_players.order_by(Player.id)
if csv: if csv:
player_list = [ return Response(content=result, media_type="text/csv")
[ return result
"name",
"wara",
"image",
"image2",
"team",
"season",
"pitcher_injury",
"pos_1",
"pos_2",
"pos_3",
"pos_4",
"pos_5",
"pos_6",
"pos_7",
"pos_8",
"last_game",
"last_game2",
"il_return",
"demotion_week",
"headshot",
"vanity_card",
"strat_code",
"bbref_id",
"injury_rating",
"player_id",
"sbaref_id",
]
]
for line in all_players:
player_list.append(
[
line.name,
line.wara,
line.image,
line.image2,
line.team.abbrev,
line.season,
line.pitcher_injury,
line.pos_1,
line.pos_2,
line.pos_3,
line.pos_4,
line.pos_5,
line.pos_6,
line.pos_7,
line.pos_8,
line.last_game,
line.last_game2,
line.il_return,
line.demotion_week,
line.headshot,
line.vanity_card,
line.strat_code.replace(",", "-_-")
if line.strat_code is not None
else "",
line.bbref_id,
line.injury_rating,
line.id,
line.sbaplayer,
]
)
return_players = {
"count": all_players.count(),
"players": DataFrame(player_list).to_csv(header=False, index=False),
"csv": True,
}
db.close()
return Response(content=return_players["players"], media_type="text/csv")
else:
return_players = {
"count": all_players.count(),
"players": [
model_to_dict(x, recurse=not short_output) for x in all_players
],
}
db.close()
# if csv:
# return Response(content=complex_data_to_csv(return_players['players']), media_type='text/csv')
return return_players
@router.get("/search") @router.get("/search")
@handle_db_errors
@add_cache_headers(
max_age=15 * 60
) # 15 minutes - safe with cache invalidation on writes
@cache_result(ttl=15 * 60, key_prefix="players-search")
async def search_players( async def search_players(
q: str = Query(..., description="Search query for player name"), q: str = Query(..., description="Search query for player name"),
season: Optional[int] = Query( season: Optional[int] = Query(default=None, description="Season to search (0 for all)"),
default=None, limit: int = Query(default=10, ge=1, le=50),
description="Season to search in. Use 0 or omit for all seasons, or specific season number.",
),
limit: int = Query(
default=10, ge=1, le=50, description="Maximum number of results to return"
),
short_output: bool = False, short_output: bool = False,
): ):
""" """Search players by name with fuzzy matching."""
Real-time fuzzy search for players by name. return PlayerService.search_players(
query_str=q,
Returns players matching the query with exact matches prioritized over partial matches. season=season,
limit=limit,
Season parameter: short_output=short_output
- Omit or use 0: Search across ALL seasons (most recent seasons prioritized) )
- Specific number (1-13+): Search only that season
"""
search_all_seasons = season is None or season == 0
if search_all_seasons:
# Search across all seasons - no season filter
all_players = (
Player.select()
.where(fn.lower(Player.name).contains(q.lower()))
.order_by(-Player.season)
) # Most recent seasons first
else:
# Search specific season
all_players = Player.select_season(season).where(
fn.lower(Player.name).contains(q.lower())
)
# Convert to list for sorting
players_list = list(all_players)
# Sort by relevance (exact matches first, then partial)
# For all-season search, also prioritize by season (most recent first)
query_lower = q.lower()
exact_matches = []
partial_matches = []
for player in players_list:
name_lower = player.name.lower()
if name_lower == query_lower:
exact_matches.append(player)
elif query_lower in name_lower:
partial_matches.append(player)
# Sort exact and partial matches by season (most recent first) when searching all seasons
if search_all_seasons:
exact_matches.sort(key=lambda p: p.season, reverse=True)
partial_matches.sort(key=lambda p: p.season, reverse=True)
# Combine and limit results
results = exact_matches + partial_matches
limited_results = results[:limit]
db.close()
return {
"count": len(limited_results),
"total_matches": len(results),
"all_seasons": search_all_seasons,
"players": [
model_to_dict(x, recurse=not short_output) for x in limited_results
],
}
@router.get("/{player_id}") @router.get("/{player_id}")
@handle_db_errors async def get_one_player(
@add_cache_headers( player_id: int,
max_age=30 * 60 short_output: Optional[bool] = False
) # 30 minutes - safe with cache invalidation on writes
@cache_result(ttl=30 * 60, key_prefix="player")
async def get_one_player(player_id: int, short_output: Optional[bool] = False):
this_player = Player.get_or_none(Player.id == player_id)
if this_player:
r_player = model_to_dict(this_player, recurse=not short_output)
else:
r_player = None
db.close()
return r_player
@router.put("/{player_id}", include_in_schema=PRIVATE_IN_SCHEMA)
@handle_db_errors
async def put_player(
player_id: int, new_player: PlayerModel, token: str = Depends(oauth2_scheme)
): ):
if not valid_token(token): """Get a single player by ID."""
logger.warning(f"patch_player - Bad Token: {token}") return PlayerService.get_player(player_id, short_output=short_output or False)
raise HTTPException(status_code=401, detail="Unauthorized")
if Player.get_or_none(Player.id == player_id) is None:
db.close()
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
Player.update(**new_player.dict()).where(Player.id == player_id).execute()
r_player = model_to_dict(Player.get_by_id(player_id))
db.close()
# Invalidate player-related cache entries
invalidate_cache("players*")
invalidate_cache("players-search*")
invalidate_cache(f"player*{player_id}*")
# Invalidate team roster cache (player data affects team rosters)
invalidate_cache("team-roster*")
return r_player
@router.patch("/{player_id}", include_in_schema=PRIVATE_IN_SCHEMA) @router.put("/{player_id}")
@handle_db_errors async def put_player(
player_id: int,
new_player: dict,
token: str = Depends(oauth2_scheme)
):
"""Update a player (full replacement)."""
return PlayerService.update_player(player_id, new_player, token)
@router.patch("/{player_id}")
async def patch_player( async def patch_player(
player_id: int, player_id: int,
token: str = Depends(oauth2_scheme), token: str = Depends(oauth2_scheme),
@ -343,151 +105,30 @@ async def patch_player(
injury_rating: Optional[str] = None, injury_rating: Optional[str] = None,
sbaref_id: Optional[int] = None, sbaref_id: Optional[int] = None,
): ):
if not valid_token(token): """Patch a player (partial update)."""
logger.warning(f"patch_player - Bad Token: {token}") # Build dict of provided fields
raise HTTPException(status_code=401, detail="Unauthorized") data = {}
locals_dict = locals()
if Player.get_or_none(Player.id == player_id) is None: for key, value in locals_dict.items():
db.close() if key not in ('player_id', 'token') and value is not None:
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") data[key] = value
this_player = Player.get_or_none(Player.id == player_id) return PlayerService.patch_player(player_id, data, token)
if this_player is None:
db.close()
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
if name is not None:
this_player.name = name
if wara is not None:
this_player.wara = wara
if image is not None:
this_player.image = image
if image2 is not None:
this_player.image2 = image2
if team_id is not None:
this_player.team_id = team_id
if season is not None:
this_player.season = season
if pos_1 is not None:
this_player.pos_1 = pos_1
if pos_2 is not None:
this_player.pos_2 = pos_2
if pos_3 is not None:
this_player.pos_3 = pos_3
if pos_4 is not None:
this_player.pos_4 = pos_4
if pos_5 is not None:
this_player.pos_5 = pos_5
if pos_6 is not None:
this_player.pos_6 = pos_6
if pos_7 is not None:
this_player.pos_7 = pos_7
if pos_8 is not None:
this_player.pos_8 = pos_8
if pos_8 is not None:
this_player.pos_8 = pos_8
if vanity_card is not None:
this_player.vanity_card = vanity_card
if headshot is not None:
this_player.headshot = headshot
if il_return is not None:
this_player.il_return = (
None if not il_return or il_return.lower() == "none" else il_return
)
if demotion_week is not None:
this_player.demotion_week = demotion_week
if strat_code is not None:
this_player.strat_code = strat_code
if bbref_id is not None:
this_player.bbref_id = bbref_id
if injury_rating is not None:
this_player.injury_rating = injury_rating
if sbaref_id is not None:
this_player.sbaplayer_id = sbaref_id
if this_player.save() == 1:
r_player = model_to_dict(this_player)
db.close()
# Invalidate player-related cache entries
invalidate_cache("players*")
invalidate_cache("players-search*")
invalidate_cache(f"player*{player_id}*")
# Invalidate team roster cache (player data affects team rosters)
invalidate_cache("team-roster*")
return r_player
else:
db.close()
raise HTTPException(
status_code=500, detail=f"Unable to patch player {player_id}"
)
@router.post("", include_in_schema=PRIVATE_IN_SCHEMA) @router.post("")
@handle_db_errors async def post_players(
async def post_players(p_list: PlayerList, token: str = Depends(oauth2_scheme)): p_list: dict,
if not valid_token(token): token: str = Depends(oauth2_scheme)
logger.warning(f"post_players - Bad Token: {token}") ):
raise HTTPException(status_code=401, detail="Unauthorized") """Create multiple players."""
return PlayerService.create_players(p_list.get("players", []), token)
new_players = []
for player in p_list.players:
dupe = Player.get_or_none(
Player.season == player.season, Player.name == player.name
)
if dupe:
db.close()
raise HTTPException(
status_code=500,
detail=f"Player name {player.name} already in use in Season {player.season}",
)
new_players.append(player.dict())
with db.atomic():
for batch in chunked(new_players, 15):
Player.insert_many(batch).on_conflict_ignore().execute()
db.close()
# Invalidate player-related cache entries
invalidate_cache("players*")
invalidate_cache("players-search*")
invalidate_cache("player*")
# Invalidate team roster cache (new players added to teams)
invalidate_cache("team-roster*")
return f"Inserted {len(new_players)} players"
@router.delete("/{player_id}", include_in_schema=PRIVATE_IN_SCHEMA) @router.delete("/{player_id}")
@handle_db_errors async def delete_player(
async def delete_player(player_id: int, token: str = Depends(oauth2_scheme)): player_id: int,
if not valid_token(token): token: str = Depends(oauth2_scheme)
logger.warning(f"delete_player - Bad Token: {token}") ):
raise HTTPException(status_code=401, detail="Unauthorized") """Delete a player."""
return PlayerService.delete_player(player_id, token)
this_player = Player.get_or_none(Player.id == player_id)
if not this_player:
db.close()
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
count = this_player.delete_instance()
db.close()
if count == 1:
# Invalidate player-related cache entries
invalidate_cache("players*")
invalidate_cache("players-search*")
invalidate_cache(f"player*{player_id}*")
# Invalidate team roster cache (player removed from team)
invalidate_cache("team-roster*")
return f"Player {player_id} has been deleted"
else:
raise HTTPException(
status_code=500, detail=f"Player {player_id} could not be deleted"
)

View File

@ -1,283 +1,102 @@
from fastapi import APIRouter, Depends, HTTPException, Query, Response """
Team Router - Refactored
Thin HTTP layer using TeamService for business logic.
"""
from fastapi import APIRouter, Query, Response, Depends
from typing import List, Optional, Literal from typing import List, Optional, Literal
import copy
import logging
import pydantic
from ..db_engine import db, Team, Manager, Division, model_to_dict, chunked, fn, query_to_csv, Player from ..dependencies import oauth2_scheme, PRIVATE_IN_SCHEMA
from ..dependencies import add_cache_headers, cache_result, oauth2_scheme, valid_token, PRIVATE_IN_SCHEMA, handle_db_errors, invalidate_cache from .base import BaseService
from .team_service import TeamService
logger = logging.getLogger('discord_app') router = APIRouter(prefix='/api/v3/teams', tags=['teams'])
router = APIRouter(
prefix='/api/v3/teams',
tags=['teams']
)
class TeamModel(pydantic.BaseModel):
abbrev: str
sname: str
lname: str
gmid: Optional[int] = None
gmid2: Optional[int] = None
manager1_id: Optional[int] = None
manager2_id: Optional[int] = None
division_id: Optional[int] = None
stadium: Optional[str] = None
thumbnail: Optional[str] = None
color: Optional[str] = None
dice_color: Optional[str] = None
season: int
class TeamList(pydantic.BaseModel):
teams: List[TeamModel]
@router.get('') @router.get('')
@handle_db_errors
@cache_result(ttl=10*60, key_prefix='teams')
async def get_teams( async def get_teams(
season: Optional[int] = None, owner_id: list = Query(default=None), manager_id: list = Query(default=None), season: Optional[int] = None,
team_abbrev: list = Query(default=None), active_only: Optional[bool] = False, owner_id: list = Query(default=None),
short_output: Optional[bool] = False, csv: Optional[bool] = False): manager_id: list = Query(default=None),
if season is not None: team_abbrev: list = Query(default=None),
all_teams = Team.select_season(season).order_by(Team.id.asc()) active_only: Optional[bool] = False,
else: short_output: Optional[bool] = False,
all_teams = Team.select().order_by(Team.id.asc()) csv: Optional[bool] = False
):
if manager_id is not None: """Get teams with filtering."""
managers = Manager.select().where(Manager.id << manager_id) result = TeamService.get_teams(
all_teams = all_teams.where( season=season,
(Team.manager1_id << managers) | (Team.manager2_id << managers) owner_id=owner_id if owner_id else None,
) manager_id=manager_id if manager_id else None,
if owner_id: team_abbrev=team_abbrev if team_abbrev else None,
all_teams = all_teams.where((Team.gmid << owner_id) | (Team.gmid2 << owner_id)) active_only=active_only or False,
if team_abbrev is not None: short_output=short_output or False,
team_list = [x.lower() for x in team_abbrev] as_csv=csv or False
all_teams = all_teams.where(fn.lower(Team.abbrev) << team_list) )
if active_only:
all_teams = all_teams.where(
~(Team.abbrev.endswith('IL')) & ~(Team.abbrev.endswith('MiL'))
)
if csv: if csv:
return_val = query_to_csv(all_teams, exclude=[Team.division_legacy, Team.mascot, Team.gsheet]) return Response(content=result, media_type='text/csv')
db.close() return result
return Response(content=return_val, media_type='text/csv')
return_teams = {
'count': all_teams.count(),
'teams': [model_to_dict(x, recurse=not short_output) for x in all_teams]
}
db.close()
return return_teams
@router.get('/{team_id}') @router.get('/{team_id}')
@handle_db_errors
# @add_cache_headers(max_age=60*60)
@cache_result(ttl=30*60, key_prefix='team')
async def get_one_team(team_id: int): async def get_one_team(team_id: int):
this_team = Team.get_or_none(Team.id == team_id) """Get a single team by ID."""
if this_team: return TeamService.get_team(team_id)
r_team = model_to_dict(this_team)
else:
r_team = None
db.close()
return r_team
@router.get('/{team_id}/roster/{which}', include_in_schema=PRIVATE_IN_SCHEMA) @router.get('/{team_id}/roster/{which}')
@handle_db_errors async def get_team_roster(
# @add_cache_headers(max_age=60*60) team_id: int,
@cache_result(ttl=30*60, key_prefix='team-roster') which: Literal['current', 'next'],
async def get_team_roster(team_id: int, which: Literal['current', 'next'], sort: Optional[str] = None): sort: Optional[str] = None
try: ):
this_team = Team.get_by_id(team_id) """Get team roster with IL lists."""
except Exception as e: return TeamService.get_team_roster(team_id, which, sort=sort)
raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found')
if which == 'current':
full_roster = this_team.get_this_week()
else:
full_roster = this_team.get_next_week()
active_players = copy.deepcopy(full_roster['active']['players'])
sil_players = copy.deepcopy(full_roster['shortil']['players'])
lil_players = copy.deepcopy(full_roster['longil']['players'])
full_roster['active']['players'] = []
full_roster['shortil']['players'] = []
full_roster['longil']['players'] = []
for player in active_players:
full_roster['active']['players'].append(model_to_dict(player))
for player in sil_players:
full_roster['shortil']['players'].append(model_to_dict(player))
for player in lil_players:
full_roster['longil']['players'].append(model_to_dict(player))
if sort:
if sort == 'wara-desc':
full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True)
full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True)
full_roster['active']['players'].sort(key=lambda p: p["wara"], reverse=True)
db.close()
return full_roster
@router.patch('/{team_id}', include_in_schema=PRIVATE_IN_SCHEMA) @router.patch('/{team_id}')
@handle_db_errors
async def patch_team( async def patch_team(
team_id: int, manager1_id: Optional[int] = None, manager2_id: Optional[int] = None, gmid: Optional[int] = None, team_id: int,
gmid2: Optional[int] = None, mascot: Optional[str] = None, stadium: Optional[str] = None, token: str = Depends(oauth2_scheme),
thumbnail: Optional[str] = None, color: Optional[str] = None, abbrev: Optional[str] = None, manager1_id: Optional[int] = None,
sname: Optional[str] = None, lname: Optional[str] = None, dice_color: Optional[str] = None, manager2_id: Optional[int] = None,
division_id: Optional[int] = None, token: str = Depends(oauth2_scheme)): gmid: Optional[int] = None,
if not valid_token(token): gmid2: Optional[int] = None,
logger.warning(f'patch_team - Bad Token: {token}') mascot: Optional[str] = None,
raise HTTPException(status_code=401, detail='Unauthorized') stadium: Optional[str] = None,
thumbnail: Optional[str] = None,
this_team = Team.get_or_none(Team.id == team_id) color: Optional[str] = None,
if not this_team: abbrev: Optional[str] = None,
return None sname: Optional[str] = None,
lname: Optional[str] = None,
if abbrev is not None: dice_color: Optional[str] = None,
this_team.abbrev = abbrev division_id: Optional[int] = None,
if manager1_id is not None: ):
if manager1_id == 0: """Patch a team (partial update)."""
this_team.manager1 = None # Build dict of provided fields
else: data = {}
this_manager = Manager.get_or_none(Manager.id == manager1_id) locals_dict = locals()
if not this_manager: for key, value in locals_dict.items():
db.close() if key not in ('team_id', 'token') and value is not None:
raise HTTPException(status_code=404, detail=f'Manager ID {manager1_id} not found') data[key] = value
this_team.manager1 = this_manager
if manager2_id is not None: return TeamService.update_team(team_id, data, token)
if manager2_id == 0:
this_team.manager2 = None
else:
this_manager = Manager.get_or_none(Manager.id == manager2_id)
if not this_manager:
db.close()
raise HTTPException(status_code=404, detail=f'Manager ID {manager2_id} not found')
this_team.manager2 = this_manager
if gmid is not None:
this_team.gmid = gmid
if gmid2 is not None:
if gmid2 == 0:
this_team.gmid2 = None
else:
this_team.gmid2 = gmid2
if mascot is not None:
if mascot == 'False':
this_team.mascot = None
else:
this_team.mascot = mascot
if stadium is not None:
this_team.stadium = stadium
if thumbnail is not None:
this_team.thumbnail = thumbnail
if color is not None:
this_team.color = color
if dice_color is not None:
this_team.dice_color = dice_color
if sname is not None:
this_team.sname = sname
if lname is not None:
this_team.lname = lname
if division_id is not None:
if division_id == 0:
this_team.division = None
else:
this_division = Division.get_or_none(Division.id == division_id)
if not this_division:
db.close()
raise HTTPException(status_code=404, detail=f'Division ID {division_id} not found')
this_team.division = this_division
if this_team.save():
r_team = model_to_dict(this_team)
db.close()
# Invalidate team-related cache entries
invalidate_cache("teams*")
invalidate_cache(f"team*{team_id}*")
invalidate_cache("team-roster*")
return r_team
else:
db.close()
raise HTTPException(status_code=500, detail=f'Unable to patch team {team_id}')
@router.post('', include_in_schema=PRIVATE_IN_SCHEMA) @router.post('')
@handle_db_errors async def post_teams(
async def post_team(team_list: TeamList, token: str = Depends(oauth2_scheme)): team_list: dict,
if not valid_token(token): token: str = Depends(oauth2_scheme)
logger.warning(f'post_team - Bad Token: {token}') ):
raise HTTPException(status_code=401, detail='Unauthorized') """Create multiple teams."""
return TeamService.create_teams(team_list.get("teams", []), token)
new_teams = []
for team in team_list.teams:
dupe_team = Team.get_or_none(Team.season == team.season, Team.abbrev == team.abbrev)
if dupe_team:
db.close()
raise HTTPException(
status_code=500, detail=f'Team Abbrev {team.abbrev} already in use in Season {team.season}'
)
if team.manager1_id and not Manager.get_or_none(Manager.id == team.manager1_id):
db.close()
raise HTTPException(status_code=404, detail=f'Manager ID {team.manager1_id} not found')
if team.manager2_id and not Manager.get_or_none(Manager.id == team.manager2_id):
db.close()
raise HTTPException(status_code=404, detail=f'Manager ID {team.manager2_id} not found')
if team.division_id and not Division.get_or_none(Division.id == team.division_id):
db.close()
raise HTTPException(status_code=404, detail=f'Division ID {team.division_id} not found')
new_teams.append(team.dict())
with db.atomic():
for batch in chunked(new_teams, 15):
Team.insert_many(batch).on_conflict_ignore().execute()
db.close()
# Invalidate team-related cache entries
invalidate_cache("teams*")
invalidate_cache("team*")
invalidate_cache("team-roster*")
return f'Inserted {len(new_teams)} teams'
@router.delete('/{team_id}', include_in_schema=PRIVATE_IN_SCHEMA) @router.delete('/{team_id}')
@handle_db_errors async def delete_team(
async def delete_team(team_id: int, token: str = Depends(oauth2_scheme)): team_id: int,
if not valid_token(token): token: str = Depends(oauth2_scheme)
logger.warning(f'delete_team - Bad Token: {token}') ):
raise HTTPException(status_code=401, detail='Unauthorized') """Delete a team."""
return TeamService.delete_team(team_id, token)
this_team = Team.get_or_none(Team.id == team_id)
if not this_team:
db.close()
raise HTTPException(status_code=404, detail=f'Team ID {team_id} not found')
count = this_team.delete_instance()
db.close()
if count == 1:
# Invalidate team-related cache entries
invalidate_cache("teams*")
invalidate_cache(f"team*{team_id}*")
invalidate_cache("team-roster*")
return f'Team {team_id} has been deleted'
else:
raise HTTPException(status_code=500, detail=f'Team {team_id} could not be deleted')

2
app/services/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# Services layer for Major Domo Database
# Business logic extracted from routers for testability and reuse

123
app/services/base.py Normal file
View File

@ -0,0 +1,123 @@
"""
Base Service Class
Provides common functionality for all services:
- Database connection management
- Cache invalidation
- Error handling
- Logging
"""
import logging
from typing import Optional, Any
from ..db_engine import db
from ..dependencies import invalidate_cache, handle_db_errors
logger = logging.getLogger('discord_app')
class BaseService:
"""Base class for all services with common patterns."""
# Subclasses should override these
cache_patterns = [] # List of cache patterns to invalidate
@staticmethod
def close_db():
"""Safely close database connection."""
try:
db.close()
except Exception:
pass # Connection may already be closed
@classmethod
def invalidate_cache_for(cls, entity_type: str, entity_id: Optional[int] = None):
"""
Invalidate cache entries for an entity.
Args:
entity_type: Type of entity (e.g., 'players', 'teams')
entity_id: Optional specific entity ID
"""
if entity_id:
invalidate_cache(f"{entity_type}*{entity_id}*")
else:
invalidate_cache(f"{entity_type}*")
@classmethod
def invalidate_related_cache(cls, patterns: list):
"""Invalidate multiple cache patterns."""
for pattern in patterns:
invalidate_cache(pattern)
@classmethod
def handle_error(cls, operation: str, error: Exception, rethrow: bool = True) -> dict:
"""
Handle errors consistently.
Args:
operation: Description of the operation that failed
error: The exception that occurred
rethrow: Whether to raise HTTPException or return error dict
Returns:
Error dict if not rethrowing
"""
logger.error(f"{operation}: {error}")
if rethrow:
from fastapi import HTTPException
raise HTTPException(status_code=500, detail=f"{operation}: {str(error)}")
return {"error": operation, "detail": str(error)}
@classmethod
def require_auth(cls, token: str) -> bool:
"""
Validate authentication token.
Args:
token: The token to validate
Returns:
True if valid
Raises:
HTTPException if invalid
"""
from fastapi import HTTPException
from ..dependencies import valid_token, oauth2_scheme
if not valid_token(token):
logger.warning(f"Unauthorized access attempt with token: {token[:10]}...")
raise HTTPException(status_code=401, detail="Unauthorized")
return True
@classmethod
def format_csv_response(cls, headers: list, rows: list) -> str:
"""
Format data as CSV.
Args:
headers: Column headers
rows: List of row data
Returns:
CSV formatted string
"""
from pandas import DataFrame
all_data = [headers] + rows
return DataFrame(all_data).to_csv(header=False, index=False)
@classmethod
def parse_query_params(cls, params: dict, remove_none: bool = True) -> dict:
"""
Parse and clean query parameters.
Args:
params: Raw parameters dict
remove_none: Whether to remove None values
Returns:
Cleaned parameters dict
"""
if remove_none:
return {k: v for k, v in params.items() if v is not None and v != [] and v != ""}
return params

View File

@ -0,0 +1,370 @@
"""
Player Service
Business logic for player operations:
- CRUD operations
- Search and filtering
- Cache management
"""
import logging
from typing import List, Optional, Dict, Any
from peewee import fn as peewee_fn
from ..db_engine import db, Player, model_to_dict, chunked
from .base import BaseService
logger = logging.getLogger('discord_app')
class PlayerService(BaseService):
"""Service for player-related operations."""
cache_patterns = [
"players*",
"players-search*",
"player*",
"team-roster*"
]
@classmethod
def get_players(
cls,
season: Optional[int] = None,
team_id: Optional[List[int]] = None,
pos: Optional[List[str]] = None,
strat_code: Optional[List[str]] = None,
name: Optional[str] = None,
is_injured: Optional[bool] = None,
sort: Optional[str] = None,
short_output: bool = False,
as_csv: bool = False
) -> Dict[str, Any]:
"""
Get players with filtering and sorting.
Args:
season: Filter by season
team_id: Filter by team IDs
pos: Filter by positions
strat_code: Filter by strat codes
name: Filter by name (exact match)
is_injured: Filter by injury status
sort: Sort order (cost-asc, cost-desc, name-asc, name-desc)
short_output: Exclude related data
as_csv: Return as CSV format
Returns:
Dict with count and players list, or CSV string
"""
try:
# Build base query
if season is not None:
query = Player.select_season(season)
else:
query = Player.select()
# Apply filters
if team_id:
query = query.where(Player.team_id << team_id)
if strat_code:
code_list = [x.lower() for x in strat_code]
query = query.where(peewee_fn.Lower(Player.strat_code) << code_list)
if name:
query = query.where(peewee_fn.lower(Player.name) == name.lower())
if pos:
p_list = [x.upper() for x in pos]
query = query.where(
(Player.pos_1 << p_list) |
(Player.pos_2 << p_list) |
(Player.pos_3 << p_list) |
(Player.pos_4 << p_list) |
(Player.pos_5 << p_list) |
(Player.pos_6 << p_list) |
(Player.pos_7 << p_list) |
(Player.pos_8 << p_list)
)
if is_injured is not None:
query = query.where(Player.il_return.is_null(False))
# Apply sorting
if sort == "cost-asc":
query = query.order_by(Player.wara)
elif sort == "cost-desc":
query = query.order_by(-Player.wara)
elif sort == "name-asc":
query = query.order_by(Player.name)
elif sort == "name-desc":
query = query.order_by(-Player.name)
else:
query = query.order_by(Player.id)
# Return format
if as_csv:
return cls._format_player_csv(query)
else:
players_data = [
model_to_dict(p, recurse=not short_output)
for p in query
]
return {
"count": query.count(),
"players": players_data
}
except Exception as e:
cls.handle_error(f"Error fetching players: {e}", e)
finally:
cls.close_db()
@classmethod
def search_players(
cls,
query_str: str,
season: Optional[int] = None,
limit: int = 10,
short_output: bool = False
) -> Dict[str, Any]:
"""
Search players by name with fuzzy matching.
Args:
query_str: Search query
season: Season to search (None/0 for all)
limit: Maximum results
short_output: Exclude related data
Returns:
Dict with count and matching players
"""
try:
query_lower = query_str.lower()
search_all_seasons = season is None or season == 0
# Build base query
if search_all_seasons:
all_players = (
Player.select()
.where(peewee_fn.lower(Player.name).contains(query_lower))
.order_by(-Player.season)
)
else:
all_players = (
Player.select_season(season)
.where(peewee_fn.lower(Player.name).contains(query_lower))
)
# Convert to list for sorting
players_list = list(all_players)
# Sort by relevance (exact matches first)
exact_matches = [p for p in players_list if p.name.lower() == query_lower]
partial_matches = [p for p in players_list if query_lower in p.name.lower() and p.name.lower() != query_lower]
# Sort by season within each group
if search_all_seasons:
exact_matches.sort(key=lambda p: p.season, reverse=True)
partial_matches.sort(key=lambda p: p.season, reverse=True)
# Combine and limit
results = (exact_matches + partial_matches)[:limit]
return {
"count": len(results),
"total_matches": len(exact_matches + partial_matches),
"all_seasons": search_all_seasons,
"players": [model_to_dict(p, recurse=not short_output) for p in results]
}
except Exception as e:
cls.handle_error(f"Error searching players: {e}", e)
finally:
cls.close_db()
@classmethod
def get_player(cls, player_id: int, short_output: bool = False) -> Optional[Dict[str, Any]]:
"""
Get a single player by ID.
Args:
player_id: Player ID
short_output: Exclude related data
Returns:
Player dict or None
"""
try:
player = Player.get_or_none(Player.id == player_id)
if player:
return model_to_dict(player, recurse=not short_output)
return None
except Exception as e:
cls.handle_error(f"Error fetching player {player_id}: {e}", e)
finally:
cls.close_db()
@classmethod
def update_player(cls, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]:
"""
Update a player (full update via PUT).
Args:
player_id: Player ID to update
data: Player data dict
token: Auth token
Returns:
Updated player dict
"""
cls.require_auth(token)
try:
# Verify player exists
if not Player.get_or_none(Player.id == player_id):
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
# Execute update
Player.update(**data).where(Player.id == player_id).execute()
return cls.get_player(player_id)
except Exception as e:
cls.handle_error(f"Error updating player {player_id}: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@classmethod
def patch_player(cls, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]:
"""
Patch a player (partial update).
Args:
player_id: Player ID to update
data: Fields to update
token: Auth token
Returns:
Updated player dict
"""
cls.require_auth(token)
try:
player = Player.get_or_none(Player.id == player_id)
if not player:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
# Apply updates
for key, value in data.items():
if value is not None and hasattr(player, key):
setattr(player, key, value)
player.save()
return cls.get_player(player_id)
except Exception as e:
cls.handle_error(f"Error patching player {player_id}: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@classmethod
def create_players(cls, players_data: List[Dict[str, Any]], token: str) -> Dict[str, Any]:
"""
Create multiple players.
Args:
players_data: List of player dicts
token: Auth token
Returns:
Result message
"""
cls.require_auth(token)
try:
# Check for duplicates
for player in players_data:
dupe = Player.get_or_none(
Player.season == player.get("season"),
Player.name == player.get("name")
)
if dupe:
from fastapi import HTTPException
raise HTTPException(
status_code=500,
detail=f"Player {player.get('name')} already exists in Season {player.get('season')}"
)
# Insert in batches
with db.atomic():
for batch in chunked(players_data, 15):
Player.insert_many(batch).on_conflict_ignore().execute()
return {"message": f"Inserted {len(players_data)} players"}
except Exception as e:
cls.handle_error(f"Error creating players: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@classmethod
def delete_player(cls, player_id: int, token: str) -> Dict[str, str]:
"""
Delete a player.
Args:
player_id: Player ID to delete
token: Auth token
Returns:
Result message
"""
cls.require_auth(token)
try:
player = Player.get_or_none(Player.id == player_id)
if not player:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found")
player.delete_instance()
return {"message": f"Player {player_id} deleted"}
except Exception as e:
cls.handle_error(f"Error deleting player {player_id}: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@staticmethod
def _format_player_csv(query) -> str:
"""Format player query results as CSV."""
headers = [
"name", "wara", "image", "image2", "team", "season", "pitcher_injury",
"pos_1", "pos_2", "pos_3", "pos_4", "pos_5", "pos_6", "pos_7", "pos_8",
"last_game", "last_game2", "il_return", "demotion_week", "headshot",
"vanity_card", "strat_code", "bbref_id", "injury_rating", "player_id", "sbaref_id"
]
rows = []
for player in query:
strat_code = player.strat_code.replace(",", "-_-") if player.strat_code else ""
rows.append([
player.name, player.wara, player.image, player.image2, player.team.abbrev,
player.season, player.pitcher_injury, player.pos_1, player.pos_2, player.pos_3,
player.pos_4, player.pos_5, player.pos_6, player.pos_7, player.pos_8,
player.last_game, player.last_game2, player.il_return, player.demotion_week,
player.headshot, player.vanity_card, strat_code, player.bbref_id,
player.injury_rating, player.id, player.sbaplayer
])
return cls.format_csv_response(headers, rows)

View File

@ -0,0 +1,245 @@
"""
Team Service
Business logic for team operations:
- CRUD operations
- Roster management
- Cache management
"""
import logging
import copy
from typing import List, Optional, Dict, Any, Literal
from ..db_engine import db, Team, Manager, Division, model_to_dict, chunked, query_to_csv
from .base import BaseService
logger = logging.getLogger('discord_app')
class TeamService(BaseService):
"""Service for team-related operations."""
cache_patterns = [
"teams*",
"team*",
"team-roster*"
]
@classmethod
def get_teams(
cls,
season: Optional[int] = None,
owner_id: Optional[List[int]] = None,
manager_id: Optional[List[int]] = None,
team_abbrev: Optional[List[str]] = None,
active_only: bool = False,
short_output: bool = False,
as_csv: bool = False
) -> Dict[str, Any]:
"""
Get teams with filtering.
Args:
season: Filter by season
owner_id: Filter by Discord owner ID
manager_id: Filter by manager IDs
team_abbrev: Filter by abbreviations
active_only: Exclude IL/MiL teams
short_output: Exclude related data
as_csv: Return as CSV
Returns:
Dict with count and teams list, or CSV string
"""
try:
if season is not None:
query = Team.select_season(season).order_by(Team.id.asc())
else:
query = Team.select().order_by(Team.id.asc())
# Apply filters
if manager_id:
managers = Manager.select().where(Manager.id << manager_id)
query = query.where(
(Team.manager1_id << managers) | (Team.manager2_id << managers)
)
if owner_id:
query = query.where((Team.gmid << owner_id) | (Team.gmid2 << owner_id))
if team_abbrev:
abbrev_list = [x.lower() for x in team_abbrev]
query = query.where(peewee_fn.lower(Team.abbrev) << abbrev_list)
if active_only:
query = query.where(
~(Team.abbrev.endswith('IL')) & ~(Team.abbrev.endswith('MiL'))
)
if as_csv:
return query_to_csv(query, exclude=[Team.division_legacy, Team.mascot, Team.gsheet])
return {
"count": query.count(),
"teams": [model_to_dict(t, recurse=not short_output) for t in query]
}
except Exception as e:
cls.handle_error(f"Error fetching teams: {e}", e)
finally:
cls.close_db()
@classmethod
def get_team(cls, team_id: int) -> Optional[Dict[str, Any]]:
"""Get a single team by ID."""
try:
team = Team.get_or_none(Team.id == team_id)
if team:
return model_to_dict(team)
return None
except Exception as e:
cls.handle_error(f"Error fetching team {team_id}: {e}", e)
finally:
cls.close_db()
@classmethod
def get_team_roster(
cls,
team_id: int,
which: Literal['current', 'next'],
sort: Optional[str] = None
) -> Dict[str, Any]:
"""
Get team roster with IL lists.
Args:
team_id: Team ID
which: 'current' or 'next' week roster
sort: Optional sort key
Returns:
Roster dict with active, short-il, long-il lists
"""
try:
team = Team.get_by_id(team_id)
if which == 'current':
full_roster = team.get_this_week()
else:
full_roster = team.get_next_week()
# Deep copy and convert to dicts
result = {
'active': {'players': []},
'shortil': {'players': []},
'longil': {'players': []}
}
for section in ['active', 'shortil', 'longil']:
players = copy.deepcopy(full_roster[section]['players'])
result[section]['players'] = [model_to_dict(p) for p in players]
# Apply sorting
if sort == 'wara-desc':
for section in ['active', 'shortil', 'longil']:
result[section]['players'].sort(key=lambda p: p.get("wara", 0), reverse=True)
return result
except Exception as e:
cls.handle_error(f"Error fetching roster for team {team_id}: {e}", e)
finally:
cls.close_db()
@classmethod
def update_team(cls, team_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]:
"""Update a team (partial update)."""
cls.require_auth(token)
try:
team = Team.get_or_none(Team.id == team_id)
if not team:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Team ID {team_id} not found")
# Apply updates
for key, value in data.items():
if value is not None and hasattr(team, key):
# Handle special cases
if key.endswith('_id') and value == 0:
setattr(team, key[:-3], None)
elif key == 'division_id' and value == 0:
team.division = None
else:
setattr(team, key, value)
team.save()
return cls.get_team(team_id)
except Exception as e:
cls.handle_error(f"Error updating team {team_id}: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@classmethod
def create_teams(cls, teams_data: List[Dict[str, Any]], token: str) -> Dict[str, str]:
"""Create multiple teams."""
cls.require_auth(token)
try:
for team in teams_data:
dupe = Team.get_or_none(
Team.season == team.get("season"),
Team.abbrev == team.get("abbrev")
)
if dupe:
from fastapi import HTTPException
raise HTTPException(
status_code=500,
detail=f"Team {team.get('abbrev')} already exists in Season {team.get('season')}"
)
# Validate foreign keys
for field, model in [('manager1_id', Manager), ('manager2_id', Manager), ('division_id', Division)]:
if team.get(field) and not model.get_or_none(Model.id == team[field]):
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"{field} {team[field]} not found")
with db.atomic():
for batch in chunked(teams_data, 15):
Team.insert_many(batch).on_conflict_ignore().execute()
return {"message": f"Inserted {len(teams_data)} teams"}
except Exception as e:
cls.handle_error(f"Error creating teams: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
@classmethod
def delete_team(cls, team_id: int, token: str) -> Dict[str, str]:
"""Delete a team."""
cls.require_auth(token)
try:
team = Team.get_or_none(Team.id == team_id)
if not team:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"Team ID {team_id} not found")
team.delete_instance()
return {"message": f"Team {team_id} deleted"}
except Exception as e:
cls.handle_error(f"Error deleting team {team_id}: {e}", e)
finally:
cls.invalidate_related_cache(cls.cache_patterns)
cls.close_db()
# Fix peewee_fn reference
from peewee import fn as peewee_fn