#!/usr/bin/env python3
"""
Cognitive Memory MCP Server

JSON-RPC 2.0 stdio MCP server that wraps CognitiveMemoryClient.
Exposes 19 memory operations as MCP tools for Claude Code.
"""

import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, Optional

# Allow imports from this directory (client.py lives here)
sys.path.insert(0, str(Path(__file__).parent))

from client import CognitiveMemoryClient, _load_memory_config, MEMORY_DIR
from common import resolve_graph_path, list_graphs

SYNC_SCRIPT = Path(__file__).parent / "scripts" / "memory-git-sync.sh"

# Auto-edge heuristics: (new_type, match_type) -> (rel_type, direction)
# direction "ab" = new->match, "ba" = match->new
# Subset of edge-proposer.py's TYPE_HEURISTICS — excludes:
# - Same-type pairs (decision/decision, fix/fix, solution/solution): too noisy for auto-edges
# - REQUIRES/FOLLOWS pairs: need stronger signal than title recall provides
AUTO_EDGE_HEURISTICS = {
    ("fix", "problem"): ("SOLVES", "ab"),
    ("solution", "problem"): ("SOLVES", "ab"),
    ("solution", "error"): ("SOLVES", "ab"),
    ("fix", "error"): ("SOLVES", "ab"),
    ("insight", "solution"): ("BUILDS_ON", "ab"),
    ("insight", "decision"): ("BUILDS_ON", "ab"),
    ("decision", "solution"): ("BUILDS_ON", "ab"),
    ("fix", "solution"): ("BUILDS_ON", "ab"),
    ("code_pattern", "solution"): ("BUILDS_ON", "ab"),
    ("procedure", "workflow"): ("BUILDS_ON", "ab"),
}
AUTO_EDGE_SIMILARITY_THRESHOLD = 0.4
AUTO_EDGE_MAX = 3

# Cache of clients keyed by graph name so repeated calls reuse state.
_clients: Dict[str, CognitiveMemoryClient] = {}


def get_client(graph: Optional[str] = None) -> CognitiveMemoryClient:
    """Get or create a CognitiveMemoryClient for the given graph.

    Resolution order: explicit graph param > COGNITIVE_MEMORY_GRAPH env var > "default".
    Set COGNITIVE_MEMORY_GRAPH per-project via Claude Code's .claude/settings.json
    env overrides.
    """
    effective_graph = graph or os.environ.get("COGNITIVE_MEMORY_GRAPH") or None
    key = effective_graph or "default"
    if key not in _clients:
        path = resolve_graph_path(effective_graph)
        _clients[key] = CognitiveMemoryClient(memory_dir=path)
    return _clients[key]


