Merge branch 'fix/security-issues' into next-release

This commit is contained in:
Cal Corum 2026-02-20 10:39:47 -06:00
commit a44d037611
8 changed files with 976 additions and 665 deletions

View File

@ -4,6 +4,7 @@ API client for Discord Bot v2.0
Modern aiohttp-based HTTP client for communicating with the database API.
Provides connection pooling, proper error handling, and session management.
"""
import aiohttp
import logging
from typing import Optional, List, Dict, Any, Union
@ -13,13 +14,13 @@ from contextlib import asynccontextmanager
from config import get_config
from exceptions import APIException
logger = logging.getLogger(f'{__name__}.APIClient')
logger = logging.getLogger(f"{__name__}.APIClient")
class APIClient:
"""
Async HTTP client for SBA database API communication.
Features:
- Connection pooling with proper session management
- Bearer token authentication
@ -27,15 +28,15 @@ class APIClient:
- Comprehensive error handling
- Debug logging with response truncation
"""
def __init__(self, base_url: Optional[str] = None, api_token: Optional[str] = None):
"""
Initialize API client with configuration.
Args:
base_url: Override default database URL from config
api_token: Override default API token from config
Raises:
ValueError: If required configuration is missing
"""
@ -43,24 +44,29 @@ class APIClient:
self.base_url = base_url or config.db_url
self.api_token = api_token or config.api_token
self._session: Optional[aiohttp.ClientSession] = None
if not self.base_url:
raise ValueError("DB_URL must be configured")
if not self.api_token:
raise ValueError("API_TOKEN must be configured")
logger.debug(f"APIClient initialized with base_url: {self.base_url}")
@property
def headers(self) -> Dict[str, str]:
"""Get headers with authentication and content type."""
return {
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json',
'User-Agent': 'SBA-Discord-Bot-v2/1.0'
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json",
"User-Agent": "SBA-Discord-Bot-v2/1.0",
}
def _build_url(self, endpoint: str, api_version: int = 3, object_id: Optional[Union[int, str]] = None) -> str:
def _build_url(
self,
endpoint: str,
api_version: int = 3,
object_id: Optional[Union[int, str]] = None,
) -> str:
"""
Build complete API URL from components.
@ -73,35 +79,38 @@ class APIClient:
Complete URL for API request
"""
# Handle already complete URLs
if endpoint.startswith(('http://', 'https://')) or '/api/' in endpoint:
if endpoint.startswith(("http://", "https://")) or "/api/" in endpoint:
return endpoint
path = f"v{api_version}/{endpoint}"
if object_id is not None:
# URL-encode the object_id to handle special characters (e.g., colons in moveids)
encoded_id = quote(str(object_id), safe='')
encoded_id = quote(str(object_id), safe="")
path += f"/{encoded_id}"
return urljoin(self.base_url.rstrip('/') + '/', path)
return urljoin(self.base_url.rstrip("/") + "/", path)
def _add_params(self, url: str, params: Optional[List[tuple]] = None) -> str:
"""
Add query parameters to URL.
Args:
url: Base URL
params: List of (key, value) tuples
Returns:
URL with query parameters appended
"""
if not params:
return url
param_str = "&".join(f"{key}={value}" for key, value in params)
param_str = "&".join(
f"{quote(str(key), safe='')}={quote(str(value), safe='')}"
for key, value in params
)
separator = "&" if "?" in url else "?"
return f"{url}{separator}{param_str}"
async def _ensure_session(self) -> None:
"""Ensure aiohttp session exists and is not closed."""
if self._session is None or self._session.closed:
@ -109,53 +118,51 @@ class APIClient:
limit=100, # Total connection pool size
limit_per_host=30, # Connections per host
ttl_dns_cache=300, # DNS cache TTL
use_dns_cache=True
use_dns_cache=True,
)
timeout = aiohttp.ClientTimeout(total=30, connect=10)
self._session = aiohttp.ClientSession(
headers=self.headers,
connector=connector,
timeout=timeout
headers=self.headers, connector=connector, timeout=timeout
)
logger.debug("Created new aiohttp session with connection pooling")
async def get(
self,
endpoint: str,
object_id: Optional[Union[int, str]] = None,
params: Optional[List[tuple]] = None,
api_version: int = 3,
timeout: Optional[int] = None
timeout: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
"""
Make GET request to API.
Args:
endpoint: API endpoint
object_id: Optional object ID
params: Query parameters
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data or None for 404
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
url = self._add_params(url, params)
await self._ensure_session()
try:
logger.debug(f"GET: {endpoint} id: {object_id} params: {params}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.get(url, timeout=request_timeout) as response:
if response.status == 404:
logger.warning(f"Resource not found: {url}")
@ -169,10 +176,12 @@ class APIClient:
elif response.status >= 400:
error_text = await response.text()
logger.error(f"API error {response.status}: {url} - {error_text}")
raise APIException(f"API request failed with status {response.status}: {error_text}")
raise APIException(
f"API request failed with status {response.status}: {error_text}"
)
data = await response.json()
# Truncate response for logging
data_str = str(data)
if len(data_str) > 1200:
@ -180,48 +189,50 @@ class APIClient:
else:
log_data = data_str
logger.debug(f"Response: {log_data}")
return data
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in GET {url}: {e}")
raise APIException(f"API call failed: {e}")
async def post(
self,
endpoint: str,
self,
endpoint: str,
data: Dict[str, Any],
api_version: int = 3,
timeout: Optional[int] = None
timeout: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
"""
Make POST request to API.
Args:
endpoint: API endpoint
data: Request payload
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version)
await self._ensure_session()
try:
logger.debug(f"POST: {endpoint} data: {data}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.post(url, json=data, timeout=request_timeout) as response:
async with self._session.post(
url, json=data, timeout=request_timeout
) as response:
if response.status == 401:
logger.error(f"Authentication failed for POST: {url}")
raise APIException("Authentication failed - check API token")
@ -231,10 +242,12 @@ class APIClient:
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"POST error {response.status}: {url} - {error_text}")
raise APIException(f"POST request failed with status {response.status}: {error_text}")
raise APIException(
f"POST request failed with status {response.status}: {error_text}"
)
result = await response.json()
# Truncate response for logging
result_str = str(result)
if len(result_str) > 1200:
@ -242,50 +255,52 @@ class APIClient:
else:
log_result = result_str
logger.debug(f"POST Response: {log_result}")
return result
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for POST {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in POST {url}: {e}")
raise APIException(f"POST failed: {e}")
async def put(
self,
endpoint: str,
data: Dict[str, Any],
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None
timeout: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
"""
Make PUT request to API.
Args:
endpoint: API endpoint
data: Request payload
object_id: Optional object ID
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
JSON response data
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
await self._ensure_session()
try:
logger.debug(f"PUT: {endpoint} id: {object_id} data: {data}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.put(url, json=data, timeout=request_timeout) as response:
async with self._session.put(
url, json=data, timeout=request_timeout
) as response:
if response.status == 401:
logger.error(f"Authentication failed for PUT: {url}")
raise APIException("Authentication failed - check API token")
@ -298,19 +313,23 @@ class APIClient:
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"PUT error {response.status}: {url} - {error_text}")
raise APIException(f"PUT request failed with status {response.status}: {error_text}")
raise APIException(
f"PUT request failed with status {response.status}: {error_text}"
)
result = await response.json()
logger.debug(f"PUT Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}")
logger.debug(
f"PUT Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}"
)
return result
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for PUT {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in PUT {url}: {e}")
raise APIException(f"PUT failed: {e}")
async def patch(
self,
endpoint: str,
@ -318,7 +337,7 @@ class APIClient:
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None,
use_query_params: bool = False
use_query_params: bool = False,
) -> Optional[Dict[str, Any]]:
"""
Make PATCH request to API.
@ -344,13 +363,15 @@ class APIClient:
# Handle None values by converting to empty string
# The database API's PATCH endpoint treats empty strings as NULL for nullable fields
# Example: {'il_return': None} → ?il_return= → Database sets il_return to NULL
params = [(k, '' if v is None else str(v)) for k, v in data.items()]
params = [(k, "" if v is None else str(v)) for k, v in data.items()]
url = self._add_params(url, params)
await self._ensure_session()
try:
logger.debug(f"PATCH: {endpoint} id: {object_id} data: {data} use_query_params: {use_query_params}")
logger.debug(
f"PATCH: {endpoint} id: {object_id} data: {data} use_query_params: {use_query_params}"
)
logger.debug(f"PATCH URL: {url}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
@ -358,10 +379,12 @@ class APIClient:
# Use json=data if data is provided and not using query params
kwargs = {}
if data is not None and not use_query_params:
kwargs['json'] = data
kwargs["json"] = data
logger.debug(f"PATCH JSON body: {data}")
async with self._session.patch(url, timeout=request_timeout, **kwargs) as response:
async with self._session.patch(
url, timeout=request_timeout, **kwargs
) as response:
if response.status == 401:
logger.error(f"Authentication failed for PATCH: {url}")
raise APIException("Authentication failed - check API token")
@ -374,10 +397,14 @@ class APIClient:
elif response.status not in [200, 201]:
error_text = await response.text()
logger.error(f"PATCH error {response.status}: {url} - {error_text}")
raise APIException(f"PATCH request failed with status {response.status}: {error_text}")
raise APIException(
f"PATCH request failed with status {response.status}: {error_text}"
)
result = await response.json()
logger.debug(f"PATCH Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}")
logger.debug(
f"PATCH Response: {str(result)[:1200]}{'...' if len(str(result)) > 1200 else ''}"
)
return result
except aiohttp.ClientError as e:
@ -386,38 +413,38 @@ class APIClient:
except Exception as e:
logger.error(f"Unexpected error in PATCH {url}: {e}")
raise APIException(f"PATCH failed: {e}")
async def delete(
self,
endpoint: str,
object_id: Optional[Union[int, str]] = None,
api_version: int = 3,
timeout: Optional[int] = None
timeout: Optional[int] = None,
) -> bool:
"""
Make DELETE request to API.
Args:
endpoint: API endpoint
object_id: Optional object ID
api_version: API version (default: 3)
timeout: Request timeout override
Returns:
True if deletion successful, False if resource not found
Raises:
APIException: For HTTP errors or network issues
"""
url = self._build_url(endpoint, api_version, object_id)
await self._ensure_session()
try:
logger.debug(f"DELETE: {endpoint} id: {object_id}")
request_timeout = aiohttp.ClientTimeout(total=timeout) if timeout else None
async with self._session.delete(url, timeout=request_timeout) as response:
if response.status == 401:
logger.error(f"Authentication failed for DELETE: {url}")
@ -430,30 +457,34 @@ class APIClient:
return False
elif response.status not in [200, 204]:
error_text = await response.text()
logger.error(f"DELETE error {response.status}: {url} - {error_text}")
raise APIException(f"DELETE request failed with status {response.status}: {error_text}")
logger.error(
f"DELETE error {response.status}: {url} - {error_text}"
)
raise APIException(
f"DELETE request failed with status {response.status}: {error_text}"
)
logger.debug(f"DELETE successful: {url}")
return True
except aiohttp.ClientError as e:
logger.error(f"HTTP client error for DELETE {url}: {e}")
raise APIException(f"Network error: {e}")
except Exception as e:
logger.error(f"Unexpected error in DELETE {url}: {e}")
raise APIException(f"DELETE failed: {e}")
async def close(self) -> None:
"""Close the HTTP session and clean up resources."""
if self._session and not self._session.closed:
await self._session.close()
logger.debug("Closed aiohttp session")
async def __aenter__(self):
"""Async context manager entry."""
await self._ensure_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit with cleanup."""
await self.close()
@ -463,7 +494,7 @@ class APIClient:
async def get_api_client() -> APIClient:
"""
Get API client as async context manager.
Usage:
async with get_api_client() as client:
data = await client.get('players')
@ -482,14 +513,14 @@ _global_client: Optional[APIClient] = None
async def get_global_client() -> APIClient:
"""
Get global API client instance with automatic session management.
Returns:
Shared APIClient instance
"""
global _global_client
if _global_client is None:
_global_client = APIClient()
await _global_client._ensure_session()
return _global_client
@ -499,4 +530,4 @@ async def cleanup_global_client() -> None:
global _global_client
if _global_client:
await _global_client.close()
_global_client = None
_global_client = None

269
bot.py
View File

@ -3,6 +3,7 @@ Discord Bot v2.0 - Main Entry Point
Modern discord.py bot with application commands and proper error handling.
"""
import asyncio
import hashlib
import json
@ -23,89 +24,91 @@ from views.embeds import EmbedTemplate, EmbedColors
def setup_logging():
"""Configure hybrid logging: human-readable console + structured JSON files."""
from utils.logging import JSONFormatter
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
os.makedirs("logs", exist_ok=True)
# Configure root logger
config = get_config()
logger = logging.getLogger('discord_bot_v2')
logger = logging.getLogger("discord_bot_v2")
logger.setLevel(getattr(logging, config.log_level.upper()))
# Console handler - detailed format for development debugging
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# JSON file handler - structured logging for monitoring and analysis
json_handler = RotatingFileHandler(
'logs/discord_bot_v2.json',
maxBytes=5 * 1024 * 1024, # 5MB
backupCount=5
"logs/discord_bot_v2.json", maxBytes=5 * 1024 * 1024, backupCount=5 # 5MB
)
json_handler.setFormatter(JSONFormatter())
logger.addHandler(json_handler)
# Configure root logger for third-party libraries (discord.py, aiohttp, etc.)
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, config.log_level.upper()))
# Add handlers to root logger so third-party loggers inherit them
if not root_logger.handlers: # Avoid duplicate handlers
root_logger.addHandler(console_handler)
root_logger.addHandler(json_handler)
# Prevent discord_bot_v2 logger from propagating to root to avoid duplicate messages
# (bot logs will still appear via its own handlers, third-party logs via root handlers)
# To revert: remove the line below and bot logs will appear twice
logger.propagate = False
return logger
class SBABot(commands.Bot):
"""Custom bot class for SBA league management."""
def __init__(self):
# Configure intents
intents = discord.Intents.default()
intents.message_content = True # For legacy commands if needed
intents.members = True # For member management
super().__init__(
command_prefix='!', # Legacy prefix, primarily using slash commands
command_prefix="!", # Legacy prefix, primarily using slash commands
intents=intents,
description="Major Domo v2.0"
description="Major Domo v2.0",
)
self.logger = logging.getLogger('discord_bot_v2')
self.logger = logging.getLogger("discord_bot_v2")
async def setup_hook(self):
"""Called when the bot is starting up."""
self.logger.info("Setting up bot...")
# Load command packages
await self._load_command_packages()
# Initialize cleanup tasks
await self._setup_background_tasks()
# Smart command syncing: auto-sync in development if changes detected; !admin-sync for first sync
config = get_config()
if config.is_development:
if await self._should_sync_commands():
self.logger.info("Development mode: changes detected, syncing commands...")
self.logger.info(
"Development mode: changes detected, syncing commands..."
)
await self._sync_commands()
await self._save_command_hash()
else:
self.logger.info("Development mode: no command changes detected, skipping sync")
self.logger.info(
"Development mode: no command changes detected, skipping sync"
)
else:
self.logger.info("Production mode: commands loaded but not auto-synced")
self.logger.info("Use /admin-sync command to manually sync when needed")
async def _load_command_packages(self):
"""Load all command packages with resilient error handling."""
from commands.players import setup_players
@ -146,32 +149,42 @@ class SBABot(commands.Bot):
("gameplay", setup_gameplay),
("dev", setup_dev), # Dev-only commands (admin restricted)
]
total_successful = 0
total_failed = 0
for package_name, setup_func in command_packages:
try:
self.logger.info(f"Loading {package_name} commands...")
successful, failed, failed_modules = await setup_func(self)
total_successful += successful
total_failed += failed
if failed == 0:
self.logger.info(f"{package_name} commands loaded successfully ({successful} cogs)")
self.logger.info(
f"{package_name} commands loaded successfully ({successful} cogs)"
)
else:
self.logger.warning(f"⚠️ {package_name} commands partially loaded: {successful} successful, {failed} failed")
self.logger.warning(
f"⚠️ {package_name} commands partially loaded: {successful} successful, {failed} failed"
)
except Exception as e:
self.logger.error(f"❌ Failed to load {package_name} package: {e}", exc_info=True)
self.logger.error(
f"❌ Failed to load {package_name} package: {e}", exc_info=True
)
total_failed += 1
# Log overall summary
if total_failed == 0:
self.logger.info(f"🎉 All command packages loaded successfully ({total_successful} total cogs)")
self.logger.info(
f"🎉 All command packages loaded successfully ({total_successful} total cogs)"
)
else:
self.logger.warning(f"⚠️ Command loading completed with issues: {total_successful} successful, {total_failed} failed")
self.logger.warning(
f"⚠️ Command loading completed with issues: {total_successful} successful, {total_failed} failed"
)
async def _setup_background_tasks(self):
"""Initialize background tasks for the bot."""
try:
@ -179,28 +192,34 @@ class SBABot(commands.Bot):
# Initialize custom command cleanup task
from tasks.custom_command_cleanup import setup_cleanup_task
self.custom_command_cleanup = setup_cleanup_task(self)
# Initialize transaction freeze/thaw task
from tasks.transaction_freeze import setup_freeze_task
self.transaction_freeze = setup_freeze_task(self)
self.logger.info("✅ Transaction freeze/thaw task started")
# Initialize voice channel cleanup service
from commands.voice.cleanup_service import setup_voice_cleanup
self.voice_cleanup_service = setup_voice_cleanup(self)
self.logger.info("✅ Voice channel cleanup service started")
# Initialize live scorebug tracker
from tasks.live_scorebug_tracker import setup_scorebug_tracker
self.live_scorebug_tracker = setup_scorebug_tracker(self)
self.logger.info("✅ Live scorebug tracker started")
self.logger.info("✅ Background tasks initialized successfully")
except Exception as e:
self.logger.error(f"❌ Failed to initialize background tasks: {e}", exc_info=True)
self.logger.error(
f"❌ Failed to initialize background tasks: {e}", exc_info=True
)
async def _should_sync_commands(self) -> bool:
"""Check if commands have changed since last sync."""
try:
@ -209,50 +228,51 @@ class SBABot(commands.Bot):
for cmd in self.tree.get_commands():
# Handle different command types properly
cmd_dict = {}
cmd_dict['name'] = cmd.name
cmd_dict['type'] = type(cmd).__name__
cmd_dict["name"] = cmd.name
cmd_dict["type"] = type(cmd).__name__
# Add description if available (most command types have this)
if hasattr(cmd, 'description'):
cmd_dict['description'] = cmd.description # type: ignore
if hasattr(cmd, "description"):
cmd_dict["description"] = cmd.description # type: ignore
# Add parameters for Command objects
if isinstance(cmd, discord.app_commands.Command):
cmd_dict['parameters'] = [
cmd_dict["parameters"] = [
{
'name': param.name,
'description': param.description,
'required': param.required,
'type': str(param.type)
} for param in cmd.parameters
"name": param.name,
"description": param.description,
"required": param.required,
"type": str(param.type),
}
for param in cmd.parameters
]
elif isinstance(cmd, discord.app_commands.Group):
# For groups, include subcommands
cmd_dict['subcommands'] = [subcmd.name for subcmd in cmd.commands]
cmd_dict["subcommands"] = [subcmd.name for subcmd in cmd.commands]
commands_data.append(cmd_dict)
# Sort for consistent hashing
commands_data.sort(key=lambda x: x['name'])
current_hash = hashlib.md5(
commands_data.sort(key=lambda x: x["name"])
current_hash = hashlib.sha256(
json.dumps(commands_data, sort_keys=True).encode()
).hexdigest()
# Compare with stored hash
hash_file = '.last_command_hash'
hash_file = ".last_command_hash"
if os.path.exists(hash_file):
with open(hash_file, 'r') as f:
with open(hash_file, "r") as f:
last_hash = f.read().strip()
return current_hash != last_hash
else:
# No previous hash = first run, should sync
return True
except Exception as e:
self.logger.warning(f"Error checking command hash: {e}")
# If we can't determine changes, err on the side of syncing
return True
async def _save_command_hash(self):
"""Save current command hash for future comparison."""
try:
@ -261,41 +281,42 @@ class SBABot(commands.Bot):
for cmd in self.tree.get_commands():
# Handle different command types properly
cmd_dict = {}
cmd_dict['name'] = cmd.name
cmd_dict['type'] = type(cmd).__name__
cmd_dict["name"] = cmd.name
cmd_dict["type"] = type(cmd).__name__
# Add description if available (most command types have this)
if hasattr(cmd, 'description'):
cmd_dict['description'] = cmd.description # type: ignore
if hasattr(cmd, "description"):
cmd_dict["description"] = cmd.description # type: ignore
# Add parameters for Command objects
if isinstance(cmd, discord.app_commands.Command):
cmd_dict['parameters'] = [
cmd_dict["parameters"] = [
{
'name': param.name,
'description': param.description,
'required': param.required,
'type': str(param.type)
} for param in cmd.parameters
"name": param.name,
"description": param.description,
"required": param.required,
"type": str(param.type),
}
for param in cmd.parameters
]
elif isinstance(cmd, discord.app_commands.Group):
# For groups, include subcommands
cmd_dict['subcommands'] = [subcmd.name for subcmd in cmd.commands]
cmd_dict["subcommands"] = [subcmd.name for subcmd in cmd.commands]
commands_data.append(cmd_dict)
commands_data.sort(key=lambda x: x['name'])
current_hash = hashlib.md5(
commands_data.sort(key=lambda x: x["name"])
current_hash = hashlib.sha256(
json.dumps(commands_data, sort_keys=True).encode()
).hexdigest()
# Save hash to file
with open('.last_command_hash', 'w') as f:
with open(".last_command_hash", "w") as f:
f.write(current_hash)
except Exception as e:
self.logger.warning(f"Error saving command hash: {e}")
async def _sync_commands(self):
"""Internal method to sync commands."""
config = get_config()
@ -303,54 +324,55 @@ class SBABot(commands.Bot):
guild = discord.Object(id=config.guild_id)
self.tree.copy_global_to(guild=guild)
synced = await self.tree.sync(guild=guild)
self.logger.info(f"Synced {len(synced)} commands to guild {config.guild_id}")
self.logger.info(
f"Synced {len(synced)} commands to guild {config.guild_id}"
)
else:
synced = await self.tree.sync()
self.logger.info(f"Synced {len(synced)} commands globally")
async def on_ready(self):
"""Called when the bot is ready."""
self.logger.info(f"Bot ready! Logged in as {self.user}")
self.logger.info(f"Connected to {len(self.guilds)} guilds")
# Set activity status
activity = discord.Activity(
type=discord.ActivityType.watching,
name=random_from_list(STARTUP_WATCHING)
type=discord.ActivityType.watching, name=random_from_list(STARTUP_WATCHING)
)
await self.change_presence(activity=activity)
async def on_error(self, event_method: str, /, *args, **kwargs):
"""Global error handler for events."""
self.logger.error(f"Error in event {event_method}", exc_info=True)
async def close(self):
"""Clean shutdown of the bot."""
self.logger.info("Bot shutting down...")
# Stop background tasks
if hasattr(self, 'custom_command_cleanup'):
if hasattr(self, "custom_command_cleanup"):
try:
self.custom_command_cleanup.cleanup_task.cancel()
self.logger.info("Custom command cleanup task stopped")
except Exception as e:
self.logger.error(f"Error stopping cleanup task: {e}")
if hasattr(self, 'transaction_freeze'):
if hasattr(self, "transaction_freeze"):
try:
self.transaction_freeze.weekly_loop.cancel()
self.logger.info("Transaction freeze/thaw task stopped")
except Exception as e:
self.logger.error(f"Error stopping transaction freeze task: {e}")
if hasattr(self, 'voice_cleanup_service'):
if hasattr(self, "voice_cleanup_service"):
try:
self.voice_cleanup_service.cog_unload()
self.logger.info("Voice channel cleanup service stopped")
except Exception as e:
self.logger.error(f"Error stopping voice cleanup service: {e}")
if hasattr(self, 'live_scorebug_tracker'):
if hasattr(self, "live_scorebug_tracker"):
try:
self.live_scorebug_tracker.update_loop.cancel()
self.logger.info("Live scorebug tracker stopped")
@ -369,15 +391,15 @@ bot = SBABot()
@bot.tree.command(name="health", description="Check bot and API health status")
async def health_command(interaction: discord.Interaction):
"""Health check command to verify bot and API connectivity."""
logger = logging.getLogger('discord_bot_v2')
logger = logging.getLogger("discord_bot_v2")
try:
# Check API connectivity
api_status = "✅ Connected"
try:
client = await get_global_client()
# Test API with a simple request
result = await client.get('current')
result = await client.get("current")
if result:
api_status = "✅ Connected"
else:
@ -385,69 +407,66 @@ async def health_command(interaction: discord.Interaction):
except Exception as e:
logger.error(f"API health check failed: {e}")
api_status = f"❌ Error: {str(e)}"
# Bot health info
guild_count = len(bot.guilds)
# Create health status embed
embed = EmbedTemplate.success(
title="🏥 Bot Health Check"
)
embed = EmbedTemplate.success(title="🏥 Bot Health Check")
embed.add_field(name="Bot Status", value="✅ Online", inline=True)
embed.add_field(name="API Status", value=api_status, inline=True)
embed.add_field(name="Guilds", value=str(guild_count), inline=True)
embed.add_field(name="Latency", value=f"{bot.latency*1000:.1f}ms", inline=True)
if bot.user:
embed.set_footer(text=f"Bot: {bot.user.name}", icon_url=bot.user.display_avatar.url)
embed.set_footer(
text=f"Bot: {bot.user.name}", icon_url=bot.user.display_avatar.url
)
await interaction.response.send_message(embed=embed, ephemeral=True)
except Exception as e:
logger.error(f"Health check command error: {e}", exc_info=True)
await interaction.response.send_message(
f"❌ Health check failed: {str(e)}",
ephemeral=True
f"❌ Health check failed: {str(e)}", ephemeral=True
)
@bot.tree.error
async def on_app_command_error(interaction: discord.Interaction, error: discord.app_commands.AppCommandError):
async def on_app_command_error(
interaction: discord.Interaction, error: discord.app_commands.AppCommandError
):
"""Global error handler for application commands."""
logger = logging.getLogger('discord_bot_v2')
logger = logging.getLogger("discord_bot_v2")
# Handle specific error types
if isinstance(error, discord.app_commands.CommandOnCooldown):
await interaction.response.send_message(
f"⏰ Command on cooldown. Try again in {error.retry_after:.1f} seconds.",
ephemeral=True
ephemeral=True,
)
elif isinstance(error, discord.app_commands.MissingPermissions):
await interaction.response.send_message(
"❌ You don't have permission to use this command.",
ephemeral=True
"❌ You don't have permission to use this command.", ephemeral=True
)
elif isinstance(error, discord.app_commands.CommandNotFound):
await interaction.response.send_message(
"❌ Command not found. Use `/help` to see available commands.",
ephemeral=True
ephemeral=True,
)
elif isinstance(error, BotException):
# Our custom exceptions - show user-friendly message
await interaction.response.send_message(
f"{str(error)}",
ephemeral=True
)
await interaction.response.send_message(f"{str(error)}", ephemeral=True)
else:
# Unexpected errors - log and show generic message
logger.error(f"Unhandled command error: {error}", exc_info=True)
message = "❌ An unexpected error occurred. Please try again."
config = get_config()
if config.is_development:
message += f"\n\nDevelopment error: {str(error)}"
if interaction.response.is_done():
await interaction.followup.send(message, ephemeral=True)
else:
@ -457,12 +476,12 @@ async def on_app_command_error(interaction: discord.Interaction, error: discord.
async def main():
"""Main entry point."""
logger = setup_logging()
config = get_config()
logger.info("Starting Discord Bot v2.0")
logger.info(f"Environment: {config.environment}")
logger.info(f"Guild ID: {config.guild_id}")
try:
await bot.start(config.bot_token)
except KeyboardInterrupt:
@ -475,4 +494,4 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

View File

@ -10,6 +10,7 @@ The injury rating format (#p##) encodes both games played and rating:
- First character: Games played in series (1-6)
- Remaining: Injury rating (p70, p65, p60, p50, p40, p30, p20)
"""
import math
import random
import discord
@ -40,11 +41,8 @@ class InjuryGroup(app_commands.Group):
"""Injury management command group with roll, set-new, and clear subcommands."""
def __init__(self):
super().__init__(
name="injury",
description="Injury management commands"
)
self.logger = get_contextual_logger(f'{__name__}.InjuryGroup')
super().__init__(name="injury", description="Injury management commands")
self.logger = get_contextual_logger(f"{__name__}.InjuryGroup")
self.logger.info("InjuryGroup initialized")
def has_player_role(self, interaction: discord.Interaction) -> bool:
@ -53,13 +51,17 @@ class InjuryGroup(app_commands.Group):
if not isinstance(interaction.user, discord.Member):
return False
if interaction.guild is None:
return False
player_role = discord.utils.get(
interaction.guild.roles,
name=get_config().sba_players_role_name
interaction.guild.roles, name=get_config().sba_players_role_name
)
return player_role in interaction.user.roles if player_role else False
@app_commands.command(name="roll", description="Roll for injury based on player's injury rating")
@app_commands.command(
name="roll", description="Roll for injury based on player's injury rating"
)
@app_commands.describe(player_name="Player name")
@app_commands.autocomplete(player_name=player_autocomplete)
@league_only()
@ -74,12 +76,14 @@ class InjuryGroup(app_commands.Group):
raise BotException("Failed to get current season information")
# Search for player using the search endpoint (more reliable than name param)
players = await player_service.search_players(player_name, limit=10, season=current.season)
players = await player_service.search_players(
player_name, limit=10, season=current.season
)
if not players:
embed = EmbedTemplate.error(
title="Player Not Found",
description=f"I did not find anybody named **{player_name}**."
description=f"I did not find anybody named **{player_name}**.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -89,14 +93,17 @@ class InjuryGroup(app_commands.Group):
# Fetch full team data if team is not populated
if player.team_id and not player.team:
from services.team_service import team_service
player.team = await team_service.get_team(player.team_id)
# Check if player already has an active injury
existing_injury = await injury_service.get_active_injury(player.id, current.season)
existing_injury = await injury_service.get_active_injury(
player.id, current.season
)
if existing_injury:
embed = EmbedTemplate.error(
title="Already Injured",
description=f"Hm. It looks like {player.name} is already hurt."
description=f"Hm. It looks like {player.name} is already hurt.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -105,7 +112,7 @@ class InjuryGroup(app_commands.Group):
if not player.injury_rating:
embed = EmbedTemplate.error(
title="No Injury Rating",
description=f"{player.name} does not have an injury rating set."
description=f"{player.name} does not have an injury rating set.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -120,13 +127,13 @@ class InjuryGroup(app_commands.Group):
raise ValueError("Games played must be between 1 and 6")
# Validate rating format (should start with 'p')
if not injury_rating.startswith('p'):
if not injury_rating.startswith("p"):
raise ValueError("Invalid rating format")
except (ValueError, IndexError):
embed = EmbedTemplate.error(
title="Invalid Injury Rating Format",
description=f"{player.name} has an invalid injury rating: `{player.injury_rating}`\n\nExpected format: `#p##` (e.g., `1p70`, `4p50`)"
description=f"{player.name} has an invalid injury rating: `{player.injury_rating}`\n\nExpected format: `#p##` (e.g., `1p70`, `4p50`)",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -141,33 +148,25 @@ class InjuryGroup(app_commands.Group):
injury_result = self._get_injury_result(injury_rating, games_played, roll_total)
# Create response embed
embed = EmbedTemplate.warning(
title=f"Injury roll for {interaction.user.name}"
)
embed = EmbedTemplate.warning(title=f"Injury roll for {interaction.user.name}")
if player.team and player.team.thumbnail:
embed.set_thumbnail(url=player.team.thumbnail)
embed.add_field(
name="Player",
value=f"{player.name} ({player.primary_position})",
inline=True
inline=True,
)
embed.add_field(
name="Injury Rating",
value=f"{player.injury_rating}",
inline=True
name="Injury Rating", value=f"{player.injury_rating}", inline=True
)
# embed.add_field(name='', value='', inline=False) # Embed line break
# Format dice roll in markdown (same format as /ab roll)
dice_result = f"```md\n# {roll_total}\nDetails:[3d6 ({d1} {d2} {d3})]```"
embed.add_field(
name="Dice Roll",
value=dice_result,
inline=False
)
embed.add_field(name="Dice Roll", value=dice_result, inline=False)
view = None
@ -177,20 +176,20 @@ class InjuryGroup(app_commands.Group):
embed.color = discord.Color.orange()
if injury_result > 6:
gif_search_text = ['well shit', 'well fuck', 'god dammit']
gif_search_text = ["well shit", "well fuck", "god dammit"]
else:
gif_search_text = ['bummer', 'well damn']
gif_search_text = ["bummer", "well damn"]
if player.is_pitcher:
result_text += ' plus their current rest requirement'
result_text += " plus their current rest requirement"
# Pitcher callback shows modal to collect rest games
async def pitcher_confirm_callback(button_interaction: discord.Interaction):
async def pitcher_confirm_callback(
button_interaction: discord.Interaction,
):
"""Show modal to collect pitcher rest information."""
modal = PitcherRestModal(
player=player,
injury_games=injury_result,
season=current.season
player=player, injury_games=injury_result, season=current.season
)
await button_interaction.response.send_modal(modal)
@ -198,12 +197,12 @@ class InjuryGroup(app_commands.Group):
else:
# Batter callback shows modal to collect current week/game
async def batter_confirm_callback(button_interaction: discord.Interaction):
async def batter_confirm_callback(
button_interaction: discord.Interaction,
):
"""Show modal to collect current week/game information for batter injury."""
modal = BatterInjuryModal(
player=player,
injury_games=injury_result,
season=current.season
player=player, injury_games=injury_result, season=current.season
)
await button_interaction.response.send_modal(modal)
@ -213,35 +212,31 @@ class InjuryGroup(app_commands.Group):
# Only the player's team GM(s) can log the injury
view = ConfirmationView(
timeout=180.0, # 3 minutes for confirmation
responders=[player.team.gmid, player.team.gmid2] if player.team else None,
responders=(
[player.team.gmid, player.team.gmid2] if player.team else None
),
confirm_callback=injury_callback,
confirm_label="Log Injury",
cancel_label="Ignore Injury"
cancel_label="Ignore Injury",
)
elif injury_result == 'REM':
elif injury_result == "REM":
if player.is_pitcher:
result_text = '**FATIGUED**'
result_text = "**FATIGUED**"
else:
result_text = "**REMAINDER OF GAME**"
embed.color = discord.Color.gold()
gif_search_text = ['this is fine', 'not even mad', 'could be worse']
gif_search_text = ["this is fine", "not even mad", "could be worse"]
else: # 'OK'
result_text = "**No injury!**"
embed.color = discord.Color.green()
gif_search_text = ['we are so back', 'all good', 'totally fine']
gif_search_text = ["we are so back", "all good", "totally fine"]
embed.add_field(
name="Injury Length",
value=result_text,
inline=True
)
embed.add_field(name="Injury Length", value=result_text, inline=True)
try:
injury_gif = await GiphyService().get_gif(
phrase_options=gif_search_text
)
injury_gif = await GiphyService().get_gif(phrase_options=gif_search_text)
except Exception:
injury_gif = ''
injury_gif = ""
embed.set_image(url=injury_gif)
@ -251,7 +246,6 @@ class InjuryGroup(app_commands.Group):
else:
await interaction.followup.send(embed=embed)
def _get_injury_result(self, rating: str, games_played: int, roll: int):
"""
Get injury result from the injury table.
@ -266,89 +260,194 @@ class InjuryGroup(app_commands.Group):
"""
# Injury table mapping
inj_data = {
'one': {
'p70': ['OK', 'OK', 'OK', 'OK', 'OK', 'OK', 'REM', 'REM', 1, 1, 2, 2, 3, 3, 4, 4],
'p65': [2, 2, 'OK', 'REM', 1, 2, 3, 3, 4, 4, 4, 4, 5, 6, 8, 12],
'p60': ['OK', 'OK', 'REM', 1, 2, 3, 4, 4, 4, 5, 5, 6, 8, 12, 16, 16],
'p50': ['OK', 'REM', 1, 2, 3, 4, 4, 5, 5, 6, 8, 8, 12, 16, 16, 'OK'],
'p40': ['OK', 1, 2, 3, 4, 4, 5, 6, 6, 8, 8, 12, 16, 24, 'REM', 'OK'],
'p30': ['OK', 4, 1, 3, 4, 5, 6, 8, 8, 12, 16, 24, 4, 2, 'REM', 'OK'],
'p20': ['OK', 1, 2, 4, 5, 8, 8, 24, 16, 12, 12, 6, 4, 3, 'REM', 'OK']
"one": {
"p70": [
"OK",
"OK",
"OK",
"OK",
"OK",
"OK",
"REM",
"REM",
1,
1,
2,
2,
3,
3,
4,
4,
],
"p65": [2, 2, "OK", "REM", 1, 2, 3, 3, 4, 4, 4, 4, 5, 6, 8, 12],
"p60": ["OK", "OK", "REM", 1, 2, 3, 4, 4, 4, 5, 5, 6, 8, 12, 16, 16],
"p50": ["OK", "REM", 1, 2, 3, 4, 4, 5, 5, 6, 8, 8, 12, 16, 16, "OK"],
"p40": ["OK", 1, 2, 3, 4, 4, 5, 6, 6, 8, 8, 12, 16, 24, "REM", "OK"],
"p30": ["OK", 4, 1, 3, 4, 5, 6, 8, 8, 12, 16, 24, 4, 2, "REM", "OK"],
"p20": ["OK", 1, 2, 4, 5, 8, 8, 24, 16, 12, 12, 6, 4, 3, "REM", "OK"],
},
'two': {
'p70': [4, 3, 2, 2, 1, 1, 'REM', 'OK', 'REM', 'OK', 2, 1, 2, 2, 3, 4],
'p65': [8, 5, 4, 2, 2, 'OK', 1, 'OK', 'REM', 1, 'REM', 2, 3, 4, 6, 12],
'p60': [1, 3, 4, 5, 2, 2, 'OK', 1, 3, 'REM', 4, 4, 6, 8, 12, 3],
'p50': [4, 'OK', 'OK', 'REM', 1, 2, 4, 3, 4, 5, 4, 6, 8, 12, 12, 'OK'],
'p40': ['OK', 'OK', 'REM', 1, 2, 3, 4, 4, 5, 4, 6, 8, 12, 16, 16, 'OK'],
'p30': ['OK', 'REM', 1, 2, 3, 4, 4, 5, 6, 5, 8, 12, 16, 24, 'REM', 'OK'],
'p20': ['OK', 1, 4, 4, 5, 5, 6, 6, 12, 8, 16, 24, 8, 3, 2, 'REM']
"two": {
"p70": [4, 3, 2, 2, 1, 1, "REM", "OK", "REM", "OK", 2, 1, 2, 2, 3, 4],
"p65": [8, 5, 4, 2, 2, "OK", 1, "OK", "REM", 1, "REM", 2, 3, 4, 6, 12],
"p60": [1, 3, 4, 5, 2, 2, "OK", 1, 3, "REM", 4, 4, 6, 8, 12, 3],
"p50": [4, "OK", "OK", "REM", 1, 2, 4, 3, 4, 5, 4, 6, 8, 12, 12, "OK"],
"p40": ["OK", "OK", "REM", 1, 2, 3, 4, 4, 5, 4, 6, 8, 12, 16, 16, "OK"],
"p30": [
"OK",
"REM",
1,
2,
3,
4,
4,
5,
6,
5,
8,
12,
16,
24,
"REM",
"OK",
],
"p20": ["OK", 1, 4, 4, 5, 5, 6, 6, 12, 8, 16, 24, 8, 3, 2, "REM"],
},
'three': {
'p70': [],
'p65': ['OK', 'OK', 'REM', 1, 3, 'OK', 'REM', 1, 2, 1, 2, 3, 4, 4, 5, 'REM'],
'p60': ['OK', 5, 'OK', 'REM', 1, 2, 2, 3, 4, 4, 1, 3, 5, 6, 8, 'REM'],
'p50': ['OK', 'OK', 'REM', 1, 2, 3, 4, 4, 5, 4, 4, 6, 8, 8, 12, 'REM'],
'p40': ['OK', 1, 1, 2, 3, 4, 4, 5, 6, 5, 6, 8, 8, 12, 4, 'REM'],
'p30': ['OK', 1, 2, 3, 4, 5, 4, 6, 5, 6, 8, 8, 12, 16, 1, 'REM'],
'p20': ['OK', 1, 2, 4, 4, 8, 8, 6, 5, 12, 6, 16, 24, 3, 4, 'REM']
"three": {
"p70": [],
"p65": [
"OK",
"OK",
"REM",
1,
3,
"OK",
"REM",
1,
2,
1,
2,
3,
4,
4,
5,
"REM",
],
"p60": ["OK", 5, "OK", "REM", 1, 2, 2, 3, 4, 4, 1, 3, 5, 6, 8, "REM"],
"p50": ["OK", "OK", "REM", 1, 2, 3, 4, 4, 5, 4, 4, 6, 8, 8, 12, "REM"],
"p40": ["OK", 1, 1, 2, 3, 4, 4, 5, 6, 5, 6, 8, 8, 12, 4, "REM"],
"p30": ["OK", 1, 2, 3, 4, 5, 4, 6, 5, 6, 8, 8, 12, 16, 1, "REM"],
"p20": ["OK", 1, 2, 4, 4, 8, 8, 6, 5, 12, 6, 16, 24, 3, 4, "REM"],
},
'four': {
'p70': [],
'p65': [],
'p60': ['OK', 'OK', 'REM', 3, 3, 'OK', 'REM', 1, 2, 1, 4, 4, 5, 6, 8, 'REM'],
'p50': ['OK', 6, 4, 'OK', 'REM', 1, 2, 4, 4, 3, 5, 3, 6, 8, 12, 'REM'],
'p40': ['OK', 'OK', 'REM', 1, 2, 3, 4, 4, 5, 4, 4, 6, 8, 8, 12, 'REM'],
'p30': ['OK', 1, 1, 2, 3, 4, 4, 5, 6, 5, 6, 8, 8, 12, 4, 'REM'],
'p20': ['OK', 1, 2, 3, 4, 5, 4, 6, 5, 6, 12, 8, 8, 16, 1, 'REM']
"four": {
"p70": [],
"p65": [],
"p60": [
"OK",
"OK",
"REM",
3,
3,
"OK",
"REM",
1,
2,
1,
4,
4,
5,
6,
8,
"REM",
],
"p50": ["OK", 6, 4, "OK", "REM", 1, 2, 4, 4, 3, 5, 3, 6, 8, 12, "REM"],
"p40": ["OK", "OK", "REM", 1, 2, 3, 4, 4, 5, 4, 4, 6, 8, 8, 12, "REM"],
"p30": ["OK", 1, 1, 2, 3, 4, 4, 5, 6, 5, 6, 8, 8, 12, 4, "REM"],
"p20": ["OK", 1, 2, 3, 4, 5, 4, 6, 5, 6, 12, 8, 8, 16, 1, "REM"],
},
'five': {
'p70': [],
'p65': [],
'p60': ['OK', 'REM', 'REM', 'REM', 3, 'OK', 1, 'REM', 2, 1, 'OK', 4, 5, 2, 6, 8],
'p50': ['OK', 'OK', 'REM', 1, 1, 'OK', 'REM', 3, 2, 4, 4, 5, 5, 6, 8, 12],
'p40': ['OK', 6, 6, 'OK', 1, 3, 2, 4, 4, 5, 'REM', 3, 8, 6, 12, 1],
'p30': ['OK', 'OK', 'REM', 4, 1, 2, 5, 4, 6, 3, 4, 8, 5, 6, 12, 'REM'],
'p20': ['OK', 'REM', 2, 3, 4, 4, 5, 4, 6, 5, 8, 6, 8, 1, 12, 'REM']
"five": {
"p70": [],
"p65": [],
"p60": [
"OK",
"REM",
"REM",
"REM",
3,
"OK",
1,
"REM",
2,
1,
"OK",
4,
5,
2,
6,
8,
],
"p50": [
"OK",
"OK",
"REM",
1,
1,
"OK",
"REM",
3,
2,
4,
4,
5,
5,
6,
8,
12,
],
"p40": ["OK", 6, 6, "OK", 1, 3, 2, 4, 4, 5, "REM", 3, 8, 6, 12, 1],
"p30": ["OK", "OK", "REM", 4, 1, 2, 5, 4, 6, 3, 4, 8, 5, 6, 12, "REM"],
"p20": ["OK", "REM", 2, 3, 4, 4, 5, 4, 6, 5, 8, 6, 8, 1, 12, "REM"],
},
"six": {
"p70": [],
"p65": [],
"p60": [],
"p50": [],
"p40": ["OK", 6, 6, "OK", 1, 3, 2, 4, 4, 5, "REM", 3, 8, 6, 1, 12],
"p30": ["OK", "OK", "REM", 5, 1, 3, 6, 4, 5, 2, 4, 8, 3, 5, 12, "REM"],
"p20": ["OK", "REM", 4, 6, 2, 3, 6, 4, 8, 5, 5, 6, 3, 1, 12, "REM"],
},
'six': {
'p70': [],
'p65': [],
'p60': [],
'p50': [],
'p40': ['OK', 6, 6, 'OK', 1, 3, 2, 4, 4, 5, 'REM', 3, 8, 6, 1, 12],
'p30': ['OK', 'OK', 'REM', 5, 1, 3, 6, 4, 5, 2, 4, 8, 3, 5, 12, 'REM'],
'p20': ['OK', 'REM', 4, 6, 2, 3, 6, 4, 8, 5, 5, 6, 3, 1, 12, 'REM']
}
}
# Map games_played to key
games_map = {1: 'one', 2: 'two', 3: 'three', 4: 'four', 5: 'five', 6: 'six'}
games_map = {1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six"}
games_key = games_map.get(games_played)
if not games_key:
return 'OK'
return "OK"
# Get the injury table for this rating and games played
injury_table = inj_data.get(games_key, {}).get(rating, [])
# If no table exists (e.g., p70 with 3+ games), no injury
if not injury_table:
return 'OK'
return "OK"
# Get result from table (roll 3-18 maps to index 0-15)
table_index = roll - 3
if 0 <= table_index < len(injury_table):
return injury_table[table_index]
return 'OK'
return "OK"
@app_commands.command(name="set-new", description="Set a new injury for a player (requires SBA Players role)")
@app_commands.command(
name="set-new",
description="Set a new injury for a player (requires SBA Players role)",
)
@app_commands.describe(
player_name="Player name to injure",
this_week="Current week number",
this_game="Current game number (1-4)",
injury_games="Number of games player will be out"
injury_games="Number of games player will be out",
)
@league_only()
@logged_command("/injury set-new")
@ -358,14 +457,14 @@ class InjuryGroup(app_commands.Group):
player_name: str,
this_week: int,
this_game: int,
injury_games: int
injury_games: int,
):
"""Set a new injury for a player on your team."""
# Check role permissions
if not self.has_player_role(interaction):
embed = EmbedTemplate.error(
title="Permission Denied",
description=f"This command requires the **{get_config().sba_players_role_name}** role."
description=f"This command requires the **{get_config().sba_players_role_name}** role.",
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
@ -376,7 +475,7 @@ class InjuryGroup(app_commands.Group):
if this_game < 1 or this_game > 4:
embed = EmbedTemplate.error(
title="Invalid Input",
description="Game number must be between 1 and 4."
description="Game number must be between 1 and 4.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -384,7 +483,7 @@ class InjuryGroup(app_commands.Group):
if injury_games < 1:
embed = EmbedTemplate.error(
title="Invalid Input",
description="Injury duration must be at least 1 game."
description="Injury duration must be at least 1 game.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -395,12 +494,14 @@ class InjuryGroup(app_commands.Group):
raise BotException("Failed to get current season information")
# Search for player using the search endpoint (more reliable than name param)
players = await player_service.search_players(player_name, limit=10, season=current.season)
players = await player_service.search_players(
player_name, limit=10, season=current.season
)
if not players:
embed = EmbedTemplate.error(
title="Player Not Found",
description=f"I did not find anybody named **{player_name}**."
description=f"I did not find anybody named **{player_name}**.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -410,6 +511,7 @@ class InjuryGroup(app_commands.Group):
# Fetch full team data if team is not populated
if player.team_id and not player.team:
from services.team_service import team_service
player.team = await team_service.get_team(player.team_id)
# Check if player is on user's team
@ -418,7 +520,9 @@ class InjuryGroup(app_commands.Group):
# TODO: Add team ownership verification
# Check if player already has an active injury
existing_injury = await injury_service.get_active_injury(player.id, current.season)
existing_injury = await injury_service.get_active_injury(
player.id, current.season
)
# Data consistency check: If injury exists but il_return is None, it's stale data
if existing_injury:
@ -431,12 +535,14 @@ class InjuryGroup(app_commands.Group):
await injury_service.clear_injury(existing_injury.id)
# Notify user but allow them to proceed
self.logger.info(f"Cleared stale injury {existing_injury.id} for player {player.id}")
self.logger.info(
f"Cleared stale injury {existing_injury.id} for player {player.id}"
)
else:
# Valid active injury - player is actually injured
embed = EmbedTemplate.error(
title="Already Injured",
description=f"Hm. It looks like {player.name} is already hurt (returns {player.il_return})."
description=f"Hm. It looks like {player.name} is already hurt (returns {player.il_return}).",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -456,7 +562,7 @@ class InjuryGroup(app_commands.Group):
start_week = this_week if this_game != 4 else this_week + 1
start_game = this_game + 1 if this_game != 4 else 1
return_date = f'w{return_week:02d}g{return_game}'
return_date = f"w{return_week:02d}g{return_game}"
# Create injury record
injury = await injury_service.create_injury(
@ -466,49 +572,43 @@ class InjuryGroup(app_commands.Group):
start_week=start_week,
start_game=start_game,
end_week=return_week,
end_game=return_game
end_game=return_game,
)
if not injury:
embed = EmbedTemplate.error(
title="Error",
description="Well that didn't work. Failed to create injury record."
description="Well that didn't work. Failed to create injury record.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
# Update player's il_return field
await player_service.update_player(player.id, {'il_return': return_date})
await player_service.update_player(player.id, {"il_return": return_date})
# Success response
embed = EmbedTemplate.success(
title="Injury Recorded",
description=f"{player.name}'s injury has been logged"
description=f"{player.name}'s injury has been logged",
)
embed.add_field(
name="Player",
value=f"{player.name} ({player.pos_1})",
inline=True
name="Player", value=f"{player.name} ({player.pos_1})", inline=True
)
embed.add_field(
name="Duration",
value=f"{injury_games} game{'s' if injury_games > 1 else ''}",
inline=True
inline=True,
)
embed.add_field(
name="Return Date",
value=return_date,
inline=True
)
embed.add_field(name="Return Date", value=return_date, inline=True)
if player.team:
embed.add_field(
name="Team",
value=f"{player.team.lname} ({player.team.abbrev})",
inline=False
inline=False,
)
await interaction.followup.send(embed=embed)
@ -518,10 +618,12 @@ class InjuryGroup(app_commands.Group):
f"Injury set for {player.name}: {injury_games} games, returns {return_date}",
player_id=player.id,
season=current.season,
injury_id=injury.id
injury_id=injury.id,
)
def _calc_injury_dates(self, start_week: int, start_game: int, injury_games: int) -> dict:
def _calc_injury_dates(
self, start_week: int, start_game: int, injury_games: int
) -> dict:
"""
Calculate injury dates from start week/game and injury duration.
@ -549,15 +651,16 @@ class InjuryGroup(app_commands.Group):
actual_start_game = start_game + 1 if start_game != 4 else 1
return {
'total_games': injury_games,
'start_week': actual_start_week,
'start_game': actual_start_game,
'end_week': return_week,
'end_game': return_game
"total_games": injury_games,
"start_week": actual_start_week,
"start_game": actual_start_game,
"end_week": return_week,
"end_game": return_game,
}
@app_commands.command(name="clear", description="Clear a player's injury (requires SBA Players role)")
@app_commands.command(
name="clear", description="Clear a player's injury (requires SBA Players role)"
)
@app_commands.describe(player_name="Player name to clear injury")
@app_commands.autocomplete(player_name=player_autocomplete)
@league_only()
@ -568,7 +671,7 @@ class InjuryGroup(app_commands.Group):
if not self.has_player_role(interaction):
embed = EmbedTemplate.error(
title="Permission Denied",
description=f"This command requires the **{get_config().sba_players_role_name}** role."
description=f"This command requires the **{get_config().sba_players_role_name}** role.",
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
@ -581,12 +684,14 @@ class InjuryGroup(app_commands.Group):
raise BotException("Failed to get current season information")
# Search for player using the search endpoint (more reliable than name param)
players = await player_service.search_players(player_name, limit=10, season=current.season)
players = await player_service.search_players(
player_name, limit=10, season=current.season
)
if not players:
embed = EmbedTemplate.error(
title="Player Not Found",
description=f"I did not find anybody named **{player_name}**."
description=f"I did not find anybody named **{player_name}**.",
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -596,6 +701,7 @@ class InjuryGroup(app_commands.Group):
# Fetch full team data if team is not populated
if player.team_id and not player.team:
from services.team_service import team_service
player.team = await team_service.get_team(player.team_id)
# Get active injury
@ -603,8 +709,7 @@ class InjuryGroup(app_commands.Group):
if not injury:
embed = EmbedTemplate.error(
title="No Active Injury",
description=f"{player.name} isn't injured."
title="No Active Injury", description=f"{player.name} isn't injured."
)
await interaction.followup.send(embed=embed, ephemeral=True)
return
@ -612,7 +717,7 @@ class InjuryGroup(app_commands.Group):
# Create confirmation embed
embed = EmbedTemplate.info(
title=f"{player.name}",
description=f"Is **{player.name}** cleared to return?"
description=f"Is **{player.name}** cleared to return?",
)
if player.team and player.team.thumbnail is not None:
@ -621,33 +726,27 @@ class InjuryGroup(app_commands.Group):
embed.add_field(
name="Player",
value=f"{player.name} ({player.primary_position})",
inline=True
inline=True,
)
if player.team:
embed.add_field(
name="Team",
value=f"{player.team.lname} ({player.team.abbrev})",
inline=True
inline=True,
)
embed.add_field(
name="Expected Return",
value=injury.return_date,
inline=True
)
embed.add_field(name="Expected Return", value=injury.return_date, inline=True)
embed.add_field(
name="Games Missed",
value=injury.duration_display,
inline=True
)
embed.add_field(name="Games Missed", value=injury.duration_display, inline=True)
# Initialize responder_team to None for major league teams
if player.team.roster_type() == RosterType.MAJOR_LEAGUE:
responder_team = player.team
else:
responder_team = await team_utils.get_user_major_league_team(interaction.user.id)
responder_team = await team_utils.get_user_major_league_team(
interaction.user.id
)
# Create callback for confirmation
async def clear_confirm_callback(button_interaction: discord.Interaction):
@ -658,37 +757,33 @@ class InjuryGroup(app_commands.Group):
if not success:
error_embed = EmbedTemplate.error(
title="Error",
description="Failed to clear the injury. Please try again."
description="Failed to clear the injury. Please try again.",
)
await button_interaction.response.send_message(
embed=error_embed, ephemeral=True
)
await button_interaction.response.send_message(embed=error_embed, ephemeral=True)
return
# Clear player's il_return field
await player_service.update_player(player.id, {'il_return': ''})
await player_service.update_player(player.id, {"il_return": ""})
# Success response
success_embed = EmbedTemplate.success(
title="Injury Cleared",
description=f"{player.name} has been cleared and is eligible to play again."
description=f"{player.name} has been cleared and is eligible to play again.",
)
success_embed.add_field(
name="Injury Return Date",
value=injury.return_date,
inline=True
name="Injury Return Date", value=injury.return_date, inline=True
)
success_embed.add_field(
name="Total Games Missed",
value=injury.duration_display,
inline=True
name="Total Games Missed", value=injury.duration_display, inline=True
)
if player.team:
success_embed.add_field(
name="Team",
value=f"{player.team.lname}",
inline=False
name="Team", value=f"{player.team.lname}", inline=False
)
if player.team.thumbnail is not None:
success_embed.set_thumbnail(url=player.team.thumbnail)
@ -700,17 +795,19 @@ class InjuryGroup(app_commands.Group):
f"Injury cleared for {player.name}",
player_id=player.id,
season=current.season,
injury_id=injury.id
injury_id=injury.id,
)
# Create confirmation view
view = ConfirmationView(
user_id=interaction.user.id,
timeout=180.0, # 3 minutes for confirmation
responders=[responder_team.gmid, responder_team.gmid2] if responder_team else None,
responders=(
[responder_team.gmid, responder_team.gmid2] if responder_team else None
),
confirm_callback=clear_confirm_callback,
confirm_label="Clear Injury",
cancel_label="Cancel"
cancel_label="Cancel",
)
# Send confirmation embed with view

View File

@ -175,14 +175,14 @@ class SubmitScorecardCommands(commands.Cog):
# Delete old data
try:
await play_service.delete_plays_for_game(duplicate_game.id)
except:
except Exception:
pass # May not exist
try:
await decision_service.delete_decisions_for_game(
duplicate_game.id
)
except:
except Exception:
pass # May not exist
await game_service.wipe_game_data(duplicate_game.id)
@ -354,7 +354,7 @@ class SubmitScorecardCommands(commands.Cog):
try:
await standings_service.recalculate_standings(current.season)
except:
except Exception:
# Non-critical error
self.logger.error("Failed to recalculate standings")
@ -372,11 +372,11 @@ class SubmitScorecardCommands(commands.Cog):
await play_service.delete_plays_for_game(game_id)
elif rollback_state == "PLAYS_POSTED":
await play_service.delete_plays_for_game(game_id)
except:
except Exception:
pass # Best effort rollback
await interaction.edit_original_response(
content=f"❌ An unexpected error occurred: {str(e)}"
content="❌ An unexpected error occurred. Please try again or contact an admin."
)
def _match_manager(self, team: Team, manager_name: str):

View File

@ -1,6 +1,7 @@
"""
Configuration management for Discord Bot v2.0
"""
import os
from typing import Optional
@ -40,17 +41,18 @@ class BotConfig(BaseSettings):
playoff_round_two_games: int = 7
playoff_round_three_games: int = 7
modern_stats_start_season: int = 8
offseason_flag: bool = False # When True, relaxes roster limits and disables weekly freeze/thaw
offseason_flag: bool = (
False # When True, relaxes roster limits and disables weekly freeze/thaw
)
# Roster Limits
expand_mil_week: int = 15 # Week when MiL roster expands (early vs late limits)
ml_roster_limit_early: int = 26 # ML limit for weeks before expand_mil_week
ml_roster_limit_late: int = 26 # ML limit for weeks >= expand_mil_week
mil_roster_limit_early: int = 6 # MiL limit for weeks before expand_mil_week
mil_roster_limit_late: int = 14 # MiL limit for weeks >= expand_mil_week
expand_mil_week: int = 15 # Week when MiL roster expands (early vs late limits)
ml_roster_limit_early: int = 26 # ML limit for weeks before expand_mil_week
ml_roster_limit_late: int = 26 # ML limit for weeks >= expand_mil_week
mil_roster_limit_early: int = 6 # MiL limit for weeks before expand_mil_week
mil_roster_limit_late: int = 14 # MiL limit for weeks >= expand_mil_week
ml_roster_limit_offseason: int = 69 # ML limit during offseason
mil_roster_limit_offseason: int = 69 # MiL limit during offseason
mil_roster_limit_offseason: int = 69 # MiL limit during offseason
# API Constants
api_version: str = "v3"
@ -60,10 +62,10 @@ class BotConfig(BaseSettings):
# Draft Constants
default_pick_minutes: int = 10
draft_rounds: int = 32
draft_team_count: int = 16 # Number of teams in draft
draft_linear_rounds: int = 10 # Rounds 1-10 are linear, 11+ are snake
swar_cap_limit: float = 32.00 # Maximum sWAR cap for team roster
cap_player_count: int = 26 # Number of players that count toward cap
draft_team_count: int = 16 # Number of teams in draft
draft_linear_rounds: int = 10 # Rounds 1-10 are linear, 11+ are snake
swar_cap_limit: float = 32.00 # Maximum sWAR cap for team roster
cap_player_count: int = 26 # Number of players that count toward cap
# Special Team IDs
free_agent_team_id: int = 547
@ -80,7 +82,7 @@ class BotConfig(BaseSettings):
# Base URLs
sba_base_url: str = "https://sba.manticorum.com"
sba_logo_url: str = f'{sba_base_url}/images/sba-logo.png'
sba_logo_url: str = f"{sba_base_url}/images/sba-logo.png"
# Application settings
log_level: str = "INFO"
@ -92,29 +94,33 @@ class BotConfig(BaseSettings):
# Draft Sheet settings (for writing picks to Google Sheets)
# Sheet IDs can be overridden via environment variables: DRAFT_SHEET_KEY_12, DRAFT_SHEET_KEY_13, etc.
draft_sheet_enabled: bool = True # Feature flag - set DRAFT_SHEET_ENABLED=false to disable
draft_sheet_enabled: bool = (
True # Feature flag - set DRAFT_SHEET_ENABLED=false to disable
)
draft_sheet_worksheet: str = "Ordered List" # Worksheet name to write picks to
draft_sheet_start_column: str = "D" # Column where pick data starts (D, E, F, G for 4 columns)
draft_sheet_start_column: str = (
"D" # Column where pick data starts (D, E, F, G for 4 columns)
)
# Giphy API settings
giphy_api_key: str = "H86xibttEuUcslgmMM6uu74IgLEZ7UOD"
giphy_api_key: str = ""
giphy_translate_url: str = "https://api.giphy.com/v1/gifs/translate"
# Optional Redis caching settings
redis_url: str = "" # Empty string means no Redis caching
redis_cache_ttl: int = 300 # 5 minutes default TTL
model_config = SettingsConfigDict(
env_file=".env",
case_sensitive=False,
extra="ignore" # Ignore extra environment variables
extra="ignore", # Ignore extra environment variables
)
@property
def is_development(self) -> bool:
"""Check if running in development mode."""
return self.environment.lower() == "development"
@property
def is_testing(self) -> bool:
"""Check if running in test mode."""
@ -139,7 +145,7 @@ class BotConfig(BaseSettings):
# Default sheet IDs (hardcoded as fallback)
default_keys = {
12: "1OF-sAFykebc_2BrcYCgxCR-4rJo0GaNmTstagV-PMBU",
13: "1vWJfvuz9jN5BU2ZR0X0oC9BAVr_R8o-dWZsF2KXQMsE"
13: "1vWJfvuz9jN5BU2ZR0X0oC9BAVr_R8o-dWZsF2KXQMsE",
}
# Check environment variable first (allows runtime override)
@ -165,9 +171,10 @@ class BotConfig(BaseSettings):
# Global configuration instance - lazily initialized to avoid import-time errors
_config = None
def get_config() -> BotConfig:
"""Get the global configuration instance."""
global _config
if _config is None:
_config = BotConfig() # type: ignore
return _config
return _config

View File

@ -4,93 +4,88 @@ Giphy Service for Discord Bot v2.0
Provides async interface to Giphy API with disappointment-based search phrases.
Used for Easter egg features like the soak command.
"""
import random
from typing import List, Optional
from urllib.parse import quote
import aiohttp
from utils.logging import get_contextual_logger
from config import get_config
from exceptions import APIException
# Disappointment tier configuration
DISAPPOINTMENT_TIERS = {
'tier_1': {
'max_seconds': 1800, # 30 minutes
'phrases': [
"tier_1": {
"max_seconds": 1800, # 30 minutes
"phrases": [
"extremely disappointed",
"so disappointed",
"are you kidding me",
"seriously",
"unbelievable"
"unbelievable",
],
'description': "Maximum Disappointment"
"description": "Maximum Disappointment",
},
'tier_2': {
'max_seconds': 7200, # 2 hours
'phrases': [
"tier_2": {
"max_seconds": 7200, # 2 hours
"phrases": [
"very disappointed",
"can't believe you",
"not happy",
"shame on you",
"facepalm"
"facepalm",
],
'description': "Severe Disappointment"
"description": "Severe Disappointment",
},
'tier_3': {
'max_seconds': 21600, # 6 hours
'phrases': [
"tier_3": {
"max_seconds": 21600, # 6 hours
"phrases": [
"disappointed",
"not impressed",
"shaking head",
"eye roll",
"really"
"really",
],
'description': "Strong Disappointment"
"description": "Strong Disappointment",
},
'tier_4': {
'max_seconds': 86400, # 24 hours
'phrases': [
"tier_4": {
"max_seconds": 86400, # 24 hours
"phrases": [
"mildly disappointed",
"not great",
"could be better",
"sigh",
"seriously"
"seriously",
],
'description': "Moderate Disappointment"
"description": "Moderate Disappointment",
},
'tier_5': {
'max_seconds': 604800, # 7 days
'phrases': [
"slightly disappointed",
"oh well",
"shrug",
"meh",
"not bad"
],
'description': "Mild Disappointment"
"tier_5": {
"max_seconds": 604800, # 7 days
"phrases": ["slightly disappointed", "oh well", "shrug", "meh", "not bad"],
"description": "Mild Disappointment",
},
'tier_6': {
'max_seconds': float('inf'), # 7+ days
'phrases': [
"tier_6": {
"max_seconds": float("inf"), # 7+ days
"phrases": [
"not disappointed",
"relieved",
"proud",
"been worse",
"fine i guess"
"fine i guess",
],
'description': "Minimal Disappointment"
"description": "Minimal Disappointment",
},
'first_ever': {
'phrases': [
"first_ever": {
"phrases": [
"here we go",
"oh boy",
"uh oh",
"getting started",
"and so it begins"
"and so it begins",
],
'description': "The Beginning"
}
"description": "The Beginning",
},
}
@ -102,7 +97,7 @@ class GiphyService:
self.config = get_config()
self.api_key = self.config.giphy_api_key
self.translate_url = self.config.giphy_translate_url
self.logger = get_contextual_logger(f'{__name__}.GiphyService')
self.logger = get_contextual_logger(f"{__name__}.GiphyService")
def get_tier_for_seconds(self, seconds_elapsed: Optional[int]) -> str:
"""
@ -115,13 +110,13 @@ class GiphyService:
Tier key string (e.g., 'tier_1', 'first_ever')
"""
if seconds_elapsed is None:
return 'first_ever'
return "first_ever"
for tier_key in ['tier_1', 'tier_2', 'tier_3', 'tier_4', 'tier_5', 'tier_6']:
if seconds_elapsed <= DISAPPOINTMENT_TIERS[tier_key]['max_seconds']:
for tier_key in ["tier_1", "tier_2", "tier_3", "tier_4", "tier_5", "tier_6"]:
if seconds_elapsed <= DISAPPOINTMENT_TIERS[tier_key]["max_seconds"]:
return tier_key
return 'tier_6' # Fallback to lowest disappointment
return "tier_6" # Fallback to lowest disappointment
def get_random_phrase_for_tier(self, tier_key: str) -> str:
"""
@ -139,7 +134,7 @@ class GiphyService:
if tier_key not in DISAPPOINTMENT_TIERS:
raise ValueError(f"Invalid tier key: {tier_key}")
phrases = DISAPPOINTMENT_TIERS[tier_key]['phrases']
phrases = DISAPPOINTMENT_TIERS[tier_key]["phrases"]
return random.choice(phrases)
def get_tier_description(self, tier_key: str) -> str:
@ -158,7 +153,7 @@ class GiphyService:
if tier_key not in DISAPPOINTMENT_TIERS:
raise ValueError(f"Invalid tier key: {tier_key}")
return DISAPPOINTMENT_TIERS[tier_key]['description']
return DISAPPOINTMENT_TIERS[tier_key]["description"]
async def get_disappointment_gif(self, tier_key: str) -> str:
"""
@ -181,7 +176,7 @@ class GiphyService:
if tier_key not in DISAPPOINTMENT_TIERS:
raise ValueError(f"Invalid tier key: {tier_key}")
phrases = DISAPPOINTMENT_TIERS[tier_key]['phrases']
phrases = DISAPPOINTMENT_TIERS[tier_key]["phrases"]
# Shuffle phrases for variety and retry capability
shuffled_phrases = random.sample(phrases, len(phrases))
@ -189,39 +184,61 @@ class GiphyService:
async with aiohttp.ClientSession() as session:
for phrase in shuffled_phrases:
try:
url = f"{self.translate_url}?s={phrase}&api_key={self.api_key}"
url = f"{self.translate_url}?s={quote(phrase)}&api_key={quote(self.api_key)}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=5)
) as resp:
if resp.status == 200:
data = await resp.json()
# Filter out Trump GIFs (legacy behavior)
gif_title = data.get('data', {}).get('title', '').lower()
if 'trump' in gif_title:
self.logger.debug(f"Filtered out Trump GIF for phrase: {phrase}")
gif_title = data.get("data", {}).get("title", "").lower()
if "trump" in gif_title:
self.logger.debug(
f"Filtered out Trump GIF for phrase: {phrase}"
)
continue
# Get the actual GIF image URL, not the web page URL
gif_url = data.get('data', {}).get('images', {}).get('original', {}).get('url')
gif_url = (
data.get("data", {})
.get("images", {})
.get("original", {})
.get("url")
)
if gif_url:
self.logger.info(f"Successfully fetched GIF for phrase: {phrase}", gif_url=gif_url)
self.logger.info(
f"Successfully fetched GIF for phrase: {phrase}",
gif_url=gif_url,
)
return gif_url
else:
self.logger.warning(f"No GIF URL in response for phrase: {phrase}")
self.logger.warning(
f"No GIF URL in response for phrase: {phrase}"
)
else:
self.logger.warning(f"Giphy API returned status {resp.status} for phrase: {phrase}")
self.logger.warning(
f"Giphy API returned status {resp.status} for phrase: {phrase}"
)
except aiohttp.ClientError as e:
self.logger.error(f"HTTP error fetching GIF for phrase '{phrase}': {e}")
self.logger.error(
f"HTTP error fetching GIF for phrase '{phrase}': {e}"
)
except Exception as e:
self.logger.error(f"Unexpected error fetching GIF for phrase '{phrase}': {e}")
self.logger.error(
f"Unexpected error fetching GIF for phrase '{phrase}': {e}"
)
# All phrases failed
error_msg = f"Failed to fetch any GIF for tier: {tier_key}"
self.logger.error(error_msg)
raise APIException(error_msg)
async def get_gif(self, phrase: Optional[str] = None, phrase_options: Optional[List[str]] = None) -> str:
async def get_gif(
self, phrase: Optional[str] = None, phrase_options: Optional[List[str]] = None
) -> str:
"""
Fetch a GIF from Giphy based on a phrase or list of phrase options.
@ -237,9 +254,11 @@ class GiphyService:
APIException: If all GIF fetch attempts fail
"""
if phrase is None and phrase_options is None:
raise ValueError('To get a gif, one of `phrase` or `phrase_options` must be provided')
raise ValueError(
"To get a gif, one of `phrase` or `phrase_options` must be provided"
)
search_phrase = 'send help'
search_phrase = "send help"
if phrase is not None:
search_phrase = phrase
elif phrase_options is not None:
@ -250,33 +269,53 @@ class GiphyService:
while attempts < 3:
attempts += 1
try:
url = f"{self.translate_url}?s={search_phrase}&api_key={self.api_key}"
url = f"{self.translate_url}?s={quote(search_phrase)}&api_key={quote(self.api_key)}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=3)) as resp:
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=3)
) as resp:
if resp.status != 200:
self.logger.warning(f"Giphy API returned status {resp.status} for phrase: {search_phrase}")
self.logger.warning(
f"Giphy API returned status {resp.status} for phrase: {search_phrase}"
)
continue
data = await resp.json()
# Filter out Trump GIFs (legacy behavior)
gif_title = data.get('data', {}).get('title', '').lower()
if 'trump' in gif_title:
self.logger.debug(f"Filtered out Trump GIF for phrase: {search_phrase}")
gif_title = data.get("data", {}).get("title", "").lower()
if "trump" in gif_title:
self.logger.debug(
f"Filtered out Trump GIF for phrase: {search_phrase}"
)
continue
# Get the actual GIF image URL, not the web page URL
gif_url = data.get('data', {}).get('images', {}).get('original', {}).get('url')
gif_url = (
data.get("data", {})
.get("images", {})
.get("original", {})
.get("url")
)
if gif_url:
self.logger.info(f"Successfully fetched GIF for phrase: {search_phrase}", gif_url=gif_url)
self.logger.info(
f"Successfully fetched GIF for phrase: {search_phrase}",
gif_url=gif_url,
)
return gif_url
else:
self.logger.warning(f"No GIF URL in response for phrase: {search_phrase}")
self.logger.warning(
f"No GIF URL in response for phrase: {search_phrase}"
)
except aiohttp.ClientError as e:
self.logger.error(f"HTTP error fetching GIF for phrase '{search_phrase}': {e}")
self.logger.error(
f"HTTP error fetching GIF for phrase '{search_phrase}': {e}"
)
except Exception as e:
self.logger.error(f"Unexpected error fetching GIF for phrase '{search_phrase}': {e}")
self.logger.error(
f"Unexpected error fetching GIF for phrase '{search_phrase}': {e}"
)
# All attempts failed
error_msg = f"Failed to fetch any GIF for phrase: {search_phrase}"

View File

@ -4,6 +4,7 @@ Transaction Freeze/Thaw Task for Discord Bot v2.0
Automated weekly system for freezing and processing transactions.
Runs on a schedule to increment weeks and process contested transactions.
"""
import asyncio
import random
from datetime import datetime, UTC
@ -30,6 +31,7 @@ class TransactionPriority:
Data class for transaction priority calculation.
Used to resolve contested transactions (multiple teams wanting same player).
"""
transaction: Transaction
team_win_percentage: float
tiebreaker: float # win% + small random number for randomized tiebreak
@ -42,6 +44,7 @@ class TransactionPriority:
@dataclass
class ConflictContender:
"""A team contending for a contested player."""
team_abbrev: str
wins: int
losses: int
@ -52,6 +55,7 @@ class ConflictContender:
@dataclass
class ConflictResolution:
"""Details of a conflict resolution for a contested player."""
player_name: str
player_swar: float
contenders: List[ConflictContender]
@ -62,6 +66,7 @@ class ConflictResolution:
@dataclass
class ThawedMove:
"""A move that was successfully thawed (unfrozen)."""
move_id: str
team_abbrev: str
players: List[Tuple[str, float, str, str]] # (name, sWAR, old_team, new_team)
@ -71,6 +76,7 @@ class ThawedMove:
@dataclass
class CancelledMove:
"""A move that was cancelled due to conflict."""
move_id: str
team_abbrev: str
players: List[Tuple[str, float, str, str]] # (name, sWAR, old_team, new_team)
@ -81,6 +87,7 @@ class CancelledMove:
@dataclass
class ThawReport:
"""Complete thaw report for admin review."""
week: int
season: int
timestamp: datetime
@ -94,8 +101,7 @@ class ThawReport:
async def resolve_contested_transactions(
transactions: List[Transaction],
season: int
transactions: List[Transaction], season: int
) -> Tuple[List[str], List[str], List[ConflictResolution]]:
"""
Resolve contested transactions where multiple teams want the same player.
@ -109,7 +115,7 @@ async def resolve_contested_transactions(
Returns:
Tuple of (winning_move_ids, losing_move_ids, conflict_resolutions)
"""
logger = get_contextual_logger(f'{__name__}.resolve_contested_transactions')
logger = get_contextual_logger(f"{__name__}.resolve_contested_transactions")
# Group transactions by player name
player_transactions: Dict[str, List[Transaction]] = {}
@ -118,7 +124,7 @@ async def resolve_contested_transactions(
player_name = transaction.player.name.lower()
# Only consider transactions where a team is acquiring a player (not FA drops)
if transaction.newteam.abbrev.upper() != 'FA':
if transaction.newteam.abbrev.upper() != "FA":
if player_name not in player_transactions:
player_transactions[player_name] = []
player_transactions[player_name].append(transaction)
@ -130,7 +136,9 @@ async def resolve_contested_transactions(
for player_name, player_transactions_list in player_transactions.items():
if len(player_transactions_list) > 1:
contested_players[player_name] = player_transactions_list
logger.info(f"Contested player: {player_name} ({len(player_transactions_list)} teams)")
logger.info(
f"Contested player: {player_name} ({len(player_transactions_list)} teams)"
)
else:
# Non-contested, automatically wins
non_contested_moves.add(player_transactions_list[0].moveid)
@ -143,50 +151,66 @@ async def resolve_contested_transactions(
for player_name, contested_transactions in contested_players.items():
priorities: List[TransactionPriority] = []
# Track standings data for each team for report
team_standings_data: Dict[str, Tuple[int, int, float]] = {} # abbrev -> (wins, losses, win_pct)
team_standings_data: Dict[str, Tuple[int, int, float]] = (
{}
) # abbrev -> (wins, losses, win_pct)
for transaction in contested_transactions:
# Get team for priority calculation
# If adding to MiL team, use the parent ML team for standings
if transaction.newteam.abbrev.endswith('MiL'):
if transaction.newteam.abbrev.endswith("MiL"):
team_abbrev = transaction.newteam.abbrev[:-3] # Remove 'MiL' suffix
else:
team_abbrev = transaction.newteam.abbrev
try:
# Get team standings to calculate win percentage
standings = await standings_service.get_team_standings(team_abbrev, season)
standings = await standings_service.get_team_standings(
team_abbrev, season
)
if standings and standings.wins is not None and standings.losses is not None:
if (
standings
and standings.wins is not None
and standings.losses is not None
):
total_games = standings.wins + standings.losses
win_pct = standings.wins / total_games if total_games > 0 else 0.0
team_standings_data[transaction.newteam.abbrev] = (
standings.wins, standings.losses, win_pct
standings.wins,
standings.losses,
win_pct,
)
else:
win_pct = 0.0
team_standings_data[transaction.newteam.abbrev] = (0, 0, 0.0)
logger.warning(f"Could not get standings for {team_abbrev}, using 0.0 win%")
logger.warning(
f"Could not get standings for {team_abbrev}, using 0.0 win%"
)
# Add small random component for tiebreaking (5 decimal precision)
random_component = random.randint(10000, 99999) * 0.00000001
tiebreaker = win_pct + random_component
priorities.append(TransactionPriority(
transaction=transaction,
team_win_percentage=win_pct,
tiebreaker=tiebreaker
))
priorities.append(
TransactionPriority(
transaction=transaction,
team_win_percentage=win_pct,
tiebreaker=tiebreaker,
)
)
except Exception as e:
logger.error(f"Error calculating priority for {team_abbrev}: {e}")
team_standings_data[transaction.newteam.abbrev] = (0, 0, 0.0)
# Give them 0.0 priority on error
priorities.append(TransactionPriority(
transaction=transaction,
team_win_percentage=0.0,
tiebreaker=random.randint(10000, 99999) * 0.00000001
))
priorities.append(
TransactionPriority(
transaction=transaction,
team_win_percentage=0.0,
tiebreaker=random.randint(10000, 99999) * 0.00000001,
)
)
# Sort by tiebreaker (lowest win% wins - worst teams get priority)
priorities.sort()
@ -204,7 +228,7 @@ async def resolve_contested_transactions(
wins=winner_standings[0],
losses=winner_standings[1],
win_pct=winner_standings[2],
move_id=winner.transaction.moveid
move_id=winner.transaction.moveid,
)
loser_contenders: List[ConflictContender] = []
@ -224,7 +248,7 @@ async def resolve_contested_transactions(
wins=loser_standings[0],
losses=loser_standings[1],
win_pct=loser_standings[2],
move_id=loser.transaction.moveid
move_id=loser.transaction.moveid,
)
loser_contenders.append(loser_contender)
all_contenders.append(loser_contender)
@ -236,13 +260,15 @@ async def resolve_contested_transactions(
# Get player info from first transaction (they all have same player)
player = contested_transactions[0].player
conflict_resolutions.append(ConflictResolution(
player_name=player.name,
player_swar=player.wara,
contenders=all_contenders,
winner=winner_contender,
losers=loser_contenders
))
conflict_resolutions.append(
ConflictResolution(
player_name=player.name,
player_swar=player.wara,
contenders=all_contenders,
winner=winner_contender,
losers=loser_contenders,
)
)
# Add non-contested moves to winners
winning_move_ids.update(non_contested_moves)
@ -255,7 +281,7 @@ class TransactionFreezeTask:
def __init__(self, bot: commands.Bot):
self.bot = bot
self.logger = get_contextual_logger(f'{__name__}.TransactionFreezeTask')
self.logger = get_contextual_logger(f"{__name__}.TransactionFreezeTask")
# Track last execution to prevent duplicate operations
self.last_freeze_week: int | None = None
@ -288,7 +314,9 @@ class TransactionFreezeTask:
# Skip if offseason mode is enabled
if config.offseason_flag:
self.logger.info("Skipping freeze/thaw operations - offseason mode enabled")
self.logger.info(
"Skipping freeze/thaw operations - offseason mode enabled"
)
return
# Get current league state
@ -304,7 +332,7 @@ class TransactionFreezeTask:
weekday=now.weekday(),
hour=now.hour,
current_week=current.week,
freeze_status=current.freeze
freeze_status=current.freeze,
)
# BEGIN FREEZE: Monday at 00:00, not already frozen
@ -312,13 +340,23 @@ class TransactionFreezeTask:
# Only run if we haven't already frozen this week
# Track the week we're freezing FROM (before increment)
if self.last_freeze_week != current.week:
freeze_from_week = current.week # Save BEFORE _begin_freeze modifies it
self.logger.info("Triggering freeze begin", current_week=current.week)
freeze_from_week = (
current.week
) # Save BEFORE _begin_freeze modifies it
self.logger.info(
"Triggering freeze begin", current_week=current.week
)
await self._begin_freeze(current)
self.last_freeze_week = freeze_from_week # Track the week we froze FROM
self.error_notification_sent = False # Reset error flag for new cycle
self.last_freeze_week = (
freeze_from_week # Track the week we froze FROM
)
self.error_notification_sent = (
False # Reset error flag for new cycle
)
else:
self.logger.debug("Freeze already executed for week", week=current.week)
self.logger.debug(
"Freeze already executed for week", week=current.week
)
# END FREEZE: Saturday at 00:00, currently frozen
elif now.weekday() == 5 and now.hour == 0 and current.freeze:
@ -327,9 +365,13 @@ class TransactionFreezeTask:
self.logger.info("Triggering freeze end", current_week=current.week)
await self._end_freeze(current)
self.last_thaw_week = current.week
self.error_notification_sent = False # Reset error flag for new cycle
self.error_notification_sent = (
False # Reset error flag for new cycle
)
else:
self.logger.debug("Thaw already executed for week", week=current.week)
self.logger.debug(
"Thaw already executed for week", week=current.week
)
else:
self.logger.debug("No freeze/thaw action needed at this time")
@ -375,8 +417,7 @@ class TransactionFreezeTask:
# Increment week and set freeze via service
new_week = current.week + 1
updated_current = await league_service.update_current_state(
week=new_week,
freeze=True
week=new_week, freeze=True
)
if not updated_current:
@ -449,15 +490,18 @@ class TransactionFreezeTask:
try:
# Get non-frozen, non-cancelled transactions for current week via service
transactions = await transaction_service.get_regular_transactions_by_week(
season=current.season,
week=current.week
season=current.season, week=current.week
)
if not transactions:
self.logger.info(f"No regular transactions to process for week {current.week}")
self.logger.info(
f"No regular transactions to process for week {current.week}"
)
return
self.logger.info(f"Processing {len(transactions)} regular transactions for week {current.week}")
self.logger.info(
f"Processing {len(transactions)} regular transactions for week {current.week}"
)
# Execute player roster updates for all transactions
success_count = 0
@ -470,7 +514,7 @@ class TransactionFreezeTask:
player_id=transaction.player.id,
new_team_id=transaction.newteam.id,
player_name=transaction.player.name,
dem_week=current.week + 2
dem_week=current.week + 2,
)
success_count += 1
@ -482,7 +526,7 @@ class TransactionFreezeTask:
f"Failed to execute transaction for {transaction.player.name}",
player_id=transaction.player.id,
new_team_id=transaction.newteam.id,
error=str(e)
error=str(e),
)
failure_count += 1
@ -490,7 +534,7 @@ class TransactionFreezeTask:
f"Transaction execution complete for week {current.week}",
success=success_count,
failures=failure_count,
total=len(transactions)
total=len(transactions),
)
except Exception as e:
@ -514,11 +558,13 @@ class TransactionFreezeTask:
transactions = await transaction_service.get_frozen_transactions_by_week(
season=current.season,
week_start=current.week,
week_end=current.week + 1
week_end=current.week + 1,
)
if not transactions:
self.logger.warning(f"No frozen transactions to process for week {current.week}")
self.logger.warning(
f"No frozen transactions to process for week {current.week}"
)
# Still post an empty report for visibility
empty_report = ThawReport(
week=current.week,
@ -530,23 +576,26 @@ class TransactionFreezeTask:
conflict_count=0,
conflicts=[],
thawed_moves=[],
cancelled_moves=[]
cancelled_moves=[],
)
await self._post_thaw_report(empty_report)
return
self.logger.info(f"Processing {len(transactions)} frozen transactions for week {current.week}")
self.logger.info(
f"Processing {len(transactions)} frozen transactions for week {current.week}"
)
# Resolve contested transactions
winning_move_ids, losing_move_ids, conflict_resolutions = await resolve_contested_transactions(
transactions,
current.season
winning_move_ids, losing_move_ids, conflict_resolutions = (
await resolve_contested_transactions(transactions, current.season)
)
# Build mapping from conflict player to winner for cancelled move tracking
conflict_player_to_winner: Dict[str, str] = {}
for conflict in conflict_resolutions:
conflict_player_to_winner[conflict.player_name.lower()] = conflict.winner.team_abbrev
conflict_player_to_winner[conflict.player_name.lower()] = (
conflict.winner.team_abbrev
)
# Track cancelled moves for report
cancelled_moves_report: List[CancelledMove] = []
@ -555,24 +604,34 @@ class TransactionFreezeTask:
for losing_move_id in losing_move_ids:
try:
# Get all moves with this moveid (could be multiple players in one transaction)
losing_moves = [t for t in transactions if t.moveid == losing_move_id]
losing_moves = [
t for t in transactions if t.moveid == losing_move_id
]
if losing_moves:
# Cancel the entire transaction (all moves with same moveid)
for move in losing_moves:
success = await transaction_service.cancel_transaction(move.moveid)
success = await transaction_service.cancel_transaction(
move.moveid
)
if not success:
self.logger.warning(f"Failed to cancel transaction {move.moveid}")
self.logger.warning(
f"Failed to cancel transaction {move.moveid}"
)
# Notify the GM(s) about cancellation
first_move = losing_moves[0]
# Determine which team to notify (the team that was trying to acquire)
team_for_notification = (first_move.newteam
if first_move.newteam.abbrev.upper() != 'FA'
else first_move.oldteam)
team_for_notification = (
first_move.newteam
if first_move.newteam.abbrev.upper() != "FA"
else first_move.oldteam
)
await self._notify_gm_of_cancellation(first_move, team_for_notification)
await self._notify_gm_of_cancellation(
first_move, team_for_notification
)
# Find which player caused the conflict
contested_player = ""
@ -586,16 +645,23 @@ class TransactionFreezeTask:
# Build report entry
players = [
(move.player.name, move.player.wara, move.oldteam.abbrev, move.newteam.abbrev)
(
move.player.name,
move.player.wara,
move.oldteam.abbrev,
move.newteam.abbrev,
)
for move in losing_moves
]
cancelled_moves_report.append(CancelledMove(
move_id=losing_move_id,
team_abbrev=team_for_notification.abbrev,
players=players,
lost_to=lost_to,
contested_player=contested_player
))
cancelled_moves_report.append(
CancelledMove(
move_id=losing_move_id,
team_abbrev=team_for_notification.abbrev,
players=players,
lost_to=lost_to,
contested_player=contested_player,
)
)
contested_players = [move.player.name for move in losing_moves]
self.logger.info(
@ -604,7 +670,9 @@ class TransactionFreezeTask:
)
except Exception as e:
self.logger.error(f"Error cancelling transaction {losing_move_id}: {e}")
self.logger.error(
f"Error cancelling transaction {losing_move_id}: {e}"
)
# Track thawed moves for report
thawed_moves_report: List[ThawedMove] = []
@ -613,13 +681,19 @@ class TransactionFreezeTask:
for winning_move_id in winning_move_ids:
try:
# Get all moves with this moveid
winning_moves = [t for t in transactions if t.moveid == winning_move_id]
winning_moves = [
t for t in transactions if t.moveid == winning_move_id
]
for move in winning_moves:
# Unfreeze the transaction via service
success = await transaction_service.unfreeze_transaction(move.moveid)
success = await transaction_service.unfreeze_transaction(
move.moveid
)
if not success:
self.logger.warning(f"Failed to unfreeze transaction {move.moveid}")
self.logger.warning(
f"Failed to unfreeze transaction {move.moveid}"
)
# Post to transaction log
await self._post_transaction_to_log(winning_move_id, transactions)
@ -629,32 +703,43 @@ class TransactionFreezeTask:
first_move = winning_moves[0]
# Extract timestamp from moveid (format: Season-XXX-Week-XX-DD-HH:MM:SS)
try:
parts = winning_move_id.split('-')
parts = winning_move_id.split("-")
submitted_at = parts[-1] if len(parts) >= 6 else "Unknown"
except Exception:
submitted_at = "Unknown"
# Determine team abbrev
if first_move.newteam.abbrev.upper() != 'FA':
if first_move.newteam.abbrev.upper() != "FA":
team_abbrev = first_move.newteam.abbrev
else:
team_abbrev = first_move.oldteam.abbrev
players = [
(move.player.name, move.player.wara, move.oldteam.abbrev, move.newteam.abbrev)
(
move.player.name,
move.player.wara,
move.oldteam.abbrev,
move.newteam.abbrev,
)
for move in winning_moves
]
thawed_moves_report.append(ThawedMove(
move_id=winning_move_id,
team_abbrev=team_abbrev,
players=players,
submitted_at=submitted_at
))
thawed_moves_report.append(
ThawedMove(
move_id=winning_move_id,
team_abbrev=team_abbrev,
players=players,
submitted_at=submitted_at,
)
)
self.logger.info(f"Processed successful transaction {winning_move_id}")
self.logger.info(
f"Processed successful transaction {winning_move_id}"
)
except Exception as e:
self.logger.error(f"Error processing winning transaction {winning_move_id}: {e}")
self.logger.error(
f"Error processing winning transaction {winning_move_id}: {e}"
)
# Generate and post thaw report
thaw_report = ThawReport(
@ -667,7 +752,7 @@ class TransactionFreezeTask:
conflict_count=len(conflict_resolutions),
conflicts=conflict_resolutions,
thawed_moves=thawed_moves_report,
cancelled_moves=cancelled_moves_report
cancelled_moves=cancelled_moves_report,
)
await self._post_thaw_report(thaw_report)
@ -685,7 +770,7 @@ class TransactionFreezeTask:
player_id: int,
new_team_id: int,
player_name: str,
dem_week: Optional[int] = None
dem_week: Optional[int] = None,
) -> bool:
"""
Execute a player roster update via API PATCH.
@ -708,13 +793,11 @@ class TransactionFreezeTask:
player_id=player_id,
player_name=player_name,
new_team_id=new_team_id,
dem_week=dem_week
dem_week=dem_week,
)
updated_player = await player_service.update_player_team(
player_id,
new_team_id,
dem_week=dem_week
player_id, new_team_id, dem_week=dem_week
)
# Verify response (200 or 204 indicates success)
@ -724,7 +807,7 @@ class TransactionFreezeTask:
player_id=player_id,
player_name=player_name,
new_team_id=new_team_id,
dem_week=dem_week
dem_week=dem_week,
)
return True
else:
@ -733,7 +816,7 @@ class TransactionFreezeTask:
player_id=player_id,
player_name=player_name,
new_team_id=new_team_id,
dem_week=dem_week
dem_week=dem_week,
)
return False
@ -745,7 +828,7 @@ class TransactionFreezeTask:
new_team_id=new_team_id,
dem_week=dem_week,
error=str(e),
exc_info=True
exc_info=True,
)
raise
@ -764,34 +847,36 @@ class TransactionFreezeTask:
self.logger.warning("Could not find guild for freeze announcement")
return
channel = discord.utils.get(guild.text_channels, name='transaction-log')
channel = discord.utils.get(guild.text_channels, name="transaction-log")
if not channel:
self.logger.warning("Could not find transaction-log channel")
return
# Create announcement message (formatted like legacy bot)
week_num = f'Week {week}'
stars = '*' * 32
week_num = f"Week {week}"
stars = "*" * 32
if is_beginning:
message = (
f'```\n'
f'{stars}\n'
f'{week_num:>9} Freeze Period Begins\n'
f'{stars}\n'
f'```'
f"```\n"
f"{stars}\n"
f"{week_num:>9} Freeze Period Begins\n"
f"{stars}\n"
f"```"
)
else:
message = (
f'```\n'
f"```\n"
f'{"*" * 30}\n'
f'{week_num:>9} Freeze Period Ends\n'
f"{week_num:>9} Freeze Period Ends\n"
f'{"*" * 30}\n'
f'```'
f"```"
)
await channel.send(message)
self.logger.info(f"Freeze announcement sent for week {week} ({'begin' if is_beginning else 'end'})")
self.logger.info(
f"Freeze announcement sent for week {week} ({'begin' if is_beginning else 'end'})"
)
except Exception as e:
self.logger.error(f"Error sending freeze announcement: {e}")
@ -809,7 +894,7 @@ class TransactionFreezeTask:
if not guild:
return
info_channel = discord.utils.get(guild.text_channels, name='weekly-info')
info_channel = discord.utils.get(guild.text_channels, name="weekly-info")
if not info_channel:
self.logger.warning("Could not find weekly-info channel")
return
@ -818,7 +903,7 @@ class TransactionFreezeTask:
async for message in info_channel.history(limit=25):
try:
await message.delete()
except:
except Exception:
pass # Ignore deletion errors
# Determine season emoji
@ -835,17 +920,17 @@ class TransactionFreezeTask:
is_div_week = current.week in [1, 3, 6, 14, 16, 18]
weekly_str = (
f'**Season**: {season_str}\n'
f'**Time of Day**: {night_str} / {night_str if is_div_week else day_str} / '
f'{night_str} / {day_str}'
f"**Season**: {season_str}\n"
f"**Time of Day**: {night_str} / {night_str if is_div_week else day_str} / "
f"{night_str} / {day_str}"
)
# Send info messages
await info_channel.send(
content=(
f'Each team has manage permissions in their home ballpark. '
f'They may pin messages and rename the channel.\n\n'
f'**Make sure your ballpark starts with your team abbreviation.**'
f"Each team has manage permissions in their home ballpark. "
f"They may pin messages and rename the channel.\n\n"
f"**Make sure your ballpark starts with your team abbreviation.**"
)
)
await info_channel.send(weekly_str)
@ -856,9 +941,7 @@ class TransactionFreezeTask:
self.logger.error(f"Error posting weekly info: {e}")
async def _post_transaction_to_log(
self,
move_id: str,
all_transactions: List[Transaction]
self, move_id: str, all_transactions: List[Transaction]
):
"""
Post a transaction to the transaction log channel.
@ -873,7 +956,7 @@ class TransactionFreezeTask:
if not guild:
return
channel = discord.utils.get(guild.text_channels, name='transaction-log')
channel = discord.utils.get(guild.text_channels, name="transaction-log")
if not channel:
return
@ -884,9 +967,15 @@ class TransactionFreezeTask:
# Determine the team for the embed (team making the moves)
first_move = moves[0]
if first_move.newteam.abbrev.upper() != 'FA' and 'IL' not in first_move.newteam.abbrev:
if (
first_move.newteam.abbrev.upper() != "FA"
and "IL" not in first_move.newteam.abbrev
):
this_team = first_move.newteam
elif first_move.oldteam.abbrev.upper() != 'FA' and 'IL' not in first_move.oldteam.abbrev:
elif (
first_move.oldteam.abbrev.upper() != "FA"
and "IL" not in first_move.oldteam.abbrev
):
this_team = first_move.oldteam
else:
# Default to newteam if both are FA/IL
@ -898,25 +987,29 @@ class TransactionFreezeTask:
for move in moves:
move_string += (
f'**{move.player.name}** ({move.player.wara:.2f}) '
f'from {move.oldteam.abbrev} to {move.newteam.abbrev}\n'
f"**{move.player.name}** ({move.player.wara:.2f}) "
f"from {move.oldteam.abbrev} to {move.newteam.abbrev}\n"
)
# Create embed
embed = EmbedTemplate.create_base_embed(
title=f'Week {week_num} Transaction',
description=this_team.sname if hasattr(this_team, 'sname') else this_team.lname,
color=EmbedColors.INFO
title=f"Week {week_num} Transaction",
description=(
this_team.sname if hasattr(this_team, "sname") else this_team.lname
),
color=EmbedColors.INFO,
)
# Set team color if available
if hasattr(this_team, 'color') and this_team.color:
if hasattr(this_team, "color") and this_team.color:
try:
embed.color = discord.Color(int(this_team.color.replace('#', ''), 16))
except:
embed.color = discord.Color(
int(this_team.color.replace("#", ""), 16)
)
except Exception:
pass # Use default color on error
embed.add_field(name='Player Moves', value=move_string, inline=False)
embed.add_field(name="Player Moves", value=move_string, inline=False)
await channel.send(embed=embed)
self.logger.info(f"Transaction posted to log: {move_id}")
@ -924,11 +1017,7 @@ class TransactionFreezeTask:
except Exception as e:
self.logger.error(f"Error posting transaction to log: {e}")
async def _notify_gm_of_cancellation(
self,
transaction: Transaction,
team
):
async def _notify_gm_of_cancellation(self, transaction: Transaction, team):
"""
Send DM to GM(s) about cancelled transaction.
@ -943,27 +1032,31 @@ class TransactionFreezeTask:
return
cancel_text = (
f'Your transaction for **{transaction.player.name}** has been cancelled '
f'because another team successfully claimed them during the freeze period.'
f"Your transaction for **{transaction.player.name}** has been cancelled "
f"because another team successfully claimed them during the freeze period."
)
# Notify GM1
if hasattr(team, 'gmid') and team.gmid:
if hasattr(team, "gmid") and team.gmid:
try:
gm_one = guild.get_member(team.gmid)
if gm_one:
await gm_one.send(cancel_text)
self.logger.info(f"Cancellation notification sent to GM1 of {team.abbrev}")
self.logger.info(
f"Cancellation notification sent to GM1 of {team.abbrev}"
)
except Exception as e:
self.logger.error(f"Could not notify GM1 of {team.abbrev}: {e}")
# Notify GM2 if exists
if hasattr(team, 'gmid2') and team.gmid2:
if hasattr(team, "gmid2") and team.gmid2:
try:
gm_two = guild.get_member(team.gmid2)
if gm_two:
await gm_two.send(cancel_text)
self.logger.info(f"Cancellation notification sent to GM2 of {team.abbrev}")
self.logger.info(
f"Cancellation notification sent to GM2 of {team.abbrev}"
)
except Exception as e:
self.logger.error(f"Could not notify GM2 of {team.abbrev}: {e}")
@ -986,30 +1079,43 @@ class TransactionFreezeTask:
admin_channel = self.bot.get_channel(config.thaw_report_channel_id)
if not admin_channel:
self.logger.warning("Could not find thaw report channel", channel_id=config.thaw_report_channel_id)
self.logger.warning(
"Could not find thaw report channel",
channel_id=config.thaw_report_channel_id,
)
return
# Build the report content
report_lines = []
# Header with summary
timestamp_str = report.timestamp.strftime('%B %d, %Y %H:%M UTC')
timestamp_str = report.timestamp.strftime("%B %d, %Y %H:%M UTC")
report_lines.append(f"# Transaction Thaw Report")
report_lines.append(f"**Week {report.week}** | **Season {report.season}** | {timestamp_str}")
report_lines.append(f"**Total:** {report.total_moves} moves | **Thawed:** {report.thawed_count} | **Cancelled:** {report.cancelled_count} | **Conflicts:** {report.conflict_count}")
report_lines.append(
f"**Week {report.week}** | **Season {report.season}** | {timestamp_str}"
)
report_lines.append(
f"**Total:** {report.total_moves} moves | **Thawed:** {report.thawed_count} | **Cancelled:** {report.cancelled_count} | **Conflicts:** {report.conflict_count}"
)
report_lines.append("")
# Conflict Resolution section (if any)
if report.conflicts:
report_lines.append("## Conflict Resolution")
for conflict in report.conflicts:
report_lines.append(f"**{conflict.player_name}** (sWAR: {conflict.player_swar:.1f})")
contenders_str = " vs ".join([
f"{c.team_abbrev} ({c.wins}-{c.losses})"
for c in conflict.contenders
])
report_lines.append(
f"**{conflict.player_name}** (sWAR: {conflict.player_swar:.1f})"
)
contenders_str = " vs ".join(
[
f"{c.team_abbrev} ({c.wins}-{c.losses})"
for c in conflict.contenders
]
)
report_lines.append(f"- Contested by: {contenders_str}")
report_lines.append(f"- **Awarded to: {conflict.winner.team_abbrev}** (worst record wins)")
report_lines.append(
f"- **Awarded to: {conflict.winner.team_abbrev}** (worst record wins)"
)
report_lines.append("")
# Thawed Moves section
@ -1018,7 +1124,9 @@ class TransactionFreezeTask:
for move in report.thawed_moves:
report_lines.append(f"**{move.move_id}** | {move.team_abbrev}")
for player_name, swar, old_team, new_team in move.players:
report_lines.append(f" - {player_name} ({swar:.1f}): {old_team}{new_team}")
report_lines.append(
f" - {player_name} ({swar:.1f}): {old_team}{new_team}"
)
else:
report_lines.append("*No moves thawed*")
report_lines.append("")
@ -1027,10 +1135,18 @@ class TransactionFreezeTask:
report_lines.append("## Cancelled Moves")
if report.cancelled_moves:
for move in report.cancelled_moves:
lost_info = f" (lost {move.contested_player} to {move.lost_to})" if move.lost_to else ""
report_lines.append(f"**{move.move_id}** | {move.team_abbrev}{lost_info}")
lost_info = (
f" (lost {move.contested_player} to {move.lost_to})"
if move.lost_to
else ""
)
report_lines.append(
f"**{move.move_id}** | {move.team_abbrev}{lost_info}"
)
for player_name, swar, old_team, new_team in move.players:
report_lines.append(f" - ❌ {player_name} ({swar:.1f}): {old_team}{new_team}")
report_lines.append(
f" - ❌ {player_name} ({swar:.1f}): {old_team}{new_team}"
)
else:
report_lines.append("*No moves cancelled*")

View File

@ -380,12 +380,14 @@ class SubmitConfirmationModal(discord.ui.Modal):
if "Transaction Builder" in message.embeds[0].title: # type: ignore
await message.edit(embed=completion_embed, view=view)
break
except:
except Exception:
pass
except Exception as e:
self.logger.error(f"Error submitting transaction: {e}", exc_info=True)
await interaction.followup.send(
f"❌ Error submitting transaction: {str(e)}", ephemeral=True
"❌ Error submitting transaction. Please try again or contact an admin.",
ephemeral=True,
)