cognitive-memory/mcp_server.py
Cal Corum 1fefb1d5e2 perf: skip per-edge git commits during auto-edge creation
Auto-edges called relate() up to 3 times per store, each triggering a
blocking git subprocess (~50-100ms each). Now relate() accepts
skip_commit=True so auto-edges defer to the fire-and-forget git sync,
cutting 150-300ms of latency from every memory_store call.

Closes #2

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:44:06 -06:00

925 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Cognitive Memory MCP Server
JSON-RPC 2.0 stdio MCP server that wraps CognitiveMemoryClient.
Exposes 19 memory operations as MCP tools for Claude Code.
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# Allow imports from this directory (client.py lives here)
sys.path.insert(0, str(Path(__file__).parent))
from client import CognitiveMemoryClient, _load_memory_config, MEMORY_DIR
from common import resolve_graph_path, list_graphs
SYNC_SCRIPT = Path(__file__).parent / "scripts" / "memory-git-sync.sh"
# Auto-edge heuristics: (new_type, match_type) -> (rel_type, direction)
# direction "ab" = new->match, "ba" = match->new
# Subset of edge-proposer.py's TYPE_HEURISTICS — excludes:
# - Same-type pairs (decision/decision, fix/fix, solution/solution): too noisy for auto-edges
# - REQUIRES/FOLLOWS pairs: need stronger signal than title recall provides
AUTO_EDGE_HEURISTICS = {
    ("fix", "problem"): ("SOLVES", "ab"),
    ("solution", "problem"): ("SOLVES", "ab"),
    ("solution", "error"): ("SOLVES", "ab"),
    ("fix", "error"): ("SOLVES", "ab"),
    ("insight", "solution"): ("BUILDS_ON", "ab"),
    ("insight", "decision"): ("BUILDS_ON", "ab"),
    ("decision", "solution"): ("BUILDS_ON", "ab"),
    ("fix", "solution"): ("BUILDS_ON", "ab"),
    ("code_pattern", "solution"): ("BUILDS_ON", "ab"),
    ("procedure", "workflow"): ("BUILDS_ON", "ab"),
}
# Minimum semantic similarity a recall hit must reach before an auto-edge is
# considered (only applies when embeddings produce a "similarity" score).
AUTO_EDGE_SIMILARITY_THRESHOLD = 0.4
# Hard cap on auto-edges created per memory_store call.
AUTO_EDGE_MAX = 3
# Per-graph client cache, keyed by graph name ("default" when no graph given).
_clients: Dict[str, CognitiveMemoryClient] = {}
def get_client(graph: Optional[str] = None) -> CognitiveMemoryClient:
    """Return the cached CognitiveMemoryClient for *graph*, creating it on first use."""
    cache_key = graph if graph else "default"
    client = _clients.get(cache_key)
    if client is None:
        client = CognitiveMemoryClient(memory_dir=resolve_graph_path(graph))
        _clients[cache_key] = client
    return client
