# Tasks Directory The tasks directory contains automated background tasks for Discord Bot v2.0. These tasks handle periodic maintenance, data cleanup, and scheduled operations that run independently of user interactions. ## Architecture ### Task System Design Tasks in Discord Bot v2.0 follow these patterns: - **Discord.py tasks** using the `@tasks.loop` decorator - **Structured logging** with contextual information - **Error handling** with graceful degradation - **Guild-specific operations** respecting bot permissions - **Configurable intervals** via task decorators ### Base Task Pattern All tasks follow a consistent structure: ```python from discord.ext import tasks from utils.logging import get_contextual_logger class ExampleTask: def __init__(self, bot: commands.Bot): self.bot = bot self.logger = get_contextual_logger(f'{__name__}.ExampleTask') self.task_loop.start() def cog_unload(self): """Stop the task when cog is unloaded.""" self.task_loop.cancel() @tasks.loop(hours=24) # Run daily async def task_loop(self): """Main task implementation.""" try: # Task logic here pass except Exception as e: self.logger.error("Task failed", error=e) @task_loop.before_loop async def before_task(self): """Wait for bot to be ready before starting.""" await self.bot.wait_until_ready() ``` ## Current Tasks ### Custom Command Cleanup (`custom_command_cleanup.py`) **Purpose:** Automated cleanup system for user-created custom commands **Schedule:** Daily (24 hours) **Operations:** - **Warning Phase:** Notifies users about commands at risk (unused for 60+ days) - **Deletion Phase:** Removes commands unused for 90+ days - **Admin Reporting:** Sends cleanup summaries to admin channels #### Key Features - **User Notifications:** Direct messages to command creators - **Grace Period:** 30-day warning before deletion - **Admin Transparency:** Optional summary reports - **Bulk Operations:** Efficient batch processing - **Error Resilience:** Continues operation despite individual failures #### Configuration The cleanup task respects guild settings and permissions: ```python # Configuration via get_config() guild_id = config.guild_id # Target guild admin_channels = ['admin', 'bot-logs'] # Admin notification channels ``` #### Notification System **Warning Embed (30 days before deletion):** - Lists commands at risk - Shows days since last use - Provides usage instructions - Links to command management **Deletion Embed (after deletion):** - Lists deleted commands - Shows final usage statistics - Provides recreation instructions - Explains cleanup policy #### Admin Summary Optional admin channel reporting includes: - Number of warnings sent - Number of commands deleted - Current system statistics - Next cleanup schedule ## Task Lifecycle ### Initialization Tasks are initialized when the bot starts: ```python # In bot startup def setup_cleanup_task(bot: commands.Bot) -> CustomCommandCleanupTask: return CustomCommandCleanupTask(bot) # Usage cleanup_task = setup_cleanup_task(bot) ``` ### Execution Flow 1. **Bot Ready Check:** Wait for `bot.wait_until_ready()` 2. **Guild Validation:** Verify bot has access to configured guild 3. **Permission Checks:** Ensure bot can send messages/DMs 4. **Main Operation:** Execute task logic with error handling 5. **Logging:** Record operation results and performance metrics 6. **Cleanup:** Reset state for next iteration ### Error Handling Tasks implement comprehensive error handling: ```python async def task_operation(self): try: # Main task logic result = await self.perform_operation() self.logger.info("Task completed", result=result) except SpecificException as e: self.logger.warning("Recoverable error", error=e) # Continue with degraded functionality except Exception as e: self.logger.error("Task failed", error=e) # Task will retry on next interval ``` ## Development Patterns ### Creating New Tasks 1. **Inherit from Base Pattern** ```python class NewTask: def __init__(self, bot: commands.Bot): self.bot = bot self.logger = get_contextual_logger(f'{__name__}.NewTask') self.main_loop.start() ``` 2. **Configure Task Schedule** ```python @tasks.loop(minutes=30) # Every 30 minutes # or @tasks.loop(hours=6) # Every 6 hours # or @tasks.loop(time=datetime.time(hour=3)) # Daily at 3 AM UTC ``` 3. **Implement Before Loop** ```python @main_loop.before_loop async def before_loop(self): await self.bot.wait_until_ready() self.logger.info("Task initialized and ready") ``` 4. **Add Cleanup Handling** ```python def cog_unload(self): self.main_loop.cancel() self.logger.info("Task stopped") ``` ### Task Categories #### Maintenance Tasks - **Data cleanup** (expired records, unused resources) - **Cache management** (clear stale entries, optimize storage) - **Log rotation** (archive old logs, manage disk space) #### User Management - **Inactive user cleanup** (remove old user data) - **Permission auditing** (validate role assignments) - **Usage analytics** (collect usage statistics) #### System Monitoring - **Health checks** (verify system components) - **Performance monitoring** (track response times) - **Error rate tracking** (monitor failure rates) ### Task Configuration #### Environment Variables Tasks respect standard bot configuration: ```python GUILD_ID=12345... # Target Discord guild LOG_LEVEL=INFO # Logging verbosity REDIS_URL=redis://... # Optional caching backend ``` #### Runtime Configuration Tasks use the central config system: ```python from config import get_config config = get_config() guild = self.bot.get_guild(config.guild_id) ``` ## Logging and Monitoring ### Structured Logging Tasks use contextual logging for observability: ```python self.logger.info( "Cleanup task starting", guild_id=guild.id, commands_at_risk=len(at_risk_commands) ) self.logger.warning( "User DM failed", user_id=user.id, reason="DMs disabled" ) self.logger.error( "Task operation failed", operation="delete_commands", error=str(e) ) ``` ### Performance Tracking Tasks log timing and performance metrics: ```python start_time = datetime.utcnow() # ... task operations ... duration = (datetime.utcnow() - start_time).total_seconds() self.logger.info( "Task completed", duration_seconds=duration, operations_completed=operation_count ) ``` ### Error Recovery Tasks implement retry logic and graceful degradation: ```python async def process_with_retry(self, operation, max_retries=3): for attempt in range(max_retries): try: return await operation() except RecoverableError as e: if attempt == max_retries - 1: raise await asyncio.sleep(2 ** attempt) # Exponential backoff ``` ## Testing Strategies ### Unit Testing Tasks ```python @pytest.mark.asyncio async def test_custom_command_cleanup(): # Mock bot and services bot = AsyncMock() task = CustomCommandCleanupTask(bot) # Mock service responses with patch('services.custom_commands_service') as mock_service: mock_service.get_commands_needing_warning.return_value = [] # Test task execution await task.cleanup_task() # Verify service calls mock_service.get_commands_needing_warning.assert_called_once() ``` ### Integration Testing ```python @pytest.mark.integration async def test_cleanup_task_with_real_data(): # Test with actual Discord bot instance # Use test guild and test data # Verify real Discord API interactions ``` ### Performance Testing ```python @pytest.mark.performance async def test_cleanup_task_performance(): # Test with large datasets # Measure execution time # Verify memory usage ``` ## Security Considerations ### Permission Validation Tasks verify bot permissions before operations: ```python async def check_permissions(self, guild: discord.Guild) -> bool: """Verify bot has required permissions.""" bot_member = guild.me # Check for required permissions if not bot_member.guild_permissions.send_messages: self.logger.warning("Missing send_messages permission") return False return True ``` ### Data Privacy Tasks handle user data responsibly: - **Minimal data access** - Only access required data - **Secure logging** - Avoid logging sensitive information - **GDPR compliance** - Respect user data rights - **Permission respect** - Honor user privacy settings ### Rate Limiting Tasks implement Discord API rate limiting: ```python async def send_notifications_with_rate_limiting(self, notifications): """Send notifications with rate limiting.""" for notification in notifications: try: await self.send_notification(notification) await asyncio.sleep(1) # Avoid rate limits except discord.HTTPException as e: if e.status == 429: # Rate limited retry_after = e.response.headers.get('Retry-After', 60) await asyncio.sleep(int(retry_after)) ``` ## Future Task Ideas ### Potential Additions - **Database maintenance** - Optimize database performance - **Backup automation** - Create data backups - **Usage analytics** - Generate usage reports - **Health monitoring** - System health checks - **Cache warming** - Pre-populate frequently accessed data ### Scalability Patterns - **Task queues** - Distribute work across multiple workers - **Sharding support** - Handle multiple Discord guilds - **Load balancing** - Distribute task execution - **Monitoring integration** - External monitoring systems --- **Next Steps for AI Agents:** 1. Review the existing cleanup task implementation 2. Understand the Discord.py tasks framework 3. Follow the structured logging patterns 4. Implement proper error handling and recovery 5. Consider guild permissions and user privacy 6. Test tasks thoroughly before deployment