- Add missing imports: json, csv, io, model_to_dict - Remove unused imports: Any, Dict, Type, invalidate_cache - Remove redundant f-string prefixes from static error messages - Format code with black - All ruff and black checks pass - All 76 unit tests pass (9 skipped) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
625 lines
21 KiB
Python
625 lines
21 KiB
Python
"""
|
|
Player Service - Dependency Injection Version
|
|
Business logic for player operations with injectable dependencies.
|
|
"""
|
|
|
|
import csv
|
|
import io
|
|
import logging
|
|
from typing import List, Optional, Dict, Any, TYPE_CHECKING
|
|
|
|
from peewee import fn as peewee_fn
|
|
from playhouse.shortcuts import model_to_dict
|
|
|
|
from .base import BaseService
|
|
from .interfaces import AbstractPlayerRepository, QueryResult
|
|
from ..db_engine import Player
|
|
|
|
if TYPE_CHECKING:
|
|
from .base import ServiceConfig
|
|
|
|
# Prefer FastAPI's HTTPException; fall back to a minimal stand-in so this
# module can be imported and tested without FastAPI installed.
try:
    from fastapi import HTTPException
except ImportError:

    class HTTPException(Exception):
        """Minimal stand-in for ``fastapi.HTTPException`` used when FastAPI is absent.

        Args:
            status_code: HTTP status code carried by the error.
            detail: Human-readable error detail. When omitted, the standard
                reason phrase for ``status_code`` is used.
            headers: Optional response headers to carry with the error.
        """

        def __init__(
            self,
            status_code: int,
            detail: Optional[str] = None,
            headers: Optional[dict] = None,
        ):
            if detail is None:
                # Local import keeps the fallback self-contained.
                from http import HTTPStatus

                detail = HTTPStatus(status_code).phrase
            self.status_code = status_code
            self.detail = detail
            self.headers = headers
            super().__init__(detail)
