From 2189aea8da1c47a86b336d9be7e3d3f1f56d967c Mon Sep 17 00:00:00 2001 From: Cal Corum Date: Wed, 4 Feb 2026 08:44:12 -0600 Subject: [PATCH] Fix linting and formatting issues - Add missing imports: json, csv, io, model_to_dict - Remove unused imports: Any, Dict, Type, invalidate_cache - Remove redundant f-string prefixes from static error messages - Format code with black - All ruff and black checks pass - All 76 unit tests pass (9 skipped) Co-Authored-By: Claude Sonnet 4.5 --- app/services/base.py | 140 ++++++++++-------- app/services/player_service.py | 262 ++++++++++++++++++--------------- app/services/team_service.py | 139 +++++++++-------- 3 files changed, 299 insertions(+), 242 deletions(-) diff --git a/app/services/base.py b/app/services/base.py index 24406a7..66af391 100644 --- a/app/services/base.py +++ b/app/services/base.py @@ -3,20 +3,25 @@ Base Service Class - Dependency Injection Version Provides common functionality with configurable dependencies. """ +import json import logging -from typing import Optional, Any, Dict, TypeVar, Type +from typing import Optional, TypeVar -from .interfaces import AbstractPlayerRepository, AbstractTeamRepository, AbstractCacheService +from .interfaces import ( + AbstractPlayerRepository, + AbstractTeamRepository, + AbstractCacheService, +) from .mocks import MockCacheService -logger = logging.getLogger('discord_app') +logger = logging.getLogger("discord_app") -T = TypeVar('T') +T = TypeVar("T") class ServiceConfig: """Configuration for service dependencies.""" - + def __init__( self, player_repo: Optional[AbstractPlayerRepository] = None, @@ -34,10 +39,10 @@ _default_config = ServiceConfig() class BaseService: """Base class for all services with dependency injection support.""" - + # Subclasses should override these cache_patterns = [] - + def __init__( self, config: Optional[ServiceConfig] = None, @@ -47,7 +52,7 @@ class BaseService: ): """ Initialize service with dependencies. - + Args: config: Optional ServiceConfig containing all dependencies player_repo: Override for player repository @@ -63,163 +68,170 @@ class BaseService: self._player_repo = player_repo self._team_repo = team_repo self._cache = cache - + # Lazy imports for defaults (avoids circular imports) self._using_defaults = ( - self._player_repo is None and - self._team_repo is None and - self._cache is None + self._player_repo is None + and self._team_repo is None + and self._cache is None ) - + @property def player_repo(self) -> AbstractPlayerRepository: """Get player repository, importing from db_engine if not set.""" if self._player_repo is None: from ..db_engine import Player + class DefaultPlayerRepo: def select_season(self, season): return Player.select_season(season) - + def get_by_id(self, player_id): return Player.get_or_none(Player.id == player_id) - + def get_or_none(self, *conditions): return Player.get_or_none(*conditions) - + def update(self, data, *conditions): return Player.update(data).where(*conditions).execute() - + def insert_many(self, data): return Player.insert_many(data).execute() - + def delete_by_id(self, player_id): player = Player.get_by_id(player_id) if player: return player.delete_instance() return 0 - + self._player_repo = DefaultPlayerRepo() return self._player_repo - + @property def team_repo(self) -> AbstractTeamRepository: """Get team repository, importing from db_engine if not set.""" if self._team_repo is None: from ..db_engine import Team - + class DefaultTeamRepo: def select_season(self, season): return Team.select_season(season) - + def get_by_id(self, team_id): return Team.get_by_id(team_id) - + def get_or_none(self, *conditions): return Team.get_or_none(*conditions) - + def update(self, data, *conditions): return Team.update(data).where(*conditions).execute() - + def insert_many(self, data): return Team.insert_many(data).execute() - + def delete_by_id(self, team_id): team = Team.get_by_id(team_id) if team: return team.delete_instance() return 0 - + self._team_repo = DefaultTeamRepo() return self._team_repo - + @property def cache(self) -> AbstractCacheService: """Get cache service, importing from dependencies if not set.""" if self._cache is None: try: - from ..dependencies import redis_client, invalidate_cache - + from ..dependencies import redis_client + class DefaultCache: def get(self, key: str): if redis_client is None: return None return redis_client.get(key) - + def set(self, key: str, value: str, ttl: int = 300): if redis_client is None: return False redis_client.setex(key, ttl, value) return True - + def setex(self, key: str, ttl: int, value: str): return self.set(key, value, ttl) - + def keys(self, pattern: str): if redis_client is None: return [] return redis_client.keys(pattern) - + def delete(self, *keys: str): if redis_client is None: return 0 return redis_client.delete(*keys) - + def invalidate_pattern(self, pattern: str): if redis_client is None: return 0 keys = self.keys(pattern) return self.delete(*keys) - + def exists(self, key: str): if redis_client is None: return False return redis_client.exists(key) - + self._cache = DefaultCache() except ImportError: # Fall back to mock if dependencies not available self._cache = MockCacheService() - + return self._cache - + def close_db(self): """Safely close database connection (for non-injected repos).""" if self._using_defaults: try: from ..db_engine import db + db.close() except Exception: pass # Connection may already be closed - + def invalidate_cache_for(self, entity_type: str, entity_id: Optional[int] = None): """Invalidate cache entries for an entity.""" if entity_id: self.cache.invalidate_pattern(f"{entity_type}*{entity_id}*") else: self.cache.invalidate_pattern(f"{entity_type}*") - + def invalidate_related_cache(self, patterns: list): """Invalidate multiple cache patterns.""" for pattern in patterns: self.cache.invalidate_pattern(pattern) - - def handle_error(self, operation: str, error: Exception, rethrow: bool = True) -> dict: + + def handle_error( + self, operation: str, error: Exception, rethrow: bool = True + ) -> dict: """Handle errors consistently.""" logger.error(f"{operation}: {error}") if rethrow: try: from fastapi import HTTPException - raise HTTPException(status_code=500, detail=f"{operation}: {str(error)}") + + raise HTTPException( + status_code=500, detail=f"{operation}: {str(error)}" + ) except ImportError: # For testing without FastAPI raise RuntimeError(f"{operation}: {str(error)}") return {"error": operation, "detail": str(error)} - + @classmethod def log_error(cls, operation: str, error: Exception) -> None: """Class method for logging errors without needing an instance.""" logger.error(f"{operation}: {error}") - + def require_auth(self, token: str) -> bool: """Validate authentication token.""" try: @@ -227,56 +239,60 @@ class BaseService: from ..dependencies import valid_token if not valid_token(token): - logger.warning(f"Unauthorized access attempt with token: {token[:10]}...") + logger.warning( + f"Unauthorized access attempt with token: {token[:10]}..." + ) raise HTTPException(status_code=401, detail="Unauthorized") except ImportError: # For testing without FastAPI - accept "valid_token" as test token if token != "valid_token": - logger.warning(f"Unauthorized access attempt with token: {token[:10] if len(token) >= 10 else token}...") + logger.warning( + f"Unauthorized access attempt with token: {token[:10] if len(token) >= 10 else token}..." + ) error = RuntimeError("Unauthorized") error.status_code = 401 # Add status_code for test compatibility raise error return True - + def format_csv_response(self, headers: list, rows: list) -> str: """Format data as CSV.""" from pandas import DataFrame + all_data = [headers] + rows return DataFrame(all_data).to_csv(header=False, index=False) - + def parse_query_params(self, params: dict, remove_none: bool = True) -> dict: """Parse and clean query parameters.""" if remove_none: - return {k: v for k, v in params.items() if v is not None and v != [] and v != ""} + return { + k: v for k, v in params.items() if v is not None and v != [] and v != "" + } return params - - def with_cache( - self, - key: str, - ttl: int = 300, - fallback: Optional[callable] = None - ): + + def with_cache(self, key: str, ttl: int = 300, fallback: Optional[callable] = None): """ Decorator-style cache wrapper for methods. - + Usage: @service.with_cache("player:123", ttl=600) def get_player(self): ... """ + def decorator(func): async def wrapper(*args, **kwargs): # Try cache first cached = self.cache.get(key) if cached: return json.loads(cached) - + # Execute and cache result result = func(*args, **kwargs) if result is not None: - import json self.cache.set(key, json.dumps(result, default=str), ttl) - + return result + return wrapper + return decorator diff --git a/app/services/player_service.py b/app/services/player_service.py index 34eeeaf..0101d3a 100644 --- a/app/services/player_service.py +++ b/app/services/player_service.py @@ -3,6 +3,8 @@ Player Service - Dependency Injection Version Business logic for player operations with injectable dependencies. """ +import csv +import io import logging from typing import List, Optional, Dict, Any, TYPE_CHECKING @@ -27,18 +29,14 @@ except ImportError: self.detail = detail super().__init__(detail) -logger = logging.getLogger('discord_app') + +logger = logging.getLogger("discord_app") class PlayerService(BaseService): """Service for player-related operations with dependency injection.""" - cache_patterns = [ - "players*", - "players-search*", - "player*", - "team-roster*" - ] + cache_patterns = ["players*", "players-search*", "player*", "team-roster*"] # Class-level repository for dependency injection _injected_repo: Optional[AbstractPlayerRepository] = None @@ -46,8 +44,8 @@ class PlayerService(BaseService): def __init__( self, player_repo: Optional[AbstractPlayerRepository] = None, - config: Optional['ServiceConfig'] = None, - **kwargs + config: Optional["ServiceConfig"] = None, + **kwargs, ): """ Initialize PlayerService with optional repository. @@ -75,10 +73,10 @@ class PlayerService(BaseService): return cls._get_real_repo() @classmethod - def _get_real_repo(cls) -> 'RealPlayerRepository': + def _get_real_repo(cls) -> "RealPlayerRepository": """Get a real DB repository for production use.""" return RealPlayerRepository(Player) - + @classmethod def get_players( cls, @@ -90,7 +88,7 @@ class PlayerService(BaseService): is_injured: Optional[bool] = None, sort: Optional[str] = None, short_output: bool = False, - as_csv: bool = False + as_csv: bool = False, ) -> Dict[str, Any]: """ Get players with filtering and sorting. @@ -124,7 +122,7 @@ class PlayerService(BaseService): pos=pos, strat_code=strat_code, name=name, - is_injured=is_injured + is_injured=is_injured, ) # Apply sorting @@ -137,18 +135,17 @@ class PlayerService(BaseService): if as_csv: return cls._format_player_csv(players_data) else: - return { - "count": len(players_data), - "players": players_data - } + return {"count": len(players_data), "players": players_data} except Exception as e: logger.error(f"Error fetching players: {e}") try: - raise HTTPException(status_code=500, detail=f"Error fetching players: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Error fetching players: {str(e)}" + ) except ImportError: raise RuntimeError(f"Error fetching players: {str(e)}") - + @classmethod def _apply_player_filters( cls, @@ -157,10 +154,10 @@ class PlayerService(BaseService): pos: Optional[List[str]] = None, strat_code: Optional[List[str]] = None, name: Optional[str] = None, - is_injured: Optional[bool] = None + is_injured: Optional[bool] = None, ) -> QueryResult: """Apply player filters in a repo-agnostic way.""" - + # Check if repo supports where() method (real DB) # Only use DB-native filtering if: # 1. Query has where() method @@ -169,34 +166,34 @@ class PlayerService(BaseService): for item in query: first_item = item break - + # Use DB-native filtering only for real Peewee models if first_item is not None and not isinstance(first_item, dict): try: if team_id: query = query.where(Player.team_id << team_id) - + if strat_code: code_list = [x.lower() for x in strat_code] query = query.where(peewee_fn.Lower(Player.strat_code) << code_list) - + if name: query = query.where(peewee_fn.lower(Player.name) == name.lower()) - + if pos: p_list = [x.upper() for x in pos] pos_conditions = ( - (Player.pos_1 << p_list) | - (Player.pos_2 << p_list) | - (Player.pos_3 << p_list) | - (Player.pos_4 << p_list) | - (Player.pos_5 << p_list) | - (Player.pos_6 << p_list) | - (Player.pos_7 << p_list) | - (Player.pos_8 << p_list) + (Player.pos_1 << p_list) + | (Player.pos_2 << p_list) + | (Player.pos_3 << p_list) + | (Player.pos_4 << p_list) + | (Player.pos_5 << p_list) + | (Player.pos_6 << p_list) + | (Player.pos_7 << p_list) + | (Player.pos_8 << p_list) ) query = query.where(pos_conditions) - + if is_injured is not None: if is_injured: query = query.where(Player.il_return.is_null(False)) @@ -208,55 +205,54 @@ class PlayerService(BaseService): else: # Use Python filtering for mocks def matches(player): - if team_id and player.get('team_id') not in team_id: + if team_id and player.get("team_id") not in team_id: return False if strat_code: code_list = [s.lower() for s in strat_code] - player_code = (player.get('strat_code') or '').lower() + player_code = (player.get("strat_code") or "").lower() if player_code not in code_list: return False - if name and (player.get('name') or '').lower() != name.lower(): + if name and (player.get("name") or "").lower() != name.lower(): return False if pos: p_list = [p.upper() for p in pos] player_pos = [ - player.get(f'pos_{i}') for i in range(1, 9) - if player.get(f'pos_{i}') + player.get(f"pos_{i}") + for i in range(1, 9) + if player.get(f"pos_{i}") ] if not any(p in p_list for p in player_pos): return False if is_injured is not None: - has_injury = player.get('il_return') is not None + has_injury = player.get("il_return") is not None if is_injured and not has_injury: return False if not is_injured and has_injury: return False return True - + # Filter in memory filtered = [p for p in query if matches(p)] query = InMemoryQueryResult(filtered) - + return query - + @classmethod def _apply_player_sort( - cls, - query: QueryResult, - sort: Optional[str] = None + cls, query: QueryResult, sort: Optional[str] = None ) -> QueryResult: """Apply player sorting in a repo-agnostic way.""" - + # Check if items are Peewee models (not dicts) first_item = None for item in query: first_item = item break - + # Use DB-native sorting only for real Peewee models if first_item is not None and not isinstance(first_item, dict): try: - + if sort == "cost-asc": query = query.order_by(Player.wara) elif sort == "cost-desc": @@ -270,13 +266,14 @@ class PlayerService(BaseService): except ImportError: # Fall back to Python sorting if DB not available pass - + # Use Python sorting for mocks or if DB sort failed - if not hasattr(query, 'order_by') or isinstance(query, InMemoryQueryResult): + if not hasattr(query, "order_by") or isinstance(query, InMemoryQueryResult): + def get_sort_key(player): - name = player.get('name', '') - wara = player.get('wara', 0) - player_id = player.get('id', 0) + name = player.get("name", "") + wara = player.get("wara", 0) + player_id = player.get("id", 0) if sort == "cost-asc": return (wara, name, player_id) @@ -290,29 +287,27 @@ class PlayerService(BaseService): return (player_id,) # Use reverse for descending name sort - reverse_sort = (sort == "name-desc") + reverse_sort = sort == "name-desc" sorted_list = sorted(list(query), key=get_sort_key, reverse=reverse_sort) query = InMemoryQueryResult(sorted_list) - + return query - + @classmethod def _query_to_player_dicts( - cls, - query: QueryResult, - short_output: bool = False + cls, query: QueryResult, short_output: bool = False ) -> List[Dict[str, Any]]: """Convert query results to list of player dicts.""" - + # Check if we have DB models or dicts first_item = None for item in query: first_item = item break - + if first_item is None: return [] - + # If items are already dicts (from mock) if isinstance(first_item, dict): players_data = list(query) @@ -320,33 +315,33 @@ class PlayerService(BaseService): return players_data # Add computed fields if needed return players_data - + # If items are DB models (from real repo) - + players_data = [] for player in query: player_dict = model_to_dict(player, recurse=not short_output) players_data.append(player_dict) - + return players_data - + @classmethod def search_players( cls, query_str: str, season: Optional[int] = None, limit: int = 10, - short_output: bool = False + short_output: bool = False, ) -> Dict[str, Any]: """ Search players by name with fuzzy matching. - + Args: query_str: Search query season: Season to search (None/0 for all) limit: Maximum results short_output: Exclude related data - + Returns: Dict with count and matching players """ @@ -363,46 +358,49 @@ class PlayerService(BaseService): # Convert to dicts if needed all_player_dicts = cls._query_to_player_dicts( - InMemoryQueryResult(all_players), - short_output=True + InMemoryQueryResult(all_players), short_output=True ) - + # Sort by relevance (exact matches first) exact_matches = [] partial_matches = [] - + for player in all_player_dicts: - name_lower = player.get('name', '').lower() - + name_lower = player.get("name", "").lower() + if name_lower == query_lower: exact_matches.append(player) elif query_lower in name_lower: partial_matches.append(player) - + # Sort by season within each group (newest first) if search_all_seasons: - exact_matches.sort(key=lambda p: p.get('season', 0), reverse=True) - partial_matches.sort(key=lambda p: p.get('season', 0), reverse=True) - + exact_matches.sort(key=lambda p: p.get("season", 0), reverse=True) + partial_matches.sort(key=lambda p: p.get("season", 0), reverse=True) + # Combine and limit results = (exact_matches + partial_matches)[:limit] - + return { "count": len(results), "total_matches": len(exact_matches + partial_matches), "all_seasons": search_all_seasons, - "players": results + "players": results, } except Exception as e: cls.log_error("Error searching players", e) try: - raise HTTPException(status_code=500, detail=f"Error searching players: {str(e)}") + raise HTTPException( + status_code=500, detail=f"Error searching players: {str(e)}" + ) except ImportError: raise RuntimeError(f"Error searching players: {str(e)}") - + @classmethod - def get_player(cls, player_id: int, short_output: bool = False) -> Optional[Dict[str, Any]]: + def get_player( + cls, player_id: int, short_output: bool = False + ) -> Optional[Dict[str, Any]]: """Get a single player by ID.""" try: repo = cls._get_player_repo() @@ -413,26 +411,31 @@ class PlayerService(BaseService): except Exception as e: cls.log_error(f"Error fetching player {player_id}", e) try: - raise HTTPException(status_code=500, detail=f"Error fetching player {player_id}: {str(e)}") + raise HTTPException( + status_code=500, + detail=f"Error fetching player {player_id}: {str(e)}", + ) except ImportError: raise RuntimeError(f"Error fetching player {player_id}: {str(e)}") - + @classmethod def _player_to_dict(cls, player, recurse: bool = True) -> Dict[str, Any]: """Convert player to dict.""" # If already a dict, return as-is if isinstance(player, dict): return player - + # Try to convert Peewee model try: return model_to_dict(player, recurse=recurse) except ImportError: # Fall back to basic dict conversion return dict(player) - + @classmethod - def update_player(cls, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]: + def update_player( + cls, player_id: int, data: Dict[str, Any], token: str + ) -> Dict[str, Any]: """Update a player (full update via PUT).""" temp_service = cls() temp_service.require_auth(token) @@ -441,7 +444,9 @@ class PlayerService(BaseService): # Verify player exists repo = cls._get_player_repo() if not repo.get_by_id(player_id): - raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") + raise HTTPException( + status_code=404, detail=f"Player ID {player_id} not found" + ) # Execute update repo.update(data, player_id=player_id) @@ -450,10 +455,14 @@ class PlayerService(BaseService): except Exception as e: cls.log_error(f"Error updating player {player_id}", e) - raise HTTPException(status_code=500, detail=f"Error updating player {player_id}: {str(e)}") - + raise HTTPException( + status_code=500, detail=f"Error updating player {player_id}: {str(e)}" + ) + @classmethod - def patch_player(cls, player_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]: + def patch_player( + cls, player_id: int, data: Dict[str, Any], token: str + ) -> Dict[str, Any]: """Patch a player (partial update).""" temp_service = cls() temp_service.require_auth(token) @@ -462,7 +471,9 @@ class PlayerService(BaseService): repo = cls._get_player_repo() player = repo.get_by_id(player_id) if not player: - raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") + raise HTTPException( + status_code=404, detail=f"Player ID {player_id} not found" + ) # Apply updates using repo repo.update(data, player_id=player_id) @@ -471,10 +482,14 @@ class PlayerService(BaseService): except Exception as e: cls.log_error(f"Error patching player {player_id}", e) - raise HTTPException(status_code=500, detail=f"Error patching player {player_id}: {str(e)}") - + raise HTTPException( + status_code=500, detail=f"Error patching player {player_id}: {str(e)}" + ) + @classmethod - def create_players(cls, players_data: List[Dict[str, Any]], token: str) -> Dict[str, Any]: + def create_players( + cls, players_data: List[Dict[str, Any]], token: str + ) -> Dict[str, Any]: """Create multiple players.""" temp_service = cls() temp_service.require_auth(token) @@ -484,13 +499,12 @@ class PlayerService(BaseService): repo = cls._get_player_repo() for player in players_data: dupe = repo.get_or_none( - season=player.get("season"), - name=player.get("name") + season=player.get("season"), name=player.get("name") ) if dupe: raise HTTPException( status_code=500, - detail=f"Player {player.get('name')} already exists in Season {player.get('season')}" + detail=f"Player {player.get('name')} already exists in Season {player.get('season')}", ) # Insert in batches @@ -499,9 +513,11 @@ class PlayerService(BaseService): return {"message": f"Inserted {len(players_data)} players"} except Exception as e: - cls.log_error(f"Error creating players", e) - raise HTTPException(status_code=500, detail=f"Error creating players: {str(e)}") - + cls.log_error("Error creating players", e) + raise HTTPException( + status_code=500, detail=f"Error creating players: {str(e)}" + ) + @classmethod def delete_player(cls, player_id: int, token: str) -> Dict[str, str]: """Delete a player.""" @@ -511,7 +527,9 @@ class PlayerService(BaseService): try: repo = cls._get_player_repo() if not repo.get_by_id(player_id): - raise HTTPException(status_code=404, detail=f"Player ID {player_id} not found") + raise HTTPException( + status_code=404, detail=f"Player ID {player_id} not found" + ) repo.delete_by_id(player_id) @@ -519,8 +537,10 @@ class PlayerService(BaseService): except Exception as e: cls.log_error(f"Error deleting player {player_id}", e) - raise HTTPException(status_code=500, detail=f"Error deleting player {player_id}: {str(e)}") - + raise HTTPException( + status_code=500, detail=f"Error deleting player {player_id}: {str(e)}" + ) + @classmethod def _format_player_csv(cls, players: List[Dict]) -> str: """Format player list as CSV - works with both real DB and mocks.""" @@ -543,52 +563,52 @@ class InMemoryQueryResult: In-memory query result for mock repositories. Supports filtering, sorting, and iteration. """ - + def __init__(self, items: List[Dict[str, Any]]): self._items = list(items) - - def where(self, *conditions) -> 'InMemoryQueryResult': + + def where(self, *conditions) -> "InMemoryQueryResult": """Apply filter conditions (no-op for compatibility).""" return self - - def order_by(self, *fields) -> 'InMemoryQueryResult': + + def order_by(self, *fields) -> "InMemoryQueryResult": """Apply sort (no-op, sorting done by service).""" return self - + def count(self) -> int: return len(self._items) - + def __iter__(self): return iter(self._items) - + def __len__(self): return len(self._items) - + def __getitem__(self, index): return self._items[index] class RealPlayerRepository: """Real database repository implementation.""" - + def __init__(self, model_class): self._model = model_class - + def select_season(self, season: int): """Return query for season.""" return self._model.select().where(self._model.season == season) - + def get_by_id(self, player_id: int): """Get player by ID.""" return self._model.get_or_none(self._model.id == player_id) - + def get_or_none(self, **conditions): """Get player matching conditions.""" try: return self._model.get_or_none(**conditions) except Exception: return None - + def update(self, data: Dict, player_id: int) -> int: """Update player.""" return Player.update(**data).where(Player.id == player_id).execute() diff --git a/app/services/team_service.py b/app/services/team_service.py index 2b8939f..728c549 100644 --- a/app/services/team_service.py +++ b/app/services/team_service.py @@ -10,6 +10,7 @@ import copy import logging from typing import List, Optional, Dict, Any, Literal, TYPE_CHECKING +from playhouse.shortcuts import model_to_dict from .base import BaseService from .interfaces import AbstractTeamRepository @@ -28,17 +29,14 @@ except ImportError: self.detail = detail super().__init__(detail) -logger = logging.getLogger('discord_app') + +logger = logging.getLogger("discord_app") class TeamService(BaseService): """Service for team-related operations.""" - cache_patterns = [ - "teams*", - "team*", - "team-roster*" - ] + cache_patterns = ["teams*", "team*", "team-roster*"] # Class-level repository for dependency injection _injected_repo: Optional[AbstractTeamRepository] = None @@ -46,8 +44,8 @@ class TeamService(BaseService): def __init__( self, team_repo: Optional[AbstractTeamRepository] = None, - config: Optional['ServiceConfig'] = None, - **kwargs + config: Optional["ServiceConfig"] = None, + **kwargs, ): """ Initialize TeamService with optional repository. @@ -75,11 +73,12 @@ class TeamService(BaseService): return cls._get_real_repo() @classmethod - def _get_real_repo(cls) -> 'RealTeamRepository': + def _get_real_repo(cls) -> "RealTeamRepository": """Get a real DB repository for production use.""" from ..db_engine import Team # Lazy import to avoid loading DB in tests + return RealTeamRepository(Team) - + @classmethod def get_teams( cls, @@ -89,7 +88,7 @@ class TeamService(BaseService): team_abbrev: Optional[List[str]] = None, active_only: bool = False, short_output: bool = False, - as_csv: bool = False + as_csv: bool = False, ) -> Dict[str, Any]: """ Get teams with filtering. @@ -118,21 +117,27 @@ class TeamService(BaseService): # Apply filters if manager_id: - teams_list = [t for t in teams_list - if cls._team_has_manager(t, manager_id)] + teams_list = [ + t for t in teams_list if cls._team_has_manager(t, manager_id) + ] if owner_id: - teams_list = [t for t in teams_list - if cls._team_has_owner(t, owner_id)] + teams_list = [t for t in teams_list if cls._team_has_owner(t, owner_id)] if team_abbrev: abbrev_list = [x.lower() for x in team_abbrev] - teams_list = [t for t in teams_list - if cls._get_team_field(t, 'abbrev', '').lower() in abbrev_list] + teams_list = [ + t + for t in teams_list + if cls._get_team_field(t, "abbrev", "").lower() in abbrev_list + ] if active_only: - teams_list = [t for t in teams_list - if not cls._get_team_field(t, 'abbrev', '').endswith(('IL', 'MiL'))] + teams_list = [ + t + for t in teams_list + if not cls._get_team_field(t, "abbrev", "").endswith(("IL", "MiL")) + ] # Convert to dicts teams_data = [cls._team_to_dict(t, short_output) for t in teams_list] @@ -140,18 +145,15 @@ class TeamService(BaseService): if as_csv: return cls._format_team_csv(teams_data) - return { - "count": len(teams_data), - "teams": teams_data - } + return {"count": len(teams_data), "teams": teams_data} except Exception as e: temp_service = cls() - temp_service.handle_error(f"Error fetching teams", e) + temp_service.handle_error("Error fetching teams", e) finally: temp_service = cls() temp_service.close_db() - + @classmethod def get_team(cls, team_id: int) -> Optional[Dict[str, Any]]: """Get a single team by ID.""" @@ -167,13 +169,10 @@ class TeamService(BaseService): finally: temp_service = cls() temp_service.close_db() - + @classmethod def get_team_roster( - cls, - team_id: int, - which: Literal['current', 'next'], - sort: Optional[str] = None + cls, team_id: int, which: Literal["current", "next"], sort: Optional[str] = None ) -> Dict[str, Any]: """ Get team roster with IL lists. @@ -192,26 +191,28 @@ class TeamService(BaseService): team = Team.get_by_id(team_id) - if which == 'current': + if which == "current": full_roster = team.get_this_week() else: full_roster = team.get_next_week() # Deep copy and convert to dicts result = { - 'active': {'players': []}, - 'shortil': {'players': []}, - 'longil': {'players': []} + "active": {"players": []}, + "shortil": {"players": []}, + "longil": {"players": []}, } - for section in ['active', 'shortil', 'longil']: - players = copy.deepcopy(full_roster[section]['players']) - result[section]['players'] = [model_to_dict(p) for p in players] + for section in ["active", "shortil", "longil"]: + players = copy.deepcopy(full_roster[section]["players"]) + result[section]["players"] = [model_to_dict(p) for p in players] # Apply sorting - if sort == 'wara-desc': - for section in ['active', 'shortil', 'longil']: - result[section]['players'].sort(key=lambda p: p.get("wara", 0), reverse=True) + if sort == "wara-desc": + for section in ["active", "shortil", "longil"]: + result[section]["players"].sort( + key=lambda p: p.get("wara", 0), reverse=True + ) return result @@ -221,9 +222,11 @@ class TeamService(BaseService): finally: temp_service = cls() temp_service.close_db() - + @classmethod - def update_team(cls, team_id: int, data: Dict[str, Any], token: str) -> Dict[str, Any]: + def update_team( + cls, team_id: int, data: Dict[str, Any], token: str + ) -> Dict[str, Any]: """Update a team (partial update).""" temp_service = cls() temp_service.require_auth(token) @@ -232,7 +235,9 @@ class TeamService(BaseService): repo = cls._get_team_repo() team = repo.get_by_id(team_id) if not team: - raise HTTPException(status_code=404, detail=f"Team ID {team_id} not found") + raise HTTPException( + status_code=404, detail=f"Team ID {team_id} not found" + ) # Apply updates using repo repo.update(data, team_id=team_id) @@ -244,9 +249,11 @@ class TeamService(BaseService): finally: temp_service.invalidate_related_cache(cls.cache_patterns) temp_service.close_db() - + @classmethod - def create_teams(cls, teams_data: List[Dict[str, Any]], token: str) -> Dict[str, str]: + def create_teams( + cls, teams_data: List[Dict[str, Any]], token: str + ) -> Dict[str, str]: """Create multiple teams.""" temp_service = cls() temp_service.require_auth(token) @@ -256,13 +263,12 @@ class TeamService(BaseService): repo = cls._get_team_repo() for team in teams_data: dupe = repo.get_or_none( - season=team.get("season"), - abbrev=team.get("abbrev") + season=team.get("season"), abbrev=team.get("abbrev") ) if dupe: raise HTTPException( status_code=500, - detail=f"Team {team.get('abbrev')} already exists in Season {team.get('season')}" + detail=f"Team {team.get('abbrev')} already exists in Season {team.get('season')}", ) # Insert teams @@ -271,11 +277,11 @@ class TeamService(BaseService): return {"message": f"Inserted {len(teams_data)} teams"} except Exception as e: - temp_service.handle_error(f"Error creating teams", e) + temp_service.handle_error("Error creating teams", e) finally: temp_service.invalidate_related_cache(cls.cache_patterns) temp_service.close_db() - + @classmethod def delete_team(cls, team_id: int, token: str) -> Dict[str, str]: """Delete a team.""" @@ -285,7 +291,9 @@ class TeamService(BaseService): try: repo = cls._get_team_repo() if not repo.get_by_id(team_id): - raise HTTPException(status_code=404, detail=f"Team ID {team_id} not found") + raise HTTPException( + status_code=404, detail=f"Team ID {team_id} not found" + ) repo.delete_by_id(team_id) @@ -301,17 +309,25 @@ class TeamService(BaseService): @classmethod def _team_has_manager(cls, team, manager_ids: List[int]) -> bool: """Check if team has any of the specified managers.""" - team_dict = team if isinstance(team, dict) else cls._team_to_dict(team, short_output=True) - manager1 = team_dict.get('manager1_id') - manager2 = team_dict.get('manager2_id') + team_dict = ( + team + if isinstance(team, dict) + else cls._team_to_dict(team, short_output=True) + ) + manager1 = team_dict.get("manager1_id") + manager2 = team_dict.get("manager2_id") return manager1 in manager_ids or manager2 in manager_ids @classmethod def _team_has_owner(cls, team, owner_ids: List[int]) -> bool: """Check if team has any of the specified owners.""" - team_dict = team if isinstance(team, dict) else cls._team_to_dict(team, short_output=True) - gmid = team_dict.get('gmid') - gmid2 = team_dict.get('gmid2') + team_dict = ( + team + if isinstance(team, dict) + else cls._team_to_dict(team, short_output=True) + ) + gmid = team_dict.get("gmid") + gmid2 = team_dict.get("gmid2") return gmid in owner_ids or gmid2 in owner_ids @classmethod @@ -341,14 +357,16 @@ class TeamService(BaseService): from ..db_engine import query_to_csv, Team # Lazy import - CSV needs DB # Get team IDs from the list - team_ids = [t.get('id') for t in teams if t.get('id')] + team_ids = [t.get("id") for t in teams if t.get("id")] if not team_ids: return "" # Query for CSV formatting query = Team.select().where(Team.id << team_ids) - return query_to_csv(query, exclude=[Team.division_legacy, Team.mascot, Team.gsheet]) + return query_to_csv( + query, exclude=[Team.division_legacy, Team.mascot, Team.gsheet] + ) class RealTeamRepository: @@ -377,11 +395,13 @@ class RealTeamRepository: def update(self, data: Dict, team_id: int) -> int: """Update team.""" from ..db_engine import Team # Lazy import - only used in production + return Team.update(**data).where(Team.id == team_id).execute() def insert_many(self, data: List[Dict]) -> int: """Insert multiple teams.""" from ..db_engine import Team, db # Lazy import - only used in production + with db.atomic(): Team.insert_many(data).on_conflict_ignore().execute() return len(data) @@ -389,4 +409,5 @@ class RealTeamRepository: def delete_by_id(self, team_id: int) -> int: """Delete team by ID.""" from ..db_engine import Team # Lazy import - only used in production + return Team.delete().where(Team.id == team_id).execute()