Implemented foundational modules for Claude Discord Coordinator: - Project skeleton with uv (CRIT-003) - Claude CLI subprocess runner with 11/11 tests passing (CRIT-004) - SQLite session manager with 27/27 tests passing (CRIT-005) - Comprehensive test suites for both modules - Production-ready async/await patterns - Full type hints and documentation Technical highlights: - Validated CLI pattern: claude -p --resume --output-format json - bypassPermissions requires non-root user (discord-bot) - WAL mode SQLite for concurrency - asyncio.Lock for thread safety - Context manager support Progress: 5/18 tasks complete (28%) Week 1: 5/6 complete Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
629 lines
20 KiB
Python
629 lines
20 KiB
Python
"""
|
|
Test suite for SessionManager.
|
|
|
|
Tests database creation, CRUD operations, concurrent access, and edge cases.
|
|
|
|
Author: Claude Code
|
|
Created: 2026-02-13
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Add parent directory to path for imports
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from claude_coordinator.session_manager import SessionManager
|
|
|
|
|
|
# Emit DEBUG-level, timestamped log records while the tests run so that
# failures can be traced back through the library's own log output.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
|
|
|
|
|
|
@pytest.fixture
def temp_db():
    """
    Create a temporary database file for testing.

    Yields:
        The filesystem path (str) of an empty ``.db`` file. After the test
        finishes, the file and any ``-wal`` / ``-shm`` sidecar files next to
        it are removed on a best-effort basis.
    """
    # delete=False keeps the file on disk after the handle is closed so the
    # code under test can open it by path.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as f:
        db_path = f.name

    yield db_path

    # Cleanup: each file is removed independently so that a failure on one
    # (for example the main database) does not leave the others behind.
    for suffix in ('', '-wal', '-shm'):
        try:
            os.unlink(db_path + suffix)
        except FileNotFoundError:
            # Nothing to remove; sidecar files only exist if they were created.
            pass
        except OSError as e:
            # Best effort: report, but never fail a test because of cleanup.
            print(f"Warning: Could not clean up test database: {e}")
|
|
|
|
|
|
@pytest.fixture
async def manager(temp_db):
    """
    Provide an initialized SessionManager bound to the temporary database.

    The database is initialized before the test body runs and the manager
    is closed once the test has finished.
    """
    session_manager = SessionManager(db_path=temp_db)
    await session_manager._initialize_db()

    yield session_manager

    await session_manager.close()
|
|
|
|
|
|
# Configure pytest-asyncio: ask pytest to load the plugin for this module.
# The async tests below are marked with @pytest.mark.asyncio.
pytest_plugins = ('pytest_asyncio',)
|
|
|
|
|
|
class TestSessionManagerInit:
    """Test database initialization and schema creation."""

    @pytest.mark.asyncio
    async def test_database_creation(self, temp_db):
        """
        Test that database file is created and schema is initialized.

        What: Open a SessionManager on a temporary path and look up the
              'sessions' table in sqlite_master
        Why: Ensure proper initialization on first use
        """
        async with SessionManager(db_path=temp_db) as manager:
            assert manager._db is not None
            assert Path(temp_db).exists()

            # Verify sessions table exists
            async with manager._db.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'"
            ) as cursor:
                result = await cursor.fetchone()
                assert result is not None
                assert result[0] == 'sessions'

    @pytest.mark.asyncio
    async def test_indexes_created(self, temp_db):
        """
        Test that database indexes are created for performance.

        What: Check for idx_last_active and idx_project_name indexes
        Why: Indexes are critical for query performance
        """
        async with SessionManager(db_path=temp_db) as manager:
            async with manager._db.execute(
                "SELECT name FROM sqlite_master WHERE type='index'"
            ) as cursor:
                indexes = [row[0] for row in await cursor.fetchall()]

            assert 'idx_last_active' in indexes
            assert 'idx_project_name' in indexes

    @pytest.mark.asyncio
    async def test_default_db_path(self):
        """
        Test that the default database path points into the user home directory.

        What: Verify db_path defaults to ~/.claude-coordinator/sessions.db
        Why: Users should get sensible defaults without configuration

        This test touches the real home directory, so it only removes files
        that did not exist before the SessionManager was constructed. A
        database that was already there is never deleted.
        """
        expected_dir = Path.home() / ".claude-coordinator"
        expected_path = expected_dir / "sessions.db"

        # The database itself plus the sidecar files this test may clean up.
        candidates = [expected_path] + [
            Path(str(expected_path) + ext) for ext in ['-wal', '-shm']
        ]
        # Snapshot what is already on disk *before* constructing the manager.
        preexisting = {path for path in candidates if path.exists()}

        manager = SessionManager()
        try:
            assert manager.db_path == str(expected_path)
        finally:
            # Cleanup only what appeared during this test, even if the
            # assertion above failed.
            for path in candidates:
                if path not in preexisting and path.exists():
                    path.unlink()
|
|
|
|
|
|
class TestSessionCRUD:
    """Test Create, Read, Update, Delete operations for sessions."""

    @pytest.mark.asyncio
    async def test_save_new_session(self, manager):
        """
        Test creating a new session.

        What: Store a session with a project name and read it back
        Why: Session creation is core functionality
        """
        expected_fields = {
            'channel_id': "123456",
            'session_id': "sess_abc123",
            'project_name': "major-domo",
            'message_count': 0,  # a freshly saved session has no messages yet
        }

        await manager.save_session(
            channel_id="123456",
            session_id="sess_abc123",
            project_name="major-domo",
        )

        stored = await manager.get_session("123456")
        assert stored is not None
        for field, value in expected_fields.items():
            assert stored[field] == value

    @pytest.mark.asyncio
    async def test_save_session_without_project(self, manager):
        """
        Test creating a session without a project name.

        What: Save a session, omitting project_name entirely
        Why: Project name should be optional
        """
        await manager.save_session(channel_id="789012", session_id="sess_xyz789")

        stored = await manager.get_session("789012")
        assert stored is not None
        assert stored['session_id'] == "sess_xyz789"
        assert stored['project_name'] is None

    @pytest.mark.asyncio
    async def test_update_existing_session(self, manager):
        """
        Test updating an existing session with new session_id.

        What: Save twice for the same channel with different values and
              check that the second save wins
        Why: Sessions need to be updateable when Claude sessions change
        """
        channel = "123456"

        # First write establishes the row.
        await manager.save_session(
            channel_id=channel,
            session_id="sess_old",
            project_name="project-a",
        )
        # Second write for the same channel replaces its values.
        await manager.save_session(
            channel_id=channel,
            session_id="sess_new",
            project_name="project-b",
        )

        stored = await manager.get_session(channel)
        assert stored['session_id'] == "sess_new"
        assert stored['project_name'] == "project-b"

    @pytest.mark.asyncio
    async def test_get_nonexistent_session(self, manager):
        """
        Test retrieving a session that doesn't exist.

        What: Look up a channel that was never saved
        Why: Should return None gracefully, not error
        """
        missing = await manager.get_session("nonexistent")
        assert missing is None

    @pytest.mark.asyncio
    async def test_reset_session(self, manager):
        """
        Test deleting a session.

        What: Create a session, reset it, then confirm the lookup is empty
        Why: /reset command needs to clear sessions
        """
        await manager.save_session(channel_id="123456", session_id="sess_abc123")

        was_deleted = await manager.reset_session("123456")
        assert was_deleted is True

        after_reset = await manager.get_session("123456")
        assert after_reset is None

    @pytest.mark.asyncio
    async def test_reset_nonexistent_session(self, manager):
        """
        Test resetting a session that doesn't exist.

        What: Reset a channel that has no stored session
        Why: Should return False, not error
        """
        was_deleted = await manager.reset_session("nonexistent")
        assert was_deleted is False
|
|
|
|
|
|
class TestSessionActivity:
    """Test session activity tracking."""

    @pytest.mark.asyncio
    async def test_update_activity(self, manager):
        """
        Test updating last_active and incrementing message_count.

        What: Record activity three times on one session and read the counter
        Why: Activity tracking is essential for monitoring
        """
        channel = "123456"
        await manager.save_session(channel_id=channel, session_id="sess_abc123")

        # Three sequential activity updates -> message_count of 3.
        for _ in range(3):
            await manager.update_activity(channel)

        stored = await manager.get_session(channel)
        assert stored['message_count'] == 3

    @pytest.mark.asyncio
    async def test_update_activity_updates_timestamp(self, manager):
        """
        Test that update_activity changes last_active timestamp.

        What: Capture last_active, pause briefly, record activity, compare
        Why: Timestamp tracking is needed for session management
        """
        channel = "123456"
        await manager.save_session(channel_id=channel, session_id="sess_abc123")

        before = (await manager.get_session(channel))['last_active']

        # Brief pause before recording new activity.
        # NOTE(review): if last_active has one-second granularity (as other
        # tests in this file assume for SQLite CURRENT_TIMESTAMP), 0.1s may
        # not change the stored value; the >= check below then only proves
        # the timestamp never moves backwards - confirm this is intended.
        await asyncio.sleep(0.1)

        await manager.update_activity(channel)

        after = (await manager.get_session(channel))['last_active']

        assert after >= before
|
|
|
|
|
|
class TestSessionListing:
    """Test listing and querying multiple sessions."""

    # Pause inserted between writes whose relative order is asserted on.
    # SQLite CURRENT_TIMESTAMP has 1-second precision, so anything shorter
    # can produce identical last_active values and an arbitrary ordering.
    _TIMESTAMP_TICK = 1.1

    @pytest.mark.asyncio
    async def test_list_empty_sessions(self, manager):
        """
        Test listing when no sessions exist.

        What: Call list_sessions on empty database
        Why: Should return empty list, not error
        """
        sessions = await manager.list_sessions()
        assert sessions == []

    @pytest.mark.asyncio
    async def test_list_multiple_sessions(self, manager):
        """
        Test listing multiple sessions.

        What: Create 3 sessions, list them, verify order
        Why: Bot needs to view all active sessions
        """
        await manager.save_session("channel1", "sess1", "project-a")
        # Wait a full timestamp tick so each session gets a distinct
        # last_active value; otherwise the asserted order relies on ties.
        await asyncio.sleep(self._TIMESTAMP_TICK)
        await manager.save_session("channel2", "sess2", "project-b")
        await asyncio.sleep(self._TIMESTAMP_TICK)
        await manager.save_session("channel3", "sess3", "project-c")

        sessions = await manager.list_sessions()
        assert len(sessions) == 3

        # Should be ordered by last_active DESC (most recent first)
        assert sessions[0]['channel_id'] == "channel3"
        assert sessions[1]['channel_id'] == "channel2"
        assert sessions[2]['channel_id'] == "channel1"

    @pytest.mark.asyncio
    async def test_list_sessions_after_activity(self, manager):
        """
        Test that list_sessions ordering changes after activity updates.

        What: Create sessions, update activity on older one, verify reordering
        Why: Most active sessions should appear first
        """
        await manager.save_session("channel1", "sess1")
        await asyncio.sleep(self._TIMESTAMP_TICK)
        await manager.save_session("channel2", "sess2")

        # Update activity on channel1 (older session) one tick later so its
        # last_active is strictly newer than channel2's.
        await asyncio.sleep(self._TIMESTAMP_TICK)
        await manager.update_activity("channel1")

        sessions = await manager.list_sessions()

        # channel1 should now be first (most recent activity)
        assert sessions[0]['channel_id'] == "channel1"
        assert sessions[1]['channel_id'] == "channel2"
|
|
|
|
|
|
class TestSessionStats:
    """Test session statistics and analytics."""

    @pytest.mark.asyncio
    async def test_stats_empty_database(self, manager):
        """
        Test statistics on empty database.

        What: Request stats before any session has been saved
        Why: Should return zeros/nulls, not error
        """
        summary = await manager.get_stats()

        # Counters start at zero ...
        for counter in ('total_sessions', 'total_messages', 'active_projects'):
            assert summary[counter] == 0

        # ... and the "which channel" fields have nothing to report.
        for field in ('most_active_channel', 'oldest_session', 'newest_session'):
            assert summary[field] is None

    @pytest.mark.asyncio
    async def test_stats_with_sessions(self, manager):
        """
        Test statistics calculation with multiple sessions.

        What: Save four sessions (two sharing a project, one with none),
              add uneven activity, then check the aggregated numbers
        Why: Stats are used for monitoring and debugging
        """
        session_args = [
            ("channel1", "sess1", "project-a"),
            ("channel2", "sess2", "project-b"),
            ("channel3", "sess3", "project-a"),  # Same project
            ("channel4", "sess4"),  # No project
        ]
        for args in session_args:
            await manager.save_session(*args)

        # Add some message activity: channel -> number of updates.
        activity_plan = {"channel1": 5, "channel2": 3}
        for channel, updates in activity_plan.items():
            for _ in range(updates):
                await manager.update_activity(channel)

        summary = await manager.get_stats()

        assert summary['total_sessions'] == 4
        assert summary['total_messages'] == 8  # 5 + 3 + 0 + 0
        assert summary['active_projects'] == 2  # project-a and project-b (not counting None)
        assert summary['most_active_channel'] == "channel1"

    @pytest.mark.asyncio
    async def test_stats_oldest_newest(self, manager):
        """
        Test oldest and newest session tracking.

        What: Save three sessions more than a second apart and check which
              channels are reported as oldest and newest
        Why: Useful for session lifecycle management
        """
        tick = 1.1  # SQLite CURRENT_TIMESTAMP has 1-second precision

        await manager.save_session("channel1", "sess1")
        await asyncio.sleep(tick)
        await manager.save_session("channel2", "sess2")
        await asyncio.sleep(tick)
        await manager.save_session("channel3", "sess3")

        summary = await manager.get_stats()

        assert summary['oldest_session'] == "channel1"
        assert summary['newest_session'] == "channel3"
|
|
|
|
|
|
class TestConcurrentAccess:
    """Test concurrent access and thread safety."""

    @pytest.mark.asyncio
    async def test_concurrent_reads(self, manager):
        """
        Test multiple concurrent read operations.

        What: Save one session, then fetch it from ten coroutines at once
        Why: Bot handles multiple channels concurrently
        """
        await manager.save_session("channel1", "sess1", "project-a")

        # Launch 10 concurrent reads
        fetched = await asyncio.gather(
            *(manager.get_session("channel1") for _ in range(10))
        )

        # All reads should succeed
        assert all(row is not None for row in fetched)
        assert all(row['session_id'] == "sess1" for row in fetched)

    @pytest.mark.asyncio
    async def test_concurrent_writes(self, manager):
        """
        Test multiple concurrent write operations.

        What: Save ten distinct sessions through a single gather call
        Why: Multiple channels may save sessions simultaneously
        """
        await asyncio.gather(
            *(
                manager.save_session(f"channel{i}", f"sess{i}", "project-a")
                for i in range(10)
            )
        )

        # Verify all sessions were created
        stored = await manager.list_sessions()
        assert len(stored) == 10

    @pytest.mark.asyncio
    async def test_concurrent_updates_same_channel(self, manager):
        """
        Test concurrent activity updates on the same channel.

        What: Fire twenty activity updates at one channel concurrently and
              check that none of the increments is lost
        Why: Ensures atomicity and prevents race conditions
        """
        await manager.save_session("channel1", "sess1")

        # Update activity 20 times concurrently
        await asyncio.gather(
            *(manager.update_activity("channel1") for _ in range(20))
        )

        stored = await manager.get_session("channel1")
        assert stored['message_count'] == 20

    @pytest.mark.asyncio
    async def test_concurrent_mixed_operations(self, manager):
        """
        Test mixed concurrent operations (reads, writes, updates).

        What: Run gets, saves, updates and a list in one gather call
        Why: Real-world usage has mixed concurrent operations
        """
        # Create initial session
        await manager.save_session("channel1", "sess1")

        # Mix of operations; all should complete without error
        await asyncio.gather(
            manager.get_session("channel1"),
            manager.update_activity("channel1"),
            manager.save_session("channel2", "sess2"),
            manager.list_sessions(),
            manager.get_session("channel2"),
            manager.update_activity("channel1"),
        )

        # Verify final state
        first = await manager.get_session("channel1")
        second = await manager.get_session("channel2")

        assert first is not None
        assert second is not None
        # Exactly two update_activity calls targeted channel1 above.
        assert first['message_count'] == 2
|
|
|
|
|
|
class TestContextManager:
    """Test async context manager usage."""

    @pytest.mark.asyncio
    async def test_context_manager_cleanup(self, temp_db):
        """
        Test that context manager properly closes database connection.

        What: Use SessionManager with async context manager, verify cleanup
        Why: Proper resource cleanup prevents database lock issues
        """
        manager = SessionManager(db_path=temp_db)

        async with manager:
            await manager.save_session("channel1", "sess1")
            assert manager._db is not None

        # Connection should be closed after exiting context
        assert manager._db is None

    @pytest.mark.asyncio
    async def test_context_manager_exception_handling(self, temp_db):
        """
        Test that context manager closes connection even on exception.

        What: Raise exception inside context, verify cleanup still happens
        Why: Resources must be cleaned up even on errors
        """
        manager = SessionManager(db_path=temp_db)

        # pytest.raises also fails the test if the context manager swallows
        # the exception, which a bare try/except-pass would not detect.
        with pytest.raises(ValueError, match="Test exception"):
            async with manager:
                await manager.save_session("channel1", "sess1")
                raise ValueError("Test exception")

        # Connection should still be closed
        assert manager._db is None
|
|
|
|
|
|
class TestEdgeCases:
    """Test edge cases and error conditions."""

    @pytest.mark.asyncio
    async def test_empty_channel_id(self, manager):
        """
        Test handling of empty channel_id.

        What: Round-trip a session whose channel_id is the empty string
        Why: Should handle edge case gracefully
        """
        empty_channel = ""

        await manager.save_session(empty_channel, "sess1", "project-a")
        stored = await manager.get_session(empty_channel)

        assert stored is not None
        assert stored['channel_id'] == ""

    @pytest.mark.asyncio
    async def test_very_long_ids(self, manager):
        """
        Test handling of very long channel and session IDs.

        What: Round-trip a session whose IDs are 1000 characters each
        Why: Ensure no unexpected length limits
        """
        id_length = 1000
        long_channel = "c" * id_length
        long_session = "s" * id_length

        await manager.save_session(long_channel, long_session)
        stored = await manager.get_session(long_channel)

        assert stored is not None
        # Both values must come back untruncated.
        assert stored['channel_id'] == long_channel
        assert stored['session_id'] == long_session

    @pytest.mark.asyncio
    async def test_special_characters_in_ids(self, manager):
        """
        Test handling of special characters in IDs.

        What: Round-trip IDs containing quotes, a newline, a tab and emoji
        Why: Ensure proper SQL escaping
        """
        special_channel = "channel'with\"quotes\nand\ttabs"
        special_session = "sess🎉with📊unicode"

        await manager.save_session(special_channel, special_session)
        stored = await manager.get_session(special_channel)

        assert stored is not None
        assert stored['channel_id'] == special_channel
        assert stored['session_id'] == special_session
|
|
|
|
|
|
# Performance test (optional, can be slow)
class TestPerformance:
    """Test performance characteristics (can be skipped for quick tests)."""

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_large_number_of_sessions(self, manager):
        """
        Test handling of large number of sessions.

        What: Save 1000 sessions spread over ten project names, then check
              that listing and stats both report all of them
        Why: Bot may accumulate many sessions over time
        """
        session_count = 1000

        # Create the sessions one after another.
        for index in range(session_count):
            await manager.save_session(
                f"channel{index}", f"sess{index}", f"project{index % 10}"
            )

        # Listing must return every saved session.
        listed = await manager.list_sessions()
        assert len(listed) == 1000

        # Stats should work
        summary = await manager.get_stats()
        assert summary['total_sessions'] == 1000
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this file's tests with pytest (verbose, output capture disabled)
    # and exit with pytest's status code so that a failing run is visible
    # to the calling shell or CI job. `pytest` and `sys` are already
    # imported at the top of the module.
    sys.exit(pytest.main([__file__, "-v", "-s"]))
|