#!/usr/bin/env python3
"""
MCP Manager - Dynamic MCP Server Loading/Unloading
Intelligently manages MCP servers to minimize context consumption
"""

import json
import shutil
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
import sys

# Paths
CLAUDE_DIR = Path.home() / ".claude"
MCP_CONFIG = CLAUDE_DIR / ".mcp.json"
MCP_BACKUP = CLAUDE_DIR / ".mcp.json.backup"
MCP_FULL = CLAUDE_DIR / ".mcp-full.json"
MCP_MINIMAL = CLAUDE_DIR / ".mcp-minimal.json"
LOG_FILE = CLAUDE_DIR / "logs" / "mcp-manager.log"

# Ensure log directory exists
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

# MCP Registry with metadata
# Each entry maps an MCP server name to:
#   type             - transport label ("http" or "stdio")
#   category         - free-form grouping label
#   description      - human-readable summary shown by list_mcps()
#   triggers         - lowercase substrings matched by detect_required_mcps()
#   estimated_tokens - rough context cost used for the status summary
MCP_REGISTRY = {
    "httpx": {
        "type": "http",
        "category": "security",
        "description": "Web server stack information and tech analysis",
        "triggers": ["stack", "web server", "technology", "httpx", "what tech"],
        "estimated_tokens": 300
    },
    "naabu": {
        "type": "http",
        "category": "security",
        "description": "Port scanning and service discovery",
        "triggers": ["port scan", "open ports", "services", "naabu", "network scan"],
        "estimated_tokens": 300
    },
    "apify": {
        "type": "stdio",
        "category": "scraping",
        "description": "Web scraping and automation via Apify",
        "triggers": ["scrape", "crawl", "extract", "apify"],
        "estimated_tokens": 800
    },
    "brightdata": {
        "type": "stdio",
        "category": "scraping",
        "description": "Enterprise web data extraction",
        "triggers": ["scrape", "proxy", "brightdata"],
        "estimated_tokens": 500
    },
    "playwright": {
        "type": "stdio",
        "category": "automation",
        "description": "Browser automation and testing",
        "triggers": ["browser", "screenshot", "playwright", "automate browser"],
        "estimated_tokens": 1000
    },
    "Ref": {
        "type": "stdio",
        "category": "documentation",
        "description": "Documentation search from public and private sources",
        "triggers": ["documentation", "docs", "api reference", "ref"],
        "estimated_tokens": 600
    },
    "stripe": {
        "type": "stdio",
        "category": "payments",
        "description": "Stripe payment processing and operations",
        "triggers": ["stripe", "payment", "billing", "subscription"],
        "estimated_tokens": 700
    },
    "content": {
        "type": "http",
        "category": "personal",
        "description": "Daniel Miessler's content archive",
        "triggers": ["your content", "you wrote", "your blog", "your opinion"],
        "estimated_tokens": 400
    },
    "daemon": {
        "type": "http",
        "category": "personal",
        "description": "Daniel Miessler's personal API",
        "triggers": ["daemon", "personal api"],
        "estimated_tokens": 300
    },
    "Foundry": {
        "type": "http",
        "category": "personal",
        "description": "Daniel Miessler's PAI infrastructure",
        "triggers": ["foundry", "pai tool"],
        "estimated_tokens": 500
    },
    "notediscovery": {
        "type": "stdio",
        "category": "notes",
        "description": "Personal knowledge base - search, read, create, update notes",
        "triggers": ["note", "notes", "knowledge base", "remember", "save this",
                     "my notes", "write down", "jot down", "search my notes",
                     "create note", "update note", "notediscovery"],
        "estimated_tokens": 500
    }
}


