major-domo-database/app/services/base.py
Cal Corum 2189aea8da Fix linting and formatting issues
- Add missing imports: json, csv, io, model_to_dict
- Remove unused imports: Any, Dict, Type, invalidate_cache
- Remove redundant f-string prefixes from static error messages
- Format code with black
- All ruff and black checks pass
- All 76 unit tests pass (9 skipped)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 08:44:12 -06:00

299 lines
10 KiB
Python

"""
Base Service Class - Dependency Injection Version
Provides common functionality with configurable dependencies.
"""
import json
import logging
from typing import Optional, TypeVar
from .interfaces import (
AbstractPlayerRepository,
AbstractTeamRepository,
AbstractCacheService,
)
from .mocks import MockCacheService
logger = logging.getLogger("discord_app")
T = TypeVar("T")
class ServiceConfig:
"""Configuration for service dependencies."""
def __init__(
self,
player_repo: Optional[AbstractPlayerRepository] = None,
team_repo: Optional[AbstractTeamRepository] = None,
cache: Optional[AbstractCacheService] = None,
):
self.player_repo = player_repo
self.team_repo = team_repo
self.cache = cache
# Default configuration
_default_config = ServiceConfig()
class BaseService:
"""Base class for all services with dependency injection support."""
# Subclasses should override these
cache_patterns = []
def __init__(
self,
config: Optional[ServiceConfig] = None,
player_repo: Optional[AbstractPlayerRepository] = None,
team_repo: Optional[AbstractTeamRepository] = None,
cache: Optional[AbstractCacheService] = None,
):
"""
Initialize service with dependencies.
Args:
config: Optional ServiceConfig containing all dependencies
player_repo: Override for player repository
team_repo: Override for team repository
cache: Override for cache service
"""
# Use config if provided, otherwise use overrides or defaults
if config:
self._player_repo = config.player_repo
self._team_repo = config.team_repo
self._cache = config.cache
else:
self._player_repo = player_repo
self._team_repo = team_repo
self._cache = cache
# Lazy imports for defaults (avoids circular imports)
self._using_defaults = (
self._player_repo is None
and self._team_repo is None
and self._cache is None
)
@property
def player_repo(self) -> AbstractPlayerRepository:
"""Get player repository, importing from db_engine if not set."""
if self._player_repo is None:
from ..db_engine import Player
class DefaultPlayerRepo:
def select_season(self, season):
return Player.select_season(season)
def get_by_id(self, player_id):
return Player.get_or_none(Player.id == player_id)
def get_or_none(self, *conditions):
return Player.get_or_none(*conditions)
def update(self, data, *conditions):
return Player.update(data).where(*conditions).execute()
def insert_many(self, data):
return Player.insert_many(data).execute()
def delete_by_id(self, player_id):
player = Player.get_by_id(player_id)
if player:
return player.delete_instance()
return 0
self._player_repo = DefaultPlayerRepo()
return self._player_repo
@property
def team_repo(self) -> AbstractTeamRepository:
"""Get team repository, importing from db_engine if not set."""
if self._team_repo is None:
from ..db_engine import Team
class DefaultTeamRepo:
def select_season(self, season):
return Team.select_season(season)
def get_by_id(self, team_id):
return Team.get_by_id(team_id)
def get_or_none(self, *conditions):
return Team.get_or_none(*conditions)
def update(self, data, *conditions):
return Team.update(data).where(*conditions).execute()
def insert_many(self, data):
return Team.insert_many(data).execute()
def delete_by_id(self, team_id):
team = Team.get_by_id(team_id)
if team:
return team.delete_instance()
return 0
self._team_repo = DefaultTeamRepo()
return self._team_repo
@property
def cache(self) -> AbstractCacheService:
"""Get cache service, importing from dependencies if not set."""
if self._cache is None:
try:
from ..dependencies import redis_client
class DefaultCache:
def get(self, key: str):
if redis_client is None:
return None
return redis_client.get(key)
def set(self, key: str, value: str, ttl: int = 300):
if redis_client is None:
return False
redis_client.setex(key, ttl, value)
return True
def setex(self, key: str, ttl: int, value: str):
return self.set(key, value, ttl)
def keys(self, pattern: str):
if redis_client is None:
return []
return redis_client.keys(pattern)
def delete(self, *keys: str):
if redis_client is None:
return 0
return redis_client.delete(*keys)
def invalidate_pattern(self, pattern: str):
if redis_client is None:
return 0
keys = self.keys(pattern)
return self.delete(*keys)
def exists(self, key: str):
if redis_client is None:
return False
return redis_client.exists(key)
self._cache = DefaultCache()
except ImportError:
# Fall back to mock if dependencies not available
self._cache = MockCacheService()
return self._cache
def close_db(self):
"""Safely close database connection (for non-injected repos)."""
if self._using_defaults:
try:
from ..db_engine import db
db.close()
except Exception:
pass # Connection may already be closed
def invalidate_cache_for(self, entity_type: str, entity_id: Optional[int] = None):
"""Invalidate cache entries for an entity."""
if entity_id:
self.cache.invalidate_pattern(f"{entity_type}*{entity_id}*")
else:
self.cache.invalidate_pattern(f"{entity_type}*")
def invalidate_related_cache(self, patterns: list):
"""Invalidate multiple cache patterns."""
for pattern in patterns:
self.cache.invalidate_pattern(pattern)
def handle_error(
self, operation: str, error: Exception, rethrow: bool = True
) -> dict:
"""Handle errors consistently."""
logger.error(f"{operation}: {error}")
if rethrow:
try:
from fastapi import HTTPException
raise HTTPException(
status_code=500, detail=f"{operation}: {str(error)}"
)
except ImportError:
# For testing without FastAPI
raise RuntimeError(f"{operation}: {str(error)}")
return {"error": operation, "detail": str(error)}
@classmethod
def log_error(cls, operation: str, error: Exception) -> None:
"""Class method for logging errors without needing an instance."""
logger.error(f"{operation}: {error}")
def require_auth(self, token: str) -> bool:
"""Validate authentication token."""
try:
from fastapi import HTTPException
from ..dependencies import valid_token
if not valid_token(token):
logger.warning(
f"Unauthorized access attempt with token: {token[:10]}..."
)
raise HTTPException(status_code=401, detail="Unauthorized")
except ImportError:
# For testing without FastAPI - accept "valid_token" as test token
if token != "valid_token":
logger.warning(
f"Unauthorized access attempt with token: {token[:10] if len(token) >= 10 else token}..."
)
error = RuntimeError("Unauthorized")
error.status_code = 401 # Add status_code for test compatibility
raise error
return True
def format_csv_response(self, headers: list, rows: list) -> str:
"""Format data as CSV."""
from pandas import DataFrame
all_data = [headers] + rows
return DataFrame(all_data).to_csv(header=False, index=False)
def parse_query_params(self, params: dict, remove_none: bool = True) -> dict:
"""Parse and clean query parameters."""
if remove_none:
return {
k: v for k, v in params.items() if v is not None and v != [] and v != ""
}
return params
def with_cache(self, key: str, ttl: int = 300, fallback: Optional[callable] = None):
"""
Decorator-style cache wrapper for methods.
Usage:
@service.with_cache("player:123", ttl=600)
def get_player(self):
...
"""
def decorator(func):
async def wrapper(*args, **kwargs):
# Try cache first
cached = self.cache.get(key)
if cached:
return json.loads(cached)
# Execute and cache result
result = func(*args, **kwargs)
if result is not None:
self.cache.set(key, json.dumps(result, default=str), ttl)
return result
return wrapper
return decorator