major-domo-v2/api/client.py
Cal Corum 62c658fb57 CLAUDE: Add automated weekly transaction freeze/thaw system
Implements comprehensive automated system for weekly transaction freeze periods
with priority-based contested player resolution.

New Features:
- Weekly freeze/thaw task (Monday 00:00 freeze, Saturday 00:00 thaw)
- Priority resolution for contested transactions (worst teams get first priority)
- Admin league management commands (/freeze-begin, /freeze-end, /advance-week)
- Enhanced API client to handle string-based transaction IDs (moveids)
- Service layer methods for transaction cancellation, unfreezing, and bulk operations
- Offseason mode configuration flag to disable freeze operations

Technical Changes:
- api/client.py: URL-encode object_id parameter to handle colons in moveids
- bot.py: Initialize and shutdown transaction freeze task
- config.py: Add offseason_flag to BotConfig
- services/league_service.py: Add update_current_state() for week/freeze updates
- services/transaction_service.py: Add cancel/unfreeze methods with bulk support
- tasks/transaction_freeze.py: Main freeze/thaw automation with error recovery
- commands/admin/league_management.py: Manual admin controls for freeze system

Infrastructure:
- .gitlab-ci.yml and .gitlab/: GitLab CI/CD pipeline configuration
- .mcp.json: MCP server configuration
- Dockerfile.versioned: Versioned Docker build support
- .dockerignore: Added .gitlab/ to ignore list

Testing:
- tests/test_tasks_transaction_freeze.py: Comprehensive freeze task tests

The system uses team standings to fairly resolve contested players (multiple teams
trying to acquire the same player), with worst-record teams getting priority.
Includes comprehensive error handling, GM notifications, and admin reporting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-20 12:16:13 -05:00

502 lines
19 KiB
Python