def _trigger_git_sync(memory_dir: Optional[Path] = None):
    """Fire-and-forget git sync after a write operation.

    Spawns the sync script detached (new session, output discarded) so the
    caller never blocks on git. When *memory_dir* is given, it is exported
    as COGNITIVE_MEMORY_DIR so the script syncs the right graph.
    """
    if not SYNC_SCRIPT.exists():
        return
    try:
        sync_env = None
        if memory_dir is not None:
            import os

            sync_env = dict(os.environ, COGNITIVE_MEMORY_DIR=str(memory_dir))
        subprocess.Popen(
            [str(SYNC_SCRIPT)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
            env=sync_env,
        )
    except Exception:
        pass  # Non-critical — daily timer is the fallback
def create_tools() -> list:
    """Define all 19 MCP tool definitions with inputSchema.

    Returns a list of MCP tool dicts (name, description, JSON-Schema input
    spec) consumed verbatim by the tools/list handler in main(). Every tool
    except memory_graphs accepts an optional "graph" argument selecting a
    named memory namespace.
    """
    return [
        # --- write / read single memories ---
        {
            "name": "memory_store",
            "description": (
                "Store a new memory in the cognitive memory system. "
                "Creates a markdown file with YAML frontmatter and returns the new memory UUID. "
                "Valid types: solution, fix, decision, configuration, problem, workflow, "
                "code_pattern, error, general, procedure, insight."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "description": "Memory type (solution, fix, decision, configuration, problem, workflow, code_pattern, error, general, procedure, insight)",
                    },
                    "title": {
                        "type": "string",
                        "description": "Short descriptive title for the memory",
                    },
                    "content": {
                        "type": "string",
                        "description": "Full content/body of the memory in markdown",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of lowercase tags for categorisation (e.g. ['python', 'fix', 'discord'])",
                    },
                    "importance": {
                        "type": "number",
                        "description": "Importance score from 0.0 to 1.0 (default 0.5)",
                    },
                    "episode": {
                        "type": "boolean",
                        "description": "Also log an episode entry for this memory (default true)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["type", "title", "content"],
            },
        },
        {
            "name": "memory_recall",
            "description": (
                "Search memories by a natural language query, ranked by relevance and decay score. "
                "Semantic search is enabled by default when embeddings exist. Set semantic=false for keyword-only."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Natural language search query",
                    },
                    "semantic": {
                        "type": "boolean",
                        "description": "Merge with semantic/vector similarity search (requires embeddings, default true)",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of results to return (default 10)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["query"],
            },
        },
        {
            "name": "memory_get",
            "description": (
                "Retrieve a single memory by its UUID, including full content, frontmatter metadata, "
                "relations, and current decay score."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "memory_id": {
                        "type": "string",
                        "description": "UUID of the memory to retrieve",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["memory_id"],
            },
        },
        {
            "name": "memory_search",
            "description": (
                "Filter memories by type, tags, and/or minimum importance score. "
                "Optionally include a text query. Returns results sorted by importance descending. "
                "Use this for structured browsing; use memory_recall for ranked relevance search."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Optional text query to filter results",
                    },
                    "memory_types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by memory types (e.g. ['solution', 'fix'])",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by tags — memory must have at least one of these",
                    },
                    "min_importance": {
                        "type": "number",
                        "description": "Minimum importance score (0.0 to 1.0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        # --- relationship (edge) tools ---
        {
            "name": "memory_relate",
            "description": (
                "Create a typed relationship (edge) between two memories. "
                "Valid relation types: SOLVES, CAUSES, BUILDS_ON, ALTERNATIVE_TO, REQUIRES, FOLLOWS, RELATED_TO. "
                "Returns the new edge UUID, or empty string if the relationship already exists."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "from_id": {
                        "type": "string",
                        "description": "UUID of the source memory",
                    },
                    "to_id": {
                        "type": "string",
                        "description": "UUID of the target memory",
                    },
                    "rel_type": {
                        "type": "string",
                        "description": "Relationship type (SOLVES, CAUSES, BUILDS_ON, ALTERNATIVE_TO, REQUIRES, FOLLOWS, RELATED_TO)",
                    },
                    "description": {
                        "type": "string",
                        "description": "Optional human-readable description of the relationship",
                    },
                    "strength": {
                        "type": "number",
                        "description": "Relationship strength from 0.0 to 1.0 (default 0.8)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["from_id", "to_id", "rel_type"],
            },
        },
        {
            "name": "memory_related",
            "description": (
                "Traverse the relationship graph from a given memory, returning connected memories "
                "up to max_depth hops away. Optionally filter by relationship type."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "memory_id": {
                        "type": "string",
                        "description": "UUID of the starting memory",
                    },
                    "rel_types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by relation types (e.g. ['SOLVES', 'BUILDS_ON'])",
                    },
                    "max_depth": {
                        "type": "integer",
                        "description": "Maximum traversal depth (1-5, default 1)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["memory_id"],
            },
        },
        {
            "name": "memory_edge_get",
            "description": (
                "Retrieve a single relationship edge by its UUID, including metadata and description body."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "edge_id": {
                        "type": "string",
                        "description": "UUID of the edge to retrieve",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["edge_id"],
            },
        },
        {
            "name": "memory_edge_search",
            "description": (
                "Search relationship edges by type, connected memory IDs, or a text query "
                "that matches against the from/to memory titles and relationship type."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Text query to match against edge titles and type",
                    },
                    "types": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Filter by relationship types (e.g. ['SOLVES'])",
                    },
                    "from_id": {
                        "type": "string",
                        "description": "Filter to edges originating from this memory UUID",
                    },
                    "to_id": {
                        "type": "string",
                        "description": "Filter to edges pointing at this memory UUID",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        # --- maintenance / reflection / stats ---
        {
            "name": "memory_reflect",
            "description": (
                "Review memories created since a given date, cluster them by shared tags, "
                "and return consolidation recommendations. Does NOT auto-create new memories — "
                "you review the output and decide what to store. Set dry_run=true to skip "
                "updating state and logging an episode entry."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "since": {
                        "type": "string",
                        "description": "ISO date (YYYY-MM-DD) to review memories from. Defaults to last reflection date or 30 days ago.",
                    },
                    "dry_run": {
                        "type": "boolean",
                        "description": "If true, return analysis without persisting state changes (default false)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_reflection",
            "description": (
                "Return the current REFLECTION.md summary — the auto-curated narrative of recent "
                "memory themes, clusters, and activity. Use this to quickly orient at session start."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_stats",
            "description": (
                "Return statistics about the memory system: total count, breakdown by type, "
                "relation count, decay distribution, embeddings count, and per-directory file counts."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_episode",
            "description": (
                "Append a timestamped entry to today's episode log file (episodes/YYYY-MM-DD.md in the active graph). "
                "Use this to record significant session events, commits, or decisions without creating full memories."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "type": {
                        "type": "string",
                        "description": "Episode entry type (e.g. fix, commit, decision, automation)",
                    },
                    "title": {
                        "type": "string",
                        "description": "Short title for the episode entry",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Tags for the episode entry",
                    },
                    "summary": {
                        "type": "string",
                        "description": "Optional summary text for the entry",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["type", "title"],
            },
        },
        # --- tag navigation ---
        {
            "name": "memory_tags_list",
            "description": (
                "List all tags used across the memory system, sorted by usage frequency. "
                "Returns tag name and count of memories using it."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of tags to return (0 = unlimited, default 0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_tags_related",
            "description": (
                "Find tags that frequently co-occur with a given tag, sorted by co-occurrence count. "
                "Useful for discovering related topics and navigating the tag graph."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "tag": {
                        "type": "string",
                        "description": "The tag to find co-occurring tags for",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of related tags to return (0 = unlimited, default 0)",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
                "required": ["tag"],
            },
        },
        # --- embeddings, digests, decay, config, graphs ---
        {
            "name": "memory_embed",
            "description": (
                "Generate or refresh vector embeddings for all memories that do not yet have them. "
                "Requires either Ollama (nomic-embed-text model) or an OpenAI API key configured. "
                "Embeddings enable semantic recall via memory_recall with semantic=true."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_core",
            "description": (
                "Return the current CORE.md content — the auto-curated high-priority memory digest "
                "used to seed Claude sessions. Lists critical solutions, active decisions, and key fixes."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_decay",
            "description": (
                "Run a decay pass over all memories: recalculate decay scores based on age, "
                "access frequency, importance, and type weight. Archives memories whose score "
                "drops below the dormant threshold. Returns a summary of updated scores."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_config",
            "description": (
                "View or update the cognitive memory embedding configuration (_config.json). "
                "Set action='show' to display current config (API key is masked). "
                "Provide provider='openai' or provider='ollama' to switch embedding backends. "
                "Provide openai_api_key to set the OpenAI API key for embeddings."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "description": "Set to 'show' to display current config without modifying it",
                    },
                    "provider": {
                        "type": "string",
                        "description": "Embedding provider: 'ollama' or 'openai'",
                    },
                    "openai_api_key": {
                        "type": "string",
                        "description": "OpenAI API key to store in config",
                    },
                    "graph": {
                        "type": "string",
                        "description": "Named memory graph to use (default: 'default')",
                    },
                },
            },
        },
        {
            "name": "memory_graphs",
            "description": (
                "List all available memory graphs (named, segregated memory namespaces). "
                "Returns each graph's name, path, and whether it exists on disk."
            ),
            "inputSchema": {"type": "object", "properties": {}},
        },
    ]
def _auto_create_edges(
    client: CognitiveMemoryClient,
    memory_id: str,
    title: str,
    mem_type: str,
    tags: Optional[list] = None,
) -> tuple:
    """Auto-create edges between a newly stored memory and related existing memories.

    Uses recall to find similar memories and heuristic type-pairs
    (AUTO_EDGE_HEURISTICS) to choose relationship types.

    Never raises — store always succeeds.

    Returns:
        (edges, error): ``edges`` is a list of created edge info dicts
        (possibly partial); ``error`` is None on success or a short
        "ExceptionType: message" string if edge creation failed.
    """
    try:
        # Build recall query from title + tags for better precision (#4)
        query = title
        if tags:
            query = f"{title} {' '.join(tags)}"
        results = client.recall(query, limit=5, semantic=True)
        # Filter out the just-stored memory
        results = [r for r in results if r.get("id") != memory_id]
        # Filter by similarity threshold before slicing to AUTO_EDGE_MAX (#5)
        # When embeddings are absent, keyword-only results lack "similarity" —
        # require at least a tag overlap to avoid spurious edges (#1)
        filtered = []
        new_tags = set(tags or [])
        for result in results:
            similarity = result.get("similarity")
            if similarity is not None:
                # Semantic path: use similarity threshold
                if similarity < AUTO_EDGE_SIMILARITY_THRESHOLD:
                    continue
            else:
                # Keyword-only path: require at least one shared tag
                match_tags = set(result.get("tags") or [])
                if not new_tags or not (new_tags & match_tags):
                    continue
            filtered.append(result)
        created_edges = []
        for result in filtered[:AUTO_EDGE_MAX]:
            match_type = result.get("type", "")
            match_id = result["id"]
            match_title = result.get("title", "")
            # Look up heuristic for (new_type, match_type) in both orderings
            key_ab = (mem_type, match_type)
            key_ba = (match_type, mem_type)
            if key_ab in AUTO_EDGE_HEURISTICS:
                rel_type, direction = AUTO_EDGE_HEURISTICS[key_ab]
                from_id = memory_id if direction == "ab" else match_id
                to_id = match_id if direction == "ab" else memory_id
            elif key_ba in AUTO_EDGE_HEURISTICS:
                rel_type, direction = AUTO_EDGE_HEURISTICS[key_ba]
                # Reverse: if heuristic says ab for (match, new), then match->new
                from_id = match_id if direction == "ab" else memory_id
                to_id = memory_id if direction == "ab" else match_id
            else:
                # No heuristic for this type pair: fall back to a generic link
                rel_type = "RELATED_TO"
                from_id = memory_id
                to_id = match_id
            # Build description matching actual edge direction (#2).
            # " -> " separator added: previous format string fused the two
            # titles together with no delimiter.
            from_title = title if from_id == memory_id else match_title
            to_title = match_title if to_id == match_id else title
            desc = f"Auto-edge: {from_title} -> {to_title}"
            # Use similarity as edge strength when available (default 0.8)
            sim = result.get("similarity")
            strength = round(sim, 2) if sim is not None else 0.8
            edge_id = client.relate(
                from_id=from_id,
                to_id=to_id,
                rel_type=rel_type,
                description=desc,
                strength=strength,
                skip_commit=True,  # batched — git sync handles persistence
            )
            if edge_id:  # Empty string means duplicate
                created_edges.append(
                    {
                        "edge_id": edge_id,
                        "rel_type": rel_type,
                        "linked_memory_id": match_id,
                        "linked_title": match_title,
                    }
                )
        return created_edges, None
    except Exception as e:
        return [], f"{type(e).__name__}: {e}"
def handle_tool_call(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch MCP tool calls to the CognitiveMemoryClient.

    Returns an MCP tool-result dict: {"content": [...]} on success, or the
    same shape plus "isError": True for unknown tools and raised exceptions.
    All client results are JSON-serialized (default=str for non-JSON types).
    """
    def ok(result: Any) -> Dict[str, Any]:
        # Wrap any client result as an MCP text content block.
        return {
            "content": [
                {"type": "text", "text": json.dumps(result, indent=2, default=str)}
            ]
        }
    try:
        # pop() deliberately mutates the incoming arguments dict so the
        # per-tool handlers below never see the routing-only "graph" key.
        graph = arguments.pop("graph", None)
        client = get_client(graph)
        if tool_name == "memory_store":
            mem_type = arguments["type"]
            title = arguments["title"]
            tags = arguments.get("tags")
            memory_id = client.store(
                type=mem_type,
                title=title,
                content=arguments["content"],
                tags=tags,
                importance=arguments.get("importance", 0.5),
            )
            # Auto-log episode entry (default true, opt-out with episode=false)
            episode_logged = False
            if arguments.get("episode", True):
                client.episode(
                    type=mem_type,
                    title=title,
                    tags=tags,
                )
                episode_logged = True
            # Auto-edges never raise; failures surface as auto_edge_error below.
            auto_edges, auto_edge_error = _auto_create_edges(
                client, memory_id, title, mem_type, tags
            )
            # One sync covers the store, episode, and any auto-edges
            # (auto-edges are created with skip_commit=True).
            _trigger_git_sync(client.memory_dir)
            result = {
                "success": True,
                "memory_id": memory_id,
                "episode_logged": episode_logged,
                "auto_edges": auto_edges,
            }
            if auto_edge_error:
                result["auto_edge_error"] = auto_edge_error
            return ok(result)
        elif tool_name == "memory_recall":
            results = client.recall(
                query=arguments["query"],
                semantic=arguments.get("semantic", True),
                limit=arguments.get("limit", 10),
            )
            return ok(results)
        elif tool_name == "memory_get":
            result = client.get(arguments["memory_id"])
            if result is None:
                return ok({"error": f"Memory not found: {arguments['memory_id']}"})
            return ok(result)
        elif tool_name == "memory_search":
            results = client.search(
                query=arguments.get("query"),
                memory_types=arguments.get("memory_types"),
                tags=arguments.get("tags"),
                min_importance=arguments.get("min_importance"),
            )
            return ok(results)
        elif tool_name == "memory_relate":
            edge_id = client.relate(
                from_id=arguments["from_id"],
                to_id=arguments["to_id"],
                rel_type=arguments["rel_type"],
                description=arguments.get("description"),
                strength=arguments.get("strength", 0.8),
            )
            # Empty edge_id signals a duplicate relationship — no sync needed.
            if edge_id:
                _trigger_git_sync(client.memory_dir)
                return ok({"success": True, "edge_id": edge_id})
            return ok({"success": False, "message": "Relationship already exists"})
        elif tool_name == "memory_related":
            results = client.related(
                memory_id=arguments["memory_id"],
                rel_types=arguments.get("rel_types"),
                max_depth=arguments.get("max_depth", 1),
            )
            return ok(results)
        elif tool_name == "memory_edge_get":
            result = client.edge_get(arguments["edge_id"])
            if result is None:
                return ok({"error": f"Edge not found: {arguments['edge_id']}"})
            return ok(result)
        elif tool_name == "memory_edge_search":
            results = client.edge_search(
                query=arguments.get("query"),
                types=arguments.get("types"),
                from_id=arguments.get("from_id"),
                to_id=arguments.get("to_id"),
            )
            return ok(results)
        elif tool_name == "memory_reflect":
            result = client.reflect(
                since=arguments.get("since"),
                dry_run=arguments.get("dry_run", False),
            )
            return ok(result)
        elif tool_name == "memory_reflection":
            text = client.reflection_summary()
            return ok({"content": text})
        elif tool_name == "memory_stats":
            result = client.stats()
            return ok(result)
        elif tool_name == "memory_episode":
            # NOTE(review): unlike memory_store/memory_relate, this write path
            # does not trigger a git sync — presumably deferred to the daily
            # timer; confirm that is intentional.
            client.episode(
                type=arguments["type"],
                title=arguments["title"],
                tags=arguments.get("tags"),
                summary=arguments.get("summary"),
            )
            return ok({"success": True})
        elif tool_name == "memory_tags_list":
            results = client.tags_list(limit=arguments.get("limit", 0))
            return ok(results)
        elif tool_name == "memory_tags_related":
            results = client.tags_related(
                tag=arguments["tag"],
                limit=arguments.get("limit", 0),
            )
            return ok(results)
        elif tool_name == "memory_embed":
            result = client.embed()
            return ok(result)
        elif tool_name == "memory_core":
            text = client.core()
            return ok({"content": text})
        elif tool_name == "memory_decay":
            result = client.decay()
            return ok(result)
        elif tool_name == "memory_config":
            config_path = client.memory_dir / "_config.json"
            config = _load_memory_config(config_path)
            changed = False
            provider = arguments.get("provider")
            openai_api_key = arguments.get("openai_api_key")
            if provider:
                config["embedding_provider"] = provider
                changed = True
            if openai_api_key:
                config["openai_api_key"] = openai_api_key
                changed = True
            if changed:
                config_path.write_text(json.dumps(config, indent=2))
                return ok({"success": True, "updated": True})
            else:
                # Show config with masked API key. (The documented
                # action='show' argument is not checked explicitly — any call
                # without provider/openai_api_key falls through to here.)
                display = dict(config)
                key = display.get("openai_api_key")
                if key and isinstance(key, str) and len(key) > 8:
                    display["openai_api_key"] = key[:4] + "..." + key[-4:]
                return ok(display)
        elif tool_name == "memory_graphs":
            graphs = list_graphs()
            # Enrich with existence check and memory count
            for g in graphs:
                p = Path(g["path"])
                g["exists"] = p.exists()
                if g["exists"]:
                    index_path = p / "_index.json"
                    if index_path.exists():
                        try:
                            idx = json.loads(index_path.read_text())
                            g["memory_count"] = len(idx.get("entries", {}))
                        except (json.JSONDecodeError, OSError):
                            # Unreadable/corrupt index: report 0 rather than fail
                            g["memory_count"] = 0
                    else:
                        g["memory_count"] = 0
            return ok(graphs)
        else:
            return {
                "content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
                "isError": True,
            }
    except Exception as e:
        # Broad catch is intentional at this tool boundary: errors are
        # reported back to the MCP client, never allowed to kill the server.
        return {
            "content": [{"type": "text", "text": f"Error: {str(e)}"}],
            "isError": True,
        }
def main():
    """MCP stdio server main loop (JSON-RPC 2.0).

    Reads newline-delimited JSON-RPC messages from stdin and writes responses
    to stdout. Handles initialize, tools/list, tools/call, and the
    notifications/initialized notification (which gets no response).
    """
    tools = create_tools()
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        # Reset per iteration: previously the except block probed
        # `"message" in locals()`, so a parse failure after the first
        # successful message reported the *previous* message's id.
        message = None
        try:
            message = json.loads(line)
            method = message.get("method")
            if method == "initialize":
                response = {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "result": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {"tools": {}},
                        "serverInfo": {
                            "name": "cognitive-memory-mcp-server",
                            "version": "3.1.0",
                        },
                    },
                }
                print(json.dumps(response), flush=True)
            elif method == "tools/list":
                response = {
                    "jsonrpc": "2.0",
                    "id": message.get("id"),
                    "result": {"tools": tools},
                }
                print(json.dumps(response), flush=True)
            elif method == "tools/call":
                params = message.get("params", {})
                tool_name = params.get("name")
                arguments = params.get("arguments", {})
                result = handle_tool_call(tool_name, arguments)
                response = {"jsonrpc": "2.0", "id": message.get("id"), "result": result}
                print(json.dumps(response), flush=True)
            elif method == "notifications/initialized":
                # Acknowledge but no response required for notifications
                pass
        except Exception as e:
            # JSON-RPC 2.0: -32700 is the reserved Parse error code;
            # -32603 (Internal error) covers everything else.
            code = -32700 if isinstance(e, json.JSONDecodeError) else -32603
            error_response = {
                "jsonrpc": "2.0",
                "id": message.get("id") if isinstance(message, dict) else None,
                "error": {"code": code, "message": str(e)},
            }
            print(json.dumps(error_response), flush=True)


if __name__ == "__main__":
    main()