def _trigger_git_sync(memory_dir: Optional[Path] = None) -> None:
    """Fire-and-forget git sync after a write operation.

    Spawns the sync script detached (new session, output discarded) so the
    server never blocks on git. Failures are swallowed deliberately — the
    daily timer is the fallback persistence path.
    """
    if SYNC_SCRIPT.exists():
        try:
            env = None
            if memory_dir is not None:
                # Point the script at the active graph's directory.
                env = {**os.environ, "COGNITIVE_MEMORY_DIR": str(memory_dir)}
            subprocess.Popen(
                [str(SYNC_SCRIPT)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,
                env=env,
            )
        except Exception:
            pass  # Non-critical — daily timer is the fallback


def create_tools() -> list:
    """Define all 19 MCP tool definitions with inputSchema."""
    return [
        {
            "name": "memory_store",
            "description": (
                "Store a new memory in the cognitive memory system. "
                "Creates a markdown file with YAML frontmatter and returns the new memory UUID. "
                "Valid types: solution, fix, decision, configuration, problem, workflow, "
                "code_pattern, error, general, procedure, insight."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "description": "Memory type (solution, fix, decision, configuration, problem, workflow, code_pattern, error, general, procedure, insight)",
                    },
                    "title": {
                        "type": "string",
                        "description": "Short descriptive title for the memory",
                    },
                    "content": {
                        "type": "string",
                        "description": "Full content/body of the memory in markdown",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of lowercase tags for categorisation (e.g. ['python', 'fix', 'discord'])",
                    },
                    "importance": {
                        "type": "number",
                        "description": "Importance score from 0.0 to 1.0 (default 0.5)",
                    },
                    "episode": {
                        "type": "boolean",
                        "description": "Also log an episode entry for this memory (default true)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["type", "title", "content"],
            },
        },
        {
            "name": "memory_recall",
            "description": (
                "Search memories by a natural language query, ranked by relevance and decay score. "
                "Semantic search is enabled by default when embeddings exist. Set semantic=false for keyword-only."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Natural language search query",
                    },
                    "semantic": {
                        "type": "boolean",
                        "description": "Merge with semantic/vector similarity search (requires embeddings, default true)",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of results to return (default 10)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["query"],
            },
        },
        {
            "name": "memory_get",
            "description": (
                "Retrieve a single memory by its UUID, including full content, frontmatter metadata, "
                "relations, and current decay score."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "memory_id": {
                        "type": "string",
                        "description": "UUID of the memory to retrieve",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["memory_id"],
            },
        },
        {
            "name": "memory_search",
            "description": (
                "Filter memories by type, tags, and/or minimum importance score. "
                "Optionally include a text query. Returns results sorted by importance descending. "
                "Use this for structured browsing; use memory_recall for ranked relevance search."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Optional text query to filter results",
                    },
                    "memory_types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by memory types (e.g. ['solution', 'fix'])",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by tags — memory must have at least one of these",
                    },
                    "min_importance": {
                        "type": "number",
                        "description": "Minimum importance score (0.0 to 1.0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_relate",
            "description": (
                "Create a typed relationship (edge) between two memories. "
                "Valid relation types: SOLVES, CAUSES, BUILDS_ON, ALTERNATIVE_TO, REQUIRES, FOLLOWS, RELATED_TO. "
                "Returns the new edge UUID, or empty string if the relationship already exists."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "UUID of the source memory",
                    },
                    "to_id": {
                        "type": "string",
                        "description": "UUID of the target memory",
                    },
                    "rel_type": {
                        "type": "string",
                        "description": "Relationship type (SOLVES, CAUSES, BUILDS_ON, ALTERNATIVE_TO, REQUIRES, FOLLOWS, RELATED_TO)",
                    },
                    "description": {
                        "type": "string",
                        "description": "Optional human-readable description of the relationship",
                    },
                    "strength": {
                        "type": "number",
                        "description": "Relationship strength from 0.0 to 1.0 (default 0.8)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["from_id", "to_id", "rel_type"],
            },
        },
        {
            "name": "memory_related",
            "description": (
                "Traverse the relationship graph from a given memory, returning connected memories "
                "up to max_depth hops away. Optionally filter by relationship type."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "memory_id": {
                        "type": "string",
                        "description": "UUID of the starting memory",
                    },
                    "rel_types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by relation types (e.g. ['SOLVES', 'BUILDS_ON'])",
                    },
                    "max_depth": {
                        "type": "integer",
                        "description": "Maximum traversal depth (1-5, default 1)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["memory_id"],
            },
        },
        {
            "name": "memory_edge_get",
            "description": (
                "Retrieve a single relationship edge by its UUID, including metadata and description body."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "edge_id": {
                        "type": "string",
                        "description": "UUID of the edge to retrieve",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["edge_id"],
            },
        },
        {
            "name": "memory_edge_search",
            "description": (
                "Search relationship edges by type, connected memory IDs, or a text query "
                "that matches against the from/to memory titles and relationship type."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Text query to match against edge titles and type",
                    },
                    "types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by relationship types (e.g. ['SOLVES'])",
                    },
                    "from_id": {
                        "type": "string",
                        "description": "Filter to edges originating from this memory UUID",
                    },
                    "to_id": {
                        "type": "string",
                        "description": "Filter to edges pointing at this memory UUID",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_reflect",
            "description": (
                "Review memories created since a given date, cluster them by shared tags, "
                "and return consolidation recommendations. Does NOT auto-create new memories — "
                "you review the output and decide what to store. Set dry_run=true to skip "
                "updating state and logging an episode entry."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "since": {
                        "type": "string",
                        "description": "ISO date (YYYY-MM-DD) to review memories from. Defaults to last reflection date or 30 days ago.",
                    },
                    "dry_run": {
                        "type": "boolean",
                        "description": "If true, return analysis without persisting state changes (default false)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_reflection",
            "description": (
                "Return the current REFLECTION.md summary — the auto-curated narrative of recent "
                "memory themes, clusters, and activity. Use this to quickly orient at session start."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_stats",
            "description": (
                "Return statistics about the memory system: total count, breakdown by type, "
                "relation count, decay distribution, embeddings count, and per-directory file counts."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_episode",
            "description": (
                "Append a timestamped entry to today's episode log file (episodes/YYYY-MM-DD.md in the active graph). "
                "Use this to record significant session events, commits, or decisions without creating full memories."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "description": "Episode entry type (e.g. fix, commit, decision, automation)",
                    },
                    "title": {
                        "type": "string",
                        "description": "Short title for the episode entry",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Tags for the episode entry",
                    },
                    "summary": {
                        "type": "string",
                        "description": "Optional summary text for the entry",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["type", "title"],
            },
        },
        {
            "name": "memory_tags_list",
            "description": (
                "List all tags used across the memory system, sorted by usage frequency. "
                "Returns tag name and count of memories using it."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of tags to return (0 = unlimited, default 0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_tags_related",
            "description": (
                "Find tags that frequently co-occur with a given tag, sorted by co-occurrence count. "
                "Useful for discovering related topics and navigating the tag graph."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "tag": {
                        "type": "string",
                        "description": "The tag to find co-occurring tags for",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of related tags to return (0 = unlimited, default 0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["tag"],
            },
        },
        {
            "name": "memory_embed",
            "description": (
                "Generate or refresh vector embeddings for all memories that do not yet have them. "
                "Requires either Ollama (nomic-embed-text model) or an OpenAI API key configured. "
                "Embeddings enable semantic recall via memory_recall with semantic=true."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_core",
            "description": (
                "Return the current CORE.md content — the auto-curated high-priority memory digest "
                "used to seed Claude sessions. Lists critical solutions, active decisions, and key fixes."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_decay",
            "description": (
                "Run a decay pass over all memories: recalculate decay scores based on age, "
                "access frequency, importance, and type weight. Archives memories whose score "
                "drops below the dormant threshold. Returns a summary of updated scores."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_config",
            "description": (
                "View or update the cognitive memory embedding configuration (_config.json). "
                "Set action='show' to display current config (API key is masked). "
                "Provide provider='openai' or provider='ollama' to switch embedding backends. "
                "Provide openai_api_key to set the OpenAI API key for embeddings."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "description": "Set to 'show' to display current config without modifying it",
                    },
                    "provider": {
                        "type": "string",
                        "description": "Embedding provider: 'ollama' or 'openai'",
                    },
                    "openai_api_key": {
                        "type": "string",
                        "description": "OpenAI API key to store in config",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_graphs",
            "description": (
                "List all available memory graphs (named, segregated memory namespaces). "
                "Returns each graph's name, path, and whether it exists on disk."
            ),
            "inputSchema": {"type": "object", "properties": {}},
        },
    ]


def _auto_create_edges(
    client: CognitiveMemoryClient,
    memory_id: str,
    title: str,
    mem_type: str,
    tags: Optional[list] = None,
) -> tuple:
    """Auto-create edges between a newly stored memory and related existing memories.

    Uses recall to find similar memories and heuristic type-pairs to choose
    relationship types. Never raises — returns partial results or an empty
    list with error info so store always succeeds.

    Returns (edges_list, error_string_or_None).
    """
    try:
        # Build recall query from title + tags for better precision (#4)
        query = title
        if tags:
            query = f"{title} {' '.join(tags)}"
        results = client.recall(query, limit=5, semantic=True)
        # Filter out the just-stored memory
        results = [r for r in results if r.get("id") != memory_id]
        # Filter by similarity threshold before slicing to AUTO_EDGE_MAX (#5)
        # When embeddings are absent, keyword-only results lack "similarity" —
        # require at least a tag overlap to avoid spurious edges (#1)
        filtered = []
        new_tags = set(tags or [])
        for result in results:
            similarity = result.get("similarity")
            if similarity is not None:
                # Semantic path: use similarity threshold
                if similarity < AUTO_EDGE_SIMILARITY_THRESHOLD:
                    continue
            else:
                # Keyword-only path: require at least one shared tag
                match_tags = set(result.get("tags") or [])
                if not new_tags or not (new_tags & match_tags):
                    continue
            filtered.append(result)

        created_edges = []
        for result in filtered[:AUTO_EDGE_MAX]:
            match_type = result.get("type", "")
            match_id = result["id"]
            match_title = result.get("title", "")
            # Look up heuristic for (new_type, match_type) in both orderings
            key_ab = (mem_type, match_type)
            key_ba = (match_type, mem_type)
            if key_ab in AUTO_EDGE_HEURISTICS:
                rel_type, direction = AUTO_EDGE_HEURISTICS[key_ab]
                from_id = memory_id if direction == "ab" else match_id
                to_id = match_id if direction == "ab" else memory_id
            elif key_ba in AUTO_EDGE_HEURISTICS:
                rel_type, direction = AUTO_EDGE_HEURISTICS[key_ba]
                # Reverse: if heuristic says ab for (match, new), then match->new
                from_id = match_id if direction == "ab" else memory_id
                to_id = memory_id if direction == "ab" else match_id
            else:
                rel_type = "RELATED_TO"
                from_id = memory_id
                to_id = match_id
            # Build description matching actual edge direction (#2)
            from_title = title if from_id == memory_id else match_title
            to_title = match_title if to_id == match_id else title
            desc = f"Auto-edge: {from_title} → {to_title}"
            # Use similarity as edge strength when available (default 0.8)
            sim = result.get("similarity")
            strength = round(sim, 2) if sim is not None else 0.8
            edge_id = client.relate(
                from_id=from_id,
                to_id=to_id,
                rel_type=rel_type,
                description=desc,
                strength=strength,
                skip_commit=True,  # batched — git sync handles persistence
            )
            if edge_id:  # Empty string means duplicate
                created_edges.append(
                    {
                        "edge_id": edge_id,
                        "rel_type": rel_type,
                        "linked_memory_id": match_id,
                        "linked_title": match_title,
                    }
                )
        return created_edges, None
    except Exception as e:
        return [], f"{type(e).__name__}: {e}"


def handle_tool_call(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch MCP tool calls to the CognitiveMemoryClient.

    Returns an MCP tool-result dict; errors are reported in-band with
    ``isError: True`` rather than raised, so the JSON-RPC layer always
    gets a well-formed result.
    """

    def ok(result: Any) -> Dict[str, Any]:
        # Wrap any JSON-serializable result in the MCP content envelope.
        return {
            "content": [
                {"type": "text", "text": json.dumps(result, indent=2, default=str)}
            ]
        }

    try:
        # "graph" is routing metadata, not a client-method argument — pop it.
        graph = arguments.pop("graph", None)
        client = get_client(graph)

        if tool_name == "memory_store":
            mem_type = arguments["type"]
            title = arguments["title"]
            tags = arguments.get("tags")
            memory_id = client.store(
                type=mem_type,
                title=title,
                content=arguments["content"],
                tags=tags,
                importance=arguments.get("importance", 0.5),
            )
            # Auto-log episode entry (default true, opt-out with episode=false)
            episode_logged = False
            if arguments.get("episode", True):
                client.episode(
                    type=mem_type,
                    title=title,
                    tags=tags,
                )
                episode_logged = True
            auto_edges, auto_edge_error = _auto_create_edges(
                client, memory_id, title, mem_type, tags
            )
            _trigger_git_sync(client.memory_dir)
            result = {
                "success": True,
                "memory_id": memory_id,
                "episode_logged": episode_logged,
                "auto_edges": auto_edges,
            }
            if auto_edge_error:
                result["auto_edge_error"] = auto_edge_error
            return ok(result)

        elif tool_name == "memory_recall":
            results = client.recall(
                query=arguments["query"],
                semantic=arguments.get("semantic", True),
                limit=arguments.get("limit", 10),
            )
            return ok(results)

        elif tool_name == "memory_get":
            result = client.get(arguments["memory_id"])
            if result is None:
                return ok({"error": f"Memory not found: {arguments['memory_id']}"})
            return ok(result)

        elif tool_name == "memory_search":
            results = client.search(
                query=arguments.get("query"),
                memory_types=arguments.get("memory_types"),
                tags=arguments.get("tags"),
                min_importance=arguments.get("min_importance"),
            )
            return ok(results)

        elif tool_name == "memory_relate":
            edge_id = client.relate(
                from_id=arguments["from_id"],
                to_id=arguments["to_id"],
                rel_type=arguments["rel_type"],
                description=arguments.get("description"),
                strength=arguments.get("strength", 0.8),
            )
            if edge_id:
                _trigger_git_sync(client.memory_dir)
                return ok({"success": True, "edge_id": edge_id})
            # client.relate returns "" when the edge already exists.
            return ok({"success": False, "message": "Relationship already exists"})

        elif tool_name == "memory_related":
            results = client.related(
                memory_id=arguments["memory_id"],
                rel_types=arguments.get("rel_types"),
                max_depth=arguments.get("max_depth", 1),
            )
            return ok(results)

        elif tool_name == "memory_edge_get":
            result = client.edge_get(arguments["edge_id"])
            if result is None:
                return ok({"error": f"Edge not found: {arguments['edge_id']}"})
            return ok(result)

        elif tool_name == "memory_edge_search":
            results = client.edge_search(
                query=arguments.get("query"),
                types=arguments.get("types"),
                from_id=arguments.get("from_id"),
                to_id=arguments.get("to_id"),
            )
            return ok(results)

        elif tool_name == "memory_reflect":
            result = client.reflect(
                since=arguments.get("since"),
                dry_run=arguments.get("dry_run", False),
            )
            return ok(result)

        elif tool_name == "memory_reflection":
            text = client.reflection_summary()
            return ok({"content": text})

        elif tool_name == "memory_stats":
            result = client.stats()
            return ok(result)

        elif tool_name == "memory_episode":
            client.episode(
                type=arguments["type"],
                title=arguments["title"],
                tags=arguments.get("tags"),
                summary=arguments.get("summary"),
            )
            return ok({"success": True})

        elif tool_name == "memory_tags_list":
            results = client.tags_list(limit=arguments.get("limit", 0))
            return ok(results)

        elif tool_name == "memory_tags_related":
            results = client.tags_related(
                tag=arguments["tag"],
                limit=arguments.get("limit", 0),
            )
            return ok(results)

        elif tool_name == "memory_embed":
            result = client.embed()
            return ok(result)

        elif tool_name == "memory_core":
            text = client.core()
            return ok({"content": text})

        elif tool_name == "memory_decay":
            result = client.decay()
            return ok(result)

        elif tool_name == "memory_config":
            config_path = client.memory_dir / "_config.json"
            config = _load_memory_config(config_path)
            action = arguments.get("action")
            provider = arguments.get("provider")
            openai_api_key = arguments.get("openai_api_key")
            # Fix: honour action='show' as documented — previously the action
            # argument was ignored, so passing provider/openai_api_key
            # alongside action='show' silently mutated the config.
            if action != "show" and (provider or openai_api_key):
                if provider:
                    config["embedding_provider"] = provider
                if openai_api_key:
                    config["openai_api_key"] = openai_api_key
                config_path.write_text(json.dumps(config, indent=2))
                return ok({"success": True, "updated": True})
            # Show config with masked API key
            display = dict(config)
            key = display.get("openai_api_key")
            if key and isinstance(key, str) and len(key) > 8:
                display["openai_api_key"] = key[:4] + "..." + key[-4:]
            return ok(display)

        elif tool_name == "memory_graphs":
            graphs = list_graphs()
            # Enrich with existence check and memory count
            for g in graphs:
                p = Path(g["path"])
                g["exists"] = p.exists()
                if g["exists"]:
                    index_path = p / "_index.json"
                    if index_path.exists():
                        try:
                            idx = json.loads(index_path.read_text())
                            g["memory_count"] = len(idx.get("entries", {}))
                        except (json.JSONDecodeError, OSError):
                            g["memory_count"] = 0
                    else:
                        g["memory_count"] = 0
            return ok(graphs)

        else:
            return {
                "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
                "isError": True,
            }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error: {str(e)}"}],
            "isError": True,
        }


def main() -> None:
    """MCP stdio server main loop (JSON-RPC 2.0).

    Reads one JSON-RPC message per line from stdin and writes responses to
    stdout. Notifications get no response; every request (message with an
    "id") gets exactly one.
    """
    tools = create_tools()
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        # Fix: reset per iteration so a parse failure doesn't attach the id
        # of a *previous* message to the error response.
        message = None
        try:
            message = json.loads(line)
            method = message.get("method")
            if method == "initialize":
                response = {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}},
                        "serverInfo": {
                            "name": "cognitive-memory-mcp-server",
                            "version": "3.1.0",
                        },
                    },
                }
                print(json.dumps(response), flush=True)
            elif method == "tools/list":
                response = {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "result": {"tools": tools},
                }
                print(json.dumps(response), flush=True)
            elif method == "tools/call":
                params = message.get("params", {})
                tool_name = params.get("name")
                arguments = params.get("arguments", {})
                result = handle_tool_call(tool_name, arguments)
                response = {"jsonrpc": "2.0", "id": message.get("id"), "result": result}
                print(json.dumps(response), flush=True)
            elif method == "notifications/initialized":
                # Acknowledge but no response required for notifications
                pass
            elif message.get("id") is not None:
                # Fix: JSON-RPC 2.0 requires a response for every request.
                # Unknown requests previously got no reply, hanging clients.
                response = {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "error": {"code": -32601, "message": f"Method not found: {method}"},
                }
                print(json.dumps(response), flush=True)
        except Exception as e:
            error_response = {
                "jsonrpc": "2.0",
                "id": message.get("id") if isinstance(message, dict) else None,
                "error": {"code": -32603, "message": str(e)},
            }
            print(json.dumps(error_response), flush=True)


if __name__ == "__main__":
    main()