|
|
|
|
|
|
# Module-level logger, registered under the "discord_app" name.
logger = logging.getLogger("discord_app")
|
|
|
|
|
|
class PlayerService(BaseService):
    """Service for player-related operations with dependency injection.

    Every public operation is a classmethod, so the active repository is kept
    on the class (``_injected_repo``). When no repository has been injected,
    a ``RealPlayerRepository`` wrapping the ``Player`` model is used.
    """

    # Cache key patterns related to player data.
    # NOTE(review): presumably consumed by BaseService cache handling - confirm.
    cache_patterns = ["players*", "players-search*", "player*", "team-roster*"]

    # Class-level repository for dependency injection
    _injected_repo: Optional[AbstractPlayerRepository] = None

    def __init__(
        self,
        player_repo: Optional[AbstractPlayerRepository] = None,
        config: Optional["ServiceConfig"] = None,
        **kwargs,
    ):
        """
        Initialize PlayerService with optional repository.

        Args:
            player_repo: AbstractPlayerRepository implementation (mock or real)
            config: ServiceConfig with injected dependencies
            **kwargs: Additional arguments passed to BaseService
        """
        super().__init__(player_repo=player_repo, config=config, **kwargs)

        # The repo is stored on the class (not the instance) so classmethods
        # can reach it. A repo supplied through ``config`` takes precedence
        # over the direct ``player_repo`` argument.
        repo_to_inject = player_repo
        if config is not None and config.player_repo is not None:
            repo_to_inject = config.player_repo
        if repo_to_inject is not None:
            PlayerService._injected_repo = repo_to_inject

    @classmethod
    def _get_player_repo(cls) -> AbstractPlayerRepository:
        """Return the injected repository, or a real DB repository if none was injected."""
        if cls._injected_repo is not None:
            return cls._injected_repo
        # Fall back to real DB models for production
        return cls._get_real_repo()

    @classmethod
    def _get_real_repo(cls) -> "RealPlayerRepository":
        """Build a repository backed by the ``Player`` database model."""
        return RealPlayerRepository(Player)

    @classmethod
    def get_players(
        cls,
        season: Optional[int] = None,
        team_id: Optional[List[int]] = None,
        pos: Optional[List[str]] = None,
        strat_code: Optional[List[str]] = None,
        name: Optional[str] = None,
        is_injured: Optional[bool] = None,
        sort: Optional[str] = None,
        short_output: bool = False,
        as_csv: bool = False,
    ) -> Dict[str, Any]:
        """
        Get players with filtering and sorting.

        Args:
            season: Filter by season
            team_id: Filter by team IDs
            pos: Filter by positions
            strat_code: Filter by strat codes (case-insensitive)
            name: Filter by name (exact, case-insensitive match)
            is_injured: Filter by injury status
            sort: Sort order ("cost-asc", "cost-desc", "name-asc",
                "name-desc"); anything else sorts by id
            short_output: Exclude related data
            as_csv: Return as CSV format

        Returns:
            Dict with ``count`` and ``players`` keys, or a CSV string when
            ``as_csv`` is true.

        Raises:
            HTTPException: status 500 if anything fails while fetching.
        """
        try:
            repo = cls._get_player_repo()

            # No season given: ask the repository for season 0, which
            # search_players documents as the "all seasons" request.
            query = repo.select_season(season if season is not None else 0)

            query = cls._apply_player_filters(
                query,
                team_id=team_id,
                pos=pos,
                strat_code=strat_code,
                name=name,
                is_injured=is_injured,
            )
            query = cls._apply_player_sort(query, sort)
            players_data = cls._query_to_player_dicts(query, short_output)

            if as_csv:
                return cls._format_player_csv(players_data)
            return {"count": len(players_data), "players": players_data}

        except HTTPException:
            # Already an HTTP error; do not rewrap it.
            raise
        except Exception as e:
            logger.error(f"Error fetching players: {e}")
            raise HTTPException(
                status_code=500, detail=f"Error fetching players: {str(e)}"
            ) from e

    @classmethod
    def _apply_player_filters(
        cls,
        query: QueryResult,
        team_id: Optional[List[int]] = None,
        pos: Optional[List[str]] = None,
        strat_code: Optional[List[str]] = None,
        name: Optional[str] = None,
        is_injured: Optional[bool] = None,
    ) -> QueryResult:
        """Apply player filters in a repo-agnostic way.

        Results whose items are not dicts are treated as Peewee queries and
        filtered with ``where()``. Dict items (mock repositories) and empty
        results are filtered in Python and returned as an
        ``InMemoryQueryResult``.

        Args:
            query: Result to filter (DB query or iterable of player dicts).
            team_id: Keep players whose team id is in this list.
            pos: Keep players with any of pos_1..pos_8 in this list
                (the requested positions are upper-cased first).
            strat_code: Keep players whose strat code matches, ignoring case.
            name: Keep players whose name matches exactly, ignoring case.
            is_injured: True keeps players with ``il_return`` set, False
                keeps players without it, None applies no injury filter.

        Returns:
            The filtered result.
        """
        # Peek at the first item to learn what kind of result this is.
        # NOTE(review): for a DB query this peek iterates the unfiltered query.
        first_item = next(iter(query), None)

        if first_item is not None and not isinstance(first_item, dict):
            # Real Peewee models: let the database do the filtering.
            if team_id:
                query = query.where(Player.team_id << team_id)

            if strat_code:
                code_list = [code.lower() for code in strat_code]
                query = query.where(peewee_fn.Lower(Player.strat_code) << code_list)

            if name:
                query = query.where(peewee_fn.lower(Player.name) == name.lower())

            if pos:
                p_list = [p.upper() for p in pos]
                pos_conditions = (
                    (Player.pos_1 << p_list)
                    | (Player.pos_2 << p_list)
                    | (Player.pos_3 << p_list)
                    | (Player.pos_4 << p_list)
                    | (Player.pos_5 << p_list)
                    | (Player.pos_6 << p_list)
                    | (Player.pos_7 << p_list)
                    | (Player.pos_8 << p_list)
                )
                query = query.where(pos_conditions)

            if is_injured is not None:
                # Injured players have il_return set (not NULL).
                query = query.where(Player.il_return.is_null(not is_injured))

            return query

        # Dict items (mocks) or an empty result: filter in Python.
        # Normalize the requested values once rather than once per player.
        wanted_codes = [code.lower() for code in strat_code] if strat_code else None
        wanted_name = name.lower() if name else None
        wanted_pos = [p.upper() for p in pos] if pos else None

        def matches(player) -> bool:
            if team_id and player.get("team_id") not in team_id:
                return False
            if wanted_codes is not None:
                if (player.get("strat_code") or "").lower() not in wanted_codes:
                    return False
            if wanted_name is not None:
                if (player.get("name") or "").lower() != wanted_name:
                    return False
            if wanted_pos is not None:
                player_pos = [
                    player.get(f"pos_{i}")
                    for i in range(1, 9)
                    if player.get(f"pos_{i}")
                ]
                if not any(p in wanted_pos for p in player_pos):
                    return False
            if is_injured is not None:
                has_injury = player.get("il_return") is not None
                if has_injury != bool(is_injured):
                    return False
            return True

        return InMemoryQueryResult([p for p in query if matches(p)])

    @classmethod
    def _apply_player_sort(
        cls, query: QueryResult, sort: Optional[str] = None
    ) -> QueryResult:
        """Apply player sorting in a repo-agnostic way.

        Args:
            query: Result to sort (DB query or iterable of player dicts).
            sort: "cost-asc"/"cost-desc" order by ``wara``,
                "name-asc"/"name-desc" order by ``name``; any other value
                orders by ``id``.

        Returns:
            The ordered DB query for model items, an ``InMemoryQueryResult``
            for dict items, or ``query`` unchanged when it is empty.
        """
        first_item = next(iter(query), None)

        if first_item is None:
            # Nothing to order.
            return query

        if not isinstance(first_item, dict):
            # Real Peewee models: sort in the database. Unary minus on a
            # field requests descending order.
            db_ordering = {
                "cost-asc": Player.wara,
                "cost-desc": -Player.wara,
                "name-asc": Player.name,
                "name-desc": -Player.name,
            }
            return query.order_by(db_ordering.get(sort, Player.id))

        # Dict items (mocks): sort in Python regardless of whether the
        # container happens to expose an order_by() method.
        def get_sort_key(player):
            # Treat missing/None values as neutral defaults so mixed data
            # cannot raise TypeError during comparison.
            player_name = player.get("name") or ""
            wara = player.get("wara") or 0
            player_id = player.get("id") or 0

            if sort == "cost-asc":
                return (wara, player_name, player_id)
            if sort == "cost-desc":
                return (-wara, player_name, player_id)
            if sort in ("name-asc", "name-desc"):
                # name-desc reuses this key and is reversed below.
                return (player_name, wara, player_id)
            return (player_id,)

        sorted_list = sorted(query, key=get_sort_key, reverse=(sort == "name-desc"))
        return InMemoryQueryResult(sorted_list)

    @classmethod
    def _query_to_player_dicts(
        cls, query: QueryResult, short_output: bool = False
    ) -> List[Dict[str, Any]]:
        """Convert query results to a list of player dicts.

        Args:
            query: Iterable of player dicts or DB model instances.
            short_output: When true, models are converted without recursing
                into related models. Has no effect on items already dicts.

        Returns:
            A new list of player dicts (empty if the query has no items).
        """
        # Materialize once so the source is only iterated a single time.
        items = list(query)
        if not items:
            return []

        # Items from mock repositories are already dicts.
        if isinstance(items[0], dict):
            return items

        # Items from the real repository are DB models.
        return [model_to_dict(item, recurse=not short_output) for item in items]

    @classmethod
    def search_players(
        cls,
        query_str: str,
        season: Optional[int] = None,
        limit: int = 10,
        short_output: bool = False,
    ) -> Dict[str, Any]:
        """
        Search players by name (case-insensitive exact and substring matches).

        Args:
            query_str: Search query
            season: Season to search (None/0 for all)
            limit: Maximum results
            short_output: Exclude related data.
                NOTE(review): currently not honoured - results are always
                converted without related data. Confirm the intended
                behavior before changing it.

        Returns:
            Dict with ``count`` (returned results), ``total_matches`` (before
            the limit), ``all_seasons`` and ``players``. Exact name matches
            come before partial matches.

        Raises:
            HTTPException: status 500 if anything fails while searching.
        """
        try:
            query_lower = query_str.lower()
            search_all_seasons = season is None or season == 0

            repo = cls._get_player_repo()
            all_players = list(repo.select_season(0 if search_all_seasons else season))

            # Convert to dicts if needed
            all_player_dicts = cls._query_to_player_dicts(
                InMemoryQueryResult(all_players), short_output=True
            )

            exact_matches = []
            partial_matches = []
            for player in all_player_dicts:
                # A missing or None name simply never matches.
                name_lower = (player.get("name") or "").lower()
                if name_lower == query_lower:
                    exact_matches.append(player)
                elif query_lower in name_lower:
                    partial_matches.append(player)

            # Sort by season within each group (newest first)
            if search_all_seasons:
                exact_matches.sort(key=lambda p: p.get("season") or 0, reverse=True)
                partial_matches.sort(key=lambda p: p.get("season") or 0, reverse=True)

            # Combine and limit
            all_matches = exact_matches + partial_matches
            results = all_matches[:limit]

            return {
                "count": len(results),
                "total_matches": len(all_matches),
                "all_seasons": search_all_seasons,
                "players": results,
            }

        except HTTPException:
            raise
        except Exception as e:
            cls.log_error("Error searching players", e)
            raise HTTPException(
                status_code=500, detail=f"Error searching players: {str(e)}"
            ) from e

    @classmethod
    def get_player(
        cls, player_id: int, short_output: bool = False
    ) -> Optional[Dict[str, Any]]:
        """Get a single player by ID.

        Args:
            player_id: ID of the player to fetch.
            short_output: Exclude related data.

        Returns:
            The player as a dict, or None if the repository finds no player.

        Raises:
            HTTPException: status 500 if the lookup fails.
        """
        try:
            repo = cls._get_player_repo()
            player = repo.get_by_id(player_id)
            if player:
                return cls._player_to_dict(player, recurse=not short_output)
            return None
        except HTTPException:
            raise
        except Exception as e:
            cls.log_error(f"Error fetching player {player_id}", e)
            raise HTTPException(
                status_code=500,
                detail=f"Error fetching player {player_id}: {str(e)}",
            ) from e

    @classmethod
    def _player_to_dict(cls, player, recurse: bool = True) -> Dict[str, Any]:
        """Convert a player to a dict.

        Dicts are returned as-is; anything else is converted with
        ``model_to_dict``, following related models only when ``recurse``
        is true.
        """
        if isinstance(player, dict):
            return player
        return model_to_dict(player, recurse=recurse)

    @classmethod
    def update_player(
        cls, player_id: int, data: Dict[str, Any], token: str
    ) -> Dict[str, Any]:
        """Update a player (full update via PUT).

        Args:
            player_id: ID of the player to update.
            data: Field values to write.
            token: Auth token checked with ``require_auth``.

        Returns:
            The updated player as returned by ``get_player``.

        Raises:
            HTTPException: status 404 if the player does not exist,
                status 500 for any other failure.
        """
        temp_service = cls()
        temp_service.require_auth(token)

        try:
            # Verify player exists
            repo = cls._get_player_repo()
            if not repo.get_by_id(player_id):
                raise HTTPException(
                    status_code=404, detail=f"Player ID {player_id} not found"
                )

            repo.update(data, player_id=player_id)
            return cls.get_player(player_id)

        except HTTPException:
            # Keep the original status (e.g. 404) instead of rewrapping as 500.
            raise
        except Exception as e:
            cls.log_error(f"Error updating player {player_id}", e)
            raise HTTPException(
                status_code=500, detail=f"Error updating player {player_id}: {str(e)}"
            ) from e

    @classmethod
    def patch_player(
        cls, player_id: int, data: Dict[str, Any], token: str
    ) -> Dict[str, Any]:
        """Patch a player (partial update).

        Args:
            player_id: ID of the player to patch.
            data: Field values to write.
            token: Auth token checked with ``require_auth``.

        Returns:
            The patched player as returned by ``get_player``.

        Raises:
            HTTPException: status 404 if the player does not exist,
                status 500 for any other failure.
        """
        temp_service = cls()
        temp_service.require_auth(token)

        try:
            repo = cls._get_player_repo()
            if not repo.get_by_id(player_id):
                raise HTTPException(
                    status_code=404, detail=f"Player ID {player_id} not found"
                )

            # Apply updates using repo
            repo.update(data, player_id=player_id)
            return cls.get_player(player_id)

        except HTTPException:
            # Keep the original status (e.g. 404) instead of rewrapping as 500.
            raise
        except Exception as e:
            cls.log_error(f"Error patching player {player_id}", e)
            raise HTTPException(
                status_code=500, detail=f"Error patching player {player_id}: {str(e)}"
            ) from e

    @classmethod
    def create_players(
        cls, players_data: List[Dict[str, Any]], token: str
    ) -> Dict[str, Any]:
        """Create multiple players.

        Args:
            players_data: One dict of field values per player to insert.
            token: Auth token checked with ``require_auth``.

        Returns:
            Dict with a ``message`` stating how many players were inserted.

        Raises:
            HTTPException: status 500 if a player with the same season and
                name already exists, or if the insert fails.
        """
        temp_service = cls()
        temp_service.require_auth(token)

        try:
            # Reject the whole request if any player already exists.
            repo = cls._get_player_repo()
            for player in players_data:
                dupe = repo.get_or_none(
                    season=player.get("season"), name=player.get("name")
                )
                if dupe:
                    raise HTTPException(
                        status_code=500,
                        detail=f"Player {player.get('name')} already exists in Season {player.get('season')}",
                    )

            repo.insert_many(players_data)

            return {"message": f"Inserted {len(players_data)} players"}

        except HTTPException:
            # Surface the duplicate message directly instead of rewrapping it.
            raise
        except Exception as e:
            cls.log_error("Error creating players", e)
            raise HTTPException(
                status_code=500, detail=f"Error creating players: {str(e)}"
            ) from e

    @classmethod
    def delete_player(cls, player_id: int, token: str) -> Dict[str, str]:
        """Delete a player.

        Args:
            player_id: ID of the player to delete.
            token: Auth token checked with ``require_auth``.

        Returns:
            Dict with a ``message`` confirming the deletion.

        Raises:
            HTTPException: status 404 if the player does not exist,
                status 500 for any other failure.
        """
        temp_service = cls()
        temp_service.require_auth(token)

        try:
            repo = cls._get_player_repo()
            if not repo.get_by_id(player_id):
                raise HTTPException(
                    status_code=404, detail=f"Player ID {player_id} not found"
                )

            repo.delete_by_id(player_id)

            return {"message": f"Player {player_id} deleted"}

        except HTTPException:
            # Keep the original status (e.g. 404) instead of rewrapping as 500.
            raise
        except Exception as e:
            cls.log_error(f"Error deleting player {player_id}", e)
            raise HTTPException(
                status_code=500, detail=f"Error deleting player {player_id}: {str(e)}"
            ) from e

    @classmethod
    def _format_player_csv(cls, players: List[Dict]) -> str:
        """Format a list of player dicts as CSV text.

        The header is the union of all keys in first-seen order, so rows
        with differing keys are written with empty cells rather than
        raising.

        Args:
            players: Player dicts to serialize.

        Returns:
            CSV text with a header row, or "" when ``players`` is empty.
        """
        if not players:
            return ""

        # dict.fromkeys de-duplicates while keeping first-seen key order.
        fieldnames = list(dict.fromkeys(key for player in players for key in player))

        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(players)
        return output.getvalue()