def log(message: str) -> None:
    """Append a timestamped message to LOG_FILE and echo it to stdout.

    The log file is opened as UTF-8 explicitly: several callers pass
    messages containing emoji, which the locale default encoding cannot
    always represent.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"

    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(log_entry)

    print(f"📝 {message}")


def backup_config() -> bool:
    """Copy the current MCP configuration to MCP_BACKUP.

    Returns:
        True if MCP_CONFIG existed and was copied, False if there was
        nothing to back up.
    """
    if MCP_CONFIG.exists():
        shutil.copy(MCP_CONFIG, MCP_BACKUP)
        log(f"Backed up config to {MCP_BACKUP}")
        return True
    return False


def _load_json_object(path: Path) -> Optional[Dict]:
    """Parse *path* as a UTF-8 JSON object; log and return None if invalid."""
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        log(f"ERROR: Invalid JSON in {path}: {e}")
        return None

    # Every caller indexes the result by key, so anything other than a
    # JSON object is as unusable as a syntax error.
    if not isinstance(data, dict):
        log(f"ERROR: Invalid JSON in {path}: top-level value is not an object")
        return None
    return data


def read_mcp_config() -> Optional[Dict]:
    """Read current MCP configuration.

    Returns:
        The parsed configuration, ``{"mcpServers": {}}`` when MCP_CONFIG
        does not exist, or None when the file cannot be parsed as a JSON
        object (the problem is logged).
    """
    if not MCP_CONFIG.exists():
        return {"mcpServers": {}}
    return _load_json_object(MCP_CONFIG)


def read_full_config() -> Optional[Dict]:
    """Read full MCP configuration (all available MCPs).

    Falls back to the current configuration, with a logged warning, when
    MCP_FULL does not exist.

    Returns:
        The parsed configuration, or None when the file that was read
        cannot be parsed as a JSON object.
    """
    if not MCP_FULL.exists():
        log(f"WARNING: {MCP_FULL} not found, using current config as full")
        return read_mcp_config()
    return _load_json_object(MCP_FULL)


def write_mcp_config(config: Dict) -> bool:
    """Write MCP configuration safely.

    The configuration is serialized before MCP_CONFIG is opened, so a
    value that cannot be serialized never truncates the existing file.

    Returns:
        True on success, False if serialization or the write failed
        (the error is logged).
    """
    try:
        # Validate JSON first
        json_str = json.dumps(config, indent=2)

        # Write to file
        with open(MCP_CONFIG, "w", encoding="utf-8") as f:
            f.write(json_str)
            f.write("\n")

        log(f"Updated {MCP_CONFIG}")
        return True
    except (TypeError, ValueError, OSError) as e:
        log(f"ERROR: Failed to write config: {e}")
        return False


def get_loaded_mcps() -> List[str]:
    """Get list of currently loaded MCPs.

    Returns an empty list when the configuration is unreadable or its
    "mcpServers" entry is missing or not an object.
    """
    config = read_mcp_config()
    servers = config.get("mcpServers") if config else None
    if isinstance(servers, dict):
        return list(servers)
    return []


def get_mcp_status() -> Dict:
    """Get detailed status of all MCPs.

    Returns:
        A dict with keys "loaded" (names in the current config),
        "available" (names in MCP_REGISTRY), "unloaded" (registry names
        not loaded) and "total_tokens" (sum of the registry estimates for
        loaded MCPs that appear in the registry).
    """
    loaded = get_loaded_mcps()
    loaded_names = set(loaded)

    return {
        "loaded": loaded,
        "available": list(MCP_REGISTRY),
        "unloaded": [m for m in MCP_REGISTRY if m not in loaded_names],
        "total_tokens": sum(
            MCP_REGISTRY[m]["estimated_tokens"] for m in loaded if m in MCP_REGISTRY
        ),
    }


def enable_mcp(mcp_name: str) -> bool:
    """Enable a specific MCP.

    Copies the server definition for *mcp_name* from the full
    configuration into the current configuration.

    Returns:
        True if the MCP is loaded afterwards (including when it already
        was), False if the name is unknown, a configuration file is
        unreadable, the definition is missing from the full
        configuration, or the write failed.
    """
    if mcp_name not in MCP_REGISTRY:
        log(f"ERROR: Unknown MCP '{mcp_name}'")
        print(f"Available MCPs: {', '.join(MCP_REGISTRY.keys())}")
        return False

    # Read the current config once and validate it before touching anything.
    current_config = read_mcp_config()
    if current_config is None:
        return False

    current_servers = current_config.get("mcpServers")
    if not isinstance(current_servers, dict):
        current_servers = {}

    # Check if already loaded
    if mcp_name in current_servers:
        log(f"MCP '{mcp_name}' is already loaded")
        return True

    full_config = read_full_config()
    if full_config is None:
        return False

    # Get MCP definition from full config
    full_servers = full_config.get("mcpServers")
    if not isinstance(full_servers, dict) or mcp_name not in full_servers:
        log(f"ERROR: MCP '{mcp_name}' not found in full config")
        return False

    # Back up only now that a write is certain to be attempted, so a
    # failed or no-op call never overwrites the previous backup.
    backup_config()

    # Add MCP to current config
    current_servers[mcp_name] = full_servers[mcp_name]
    current_config["mcpServers"] = current_servers

    # Write updated config
    if write_mcp_config(current_config):
        tokens = MCP_REGISTRY[mcp_name]["estimated_tokens"]
        log(f"✅ Enabled MCP: {mcp_name} (~{tokens} tokens)")
        print("\n⚠️ Claude Code restart required for changes to take effect!")
        return True

    return False


def disable_mcp(mcp_name: str) -> bool:
    """Disable a specific MCP.

    Removes *mcp_name* from the current configuration.

    Returns:
        True if the MCP is not loaded afterwards (including when it was
        not loaded to begin with), False if the current configuration is
        unreadable or the write failed.
    """
    # Read current config
    current_config = read_mcp_config()
    if current_config is None:
        # An unreadable config is a failure, not "nothing to disable".
        return False

    servers = current_config.get("mcpServers")
    if not isinstance(servers, dict) or mcp_name not in servers:
        log(f"MCP '{mcp_name}' is not currently loaded")
        return True

    # Backup current config
    backup_config()

    # Remove MCP
    del servers[mcp_name]

    # Write updated config
    if write_mcp_config(current_config):
        tokens = MCP_REGISTRY.get(mcp_name, {}).get("estimated_tokens", 0)
        log(f"✅ Disabled MCP: {mcp_name} (freed ~{tokens} tokens)")
        print("\n⚠️ Claude Code restart required for changes to take effect!")
        return True

    return False


def detect_required_mcps(user_request: str) -> List[str]:
    """Analyze user request and return list of recommended MCPs.

    An MCP is recommended when any of its registry triggers occurs as a
    plain substring of the lower-cased request (no word-boundary check).

    Returns:
        Matching MCP names, each at most once, in MCP_REGISTRY order so
        the result is the same on every run.
    """
    request_lower = user_request.lower()

    return [
        mcp_name
        for mcp_name, mcp_info in MCP_REGISTRY.items()
        if any(trigger in request_lower for trigger in mcp_info["triggers"])
    ]


def list_mcps() -> None:
    """Print all MCPs with their status and the estimated context usage."""
    status = get_mcp_status()

    print("\n" + "=" * 60)
    print("MCP SERVER STATUS")
    print("=" * 60)

    print("\n📊 Summary:")
    print(f" Loaded: {len(status['loaded'])}")
    print(f" Available: {len(status['available'])}")
    print(f" Context Usage: ~{status['total_tokens']} tokens")

    if status['loaded']:
        print("\n✅ Currently Loaded:")
        for mcp in status['loaded']:
            # Loaded servers may be absent from the registry; fall back
            # to placeholders rather than failing.
            info = MCP_REGISTRY.get(mcp, {})
            tokens = info.get('estimated_tokens', '?')
            desc = info.get('description', 'No description')
            print(f" • {mcp} (~{tokens} tokens) - {desc}")

    if status['unloaded']:
        print("\n⭕ Available (Not Loaded):")
        for mcp in status['unloaded']:
            info = MCP_REGISTRY.get(mcp, {})
            desc = info.get('description', 'No description')
            print(f" • {mcp} - {desc}")

    print()


def reset_to_minimal() -> bool:
    """Reset to minimal MCP configuration.

    Backs up the current configuration, then replaces it with one that
    has no MCP servers.

    Returns:
        True if the empty configuration was written, False otherwise.
    """
    backup_config()

    minimal_config = {"mcpServers": {}}

    if write_mcp_config(minimal_config):
        log("✅ Reset to minimal configuration (no MCPs loaded)")
        print("\n⚠️ Claude Code restart required for changes to take effect!")
        return True

    return False


def create_full_backup() -> None:
    """Create full config backup if it doesn't exist.

    Writes the current configuration to MCP_FULL. Does nothing when
    MCP_FULL already exists or the current configuration is unreadable
    or empty.
    """
    if MCP_FULL.exists():
        return

    current = read_mcp_config()
    if current:
        with open(MCP_FULL, "w", encoding="utf-8") as f:
            json.dump(current, f, indent=2)
            f.write("\n")
        log(f"Created full config backup at {MCP_FULL}")


def main() -> None:
    """CLI interface for MCP management.

    Reads the command and its arguments from sys.argv; prints usage when
    no command is given.
    """
    if len(sys.argv) < 2:
        print("MCP Manager - Dynamic MCP Loading/Unloading")
        print("\nUsage:")
        print(" mcp_control.py status - Show MCP status")
        print(" mcp_control.py list - List all MCPs")
        print(" mcp_control.py enable - Enable an MCP")
        print(" mcp_control.py disable - Disable an MCP")
        print(" mcp_control.py detect - Detect needed MCPs for query")
        print(" mcp_control.py reset - Reset to minimal config")
        print(" mcp_control.py backup - Create full config backup")
        return

    command = sys.argv[1]

    if command in ("status", "list"):
        list_mcps()

    elif command == "enable":
        if len(sys.argv) < 3:
            print("Error: Please specify MCP name")
            return
        enable_mcp(sys.argv[2])

    elif command == "disable":
        if len(sys.argv) < 3:
            print("Error: Please specify MCP name")
            return
        disable_mcp(sys.argv[2])

    elif command == "detect":
        if len(sys.argv) < 3:
            print("Error: Please specify query")
            return
        # Everything after the command is treated as one query string.
        query = " ".join(sys.argv[2:])
        mcps = detect_required_mcps(query)
        print(f"\n🔍 Detected MCPs for query: '{query}'")
        if mcps:
            print(f" Recommended: {', '.join(mcps)}")
        else:
            print(" No specific MCPs recommended")

    elif command == "reset":
        reset_to_minimal()

    elif command == "backup":
        create_full_backup()

    else:
        print(f"Unknown command: {command}")


if __name__ == "__main__":
    main()