"""
API client for Discord Bot v2.0
Modern aiohttp-based HTTP client for communicating with the database API.
Provides connection pooling, proper error handling, and session management.
"""
import aiohttp
import logging
from typing import Optional, List, Dict, Any, Union
from urllib.parse import urljoin, quote
from contextlib import asynccontextmanager
from config import get_config
from exceptions import APIException
logger = logging.getLogger(f'{__name__}.APIClient')
class APIClient:
"""
Async HTTP client for SBA database API communication.
Features:
- Connection pooling with proper session management
- Bearer token authentication
- Standardized v3 API usage
- Comprehensive error handling
- Debug logging with response truncation
"""
def __init__(self, base_url: Optional[str] = None, api_token: Optional[str] = None):
"""
Initialize API client with configuration.
Args:
base_url: Override default database URL from config
api_token: Override default API token from config
Raises:
ValueError: If required configuration is missing
"""
config = get_config()
self.base_url = base_url or config.db_url
self.api_token = api_token or config.api_token
self._session: Optional[aiohttp.ClientSession] = None
if not self.base_url:
raise ValueError("DB_URL must be configured")
if not self.api_token:
raise ValueError("API_TOKEN must be configured")
logger.debug(f"APIClient initialized with base_url: {self.base_url}")
@property
def headers(self) -> Dict[str, str]:
"""Get headers with authentication and content type."""
return {
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json',
'User-Agent': 'SBA-Discord-Bot-v2/1.0'
}
def _build_url(self, endpoint: str, api_version: int = 3, object_id: Optional[Union[int, str]] = None) -> str:
"""
Build complete API URL from components.
Args:
endpoint: API endpoint path
api_version: API version number (default: 3)
object_id: Optional object ID to append (int for numeric IDs, str for moveids)
Returns:
Complete URL for API request
"""
# Handle already complete URLs
if endpoint.startswith(('http://', 'https://')) or '/api/' in endpoint:
return endpoint
path = f"v{api_version}/{endpoint}"
if object_id is not None:
# URL-encode the object_id to handle special characters (e.g., colons in moveids)
encoded_id = quote(str(object_id), safe='')
path += f"/{encoded_id}"
return urljoin(self.base_url.rstrip('/') + '/', path)
def _add_params(self, url: str, params: Optional[List[tuple]] = None) -> str:
"""
Add query parameters to URL.
Args:
url: Base URL
params: List of (key, value) tuples
Returns:
URL with query parameters appended
"""
if not params:
return url
param_str = "&".join(f"{key}={value}" for key, value in params)
separator = "&" if "?" in url else "?"
return f"{url}{separator}{param_str}"
async def _ensure_session(self) -> None:
"""Ensure aiohttp session exists and is not closed."""
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(
limit=100, # Total connection pool size
limit_per_host=30, # Connections per host
ttl_dns_cache=300, # DNS cache TTL
use_dns_cache=True
)
timeout = aiohttp.ClientTimeout(total=30, connect=10)
self._session = aiohttp.ClientSession(
headers=self.headers,
connector=connector,
timeout=timeout
)
logger.debug("Created new aiohttp session with connection pooling")
async def get(
self,
endpoint: str,
object_id: Optional[Union[int, str]] = None,
params: Optional[List[tuple]] = None,
api_version: int = 3,
timeout: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""
Make GET request to API.
Args:
endpoint: API endpoint
object_id: Optional object ID
params: Query parameters
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data or None for 404
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
url = self._add_params(url, params)
await self._ensure_session()
try:
logger.debug(f"GET: {endpoint} id: {object_id} params: {params}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.get(url, timeout=request_timeout) as response:
if response.status == 404:
logger.warning(f"Resource not found: {url}")
return None
elif response.status == 401:
logger.error(f"Authentication failed for: {url}")
raise APIException("Authentication failed - check API token")
elif response.status == 403:
logger.error(f"Access forbidden for: {url}")
raise APIException("Access forbidden - insufficient permissions")
elif response.status >= 400:
error_text = await response.text()
logger.error(f"API error {response.status}: {url} - {error_text}")
raise APIException(f"API request failed with status {response.status}: {error_text}")
data = await response.json()
# Truncate response for logging
data_str = str(data)
if len(data_str) > 1200:
log_data = data_str[:1200] + "..."
else:
log_data = data_str
logger.debug(f"Response: {log_data}")
return data
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in GET {url}: {e}")
raise APIException(f"API call failed: {e}")
async def post(
self,
endpoint: str,
data: Dict[str, Any],
api_version: int = 3,
timeout: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""
Make POST request to API.
Args:
endpoint: API endpoint
data: Request payload
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version)
await self._ensure_session()
try:
logger.debug(f"POST: {endpoint} data: {data}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.post(url, json=data, timeout=request_timeout) as response:
if response.status == 401:
logger.error(f"Authentication failed for POST: {url}")
raise APIException("Authentication failed - check API token")
elif response.status == 403:
logger.error(f"Access forbidden for POST: {url}")
raise APIException("Access forbidden - insufficient permissions")
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"POST error {response.status}: {url} - {error_text}")
raise APIException(f"POST request failed with status {response.status}: {error_text}")
result = await response.json()
# Truncate response for logging
result_str = str(result)
if len(result_str) > 1200:
log_result = result_str[:1200] + "..."
else:
log_result = result_str
logger.debug(f"POST Response: {log_result}")
return result
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for POST {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in POST {url}: {e}")
raise APIException(f"POST failed: {e}")
async def put(
self,
endpoint: str,
data: Dict[str, Any],
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""
Make PUT request to API.
Args:
endpoint: API endpoint
data: Request payload
object_id: Optional object ID
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
await self._ensure_session()
try:
logger.debug(f"PUT: {endpoint} id: {object_id} data: {data}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.put(url, json=data, timeout=request_timeout) as response:
if response.status == 401:
logger.error(f"Authentication failed for PUT: {url}")
raise APIException("Authentication failed - check API token")
elif response.status == 403:
logger.error(f"Access forbidden for PUT: {url}")
raise APIException("Access forbidden - insufficient permissions")
elif response.status == 404:
logger.warning(f"Resource not found for PUT: {url}")
return None
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"PUT error {response.status}: {url} - {error_text}")
raise APIException(f"PUT request failed with status {response.status}: {error_text}")
result = await response.json()
logger.debug(f"PUT Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}")
return result
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for PUT {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in PUT {url}: {e}")
raise APIException(f"PUT failed: {e}")
async def patch(
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None,
use_query_params: bool = False
) -> Optional[Dict[str, Any]]:
"""
Make PATCH request to API.
Args:
endpoint: API endpoint
data: Request payload (optional for some PATCH operations)
object_id: Optional object ID
api_version: API version (default: 3)
timeout: Request timeout override
use_query_params: If True, send data as query parameters instead of JSON body (default: False)
Returns:
JSON response data
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
# Add data as query parameters if requested
if use_query_params and data:
# Handle None values by converting to empty string
# The database API's PATCH endpoint treats empty strings as NULL for nullable fields
# Example: {'il_return': None} → ?il_return= → Database sets il_return to NULL
params = [(k, '' if v is None else str(v)) for k, v in data.items()]
url = self._add_params(url, params)
await self._ensure_session()
try:
logger.debug(f"PATCH: {endpoint} id: {object_id} data: {data} use_query_params: {use_query_params}")
logger.debug(f"PATCH URL: {url}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
# Use json=data if data is provided and not using query params
kwargs = {}
if data is not None and not use_query_params:
kwargs['json'] = data
logger.debug(f"PATCH JSON body: {data}")
async with self._session.patch(url, timeout=request_timeout, **kwargs) as response:
if response.status == 401:
logger.error(f"Authentication failed for PATCH: {url}")
raise APIException("Authentication failed - check API token")
elif response.status == 403:
logger.error(f"Access forbidden for PATCH: {url}")
raise APIException("Access forbidden - insufficient permissions")
elif response.status == 404:
logger.warning(f"Resource not found for PATCH: {url}")
return None
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"PATCH error {response.status}: {url} - {error_text}")
raise APIException(f"PATCH request failed with status {response.status}: {error_text}")
result = await response.json()
logger.debug(f"PATCH Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}")
return result
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for PATCH {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in PATCH {url}: {e}")
raise APIException(f"PATCH failed: {e}")
async def delete(
self,
endpoint: str,
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None
) -> bool:
"""
Make DELETE request to API.
Args:
endpoint: API endpoint
object_id: Optional object ID
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
True if deletion successful, False if resource not found
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
await self._ensure_session()
try:
logger.debug(f"DELETE: {endpoint} id: {object_id}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.delete(url, timeout=request_timeout) as response:
if response.status == 401:
logger.error(f"Authentication failed for DELETE: {url}")
raise APIException("Authentication failed - check API token")
elif response.status == 403:
logger.error(f"Access forbidden for DELETE: {url}")
raise APIException("Access forbidden - insufficient permissions")
elif response.status == 404:
logger.warning(f"Resource not found for DELETE: {url}")
return False
elif response.status not in [200, 204]:
error_text = await response.text()
logger.error(f"DELETE error {response.status}: {url} - {error_text}")
raise APIException(f"DELETE request failed with status {response.status}: {error_text}")
logger.debug(f"DELETE successful: {url}")
return True
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for DELETE {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in DELETE {url}: {e}")
raise APIException(f"DELETE failed: {e}")
async def close(self) -> None:
"""Close the HTTP session and clean up resources."""
if self._session and not self._session.closed:
await self._session.close()
logger.debug("Closed aiohttp session")
async def __aenter__(self):
"""Async context manager entry."""
await self._ensure_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit with cleanup."""
await self.close()
@asynccontextmanager
async def get_api_client() -> APIClient:
"""
Get API client as async context manager.
Usage:
async with get_api_client() as client:
data = await client.get('players')
"""
client = APIClient()
try:
yield client
finally:
await client.close()
# Global API client instance for reuse
_global_client: Optional[APIClient] = None
async def get_global_client() -> APIClient:
"""
Get global API client instance with automatic session management.
Returns:
Shared APIClient instance
"""
global _global_client
if _global_client is None:
_global_client = APIClient()
await _global_client._ensure_session()
return _global_client
async def cleanup_global_client() -> None:
"""Clean up global API client. Call during bot shutdown."""
global _global_client
if _global_client:
await _global_client.close()
_global_client = None