major-domo-database/app/services/player_service.py
Cal Corum 99828893c2
All checks were successful
Build Docker Image / build (pull_request) Successful in 2m26s
perf: push limit/offset to DB in PlayerService.get_players (#37)
Apply .offset() and .limit() on the Peewee query before materializing
results, instead of fetching all rows into memory and slicing in Python.
Total count is obtained via query.count() before pagination is applied.
In-memory (mock) queries continue to use Python-level slicing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 10:33:54 -06:00

709 lines
25 KiB
Python

"""
Player Service - Dependency Injection Version
Business logic for player operations with injectable dependencies.
"""
import csv
import io
import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from peewee import fn as peewee_fn
from playhouse.shortcuts import model_to_dict
from .base import BaseService
from .interfaces import AbstractPlayerRepository, QueryResult
from ..db_engine import Player
if TYPE_CHECKING:
from .base import ServiceConfig
# Use FastAPI's HTTPException when it is installed; test runs without FastAPI
# get a minimal stand-in exposing the same two attributes.
try:
    from fastapi import HTTPException
except ImportError:

    class HTTPException(Exception):
        """Fallback HTTP error carrying a status code and a detail message."""

        def __init__(self, status_code: int, detail: str):
            # The detail doubles as the exception message.
            super().__init__(detail)
            self.detail = detail
            self.status_code = status_code


logger = logging.getLogger("discord_app")
class PlayerService(BaseService):
    """Service for player-related operations with dependency injection."""

    # Cache key patterns tied to this service. Not referenced in the visible
    # code — presumably consumed by BaseService for cache invalidation
    # (TODO confirm).
    cache_patterns = ["players*", "players-search*", "player*", "team-roster*"]

    # Deprecated fields to exclude from player responses
    EXCLUDED_FIELDS = ["pitcher_injury"]

    # Class-level repository for dependency injection. It is shared by every
    # classmethod and stays None until an instance is built with a repository.
    _injected_repo: Optional[AbstractPlayerRepository] = None
def __init__(
    self,
    player_repo: Optional[AbstractPlayerRepository] = None,
    config: Optional["ServiceConfig"] = None,
    **kwargs,
):
    """
    Set up the service and remember any injected repository.

    The repository is stored on the class (not the instance) so that the
    classmethods can reach it. A repository supplied through ``config``
    takes precedence over one passed directly.

    Args:
        player_repo: AbstractPlayerRepository implementation (mock or real)
        config: ServiceConfig with injected dependencies
        **kwargs: Additional arguments passed to BaseService
    """
    super().__init__(player_repo=player_repo, config=config, **kwargs)

    config_has_repo = config is not None and config.player_repo is not None
    chosen_repo = config.player_repo if config_has_repo else player_repo

    # Leave any previously injected repository alone when none was given.
    if chosen_repo is not None:
        PlayerService._injected_repo = chosen_repo
@classmethod
def _get_player_repo(cls) -> AbstractPlayerRepository:
    """Return the injected repository, or a real DB repository if none was injected."""
    injected = cls._injected_repo
    # Production code never injects a repository, so it lands on the real one.
    return injected if injected is not None else cls._get_real_repo()
@classmethod
def _get_real_repo(cls) -> "RealPlayerRepository":
    """Build the production repository, bound to the ``Player`` model."""
    real_repo = RealPlayerRepository(Player)
    return real_repo
@classmethod
def get_players(
    cls,
    season: Optional[int] = None,
    team_id: Optional[List[int]] = None,
    pos: Optional[List[str]] = None,
    strat_code: Optional[List[str]] = None,
    name: Optional[str] = None,
    is_injured: Optional[bool] = None,
    sort: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    short_output: bool = False,
    as_csv: bool = False,
) -> Dict[str, Any]:
    """
    Get players with filtering, sorting and pagination.

    Args:
        season: Filter by season (None selects all seasons)
        team_id: Filter by team IDs
        pos: Filter by positions
        strat_code: Filter by strat codes
        name: Filter by name (case-insensitive exact match)
        is_injured: Filter by injury status
        sort: Sort order ("cost-asc", "cost-desc", "name-asc", "name-desc";
            anything else sorts by id)
        limit: Maximum number of results to return
        offset: Number of results to skip for pagination
        short_output: Exclude related data
        as_csv: Return as CSV format

    Returns:
        Dict with "count" (rows in this page), "total" (rows before
        pagination) and "players"; or a CSV string when ``as_csv`` is set.

    Raises:
        HTTPException: status 500 wrapping any error raised while querying.
    """
    try:
        repo = cls._get_player_repo()

        # The repository treats season 0 as "all seasons".
        query = repo.select_season(season if season is not None else 0)

        # Apply filters using repo-agnostic approach
        query = cls._apply_player_filters(
            query,
            team_id=team_id,
            pos=pos,
            strat_code=strat_code,
            name=name,
            is_injured=is_injured,
        )
        query = cls._apply_player_sort(query, sort)

        # Paginate at DB level for real queries, in Python for mocks.
        if isinstance(query, InMemoryQueryResult):
            total_count = len(query)
            players_data = cls._query_to_player_dicts(query, short_output)
            if offset is not None:
                players_data = players_data[offset:]
            if limit is not None:
                players_data = players_data[:limit]
        else:
            # Count before offset/limit so "total" reflects the full match set.
            total_count = query.count()
            if offset is not None:
                query = query.offset(offset)
            if limit is not None:
                query = query.limit(limit)
            players_data = cls._query_to_player_dicts(query, short_output)

        if as_csv:
            return cls._format_player_csv(players_data)
        return {
            "count": len(players_data),
            "total": total_count,
            "players": players_data,
        }
    except Exception as e:
        logger.error(f"Error fetching players: {e}")
        # Raising HTTPException can never raise ImportError, so the former
        # RuntimeError fallback was unreachable; chain the cause instead.
        raise HTTPException(
            status_code=500, detail=f"Error fetching players: {str(e)}"
        ) from e
@classmethod
def _apply_player_filters(
    cls,
    query: QueryResult,
    team_id: Optional[List[int]] = None,
    pos: Optional[List[str]] = None,
    strat_code: Optional[List[str]] = None,
    name: Optional[str] = None,
    is_injured: Optional[bool] = None,
) -> QueryResult:
    """
    Apply player filters in a repo-agnostic way.

    Rows that are Peewee models are filtered with ``where()`` clauses; rows
    that are dicts (mock repositories), or an empty result, are filtered in
    Python and returned as an ``InMemoryQueryResult``.

    Args:
        query: Base query or in-memory result to filter
        team_id: Keep players whose team_id is in this list
        pos: Keep players with any of pos_1..pos_8 in this list
            (case-insensitive; "P" expands to SP, RP and CP)
        strat_code: Keep players whose strat_code is in this list
            (case-insensitive)
        name: Keep players with this exact name (case-insensitive)
        is_injured: True keeps players with an il_return value, False keeps
            players without one, None does not filter

    Returns:
        The filtered query (DB path) or an InMemoryQueryResult (mock path).
    """
    # Normalize the filter values once so neither path repeats it per row.
    code_list = [code.lower() for code in strat_code] if strat_code else []
    name_lower = name.lower() if name else None
    p_list: List[str] = []
    if pos:
        p_list = [p.upper() for p in pos]
        if "P" in p_list:
            # Generic "P" means every pitcher role; drop every copy of it.
            p_list = [p for p in p_list if p != "P"] + ["SP", "RP", "CP"]

    # Peek at the first row to tell Peewee models from mock dicts.
    # NOTE(review): for a real Peewee query this iteration presumably runs the
    # unfiltered query once — confirm whether that round-trip is acceptable.
    first_item = next(iter(query), None)

    # Use DB-native filtering only for real Peewee models
    if first_item is not None and not isinstance(first_item, dict):
        if team_id:
            query = query.where(Player.team_id << team_id)
        if strat_code:
            query = query.where(peewee_fn.Lower(Player.strat_code) << code_list)
        if name:
            query = query.where(peewee_fn.lower(Player.name) == name_lower)
        if pos:
            # OR together the membership test for each of the eight slots.
            pos_conditions = Player.pos_1 << p_list
            for slot in range(2, 9):
                pos_conditions = pos_conditions | (
                    getattr(Player, f"pos_{slot}") << p_list
                )
            query = query.where(pos_conditions)
        if is_injured is not None:
            # Injured players are the ones with a non-null il_return.
            query = query.where(Player.il_return.is_null(not is_injured))
        return query

    # Use Python filtering for mocks
    def matches(player):
        if team_id and player.get("team_id") not in team_id:
            return False
        if strat_code and (player.get("strat_code") or "").lower() not in code_list:
            return False
        if name and (player.get("name") or "").lower() != name_lower:
            return False
        if pos:
            slots = (player.get(f"pos_{i}") for i in range(1, 9))
            if not any(p in p_list for p in slots if p):
                return False
        if is_injured is not None:
            has_injury = player.get("il_return") is not None
            if bool(is_injured) != has_injury:
                return False
        return True

    return InMemoryQueryResult([p for p in query if matches(p)])
@classmethod
def _apply_player_sort(
    cls, query: QueryResult, sort: Optional[str] = None
) -> QueryResult:
    """
    Apply player sorting in a repo-agnostic way.

    Both paths use the same tie-breakers so that results are fully ordered:
    a DB ``ORDER BY`` on a non-unique column alone can return rows in a
    different order on each call, which makes limit/offset pages overlap or
    skip rows.

    Args:
        query: Query or in-memory result to sort
        sort: "cost-asc", "cost-desc", "name-asc" or "name-desc"; any other
            value (including None) sorts by id

    Returns:
        The ordered query (DB path) or a sorted InMemoryQueryResult.
    """
    # Peek at the first row to tell Peewee models from mock dicts.
    first_item = next(iter(query), None)

    # Use DB-native sorting only for real Peewee models
    if first_item is not None and not isinstance(first_item, dict):
        db_orderings = {
            "cost-asc": (Player.wara, Player.name, Player.id),
            "cost-desc": (-Player.wara, Player.name, Player.id),
            "name-asc": (Player.name, Player.wara, Player.id),
            # Mirrors the in-memory path, which reverses the whole key.
            "name-desc": (-Player.name, -Player.wara, -Player.id),
        }
        query = query.order_by(*db_orderings.get(sort, (Player.id,)))

    # Use Python sorting for mocks
    if not hasattr(query, "order_by") or isinstance(query, InMemoryQueryResult):

        def get_sort_key(player):
            # "or" guards against explicit None values, which cannot be
            # negated or compared with str/int.
            name = player.get("name") or ""
            wara = player.get("wara") or 0
            player_id = player.get("id") or 0
            if sort == "cost-asc":
                return (wara, name, player_id)
            if sort == "cost-desc":
                return (-wara, name, player_id)
            if sort in ("name-asc", "name-desc"):
                return (name, wara, player_id)  # name-desc uses reverse=True
            return (player_id,)

        sorted_list = sorted(query, key=get_sort_key, reverse=(sort == "name-desc"))
        query = InMemoryQueryResult(sorted_list)

    return query
@classmethod
def _query_to_player_dicts(
    cls, query: QueryResult, short_output: bool = False
) -> List[Dict[str, Any]]:
    """
    Convert query results to a list of player dicts.

    Every row goes through ``_player_to_dict`` so excluded fields are
    stripped. The query is iterated exactly once: the previous emptiness
    probe was a second pass that would have consumed the first element of a
    one-shot iterator.

    Args:
        query: Iterable of Peewee models or dicts
        short_output: When True, related objects are not expanded

    Returns:
        List of player dicts; empty when the query yields nothing.
    """
    recurse = not short_output
    return [cls._player_to_dict(player, recurse=recurse) for player in query]
@classmethod
def search_players(
    cls,
    query_str: str,
    season: Optional[int] = None,
    limit: int = 10,
    short_output: bool = False,
) -> Dict[str, Any]:
    """
    Search players by name with database-level filtering.

    Matching is a case-insensitive substring test done in SQL, so only
    ``limit * 2`` rows are loaded. Exact name matches are moved to the front
    of the result before it is cut to ``limit``.

    Args:
        query_str: Search query
        season: Season to search (None/0 for all)
        limit: Maximum results
        short_output: Exclude related data

    Returns:
        Dict with "count", "total_matches", "all_seasons" and "players".

    Raises:
        HTTPException: status 500 wrapping any error raised while searching.
    """
    try:
        # NOTE(review): these duplicate the module-level imports; presumably
        # kept local so tests can patch db_engine — confirm before removing.
        from peewee import fn
        from ..db_engine import Player

        query_lower = query_str.lower()
        search_all_seasons = season is None or season == 0
        name_matches = fn.Lower(Player.name).contains(query_lower)

        if search_all_seasons:
            # Search all seasons, newest first
            query = (
                Player.select()
                .where(name_matches)
                .order_by(Player.season.desc(), Player.name)
            )
        else:
            query = (
                Player.select()
                .where((Player.season == season) & name_matches)
                .order_by(Player.name)
            )

        # Get extra rows so exact matches can be promoted before truncating.
        # NOTE(review): an exact match sorted beyond this window is still
        # missed — consider ordering exact matches first in SQL.
        players = list(query.limit(limit * 2))
        player_dicts = cls._query_to_player_dicts(
            InMemoryQueryResult(players), short_output=short_output
        )

        def is_partial_match(player):
            # "or" keeps a None name from raising AttributeError.
            return (player.get("name") or "").lower() != query_lower

        # Stable sort: exact matches (False) first, DB order kept otherwise.
        results = sorted(player_dicts, key=is_partial_match)[:limit]

        return {
            "count": len(results),
            "total_matches": len(results),  # Approximate since limited at DB
            "all_seasons": search_all_seasons,
            "players": results,
        }
    except Exception as e:
        cls.log_error("Error searching players", e)
        # The former "except ImportError" fallback around this raise was
        # unreachable; chain the cause instead.
        raise HTTPException(
            status_code=500, detail=f"Error searching players: {str(e)}"
        ) from e
@classmethod
def get_player(
    cls, player_id: int, short_output: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Get a single player by ID.

    Args:
        player_id: ID of the player to load
        short_output: When True, related objects are not expanded

    Returns:
        The player as a dict, or None when the repository finds no player.

    Raises:
        HTTPException: status 500 wrapping any error raised while loading.
    """
    try:
        player = cls._get_player_repo().get_by_id(player_id)
        if not player:
            return None
        return cls._player_to_dict(player, recurse=not short_output)
    except Exception as e:
        cls.log_error(f"Error fetching player {player_id}", e)
        # The former "except ImportError" fallback around this raise was
        # unreachable; chain the cause instead.
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching player {player_id}: {str(e)}",
        ) from e
@classmethod
def _player_to_dict(cls, player, recurse: bool = True) -> Dict[str, Any]:
    """
    Serialize a player to a dict without the deprecated fields.

    Dicts are filtered as-is. Peewee models are converted with
    ``model_to_dict``; if that fails the conversion is retried without
    foreign-key recursion, and finally with a plain ``dict()`` call.
    """

    def without_excluded(fields):
        return {
            key: value
            for key, value in fields.items()
            if key not in cls.EXCLUDED_FIELDS
        }

    if isinstance(player, dict):
        return without_excluded(player)

    try:
        # backrefs=False avoids circular reference issues
        return without_excluded(
            model_to_dict(player, recurse=recurse, backrefs=False)
        )
    except (ImportError, AttributeError, TypeError) as e:
        logger.warning(
            f"Error in recursive player serialization: {e}, falling back to non-recursive"
        )
        try:
            return without_excluded(model_to_dict(player, recurse=False))
        except Exception as fallback_error:
            # Last resort: let dict() try to build the mapping.
            logger.error(
                f"Error in non-recursive serialization: {fallback_error}, using basic dict"
            )
            return without_excluded(dict(player))
@classmethod
def update_player(
    cls, player_id: int, data: Dict[str, Any], token: str
) -> Dict[str, Any]:
    """
    Update a player (full update via PUT).

    Args:
        player_id: ID of the player to update
        data: Field values to write
        token: Auth token, checked with ``require_auth`` before any DB access

    Returns:
        The updated player as returned by ``get_player``.

    Raises:
        HTTPException: 404 when the player does not exist, 500 for any
            unexpected error during the update.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        # Verify player exists
        repo = cls._get_player_repo()
        if not repo.get_by_id(player_id):
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        repo.update(data, player_id=player_id)
        return cls.get_player(player_id)
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller as-is;
        # the generic handler below would otherwise turn them into a 500.
        raise
    except Exception as e:
        cls.log_error(f"Error updating player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error updating player {player_id}: {str(e)}"
        ) from e
@classmethod
def patch_player(
    cls, player_id: int, data: Dict[str, Any], token: str
) -> Dict[str, Any]:
    """
    Patch a player (partial update).

    Args:
        player_id: ID of the player to patch
        data: Only the fields to change
        token: Auth token, checked with ``require_auth`` before any DB access

    Returns:
        The patched player as returned by ``get_player``.

    Raises:
        HTTPException: 404 when the player does not exist, 500 for any
            unexpected error during the update.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        player = repo.get_by_id(player_id)
        if not player:
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        # Apply updates using repo
        repo.update(data, player_id=player_id)
        return cls.get_player(player_id)
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller as-is;
        # the generic handler below would otherwise turn them into a 500.
        raise
    except Exception as e:
        cls.log_error(f"Error patching player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error patching player {player_id}: {str(e)}"
        ) from e
@classmethod
def create_players(
    cls, players_data: List[Dict[str, Any]], token: str
) -> Dict[str, Any]:
    """
    Create multiple players.

    Each entry is first checked against existing players with the same
    season and name; nothing is inserted if any duplicate is found.

    Args:
        players_data: One dict of field values per player
        token: Auth token, checked with ``require_auth`` before any DB access

    Returns:
        Dict with a "message" stating how many players were inserted.

    Raises:
        HTTPException: status 500, either naming the duplicate player or
            wrapping an unexpected error.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        # Check for duplicates using repo
        repo = cls._get_player_repo()
        for player in players_data:
            dupe = repo.get_or_none(
                season=player.get("season"), name=player.get("name")
            )
            if dupe:
                raise HTTPException(
                    status_code=500,
                    detail=f"Player {player.get('name')} already exists in Season {player.get('season')}",
                )
        # Hand the whole list to the repository in one call
        repo.insert_many(players_data)
        return {"message": f"Inserted {len(players_data)} players"}
    except HTTPException:
        # Keep the duplicate-player message intact instead of re-wrapping it
        # (and logging it) as an unexpected error.
        raise
    except Exception as e:
        cls.log_error("Error creating players", e)
        raise HTTPException(
            status_code=500, detail=f"Error creating players: {str(e)}"
        ) from e
@classmethod
def delete_player(cls, player_id: int, token: str) -> Dict[str, str]:
    """
    Delete a player.

    Args:
        player_id: ID of the player to delete
        token: Auth token, checked with ``require_auth`` before any DB access

    Returns:
        Dict with a "message" confirming the deletion.

    Raises:
        HTTPException: 404 when the player does not exist, 500 for any
            unexpected error during the delete.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        if not repo.get_by_id(player_id):
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        repo.delete_by_id(player_id)
        return {"message": f"Player {player_id} deleted"}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller as-is;
        # the generic handler below would otherwise turn them into a 500.
        raise
    except Exception as e:
        cls.log_error(f"Error deleting player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error deleting player {player_id}: {str(e)}"
        ) from e
@classmethod
def _format_player_csv(cls, players: List[Dict]) -> str:
    """
    Format player list as CSV - works with both real DB and mocks.

    Nested "team" dicts are reduced to their "abbrev" and nested "sbaplayer"
    dicts to their "id". The header is the union of all keys in first-seen
    order, so a row with extra keys no longer raises ValueError and a row
    with missing keys gets empty cells.

    Args:
        players: Player dicts; they are not modified.

    Returns:
        CSV text with a header row, or "" when ``players`` is empty.
    """
    if not players:
        return ""

    flattened_players = []
    # A dict is used as an ordered set of column names.
    fieldnames: Dict[str, None] = {}
    for player in players:
        flat_player = player.copy()
        team = flat_player.get("team")
        if isinstance(team, dict):
            flat_player["team"] = team.get("abbrev", "")
        sbaplayer = flat_player.get("sbaplayer")
        if isinstance(sbaplayer, dict):
            flat_player["sbaplayer"] = sbaplayer.get("id", "")
        flattened_players.append(flat_player)
        fieldnames.update(dict.fromkeys(flat_player))

    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=list(fieldnames))
    writer.writeheader()
    writer.writerows(flattened_players)
    return output.getvalue()
class InMemoryQueryResult:
    """
    List-backed stand-in for a database query, used with mock repositories.

    ``where`` and ``order_by`` exist only so the object can be passed where a
    query is expected; both hand back the same object unchanged. Actual
    filtering and sorting are done by the service before wrapping the rows.
    """

    def __init__(self, items: List[Dict[str, Any]]):
        # Copy the rows so later changes to the caller's list do not leak in.
        self._items = [*items]

    def __iter__(self):
        return iter(self._items)

    def __getitem__(self, index):
        return self._items[index]

    def __len__(self):
        return self.count()

    def count(self) -> int:
        """Return the number of rows held."""
        return len(self._items)

    def where(self, *conditions) -> "InMemoryQueryResult":
        """Accept and ignore filter conditions (compatibility no-op)."""
        return self

    def order_by(self, *fields) -> "InMemoryQueryResult":
        """Accept and ignore sort fields (sorting is done by the service)."""
        return self
class RealPlayerRepository:
    """
    Real database repository implementation.

    Every operation goes through the model class given to the constructor,
    so the write methods no longer bypass it by using the global ``Player``.
    """

    def __init__(self, model_class):
        self._model = model_class

    def select_season(self, season: int):
        """Return query for season. Season=0 or None returns all seasons."""
        if season == 0 or season is None:
            return self._model.select()
        return self._model.select().where(self._model.season == season)

    def get_by_id(self, player_id: int):
        """Get player by ID, or None when no row matches."""
        return self._model.get_or_none(self._model.id == player_id)

    def get_or_none(self, **conditions):
        """Get player matching conditions; a failed lookup is reported as None."""
        try:
            return self._model.get_or_none(**conditions)
        except Exception:
            # Still best-effort, but leave a trace instead of hiding the error.
            logger.warning(
                "Player lookup failed for %s; treating as not found",
                conditions,
                exc_info=True,
            )
            return None

    def update(self, data: Dict, player_id: int) -> int:
        """Update the player with ``player_id``; returns the execute() result."""
        return (
            self._model.update(**data)
            .where(self._model.id == player_id)
            .execute()
        )

    def insert_many(self, data: List[Dict]) -> int:
        """Insert multiple players in one transaction; returns how many were given."""
        if not data:
            # Nothing to insert, so skip the database entirely.
            return 0
        with self._model._meta.database.atomic():
            self._model.insert_many(data).execute()
        return len(data)

    def delete_by_id(self, player_id: int) -> int:
        """Delete the player with ``player_id``; returns the execute() result."""
        return self._model.delete().where(self._model.id == player_id).execute()