claude-configs/skills/cognitive-memory/client.py
Cal Corum a2d18ef0c2 Cognitive Memory v3.0: rich edges, hybrid embeddings, MCP server
Add first-class edge files in graph/edges/ with bidirectional frontmatter
refs, hybrid Ollama/OpenAI embedding providers with fallback chain, and
native MCP server (18 tools) for direct Claude Code integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 14:11:18 -06:00

3224 lines
114 KiB
Python

#!/usr/bin/env python3
"""
Cognitive Memory Client
Markdown-based memory system with decay scoring, episodic logging, and auto-curated CORE.md.
Stores memories as human-readable markdown files with YAML frontmatter.
Usage:
# CLI
python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix"
python client.py recall "timeout error"
python client.py get <memory_id>
python client.py relate <from_id> <to_id> SOLVES
python client.py core
python client.py episode --type fix --title "Fixed reconnection" --tags "discord,python"
# Python
from client import CognitiveMemoryClient
client = CognitiveMemoryClient()
memory_id = client.store(type="solution", title="...", content="...")
"""
import argparse
import json
import math
import os
import re
import subprocess
import sys
import urllib.request
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import URLError
# =============================================================================
# CONSTANTS
# =============================================================================
# On-disk layout: everything lives under ~/.claude/memory, with JSON sidecar
# files for the search index, decay state, embeddings, and provider config.
MEMORY_DIR = Path.home() / ".claude" / "memory"
INDEX_PATH = MEMORY_DIR / "_index.json"
STATE_PATH = MEMORY_DIR / "_state.json"
EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json"
# Embedding providers: local Ollama endpoint plus an OpenAI fallback.
OLLAMA_URL = "http://localhost:11434"
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 5  # seconds
CONFIG_PATH = MEMORY_DIR / "_config.json"
OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings"
OPENAI_MODEL_DEFAULT = "text-embedding-3-small"
# Memory type -> directory name mapping
TYPE_DIRS = {
    "solution": "solutions",
    "fix": "fixes",
    "decision": "decisions",
    "configuration": "configurations",
    "problem": "problems",
    "workflow": "workflows",
    "code_pattern": "code-patterns",
    "error": "errors",
    "general": "general",
    "procedure": "procedures",
    "insight": "insights",
}
VALID_TYPES = list(TYPE_DIRS.keys())
# Decay model type weights: multiplier applied in calculate_decay_score,
# so higher-weighted types score higher and surface more often.
TYPE_WEIGHTS = {
    "decision": 1.3,
    "solution": 1.2,
    "insight": 1.25,
    "code_pattern": 1.1,
    "configuration": 1.1,
    "fix": 1.0,
    "workflow": 1.0,
    "problem": 0.9,
    "error": 0.8,
    "general": 0.8,
    "procedure": 1.4,
}
DECAY_LAMBDA = 0.03  # Half-life ~23 days
# Decay score thresholds (recall skips memories below THRESHOLD_DORMANT)
THRESHOLD_ACTIVE = 0.5
THRESHOLD_FADING = 0.2
THRESHOLD_DORMANT = 0.05
# Relationship types (subset from MemoryGraph, focused on most useful)
VALID_RELATION_TYPES = [
    "SOLVES",
    "CAUSES",
    "BUILDS_ON",
    "ALTERNATIVE_TO",
    "REQUIRES",
    "FOLLOWS",
    "RELATED_TO",
]
# Edge file constants
EDGES_DIR_NAME = "edges"
# Frontmatter key order for edge files
EDGE_FIELD_ORDER = [
    "id",
    "type",
    "from_id",
    "from_title",
    "to_id",
    "to_title",
    "strength",
    "created",
    "updated",
]
# Frontmatter field order for consistent output
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]
# CORE.md token budget (approximate, 1 token ~= 4 chars)
CORE_MAX_CHARS = 12000  # ~3K tokens
# =============================================================================
# YAML FRONTMATTER PARSING (stdlib only)
# =============================================================================
def _needs_quoting(s: str) -> bool:
"""Check if a YAML string value needs quoting."""
if not s:
return True
if any(c in s for c in ":#{}[]&*?|>!%@`"):
return True
try:
float(s)
return True
except ValueError:
pass
if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
return True
return False
def _quote_yaml(s: str) -> str:
"""Quote a string for YAML, escaping internal quotes."""
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped}"'
def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Render a Python scalar as YAML text, quoting strings when required."""
    if value is None:
        return "null"
    # bool is checked before int/float because bool subclasses int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if force_quote or _needs_quoting(text):
        return _quote_yaml(text)
    return text
def _parse_scalar(value: str) -> Any:
"""Parse a YAML scalar value to Python type."""
v = value.strip()
if not v or v == "null":
return None
if v == "true":
return True
if v == "false":
return False
# Try numeric
try:
if "." in v:
return float(v)
return int(v)
except ValueError:
pass
# Strip quotes
if (v.startswith('"') and v.endswith('"')) or (
v.startswith("'") and v.endswith("'")
):
return v[1:-1]
return v
def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize a dict to YAML frontmatter string (between --- markers).

    Emits only keys listed in FIELD_ORDER, in that order; other keys in
    *data* are ignored. tags render as an inline list, the procedure lists
    and relations as block lists (omitted entirely when empty), and title
    is always quoted.
    """
    lines = ["---"]
    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]
        if key == "tags" and isinstance(value, list):
            # Inline list form: tags: [a, b]
            if value:
                items = ", ".join(_format_yaml_value(t) for t in value)
                lines.append(f"tags: [{items}]")
            else:
                lines.append("tags: []")
        elif key in ("steps", "preconditions", "postconditions") and isinstance(
            value, list
        ):
            # Block list of always-quoted strings; empty lists are skipped.
            if not value:
                continue
            lines.append(f"{key}:")
            for item in value:
                lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}")
        elif key == "relations" and isinstance(value, list):
            if not value:
                continue
            lines.append("relations:")
            for rel in value:
                first = True
                # Relation fields in a stable order; only "context" (free
                # text) is force-quoted.
                for rk in [
                    "target",
                    "type",
                    "direction",
                    "strength",
                    "context",
                    "edge_id",
                ]:
                    if rk not in rel:
                        continue
                    rv = rel[rk]
                    # "- " opens the mapping on its first emitted field.
                    prefix = " - " if first else " "
                    force_q = rk in ("context",)
                    lines.append(
                        f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}"
                    )
                    first = False
        elif key == "title":
            # Titles are always quoted to survive punctuation.
            lines.append(f"title: {_format_yaml_value(value, force_quote=True)}")
        else:
            lines.append(f"{key}: {_format_yaml_value(value)}")
    lines.append("---")
    return "\n".join(lines)
def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Parse YAML frontmatter and body from markdown text.

    Returns (frontmatter_dict, body_text). Handles scalars, inline lists
    ([a, b]), simple block lists, and the nested "relations" block. When no
    valid frontmatter is present, returns ({}, text) unchanged.
    """
    if not text.startswith("---\n"):
        return {}, text
    # Find closing ---. The search runs over text[3:], so every offset
    # derived from end_match must be shifted back by 3.
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # Try end of string: file ends with "---" and no trailing newline.
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                # Closing marker overlaps the opening one -> not frontmatter.
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        end_pos = end_match.start() + 3  # Offset from text[3:]
        fm_text = text[4:end_pos]
        # Body starts after the full "\n---\n" delimiter match.
        body = text[end_pos + end_match.end() - end_match.start() :]
        body = body.lstrip("\n")
    data = {}
    lines = fm_text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        # Skip empty lines
        if not line.strip():
            i += 1
            continue
        # Must be a top-level key (no leading whitespace)
        if line[0] == " ":
            i += 1
            continue
        if ":" not in line:
            i += 1
            continue
        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()
        if not rest:
            # Block value - collect the following indented lines.
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1
            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # Simple list of scalars.
                data[key] = [
                    _parse_scalar(bl.strip().lstrip("- "))
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                # Key with no value and no recognized block.
                data[key] = None
            i = j
            continue
        # Inline list: [a, b, c]
        if rest.startswith("[") and rest.endswith("]"):
            inner = rest[1:-1]
            if inner.strip():
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)
        i += 1
    return data, body
def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse a YAML block list of relation dicts (``- key: value`` entries)."""
    relations: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for raw in lines:
        stripped = raw.strip()
        if not stripped:
            continue
        if stripped.startswith("- "):
            # "- " opens a new relation mapping; the remainder of the line
            # may itself carry the first key: value pair.
            current = {}
            relations.append(current)
            remainder = stripped[2:]
            if ":" in remainder:
                k, _, v = remainder.partition(":")
                current[k.strip()] = _parse_scalar(v.strip())
        elif current is not None and ":" in stripped:
            # Continuation line belonging to the current relation entry.
            k, _, v = stripped.partition(":")
            current[k.strip()] = _parse_scalar(v.strip())
    return relations
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def slugify(text: str, max_length: int = 60) -> str:
    """Lowercase *text* and reduce it to a hyphen-separated slug."""
    slug = text.lower().strip()
    slug = re.sub(r"[^\w\s-]", "", slug)  # drop punctuation
    slug = re.sub(r"[\s_]+", "-", slug)   # whitespace/underscores -> hyphen
    slug = re.sub(r"-+", "-", slug)       # collapse hyphen runs
    slug = slug.strip("-")
    if len(slug) > max_length:
        slug = slug[:max_length].rstrip("-")
    return slug if slug else "untitled"
def make_filename(title: str, memory_id: str) -> str:
    """Build ``{title-slug}-{first 6 chars of uuid}.md`` for a memory file."""
    return f"{slugify(title)}-{memory_id[:6]}.md"
def calculate_decay_score(
    importance: float, days_since_access: float, access_count: int, type_weight: float
) -> float:
    """Score a memory's retention strength.

    decay_score = importance * e^(-lambda * days) * usage * type_weight,
    where usage is log2(access_count + 1), floored at 1.0 for never-accessed
    memories so a zero count does not zero the whole score.
    """
    recency = math.exp(-DECAY_LAMBDA * days_since_access)
    usage = math.log2(access_count + 1) if access_count > 0 else 1.0
    score = importance * recency
    return score * usage * type_weight
