Version control Claude Code configuration including: - Global instructions (CLAUDE.md) - User settings (settings.json) - Custom agents (architect, designer, engineer, etc.) - Custom skills (create-skill templates and workflows) Excludes session data, secrets, cache, and temporary files per .gitignore. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
392 lines
12 KiB
Python
Executable File
392 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
MCP Manager - Dynamic MCP Server Loading/Unloading
|
|
Intelligently manages MCP servers to minimize context consumption
|
|
"""
|
|
|
|
import json
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Dict, Optional
|
|
import sys
|
|
|
|
# Paths
|
|
CLAUDE_DIR = Path.home() / ".claude"
|
|
MCP_CONFIG = CLAUDE_DIR / ".mcp.json"
|
|
MCP_BACKUP = CLAUDE_DIR / ".mcp.json.backup"
|
|
MCP_FULL = CLAUDE_DIR / ".mcp-full.json"
|
|
MCP_MINIMAL = CLAUDE_DIR / ".mcp-minimal.json"
|
|
LOG_FILE = CLAUDE_DIR / "logs" / "mcp-manager.log"
|
|
|
|
# Ensure log directory exists
|
|
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# MCP Registry with metadata
|
|
MCP_REGISTRY = {
|
|
"httpx": {
|
|
"type": "http",
|
|
"category": "security",
|
|
"description": "Web server stack information and tech analysis",
|
|
"triggers": ["stack", "web server", "technology", "httpx", "what tech"],
|
|
"estimated_tokens": 300
|
|
},
|
|
"naabu": {
|
|
"type": "http",
|
|
"category": "security",
|
|
"description": "Port scanning and service discovery",
|
|
"triggers": ["port scan", "open ports", "services", "naabu", "network scan"],
|
|
"estimated_tokens": 300
|
|
},
|
|
"apify": {
|
|
"type": "stdio",
|
|
"category": "scraping",
|
|
"description": "Web scraping and automation via Apify",
|
|
"triggers": ["scrape", "crawl", "extract", "apify"],
|
|
"estimated_tokens": 800
|
|
},
|
|
"brightdata": {
|
|
"type": "stdio",
|
|
"category": "scraping",
|
|
"description": "Enterprise web data extraction",
|
|
"triggers": ["scrape", "proxy", "brightdata"],
|
|
"estimated_tokens": 500
|
|
},
|
|
"playwright": {
|
|
"type": "stdio",
|
|
"category": "automation",
|
|
"description": "Browser automation and testing",
|
|
"triggers": ["browser", "screenshot", "playwright", "automate browser"],
|
|
"estimated_tokens": 1000
|
|
},
|
|
"Ref": {
|
|
"type": "stdio",
|
|
"category": "documentation",
|
|
"description": "Documentation search from public and private sources",
|
|
"triggers": ["documentation", "docs", "api reference", "ref"],
|
|
"estimated_tokens": 600
|
|
},
|
|
"stripe": {
|
|
"type": "stdio",
|
|
"category": "payments",
|
|
"description": "Stripe payment processing and operations",
|
|
"triggers": ["stripe", "payment", "billing", "subscription"],
|
|
"estimated_tokens": 700
|
|
},
|
|
"content": {
|
|
"type": "http",
|
|
"category": "personal",
|
|
"description": "Daniel Miessler's content archive",
|
|
"triggers": ["your content", "you wrote", "your blog", "your opinion"],
|
|
"estimated_tokens": 400
|
|
},
|
|
"daemon": {
|
|
"type": "http",
|
|
"category": "personal",
|
|
"description": "Daniel Miessler's personal API",
|
|
"triggers": ["daemon", "personal api"],
|
|
"estimated_tokens": 300
|
|
},
|
|
"Foundry": {
|
|
"type": "http",
|
|
"category": "personal",
|
|
"description": "Daniel Miessler's PAI infrastructure",
|
|
"triggers": ["foundry", "pai tool"],
|
|
"estimated_tokens": 500
|
|
},
|
|
"notediscovery": {
|
|
"type": "stdio",
|
|
"category": "notes",
|
|
"description": "Personal knowledge base - search, read, create, update notes",
|
|
"triggers": ["note", "notes", "knowledge base", "remember", "save this", "my notes", "write down", "jot down", "search my notes", "create note", "update note", "notediscovery"],
|
|
"estimated_tokens": 500
|
|
}
|
|
}
|
|
|
|
|
|
def log(message: str):
|
|
"""Log message with timestamp"""
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
log_entry = f"[{timestamp}] {message}\n"
|
|
with open(LOG_FILE, "a") as f:
|
|
f.write(log_entry)
|
|
print(f"📝 {message}")
|
|
|
|
|
|
def backup_config():
|
|
"""Create backup of current MCP configuration"""
|
|
if MCP_CONFIG.exists():
|
|
shutil.copy(MCP_CONFIG, MCP_BACKUP)
|
|
log(f"Backed up config to {MCP_BACKUP}")
|
|
return True
|
|
return False
|
|
|
|
|
|
def read_mcp_config() -> Dict:
|
|
"""Read current MCP configuration"""
|
|
if not MCP_CONFIG.exists():
|
|
return {"mcpServers": {}}
|
|
|
|
try:
|
|
with open(MCP_CONFIG) as f:
|
|
return json.load(f)
|
|
except json.JSONDecodeError as e:
|
|
log(f"ERROR: Invalid JSON in {MCP_CONFIG}: {e}")
|
|
return None
|
|
|
|
|
|
def read_full_config() -> Dict:
|
|
"""Read full MCP configuration (all available MCPs)"""
|
|
if not MCP_FULL.exists():
|
|
log(f"WARNING: {MCP_FULL} not found, using current config as full")
|
|
return read_mcp_config()
|
|
|
|
try:
|
|
with open(MCP_FULL) as f:
|
|
return json.load(f)
|
|
except json.JSONDecodeError as e:
|
|
log(f"ERROR: Invalid JSON in {MCP_FULL}: {e}")
|
|
return None
|
|
|
|
|
|
def write_mcp_config(config: Dict) -> bool:
|
|
"""Write MCP configuration safely"""
|
|
try:
|
|
# Validate JSON first
|
|
json_str = json.dumps(config, indent=2)
|
|
|
|
# Write to file
|
|
with open(MCP_CONFIG, "w") as f:
|
|
f.write(json_str)
|
|
f.write("\n")
|
|
|
|
log(f"Updated {MCP_CONFIG}")
|
|
return True
|
|
except Exception as e:
|
|
log(f"ERROR: Failed to write config: {e}")
|
|
return False
|
|
|
|
|
|
def get_loaded_mcps() -> List[str]:
|
|
"""Get list of currently loaded MCPs"""
|
|
config = read_mcp_config()
|
|
if config and "mcpServers" in config:
|
|
return list(config["mcpServers"].keys())
|
|
return []
|
|
|
|
|
|
def get_mcp_status() -> Dict:
|
|
"""Get detailed status of all MCPs"""
|
|
loaded = get_loaded_mcps()
|
|
status = {
|
|
"loaded": loaded,
|
|
"available": list(MCP_REGISTRY.keys()),
|
|
"unloaded": [m for m in MCP_REGISTRY.keys() if m not in loaded],
|
|
"total_tokens": sum(MCP_REGISTRY[m]["estimated_tokens"] for m in loaded if m in MCP_REGISTRY)
|
|
}
|
|
return status
|
|
|
|
|
|
def enable_mcp(mcp_name: str) -> bool:
|
|
"""Enable a specific MCP"""
|
|
if mcp_name not in MCP_REGISTRY:
|
|
log(f"ERROR: Unknown MCP '{mcp_name}'")
|
|
print(f"Available MCPs: {', '.join(MCP_REGISTRY.keys())}")
|
|
return False
|
|
|
|
# Check if already loaded
|
|
loaded = get_loaded_mcps()
|
|
if mcp_name in loaded:
|
|
log(f"MCP '{mcp_name}' is already loaded")
|
|
return True
|
|
|
|
# Backup current config
|
|
backup_config()
|
|
|
|
# Read current and full configs
|
|
current_config = read_mcp_config()
|
|
full_config = read_full_config()
|
|
|
|
if current_config is None or full_config is None:
|
|
return False
|
|
|
|
# Get MCP definition from full config
|
|
if mcp_name not in full_config.get("mcpServers", {}):
|
|
log(f"ERROR: MCP '{mcp_name}' not found in full config")
|
|
return False
|
|
|
|
# Add MCP to current config
|
|
if "mcpServers" not in current_config:
|
|
current_config["mcpServers"] = {}
|
|
|
|
current_config["mcpServers"][mcp_name] = full_config["mcpServers"][mcp_name]
|
|
|
|
# Write updated config
|
|
if write_mcp_config(current_config):
|
|
tokens = MCP_REGISTRY[mcp_name]["estimated_tokens"]
|
|
log(f"✅ Enabled MCP: {mcp_name} (~{tokens} tokens)")
|
|
print(f"\n⚠️ Claude Code restart required for changes to take effect!")
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def disable_mcp(mcp_name: str) -> bool:
|
|
"""Disable a specific MCP"""
|
|
loaded = get_loaded_mcps()
|
|
|
|
if mcp_name not in loaded:
|
|
log(f"MCP '{mcp_name}' is not currently loaded")
|
|
return True
|
|
|
|
# Backup current config
|
|
backup_config()
|
|
|
|
# Read current config
|
|
current_config = read_mcp_config()
|
|
if current_config is None:
|
|
return False
|
|
|
|
# Remove MCP
|
|
if "mcpServers" in current_config and mcp_name in current_config["mcpServers"]:
|
|
del current_config["mcpServers"][mcp_name]
|
|
|
|
# Write updated config
|
|
if write_mcp_config(current_config):
|
|
tokens = MCP_REGISTRY.get(mcp_name, {}).get("estimated_tokens", 0)
|
|
log(f"✅ Disabled MCP: {mcp_name} (freed ~{tokens} tokens)")
|
|
print(f"\n⚠️ Claude Code restart required for changes to take effect!")
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def detect_required_mcps(user_request: str) -> List[str]:
|
|
"""Analyze user request and return list of recommended MCPs"""
|
|
request_lower = user_request.lower()
|
|
required = []
|
|
|
|
for mcp_name, mcp_info in MCP_REGISTRY.items():
|
|
for trigger in mcp_info["triggers"]:
|
|
if trigger in request_lower:
|
|
required.append(mcp_name)
|
|
break
|
|
|
|
return list(set(required)) # deduplicate
|
|
|
|
|
|
def list_mcps():
|
|
"""List all MCPs with their status"""
|
|
status = get_mcp_status()
|
|
|
|
print("\n" + "="*60)
|
|
print("MCP SERVER STATUS")
|
|
print("="*60)
|
|
|
|
print(f"\n📊 Summary:")
|
|
print(f" Loaded: {len(status['loaded'])}")
|
|
print(f" Available: {len(status['available'])}")
|
|
print(f" Context Usage: ~{status['total_tokens']} tokens")
|
|
|
|
if status['loaded']:
|
|
print(f"\n✅ Currently Loaded:")
|
|
for mcp in status['loaded']:
|
|
info = MCP_REGISTRY.get(mcp, {})
|
|
tokens = info.get('estimated_tokens', '?')
|
|
desc = info.get('description', 'No description')
|
|
print(f" • {mcp} (~{tokens} tokens) - {desc}")
|
|
|
|
if status['unloaded']:
|
|
print(f"\n⭕ Available (Not Loaded):")
|
|
for mcp in status['unloaded']:
|
|
info = MCP_REGISTRY.get(mcp, {})
|
|
desc = info.get('description', 'No description')
|
|
print(f" • {mcp} - {desc}")
|
|
|
|
print()
|
|
|
|
|
|
def reset_to_minimal():
|
|
"""Reset to minimal MCP configuration"""
|
|
backup_config()
|
|
|
|
minimal_config = {"mcpServers": {}}
|
|
|
|
if write_mcp_config(minimal_config):
|
|
log("✅ Reset to minimal configuration (no MCPs loaded)")
|
|
print("\n⚠️ Claude Code restart required for changes to take effect!")
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def create_full_backup():
|
|
"""Create full config backup if it doesn't exist"""
|
|
if not MCP_FULL.exists():
|
|
current = read_mcp_config()
|
|
if current:
|
|
with open(MCP_FULL, "w") as f:
|
|
json.dump(current, f, indent=2)
|
|
f.write("\n")
|
|
log(f"Created full config backup at {MCP_FULL}")
|
|
|
|
|
|
def main():
|
|
"""CLI interface for MCP management"""
|
|
if len(sys.argv) < 2:
|
|
print("MCP Manager - Dynamic MCP Loading/Unloading")
|
|
print("\nUsage:")
|
|
print(" mcp_control.py status - Show MCP status")
|
|
print(" mcp_control.py list - List all MCPs")
|
|
print(" mcp_control.py enable <name> - Enable an MCP")
|
|
print(" mcp_control.py disable <name> - Disable an MCP")
|
|
print(" mcp_control.py detect <query> - Detect needed MCPs for query")
|
|
print(" mcp_control.py reset - Reset to minimal config")
|
|
print(" mcp_control.py backup - Create full config backup")
|
|
return
|
|
|
|
command = sys.argv[1]
|
|
|
|
if command == "status":
|
|
list_mcps()
|
|
|
|
elif command == "list":
|
|
list_mcps()
|
|
|
|
elif command == "enable":
|
|
if len(sys.argv) < 3:
|
|
print("Error: Please specify MCP name")
|
|
return
|
|
enable_mcp(sys.argv[2])
|
|
|
|
elif command == "disable":
|
|
if len(sys.argv) < 3:
|
|
print("Error: Please specify MCP name")
|
|
return
|
|
disable_mcp(sys.argv[2])
|
|
|
|
elif command == "detect":
|
|
if len(sys.argv) < 3:
|
|
print("Error: Please specify query")
|
|
return
|
|
query = " ".join(sys.argv[2:])
|
|
mcps = detect_required_mcps(query)
|
|
print(f"\n🔍 Detected MCPs for query: '{query}'")
|
|
if mcps:
|
|
print(f" Recommended: {', '.join(mcps)}")
|
|
else:
|
|
print(" No specific MCPs recommended")
|
|
|
|
elif command == "reset":
|
|
reset_to_minimal()
|
|
|
|
elif command == "backup":
|
|
create_full_backup()
|
|
|
|
else:
|
|
print(f"Unknown command: {command}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|