""" Player Service - Dependency Injection Version Business logic for player operations with injectable dependencies. """ import logging from typing import List, Optional, Dict, Any from peewee import fn as peewee_fn from .base import BaseService from .interfaces import AbstractPlayerRepository from .mocks import MockPlayerRepository logger = logging.getLogger('discord_app') class PlayerService(BaseService): """Service for player-related operations with dependency injection.""" cache_patterns = [ "players*", "players-search*", "player*", "team-roster*" ] def __init__( self, player_repo: Optional[AbstractPlayerRepository] = None, **kwargs ): """ Initialize PlayerService with optional repository. Args: player_repo: AbstractPlayerRepository implementation (mock or real) **kwargs: Additional arguments passed to BaseService """ super().__init__(player_repo=player_repo, **kwargs) self._player_repo = player_repo def get_players( self, season: Optional[int] = None, team_id: Optional[List[int]] = None, pos: Optional[List[str]] = None, strat_code: Optional[List[str]] = None, name: Optional[str] = None, is_injured: Optional[bool] = None, sort: Optional[str] = None, short_output: bool = False, as_csv: bool = False ) -> Dict[str, Any]: """ Get players with filtering and sorting. Args: season: Filter by season team_id: Filter by team IDs pos: Filter by positions strat_code: Filter by strat codes name: Filter by name (exact match) is_injured: Filter by injury status sort: Sort order short_output: Exclude related data as_csv: Return as CSV format Returns: Dict with count and players list, or CSV string """ try: # Build base query if season is not None: query = self.player_repo.select_season(season) else: query = self.player_repo.select_season(0) # Get all, filter below # If no season specified, get all and filter if season is None: # Get all players via default repo or iterate all_items = list(self.player_repo.select_season(0)) if hasattr(self.player_repo, 'select_season') else [] # Fall back to get_by_id for all if not all_items: # Default behavior for non-mock repos from ..db_engine import Player all_items = list(Player.select()) query = MockQueryResult([p if isinstance(p, dict) else self._player_to_dict(p) for p in all_items]) # Apply filters if team_id: query = query.where(Player.team_id << team_id) if strat_code: code_list = [x.lower() for x in strat_code] query = query.where(peewee_fn.Lower(Player.strat_code) << code_list) if name: query = query.where(peewee_fn.lower(Player.name) == name.lower()) if pos: p_list = [x.upper() for x in pos] pos_conditions = ( (Player.pos_1 << p_list) | (Player.pos_2 << p_list) | (Player.pos_3 << p_list) | (Player.pos_4 << p_list) | (Player.pos_5 << p_list) | (Player.pos_6 << p_list) | (Player.pos_7 << p_list) | (Player.pos_8 << p_list) ) query = query.where(pos_conditions) if is_injured is not None: query = query.where(Player.il_return.is_null(False)) # Apply sorting if sort == "cost-asc": query = query.order_by(Player.wara) elif sort == "cost-desc": query = query.order_by(-Player.wara) elif sort == "name-asc": query = query.order_by(Player.name) elif sort == "name-desc": query = query.order_by(-Player.name) else: query = query.order_by(Player.id) # Return format if as_csv: return self._format_player_csv(query) else: players_data = [ self._player_to_dict(p, recurse=not short_output) for p in query ] return { "count": query.count(), "players": players_data } except Exception as e: self.handle_error(f"Error fetching players: {e}", e) finally: self.close_db() def search_players( self, query_str: str, season: Optional[int] = None, limit: int = 10, short_output: bool = False ) -> Dict[str, Any]: """ Search players by name with fuzzy matching. Args: query_str: Search query season: Season to search (None/0 for all) limit: Maximum results short_output: Exclude related data Returns: Dict with count and matching players """ try: query_lower = query_str.lower() search_all_seasons = season is None or season == 0 # Build base query if search_all_seasons: all_players = self.player_repo.select_season(0) if hasattr(all_players, '__iter__') and not isinstance(all_players, list): all_players = list(all_players) else: all_players = self.player_repo.select_season(season) if hasattr(all_players, '__iter__') and not isinstance(all_players, list): all_players = list(all_players) # Convert to list if needed if not isinstance(all_players, list): from ..db_engine import Player all_players = list(Player.select()) # Sort by relevance (exact matches first) exact_matches = [] partial_matches = [] for player in all_players: player_dict = player if isinstance(player, dict) else self._player_to_dict(player) name_lower = player_dict.get('name', '').lower() if name_lower == query_lower: exact_matches.append(player_dict) elif query_lower in name_lower: partial_matches.append(player_dict) # Sort by season within each group if search_all_seasons: exact_matches.sort(key=lambda p: p.get('season', 0), reverse=True) partial_matches.sort(key=lambda p: p.get('season', 0), reverse=True) # Combine and limit results = (exact_matches + partial_matches)[:limit] return { "count": len(results), "total_matches": len(exact_matches + partial_matches), "all_seasons": search_all_seasons, "players": results } except Exception as e: self.handle_error(f"Error searching players: {e}", e) finally: self.close_db() def get_player(self, player_id: int, short_output: bool = False) -> Optional[Dict[str, Any]]: """Get a single player by ID.""" try: player = self.player_repo.get_by_id(player_id) if player: return self._player_to_dict(player, recurse=not short_output) return None except Exception as e: self.handle_error(f"Error fetching player {player_id}: {e}", e) finally: self.close_db() def update_player(self, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]: """Update a player (full update via PUT).""" self.require_auth(token) try: # Verify player exists if not self.player_repo.get_by_id(player_id): from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") # Execute update self.player_repo.update(data, Player.id == player_id) return self.get_player(player_id) except Exception as e: self.handle_error(f"Error updating player {player_id}: {e}", e) finally: self.invalidate_related_cache(self.cache_patterns) self.close_db() def patch_player(self, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]: """Patch a player (partial update).""" self.require_auth(token) try: player = self.player_repo.get_by_id(player_id) if not player: from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") # Apply updates for key, value in data.items(): if value is not None and hasattr(player, key): setattr(player, key, value) # Save using repo if hasattr(player, 'save'): player.save() return self.get_player(player_id) except Exception as e: self.handle_error(f"Error patching player {player_id}: {e}", e) finally: self.invalidate_related_cache(self.cache_patterns) self.close_db() def create_players(self, players_data: List[Dict[str, Any]], token: str) -> Dict[str, Any]: """Create multiple players.""" self.require_auth(token) try: # Check for duplicates for player in players_data: dupe = self.player_repo.get_or_none( Player.season == player.get("season"), Player.name == player.get("name") ) if dupe: from fastapi import HTTPException raise HTTPException( status_code=500, detail=f"Player {player.get('name')} already exists in Season {player.get('season')}" ) # Insert in batches self.player_repo.insert_many(players_data) return {"message": f"Inserted {len(players_data)} players"} except Exception as e: self.handle_error(f"Error creating players: {e}", e) finally: self.invalidate_related_cache(self.cache_patterns) self.close_db() def delete_player(self, player_id: int, token: str) -> Dict[str, str]: """Delete a player.""" self.require_auth(token) try: if not self.player_repo.get_by_id(player_id): from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") self.player_repo.delete_by_id(player_id) return {"message": f"Player {player_id} deleted"} except Exception as e: self.handle_error(f"Error deleting player {player_id}: {e}", e) finally: self.invalidate_related_cache(self.cache_patterns) self.close_db() def _player_to_dict(self, player, recurse: bool = True) -> Dict[str, Any]: """Convert player to dict.""" from playhouse.shortcuts import model_to_dict from ..db_engine import Player if isinstance(player, dict): return player return model_to_dict(player, recurse=recurse) def _format_player_csv(self, query) -> str: """Format player query results as CSV.""" from ..db_engine import Player, db from pandas import DataFrame headers = [ "name", "wara", "image", "image2", "team", "season", "pitcher_injury", "pos_1", "pos_2", "pos_3", "pos_4", "pos_5", "pos_6", "pos_7", "pos_8", "last_game", "last_game2", "il_return", "demotion_week", "headshot", "vanity_card", "strat_code", "bbref_id", "injury_rating", "player_id", "sbaref_id" ] rows = [] for player in query: player_dict = self._player_to_dict(player, recurse=False) strat_code = player_dict.get('strat_code', '') or '' if ',' in strat_code: strat_code = strat_code.replace(",", "-_-") rows.append([ player_dict.get('name', ''), player_dict.get('wara', 0), player_dict.get('image', ''), player_dict.get('image2', ''), player_dict.get('team', {}).get('abbrev', '') if isinstance(player_dict.get('team'), dict) else '', player_dict.get('season', 0), player_dict.get('pitcher_injury', ''), player_dict.get('pos_1', ''), player_dict.get('pos_2', ''), player_dict.get('pos_3', ''), player_dict.get('pos_4', ''), player_dict.get('pos_5', ''), player_dict.get('pos_6', ''), player_dict.get('pos_7', ''), player_dict.get('pos_8', ''), player_dict.get('last_game', ''), player_dict.get('last_game2', ''), player_dict.get('il_return', ''), player_dict.get('demotion_week', ''), player_dict.get('headshot', ''), player_dict.get('vanity_card', ''), strat_code, player_dict.get('bbref_id', ''), player_dict.get('injury_rating', ''), player_dict.get('id', 0), player_dict.get('sbaplayer_id', 0) ]) all_data = [headers] + rows return DataFrame(all_data).to_csv(header=False, index=False) # Import Player for use in methods from ..db_engine import Player