major-domo-database/app/services/player_service.py
Cal Corum d92bb263f1 fix: invalidate cache after PlayerService write operations (#32)
Add finally blocks to update_player, patch_player, create_players, and
delete_player in PlayerService to call invalidate_related_cache() using
the existing cache_patterns. Matches the pattern already used in
TeamService.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 16:28:07 -05:00

717 lines
25 KiB
Python

"""
Player Service - Dependency Injection Version
Business logic for player operations with injectable dependencies.
"""
import csv
import io
import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from peewee import fn as peewee_fn
from playhouse.shortcuts import model_to_dict
from .base import BaseService
from .interfaces import AbstractPlayerRepository, QueryResult
from ..db_engine import Player
if TYPE_CHECKING:
from .base import ServiceConfig
# Prefer FastAPI's HTTPException. When FastAPI is not installed (for example
# in isolated tests) define a minimal stand-in with the same constructor.
try:
    from fastapi import HTTPException
except ImportError:

    class HTTPException(Exception):
        """Minimal replacement for ``fastapi.HTTPException``.

        Carries ``status_code`` and ``detail`` as attributes and uses
        ``detail`` as the exception message.
        """

        def __init__(self, status_code: int, detail: str):
            super().__init__(detail)
            self.detail = detail
            self.status_code = status_code


logger = logging.getLogger("discord_app")
class PlayerService(BaseService):
    """Service for player-related operations with dependency injection."""

    # Cache key patterns handed to invalidate_related_cache() by the write
    # operations (update, patch, create, delete).
    cache_patterns = ["players*", "players-search*", "player*", "team-roster*"]
    # Deprecated fields to exclude from player responses
    EXCLUDED_FIELDS = ["pitcher_injury"]
    # Class-level repository for dependency injection; while it is None,
    # _get_player_repo() falls back to the real DB repository.
    _injected_repo: Optional[AbstractPlayerRepository] = None
def __init__(
    self,
    player_repo: Optional[AbstractPlayerRepository] = None,
    config: Optional["ServiceConfig"] = None,
    **kwargs,
):
    """
    Build the service and remember any injected player repository.

    Args:
        player_repo: Repository implementation to use (mock or real).
        config: ServiceConfig; when its ``player_repo`` is set it takes
            precedence over the ``player_repo`` argument.
        **kwargs: Forwarded unchanged to ``BaseService.__init__``.
    """
    super().__init__(player_repo=player_repo, config=config, **kwargs)

    # The classmethods on this service have no instance to read from, so the
    # selected repository is published on the class itself.
    config_repo = config.player_repo if config is not None else None
    chosen_repo = player_repo if config_repo is None else config_repo
    if chosen_repo is not None:
        PlayerService._injected_repo = chosen_repo
@classmethod
def _get_player_repo(cls) -> AbstractPlayerRepository:
    """Return the injected repository, or a real DB repository if none was injected."""
    injected = cls._injected_repo
    if injected is None:
        # Nothing was injected: use the real DB models (production path).
        return cls._get_real_repo()
    return injected
@classmethod
def _get_real_repo(cls) -> "RealPlayerRepository":
    """Get a real DB repository for production use.

    Returns:
        A new RealPlayerRepository wrapping the Player model.
    """
    return RealPlayerRepository(Player)
@classmethod
def get_players(
    cls,
    season: Optional[int] = None,
    team_id: Optional[List[int]] = None,
    pos: Optional[List[str]] = None,
    strat_code: Optional[List[str]] = None,
    name: Optional[str] = None,
    is_injured: Optional[bool] = None,
    sort: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    short_output: bool = False,
    as_csv: bool = False,
) -> Dict[str, Any]:
    """
    Get players with filtering, sorting and pagination.

    Args:
        season: Filter by season. ``None`` is passed to the repository as 0.
        team_id: Filter by team IDs
        pos: Filter by positions
        strat_code: Filter by strat codes
        name: Filter by name (exact, case-insensitive match)
        is_injured: Filter by injury status
        sort: Sort order ("cost-asc", "cost-desc", "name-asc", "name-desc";
            anything else sorts by id)
        limit: Maximum number of results to return
        offset: Number of results to skip for pagination
        short_output: Exclude related data
        as_csv: Return as CSV format

    Returns:
        Dict with ``count`` (rows returned), ``total`` (rows matching before
        pagination) and ``players``. When ``as_csv`` is true, the CSV text
        produced by ``_format_player_csv`` is returned instead.

    Raises:
        HTTPException: status 500 wrapping any error raised while querying.
    """
    try:
        repo = cls._get_player_repo()
        query = repo.select_season(season if season is not None else 0)

        query = cls._apply_player_filters(
            query,
            team_id=team_id,
            pos=pos,
            strat_code=strat_code,
            name=name,
            is_injured=is_injured,
        )
        query = cls._apply_player_sort(query, sort)

        if isinstance(query, InMemoryQueryResult):
            # Mock / in-memory data: paginate with list slicing.
            total_count = len(query)
            players_data = cls._query_to_player_dicts(query, short_output)
            if offset is not None:
                players_data = players_data[offset:]
            if limit is not None:
                players_data = players_data[:limit]
        else:
            # Real query: count first, then let the database paginate.
            total_count = query.count()
            if offset is not None:
                query = query.offset(offset)
            if limit is not None:
                query = query.limit(limit)
            players_data = cls._query_to_player_dicts(query, short_output)

        if as_csv:
            return cls._format_player_csv(players_data)
        return {
            "count": len(players_data),
            "total": total_count,
            "players": players_data,
        }
    except Exception as e:
        logger.error(f"Error fetching players: {e}")
        # Raising HTTPException cannot itself raise ImportError, so the old
        # RuntimeError fallback was unreachable and has been removed.
        raise HTTPException(
            status_code=500, detail=f"Error fetching players: {str(e)}"
        ) from e
@classmethod
def _apply_player_filters(
    cls,
    query: QueryResult,
    team_id: Optional[List[int]] = None,
    pos: Optional[List[str]] = None,
    strat_code: Optional[List[str]] = None,
    name: Optional[str] = None,
    is_injured: Optional[bool] = None,
) -> QueryResult:
    """
    Apply player filters in a repo-agnostic way.

    The first item of ``query`` decides the strategy: if it exists and is not
    a dict, the filters are added as ``where()`` clauses on the query;
    otherwise (dict items or no items) the rows are filtered in Python and
    wrapped in an ``InMemoryQueryResult``.

    Args:
        query: Query or iterable of players from the repository.
        team_id: Keep players whose team id is in this list.
        pos: Keep players with any of these positions in pos_1..pos_8
            (case-insensitive). "P" stands for SP, RP and CP.
        strat_code: Keep players whose strat code is in this list
            (case-insensitive).
        name: Keep players whose name equals this value (case-insensitive).
        is_injured: True keeps players with a non-null ``il_return``, False
            keeps players without one, None disables the filter.

    Returns:
        The filtered query (DB path) or an ``InMemoryQueryResult``.
    """
    # Normalise the filter values once instead of once per player.
    code_list = [code.lower() for code in strat_code] if strat_code else None
    name_lower = name.lower() if name else None
    pos_list = None
    if pos:
        upper = [p.upper() for p in pos]
        # Drop every generic "P" (not only the first) and replace it with
        # the concrete pitcher positions.
        pos_list = [p for p in upper if p != "P"]
        if "P" in upper:
            pos_list.extend(["SP", "RP", "CP"])

    # Peek at the first item to tell Peewee models from mock dicts.
    first_item = next(iter(query), None)

    if first_item is not None and not isinstance(first_item, dict):
        # DB-native filtering for real Peewee models.
        if team_id:
            query = query.where(Player.team_id << team_id)
        if code_list is not None:
            query = query.where(peewee_fn.Lower(Player.strat_code) << code_list)
        if name_lower is not None:
            query = query.where(peewee_fn.Lower(Player.name) == name_lower)
        if pos_list is not None:
            pos_conditions = (
                (Player.pos_1 << pos_list)
                | (Player.pos_2 << pos_list)
                | (Player.pos_3 << pos_list)
                | (Player.pos_4 << pos_list)
                | (Player.pos_5 << pos_list)
                | (Player.pos_6 << pos_list)
                | (Player.pos_7 << pos_list)
                | (Player.pos_8 << pos_list)
            )
            query = query.where(pos_conditions)
        if is_injured is not None:
            query = query.where(Player.il_return.is_null(not is_injured))
    else:
        # Python filtering for mocks (dict rows) and for empty results.
        def matches(player):
            if team_id and player.get("team_id") not in team_id:
                return False
            if code_list is not None:
                if (player.get("strat_code") or "").lower() not in code_list:
                    return False
            if name_lower is not None:
                if (player.get("name") or "").lower() != name_lower:
                    return False
            if pos_list is not None:
                player_pos = [player.get(f"pos_{i}") for i in range(1, 9)]
                if not any(p in pos_list for p in player_pos if p):
                    return False
            if is_injured is not None:
                has_injury = player.get("il_return") is not None
                if bool(is_injured) != has_injury:
                    return False
            return True

        query = InMemoryQueryResult([p for p in query if matches(p)])

    return query
@classmethod
def _apply_player_sort(
    cls, query: QueryResult, sort: Optional[str] = None
) -> QueryResult:
    """
    Apply player sorting in a repo-agnostic way.

    Args:
        query: Query or iterable of players.
        sort: "cost-asc" / "cost-desc" order by ``wara``, "name-asc" /
            "name-desc" order by ``name``; any other value orders by id.

    Returns:
        The query with ``order_by`` applied (first item is a non-dict), or an
        ``InMemoryQueryResult`` holding the rows sorted in Python.
    """
    # Peek at the first item to tell Peewee models from mock dicts.
    first_item = next(iter(query), None)

    if first_item is not None and not isinstance(first_item, dict):
        # DB-native sorting for real Peewee models.
        if sort == "cost-asc":
            ordering = Player.wara
        elif sort == "cost-desc":
            ordering = -Player.wara
        elif sort == "name-asc":
            ordering = Player.name
        elif sort == "name-desc":
            ordering = -Player.name
        else:
            ordering = Player.id
        query = query.order_by(ordering)

    # Python sorting for plain iterables and in-memory (mock) results.
    if not hasattr(query, "order_by") or isinstance(query, InMemoryQueryResult):

        def get_sort_key(player):
            # `or` also covers keys that are present but None, which would
            # otherwise make the comparison (or the negation) raise TypeError.
            name = player.get("name") or ""
            wara = player.get("wara") or 0
            player_id = player.get("id") or 0
            if sort == "cost-asc":
                return (wara, name, player_id)
            if sort == "cost-desc":
                return (-wara, name, player_id)
            if sort in ("name-asc", "name-desc"):
                return (name, wara, player_id)
            return (player_id,)

        # "name-desc" reverses the whole key tuple, tie-breakers included.
        sorted_list = sorted(query, key=get_sort_key, reverse=sort == "name-desc")
        query = InMemoryQueryResult(sorted_list)

    return query
@classmethod
def _query_to_player_dicts(
    cls, query: QueryResult, short_output: bool = False
) -> List[Dict[str, Any]]:
    """
    Convert query results to a list of player dicts.

    Every item goes through ``_player_to_dict`` so excluded fields are
    removed for both models and dicts. The previous "peek at the first item"
    loop was dropped: an empty query already produces ``[]``, and the peek
    consumed the first element of single-pass iterables.

    Args:
        query: Iterable of player models or dicts.
        short_output: When true, related objects are not expanded
            (``recurse=False``).

    Returns:
        One dict per item, in iteration order.
    """
    recurse = not short_output
    return [cls._player_to_dict(player, recurse=recurse) for player in query]
@classmethod
def search_players(
    cls,
    query_str: str,
    season: Optional[int] = None,
    limit: int = 10,
    short_output: bool = False,
) -> Dict[str, Any]:
    """
    Search players by name with database-level filtering.

    The name match is done in SQL (case-insensitive "contains") rather than
    by loading all players into memory. Up to ``limit * 2`` rows are fetched
    so exact name matches can be moved ahead of partial ones before the
    result is cut to ``limit``.

    Args:
        query_str: Search query
        season: Season to search (None/0 for all)
        limit: Maximum results
        short_output: Exclude related data

    Returns:
        Dict with ``count``, ``total_matches`` (same as ``count`` because the
        query is limited at the DB), ``all_seasons`` and ``players``.

    Raises:
        HTTPException: status 500 wrapping any error raised while searching.
    """
    try:
        query_lower = query_str.lower()
        search_all_seasons = season is None or season == 0

        name_matches = peewee_fn.Lower(Player.name).contains(query_lower)
        if search_all_seasons:
            # All seasons, newest first.
            query = (
                Player.select()
                .where(name_matches)
                .order_by(Player.season.desc(), Player.name)
            )
        else:
            query = (
                Player.select()
                .where((Player.season == season) & name_matches)
                .order_by(Player.name)
            )
        # Get extra rows for exact match sorting.
        query = query.limit(limit * 2)

        # InMemoryQueryResult copies the rows, which executes the query.
        player_dicts = cls._query_to_player_dicts(
            InMemoryQueryResult(query), short_output=short_output
        )

        # Separate exact vs partial matches for proper ordering.
        exact_matches = []
        partial_matches = []
        for player in player_dicts:
            # `or ""` keeps a None name from crashing .lower().
            name_lower = (player.get("name") or "").lower()
            if name_lower == query_lower:
                exact_matches.append(player)
            else:
                partial_matches.append(player)

        results = (exact_matches + partial_matches)[:limit]
        return {
            "count": len(results),
            "total_matches": len(results),  # Approximate since limited at DB
            "all_seasons": search_all_seasons,
            "players": results,
        }
    except Exception as e:
        cls.log_error("Error searching players", e)
        # Raising HTTPException cannot itself raise ImportError, so the old
        # RuntimeError fallback was unreachable and has been removed.
        raise HTTPException(
            status_code=500, detail=f"Error searching players: {str(e)}"
        ) from e
@classmethod
def get_player(
    cls, player_id: int, short_output: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Get a single player by ID.

    Args:
        player_id: ID passed to the repository's ``get_by_id``.
        short_output: When true, related objects are not expanded.

    Returns:
        The player as a dict, or None when the repository returns a falsy
        result.

    Raises:
        HTTPException: status 500 wrapping any error raised during lookup or
            serialization.
    """
    try:
        player = cls._get_player_repo().get_by_id(player_id)
        if player:
            return cls._player_to_dict(player, recurse=not short_output)
        return None
    except Exception as e:
        cls.log_error(f"Error fetching player {player_id}", e)
        # Raising HTTPException cannot itself raise ImportError, so the old
        # RuntimeError fallback was unreachable and has been removed.
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching player {player_id}: {str(e)}",
        ) from e
@classmethod
def _player_to_dict(cls, player, recurse: bool = True) -> Dict[str, Any]:
    """
    Serialize a player to a dict without the deprecated fields.

    Dicts are filtered directly. Anything else goes through
    ``model_to_dict``: first with the requested ``recurse`` setting, then
    (after ImportError, AttributeError or TypeError) non-recursively, and
    finally via plain ``dict(player)`` if that also fails.

    Args:
        player: Player dict or model instance.
        recurse: Whether foreign keys are expanded on the first attempt.

    Returns:
        Dict of the player's fields minus ``EXCLUDED_FIELDS``.
    """

    def drop_excluded(raw):
        return {
            field: value
            for field, value in raw.items()
            if field not in cls.EXCLUDED_FIELDS
        }

    if isinstance(player, dict):
        return drop_excluded(player)

    try:
        # backrefs=False avoids circular reference issues.
        return drop_excluded(
            model_to_dict(player, recurse=recurse, backrefs=False)
        )
    except (ImportError, AttributeError, TypeError) as e:
        logger.warning(
            f"Error in recursive player serialization: {e}, falling back to non-recursive"
        )
        try:
            return drop_excluded(model_to_dict(player, recurse=False))
        except Exception as fallback_error:
            logger.error(
                f"Error in non-recursive serialization: {fallback_error}, using basic dict"
            )
            # Last resort: plain dict conversion.
            return drop_excluded(dict(player))
@classmethod
def update_player(
    cls, player_id: int, data: Dict[str, Any], token: str
) -> Dict[str, Any]:
    """
    Update a player (full update via PUT).

    Args:
        player_id: ID of the player to update.
        data: Field values passed to the repository's ``update``.
        token: Auth token checked with ``require_auth`` before any work.

    Returns:
        The player as returned by ``get_player`` after the update.

    Raises:
        HTTPException: 404 when the player does not exist; 500 for any other
            failure.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        if not repo.get_by_id(player_id):
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        repo.update(data, player_id=player_id)
        return cls.get_player(player_id)
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller
        # unchanged; previously the generic handler turned them into 500s.
        raise
    except Exception as e:
        cls.log_error(f"Error updating player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error updating player {player_id}: {str(e)}"
        ) from e
    finally:
        # Runs on success and failure alike, so cached reads are never stale.
        temp_service.invalidate_related_cache(cls.cache_patterns)
@classmethod
def patch_player(
    cls, player_id: int, data: Dict[str, Any], token: str
) -> Dict[str, Any]:
    """
    Patch a player (partial update).

    Args:
        player_id: ID of the player to patch.
        data: Field values passed to the repository's ``update``.
        token: Auth token checked with ``require_auth`` before any work.

    Returns:
        The player as returned by ``get_player`` after the update.

    Raises:
        HTTPException: 404 when the player does not exist; 500 for any other
            failure.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        if not repo.get_by_id(player_id):
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        repo.update(data, player_id=player_id)
        return cls.get_player(player_id)
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller
        # unchanged; previously the generic handler turned them into 500s.
        raise
    except Exception as e:
        cls.log_error(f"Error patching player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error patching player {player_id}: {str(e)}"
        ) from e
    finally:
        # Runs on success and failure alike, so cached reads are never stale.
        temp_service.invalidate_related_cache(cls.cache_patterns)
@classmethod
def create_players(
    cls, players_data: List[Dict[str, Any]], token: str
) -> Dict[str, Any]:
    """
    Create multiple players.

    Each entry is first checked against the repository for an existing
    player with the same ``season`` and ``name``; the first duplicate found
    aborts the whole request before anything is inserted.

    Args:
        players_data: Player field dicts to insert.
        token: Auth token checked with ``require_auth`` before any work.

    Returns:
        Dict with a ``message`` stating how many players were inserted.

    Raises:
        HTTPException: status 500 when a duplicate exists or the insert
            fails.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        for player in players_data:
            dupe = repo.get_or_none(
                season=player.get("season"), name=player.get("name")
            )
            if dupe:
                # NOTE(review): 500 is kept for backward compatibility; a 4xx
                # status (e.g. 409) may be more appropriate — confirm with API
                # consumers before changing.
                raise HTTPException(
                    status_code=500,
                    detail=f"Player {player.get('name')} already exists in Season {player.get('season')}",
                )
        repo.insert_many(players_data)
        return {"message": f"Inserted {len(players_data)} players"}
    except HTTPException:
        # Pass the duplicate error through as raised instead of re-wrapping
        # it in a second "Error creating players: ..." message.
        raise
    except Exception as e:
        cls.log_error("Error creating players", e)
        raise HTTPException(
            status_code=500, detail=f"Error creating players: {str(e)}"
        ) from e
    finally:
        # Runs on success and failure alike, so cached reads are never stale.
        temp_service.invalidate_related_cache(cls.cache_patterns)
@classmethod
def delete_player(cls, player_id: int, token: str) -> Dict[str, str]:
    """
    Delete a player.

    Args:
        player_id: ID of the player to delete.
        token: Auth token checked with ``require_auth`` before any work.

    Returns:
        Dict with a ``message`` confirming the deletion.

    Raises:
        HTTPException: 404 when the player does not exist; 500 for any other
            failure.
    """
    temp_service = cls()
    temp_service.require_auth(token)
    try:
        repo = cls._get_player_repo()
        if not repo.get_by_id(player_id):
            raise HTTPException(
                status_code=404, detail=f"Player ID {player_id} not found"
            )
        repo.delete_by_id(player_id)
        return {"message": f"Player {player_id} deleted"}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the caller
        # unchanged; previously the generic handler turned them into 500s.
        raise
    except Exception as e:
        cls.log_error(f"Error deleting player {player_id}", e)
        raise HTTPException(
            status_code=500, detail=f"Error deleting player {player_id}: {str(e)}"
        ) from e
    finally:
        # Runs on success and failure alike, so cached reads are never stale.
        temp_service.invalidate_related_cache(cls.cache_patterns)
@classmethod
def _format_player_csv(cls, players: List[Dict]) -> str:
    """
    Format a player list as CSV text.

    Nested ``team`` dicts are reduced to their ``abbrev`` value and nested
    ``sbaplayer`` dicts to their ``id`` value. The header is the union of
    all row keys in first-seen order, so a row with a key missing from the
    first row no longer makes ``DictWriter`` raise ValueError; rows lacking
    a column get an empty cell. For rows with identical keys the output is
    unchanged.

    Args:
        players: Player dicts; the input dicts are not modified.

    Returns:
        CSV text including a header row, or "" when ``players`` is empty.
    """
    if not players:
        return ""

    flattened_players = []
    for player in players:
        flat_player = player.copy()
        team = flat_player.get("team")
        if isinstance(team, dict):
            flat_player["team"] = team.get("abbrev", "")
        sbaplayer = flat_player.get("sbaplayer")
        if isinstance(sbaplayer, dict):
            flat_player["sbaplayer"] = sbaplayer.get("id", "")
        flattened_players.append(flat_player)

    # dict.fromkeys de-duplicates while preserving first-seen order.
    fieldnames = list(
        dict.fromkeys(key for row in flattened_players for key in row)
    )

    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(flattened_players)
    return output.getvalue()
class InMemoryQueryResult:
    """
    List-backed stand-in for a database query, used with mock repositories.

    Iteration, ``len()``, ``count()`` and indexing operate on a private copy
    of the supplied items. ``where()`` and ``order_by()`` exist only so that
    callers can chain them: both return the same object unchanged, because
    filtering and sorting are performed by the service.
    """

    def __init__(self, items: List[Dict[str, Any]]):
        # Snapshot the input so later changes to the caller's list are not seen.
        self._items = [*items]

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        return iter(self._items)

    def __getitem__(self, index):
        return self._items[index]

    def count(self) -> int:
        """Return the number of stored items."""
        return len(self._items)

    def where(self, *conditions) -> "InMemoryQueryResult":
        """Accept filter conditions and ignore them (compatibility no-op)."""
        return self

    def order_by(self, *fields) -> "InMemoryQueryResult":
        """Accept sort fields and ignore them (compatibility no-op)."""
        return self
class RealPlayerRepository:
    """
    Real database repository implementation.

    Every operation goes through the model class given to the constructor.
    Previously ``update``, ``insert_many`` and ``delete_by_id`` ignored it
    and used the module-level ``Player`` model directly.
    """

    def __init__(self, model_class):
        self._model = model_class

    def select_season(self, season: int):
        """Return query for season. Season=0 or None returns all seasons."""
        if season == 0 or season is None:
            return self._model.select()
        return self._model.select().where(self._model.season == season)

    def get_by_id(self, player_id: int):
        """Get player by ID, or None when no row matches."""
        return self._model.get_or_none(self._model.id == player_id)

    def get_or_none(self, **conditions):
        """Get player matching conditions; returns None on no match or on error."""
        try:
            return self._model.get_or_none(**conditions)
        except Exception as exc:
            # Best-effort lookup: keep returning None, but leave a trace
            # instead of hiding the failure completely.
            logger.warning(f"get_or_none failed for {conditions}: {exc}")
            return None

    def update(self, data: Dict, player_id: int) -> int:
        """Update the player with the given ID; returns the query's execute() result."""
        model = self._model
        return model.update(**data).where(model.id == player_id).execute()

    def insert_many(self, data: List[Dict]) -> int:
        """Insert multiple players inside one atomic block; returns len(data)."""
        with self._model._meta.database.atomic():
            self._model.insert_many(data).execute()
        return len(data)

    def delete_by_id(self, player_id: int) -> int:
        """Delete the player with the given ID; returns the query's execute() result."""
        model = self._model
        return model.delete().where(model.id == player_id).execute()