""" Team Service Business logic for team operations: - CRUD operations - Roster management - Cache management """ import copy import logging from typing import List, Optional, Dict, Any, Literal, TYPE_CHECKING from playhouse.shortcuts import model_to_dict from .base import BaseService from .interfaces import AbstractTeamRepository if TYPE_CHECKING: from .base import ServiceConfig # Try to import HTTPException from FastAPI, fall back to custom for testing try: from fastapi import HTTPException except ImportError: # Custom exception for testing without FastAPI class HTTPException(Exception): def __init__(self, status_code: int, detail: str): self.status_code = status_code self.detail = detail super().__init__(detail) logger = logging.getLogger("discord_app") class TeamService(BaseService): """Service for team-related operations.""" cache_patterns = ["teams*", "team*", "team-roster*"] # Class-level repository for dependency injection _injected_repo: Optional[AbstractTeamRepository] = None def __init__( self, team_repo: Optional[AbstractTeamRepository] = None, config: Optional["ServiceConfig"] = None, **kwargs, ): """ Initialize TeamService with optional repository. Args: team_repo: AbstractTeamRepository implementation (mock or real) config: ServiceConfig with injected dependencies **kwargs: Additional arguments passed to BaseService """ super().__init__(team_repo=team_repo, config=config, **kwargs) # Store injected repo at class level for classmethod access # Check both direct injection and config repo_to_inject = team_repo if config is not None and config.team_repo is not None: repo_to_inject = config.team_repo if repo_to_inject is not None: TeamService._injected_repo = repo_to_inject @classmethod def _get_team_repo(cls) -> AbstractTeamRepository: """Get the team repository, using real DB if not injected.""" if cls._injected_repo is not None: return cls._injected_repo # Fall back to real DB models for production return cls._get_real_repo() @classmethod def _get_real_repo(cls) -> "RealTeamRepository": """Get a real DB repository for production use.""" from ..db_engine import Team # Lazy import to avoid loading DB in tests return RealTeamRepository(Team) @classmethod def get_teams( cls, season: Optional[int] = None, owner_id: Optional[List[int]] = None, manager_id: Optional[List[int]] = None, team_abbrev: Optional[List[str]] = None, active_only: bool = False, short_output: bool = False, as_csv: bool = False, ) -> Dict[str, Any]: """ Get teams with filtering. Args: season: Filter by season owner_id: Filter by Discord owner ID manager_id: Filter by manager IDs team_abbrev: Filter by abbreviations active_only: Exclude IL/MiL teams short_output: Exclude related data as_csv: Return as CSV Returns: Dict with count and teams list, or CSV string """ try: repo = cls._get_team_repo() if season is not None: query = repo.select_season(season) else: query = repo.select_season(0) # 0 means all seasons # Convert to list and apply Python filters teams_list = list(query) # Apply filters if manager_id: teams_list = [ t for t in teams_list if cls._team_has_manager(t, manager_id) ] if owner_id: teams_list = [t for t in teams_list if cls._team_has_owner(t, owner_id)] if team_abbrev: abbrev_list = [x.lower() for x in team_abbrev] teams_list = [ t for t in teams_list if cls._get_team_field(t, "abbrev", "").lower() in abbrev_list ] if active_only: teams_list = [ t for t in teams_list if not cls._get_team_field(t, "abbrev", "").endswith(("IL", "MiL")) ] # Sort by team ID ascending teams_list.sort(key=lambda t: cls._get_team_field(t, 'id', 0)) # Convert to dicts teams_data = [cls._team_to_dict(t, short_output) for t in teams_list] if as_csv: return cls._format_team_csv(teams_data) return {"count": len(teams_data), "teams": teams_data} except Exception as e: temp_service = cls() temp_service.handle_error("Error fetching teams", e) finally: temp_service = cls() temp_service.close_db() @classmethod def get_team(cls, team_id: int) -> Optional[Dict[str, Any]]: """Get a single team by ID.""" try: repo = cls._get_team_repo() team = repo.get_by_id(team_id) if team: return cls._team_to_dict(team, short_output=False) return None except Exception as e: temp_service = cls() temp_service.handle_error(f"Error fetching team {team_id}", e) finally: temp_service = cls() temp_service.close_db() @classmethod def get_team_roster( cls, team_id: int, which: Literal["current", "next"], sort: Optional[str] = None ) -> Dict[str, Any]: """ Get team roster with IL lists. Args: team_id: Team ID which: 'current' or 'next' week roster sort: Optional sort key Returns: Roster dict with active, short-il, long-il lists """ try: # This method requires real DB access for roster methods from ..db_engine import Team # Lazy import - roster methods need DB team = Team.get_by_id(team_id) if which == "current": full_roster = team.get_this_week() else: full_roster = team.get_next_week() # Deep copy and convert to dicts result = { "active": {"players": []}, "shortil": {"players": []}, "longil": {"players": []}, } for section in ["active", "shortil", "longil"]: players = copy.deepcopy(full_roster[section]["players"]) result[section]["players"] = [model_to_dict(p) for p in players] # Apply sorting if sort == "wara-desc": for section in ["active", "shortil", "longil"]: result[section]["players"].sort( key=lambda p: p.get("wara", 0), reverse=True ) return result except Exception as e: temp_service = cls() temp_service.handle_error(f"Error fetching roster for team {team_id}", e) finally: temp_service = cls() temp_service.close_db() @classmethod def update_team( cls, team_id: int, data: Dict[str, Any], token: str ) -> Dict[str, Any]: """Update a team (partial update).""" temp_service = cls() temp_service.require_auth(token) try: repo = cls._get_team_repo() team = repo.get_by_id(team_id) if not team: raise HTTPException( status_code=404, detail=f"Team ID {team_id} not found" ) # Apply updates using repo repo.update(data, team_id=team_id) return cls.get_team(team_id) except Exception as e: temp_service.handle_error(f"Error updating team {team_id}", e) finally: temp_service.invalidate_related_cache(cls.cache_patterns) temp_service.close_db() @classmethod def create_teams( cls, teams_data: List[Dict[str, Any]], token: str ) -> Dict[str, str]: """Create multiple teams.""" temp_service = cls() temp_service.require_auth(token) try: # Check for duplicates using repo repo = cls._get_team_repo() for team in teams_data: dupe = repo.get_or_none( season=team.get("season"), abbrev=team.get("abbrev") ) if dupe: raise HTTPException( status_code=500, detail=f"Team {team.get('abbrev')} already exists in Season {team.get('season')}", ) # Insert teams repo.insert_many(teams_data) return {"message": f"Inserted {len(teams_data)} teams"} except Exception as e: temp_service.handle_error("Error creating teams", e) finally: temp_service.invalidate_related_cache(cls.cache_patterns) temp_service.close_db() @classmethod def delete_team(cls, team_id: int, token: str) -> Dict[str, str]: """Delete a team.""" temp_service = cls() temp_service.require_auth(token) try: repo = cls._get_team_repo() if not repo.get_by_id(team_id): raise HTTPException( status_code=404, detail=f"Team ID {team_id} not found" ) repo.delete_by_id(team_id) return {"message": f"Team {team_id} deleted"} except Exception as e: temp_service.handle_error(f"Error deleting team {team_id}", e) finally: temp_service.invalidate_related_cache(cls.cache_patterns) temp_service.close_db() # Helper methods for filtering and conversion @classmethod def _team_has_manager(cls, team, manager_ids: List[int]) -> bool: """Check if team has any of the specified managers.""" team_dict = ( team if isinstance(team, dict) else cls._team_to_dict(team, short_output=True) ) manager1 = team_dict.get("manager1_id") manager2 = team_dict.get("manager2_id") return manager1 in manager_ids or manager2 in manager_ids @classmethod def _team_has_owner(cls, team, owner_ids: List[int]) -> bool: """Check if team has any of the specified owners.""" team_dict = ( team if isinstance(team, dict) else cls._team_to_dict(team, short_output=True) ) gmid = team_dict.get("gmid") gmid2 = team_dict.get("gmid2") return gmid in owner_ids or gmid2 in owner_ids @classmethod def _get_team_field(cls, team, field: str, default: Any = None) -> Any: """Get field value from team (dict or model).""" if isinstance(team, dict): return team.get(field, default) return getattr(team, field, default) @classmethod def _team_to_dict(cls, team, short_output: bool = False) -> Dict[str, Any]: """Convert team to dict.""" # If already a dict, return as-is if isinstance(team, dict): return team # Try to convert Peewee model try: return model_to_dict(team, recurse=not short_output) except ImportError: # Fall back to basic dict conversion return dict(team) @classmethod def _format_team_csv(cls, teams: List[Dict]) -> str: """Format team list as CSV.""" from ..db_engine import query_to_csv, Team # Lazy import - CSV needs DB # Get team IDs from the list team_ids = [t.get("id") for t in teams if t.get("id")] if not team_ids: return "" # Query for CSV formatting query = Team.select().where(Team.id << team_ids) return query_to_csv( query, exclude=[Team.division_legacy, Team.mascot, Team.gsheet] ) class RealTeamRepository: """Real database repository implementation for teams.""" def __init__(self, model_class): self._model = model_class def select_season(self, season: int): """Return query for season.""" if season == 0: return self._model.select() return self._model.select().where(self._model.season == season) def get_by_id(self, team_id: int): """Get team by ID.""" return self._model.get_or_none(self._model.id == team_id) def get_or_none(self, **conditions): """Get team matching conditions.""" try: return self._model.get_or_none(**conditions) except Exception: return None def update(self, data: Dict, team_id: int) -> int: """Update team.""" from ..db_engine import Team # Lazy import - only used in production return Team.update(**data).where(Team.id == team_id).execute() def insert_many(self, data: List[Dict]) -> int: """Insert multiple teams.""" from ..db_engine import Team, db # Lazy import - only used in production with db.atomic(): Team.insert_many(data).on_conflict_ignore().execute() return len(data) def delete_by_id(self, team_id: int) -> int: """Delete team by ID.""" from ..db_engine import Team # Lazy import - only used in production return Team.delete().where(Team.id == team_id).execute()