def _ollama_embed(
    texts: List[str], timeout: int = EMBEDDING_TIMEOUT
) -> Optional[List[List[float]]]:
    """Get embeddings from Ollama for a list of texts.

    POSTs to OLLAMA_URL/api/embed with EMBEDDING_MODEL. Returns the list of
    embedding vectors, or None if Ollama is unreachable, times out, or the
    response payload lacks a usable "embeddings" list.
    """
    try:
        payload = json.dumps({"model": EMBEDDING_MODEL, "input": texts}).encode("utf-8")
        req = urllib.request.Request(
            f"{OLLAMA_URL}/api/embed",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = json.loads(resp.read().decode("utf-8"))
            embeddings = data.get("embeddings")
            if embeddings and isinstance(embeddings, list):
                return embeddings
            return None
    except (
        # Any connectivity or payload problem degrades to "no embeddings";
        # callers treat None as "provider unavailable" and fall back.
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        return None
def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Read _config.json, filling in defaults for missing or unreadable keys."""
    path = config_path or CONFIG_PATH
    defaults: Dict[str, Any] = {
        "embedding_provider": "ollama",
        "openai_api_key": None,
        "ollama_model": EMBEDDING_MODEL,
        "openai_model": OPENAI_MODEL_DEFAULT,
    }
    if not path.exists():
        return defaults
    try:
        loaded = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError):
        # Corrupt or unreadable config degrades to defaults.
        return defaults
    for key, fallback in defaults.items():
        loaded.setdefault(key, fallback)
    return loaded
def _openai_embed(
    texts: List[str],
    api_key: str,
    model: str = OPENAI_MODEL_DEFAULT,
    timeout: int = 30,
) -> Optional[List[List[float]]]:
    """Get embeddings from OpenAI API (stdlib-only, same interface as _ollama_embed).

    Returns vectors re-sorted by the response's "index" field so output order
    matches *texts*, or None on any network/auth/parse failure (callers treat
    None as "provider unavailable" and fall back).
    """
    try:
        payload = json.dumps({"input": texts, "model": model}).encode("utf-8")
        req = urllib.request.Request(
            OPENAI_EMBED_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = json.loads(resp.read().decode("utf-8"))
            items = data.get("data", [])
            if items and isinstance(items, list):
                # Sort by index to ensure order matches input
                items.sort(key=lambda x: x.get("index", 0))
                return [item["embedding"] for item in items]
            return None
    except (
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        return None
def _cosine_similarity(a: List[float], b: List[float]) -> float:
"""Compute cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)
def _make_edge_filename(
    from_title: str, rel_type: str, to_title: str, edge_id: str
) -> str:
    """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md"""
    # Titles are slugged shorter than memory filenames (30 vs 60 chars)
    # since two of them plus the relation type share one name.
    pieces = (
        slugify(from_title, max_length=30),
        rel_type,
        slugify(to_title, max_length=30),
    )
    return "--".join(pieces) + f"-{edge_id[:6]}.md"
def serialize_edge_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize an edge dict to YAML frontmatter (--- delimited)."""
    out = ["---"]
    for key in EDGE_FIELD_ORDER:
        if key not in data:
            continue
        # Titles are free text, so they are always quoted.
        force = key in ("from_title", "to_title")
        out.append(f"{key}: {_format_yaml_value(data[key], force_quote=force)}")
    out.append("---")
    return "\n".join(out)
# =============================================================================
# CLIENT
# =============================================================================
class CognitiveMemoryClient:
"""Client for markdown-based cognitive memory system."""
def __init__(self, memory_dir: Optional[Path] = None):
    """Initialize the client rooted at *memory_dir* (default: MEMORY_DIR)."""
    self.memory_dir = memory_dir or MEMORY_DIR
    # Sidecar files live directly under the memory root.
    self.index_path = self.memory_dir / "_index.json"
    self.state_path = self.memory_dir / "_state.json"
    self._ensure_dirs()
def _ensure_dirs(self):
    """Create the graph/, graph/edges/, episodes/ and vault/ directory tree."""
    graph_root = self.memory_dir / "graph"
    for subdir in TYPE_DIRS.values():
        (graph_root / subdir).mkdir(parents=True, exist_ok=True)
    (graph_root / EDGES_DIR_NAME).mkdir(parents=True, exist_ok=True)
    for top_level in ("episodes", "vault"):
        (self.memory_dir / top_level).mkdir(parents=True, exist_ok=True)
# -------------------------------------------------------------------------
# Index and State management
# -------------------------------------------------------------------------
def _load_index(self) -> Dict:
    """Load _index.json; fall back to an empty v2 structure on any error."""
    empty = {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}
    if not self.index_path.exists():
        return empty
    try:
        return json.loads(self.index_path.read_text())
    except (json.JSONDecodeError, OSError):
        # Corrupt index is treated as missing; it is rebuilt on next save.
        return empty
def _save_index(self, index: Dict):
    """Write _index.json, stamping version, edge map, timestamp and count."""
    index["version"] = 2
    index.setdefault("edges", {})
    index["updated"] = datetime.now(timezone.utc).isoformat()
    index["count"] = len(index.get("entries", {}))
    payload = json.dumps(index, indent=2, default=str)
    self.index_path.write_text(payload)
def _load_state(self) -> Dict:
    """Load _state.json; fall back to an empty v1 structure on any error."""
    fallback = {"version": 1, "updated": "", "entries": {}}
    if not self.state_path.exists():
        return fallback
    try:
        return json.loads(self.state_path.read_text())
    except (json.JSONDecodeError, OSError):
        return fallback
def _save_state(self, state: Dict):
    """Write _state.json with a refreshed UTC timestamp."""
    state["updated"] = datetime.now(timezone.utc).isoformat()
    payload = json.dumps(state, indent=2, default=str)
    self.state_path.write_text(payload)
# -------------------------------------------------------------------------
# File I/O
# -------------------------------------------------------------------------
def _read_memory_file(self, path: Path) -> Tuple[Dict[str, Any], str]:
    """Read a memory markdown file and split it into (frontmatter, body)."""
    return parse_frontmatter(path.read_text(encoding="utf-8"))
def _write_memory_file(self, path: Path, frontmatter: Dict[str, Any], body: str):
    """Write frontmatter + body to *path*, omitting the body section if empty."""
    fm_str = serialize_frontmatter(frontmatter)
    trimmed = body.strip()
    if trimmed:
        content = f"{fm_str}\n\n{trimmed}\n"
    else:
        content = f"{fm_str}\n"
    path.write_text(content, encoding="utf-8")
def _resolve_memory_path(self, memory_id: str) -> Optional[Path]:
    """Locate a memory file by ID via the index, falling back to a scan."""
    entry = self._load_index().get("entries", {}).get(memory_id)
    if entry:
        candidate = self.memory_dir / entry["path"]
        if candidate.exists():
            return candidate
    # Index miss or stale path: scan the tree (slow but reliable).
    return self._scan_for_memory(memory_id)
def _scan_for_memory(self, memory_id: str) -> Optional[Path]:
    """Walk graph/ and vault/ for the file whose frontmatter id matches."""
    for root_name in ("graph", "vault"):
        root = self.memory_dir / root_name
        if not root.exists():
            continue
        for md_file in root.rglob("*.md"):
            try:
                fm, _ = self._read_memory_file(md_file)
            except Exception:
                # Unreadable/garbled file: keep scanning.
                continue
            if fm.get("id") == memory_id:
                return md_file
    return None
# -------------------------------------------------------------------------
# Git operations
# -------------------------------------------------------------------------
def _git_commit(self, message: str, files: Optional[List[Path]] = None):
    """Stage and commit files in the memory repo (best-effort, never raises).

    When *files* is given, each is staged individually relative to the
    memory dir; otherwise everything is staged with ``git add -A``.
    Silently does nothing when the memory dir is not a git repo or git
    itself is missing/slow.
    """
    try:
        # Check if it's a git repo
        result = subprocess.run(
            ["git", "rev-parse", "--git-dir"],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
        if result.returncode != 0:
            return  # Not a git repo, skip
        if files:
            for f in files:
                rel = f.relative_to(self.memory_dir)
                subprocess.run(
                    ["git", "add", str(rel)],
                    cwd=str(self.memory_dir),
                    capture_output=True,
                    timeout=5,
                )
        else:
            subprocess.run(
                ["git", "add", "-A"],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        # --allow-empty so the commit succeeds even when nothing changed.
        subprocess.run(
            ["git", "commit", "-m", message, "--allow-empty"],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        pass  # Git operations are best-effort
# -------------------------------------------------------------------------
# Index helpers
# -------------------------------------------------------------------------
def _update_index_entry(
    self,
    memory_id: str,
    frontmatter: Dict[str, Any],
    rel_path: str,
    content_preview: str = "",
):
    """Upsert one memory's summary row in _index.json."""
    index = self._load_index()
    existing = index.get("entries", {}).get(memory_id, {})
    # Keep the previously stored preview when the caller didn't pass one.
    preview = content_preview or existing.get("content_preview", "")
    row = {
        "title": frontmatter.get("title", ""),
        "type": frontmatter.get("type", "general"),
        "tags": frontmatter.get("tags", []),
        "importance": frontmatter.get("importance", 0.5),
        "confidence": frontmatter.get("confidence", 0.8),
        "created": frontmatter.get("created", ""),
        "updated": frontmatter.get("updated", ""),
        "path": rel_path,
        "relations": frontmatter.get("relations", []),
        "content_preview": preview,
    }
    index.setdefault("entries", {})[memory_id] = row
    self._save_index(index)
def _remove_index_entry(self, memory_id: str):
    """Drop a memory's row from _index.json (no-op when absent)."""
    index = self._load_index()
    entries = index.get("entries", {})
    if memory_id in entries:
        del entries[memory_id]
    self._save_index(index)
def _maybe_refresh_decay(self):
    """Auto-refresh all decay scores if state is older than 24 hours.

    A missing state file, an unparsable/absent timestamp, or any read error
    also triggers a full recomputation via self.decay().
    """
    if not self.state_path.exists():
        self.decay()
        return
    try:
        state = json.loads(self.state_path.read_text())
        updated_str = state.get("updated", "")
        if not updated_str:
            self.decay()
            return
        # Accept both "Z" and "+00:00" UTC suffixes in stored timestamps.
        updated_dt = datetime.fromisoformat(updated_str.replace("Z", "+00:00"))
        if updated_dt.tzinfo is None:
            # Naive timestamps are assumed UTC so the subtraction below
            # never mixes naive and aware datetimes.
            updated_dt = updated_dt.replace(tzinfo=timezone.utc)
        age_hours = (datetime.now(timezone.utc) - updated_dt).total_seconds() / 3600
        if age_hours > 24:
            self.decay()
    except (json.JSONDecodeError, ValueError, OSError):
        self.decay()
# -------------------------------------------------------------------------
# Edge index helpers
# -------------------------------------------------------------------------
def _update_edge_index(
    self, edge_id: str, edge_data: Dict[str, Any], rel_path: str
):
    """Upsert one edge's summary row in _index.json."""
    index = self._load_index()
    row: Dict[str, Any] = {}
    for key in ("type", "from_id", "from_title", "to_id", "to_title"):
        row[key] = edge_data.get(key, "")
    row["strength"] = edge_data.get("strength", 0.8)
    row["created"] = edge_data.get("created", "")
    row["updated"] = edge_data.get("updated", "")
    row["path"] = rel_path
    index.setdefault("edges", {})[edge_id] = row
    self._save_index(index)
def _remove_edge_index(self, edge_id: str):
    """Drop an edge's row from _index.json (no-op when absent)."""
    index = self._load_index()
    edges = index.get("edges", {})
    if edge_id in edges:
        del edges[edge_id]
    self._save_index(index)
def _scan_for_edge(self, edge_id: str) -> Optional[Path]:
    """Fallback linear scan of graph/edges/ for an edge file by ID."""
    edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
    if not edges_dir.exists():
        return None
    for candidate in edges_dir.glob("*.md"):
        try:
            fm, _ = self._read_memory_file(candidate)
        except Exception:
            # Unreadable edge file: keep scanning.
            continue
        if fm.get("id") == edge_id:
            return candidate
    return None
def _resolve_edge_path(self, edge_id: str) -> Optional[Path]:
    """Locate an edge file by ID via the index, falling back to a scan."""
    entry = self._load_index().get("edges", {}).get(edge_id)
    if entry:
        candidate = self.memory_dir / entry["path"]
        if candidate.exists():
            return candidate
    # Index miss or stale path: scan the edges directory.
    return self._scan_for_edge(edge_id)
# =========================================================================
# PUBLIC API
# =========================================================================
def store(
    self,
    type: str,
    title: str,
    content: str,
    tags: Optional[List[str]] = None,
    importance: float = 0.5,
    confidence: float = 0.8,
    steps: Optional[List[str]] = None,
    preconditions: Optional[List[str]] = None,
    postconditions: Optional[List[str]] = None,
) -> str:
    """Store a new memory as a markdown file and register it in index/state.

    Args:
        type: One of VALID_TYPES (e.g. "solution", "procedure").
        title: Human-readable title; also used for the filename slug.
        content: Markdown body of the memory.
        tags: Optional tags; lowercased and stripped.
        importance: Clamped to [0.0, 1.0] in the stored frontmatter.
        confidence: Clamped to [0.0, 1.0] in the stored frontmatter.
        steps / preconditions / postconditions: Stored only for "procedure".

    Returns:
        The new memory's UUID string.

    Raises:
        ValueError: If *type* is not a valid memory type.
    """
    if type not in VALID_TYPES:
        raise ValueError(f"Invalid type: {type}. Valid: {VALID_TYPES}")
    memory_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    tags = [t.lower().strip() for t in (tags or [])]
    frontmatter = {
        "id": memory_id,
        "type": type,
        "title": title,
        "tags": tags,
        "importance": max(0.0, min(1.0, importance)),
        "confidence": max(0.0, min(1.0, confidence)),
        "created": now,
        "updated": now,
    }
    # Add procedure-specific fields when type is "procedure"
    if type == "procedure":
        if steps:
            frontmatter["steps"] = steps
        if preconditions:
            frontmatter["preconditions"] = preconditions
        if postconditions:
            frontmatter["postconditions"] = postconditions
    # Determine file path.
    # BUG FIX: rel_path previously interpolated the literal string
    # "(unknown)" instead of the generated filename, so every memory of a
    # given type collided on one bogus path and the index pointed at a
    # file that was never written under that name.
    type_dir = TYPE_DIRS[type]
    filename = make_filename(title, memory_id)
    rel_path = f"graph/{type_dir}/{filename}"
    full_path = self.memory_dir / rel_path
    # Write file
    self._write_memory_file(full_path, frontmatter, content)
    # Update index with content preview (truncate at word boundary)
    preview = content.strip()[:200]
    if len(content.strip()) > 200:
        last_space = preview.rfind(" ")
        if last_space > 0:
            preview = preview[:last_space]
    self._update_index_entry(
        memory_id, frontmatter, rel_path, content_preview=preview
    )
    # Seed state: zero accesses, baseline decay score of
    # importance * type_weight * 0.5 (uses the caller's raw importance).
    state = self._load_state()
    state.setdefault("entries", {})[memory_id] = {
        "access_count": 0,
        "last_accessed": now,
        "decay_score": importance * TYPE_WEIGHTS.get(type, 1.0) * 0.5,
    }
    self._save_state(state)
    # Git commit (best-effort)
    self._git_commit(f"store: {title}", [full_path])
    return memory_id
def recall(
    self,
    query: str,
    memory_types: Optional[List[str]] = None,
    limit: int = 10,
    semantic: bool = False,
) -> List[Dict[str, Any]]:
    """Search memories by query, ranked by relevance and decay score.

    Keyword scoring per memory: title term hits x3 plus tag hits x2; when
    both are zero, preview hits x1. The score is then weighted by
    (1 + decay_score). When semantic=True and an embeddings file exists,
    keyword and semantic results are merged (rank-based keyword score plus
    similarity * 5).
    """
    self._maybe_refresh_decay()
    query_lower = query.lower().strip()
    terms = query_lower.split()
    if not terms:
        return []
    index = self._load_index()
    state = self._load_state()
    results = []
    for mid, entry in index.get("entries", {}).items():
        # Filter by type
        if memory_types and entry.get("type") not in memory_types:
            continue
        # Check decay threshold - skip archived
        s = state.get("entries", {}).get(mid, {})
        decay = s.get("decay_score", 0.5)
        if decay < THRESHOLD_DORMANT:
            continue
        # Score based on term matches
        title = (entry.get("title") or "").lower()
        tags_str = " ".join(entry.get("tags") or []).lower()
        title_matches = sum(1 for t in terms if t in title) * 3
        tag_matches = sum(1 for t in terms if t in tags_str) * 2
        score = title_matches + tag_matches
        if score == 0:
            # Check content_preview from index (no file I/O needed)
            preview = (entry.get("content_preview") or "").lower()
            preview_matches = sum(1 for t in terms if t in preview)
            if preview_matches > 0:
                score = preview_matches
            else:
                continue
        # Weight by decay score
        weighted_score = score * (1 + decay)
        results.append(
            {
                "id": mid,
                "type": entry.get("type"),
                "title": entry.get("title"),
                "tags": entry.get("tags", []),
                "importance": entry.get("importance"),
                "decay_score": round(decay, 3),
                "path": entry.get("path"),
                "created": entry.get("created"),
                "_score": weighted_score,
            }
        )
    # sort() evaluates the key once per element, so pop() both orders by
    # _score and strips the transient key from the returned dicts.
    results.sort(key=lambda x: x.pop("_score", 0), reverse=True)
    keyword_results = results[:limit]
    # Merge with semantic results if requested
    if semantic:
        embeddings_path = self.memory_dir / "_embeddings.json"
        if embeddings_path.exists():
            sem_results = self.semantic_recall(query, limit=limit)
            if sem_results:
                # Build merged score map: keyword_score + similarity * 5
                score_map: Dict[str, float] = {}
                result_map: Dict[str, Dict] = {}
                # Add keyword results with their position-based score
                for i, r in enumerate(keyword_results):
                    mid = r["id"]
                    score_map[mid] = float(limit - i)  # higher rank = higher score
                    result_map[mid] = r
                # Add semantic results
                for r in sem_results:
                    mid = r["id"]
                    sim_score = r.get("similarity", 0.0) * 5
                    if mid in score_map:
                        score_map[mid] += sim_score
                        result_map[mid]["similarity"] = r.get("similarity", 0.0)
                    else:
                        score_map[mid] = sim_score
                        # Enrich with index data for consistent return format
                        idx_entry = index.get("entries", {}).get(mid, {})
                        s = state.get("entries", {}).get(mid, {})
                        result_map[mid] = {
                            "id": mid,
                            "type": r.get("type"),
                            "title": r.get("title"),
                            "tags": r.get("tags", []),
                            "importance": idx_entry.get("importance"),
                            "decay_score": round(s.get("decay_score", 0.5), 3),
                            "similarity": r.get("similarity", 0.0),
                            "path": r.get("path"),
                            "created": idx_entry.get("created"),
                        }
                # Sort by merged score
                merged = sorted(
                    result_map.values(),
                    key=lambda x: score_map.get(x["id"], 0),
                    reverse=True,
                )
                return merged[:limit]
    return keyword_results
def get(self, memory_id: str) -> Optional[Dict[str, Any]]:
    """Get a memory by ID, update access count.

    Bumps the memory's access count, stamps last_accessed, and recomputes
    its decay score (days_since_access = 0 since it was just touched).
    Returns None when the memory cannot be found.
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return None
    fm, body = self._read_memory_file(path)
    # Update access count in state
    state = self._load_state()
    now = datetime.now(timezone.utc).isoformat()
    # setdefault creates a fresh state row for memories stored before
    # state tracking existed (or after a state-file reset).
    entry = state.setdefault("entries", {}).setdefault(
        memory_id,
        {
            "access_count": 0,
            "last_accessed": now,
            "decay_score": 0.5,
        },
    )
    entry["access_count"] = entry.get("access_count", 0) + 1
    entry["last_accessed"] = now
    # Recalculate decay score for this single memory (just accessed, so days=0)
    importance = fm.get("importance", 0.5)
    mem_type = fm.get("type", "general")
    type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
    entry["decay_score"] = round(
        calculate_decay_score(importance, 0, entry["access_count"], type_weight), 4
    )
    self._save_state(state)
    return {
        "id": fm.get("id", memory_id),
        "type": fm.get("type"),
        "title": fm.get("title"),
        "content": body,
        "tags": fm.get("tags", []),
        "importance": fm.get("importance"),
        "confidence": fm.get("confidence"),
        "created": fm.get("created"),
        "updated": fm.get("updated"),
        "relations": fm.get("relations", []),
        "steps": fm.get("steps", []),
        "preconditions": fm.get("preconditions", []),
        "postconditions": fm.get("postconditions", []),
        "access_count": entry["access_count"],
        "decay_score": entry.get("decay_score", 0.5),
        "path": str(path.relative_to(self.memory_dir)),
    }
def relate(
    self,
    from_id: str,
    to_id: str,
    rel_type: str,
    strength: float = 0.8,
    context: Optional[str] = None,
    description: Optional[str] = None,
) -> str:
    """Create a relationship between two memories with an edge file.

    Writes a first-class edge file under graph/edges/, then mirrors the
    relation into both memories' frontmatter — "outgoing" on the source,
    "incoming" on the target — sharing one edge_id.

    Returns edge_id string, or empty string if duplicate.

    Raises:
        ValueError: On an unknown rel_type or when either memory ID
            cannot be resolved to a file.
    """
    if rel_type not in VALID_RELATION_TYPES:
        raise ValueError(
            f"Invalid relation type: {rel_type}. Valid: {VALID_RELATION_TYPES}"
        )
    from_path = self._resolve_memory_path(from_id)
    to_path = self._resolve_memory_path(to_id)
    if not from_path or not to_path:
        raise ValueError(f"Memory not found: {from_id if not from_path else to_id}")
    # Read source memory
    fm, body = self._read_memory_file(from_path)
    relations = fm.get("relations", [])
    # Check for duplicate (same target + relation type already recorded)
    for r in relations:
        if r.get("target") == to_id and r.get("type") == rel_type:
            return ""  # Already exists
    # Read target memory for title
    # NOTE(review): if from_id == to_id, the target write below would
    # overwrite the source write — self-relations appear unsupported; verify.
    to_fm, to_body = self._read_memory_file(to_path)
    # Create edge file
    edge_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    from_title = fm.get("title", from_id[:8])
    to_title = to_fm.get("title", to_id[:8])
    clamped_strength = max(0.0, min(1.0, strength))
    edge_data = {
        "id": edge_id,
        "type": rel_type,
        "from_id": from_id,
        "from_title": from_title,
        "to_id": to_id,
        "to_title": to_title,
        "strength": clamped_strength,
        "created": now,
        "updated": now,
    }
    edge_filename = _make_edge_filename(from_title, rel_type, to_title, edge_id)
    edge_path = self.memory_dir / "graph" / EDGES_DIR_NAME / edge_filename
    edge_fm_str = serialize_edge_frontmatter(edge_data)
    edge_body = description.strip() if description else ""
    edge_content = (
        f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n"
    )
    edge_path.write_text(edge_content, encoding="utf-8")
    # Update source memory frontmatter with edge_id
    new_rel = {
        "target": to_id,
        "type": rel_type,
        "direction": "outgoing",
        "strength": clamped_strength,
        "edge_id": edge_id,
    }
    if context:
        new_rel["context"] = context
    relations.append(new_rel)
    fm["relations"] = relations
    fm["updated"] = now
    self._write_memory_file(from_path, fm, body)
    # Add incoming relation to target with edge_id (skipped when an
    # equivalent incoming relation is already present)
    to_relations = to_fm.get("relations", [])
    has_incoming = any(
        r.get("target") == from_id
        and r.get("type") == rel_type
        and r.get("direction") == "incoming"
        for r in to_relations
    )
    if not has_incoming:
        incoming_rel = {
            "target": from_id,
            "type": rel_type,
            "direction": "incoming",
            "strength": clamped_strength,
            "edge_id": edge_id,
        }
        if context:
            incoming_rel["context"] = context
        to_relations.append(incoming_rel)
        to_fm["relations"] = to_relations
        to_fm["updated"] = now
        self._write_memory_file(to_path, to_fm, to_body)
    # Update memory index
    rel_from = str(from_path.relative_to(self.memory_dir))
    rel_to = str(to_path.relative_to(self.memory_dir))
    self._update_index_entry(from_id, fm, rel_from)
    self._update_index_entry(to_id, to_fm, rel_to)
    # Update edge index
    self._update_edge_index(
        edge_id, edge_data, f"graph/{EDGES_DIR_NAME}/{edge_filename}"
    )
    self._git_commit(
        f"relate: {from_id[:8]} --{rel_type}--> {to_id[:8]}",
        [from_path, to_path, edge_path],
    )
    return edge_id
def edge_get(self, edge_id: str) -> Optional[Dict[str, Any]]:
    """Read a full edge file and return its fields plus the body description."""
    path = self._resolve_edge_path(edge_id)
    if path is None:
        return None
    fm, body = self._read_memory_file(path)
    result: Dict[str, Any] = {"id": fm.get("id", edge_id)}
    for key in ("type", "from_id", "from_title", "to_id", "to_title"):
        result[key] = fm.get(key, "")
    result["strength"] = fm.get("strength", 0.8)
    result["created"] = fm.get("created", "")
    result["updated"] = fm.get("updated", "")
    result["description"] = body.strip()
    result["path"] = str(path.relative_to(self.memory_dir))
    return result
def edge_search(
    self,
    query: Optional[str] = None,
    types: Optional[List[str]] = None,
    from_id: Optional[str] = None,
    to_id: Optional[str] = None,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Filter the edge index by free text, relation types, and endpoint IDs."""
    needle = query.lower().strip() if query else ""
    matches = []
    for eid, entry in self._load_index().get("edges", {}).items():
        if types and entry.get("type") not in types:
            continue
        if from_id and entry.get("from_id") != from_id:
            continue
        if to_id and entry.get("to_id") != to_id:
            continue
        if needle:
            # Text match runs over both titles plus the relation type.
            haystack = " ".join(
                [
                    entry.get("from_title", ""),
                    entry.get("to_title", ""),
                    entry.get("type", ""),
                ]
            ).lower()
            if needle not in haystack:
                continue
        matches.append({"id": eid, **entry})
    # Newest edges first.
    matches.sort(key=lambda row: row.get("created", ""), reverse=True)
    return matches[:limit]
def edge_update(
self,
edge_id: str,
description: Optional[str] = None,
strength: Optional[float] = None,
) -> bool:
"""Update edge body/metadata, sync strength to memory frontmatter."""
path = self._resolve_edge_path(edge_id)
if not path:
return False
fm, body = self._read_memory_file(path)
now = datetime.now(timezone.utc).isoformat()
if description is not None:
body = description
if strength is not None:
fm["strength"] = max(0.0, min(1.0, strength))
fm["updated"] = now
# Write edge file
edge_fm_str = serialize_edge_frontmatter(fm)
edge_body = body.strip() if body else ""
edge_content = (
f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n"
)
path.write_text(edge_content, encoding="utf-8")
# Sync strength to memory frontmatter if changed
if strength is not None:
for mid_key in ("from_id", "to_id"):
mid = fm.get(mid_key)
if not mid:
continue
mem_path = self._resolve_memory_path(mid)
if not mem_path:
continue
mem_fm, mem_body = self._read_memory_file(mem_path)
for rel in mem_fm.get("relations", []):
if rel.get("edge_id") == edge_id:
rel["strength"] = fm["strength"]
mem_fm["updated"] = now
self._write_memory_file(mem_path, mem_fm, mem_body)
# Update edge index
rel_path = str(path.relative_to(self.memory_dir))
self._update_edge_index(edge_id, fm, rel_path)
self._git_commit(f"edge-update: {edge_id[:8]}", [path])
return True
def edge_delete(self, edge_id: str) -> bool:
"""Remove edge file and clean frontmatter refs from both memories."""
path = self._resolve_edge_path(edge_id)
if not path:
return False
fm, _ = self._read_memory_file(path)
now = datetime.now(timezone.utc).isoformat()
files_to_commit: List[Path] = []
# Clean edge_id references from both memories
for mid_key in ("from_id", "to_id"):
mid = fm.get(mid_key)
if not mid:
continue
mem_path = self._resolve_memory_path(mid)
if not mem_path:
continue
mem_fm, mem_body = self._read_memory_file(mem_path)
original_rels = mem_fm.get("relations", [])
mem_fm["relations"] = [
r for r in original_rels if r.get("edge_id") != edge_id
]
if len(mem_fm["relations"]) != len(original_rels):
mem_fm["updated"] = now
self._write_memory_file(mem_path, mem_fm, mem_body)
rel_p = str(mem_path.relative_to(self.memory_dir))
self._update_index_entry(mid, mem_fm, rel_p)
files_to_commit.append(mem_path)
# Remove edge file
path.unlink()
self._remove_edge_index(edge_id)
# Git stage deletion
try:
rel_path = path.relative_to(self.memory_dir)
subprocess.run(
["git", "rm", "--cached", str(rel_path)],
cwd=str(self.memory_dir),
capture_output=True,
timeout=5,
)
except Exception:
pass
self._git_commit(f"edge-delete: {edge_id[:8]}")
return True
def search(
self,
query: Optional[str] = None,
memory_types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
min_importance: Optional[float] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""Filter memories by type, tags, importance. Optional text query."""
self._maybe_refresh_decay()
index = self._load_index()
state = self._load_state()
query_terms = query.lower().strip().split() if query else []
filter_tags = set(t.lower() for t in tags) if tags else None
results = []
for mid, entry in index.get("entries", {}).items():
# Type filter
if memory_types and entry.get("type") not in memory_types:
continue
# Importance filter
if min_importance and entry.get("importance", 0) < min_importance:
continue
# Tag filter
if filter_tags:
mem_tags = set(t.lower() for t in entry.get("tags", []))
if not mem_tags.intersection(filter_tags):
continue
# Query filter
if query_terms:
title = (entry.get("title") or "").lower()
tags_str = " ".join(entry.get("tags") or []).lower()
searchable = f"{title} {tags_str}"
if not any(t in searchable for t in query_terms):
# Check content_preview from index (no file I/O needed)
preview = (entry.get("content_preview") or "").lower()
if not any(t in preview for t in query_terms):
continue
s = state.get("entries", {}).get(mid, {})
results.append(
{
"id": mid,
"type": entry.get("type"),
"title": entry.get("title"),
"tags": entry.get("tags", []),
"importance": entry.get("importance"),
"decay_score": round(s.get("decay_score", 0.5), 3),
"path": entry.get("path"),
"created": entry.get("created"),
}
)
# Sort by importance descending
results.sort(key=lambda x: x.get("importance", 0), reverse=True)
return results[:limit]
    def update(
        self,
        memory_id: str,
        title: Optional[str] = None,
        content: Optional[str] = None,
        tags: Optional[List[str]] = None,
        importance: Optional[float] = None,
    ) -> bool:
        """Update fields of an existing memory.

        Only non-None arguments are applied; tags are lowercased/stripped and
        importance is clamped to [0, 1]. The file is rewritten, the index
        entry refreshed (with a fresh content_preview when content changed),
        and the change committed. Returns False when the id can't be resolved.
        """
        path = self._resolve_memory_path(memory_id)
        if not path:
            return False
        fm, body = self._read_memory_file(path)
        now = datetime.now(timezone.utc).isoformat()
        if title is not None:
            fm["title"] = title
        if tags is not None:
            fm["tags"] = [t.lower().strip() for t in tags]
        if importance is not None:
            fm["importance"] = max(0.0, min(1.0, importance))  # clamp to [0, 1]
        if content is not None:
            body = content
        fm["updated"] = now
        self._write_memory_file(path, fm, body)
        # Update index (refresh content_preview if content changed)
        rel_path = str(path.relative_to(self.memory_dir))
        preview = ""
        if content is not None:
            # Truncate to 200 chars, backing up to the last word boundary.
            preview = body.strip()[:200]
            if len(body.strip()) > 200:
                last_space = preview.rfind(" ")
                if last_space > 0:
                    preview = preview[:last_space]
        # NOTE(review): preview is "" when content is unchanged — presumably
        # _update_index_entry treats an empty preview as "keep existing";
        # confirm, otherwise metadata-only updates would clear the preview.
        self._update_index_entry(memory_id, fm, rel_path, content_preview=preview)
        self._git_commit(f"update: {fm.get('title', memory_id[:8])}", [path])
        return True
    def delete(self, memory_id: str) -> bool:
        """Delete a memory file, cascade-delete edges, remove from index.

        Steps: (1) delete every edge whose from_id/to_id matches (which also
        cleans the partner memories' frontmatter), (2) unlink the file and
        drop its index/state entries, (3) strip any remaining legacy `target`
        references from other memories, (4) stage the deletion in git and
        commit. Returns False when the id can't be resolved.
        """
        path = self._resolve_memory_path(memory_id)
        if not path:
            return False
        fm, _ = self._read_memory_file(path)
        title = fm.get("title", memory_id[:8])
        # Cascade-delete edges where from_id or to_id matches
        index = self._load_index()
        edges_to_delete = []
        for eid, edata in index.get("edges", {}).items():
            if edata.get("from_id") == memory_id or edata.get("to_id") == memory_id:
                edges_to_delete.append(eid)
        for eid in edges_to_delete:
            self.edge_delete(eid)
        # Remove file
        path.unlink()
        # Remove from index and state
        self._remove_index_entry(memory_id)
        state = self._load_state()
        state.get("entries", {}).pop(memory_id, None)
        self._save_state(state)
        # Remove incoming references from other memories
        # (reload: edge_delete above rewrote the index on disk)
        index = self._load_index()
        for mid, entry in index.get("entries", {}).items():
            rels = entry.get("relations", [])
            original_count = len(rels)
            rels = [r for r in rels if r.get("target") != memory_id]
            if len(rels) != original_count:
                entry["relations"] = rels
                other_path = self._resolve_memory_path(mid)
                if other_path:
                    try:
                        other_fm, other_body = self._read_memory_file(other_path)
                        other_fm["relations"] = [
                            r
                            for r in other_fm.get("relations", [])
                            if r.get("target") != memory_id
                        ]
                        self._write_memory_file(other_path, other_fm, other_body)
                    except Exception:
                        # Best-effort: one unreadable file shouldn't abort
                        # the whole delete.
                        pass
        self._save_index(index)
        # Git: stage deletion
        try:
            rel_path = path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(rel_path)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            pass
        self._git_commit(f"delete: {title}")
        return True
    def related(
        self,
        memory_id: str,
        rel_types: Optional[List[str]] = None,
        max_depth: int = 1,
    ) -> List[Dict[str, Any]]:
        """Traverse relations from a memory via depth-limited recursive DFS.

        (The traversal is depth-first, not BFS: each target is recursed into
        before its siblings are visited.) Depth is clamped to [1, 5]. Each
        reachable target appears once with the relation type, direction,
        strength, and the depth it was found at; the start memory itself is
        not included in the results.
        """
        max_depth = max(1, min(5, max_depth))
        index = self._load_index()
        visited = set()
        results = []
        def traverse(mid: str, depth: int):
            # Stop at the depth limit; `visited` doubles as a cycle guard.
            if depth > max_depth or mid in visited:
                return
            visited.add(mid)
            entry = index.get("entries", {}).get(mid)
            if not entry:
                return
            for rel in entry.get("relations", []):
                target_id = rel.get("target")
                if not target_id or target_id in visited:
                    continue
                if rel_types and rel.get("type") not in rel_types:
                    continue
                target_entry = index.get("entries", {}).get(target_id)
                if target_entry:
                    results.append(
                        {
                            "id": target_id,
                            "type": target_entry.get("type"),
                            "title": target_entry.get("title"),
                            "relationship": rel.get("type"),
                            "direction": rel.get("direction", "outgoing"),
                            "strength": rel.get("strength"),
                            "depth": depth,
                        }
                    )
                # Recurse even when the target lacks an index entry; the
                # `if not entry` guard above makes that a no-op.
                traverse(target_id, depth + 1)
        traverse(memory_id, 1)
        return results
def stats(self) -> Dict[str, Any]:
"""Get statistics about the memory system."""
index = self._load_index()
state = self._load_state()
entries = index.get("entries", {})
by_type = {}
total_relations = 0
for entry in entries.values():
t = entry.get("type", "unknown")
by_type[t] = by_type.get(t, 0) + 1
total_relations += len(entry.get("relations", []))
# Count files per directory
dir_counts = {}
for type_dir in TYPE_DIRS.values():
d = self.memory_dir / "graph" / type_dir
if d.exists():
dir_counts[type_dir] = len(list(d.glob("*.md")))
vault_count = (
len(list((self.memory_dir / "vault").glob("*.md")))
if (self.memory_dir / "vault").exists()
else 0
)
# Decay stats
state_entries = state.get("entries", {})
active = sum(
1
for s in state_entries.values()
if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
)
fading = sum(
1
for s in state_entries.values()
if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
)
dormant = sum(
1
for s in state_entries.values()
if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
)
archived = sum(
1
for s in state_entries.values()
if s.get("decay_score", 0) < THRESHOLD_DORMANT
)
# Unique outgoing relations only (avoid double-counting)
unique_relations = total_relations // 2 if total_relations > 0 else 0
return {
"total_memories": len(entries),
"by_type": by_type,
"dir_counts": dir_counts,
"vault_count": vault_count,
"total_relations": unique_relations,
"decay_summary": {
"active": active,
"fading": fading,
"dormant": dormant,
"archived": archived,
},
"memory_dir": str(self.memory_dir),
}
def recent(self, limit: int = 20) -> List[Dict[str, Any]]:
"""Get most recently created memories."""
index = self._load_index()
entries = [
{"id": mid, **entry} for mid, entry in index.get("entries", {}).items()
]
entries.sort(key=lambda x: x.get("created", ""), reverse=True)
return entries[:limit]
def decay(self) -> Dict[str, Any]:
"""Recalculate all decay scores. Updates _state.json."""
index = self._load_index()
state = self._load_state()
now = datetime.now(timezone.utc)
updated_count = 0
for mid, entry in index.get("entries", {}).items():
s = state.setdefault("entries", {}).setdefault(
mid,
{
"access_count": 0,
"last_accessed": entry.get("created", now.isoformat()),
"decay_score": 0.5,
},
)
# Calculate days since last access
last_str = s.get("last_accessed", entry.get("created", ""))
try:
last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00"))
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=timezone.utc)
days = (now - last_dt).total_seconds() / 86400
except (ValueError, AttributeError):
days = 30 # Default to 30 days if unparseable
importance = entry.get("importance", 0.5)
mem_type = entry.get("type", "general")
type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
access_count = s.get("access_count", 0)
# Check if pinned (vault)
path = entry.get("path", "")
if path.startswith("vault/"):
s["decay_score"] = 999.0 # Pinned memories never decay
else:
s["decay_score"] = round(
calculate_decay_score(importance, days, access_count, type_weight),
4,
)
updated_count += 1
self._save_state(state)
# Summary
state_entries = state.get("entries", {})
return {
"updated": updated_count,
"active": sum(
1
for s in state_entries.values()
if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
),
"fading": sum(
1
for s in state_entries.values()
if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
),
"dormant": sum(
1
for s in state_entries.values()
if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
),
"archived": sum(
1
for s in state_entries.values()
if 0 < s.get("decay_score", 0) < THRESHOLD_DORMANT
),
}
    def core(self) -> str:
        """Generate CORE.md from top memories by decay score.

        Memories below THRESHOLD_FADING are skipped; the rest are grouped
        into named sections by type (max 10 per section) and rendered as
        markdown links with a one-line summary, under a global CORE_MAX_CHARS
        character budget. The result is written to CORE.md and committed.
        """
        index = self._load_index()
        state = self._load_state()
        def _extract_summary(body: str, max_len: int = 80) -> str:
            """Extract first meaningful sentence from memory body."""
            if not body or not body.strip():
                return ""
            # Skip leading code blocks, headings, and list markers
            # NOTE(review): only the ``` fence marker lines and headings are
            # skipped, not the lines *inside* a fenced block — a code line can
            # still be selected as the summary. Confirm whether intended.
            for ln in body.strip().split("\n"):
                stripped = ln.strip()
                if not stripped:
                    continue
                if stripped.startswith("```") or stripped.startswith("#"):
                    continue
                first_line = stripped.lstrip("- *>").strip()
                break
            else:
                # Every line was blank or skipped.
                return ""
            if not first_line:
                return ""
            # Extract first sentence (split on '. ')
            dot_pos = first_line.find(". ")
            if 0 < dot_pos < max_len:
                sentence = first_line[: dot_pos + 1]
            else:
                sentence = first_line
            if len(sentence) > max_len:
                sentence = sentence[: max_len - 3].rstrip() + "..."
            return sentence
        # Collect all memories with decay scores
        memories = []
        for mid, entry in index.get("entries", {}).items():
            s = state.get("entries", {}).get(mid, {})
            decay = s.get("decay_score", 0.5)
            if decay < THRESHOLD_FADING:
                continue  # Skip low-relevance
            memories.append(
                {
                    "id": mid,
                    "title": entry.get("title", ""),
                    "type": entry.get("type", "general"),
                    "path": entry.get("path", ""),
                    "importance": entry.get("importance", 0.5),
                    "decay_score": decay,
                    "tags": entry.get("tags", []),
                }
            )
        # Sort by decay score descending
        memories.sort(key=lambda x: x["decay_score"], reverse=True)
        # Group by type category
        categories = {
            "Critical Solutions": ["solution"],
            "Active Decisions": ["decision"],
            "Key Fixes": ["fix"],
            "Configurations": ["configuration"],
            "Key Procedures": ["procedure"],
            "Patterns & Workflows": ["code_pattern", "workflow"],
            "Known Issues": ["problem", "error"],
            "Insights": ["insight"],
            "General": ["general"],
        }
        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE)
        total_count = len(index.get("entries", {}))
        lines = [
            "# Memory Core (auto-generated)",
            f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)",
            "",
        ]
        # NOTE(review): char_count sums line lengths without the newline chars
        # added by join, so the CORE_MAX_CHARS budget is approximate.
        char_count = sum(len(l) for l in lines)
        for cat_name, cat_types in categories.items():
            cat_memories = [m for m in memories if m["type"] in cat_types]
            if not cat_memories:
                continue
            section_lines = [f"## {cat_name}", ""]
            for mem in cat_memories[:10]:  # Cap per section (reduced for summaries)
                path = mem["path"]
                title = mem["title"]
                tags = ", ".join(mem["tags"][:3]) if mem["tags"] else ""
                # Read body to extract summary
                summary = ""
                mem_path = self.memory_dir / path
                if mem_path.exists():
                    try:
                        _, body = self._read_memory_file(mem_path)
                        summary = _extract_summary(body)
                    except Exception:
                        pass
                line = f"- [{title}]({path})"
                if summary:
                    line += f" - {summary}"
                if tags:
                    line += f" ({tags})"
                section_lines.append(line)
                # Check budget
                # (estimate: all section lines accumulated so far, +2 for the
                # trailing blank; stops adding entries once over budget)
                added = sum(len(l) for l in section_lines) + 2
                if char_count + added > CORE_MAX_CHARS:
                    break
            section_lines.append("")
            section_text = "\n".join(section_lines)
            # Drop the whole section (and all later ones) if it would overflow.
            if char_count + len(section_text) > CORE_MAX_CHARS:
                break
            lines.extend(section_lines)
            char_count += len(section_text)
        core_content = "\n".join(lines)
        core_path = self.memory_dir / "CORE.md"
        core_path.write_text(core_content, encoding="utf-8")
        self._git_commit("core: regenerate CORE.md", [core_path])
        return core_content
def episode(
self,
type: str,
title: str,
tags: Optional[List[str]] = None,
summary: Optional[str] = None,
memory_link: Optional[str] = None,
):
"""Append an entry to today's episode file."""
now_local = datetime.now().astimezone()
today = now_local.strftime("%Y-%m-%d")
time_str = now_local.strftime("%H:%M")
ep_path = self.memory_dir / "episodes" / f"{today}.md"
tags = tags or []
tags_str = ", ".join(tags)
entry_lines = [
f"## {time_str} - {title}",
f"- **Type:** {type}",
]
if tags_str:
entry_lines.append(f"- **Tags:** {tags_str}")
if memory_link:
# Extract name from link path
name = Path(memory_link).stem
entry_lines.append(f"- **Memory:** [{name}]({memory_link})")
if summary:
entry_lines.append(f"- **Summary:** {summary}")
entry_text = "\n".join(entry_lines)
if ep_path.exists():
existing = ep_path.read_text(encoding="utf-8")
new_content = f"{existing.rstrip()}\n\n{entry_text}\n"
else:
new_content = f"# {today}\n\n{entry_text}\n"
ep_path.write_text(new_content, encoding="utf-8")
self._git_commit(f"episode: {title}", [ep_path])
    def reindex(self) -> int:
        """Rebuild _index.json from all markdown files. Recovery command.

        Scans graph/ and vault/ for memory files and graph/<edges dir> for
        edge files, rebuilding every index entry from frontmatter. Files
        without an `id` are skipped; unreadable files log a warning to stderr
        instead of aborting. Returns the number of memory entries indexed
        (edges are indexed but not counted in the return value).
        """
        index = {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}
        count = 0
        search_dirs = [
            ("graph", self.memory_dir / "graph"),
            ("vault", self.memory_dir / "vault"),
        ]
        edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
        for _prefix, search_dir in search_dirs:
            if not search_dir.exists():
                continue
            for md_file in search_dir.rglob("*.md"):
                # Skip edge files — handled separately
                if edges_dir.exists() and md_file.parent == edges_dir:
                    continue
                try:
                    fm, body = self._read_memory_file(md_file)
                    mid = fm.get("id")
                    if not mid:
                        continue
                    rel_path = str(md_file.relative_to(self.memory_dir))
                    # Preview: first 200 chars, trimmed back to a word boundary.
                    preview = body.strip()[:200]
                    if len(body.strip()) > 200:
                        last_space = preview.rfind(" ")
                        if last_space > 0:
                            preview = preview[:last_space]
                    index["entries"][mid] = {
                        "title": fm.get("title", ""),
                        "type": fm.get("type", "general"),
                        "tags": fm.get("tags", []),
                        "importance": fm.get("importance", 0.5),
                        "confidence": fm.get("confidence", 0.8),
                        "created": fm.get("created", ""),
                        "updated": fm.get("updated", ""),
                        "path": rel_path,
                        "relations": fm.get("relations", []),
                        "content_preview": preview,
                    }
                    count += 1
                except Exception as e:
                    print(f"Warning: Failed to index {md_file}: {e}", file=sys.stderr)
        # Scan edge files
        if edges_dir.exists():
            for md_file in edges_dir.glob("*.md"):
                try:
                    fm, _ = self._read_memory_file(md_file)
                    eid = fm.get("id")
                    if not eid:
                        continue
                    rel_path = str(md_file.relative_to(self.memory_dir))
                    index["edges"][eid] = {
                        "type": fm.get("type", ""),
                        "from_id": fm.get("from_id", ""),
                        "from_title": fm.get("from_title", ""),
                        "to_id": fm.get("to_id", ""),
                        "to_title": fm.get("to_title", ""),
                        "strength": fm.get("strength", 0.8),
                        "created": fm.get("created", ""),
                        "updated": fm.get("updated", ""),
                        "path": rel_path,
                    }
                except Exception as e:
                    print(
                        f"Warning: Failed to index edge {md_file}: {e}", file=sys.stderr
                    )
        self._save_index(index)
        return count
def pin(self, memory_id: str) -> bool:
"""Move a memory to vault/ (never decays)."""
path = self._resolve_memory_path(memory_id)
if not path:
return False
fm, body = self._read_memory_file(path)
vault_dir = self.memory_dir / "vault"
vault_dir.mkdir(parents=True, exist_ok=True)
new_path = vault_dir / path.name
self._write_memory_file(new_path, fm, body)
# Remove old file
old_rel = str(path.relative_to(self.memory_dir))
path.unlink()
# Update index with new path
new_rel = str(new_path.relative_to(self.memory_dir))
self._update_index_entry(memory_id, fm, new_rel)
# Set decay to infinity in state
state = self._load_state()
state.setdefault("entries", {}).setdefault(memory_id, {})["decay_score"] = 999.0
self._save_state(state)
self._git_commit(f"pin: {fm.get('title', memory_id[:8])}", [new_path])
return True
# -------------------------------------------------------------------------
# Tag analysis
# -------------------------------------------------------------------------
def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]:
"""List all tags with occurrence counts across all memories.
Returns list of {"tag": str, "count": int} sorted by count descending.
"""
index = self._load_index()
tag_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
for tag in entry.get("tags", []):
normalized = tag.lower().strip()
if normalized:
tag_counts[normalized] = tag_counts.get(normalized, 0) + 1
results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
results.sort(key=lambda x: x["count"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]:
"""Find tags that co-occur with the given tag.
Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int}
sorted by co_occurrences descending.
"""
tag = tag.lower().strip()
index = self._load_index()
co_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
entry_tags = [t.lower().strip() for t in entry.get("tags", [])]
if tag not in entry_tags:
continue
for other_tag in entry_tags:
if other_tag != tag and other_tag:
co_counts[other_tag] = co_counts.get(other_tag, 0) + 1
results = [
{"tag": t, "co_occurrences": count, "memories_with_both": count}
for t, count in co_counts.items()
]
results.sort(key=lambda x: x["co_occurrences"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]:
"""Suggest tags for a memory based on co-occurrence patterns.
Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}.
"""
index = self._load_index()
entry = index.get("entries", {}).get(memory_id)
if not entry:
return []
existing_tags = set(t.lower().strip() for t in entry.get("tags", []))
if not existing_tags:
return []
# For each existing tag, find co-occurring tags and accumulate scores
candidate_scores: Dict[str, int] = {}
candidate_sources: Dict[str, set] = {}
for existing_tag in existing_tags:
co_tags = self.tags_related(existing_tag)
for co_entry in co_tags:
candidate = co_entry["tag"]
if candidate in existing_tags:
continue
candidate_scores[candidate] = (
candidate_scores.get(candidate, 0) + co_entry["co_occurrences"]
)
if candidate not in candidate_sources:
candidate_sources[candidate] = set()
candidate_sources[candidate].add(existing_tag)
results = []
for candidate, score in candidate_scores.items():
sources = sorted(candidate_sources[candidate])
reason = "co-occurs with: " + ", ".join(sources)
results.append({"tag": candidate, "score": score, "reason": reason})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:10]
# -------------------------------------------------------------------------
# Embedding-based semantic search (hybrid: Ollama + OpenAI)
# -------------------------------------------------------------------------
    def _get_embedding_provider(self) -> Dict[str, Any]:
        """Load embedding config from this instance's _config.json.

        Delegates to the module-level _load_memory_config helper. The result
        is read for keys such as embedding_provider, openai_api_key,
        openai_model, and ollama_model (see _embed_texts_with_fallback).
        """
        return _load_memory_config(self.memory_dir / "_config.json")
def _embed_texts_with_fallback(
self,
texts: List[str],
timeout: int = 300,
) -> Tuple[Optional[List[List[float]]], str, str]:
"""Embed texts with fallback chain. Returns (vectors, provider_used, model_used)."""
config = self._get_embedding_provider()
provider = config.get("embedding_provider", "ollama")
# Try configured provider first
if provider == "openai":
api_key = config.get("openai_api_key")
model = config.get("openai_model", OPENAI_MODEL_DEFAULT)
if api_key:
vectors = _openai_embed(texts, api_key, model, timeout=timeout)
if vectors is not None:
return vectors, "openai", model
# Fallback to ollama
ollama_model = config.get("ollama_model", EMBEDDING_MODEL)
vectors = _ollama_embed(texts, timeout=timeout)
if vectors is not None:
return vectors, "ollama", ollama_model
else:
# ollama first
ollama_model = config.get("ollama_model", EMBEDDING_MODEL)
vectors = _ollama_embed(texts, timeout=timeout)
if vectors is not None:
return vectors, "ollama", ollama_model
# Fallback to openai
api_key = config.get("openai_api_key")
model = config.get("openai_model", OPENAI_MODEL_DEFAULT)
if api_key:
vectors = _openai_embed(texts, api_key, model, timeout=timeout)
if vectors is not None:
return vectors, "openai", model
return None, "", ""
def embed(self) -> Dict[str, Any]:
"""Generate embeddings for all memories using configured provider.
Detects provider changes and re-embeds everything (dimension mismatch safety).
Stores vectors in _embeddings.json (not git-tracked).
"""
index = self._load_index()
entries = index.get("entries", {})
if not entries:
return {
"embedded": 0,
"provider": "none",
"model": "",
"path": str(EMBEDDINGS_PATH),
}
# Check for provider change
embeddings_path = self.memory_dir / "_embeddings.json"
old_provider = ""
if embeddings_path.exists():
try:
old_data = json.loads(embeddings_path.read_text())
old_provider = old_data.get("provider", "ollama")
except (json.JSONDecodeError, OSError):
pass
config = self._get_embedding_provider()
new_provider = config.get("embedding_provider", "ollama")
if old_provider and old_provider != new_provider:
print(
f"Provider changed ({old_provider} -> {new_provider}), re-embedding all memories...",
file=sys.stderr,
)
# Build texts to embed
memory_ids = list(entries.keys())
texts = []
for mid in memory_ids:
entry = entries[mid]
title = entry.get("title", "")
preview = entry.get("content_preview", "")
texts.append(f"{title}. {preview}")
# Batch embed in groups of 50
all_embeddings: Dict[str, List[float]] = {}
batch_size = 50
provider_used = ""
model_used = ""
for i in range(0, len(texts), batch_size):
batch_texts = texts[i : i + batch_size]
batch_ids = memory_ids[i : i + batch_size]
vectors, provider_used, model_used = self._embed_texts_with_fallback(
batch_texts,
timeout=300,
)
if vectors is None:
return {
"error": "All embedding providers unavailable",
"embedded": len(all_embeddings),
}
for mid, vec in zip(batch_ids, vectors):
all_embeddings[mid] = vec
# Write embeddings file with provider info
embeddings_data = {
"provider": provider_used,
"model": model_used,
"updated": datetime.now(timezone.utc).isoformat(),
"entries": all_embeddings,
}
embeddings_path.write_text(json.dumps(embeddings_data, default=str))
return {
"embedded": len(all_embeddings),
"provider": provider_used,
"model": model_used,
"path": str(embeddings_path),
}
    def semantic_recall(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search memories by semantic similarity using embeddings.

        Uses the same provider that generated stored embeddings to embed the
        query. Skips vectors with dimension mismatch as safety guard.
        Returns [] when no embeddings exist or no provider can embed the
        query; otherwise the top `limit` matches by cosine similarity.
        """
        embeddings_path = self.memory_dir / "_embeddings.json"
        if not embeddings_path.exists():
            return []
        try:
            emb_data = json.loads(embeddings_path.read_text())
        except (json.JSONDecodeError, OSError):
            return []
        stored = emb_data.get("entries", {})
        if not stored:
            return []
        # Embed query with matching provider
        stored_provider = emb_data.get("provider", "ollama")
        config = self._get_embedding_provider()
        query_vec = None
        if stored_provider == "openai":
            api_key = config.get("openai_api_key")
            # Use the model the stored vectors were made with, not the config's.
            model = emb_data.get("model", OPENAI_MODEL_DEFAULT)
            if api_key:
                vecs = _openai_embed([query], api_key, model)
                if vecs:
                    query_vec = vecs[0]
        if query_vec is None and stored_provider == "ollama":
            vecs = _ollama_embed([query])
            if vecs:
                query_vec = vecs[0]
        # Last resort: try any available provider
        # (mismatched dimensions are filtered out below)
        if query_vec is None:
            vecs, _, _ = self._embed_texts_with_fallback([query], timeout=30)
            if vecs:
                query_vec = vecs[0]
        if query_vec is None:
            return []
        query_dim = len(query_vec)
        # Score all memories by cosine similarity
        index = self._load_index()
        scored = []
        for mid, vec in stored.items():
            # Skip dimension mismatch
            if len(vec) != query_dim:
                continue
            sim = _cosine_similarity(query_vec, vec)
            entry = index.get("entries", {}).get(mid)
            if entry:
                scored.append(
                    {
                        "id": mid,
                        "title": entry.get("title", ""),
                        "type": entry.get("type", "general"),
                        "tags": entry.get("tags", []),
                        "similarity": round(sim, 4),
                        "path": entry.get("path", ""),
                    }
                )
        scored.sort(key=lambda x: x["similarity"], reverse=True)
        return scored[:limit]
    def reflect(
        self,
        since: Optional[str] = None,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Review recent memories, identify tag-based clusters, and output consolidation recommendations.
        Clusters memories that share 2+ tags using iterative union-find merging.
        Does NOT auto-create insights; the agent reads output and decides what to store.
        Args:
            since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to
                _state.json's last_reflection, then 30 days ago.
            dry_run: If True, return analysis without updating _state.json or logging episode.
        Returns:
            Dict with clusters, total_memories_reviewed, clusters_found, and since date.
        """
        now = datetime.now(timezone.utc)
        state = self._load_state()
        # Determine time window
        if since:
            since_date = since
        elif state.get("last_reflection"):
            # last_reflection may be a full ISO timestamp; extract date portion
            since_date = state["last_reflection"][:10]
        else:
            since_dt = now - timedelta(days=30)
            since_date = since_dt.strftime("%Y-%m-%d")
        # Load recent memories from index
        index = self._load_index()
        recent_memories = []
        for mid, entry in index.get("entries", {}).items():
            created = entry.get("created", "")
            # Compare date portion only (YYYY-MM-DD)
            created_date = created[:10] if created else ""
            if created_date >= since_date:
                recent_memories.append(
                    {
                        "id": mid,
                        "title": entry.get("title", ""),
                        "type": entry.get("type", "general"),
                        "tags": [t.lower().strip() for t in entry.get("tags", [])],
                    }
                )
        total_reviewed = len(recent_memories)
        # Union-find clustering by tag overlap >= 2
        # parent[i] = index of parent in recent_memories list
        n = len(recent_memories)
        parent = list(range(n))
        def find(x: int) -> int:
            # Walk up to the root, halving the path as we go.
            while parent[x] != x:
                parent[x] = parent[parent[x]]  # path compression
                x = parent[x]
            return x
        def union(a: int, b: int):
            # Merge the two sets by attaching b's root under a's root.
            ra, rb = find(a), find(b)
            if ra != rb:
                parent[rb] = ra
        # Compare all pairs, check tag overlap
        # (O(n^2) over the recent window; n is bounded by the time filter)
        for i in range(n):
            tags_i = set(recent_memories[i]["tags"])
            for j in range(i + 1, n):
                tags_j = set(recent_memories[j]["tags"])
                if len(tags_i & tags_j) >= 2:
                    union(i, j)
        # Group by root
        clusters_map: Dict[int, List[int]] = {}
        for i in range(n):
            root = find(i)
            clusters_map.setdefault(root, []).append(i)
        # Build output clusters (only clusters with 2+ members)
        clusters = []
        cluster_id = 0
        for indices in clusters_map.values():
            if len(indices) < 2:
                continue
            cluster_id += 1
            members = []
            all_tag_sets = []
            types_seen = set()
            for idx in indices:
                mem = recent_memories[idx]
                members.append(
                    {
                        "id": mem["id"],
                        "title": mem["title"],
                        "type": mem["type"],
                        "tags": mem["tags"],
                    }
                )
                all_tag_sets.append(set(mem["tags"]))
                types_seen.add(mem["type"])
            # Common tags = intersection of ALL members
            common_tags = (
                sorted(set.intersection(*all_tag_sets)) if all_tag_sets else []
            )
            # Shared tags = tags appearing in 2+ members
            tag_counts: Dict[str, int] = {}
            for ts in all_tag_sets:
                for t in ts:
                    tag_counts[t] = tag_counts.get(t, 0) + 1
            shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2)
            # Suggested topic
            # (prefer universally-common tags; fall back to pairwise-shared ones)
            tag_label = (
                ", ".join(common_tags[:4])
                if common_tags
                else ", ".join(shared_tags[:4])
            )
            type_label = ", ".join(sorted(types_seen))
            suggested_topic = f"Pattern: {tag_label} across {type_label}"
            clusters.append(
                {
                    "cluster_id": cluster_id,
                    "members": members,
                    "common_tags": common_tags,
                    "shared_tags": shared_tags,
                    "suggested_topic": suggested_topic,
                    "member_count": len(members),
                }
            )
        # Sort clusters by member count descending
        clusters.sort(key=lambda c: c["member_count"], reverse=True)
        # Re-number after sorting
        for i, c in enumerate(clusters):
            c["cluster_id"] = i + 1
        result = {
            "clusters": clusters,
            "total_memories_reviewed": total_reviewed,
            "clusters_found": len(clusters),
            "since": since_date,
        }
        if not dry_run:
            # Update state with last_reflection timestamp
            state["last_reflection"] = now.isoformat()
            self._save_state(state)
            # Log an episode entry
            self.episode(
                type="reflection",
                title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories",
                tags=["reflection", "cognitive-memory"],
                summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters",
            )
            # Regenerate REFLECTION.md
            self.reflection_summary()
        return result
    def merge(
        self,
        keep_id: str,
        absorb_id: str,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Merge two memories: absorb one into another.

        The 'keep' memory absorbs the content, tags, and relations from
        the 'absorb' memory. The absorb memory is then deleted.
        All other memories referencing absorb_id are updated to point to keep_id.

        If dry_run=True, returns what would change without writing anything.

        Args:
            keep_id: UUID of the memory that survives the merge.
            absorb_id: UUID of the memory to absorb and delete.
            dry_run: When True, compute the merge plan only; no files,
                index, state, or git history are modified.

        Returns:
            Summary dict with ids/titles, merged tags, merged importance,
            merged relation count, and how many other memories were
            re-pointed (the full id list is included only in dry-run output).

        Raises:
            ValueError: If either id cannot be resolved to a memory file.
        """
        # Resolve both memory file paths
        keep_path = self._resolve_memory_path(keep_id)
        if not keep_path:
            raise ValueError(f"Keep memory not found: {keep_id}")
        absorb_path = self._resolve_memory_path(absorb_id)
        if not absorb_path:
            raise ValueError(f"Absorb memory not found: {absorb_id}")
        # Read both memory files (frontmatter dict + markdown body)
        keep_fm, keep_body = self._read_memory_file(keep_path)
        absorb_fm, absorb_body = self._read_memory_file(absorb_path)
        # Fall back to a short id prefix when a title is missing
        keep_title = keep_fm.get("title", keep_id[:8])
        absorb_title = absorb_fm.get("title", absorb_id[:8])
        # Combine content: append the absorbed body under a "Merged from" divider
        merged_body = keep_body
        if absorb_body.strip():
            merged_body = (
                keep_body.rstrip()
                + f"\n\n---\n*Merged from: {absorb_title}*\n\n"
                + absorb_body.strip()
            )
        # Merge tags (sorted, deduplicated)
        keep_tags = keep_fm.get("tags", [])
        absorb_tags = absorb_fm.get("tags", [])
        merged_tags = sorted(list(set(keep_tags + absorb_tags)))
        # importance = max of both (a merge never lowers importance)
        merged_importance = max(
            keep_fm.get("importance", 0.5),
            absorb_fm.get("importance", 0.5),
        )
        # Merge relations: combine both relation lists
        keep_rels = list(keep_fm.get("relations", []))
        absorb_rels = list(absorb_fm.get("relations", []))
        combined_rels = keep_rels + absorb_rels
        # Replace any relation targeting absorb_id with keep_id
        for rel in combined_rels:
            if rel.get("target") == absorb_id:
                rel["target"] = keep_id
        # Deduplicate by (target, type) tuple, keeping first occurrence
        seen = set()
        deduped_rels = []
        for rel in combined_rels:
            key = (rel.get("target"), rel.get("type"))
            if key not in seen:
                seen.add(key)
                deduped_rels.append(rel)
        # Remove self-referential relations (target == keep_id)
        merged_rels = [r for r in deduped_rels if r.get("target") != keep_id]
        # Scan all other memories for relations targeting absorb_id
        index = self._load_index()
        updated_others = []
        for mid, entry in index.get("entries", {}).items():
            if mid in (keep_id, absorb_id):
                continue
            rels = entry.get("relations", [])
            needs_update = any(r.get("target") == absorb_id for r in rels)
            if needs_update:
                updated_others.append(mid)
        if dry_run:
            # Preview only: report the merge plan without touching disk
            return {
                "dry_run": True,
                "keep_id": keep_id,
                "keep_title": keep_title,
                "absorb_id": absorb_id,
                "absorb_title": absorb_title,
                "merged_tags": merged_tags,
                "merged_importance": merged_importance,
                "merged_relations_count": len(merged_rels),
                "other_memories_updated": len(updated_others),
                "other_memory_ids": updated_others,
            }
        # Write updated keep memory file
        now = datetime.now(timezone.utc).isoformat()
        keep_fm["tags"] = merged_tags
        keep_fm["importance"] = merged_importance
        keep_fm["relations"] = merged_rels
        keep_fm["updated"] = now
        self._write_memory_file(keep_path, keep_fm, merged_body)
        # Update index for keep memory (refresh content_preview with merged body)
        keep_rel_path = str(keep_path.relative_to(self.memory_dir))
        preview = merged_body.strip()[:200]
        if len(merged_body.strip()) > 200:
            # Truncate at a word boundary so the preview doesn't end mid-word
            last_space = preview.rfind(" ")
            if last_space > 0:
                preview = preview[:last_space]
        self._update_index_entry(
            keep_id, keep_fm, keep_rel_path, content_preview=preview
        )
        # Update all other memories that reference absorb_id
        for mid in updated_others:
            other_path = self._resolve_memory_path(mid)
            if not other_path:
                continue
            try:
                other_fm, other_body = self._read_memory_file(other_path)
                other_rels = other_fm.get("relations", [])
                for rel in other_rels:
                    if rel.get("target") == absorb_id:
                        rel["target"] = keep_id
                # Deduplicate after replacement
                seen_other = set()
                deduped_other = []
                for rel in other_rels:
                    key = (rel.get("target"), rel.get("type"))
                    if key not in seen_other:
                        seen_other.add(key)
                        deduped_other.append(rel)
                # Remove self-referential
                other_fm["relations"] = [
                    r for r in deduped_other if r.get("target") != mid
                ]
                other_fm["updated"] = now
                self._write_memory_file(other_path, other_fm, other_body)
                other_rel_path = str(other_path.relative_to(self.memory_dir))
                self._update_index_entry(mid, other_fm, other_rel_path)
            except Exception:
                # Best-effort: one unreadable memory must not abort the merge
                pass
        # Delete absorb memory file and remove from index/state
        absorb_path.unlink()
        self._remove_index_entry(absorb_id)
        state = self._load_state()
        state.get("entries", {}).pop(absorb_id, None)
        self._save_state(state)
        # Git: stage absorb deletion
        try:
            absorb_rel = absorb_path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(absorb_rel)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            # Git bookkeeping is best-effort; the on-disk merge already succeeded
            pass
        self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")
        return {
            "success": True,
            "keep_id": keep_id,
            "keep_title": keep_title,
            "absorb_id": absorb_id,
            "absorb_title": absorb_title,
            "merged_tags": merged_tags,
            "merged_importance": merged_importance,
            "merged_relations_count": len(merged_rels),
            "other_memories_updated": len(updated_others),
        }
    def reflection_summary(self) -> str:
        """Generate REFLECTION.md with patterns and insights from the memory system.

        Writes to ~/.claude/memory/REFLECTION.md and git-commits the file.
        Returns the generated content.

        Sections produced:
            Themes: top tag co-occurrence pairs with example memory titles.
            Cross-Project Patterns: non-project tags that span 2+ known projects.
            Most Accessed: top 10 memories by access count.
            Recent Insights: newest insight-type memories.
            Consolidation History: count of merge episode entries.
        """
        index = self._load_index()
        state = self._load_state()
        entries = index.get("entries", {})
        state_entries = state.get("entries", {})
        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        # Last reflection date (first 10 chars of the ISO timestamp = YYYY-MM-DD)
        last_reflection = state.get("last_reflection", "")
        last_reflection_date = last_reflection[:10] if last_reflection else "never"
        # Count total reflections from episode files
        total_reflections = 0
        episodes_dir = self.memory_dir / "episodes"
        if episodes_dir.exists():
            for ep_file in sorted(episodes_dir.glob("*.md")):
                try:
                    text = ep_file.read_text(encoding="utf-8")
                    # Count lines that look like reflection episode entries
                    # (an "## " heading containing "Reflection:")
                    for line in text.split("\n"):
                        if "Reflection:" in line and line.strip().startswith("## "):
                            total_reflections += 1
                except OSError:
                    pass
        lines = [
            "# Reflection Summary (auto-generated)",
            f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
            "",
        ]
        # === Themes section ===
        # Get top tags, then find co-occurrence pairs
        lines.append("## Themes")
        lines.append("Top tag co-occurrences revealing recurring themes:")
        lines.append("")
        top_tags = self.tags_list(limit=10)
        # Build co-occurrence pairs across top tags
        pair_data: Dict[Tuple[str, str], List[str]] = {}  # (tag1, tag2) -> [titles]
        # Only the 5 most frequent tags seed pairs; each contributes its
        # top-5 co-occurring tags as candidate partners.
        for tag_info in top_tags[:5]:
            tag = tag_info["tag"]
            co_tags = self.tags_related(tag, limit=5)
            for co in co_tags:
                other = co["tag"]
                # Canonical pair ordering
                pair = tuple(sorted([tag, other]))
                if pair in pair_data:
                    continue
                # Find memories with both tags and collect titles
                titles = []
                for mid, entry in entries.items():
                    mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
                    if pair[0] in mem_tags and pair[1] in mem_tags:
                        titles.append(entry.get("title", "untitled"))
                if titles:
                    pair_data[pair] = titles
        # Sort pairs by memory count descending, show top 8
        sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
        for (tag_a, tag_b), titles in sorted_pairs[:8]:
            example_titles = ", ".join(f'"{t}"' for t in titles[:3])
            lines.append(
                f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
            )
        if not sorted_pairs:
            lines.append("- No co-occurrence data available yet.")
        lines.append("")
        # === Cross-Project Patterns section ===
        lines.append("## Cross-Project Patterns")
        lines.append("Tags that span multiple projects:")
        lines.append("")
        # Hard-coded list of project identifier tags; a memory "belongs" to a
        # project when it carries one of these tags.
        known_projects = [
            "major-domo",
            "paper-dynasty",
            "homelab",
            "vagabond-rpg",
            "foundryvtt",
            "strat-gameplay-webapp",
        ]
        # Build tag -> {project -> count} mapping
        tag_project_map: Dict[str, Dict[str, int]] = {}
        for mid, entry in entries.items():
            mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
            # Identify which projects this memory belongs to
            mem_projects = [t for t in mem_tags if t in known_projects]
            # For non-project tags, record which projects they appear in
            non_project_tags = [t for t in mem_tags if t not in known_projects]
            for tag in non_project_tags:
                if tag not in tag_project_map:
                    tag_project_map[tag] = {}
                for proj in mem_projects:
                    tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1
        # Filter to tags spanning 2+ projects, sort by project count then total
        cross_project = []
        for tag, projects in tag_project_map.items():
            if len(projects) >= 2:
                total = sum(projects.values())
                cross_project.append((tag, projects, total))
        cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)
        for tag, projects, total in cross_project[:10]:
            proj_parts = ", ".join(
                f"{p} ({c})"
                for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
            )
            lines.append(f"- **{tag}**: appears in {proj_parts}")
        if not cross_project:
            lines.append("- No cross-project patterns detected yet.")
        lines.append("")
        # === Most Accessed section ===
        lines.append("## Most Accessed")
        lines.append("Top 10 memories by access count:")
        lines.append("")
        # Sort state entries by access_count descending
        accessed = []
        for mid, s in state_entries.items():
            count = s.get("access_count", 0)
            if count > 0:
                # Only list memories that still exist in the index
                entry = entries.get(mid)
                if entry:
                    accessed.append(
                        (
                            mid,
                            entry.get("title", "untitled"),
                            entry.get("path", ""),
                            count,
                        )
                    )
        accessed.sort(key=lambda x: x[3], reverse=True)
        for mid, title, path, count in accessed[:10]:
            # Repeating "1." is valid markdown: renderers auto-number the list
            lines.append(f"1. [{title}]({path}) - {count} accesses")
        if not accessed:
            lines.append("- No access data recorded yet.")
        lines.append("")
        # === Recent Insights section ===
        lines.append("## Recent Insights")
        lines.append("Insight-type memories:")
        lines.append("")
        insights = []
        for mid, entry in entries.items():
            if entry.get("type") == "insight":
                insights.append((mid, entry))
        # Sort by created date descending
        insights.sort(key=lambda x: x[1].get("created", ""), reverse=True)
        for mid, entry in insights[:10]:
            title = entry.get("title", "untitled")
            path = entry.get("path", "")
            preview = entry.get("content_preview", "")
            if preview:
                lines.append(f"- [{title}]({path}) - {preview[:80]}")
            else:
                lines.append(f"- [{title}]({path})")
        if not insights:
            lines.append("- No insight memories stored yet.")
        lines.append("")
        # === Consolidation History section ===
        lines.append("## Consolidation History")
        lines.append("")
        # Count episode headings that mention "merge" (case-insensitive)
        merge_count = 0
        if episodes_dir.exists():
            for ep_file in sorted(episodes_dir.glob("*.md")):
                try:
                    text = ep_file.read_text(encoding="utf-8")
                    for line_text in text.split("\n"):
                        stripped = line_text.strip()
                        if stripped.startswith("## ") and "merge" in stripped.lower():
                            merge_count += 1
                except OSError:
                    pass
        lines.append(f"- Total merges performed: {merge_count}")
        lines.append("")
        content = "\n".join(lines)
        # Write REFLECTION.md
        reflection_path = self.memory_dir / "REFLECTION.md"
        reflection_path.write_text(content, encoding="utf-8")
        self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])
        return content
# =============================================================================
# CLI INTERFACE
# =============================================================================
def _split_csv(value: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated CLI value into stripped items.

    Returns None for None/empty input so callers can pass the result
    straight through to client methods that treat None as "not given".
    """
    return [item.strip() for item in value.split(",")] if value else None


def _build_parser() -> argparse.ArgumentParser:
    """Construct the argument parser with every cognitive-memory subcommand.

    Kept separate from main() so the CLI surface can be inspected or
    extended without touching the dispatch logic.
    """
    parser = argparse.ArgumentParser(
        description="Cognitive Memory - Markdown-based memory system with decay scoring",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    # store
    sp = subparsers.add_parser("store", help="Store a new memory")
    sp.add_argument(
        "--type", "-t", required=True, choices=VALID_TYPES, help="Memory type"
    )
    sp.add_argument("--title", required=True, help="Memory title")
    sp.add_argument("--content", "-c", required=True, help="Memory content")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )
    sp.add_argument("--confidence", type=float, default=0.8, help="Confidence 0.0-1.0")
    sp.add_argument(
        "--episode",
        action="store_true",
        default=False,
        help="Also log an episode entry",
    )
    # recall
    sp = subparsers.add_parser("recall", help="Search memories by query")
    sp.add_argument("query", help="Search query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--limit", "-n", type=int, default=10, help="Max results")
    sp.add_argument(
        "--semantic",
        action="store_true",
        default=False,
        help="Also use embedding similarity (requires embed first)",
    )
    # get
    sp = subparsers.add_parser("get", help="Get memory by ID")
    sp.add_argument("memory_id", help="Memory UUID")
    # relate
    sp = subparsers.add_parser("relate", help="Create relationship")
    sp.add_argument("from_id", help="Source memory UUID")
    sp.add_argument("to_id", help="Target memory UUID")
    sp.add_argument("rel_type", choices=VALID_RELATION_TYPES, help="Relationship type")
    sp.add_argument("--strength", type=float, default=0.8, help="Strength 0.0-1.0")
    sp.add_argument("--context", help="Context description")
    sp.add_argument("--description", help="Rich edge description body")
    # edge-get
    sp = subparsers.add_parser("edge-get", help="Get edge by ID")
    sp.add_argument("edge_id", help="Edge UUID")
    # edge-search
    sp = subparsers.add_parser("edge-search", help="Search edges")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated relation types")
    sp.add_argument("--from-id", help="Filter by source memory ID")
    sp.add_argument("--to-id", help="Filter by target memory ID")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # edge-update
    sp = subparsers.add_parser("edge-update", help="Update an edge")
    sp.add_argument("edge_id", help="Edge UUID")
    sp.add_argument("--description", help="New description body")
    sp.add_argument("--strength", type=float, help="New strength 0.0-1.0")
    # edge-delete
    sp = subparsers.add_parser("edge-delete", help="Delete an edge")
    sp.add_argument("edge_id", help="Edge UUID")
    # search
    sp = subparsers.add_parser("search", help="Filter memories")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--min-importance", type=float, help="Minimum importance")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # update
    sp = subparsers.add_parser("update", help="Update a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--title", help="New title")
    sp.add_argument("--content", help="New content")
    sp.add_argument("--tags", help="New tags (comma-separated)")
    sp.add_argument("--importance", type=float, help="New importance")
    # delete
    sp = subparsers.add_parser("delete", help="Delete a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--force", "-f", action="store_true", help="Skip confirmation")
    # related
    sp = subparsers.add_parser("related", help="Get related memories")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--types", help="Comma-separated relationship types")
    sp.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5")
    # stats
    subparsers.add_parser("stats", help="Show statistics")
    # recent
    sp = subparsers.add_parser("recent", help="Recently created memories")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # decay
    subparsers.add_parser("decay", help="Recalculate all decay scores")
    # core
    subparsers.add_parser("core", help="Generate CORE.md")
    # episode
    sp = subparsers.add_parser("episode", help="Log episode entry")
    sp.add_argument("--type", "-t", required=True, help="Entry type")
    sp.add_argument("--title", required=True, help="Entry title")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--summary", "-s", help="Summary text")
    sp.add_argument("--memory-link", help="Path to related memory file")
    # reindex
    subparsers.add_parser("reindex", help="Rebuild index from files")
    # embed
    subparsers.add_parser(
        "embed", help="Generate embeddings for all memories via Ollama"
    )
    # pin
    sp = subparsers.add_parser("pin", help="Move memory to vault (never decays)")
    sp.add_argument("memory_id", help="Memory UUID")
    # reflect
    sp = subparsers.add_parser(
        "reflect", help="Review recent memories and identify clusters"
    )
    sp.add_argument("--since", help="ISO date (YYYY-MM-DD) to review from")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview without updating state"
    )
    # merge
    sp = subparsers.add_parser(
        "merge", help="Merge two memories (absorb one into another)"
    )
    sp.add_argument("keep_id", help="Memory UUID to keep")
    sp.add_argument("absorb_id", help="Memory UUID to absorb and delete")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview merge without writing"
    )
    # reflection
    subparsers.add_parser("reflection", help="Generate REFLECTION.md summary")
    # tags
    sp = subparsers.add_parser("tags", help="Tag analysis commands")
    tags_sub = sp.add_subparsers(dest="tags_command")
    sp2 = tags_sub.add_parser("list", help="List all tags with counts")
    sp2.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp3 = tags_sub.add_parser("related", help="Find co-occurring tags")
    sp3.add_argument("tag", help="Tag to analyze")
    sp3.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp4 = tags_sub.add_parser("suggest", help="Suggest tags for a memory")
    sp4.add_argument("memory_id", help="Memory UUID")
    # procedure
    sp = subparsers.add_parser(
        "procedure", help="Store a procedure memory (convenience wrapper)"
    )
    sp.add_argument("--title", required=True, help="Procedure title")
    sp.add_argument("--content", "-c", required=True, help="Procedure description")
    sp.add_argument("--steps", help="Comma-separated ordered steps")
    sp.add_argument("--preconditions", help="Comma-separated preconditions")
    sp.add_argument("--postconditions", help="Comma-separated postconditions")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )
    # config
    sp = subparsers.add_parser("config", help="Manage embedding config")
    sp.add_argument("--show", action="store_true", help="Display current config")
    sp.add_argument(
        "--provider", choices=["ollama", "openai"], help="Set embedding provider"
    )
    sp.add_argument("--openai-key", help="Set OpenAI API key")
    return parser


def main():
    """CLI entry point: parse arguments, dispatch to CognitiveMemoryClient,
    and print the result as JSON on stdout."""
    parser = _build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)
    client = CognitiveMemoryClient()
    result = None
    if args.command == "store":
        tags = _split_csv(args.tags)
        memory_id = client.store(
            type=args.type,
            title=args.title,
            content=args.content,
            tags=tags,
            importance=args.importance,
            confidence=args.confidence,
        )
        result = {"success": True, "memory_id": memory_id}
        if args.episode:
            # Get the relative path from the index for memory_link
            index = client._load_index()
            entry = index.get("entries", {}).get(memory_id, {})
            rel_path = entry.get("path", "")
            # Truncate content at word boundary for summary (max 100 chars)
            summary = args.content.strip()[:100]
            if len(args.content.strip()) > 100:
                last_space = summary.rfind(" ")
                if last_space > 0:
                    summary = summary[:last_space]
            client.episode(
                type=args.type,
                title=args.title,
                tags=tags or [],
                summary=summary,
                memory_link=rel_path,
            )
            result["episode_logged"] = True
    elif args.command == "recall":
        result = client.recall(
            args.query,
            memory_types=_split_csv(args.types),
            limit=args.limit,
            semantic=args.semantic,
        )
    elif args.command == "get":
        result = client.get(args.memory_id)
        if not result:
            result = {"error": "Memory not found"}
    elif args.command == "relate":
        edge_id = client.relate(
            args.from_id,
            args.to_id,
            args.rel_type,
            strength=args.strength,
            context=args.context,
            description=args.description,
        )
        result = {"success": bool(edge_id), "edge_id": edge_id}
    elif args.command == "edge-get":
        result = client.edge_get(args.edge_id)
        if not result:
            result = {"error": "Edge not found"}
    elif args.command == "edge-search":
        # argparse maps --from-id/--to-id to args.from_id/args.to_id,
        # so no getattr fallback is needed.
        result = client.edge_search(
            query=args.query,
            types=_split_csv(args.types),
            from_id=args.from_id,
            to_id=args.to_id,
            limit=args.limit,
        )
    elif args.command == "edge-update":
        success = client.edge_update(
            args.edge_id,
            description=args.description,
            strength=args.strength,
        )
        result = {"success": success}
    elif args.command == "edge-delete":
        success = client.edge_delete(args.edge_id)
        result = {"success": success}
    elif args.command == "search":
        result = client.search(
            query=args.query,
            memory_types=_split_csv(args.types),
            tags=_split_csv(args.tags),
            min_importance=args.min_importance,
            limit=args.limit,
        )
    elif args.command == "update":
        success = client.update(
            args.memory_id,
            title=args.title,
            content=args.content,
            tags=_split_csv(args.tags),
            importance=args.importance,
        )
        result = {"success": success}
    elif args.command == "delete":
        confirmed = True
        if not args.force:
            mem = client.get(args.memory_id)
            if mem:
                print(f"Deleting: {mem.get('title')}", file=sys.stderr)
            # --force is documented as "Skip confirmation", so actually ask
            # when attached to a terminal. Non-interactive callers (pipes,
            # scripts) keep the previous delete-without-prompt behavior.
            if sys.stdin.isatty():
                answer = input("Delete this memory? [y/N] ").strip().lower()
                confirmed = answer in ("y", "yes")
        if confirmed:
            result = {"success": client.delete(args.memory_id)}
        else:
            result = {"success": False, "aborted": True}
    elif args.command == "related":
        result = client.related(
            args.memory_id,
            rel_types=_split_csv(args.types),
            max_depth=args.depth,
        )
    elif args.command == "stats":
        result = client.stats()
    elif args.command == "recent":
        result = client.recent(limit=args.limit)
    elif args.command == "decay":
        result = client.decay()
    elif args.command == "core":
        content = client.core()
        # Print path, not content (content is written to file)
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "CORE.md"),
            "chars": len(content),
        }
    elif args.command == "episode":
        client.episode(
            type=args.type,
            title=args.title,
            tags=_split_csv(args.tags),
            summary=args.summary,
            memory_link=args.memory_link,
        )
        result = {"success": True}
    elif args.command == "reindex":
        count = client.reindex()
        result = {"success": True, "indexed": count}
    elif args.command == "embed":
        print(
            "Generating embeddings (this may take a while if model needs to be pulled)...",
            file=sys.stderr,
        )
        result = client.embed()
    elif args.command == "pin":
        success = client.pin(args.memory_id)
        result = {"success": success}
    elif args.command == "reflect":
        result = client.reflect(
            since=args.since,
            dry_run=args.dry_run,
        )
    elif args.command == "merge":
        result = client.merge(
            keep_id=args.keep_id,
            absorb_id=args.absorb_id,
            dry_run=args.dry_run,
        )
    elif args.command == "reflection":
        content = client.reflection_summary()
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "REFLECTION.md"),
            "chars": len(content),
        }
    elif args.command == "tags":
        if args.tags_command == "list":
            result = client.tags_list(limit=args.limit)
        elif args.tags_command == "related":
            result = client.tags_related(args.tag, limit=args.limit)
        elif args.tags_command == "suggest":
            result = client.tags_suggest(args.memory_id)
        else:
            # No subcommand given, print tags help.
            # NOTE: relies on argparse private internals (_subparsers/_actions)
            # to locate the "tags" subparser for its help text.
            for action in parser._subparsers._actions:
                if isinstance(action, argparse._SubParsersAction):
                    tags_parser = action.choices.get("tags")
                    if tags_parser:
                        tags_parser.print_help()
                    break
            sys.exit(1)
    elif args.command == "procedure":
        memory_id = client.store(
            type="procedure",
            title=args.title,
            content=args.content,
            tags=_split_csv(args.tags),
            importance=args.importance,
            steps=_split_csv(args.steps),
            preconditions=_split_csv(args.preconditions),
            postconditions=_split_csv(args.postconditions),
        )
        result = {"success": True, "memory_id": memory_id}
    elif args.command == "config":
        # Use the module-level CONFIG_PATH constant instead of rebuilding
        # the same path locally (consistency with the rest of the file).
        config = _load_memory_config(CONFIG_PATH)
        changed = False
        if args.provider:
            config["embedding_provider"] = args.provider
            changed = True
        if args.openai_key:
            config["openai_api_key"] = args.openai_key
            changed = True
        if changed:
            CONFIG_PATH.write_text(json.dumps(config, indent=2))
            result = {"success": True, "updated": True}
        elif args.show or not changed:
            # Mask API key for display
            display = dict(config)
            key = display.get("openai_api_key")
            if key and isinstance(key, str) and len(key) > 8:
                display["openai_api_key"] = key[:4] + "..." + key[-4:]
            result = display
    # default=str keeps non-JSON-native values (e.g. Paths, datetimes) printable
    print(json.dumps(result, indent=2, default=str))
# Script entry point guard: run the CLI only when executed directly,
# not when this module is imported (e.g. by the MCP server).
if __name__ == "__main__":
    main()