claude-configs/skills/mcp-manager/mcp_control.py
Cal Corum 8a1d15911f Initial commit: Claude Code configuration backup
Version control Claude Code configuration including:
- Global instructions (CLAUDE.md)
- User settings (settings.json)
- Custom agents (architect, designer, engineer, etc.)
- Custom skills (create-skill templates and workflows)

Excludes session data, secrets, cache, and temporary files per .gitignore.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 16:34:21 -06:00

392 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
MCP Manager - Dynamic MCP Server Loading/Unloading
Intelligently manages MCP servers to minimize context consumption
"""
import json
import shutil
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional
import sys
# Paths
CLAUDE_DIR = Path.home() / ".claude"
MCP_CONFIG = CLAUDE_DIR / ".mcp.json"
MCP_BACKUP = CLAUDE_DIR / ".mcp.json.backup"
MCP_FULL = CLAUDE_DIR / ".mcp-full.json"
MCP_MINIMAL = CLAUDE_DIR / ".mcp-minimal.json"
LOG_FILE = CLAUDE_DIR / "logs" / "mcp-manager.log"
# Ensure log directory exists
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
# MCP Registry with metadata
MCP_REGISTRY = {
"httpx": {
"type": "http",
"category": "security",
"description": "Web server stack information and tech analysis",
"triggers": ["stack", "web server", "technology", "httpx", "what tech"],
"estimated_tokens": 300
},
"naabu": {
"type": "http",
"category": "security",
"description": "Port scanning and service discovery",
"triggers": ["port scan", "open ports", "services", "naabu", "network scan"],
"estimated_tokens": 300
},
"apify": {
"type": "stdio",
"category": "scraping",
"description": "Web scraping and automation via Apify",
"triggers": ["scrape", "crawl", "extract", "apify"],
"estimated_tokens": 800
},
"brightdata": {
"type": "stdio",
"category": "scraping",
"description": "Enterprise web data extraction",
"triggers": ["scrape", "proxy", "brightdata"],
"estimated_tokens": 500
},
"playwright": {
"type": "stdio",
"category": "automation",
"description": "Browser automation and testing",
"triggers": ["browser", "screenshot", "playwright", "automate browser"],
"estimated_tokens": 1000
},
"Ref": {
"type": "stdio",
"category": "documentation",
"description": "Documentation search from public and private sources",
"triggers": ["documentation", "docs", "api reference", "ref"],
"estimated_tokens": 600
},
"stripe": {
"type": "stdio",
"category": "payments",
"description": "Stripe payment processing and operations",
"triggers": ["stripe", "payment", "billing", "subscription"],
"estimated_tokens": 700
},
"content": {
"type": "http",
"category": "personal",
"description": "Daniel Miessler's content archive",
"triggers": ["your content", "you wrote", "your blog", "your opinion"],
"estimated_tokens": 400
},
"daemon": {
"type": "http",
"category": "personal",
"description": "Daniel Miessler's personal API",
"triggers": ["daemon", "personal api"],
"estimated_tokens": 300
},
"Foundry": {
"type": "http",
"category": "personal",
"description": "Daniel Miessler's PAI infrastructure",
"triggers": ["foundry", "pai tool"],
"estimated_tokens": 500
},
"notediscovery": {
"type": "stdio",
"category": "notes",
"description": "Personal knowledge base - search, read, create, update notes",
"triggers": ["note", "notes", "knowledge base", "remember", "save this", "my notes", "write down", "jot down", "search my notes", "create note", "update note", "notediscovery"],
"estimated_tokens": 500
}
}
def log(message: str):
"""Log message with timestamp"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}\n"
with open(LOG_FILE, "a") as f:
f.write(log_entry)
print(f"📝 {message}")
def backup_config():
"""Create backup of current MCP configuration"""
if MCP_CONFIG.exists():
shutil.copy(MCP_CONFIG, MCP_BACKUP)
log(f"Backed up config to {MCP_BACKUP}")
return True
return False
def read_mcp_config() -> Dict:
"""Read current MCP configuration"""
if not MCP_CONFIG.exists():
return {"mcpServers": {}}
try:
with open(MCP_CONFIG) as f:
return json.load(f)
except json.JSONDecodeError as e:
log(f"ERROR: Invalid JSON in {MCP_CONFIG}: {e}")
return None
def read_full_config() -> Dict:
"""Read full MCP configuration (all available MCPs)"""
if not MCP_FULL.exists():
log(f"WARNING: {MCP_FULL} not found, using current config as full")
return read_mcp_config()
try:
with open(MCP_FULL) as f:
return json.load(f)
except json.JSONDecodeError as e:
log(f"ERROR: Invalid JSON in {MCP_FULL}: {e}")
return None
def write_mcp_config(config: Dict) -> bool:
"""Write MCP configuration safely"""
try:
# Validate JSON first
json_str = json.dumps(config, indent=2)
# Write to file
with open(MCP_CONFIG, "w") as f:
f.write(json_str)
f.write("\n")
log(f"Updated {MCP_CONFIG}")
return True
except Exception as e:
log(f"ERROR: Failed to write config: {e}")
return False
def get_loaded_mcps() -> List[str]:
"""Get list of currently loaded MCPs"""
config = read_mcp_config()
if config and "mcpServers" in config:
return list(config["mcpServers"].keys())
return []
def get_mcp_status() -> Dict:
"""Get detailed status of all MCPs"""
loaded = get_loaded_mcps()
status = {
"loaded": loaded,
"available": list(MCP_REGISTRY.keys()),
"unloaded": [m for m in MCP_REGISTRY.keys() if m not in loaded],
"total_tokens": sum(MCP_REGISTRY[m]["estimated_tokens"] for m in loaded if m in MCP_REGISTRY)
}
return status
def enable_mcp(mcp_name: str) -> bool:
"""Enable a specific MCP"""
if mcp_name not in MCP_REGISTRY:
log(f"ERROR: Unknown MCP '{mcp_name}'")
print(f"Available MCPs: {', '.join(MCP_REGISTRY.keys())}")
return False
# Check if already loaded
loaded = get_loaded_mcps()
if mcp_name in loaded:
log(f"MCP '{mcp_name}' is already loaded")
return True
# Backup current config
backup_config()
# Read current and full configs
current_config = read_mcp_config()
full_config = read_full_config()
if current_config is None or full_config is None:
return False
# Get MCP definition from full config
if mcp_name not in full_config.get("mcpServers", {}):
log(f"ERROR: MCP '{mcp_name}' not found in full config")
return False
# Add MCP to current config
if "mcpServers" not in current_config:
current_config["mcpServers"] = {}
current_config["mcpServers"][mcp_name] = full_config["mcpServers"][mcp_name]
# Write updated config
if write_mcp_config(current_config):
tokens = MCP_REGISTRY[mcp_name]["estimated_tokens"]
log(f"✅ Enabled MCP: {mcp_name} (~{tokens} tokens)")
print(f"\n⚠️ Claude Code restart required for changes to take effect!")
return True
return False
def disable_mcp(mcp_name: str) -> bool:
"""Disable a specific MCP"""
loaded = get_loaded_mcps()
if mcp_name not in loaded:
log(f"MCP '{mcp_name}' is not currently loaded")
return True
# Backup current config
backup_config()
# Read current config
current_config = read_mcp_config()
if current_config is None:
return False
# Remove MCP
if "mcpServers" in current_config and mcp_name in current_config["mcpServers"]:
del current_config["mcpServers"][mcp_name]
# Write updated config
if write_mcp_config(current_config):
tokens = MCP_REGISTRY.get(mcp_name, {}).get("estimated_tokens", 0)
log(f"✅ Disabled MCP: {mcp_name} (freed ~{tokens} tokens)")
print(f"\n⚠️ Claude Code restart required for changes to take effect!")
return True
return False
def detect_required_mcps(user_request: str) -> List[str]:
"""Analyze user request and return list of recommended MCPs"""
request_lower = user_request.lower()
required = []
for mcp_name, mcp_info in MCP_REGISTRY.items():
for trigger in mcp_info["triggers"]:
if trigger in request_lower:
required.append(mcp_name)
break
return list(set(required)) # deduplicate
def list_mcps():
"""List all MCPs with their status"""
status = get_mcp_status()
print("\n" + "="*60)
print("MCP SERVER STATUS")
print("="*60)
print(f"\n📊 Summary:")
print(f" Loaded: {len(status['loaded'])}")
print(f" Available: {len(status['available'])}")
print(f" Context Usage: ~{status['total_tokens']} tokens")
if status['loaded']:
print(f"\n✅ Currently Loaded:")
for mcp in status['loaded']:
info = MCP_REGISTRY.get(mcp, {})
tokens = info.get('estimated_tokens', '?')
desc = info.get('description', 'No description')
print(f"{mcp} (~{tokens} tokens) - {desc}")
if status['unloaded']:
print(f"\n⭕ Available (Not Loaded):")
for mcp in status['unloaded']:
info = MCP_REGISTRY.get(mcp, {})
desc = info.get('description', 'No description')
print(f"{mcp} - {desc}")
print()
def reset_to_minimal():
"""Reset to minimal MCP configuration"""
backup_config()
minimal_config = {"mcpServers": {}}
if write_mcp_config(minimal_config):
log("✅ Reset to minimal configuration (no MCPs loaded)")
print("\n⚠️ Claude Code restart required for changes to take effect!")
return True
return False
def create_full_backup():
"""Create full config backup if it doesn't exist"""
if not MCP_FULL.exists():
current = read_mcp_config()
if current:
with open(MCP_FULL, "w") as f:
json.dump(current, f, indent=2)
f.write("\n")
log(f"Created full config backup at {MCP_FULL}")
def main():
"""CLI interface for MCP management"""
if len(sys.argv) < 2:
print("MCP Manager - Dynamic MCP Loading/Unloading")
print("\nUsage:")
print(" mcp_control.py status - Show MCP status")
print(" mcp_control.py list - List all MCPs")
print(" mcp_control.py enable <name> - Enable an MCP")
print(" mcp_control.py disable <name> - Disable an MCP")
print(" mcp_control.py detect <query> - Detect needed MCPs for query")
print(" mcp_control.py reset - Reset to minimal config")
print(" mcp_control.py backup - Create full config backup")
return
command = sys.argv[1]
if command == "status":
list_mcps()
elif command == "list":
list_mcps()
elif command == "enable":
if len(sys.argv) < 3:
print("Error: Please specify MCP name")
return
enable_mcp(sys.argv[2])
elif command == "disable":
if len(sys.argv) < 3:
print("Error: Please specify MCP name")
return
disable_mcp(sys.argv[2])
elif command == "detect":
if len(sys.argv) < 3:
print("Error: Please specify query")
return
query = " ".join(sys.argv[2:])
mcps = detect_required_mcps(query)
print(f"\n🔍 Detected MCPs for query: '{query}'")
if mcps:
print(f" Recommended: {', '.join(mcps)}")
else:
print(" No specific MCPs recommended")
elif command == "reset":
reset_to_minimal()
elif command == "backup":
create_full_backup()
else:
print(f"Unknown command: {command}")
if __name__ == "__main__":
main()