""" Tests for concurrent message handling with per-channel locking. Tests verify: 1. Messages in the same channel are processed sequentially 2. Messages in different channels run in parallel 3. Locks are properly released on timeout/error 4. Locks are reused for the same channel 5. Queue behavior with multiple messages """ import asyncio from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock from datetime import datetime import pytest from claude_coordinator.bot import ClaudeCoordinator from claude_coordinator.config import ProjectConfig from claude_coordinator.claude_runner import ClaudeResponse @pytest.fixture def mock_bot_user(): """Create a mock bot user.""" bot_user = MagicMock() bot_user.id = 987654321 bot_user.bot = True bot_user.name = "ClaudeCoordinator" return bot_user @pytest.fixture def mock_discord_user(): """Create a mock Discord user.""" user = MagicMock() user.id = 123456789 user.bot = False user.name = "TestUser" return user def create_mock_message(channel_id: int, content: str, bot_user, discord_user): """Helper to create a mock Discord message.""" message = MagicMock() message.author = discord_user message.content = f"<@{bot_user.id}> {content}" message.mentions = [bot_user] message.channel = MagicMock() message.channel.id = channel_id message.channel.send = AsyncMock() message.channel.typing = MagicMock() message.channel.typing.return_value.__aenter__ = AsyncMock() message.channel.typing.return_value.__aexit__ = AsyncMock() return message @pytest.fixture def mock_project_config(): """Create a mock ProjectConfig.""" return ProjectConfig( name="test-project", channel_id="111222333444", project_dir="/tmp/test-project", allowed_tools=["Bash", "Read", "Write"], system_prompt="You are a test assistant.", model="sonnet" ) class TestPerChannelLocking: """Tests for per-channel locking mechanism.""" @pytest.mark.asyncio async def test_lock_creation_per_channel(self): """Test that each channel gets its own lock.""" bot = ClaudeCoordinator() lock1 = bot._get_channel_lock("channel_1") lock2 = bot._get_channel_lock("channel_2") lock1_again = bot._get_channel_lock("channel_1") # Different channels should have different locks assert lock1 is not lock2 # Same channel should reuse the same lock assert lock1 is lock1_again @pytest.mark.asyncio async def test_concurrent_messages_same_channel_serialize( self, mock_bot_user, mock_discord_user, mock_project_config ): """Two messages in the same channel should process sequentially.""" bot = ClaudeCoordinator() # Track call order and timing call_order = [] start_times = {} end_times = {} async def mock_run(*args, message=None, **kwargs): """Mock Claude runner that takes time to process.""" call_id = len(call_order) start_times[call_id] = asyncio.get_event_loop().time() call_order.append(message) await asyncio.sleep(0.1) # Simulate Claude processing end_times[call_id] = asyncio.get_event_loop().time() return ClaudeResponse( success=True, result=f"Response to: {message}", session_id="sess_123", cost=0.001, duration_ms=100, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create two messages for the same channel msg1 = create_mock_message(111222333444, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(111222333444, "Message 2", mock_bot_user, mock_discord_user) # Start both tasks concurrently task1 = asyncio.create_task(bot._handle_claude_request(msg1, mock_project_config)) task2 = asyncio.create_task(bot._handle_claude_request(msg2, mock_project_config)) await asyncio.gather(task1, task2) # Both messages should have been processed assert len(call_order) == 2 # They should have run sequentially (second starts after first ends) # First message should complete before second message starts assert end_times[0] <= start_times[1] + 0.01 # Small tolerance for timing @pytest.mark.asyncio async def test_concurrent_messages_different_channels_parallel( self, mock_bot_user, mock_discord_user, mock_project_config ): """Messages in different channels should process in parallel.""" bot = ClaudeCoordinator() # Track concurrent execution active_count = 0 max_concurrent = 0 lock = asyncio.Lock() async def mock_run(*args, message=None, **kwargs): """Mock Claude runner that tracks concurrency.""" nonlocal active_count, max_concurrent async with lock: active_count += 1 max_concurrent = max(max_concurrent, active_count) await asyncio.sleep(0.1) # Simulate Claude processing async with lock: active_count -= 1 return ClaudeResponse( success=True, result=f"Response to: {message}", session_id="sess_123", cost=0.001, duration_ms=100, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create messages for different channels msg1 = create_mock_message(111111111111, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(222222222222, "Message 2", mock_bot_user, mock_discord_user) # Start both tasks concurrently task1 = asyncio.create_task(bot._handle_claude_request(msg1, mock_project_config)) task2 = asyncio.create_task(bot._handle_claude_request(msg2, mock_project_config)) await asyncio.gather(task1, task2) # Both messages should have run concurrently (max_concurrent should be 2) assert max_concurrent == 2 @pytest.mark.asyncio async def test_lock_released_on_timeout( self, mock_bot_user, mock_discord_user, mock_project_config ): """Lock should be released if first message times out.""" bot = ClaudeCoordinator() call_count = 0 async def mock_run(*args, **kwargs): """Mock Claude runner that times out on first call.""" nonlocal call_count call_count += 1 if call_count == 1: raise asyncio.TimeoutError("Claude timeout") else: return ClaudeResponse( success=True, result="Success", session_id="sess_123", cost=0.001, duration_ms=100, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create two messages for the same channel msg1 = create_mock_message(111222333444, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(111222333444, "Message 2", mock_bot_user, mock_discord_user) # Process messages sequentially (first will timeout, second should proceed) await bot._handle_claude_request(msg1, mock_project_config) await bot._handle_claude_request(msg2, mock_project_config) # Second message should have been processed successfully assert call_count == 2 assert msg2.channel.send.call_count == 1 # Success message sent @pytest.mark.asyncio async def test_lock_released_on_error( self, mock_bot_user, mock_discord_user, mock_project_config ): """Lock should be released if first message errors.""" bot = ClaudeCoordinator() call_count = 0 async def mock_run(*args, **kwargs): """Mock Claude runner that errors on first call.""" nonlocal call_count call_count += 1 if call_count == 1: raise Exception("Unexpected error") else: return ClaudeResponse( success=True, result="Success", session_id="sess_123", cost=0.001, duration_ms=100, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create two messages for the same channel msg1 = create_mock_message(111222333444, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(111222333444, "Message 2", mock_bot_user, mock_discord_user) # Process messages sequentially (first will error, second should proceed) await bot._handle_claude_request(msg1, mock_project_config) await bot._handle_claude_request(msg2, mock_project_config) # Second message should have been processed successfully assert call_count == 2 @pytest.mark.asyncio async def test_three_messages_same_channel_serialize( self, mock_bot_user, mock_discord_user, mock_project_config ): """Three messages in the same channel should all process in order.""" bot = ClaudeCoordinator() call_order = [] async def mock_run(*args, message=None, **kwargs): """Mock Claude runner that tracks call order.""" call_order.append(message) await asyncio.sleep(0.05) # Simulate processing return ClaudeResponse( success=True, result=f"Response to: {message}", session_id="sess_123", cost=0.001, duration_ms=50, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create three messages for the same channel msg1 = create_mock_message(111222333444, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(111222333444, "Message 2", mock_bot_user, mock_discord_user) msg3 = create_mock_message(111222333444, "Message 3", mock_bot_user, mock_discord_user) # Start all three tasks concurrently task1 = asyncio.create_task(bot._handle_claude_request(msg1, mock_project_config)) task2 = asyncio.create_task(bot._handle_claude_request(msg2, mock_project_config)) task3 = asyncio.create_task(bot._handle_claude_request(msg3, mock_project_config)) await asyncio.gather(task1, task2, task3) # All three messages should have been processed assert len(call_order) == 3 # They should have been processed in the order they were submitted assert call_order == ["Message 1", "Message 2", "Message 3"] @pytest.mark.asyncio async def test_lock_check_when_busy( self, mock_bot_user, mock_discord_user, mock_project_config ): """Test that lock status is checked when channel is busy.""" bot = ClaudeCoordinator() async def mock_run(*args, **kwargs): """Mock Claude runner.""" await asyncio.sleep(0.1) return ClaudeResponse( success=True, result="Success", session_id="sess_123", cost=0.001, duration_ms=100, permission_denials=[] ) # Mock dependencies with patch.object(type(bot), 'user', new_callable=PropertyMock) as mock_user: mock_user.return_value = mock_bot_user bot.config.get_project_by_channel = MagicMock(return_value=mock_project_config) bot.session_manager.get_session = AsyncMock(return_value=None) bot.session_manager.save_session = AsyncMock() bot.session_manager.update_activity = AsyncMock() bot.claude_runner.run = mock_run bot.response_formatter.format_response = MagicMock(return_value=["Formatted response"]) # Create two messages for the same channel msg1 = create_mock_message(111222333444, "Message 1", mock_bot_user, mock_discord_user) msg2 = create_mock_message(111222333444, "Message 2", mock_bot_user, mock_discord_user) # Start first task task1 = asyncio.create_task(bot._handle_claude_request(msg1, mock_project_config)) # Wait a bit to ensure first task has acquired the lock await asyncio.sleep(0.01) # Check that lock is busy channel_id = str(msg1.channel.id) lock = bot._get_channel_lock(channel_id) assert lock.locked() # Start second task task2 = asyncio.create_task(bot._handle_claude_request(msg2, mock_project_config)) # Wait for both to complete await asyncio.gather(task1, task2) # Lock should be released after both complete assert not lock.locked()