|
|
|
|
|
|
class InMemoryQueryResult:
    """
    List-backed stand-in for a database query, used with mock repositories.

    Offers ``where``, ``order_by``, ``count``, iteration, ``len()`` and
    indexing over an in-memory list of items.
    """

    def __init__(self, items: List[Dict[str, Any]]):
        # Take a private copy so later changes to the caller's sequence
        # do not affect this result.
        self._items = [*items]

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        return iter(self._items)

    def __getitem__(self, index):
        return self._items[index]

    def count(self) -> int:
        """Return the number of items held."""
        return len(self)

    def where(self, *conditions) -> "InMemoryQueryResult":
        """Accept and ignore filter conditions; returns this same object."""
        return self

    def order_by(self, *fields) -> "InMemoryQueryResult":
        """Accept and ignore sort fields (the service sorts); returns this same object."""
        return self
|
|
|
|
|
|
class RealPlayerRepository:
    """Real database repository implementation.

    All operations go through the model class passed to the constructor.
    """

    def __init__(self, model_class):
        self._model = model_class

    def select_season(self, season: int):
        """Return a select query for ``season``.

        A falsy season (0 or None) means "all seasons" and returns the
        unfiltered select query, matching how the service requests
        all-season results.
        """
        query = self._model.select()
        if not season:
            return query
        return query.where(self._model.season == season)

    def get_by_id(self, player_id: int):
        """Return the player with this ID via the model's ``get_or_none``."""
        return self._model.get_or_none(self._model.id == player_id)

    def get_or_none(self, **conditions):
        """Return the player matching ``conditions``, or None.

        Lookup errors are logged and reported as None (best effort).
        """
        try:
            return self._model.get_or_none(**conditions)
        except Exception:
            # Keep the best-effort contract, but leave a trace of the failure.
            logger.warning(
                "Player lookup failed for conditions %r", conditions, exc_info=True
            )
            return None

    def update(self, data: Dict, player_id: int) -> int:
        """Update the player with ``player_id`` using ``data``; returns the ``execute()`` result."""
        return (
            self._model.update(**data).where(self._model.id == player_id).execute()
        )

    def insert_many(self, data: List[Dict]) -> int:
        """Insert multiple players atomically; returns ``len(data)``."""
        if not data:
            # Nothing to insert; avoid issuing an empty INSERT.
            return 0
        with self._model._meta.database.atomic():
            self._model.insert_many(data).execute()
        return len(data)

    def delete_by_id(self, player_id: int) -> int:
        """Delete the player with ``player_id``; returns the ``execute()`` result."""
        return self._model.delete().where(self._model.id == player_id).execute()
|