#!/usr/bin/env python3 """ Cognitive Memory Client Markdown-based memory system with decay scoring, episodic logging, and auto-curated CORE.md. Stores memories as human-readable markdown files with YAML frontmatter. Usage: # CLI python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix" python client.py recall "timeout error" python client.py get python client.py relate SOLVES python client.py core python client.py episode --type fix --title "Fixed reconnection" --tags "discord,python" # Python from client import CognitiveMemoryClient client = CognitiveMemoryClient() memory_id = client.store(type="solution", title="...", content="...") """ import argparse import json import math import os import re import subprocess import sys import urllib.request import uuid from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib.error import URLError # ============================================================================= # CONSTANTS # ============================================================================= MEMORY_DIR = Path.home() / ".claude" / "memory" INDEX_PATH = MEMORY_DIR / "_index.json" STATE_PATH = MEMORY_DIR / "_state.json" EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json" OLLAMA_URL = "http://localhost:11434" EMBEDDING_MODEL = "nomic-embed-text" EMBEDDING_TIMEOUT = 5 # seconds CONFIG_PATH = MEMORY_DIR / "_config.json" OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings" OPENAI_MODEL_DEFAULT = "text-embedding-3-small" # Memory type -> directory name mapping TYPE_DIRS = { "solution": "solutions", "fix": "fixes", "decision": "decisions", "configuration": "configurations", "problem": "problems", "workflow": "workflows", "code_pattern": "code-patterns", "error": "errors", "general": "general", "procedure": "procedures", "insight": "insights", } VALID_TYPES = list(TYPE_DIRS.keys()) # Decay model type weights TYPE_WEIGHTS = { 
"decision": 1.3, "solution": 1.2, "insight": 1.25, "code_pattern": 1.1, "configuration": 1.1, "fix": 1.0, "workflow": 1.0, "problem": 0.9, "error": 0.8, "general": 0.8, "procedure": 1.4, } DECAY_LAMBDA = 0.03 # Half-life ~23 days # Decay score thresholds THRESHOLD_ACTIVE = 0.5 THRESHOLD_FADING = 0.2 THRESHOLD_DORMANT = 0.05 # Relationship types (subset from MemoryGraph, focused on most useful) VALID_RELATION_TYPES = [ "SOLVES", "CAUSES", "BUILDS_ON", "ALTERNATIVE_TO", "REQUIRES", "FOLLOWS", "RELATED_TO", ] # Edge file constants EDGES_DIR_NAME = "edges" EDGE_FIELD_ORDER = [ "id", "type", "from_id", "from_title", "to_id", "to_title", "strength", "created", "updated", ] # Frontmatter field order for consistent output FIELD_ORDER = [ "id", "type", "title", "tags", "importance", "confidence", "steps", "preconditions", "postconditions", "created", "updated", "relations", ] # CORE.md token budget (approximate, 1 token ~= 4 chars) CORE_MAX_CHARS = 12000 # ~3K tokens # ============================================================================= # YAML FRONTMATTER PARSING (stdlib only) # ============================================================================= def _needs_quoting(s: str) -> bool: """Check if a YAML string value needs quoting.""" if not s: return True if any(c in s for c in ":#{}[]&*?|>!%@`"): return True try: float(s) return True except ValueError: pass if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"): return True return False def _quote_yaml(s: str) -> str: """Quote a string for YAML, escaping internal quotes.""" escaped = s.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"' def _format_yaml_value(value: Any, force_quote: bool = False) -> str: """Format a Python value for YAML output.""" if value is None: return "null" if isinstance(value, bool): return "true" if value else "false" if isinstance(value, (int, float)): return str(value) s = str(value) if force_quote or _needs_quoting(s): return _quote_yaml(s) return s def 
_parse_scalar(value: str) -> Any: """Parse a YAML scalar value to Python type.""" v = value.strip() if not v or v == "null": return None if v == "true": return True if v == "false": return False # Try numeric try: if "." in v: return float(v) return int(v) except ValueError: pass # Strip quotes if (v.startswith('"') and v.endswith('"')) or ( v.startswith("'") and v.endswith("'") ): return v[1:-1] return v def serialize_frontmatter(data: Dict[str, Any]) -> str: """Serialize a dict to YAML frontmatter string (between --- markers).""" lines = ["---"] for key in FIELD_ORDER: if key not in data: continue value = data[key] if key == "tags" and isinstance(value, list): if value: items = ", ".join(_format_yaml_value(t) for t in value) lines.append(f"tags: [{items}]") else: lines.append("tags: []") elif key in ("steps", "preconditions", "postconditions") and isinstance( value, list ): if not value: continue lines.append(f"{key}:") for item in value: lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}") elif key == "relations" and isinstance(value, list): if not value: continue lines.append("relations:") for rel in value: first = True for rk in [ "target", "type", "direction", "strength", "context", "edge_id", ]: if rk not in rel: continue rv = rel[rk] prefix = " - " if first else " " force_q = rk in ("context",) lines.append( f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}" ) first = False elif key == "title": lines.append(f"title: {_format_yaml_value(value, force_quote=True)}") else: lines.append(f"{key}: {_format_yaml_value(value)}") lines.append("---") return "\n".join(lines) def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]: """Parse YAML frontmatter and body from markdown text. Returns (frontmatter_dict, body_text). 
""" if not text.startswith("---\n"): return {}, text # Find closing --- end_match = re.search(r"\n---\s*\n", text[3:]) if not end_match: # Try end of string if text.rstrip().endswith("---"): end_pos = text.rstrip().rfind("\n---") if end_pos <= 3: return {}, text fm_text = text[4:end_pos] body = "" else: return {}, text else: end_pos = end_match.start() + 3 # Offset from text[3:] fm_text = text[4:end_pos] body = text[end_pos + end_match.end() - end_match.start() :] body = body.lstrip("\n") data = {} lines = fm_text.split("\n") i = 0 while i < len(lines): line = lines[i] # Skip empty lines if not line.strip(): i += 1 continue # Must be a top-level key (no leading whitespace) if line[0] == " ": i += 1 continue if ":" not in line: i += 1 continue key, _, rest = line.partition(":") key = key.strip() rest = rest.strip() if not rest: # Block value - collect indented lines block_lines = [] j = i + 1 while j < len(lines) and lines[j] and lines[j][0] == " ": block_lines.append(lines[j]) j += 1 if key == "relations": data["relations"] = _parse_relations_block(block_lines) elif block_lines and block_lines[0].strip().startswith("- "): # Simple list data[key] = [ _parse_scalar(bl.strip().lstrip("- ")) for bl in block_lines if bl.strip().startswith("- ") ] else: data[key] = None i = j continue # Inline list: [a, b, c] if rest.startswith("[") and rest.endswith("]"): inner = rest[1:-1] if inner.strip(): data[key] = [ _parse_scalar(v.strip()) for v in inner.split(",") if v.strip() ] else: data[key] = [] else: data[key] = _parse_scalar(rest) i += 1 return data, body def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]: """Parse a YAML block list of relation dicts.""" relations = [] current = None for line in lines: stripped = line.strip() if not stripped: continue if stripped.startswith("- "): # New relation entry current = {} relations.append(current) # Parse key:value on same line as - rest = stripped[2:] if ":" in rest: k, _, v = rest.partition(":") 
current[k.strip()] = _parse_scalar(v.strip()) elif current is not None and ":" in stripped: k, _, v = stripped.partition(":") current[k.strip()] = _parse_scalar(v.strip()) return relations # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def slugify(text: str, max_length: int = 60) -> str: """Convert text to a URL-friendly slug.""" text = text.lower().strip() text = re.sub(r"[^\w\s-]", "", text) text = re.sub(r"[\s_]+", "-", text) text = re.sub(r"-+", "-", text) text = text.strip("-") if len(text) > max_length: text = text[:max_length].rstrip("-") return text or "untitled" def make_filename(title: str, memory_id: str) -> str: """Create a filename from title and UUID suffix.""" slug = slugify(title) suffix = memory_id[:6] return f"{slug}-{suffix}.md" def calculate_decay_score( importance: float, days_since_access: float, access_count: int, type_weight: float ) -> float: """Calculate decay score for a memory. decay_score = importance * e^(-lambda * days) * log2(access_count + 1) * type_weight """ time_factor = math.exp(-DECAY_LAMBDA * days_since_access) usage_factor = math.log2(access_count + 1) if access_count > 0 else 1.0 return importance * time_factor * usage_factor * type_weight def _ollama_embed( texts: List[str], model: str = EMBEDDING_MODEL, timeout: int = EMBEDDING_TIMEOUT, ) -> Optional[List[List[float]]]: """Get embeddings from Ollama for a list of texts. Returns list of embedding vectors, or None if Ollama is unavailable. 
""" try: payload = json.dumps({"model": model, "input": texts}).encode("utf-8") req = urllib.request.Request( f"{OLLAMA_URL}/api/embed", data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as resp: if resp.status != 200: return None data = json.loads(resp.read().decode("utf-8")) embeddings = data.get("embeddings") if embeddings and isinstance(embeddings, list): return embeddings return None except ( ConnectionRefusedError, URLError, TimeoutError, OSError, json.JSONDecodeError, ValueError, KeyError, ): return None def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]: """Read _config.json, return defaults if missing.""" path = config_path or CONFIG_PATH defaults = { "embedding_provider": "ollama", "openai_api_key": None, "ollama_model": EMBEDDING_MODEL, "openai_model": OPENAI_MODEL_DEFAULT, } if path.exists(): try: data = json.loads(path.read_text()) for k, v in defaults.items(): data.setdefault(k, v) return data except (json.JSONDecodeError, OSError): pass return defaults def _openai_embed( texts: List[str], api_key: str, model: str = OPENAI_MODEL_DEFAULT, timeout: int = 30, ) -> Optional[List[List[float]]]: """Get embeddings from OpenAI API (stdlib-only, same interface as _ollama_embed).""" try: payload = json.dumps({"input": texts, "model": model}).encode("utf-8") req = urllib.request.Request( OPENAI_EMBED_URL, data=payload, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {api_key}", }, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as resp: if resp.status != 200: return None data = json.loads(resp.read().decode("utf-8")) items = data.get("data", []) if items and isinstance(items, list): # Sort by index to ensure order matches input items.sort(key=lambda x: x.get("index", 0)) return [item["embedding"] for item in items] return None except ( ConnectionRefusedError, URLError, TimeoutError, OSError, json.JSONDecodeError, 
ValueError, KeyError, ): return None def _cosine_similarity(a: List[float], b: List[float]) -> float: """Compute cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b)) norm_a = math.sqrt(sum(x * x for x in a)) norm_b = math.sqrt(sum(x * x for x in b)) if norm_a == 0.0 or norm_b == 0.0: return 0.0 return dot / (norm_a * norm_b) def _make_edge_filename( from_title: str, rel_type: str, to_title: str, edge_id: str ) -> str: """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md""" from_slug = slugify(from_title, max_length=30) to_slug = slugify(to_title, max_length=30) suffix = edge_id[:6] return f"{from_slug}--{rel_type}--{to_slug}-{suffix}.md" def serialize_edge_frontmatter(data: Dict[str, Any]) -> str: """Serialize an edge dict to YAML frontmatter string.""" lines = ["---"] for key in EDGE_FIELD_ORDER: if key not in data: continue value = data[key] if key in ("from_title", "to_title"): lines.append(f"{key}: {_format_yaml_value(value, force_quote=True)}") else: lines.append(f"{key}: {_format_yaml_value(value)}") lines.append("---") return "\n".join(lines) # ============================================================================= # CLIENT # ============================================================================= class CognitiveMemoryClient: """Client for markdown-based cognitive memory system.""" def __init__(self, memory_dir: Optional[Path] = None): self.memory_dir = memory_dir or MEMORY_DIR self.index_path = self.memory_dir / "_index.json" self.state_path = self.memory_dir / "_state.json" self._embeddings_cache: Optional[Dict] = None self._embeddings_mtime: float = 0.0 self._ensure_dirs() def _ensure_dirs(self): """Create directory structure if needed.""" for type_dir in TYPE_DIRS.values(): (self.memory_dir / "graph" / type_dir).mkdir(parents=True, exist_ok=True) (self.memory_dir / "graph" / EDGES_DIR_NAME).mkdir(parents=True, exist_ok=True) (self.memory_dir / "episodes").mkdir(parents=True, exist_ok=True) 
(self.memory_dir / "vault").mkdir(parents=True, exist_ok=True) def _load_embeddings_cached(self) -> Optional[Dict]: """Load _embeddings.json with mtime-based caching. Returns the parsed dict, or None if the file doesn't exist or fails to parse. Only re-reads from disk when the file's mtime has changed. """ embeddings_path = self.memory_dir / "_embeddings.json" if not embeddings_path.exists(): return None try: current_mtime = embeddings_path.stat().st_mtime except OSError: return None if ( self._embeddings_cache is not None and current_mtime == self._embeddings_mtime ): return self._embeddings_cache try: data = json.loads(embeddings_path.read_text()) self._embeddings_cache = data self._embeddings_mtime = current_mtime return data except (json.JSONDecodeError, OSError): return None # ------------------------------------------------------------------------- # Index and State management # ------------------------------------------------------------------------- def _load_index(self) -> Dict: """Load _index.json, return empty structure if missing.""" if self.index_path.exists(): try: return json.loads(self.index_path.read_text()) except (json.JSONDecodeError, OSError): pass return {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}} def _save_index(self, index: Dict): """Write _index.json.""" index["version"] = 2 index.setdefault("edges", {}) index["updated"] = datetime.now(timezone.utc).isoformat() index["count"] = len(index.get("entries", {})) self.index_path.write_text(json.dumps(index, indent=2, default=str)) def _load_state(self) -> Dict: """Load _state.json, return empty structure if missing.""" if self.state_path.exists(): try: return json.loads(self.state_path.read_text()) except (json.JSONDecodeError, OSError): pass return {"version": 1, "updated": "", "entries": {}} def _save_state(self, state: Dict): """Write _state.json.""" state["updated"] = datetime.now(timezone.utc).isoformat() self.state_path.write_text(json.dumps(state, indent=2, 
default=str)) # ------------------------------------------------------------------------- # File I/O # ------------------------------------------------------------------------- def _read_memory_file(self, path: Path) -> Tuple[Dict[str, Any], str]: """Read a memory markdown file, return (frontmatter, body).""" text = path.read_text(encoding="utf-8") return parse_frontmatter(text) def _write_memory_file(self, path: Path, frontmatter: Dict[str, Any], body: str): """Write a memory markdown file with frontmatter and body.""" fm_str = serialize_frontmatter(frontmatter) content = f"{fm_str}\n\n{body.strip()}\n" if body.strip() else f"{fm_str}\n" path.write_text(content, encoding="utf-8") def _resolve_memory_path(self, memory_id: str) -> Optional[Path]: """Find the file path for a memory by ID using the index.""" index = self._load_index() entry = index.get("entries", {}).get(memory_id) if entry: path = self.memory_dir / entry["path"] if path.exists(): return path # Fallback: scan files (slow but reliable) return self._scan_for_memory(memory_id) def _scan_for_memory(self, memory_id: str) -> Optional[Path]: """Scan graph/ and vault/ directories for a memory file by ID.""" search_dirs = [self.memory_dir / "graph", self.memory_dir / "vault"] for search_dir in search_dirs: if not search_dir.exists(): continue for md_file in search_dir.rglob("*.md"): try: fm, _ = self._read_memory_file(md_file) if fm.get("id") == memory_id: return md_file except Exception: continue return None # ------------------------------------------------------------------------- # Git operations # ------------------------------------------------------------------------- def _git_commit(self, message: str, files: Optional[List[Path]] = None): """Stage and commit files in the memory repo.""" try: # Check if it's a git repo result = subprocess.run( ["git", "rev-parse", "--git-dir"], cwd=str(self.memory_dir), capture_output=True, timeout=5, ) if result.returncode != 0: return # Not a git repo, skip if files: 
for f in files: rel = f.relative_to(self.memory_dir) subprocess.run( ["git", "add", str(rel)], cwd=str(self.memory_dir), capture_output=True, timeout=5, ) else: subprocess.run( ["git", "add", "-A"], cwd=str(self.memory_dir), capture_output=True, timeout=5, ) subprocess.run( ["git", "commit", "-m", message, "--allow-empty"], cwd=str(self.memory_dir), capture_output=True, timeout=10, ) except (subprocess.TimeoutExpired, FileNotFoundError, OSError): pass # Git operations are best-effort # ------------------------------------------------------------------------- # Index helpers # ------------------------------------------------------------------------- def _update_index_entry( self, memory_id: str, frontmatter: Dict[str, Any], rel_path: str, content_preview: str = "", ): """Add or update an entry in the index.""" index = self._load_index() entry = { "title": frontmatter.get("title", ""), "type": frontmatter.get("type", "general"), "tags": frontmatter.get("tags", []), "importance": frontmatter.get("importance", 0.5), "confidence": frontmatter.get("confidence", 0.8), "created": frontmatter.get("created", ""), "updated": frontmatter.get("updated", ""), "path": rel_path, "relations": frontmatter.get("relations", []), "content_preview": content_preview, } # Preserve existing content_preview if not provided if not content_preview: existing = index.get("entries", {}).get(memory_id, {}) entry["content_preview"] = existing.get("content_preview", "") index.setdefault("entries", {})[memory_id] = entry self._save_index(index) def _remove_index_entry(self, memory_id: str): """Remove an entry from the index.""" index = self._load_index() index.get("entries", {}).pop(memory_id, None) self._save_index(index) def _maybe_refresh_decay(self): """Auto-refresh all decay scores if state is older than 24 hours.""" if not self.state_path.exists(): self.decay() return try: state = json.loads(self.state_path.read_text()) updated_str = state.get("updated", "") if not updated_str: self.decay() 
return updated_dt = datetime.fromisoformat(updated_str.replace("Z", "+00:00")) if updated_dt.tzinfo is None: updated_dt = updated_dt.replace(tzinfo=timezone.utc) age_hours = (datetime.now(timezone.utc) - updated_dt).total_seconds() / 3600 if age_hours > 24: self.decay() except (json.JSONDecodeError, ValueError, OSError): self.decay() # ------------------------------------------------------------------------- # Edge index helpers # ------------------------------------------------------------------------- def _update_edge_index( self, edge_id: str, edge_data: Dict[str, Any], rel_path: str ): """Add or update an edge entry in the index.""" index = self._load_index() index.setdefault("edges", {})[edge_id] = { "type": edge_data.get("type", ""), "from_id": edge_data.get("from_id", ""), "from_title": edge_data.get("from_title", ""), "to_id": edge_data.get("to_id", ""), "to_title": edge_data.get("to_title", ""), "strength": edge_data.get("strength", 0.8), "created": edge_data.get("created", ""), "updated": edge_data.get("updated", ""), "path": rel_path, } self._save_index(index) def _remove_edge_index(self, edge_id: str): """Remove an edge entry from the index.""" index = self._load_index() index.get("edges", {}).pop(edge_id, None) self._save_index(index) def _scan_for_edge(self, edge_id: str) -> Optional[Path]: """Fallback file scan for an edge by ID if index is stale.""" edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME if not edges_dir.exists(): return None for md_file in edges_dir.glob("*.md"): try: fm, _ = self._read_memory_file(md_file) if fm.get("id") == edge_id: return md_file except Exception: continue return None def _resolve_edge_path(self, edge_id: str) -> Optional[Path]: """Find the file path for an edge by ID using the index.""" index = self._load_index() entry = index.get("edges", {}).get(edge_id) if entry: path = self.memory_dir / entry["path"] if path.exists(): return path return self._scan_for_edge(edge_id) # 
========================================================================= # PUBLIC API # ========================================================================= def store( self, type: str, title: str, content: str, tags: Optional[List[str]] = None, importance: float = 0.5, confidence: float = 0.8, steps: Optional[List[str]] = None, preconditions: Optional[List[str]] = None, postconditions: Optional[List[str]] = None, ) -> str: """Store a new memory as a markdown file. Returns the memory UUID. """ if type not in VALID_TYPES: raise ValueError(f"Invalid type: {type}. Valid: {VALID_TYPES}") memory_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() tags = [t.lower().strip() for t in (tags or [])] frontmatter = { "id": memory_id, "type": type, "title": title, "tags": tags, "importance": max(0.0, min(1.0, importance)), "confidence": max(0.0, min(1.0, confidence)), "created": now, "updated": now, } # Add procedure-specific fields when type is "procedure" if type == "procedure": if steps: frontmatter["steps"] = steps if preconditions: frontmatter["preconditions"] = preconditions if postconditions: frontmatter["postconditions"] = postconditions # Determine file path type_dir = TYPE_DIRS[type] filename = make_filename(title, memory_id) rel_path = f"graph/{type_dir}/{filename}" full_path = self.memory_dir / rel_path # Write file self._write_memory_file(full_path, frontmatter, content) # Update index with content preview (truncate at word boundary) preview = content.strip()[:200] if len(content.strip()) > 200: last_space = preview.rfind(" ") if last_space > 0: preview = preview[:last_space] self._update_index_entry( memory_id, frontmatter, rel_path, content_preview=preview ) # Init state entry state = self._load_state() state.setdefault("entries", {})[memory_id] = { "access_count": 0, "last_accessed": now, "decay_score": importance * TYPE_WEIGHTS.get(type, 1.0) * 0.5, } self._save_state(state) # Git commit self._git_commit(f"store: {title}", [full_path]) 
return memory_id def recall( self, query: str, memory_types: Optional[List[str]] = None, limit: int = 10, semantic: bool = True, ) -> List[Dict[str, Any]]: """Search memories by query, ranked by relevance and decay score. When semantic=True and embeddings exist, merges keyword and semantic results. """ self._maybe_refresh_decay() query_lower = query.lower().strip() terms = query_lower.split() if not terms: return [] index = self._load_index() state = self._load_state() results = [] for mid, entry in index.get("entries", {}).items(): # Filter by type if memory_types and entry.get("type") not in memory_types: continue # Check decay threshold - skip archived s = state.get("entries", {}).get(mid, {}) decay = s.get("decay_score", 0.5) if decay < THRESHOLD_DORMANT: continue # Score based on term matches title = (entry.get("title") or "").lower() tags_str = " ".join(entry.get("tags") or []).lower() title_matches = sum(1 for t in terms if t in title) * 3 tag_matches = sum(1 for t in terms if t in tags_str) * 2 score = title_matches + tag_matches if score == 0: # Check content_preview from index (no file I/O needed) preview = (entry.get("content_preview") or "").lower() preview_matches = sum(1 for t in terms if t in preview) if preview_matches > 0: score = preview_matches else: continue # Weight by decay score weighted_score = score * (1 + decay) results.append( { "id": mid, "type": entry.get("type"), "title": entry.get("title"), "tags": entry.get("tags", []), "importance": entry.get("importance"), "decay_score": round(decay, 3), "path": entry.get("path"), "created": entry.get("created"), "_score": weighted_score, } ) results.sort(key=lambda x: x.pop("_score", 0), reverse=True) keyword_results = results[:limit] # Merge with semantic results (on by default) # Weights: semantic 60%, keyword 40% # Conceptual matching dominates; keyword acts as precision boost for exact terms if semantic: sem_results = self.semantic_recall(query, limit=limit) if sem_results: score_map: 
Dict[str, float] = {} result_map: Dict[str, Dict] = {} # Keyword: normalize rank to 0-1 (rank 1 = 1.0, last = ~0.1) kw_weight = 0.4 for i, r in enumerate(keyword_results): mid = r["id"] normalized = (limit - i) / limit score_map[mid] = normalized * kw_weight result_map[mid] = r # Semantic: similarity is already 0-1 sem_weight = 0.6 for r in sem_results: mid = r["id"] sim = r.get("similarity", 0.0) sem_score = sim * sem_weight if mid in score_map: score_map[mid] += sem_score result_map[mid]["similarity"] = sim else: score_map[mid] = sem_score idx_entry = index.get("entries", {}).get(mid, {}) s = state.get("entries", {}).get(mid, {}) result_map[mid] = { "id": mid, "type": r.get("type"), "title": r.get("title"), "tags": r.get("tags", []), "importance": idx_entry.get("importance"), "decay_score": round(s.get("decay_score", 0.5), 3), "similarity": sim, "path": r.get("path"), "created": idx_entry.get("created"), } # Sort by merged score merged = sorted( result_map.values(), key=lambda x: score_map.get(x["id"], 0), reverse=True, ) return merged[:limit] return keyword_results def get(self, memory_id: str) -> Optional[Dict[str, Any]]: """Get a memory by ID, update access count.""" path = self._resolve_memory_path(memory_id) if not path: return None fm, body = self._read_memory_file(path) # Update access count in state state = self._load_state() now = datetime.now(timezone.utc).isoformat() entry = state.setdefault("entries", {}).setdefault( memory_id, { "access_count": 0, "last_accessed": now, "decay_score": 0.5, }, ) entry["access_count"] = entry.get("access_count", 0) + 1 entry["last_accessed"] = now # Recalculate decay score for this single memory (just accessed, so days=0) importance = fm.get("importance", 0.5) mem_type = fm.get("type", "general") type_weight = TYPE_WEIGHTS.get(mem_type, 1.0) entry["decay_score"] = round( calculate_decay_score(importance, 0, entry["access_count"], type_weight), 4 ) self._save_state(state) return { "id": fm.get("id", memory_id), "type": 
fm.get("type"), "title": fm.get("title"), "content": body, "tags": fm.get("tags", []), "importance": fm.get("importance"), "confidence": fm.get("confidence"), "created": fm.get("created"), "updated": fm.get("updated"), "relations": fm.get("relations", []), "steps": fm.get("steps", []), "preconditions": fm.get("preconditions", []), "postconditions": fm.get("postconditions", []), "access_count": entry["access_count"], "decay_score": entry.get("decay_score", 0.5), "path": str(path.relative_to(self.memory_dir)), } def relate( self, from_id: str, to_id: str, rel_type: str, strength: float = 0.8, context: Optional[str] = None, description: Optional[str] = None, ) -> str: """Create a relationship between two memories with an edge file. Returns edge_id string, or empty string if duplicate. """ if rel_type not in VALID_RELATION_TYPES: raise ValueError( f"Invalid relation type: {rel_type}. Valid: {VALID_RELATION_TYPES}" ) from_path = self._resolve_memory_path(from_id) to_path = self._resolve_memory_path(to_id) if not from_path or not to_path: raise ValueError(f"Memory not found: {from_id if not from_path else to_id}") # Read source memory fm, body = self._read_memory_file(from_path) relations = fm.get("relations", []) # Check for duplicate for r in relations: if r.get("target") == to_id and r.get("type") == rel_type: return "" # Already exists # Read target memory for title to_fm, to_body = self._read_memory_file(to_path) # Create edge file edge_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() from_title = fm.get("title", from_id[:8]) to_title = to_fm.get("title", to_id[:8]) clamped_strength = max(0.0, min(1.0, strength)) edge_data = { "id": edge_id, "type": rel_type, "from_id": from_id, "from_title": from_title, "to_id": to_id, "to_title": to_title, "strength": clamped_strength, "created": now, "updated": now, } edge_filename = _make_edge_filename(from_title, rel_type, to_title, edge_id) edge_path = self.memory_dir / "graph" / EDGES_DIR_NAME / 
edge_filename edge_fm_str = serialize_edge_frontmatter(edge_data) edge_body = description.strip() if description else "" edge_content = ( f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n" ) edge_path.write_text(edge_content, encoding="utf-8") # Update source memory frontmatter with edge_id new_rel = { "target": to_id, "type": rel_type, "direction": "outgoing", "strength": clamped_strength, "edge_id": edge_id, } if context: new_rel["context"] = context relations.append(new_rel) fm["relations"] = relations fm["updated"] = now self._write_memory_file(from_path, fm, body) # Add incoming relation to target with edge_id to_relations = to_fm.get("relations", []) has_incoming = any( r.get("target") == from_id and r.get("type") == rel_type and r.get("direction") == "incoming" for r in to_relations ) if not has_incoming: incoming_rel = { "target": from_id, "type": rel_type, "direction": "incoming", "strength": clamped_strength, "edge_id": edge_id, } if context: incoming_rel["context"] = context to_relations.append(incoming_rel) to_fm["relations"] = to_relations to_fm["updated"] = now self._write_memory_file(to_path, to_fm, to_body) # Update memory index rel_from = str(from_path.relative_to(self.memory_dir)) rel_to = str(to_path.relative_to(self.memory_dir)) self._update_index_entry(from_id, fm, rel_from) self._update_index_entry(to_id, to_fm, rel_to) # Update edge index self._update_edge_index( edge_id, edge_data, f"graph/{EDGES_DIR_NAME}/{edge_filename}" ) self._git_commit( f"relate: {from_id[:8]} --{rel_type}--> {to_id[:8]}", [from_path, to_path, edge_path], ) return edge_id def edge_get(self, edge_id: str) -> Optional[Dict[str, Any]]: """Read full edge file (frontmatter + body).""" path = self._resolve_edge_path(edge_id) if not path: return None fm, body = self._read_memory_file(path) return { "id": fm.get("id", edge_id), "type": fm.get("type", ""), "from_id": fm.get("from_id", ""), "from_title": fm.get("from_title", ""), "to_id": fm.get("to_id", ""), 
"to_title": fm.get("to_title", ""), "strength": fm.get("strength", 0.8), "created": fm.get("created", ""), "updated": fm.get("updated", ""), "description": body.strip(), "path": str(path.relative_to(self.memory_dir)), } def edge_search( self, query: Optional[str] = None, types: Optional[List[str]] = None, from_id: Optional[str] = None, to_id: Optional[str] = None, limit: int = 20, ) -> List[Dict[str, Any]]: """Search edges via index.""" index = self._load_index() results = [] query_lower = query.lower().strip() if query else "" for eid, entry in index.get("edges", {}).items(): if types and entry.get("type") not in types: continue if from_id and entry.get("from_id") != from_id: continue if to_id and entry.get("to_id") != to_id: continue if query_lower: searchable = f"{entry.get('from_title', '')} {entry.get('to_title', '')} {entry.get('type', '')}".lower() if query_lower not in searchable: continue results.append({"id": eid, **entry}) results.sort(key=lambda x: x.get("created", ""), reverse=True) return results[:limit] def edge_update( self, edge_id: str, description: Optional[str] = None, strength: Optional[float] = None, ) -> bool: """Update edge body/metadata, sync strength to memory frontmatter.""" path = self._resolve_edge_path(edge_id) if not path: return False fm, body = self._read_memory_file(path) now = datetime.now(timezone.utc).isoformat() if description is not None: body = description if strength is not None: fm["strength"] = max(0.0, min(1.0, strength)) fm["updated"] = now # Write edge file edge_fm_str = serialize_edge_frontmatter(fm) edge_body = body.strip() if body else "" edge_content = ( f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n" ) path.write_text(edge_content, encoding="utf-8") # Sync strength to memory frontmatter if changed if strength is not None: for mid_key in ("from_id", "to_id"): mid = fm.get(mid_key) if not mid: continue mem_path = self._resolve_memory_path(mid) if not mem_path: continue mem_fm, mem_body = 
def search(
    self,
    query: Optional[str] = None,
    memory_types: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    min_importance: Optional[float] = None,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Filter indexed memories by type, tags, importance and optional text.

    All filters are combined with AND; each individual filter is satisfied
    by ANY of its values.

    Args:
        query: Whitespace-separated terms. An entry matches when any term
            occurs (case-insensitively) in its title, its tags or its
            indexed ``content_preview``.
        memory_types: Keep only entries whose ``type`` is in this list.
        tags: Keep only entries sharing at least one tag with this list
            (compared lower-cased and stripped).
        min_importance: Drop entries whose importance is below this value.
            Entries without an importance are treated as 0.
        limit: Maximum number of results returned.

    Returns:
        Result dicts (``id``, ``type``, ``title``, ``tags``, ``importance``,
        ``decay_score``, ``path``, ``created``) sorted by importance,
        highest first.
    """
    self._maybe_refresh_decay()
    index = self._load_index()
    state_entries = self._load_state().get("entries", {})

    query_terms = query.lower().strip().split() if query else []
    filter_tags = {t.lower().strip() for t in tags} if tags else None

    results = []
    for mid, entry in index.get("entries", {}).items():
        # Type filter
        if memory_types and entry.get("type") not in memory_types:
            continue

        # Importance filter. The index value may be missing or null, so
        # compare against 0 in that case instead of comparing None.
        importance = entry.get("importance")
        if min_importance and (importance or 0) < min_importance:
            continue

        # A null "tags" value in the index is treated as "no tags".
        entry_tags = entry.get("tags") or []

        # Tag filter
        if filter_tags and filter_tags.isdisjoint(
            t.lower().strip() for t in entry_tags
        ):
            continue

        # Query filter: title/tags first, then the indexed preview
        # (no file I/O needed).
        if query_terms:
            title = (entry.get("title") or "").lower()
            tags_str = " ".join(entry_tags).lower()
            searchable = f"{title} {tags_str}"
            if not any(term in searchable for term in query_terms):
                preview = (entry.get("content_preview") or "").lower()
                if not any(term in preview for term in query_terms):
                    continue

        mem_state = state_entries.get(mid, {})
        results.append(
            {
                "id": mid,
                "type": entry.get("type"),
                "title": entry.get("title"),
                "tags": entry_tags,
                "importance": importance,
                "decay_score": round(mem_state.get("decay_score", 0.5), 3),
                "path": entry.get("path"),
                "created": entry.get("created"),
            }
        )

    # Sort by importance descending. "importance" is always present in the
    # result dict and may be None, so a .get() default would never apply;
    # coerce None to 0 to keep the sort from raising TypeError.
    results.sort(key=lambda r: r["importance"] or 0, reverse=True)
    return results[:limit]
None: body = content fm["updated"] = now self._write_memory_file(path, fm, body) # Update index (refresh content_preview if content changed) rel_path = str(path.relative_to(self.memory_dir)) preview = "" if content is not None: preview = body.strip()[:200] if len(body.strip()) > 200: last_space = preview.rfind(" ") if last_space > 0: preview = preview[:last_space] self._update_index_entry(memory_id, fm, rel_path, content_preview=preview) self._git_commit(f"update: {fm.get('title', memory_id[:8])}", [path]) return True def delete(self, memory_id: str) -> bool: """Delete a memory file, cascade-delete edges, remove from index.""" path = self._resolve_memory_path(memory_id) if not path: return False fm, _ = self._read_memory_file(path) title = fm.get("title", memory_id[:8]) # Cascade-delete edges where from_id or to_id matches index = self._load_index() edges_to_delete = [] for eid, edata in index.get("edges", {}).items(): if edata.get("from_id") == memory_id or edata.get("to_id") == memory_id: edges_to_delete.append(eid) for eid in edges_to_delete: self.edge_delete(eid) # Remove file path.unlink() # Remove from index and state self._remove_index_entry(memory_id) state = self._load_state() state.get("entries", {}).pop(memory_id, None) self._save_state(state) # Remove incoming references from other memories index = self._load_index() for mid, entry in index.get("entries", {}).items(): rels = entry.get("relations", []) original_count = len(rels) rels = [r for r in rels if r.get("target") != memory_id] if len(rels) != original_count: entry["relations"] = rels other_path = self._resolve_memory_path(mid) if other_path: try: other_fm, other_body = self._read_memory_file(other_path) other_fm["relations"] = [ r for r in other_fm.get("relations", []) if r.get("target") != memory_id ] self._write_memory_file(other_path, other_fm, other_body) except Exception: pass self._save_index(index) # Git: stage deletion try: rel_path = path.relative_to(self.memory_dir) subprocess.run( 
def related(
    self,
    memory_id: str,
    rel_types: Optional[List[str]] = None,
    max_depth: int = 1,
) -> List[Dict[str, Any]]:
    """Traverse relations from a memory with a depth-limited BFS.

    The index is walked level by level, so every related memory is reported
    at its shallowest depth and never again at a deeper one. A memory that
    is reachable through several relations at that shallowest depth is
    listed once per relation.

    Args:
        memory_id: Memory to start from; it is never part of the result.
        rel_types: When given, only relations whose ``type`` is in this
            list are reported and followed.
        max_depth: Number of hops to follow, clamped to the range 1..5.

    Returns:
        Dicts with ``id``, ``type``, ``title``, ``relationship``,
        ``direction``, ``strength`` and ``depth`` (1 = direct relation),
        in breadth-first order.
    """
    max_depth = max(1, min(5, max_depth))
    entries = self._load_index().get("entries", {})

    # Shallowest depth at which each memory was reached (start memory = 0).
    reached_at = {memory_id: 0}
    frontier = [memory_id]
    results = []

    for depth in range(1, max_depth + 1):
        next_frontier = []
        for mid in frontier:
            entry = entries.get(mid)
            if not entry:
                continue
            for rel in entry.get("relations") or []:
                target_id = rel.get("target")
                if not target_id:
                    continue
                if rel_types and rel.get("type") not in rel_types:
                    continue
                # Already reached on a shorter path (this also covers the
                # start memory and back-references to a parent).
                if reached_at.get(target_id, depth) < depth:
                    continue
                target_entry = entries.get(target_id)
                if not target_entry:
                    continue  # dangling relation: target is not indexed
                results.append(
                    {
                        "id": target_id,
                        "type": target_entry.get("type"),
                        "title": target_entry.get("title"),
                        "relationship": rel.get("type"),
                        "direction": rel.get("direction", "outgoing"),
                        "strength": rel.get("strength"),
                        "depth": depth,
                    }
                )
                if target_id not in reached_at:
                    reached_at[target_id] = depth
                    next_frontier.append(target_id)
        if not next_frontier:
            break
        frontier = next_frontier

    return results
def recent(self, limit: int = 20) -> List[Dict[str, Any]]:
    """Return the newest memories from the index.

    Each result is the index entry with the memory id added under ``id``.
    Entries are ordered by their ``created`` string, newest first; entries
    without one sort as the empty string.

    Args:
        limit: Maximum number of memories to return.
    """
    stored = self._load_index().get("entries", {})
    newest_first = sorted(
        ({"id": mid, **fields} for mid, fields in stored.items()),
        key=lambda record: record.get("created", ""),
        reverse=True,
    )
    return newest_first[:limit]
Updates _state.json.""" index = self._load_index() state = self._load_state() now = datetime.now(timezone.utc) updated_count = 0 for mid, entry in index.get("entries", {}).items(): s = state.setdefault("entries", {}).setdefault( mid, { "access_count": 0, "last_accessed": entry.get("created", now.isoformat()), "decay_score": 0.5, }, ) # Calculate days since last access last_str = s.get("last_accessed", entry.get("created", "")) try: last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00")) if last_dt.tzinfo is None: last_dt = last_dt.replace(tzinfo=timezone.utc) days = (now - last_dt).total_seconds() / 86400 except (ValueError, AttributeError): days = 30 # Default to 30 days if unparseable importance = entry.get("importance", 0.5) mem_type = entry.get("type", "general") type_weight = TYPE_WEIGHTS.get(mem_type, 1.0) access_count = s.get("access_count", 0) # Check if pinned (vault) path = entry.get("path", "") if path.startswith("vault/"): s["decay_score"] = 999.0 # Pinned memories never decay else: s["decay_score"] = round( calculate_decay_score(importance, days, access_count, type_weight), 4, ) updated_count += 1 self._save_state(state) # Summary state_entries = state.get("entries", {}) return { "updated": updated_count, "active": sum( 1 for s in state_entries.values() if s.get("decay_score", 0) >= THRESHOLD_ACTIVE ), "fading": sum( 1 for s in state_entries.values() if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE ), "dormant": sum( 1 for s in state_entries.values() if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING ), "archived": sum( 1 for s in state_entries.values() if 0 < s.get("decay_score", 0) < THRESHOLD_DORMANT ), } def core(self) -> str: """Generate CORE.md from top memories by decay score.""" index = self._load_index() state = self._load_state() def _extract_summary(body: str, max_len: int = 80) -> str: """Extract first meaningful sentence from memory body.""" if not body or not body.strip(): return "" # Skip 
leading code blocks, headings, and list markers for ln in body.strip().split("\n"): stripped = ln.strip() if not stripped: continue if stripped.startswith("```") or stripped.startswith("#"): continue first_line = stripped.lstrip("- *>").strip() break else: return "" if not first_line: return "" # Extract first sentence (split on '. ') dot_pos = first_line.find(". ") if 0 < dot_pos < max_len: sentence = first_line[: dot_pos + 1] else: sentence = first_line if len(sentence) > max_len: sentence = sentence[: max_len - 3].rstrip() + "..." return sentence # Collect all memories with decay scores memories = [] for mid, entry in index.get("entries", {}).items(): s = state.get("entries", {}).get(mid, {}) decay = s.get("decay_score", 0.5) if decay < THRESHOLD_FADING: continue # Skip low-relevance memories.append( { "id": mid, "title": entry.get("title", ""), "type": entry.get("type", "general"), "path": entry.get("path", ""), "importance": entry.get("importance", 0.5), "decay_score": decay, "tags": entry.get("tags", []), } ) # Sort by decay score descending memories.sort(key=lambda x: x["decay_score"], reverse=True) # Group by type category categories = { "Critical Solutions": ["solution"], "Active Decisions": ["decision"], "Key Fixes": ["fix"], "Configurations": ["configuration"], "Key Procedures": ["procedure"], "Patterns & Workflows": ["code_pattern", "workflow"], "Known Issues": ["problem", "error"], "Insights": ["insight"], "General": ["general"], } now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE) total_count = len(index.get("entries", {})) lines = [ "# Memory Core (auto-generated)", f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)", "", ] char_count = sum(len(l) for l in lines) for cat_name, cat_types in categories.items(): cat_memories = [m for m in memories if m["type"] in cat_types] if not cat_memories: continue 
def episode(
    self,
    type: str,
    title: str,
    tags: Optional[List[str]] = None,
    summary: Optional[str] = None,
    memory_link: Optional[str] = None,
):
    """Append an entry to today's episode file and commit it.

    The file is ``episodes/<YYYY-MM-DD>.md`` under ``self.memory_dir``,
    named after the current local date. It is created with a ``# <date>``
    heading on first use; later entries are appended after a blank line.

    Args:
        type: Episode type, written on the ``**Type:**`` line.
        title: Entry heading; also used in the commit message.
        tags: Optional tags, written comma-separated.
        summary: Optional one-line summary.
        memory_link: Optional path of a related memory file; the link text
            is the file name without its extension.
    """
    now_local = datetime.now().astimezone()
    today = now_local.strftime("%Y-%m-%d")
    time_str = now_local.strftime("%H:%M")

    ep_path = self.memory_dir / "episodes" / f"{today}.md"
    # The episodes directory may not exist yet (fresh memory dir); without
    # this the write below fails with FileNotFoundError.
    ep_path.parent.mkdir(parents=True, exist_ok=True)

    tags_str = ", ".join(tags or [])

    entry_lines = [
        f"## {time_str} - {title}",
        f"- **Type:** {type}",
    ]
    if tags_str:
        entry_lines.append(f"- **Tags:** {tags_str}")
    if memory_link:
        # Extract name from link path
        name = Path(memory_link).stem
        entry_lines.append(f"- **Memory:** [{name}]({memory_link})")
    if summary:
        entry_lines.append(f"- **Summary:** {summary}")
    entry_text = "\n".join(entry_lines)

    if ep_path.exists():
        existing = ep_path.read_text(encoding="utf-8")
        new_content = f"{existing.rstrip()}\n\n{entry_text}\n"
    else:
        new_content = f"# {today}\n\n{entry_text}\n"

    ep_path.write_text(new_content, encoding="utf-8")
    self._git_commit(f"episode: {title}", [ep_path])
def pin(self, memory_id: str) -> bool:
    """Move a memory to vault/ (never decays).

    The memory file is rewritten under ``<memory_dir>/vault/`` with the same
    file name, the old file is removed, the index entry is pointed at the
    new location and the state's ``decay_score`` is set to 999.0.

    Pinning a memory that already lives in the vault is a no-op for the
    file itself: it is neither rewritten nor deleted and no commit is made.

    Args:
        memory_id: Memory to pin.

    Returns:
        ``False`` when the memory cannot be resolved, ``True`` otherwise.
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return False

    fm, body = self._read_memory_file(path)
    vault_dir = self.memory_dir / "vault"
    vault_dir.mkdir(parents=True, exist_ok=True)
    new_path = vault_dir / path.name

    # Guard against re-pinning: when source and destination are the same
    # file, "write new, unlink old" would delete the memory.
    moved = path.resolve() != new_path.resolve()
    if moved:
        self._write_memory_file(new_path, fm, body)

        # Remove old file
        old_rel = str(path.relative_to(self.memory_dir))
        path.unlink()

        # Git: stage the removal of the old location (best effort, same
        # as the other delete paths; a missing git or repo is not fatal).
        try:
            subprocess.run(
                ["git", "rm", "--cached", old_rel],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            pass

        # Update index with new path
        new_rel = str(new_path.relative_to(self.memory_dir))
        self._update_index_entry(memory_id, fm, new_rel)

    # Pinned memories carry a sentinel decay score in state
    state = self._load_state()
    state.setdefault("entries", {}).setdefault(memory_id, {})["decay_score"] = 999.0
    self._save_state(state)

    if moved:
        self._git_commit(f"pin: {fm.get('title', memory_id[:8])}", [new_path])
    return True
""" tag = tag.lower().strip() index = self._load_index() co_counts: Dict[str, int] = {} for entry in index.get("entries", {}).values(): entry_tags = [t.lower().strip() for t in entry.get("tags", [])] if tag not in entry_tags: continue for other_tag in entry_tags: if other_tag != tag and other_tag: co_counts[other_tag] = co_counts.get(other_tag, 0) + 1 results = [ {"tag": t, "co_occurrences": count, "memories_with_both": count} for t, count in co_counts.items() ] results.sort(key=lambda x: x["co_occurrences"], reverse=True) if limit > 0: results = results[:limit] return results def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]: """Suggest tags for a memory based on co-occurrence patterns. Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}. """ index = self._load_index() entry = index.get("entries", {}).get(memory_id) if not entry: return [] existing_tags = set(t.lower().strip() for t in entry.get("tags", [])) if not existing_tags: return [] # For each existing tag, find co-occurring tags and accumulate scores candidate_scores: Dict[str, int] = {} candidate_sources: Dict[str, set] = {} for existing_tag in existing_tags: co_tags = self.tags_related(existing_tag) for co_entry in co_tags: candidate = co_entry["tag"] if candidate in existing_tags: continue candidate_scores[candidate] = ( candidate_scores.get(candidate, 0) + co_entry["co_occurrences"] ) if candidate not in candidate_sources: candidate_sources[candidate] = set() candidate_sources[candidate].add(existing_tag) results = [] for candidate, score in candidate_scores.items(): sources = sorted(candidate_sources[candidate]) reason = "co-occurs with: " + ", ".join(sources) results.append({"tag": candidate, "score": score, "reason": reason}) results.sort(key=lambda x: x["score"], reverse=True) return results[:10] # ------------------------------------------------------------------------- # Embedding-based semantic search (hybrid: Ollama + OpenAI) # 
def embed(self, if_changed: bool = False) -> Dict[str, Any]:
    """Generate embeddings for all memories using the configured provider.

    Each memory is embedded as ``"<title>. <content_preview>"`` in batches
    of 50 via ``_embed_texts_with_fallback``. The vectors are written to
    ``_embeddings.json`` inside ``self.memory_dir`` together with the
    provider and model that produced them. A change of the configured
    provider compared to the stored file forces a full re-embed.

    Args:
        if_changed: If True, skip embedding when the set of memory IDs in
            the existing embeddings file equals the set in the index
            (no new/deleted memories) and the provider did not change.

    Returns:
        A dict describing the outcome. On success: ``embedded``,
        ``provider``, ``model``, ``path``. When skipped: ``embedded`` (0),
        ``skipped``, ``reason``, ``path``. When no provider answers:
        ``error`` and ``embedded``; nothing is written in that case.
    """
    # Resolved first so every return value reports this instance's file
    # rather than a module-level default location.
    embeddings_path = self.memory_dir / "_embeddings.json"

    index = self._load_index()
    entries = index.get("entries", {})
    if not entries:
        return {
            "embedded": 0,
            "provider": "none",
            "model": "",
            "path": str(embeddings_path),
        }

    # Parse the previous embeddings file once. None means "missing or
    # unreadable"; a payload that is not a JSON object counts as unreadable.
    old_data: Optional[Dict[str, Any]] = None
    if embeddings_path.exists():
        try:
            loaded = json.loads(embeddings_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            loaded = None
        if isinstance(loaded, dict):
            old_data = loaded

    # Check for provider change
    old_provider = old_data.get("provider", "ollama") if old_data is not None else ""
    config = self._get_embedding_provider()
    new_provider = config.get("embedding_provider", "ollama")
    provider_changed = bool(old_provider) and old_provider != new_provider
    if provider_changed:
        print(
            f"Provider changed ({old_provider} -> {new_provider}), re-embedding all memories...",
            file=sys.stderr,
        )

    # Skip if nothing changed (unless provider switched)
    if if_changed and not provider_changed and old_data is not None:
        embedded_ids = set(old_data.get("entries") or {})
        if embedded_ids == set(entries):
            return {
                "embedded": 0,
                "skipped": True,
                "reason": "no new or deleted memories",
                "path": str(embeddings_path),
            }

    # Build texts to embed
    memory_ids = list(entries)
    texts = [
        f"{entries[mid].get('title', '')}. {entries[mid].get('content_preview', '')}"
        for mid in memory_ids
    ]

    # Batch embed in groups of 50
    all_embeddings: Dict[str, List[float]] = {}
    batch_size = 50
    provider_used = ""
    model_used = ""
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start : start + batch_size]
        batch_ids = memory_ids[start : start + batch_size]
        vectors, provider_used, model_used = self._embed_texts_with_fallback(
            batch_texts,
            timeout=300,
        )
        if vectors is None:
            # Abort without touching the existing file.
            return {
                "error": "All embedding providers unavailable",
                "embedded": len(all_embeddings),
            }
        all_embeddings.update(zip(batch_ids, vectors))

    # Write embeddings file with provider info
    embeddings_data = {
        "provider": provider_used,
        "model": model_used,
        "updated": datetime.now(timezone.utc).isoformat(),
        "entries": all_embeddings,
    }
    embeddings_path.write_text(
        json.dumps(embeddings_data, default=str), encoding="utf-8"
    )

    return {
        "embedded": len(all_embeddings),
        "provider": provider_used,
        "model": model_used,
        "path": str(embeddings_path),
    }
""" emb_data = self._load_embeddings_cached() if emb_data is None: return [] stored = emb_data.get("entries", {}) if not stored: return [] # Embed query with matching provider stored_provider = emb_data.get("provider", "ollama") config = self._get_embedding_provider() query_vec = None if stored_provider == "openai": api_key = config.get("openai_api_key") model = emb_data.get("model", OPENAI_MODEL_DEFAULT) if api_key: vecs = _openai_embed([query], api_key, model) if vecs: query_vec = vecs[0] if query_vec is None and stored_provider == "ollama": stored_model = emb_data.get("model", EMBEDDING_MODEL) vecs = _ollama_embed([query], model=stored_model) if vecs: query_vec = vecs[0] # Last resort: try any available provider if query_vec is None: vecs, _, _ = self._embed_texts_with_fallback([query], timeout=30) if vecs: query_vec = vecs[0] if query_vec is None: return [] query_dim = len(query_vec) # Score all memories by cosine similarity index = self._load_index() scored = [] for mid, vec in stored.items(): # Skip dimension mismatch if len(vec) != query_dim: continue sim = _cosine_similarity(query_vec, vec) entry = index.get("entries", {}).get(mid) if entry: scored.append( { "id": mid, "title": entry.get("title", ""), "type": entry.get("type", "general"), "tags": entry.get("tags", []), "similarity": round(sim, 4), "path": entry.get("path", ""), } ) scored.sort(key=lambda x: x["similarity"], reverse=True) return scored[:limit] def reflect( self, since: Optional[str] = None, dry_run: bool = False, ) -> Dict[str, Any]: """Review recent memories, identify tag-based clusters, and output consolidation recommendations. Clusters memories that share 2+ tags using iterative union-find merging. Does NOT auto-create insights; the agent reads output and decides what to store. Args: since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to _state.json's last_reflection, then 30 days ago. dry_run: If True, return analysis without updating _state.json or logging episode. 
Returns: Dict with clusters, total_memories_reviewed, clusters_found, and since date. """ now = datetime.now(timezone.utc) state = self._load_state() # Determine time window if since: since_date = since elif state.get("last_reflection"): # last_reflection may be a full ISO timestamp; extract date portion since_date = state["last_reflection"][:10] else: since_dt = now - timedelta(days=30) since_date = since_dt.strftime("%Y-%m-%d") # Load recent memories from index index = self._load_index() recent_memories = [] for mid, entry in index.get("entries", {}).items(): created = entry.get("created", "") # Compare date portion only (YYYY-MM-DD) created_date = created[:10] if created else "" if created_date >= since_date: recent_memories.append( { "id": mid, "title": entry.get("title", ""), "type": entry.get("type", "general"), "tags": [t.lower().strip() for t in entry.get("tags", [])], } ) total_reviewed = len(recent_memories) # Union-find clustering by tag overlap >= 2 # parent[i] = index of parent in recent_memories list n = len(recent_memories) parent = list(range(n)) def find(x: int) -> int: while parent[x] != x: parent[x] = parent[parent[x]] # path compression x = parent[x] return x def union(a: int, b: int): ra, rb = find(a), find(b) if ra != rb: parent[rb] = ra # Compare all pairs, check tag overlap for i in range(n): tags_i = set(recent_memories[i]["tags"]) for j in range(i + 1, n): tags_j = set(recent_memories[j]["tags"]) if len(tags_i & tags_j) >= 2: union(i, j) # Group by root clusters_map: Dict[int, List[int]] = {} for i in range(n): root = find(i) clusters_map.setdefault(root, []).append(i) # Build output clusters (only clusters with 2+ members) clusters = [] cluster_id = 0 for indices in clusters_map.values(): if len(indices) < 2: continue cluster_id += 1 members = [] all_tag_sets = [] types_seen = set() for idx in indices: mem = recent_memories[idx] members.append( { "id": mem["id"], "title": mem["title"], "type": mem["type"], "tags": mem["tags"], } ) 
def merge(
    self,
    keep_id: str,
    absorb_id: str,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Merge two memories: absorb one into another.

    The 'keep' memory absorbs the content, tags, and relations from the
    'absorb' memory. The absorb memory is then deleted. All other memories
    whose indexed relations reference absorb_id are rewritten to point to
    keep_id.

    Args:
        keep_id: ID of the memory that survives the merge.
        absorb_id: ID of the memory that is folded in and deleted.
        dry_run: If True, return what would change without writing anything.

    Returns:
        A summary dict. With dry_run=True it carries ``"dry_run": True`` and
        ``"other_memory_ids"``; otherwise ``"success": True``, the number of
        referencing memories actually rewritten, and
        ``"other_memories_failed"`` (IDs that could not be rewritten).

    Raises:
        ValueError: If either memory cannot be resolved, or if both IDs
            refer to the same memory.
    """
    keep_path = self._resolve_memory_path(keep_id)
    if not keep_path:
        raise ValueError(f"Keep memory not found: {keep_id}")
    absorb_path = self._resolve_memory_path(absorb_id)
    if not absorb_path:
        raise ValueError(f"Absorb memory not found: {absorb_id}")
    # Merging a memory into itself would rewrite the file and then delete
    # it, losing the memory entirely.
    if keep_id == absorb_id or keep_path == absorb_path:
        raise ValueError(f"Cannot merge a memory into itself: {keep_id}")

    def retarget(rels, owner_id):
        """Repoint absorb_id -> keep_id, dedupe by (target, type), drop self-links."""
        seen = set()
        kept = []
        for rel in rels:
            if rel.get("target") == absorb_id:
                rel["target"] = keep_id
            key = (rel.get("target"), rel.get("type"))
            if key in seen:
                continue
            seen.add(key)
            if rel.get("target") != owner_id:
                kept.append(rel)
        return kept

    def importance_of(frontmatter):
        """Return the stored importance, falling back to 0.5 when absent/null."""
        value = frontmatter.get("importance")
        return 0.5 if value is None else value

    keep_fm, keep_body = self._read_memory_file(keep_path)
    absorb_fm, absorb_body = self._read_memory_file(absorb_path)

    keep_title = keep_fm.get("title", keep_id[:8])
    absorb_title = absorb_fm.get("title", absorb_id[:8])

    # Combine content: the absorbed body is appended under a separator.
    merged_body = keep_body
    if absorb_body.strip():
        merged_body = (
            keep_body.rstrip()
            + f"\n\n---\n*Merged from: {absorb_title}*\n\n"
            + absorb_body.strip()
        )

    # Tags: sorted union. ``or []`` tolerates an explicit null in frontmatter.
    merged_tags = sorted(
        set(keep_fm.get("tags") or []) | set(absorb_fm.get("tags") or [])
    )

    # Importance: the higher of the two.
    merged_importance = max(importance_of(keep_fm), importance_of(absorb_fm))

    # Relations: keep's first, then absorb's, normalised for the keep memory.
    combined_rels = list(keep_fm.get("relations") or []) + list(
        absorb_fm.get("relations") or []
    )
    merged_rels = retarget(combined_rels, keep_id)

    # Use the index to find other memories that point at absorb_id.
    index = self._load_index()
    updated_others = [
        mid
        for mid, entry in index.get("entries", {}).items()
        if mid not in (keep_id, absorb_id)
        and any(
            r.get("target") == absorb_id for r in (entry.get("relations") or [])
        )
    ]

    if dry_run:
        return {
            "dry_run": True,
            "keep_id": keep_id,
            "keep_title": keep_title,
            "absorb_id": absorb_id,
            "absorb_title": absorb_title,
            "merged_tags": merged_tags,
            "merged_importance": merged_importance,
            "merged_relations_count": len(merged_rels),
            "other_memories_updated": len(updated_others),
            "other_memory_ids": updated_others,
        }

    # Write updated keep memory file
    now = datetime.now(timezone.utc).isoformat()
    keep_fm["tags"] = merged_tags
    keep_fm["importance"] = merged_importance
    keep_fm["relations"] = merged_rels
    keep_fm["updated"] = now
    self._write_memory_file(keep_path, keep_fm, merged_body)

    # Update index for keep memory (refresh content_preview with merged body),
    # truncating the preview at a word boundary.
    keep_rel_path = str(keep_path.relative_to(self.memory_dir))
    preview = merged_body.strip()[:200]
    if len(merged_body.strip()) > 200:
        last_space = preview.rfind(" ")
        if last_space > 0:
            preview = preview[:last_space]
    self._update_index_entry(
        keep_id, keep_fm, keep_rel_path, content_preview=preview
    )

    # Rewrite every other memory that references absorb_id. This stays
    # best-effort (one bad file must not abort the merge), but failures are
    # surfaced because they leave a relation pointing at a deleted memory.
    rewritten = []
    failed = []
    for mid in updated_others:
        other_path = self._resolve_memory_path(mid)
        if not other_path:
            continue
        try:
            other_fm, other_body = self._read_memory_file(other_path)
            other_fm["relations"] = retarget(other_fm.get("relations") or [], mid)
            other_fm["updated"] = now
            self._write_memory_file(other_path, other_fm, other_body)
            other_rel_path = str(other_path.relative_to(self.memory_dir))
            self._update_index_entry(mid, other_fm, other_rel_path)
            rewritten.append(mid)
        except Exception as exc:
            failed.append(mid)
            print(
                f"Warning: could not retarget relations in {mid}: {exc}",
                file=sys.stderr,
            )

    # Delete absorb memory file and remove from index/state
    absorb_path.unlink()
    self._remove_index_entry(absorb_id)
    state = self._load_state()
    state.get("entries", {}).pop(absorb_id, None)
    self._save_state(state)

    # Git: stage absorb deletion. Best-effort: git may be missing, slow, or
    # the directory may not be a repository.
    try:
        absorb_rel = absorb_path.relative_to(self.memory_dir)
        subprocess.run(
            ["git", "rm", "--cached", str(absorb_rel)],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        pass

    self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")

    return {
        "success": True,
        "keep_id": keep_id,
        "keep_title": keep_title,
        "absorb_id": absorb_id,
        "absorb_title": absorb_title,
        "merged_tags": merged_tags,
        "merged_importance": merged_importance,
        "merged_relations_count": len(merged_rels),
        "other_memories_updated": len(rewritten),
        "other_memories_failed": failed,
    }
def reflection_summary(self, known_projects: Optional[List[str]] = None) -> str:
    """Generate REFLECTION.md with patterns and insights from the memory system.

    Builds five sections (Themes, Cross-Project Patterns, Most Accessed,
    Recent Insights, Consolidation History) from the index, the state file
    and the headings found in ``episodes/*.md``, writes the result to
    ``<memory_dir>/REFLECTION.md`` and git-commits that file.

    Args:
        known_projects: Tags to treat as project names in the Cross-Project
            Patterns section. Defaults to the built-in project list.

    Returns:
        The generated markdown content.
    """
    index = self._load_index()
    state = self._load_state()
    entries = index.get("entries", {})
    state_entries = state.get("entries", {})
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # Last reflection date
    last_reflection = state.get("last_reflection", "")
    last_reflection_date = last_reflection[:10] if last_reflection else "never"

    # One pass over the episode files gathers both heading counts: reflection
    # entries (for the header) and merge entries (for Consolidation History).
    total_reflections = 0
    merge_count = 0
    episodes_dir = self.memory_dir / "episodes"
    if episodes_dir.exists():
        for ep_file in sorted(episodes_dir.glob("*.md")):
            try:
                text = ep_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue  # unreadable episode file: skip it, keep going
            for raw_line in text.split("\n"):
                heading = raw_line.strip()
                if not heading.startswith("## "):
                    continue
                if "Reflection:" in heading:
                    total_reflections += 1
                if "merge" in heading.lower():
                    merge_count += 1

    # Normalise every entry's tags once; the sections below reuse them.
    tags_by_id: Dict[str, List[str]] = {
        mid: [t.lower().strip() for t in (entry.get("tags") or [])]
        for mid, entry in entries.items()
    }
    tag_sets = {mid: set(tags) for mid, tags in tags_by_id.items()}

    lines = [
        "# Reflection Summary (auto-generated)",
        f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
        "",
    ]

    # === Themes section ===
    lines.append("## Themes")
    lines.append("Top tag co-occurrences revealing recurring themes:")
    lines.append("")

    top_tags = self.tags_list(limit=10)
    # (tag1, tag2) in sorted order -> titles of memories carrying both tags
    pair_data: Dict[Tuple[str, str], List[str]] = {}
    for tag_info in top_tags[:5]:
        tag = tag_info["tag"]
        for co in self.tags_related(tag, limit=5):
            pair = tuple(sorted([tag, co["tag"]]))
            if pair in pair_data:
                continue
            titles = [
                entry.get("title", "untitled")
                for mid, entry in entries.items()
                if pair[0] in tag_sets[mid] and pair[1] in tag_sets[mid]
            ]
            if titles:
                pair_data[pair] = titles

    # Sort pairs by memory count descending, show top 8
    sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
    for (tag_a, tag_b), titles in sorted_pairs[:8]:
        example_titles = ", ".join(f'"{t}"' for t in titles[:3])
        lines.append(
            f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
        )
    if not sorted_pairs:
        lines.append("- No co-occurrence data available yet.")
    lines.append("")

    # === Cross-Project Patterns section ===
    lines.append("## Cross-Project Patterns")
    lines.append("Tags that span multiple projects:")
    lines.append("")

    if known_projects is None:
        known_projects = [
            "major-domo",
            "paper-dynasty",
            "homelab",
            "vagabond-rpg",
            "foundryvtt",
            "strat-gameplay-webapp",
        ]
    # Memory tags are compared in lower-case, so normalise project names too.
    project_tags = frozenset(p.lower().strip() for p in known_projects)

    # tag -> {project -> number of memories carrying both}
    tag_project_map: Dict[str, Dict[str, int]] = {}
    for mem_tags in tags_by_id.values():
        mem_projects = [t for t in mem_tags if t in project_tags]
        for tag in mem_tags:
            if tag in project_tags:
                continue
            per_project = tag_project_map.setdefault(tag, {})
            for proj in mem_projects:
                per_project[proj] = per_project.get(proj, 0) + 1

    # Keep tags spanning 2+ projects; order by project count, then total
    cross_project = [
        (tag, projects, sum(projects.values()))
        for tag, projects in tag_project_map.items()
        if len(projects) >= 2
    ]
    cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)

    for tag, projects, _total in cross_project[:10]:
        proj_parts = ", ".join(
            f"{p} ({c})"
            for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
        )
        lines.append(f"- **{tag}**: appears in {proj_parts}")
    if not cross_project:
        lines.append("- No cross-project patterns detected yet.")
    lines.append("")

    # === Most Accessed section ===
    lines.append("## Most Accessed")
    lines.append("Top 10 memories by access count:")
    lines.append("")

    accessed = []
    for mid, mem_state in state_entries.items():
        count = mem_state.get("access_count") or 0
        entry = entries.get(mid)
        if count > 0 and entry:
            accessed.append(
                (entry.get("title", "untitled"), entry.get("path", ""), count)
            )
    accessed.sort(key=lambda x: x[2], reverse=True)

    # Number items explicitly so the ranking is readable as plain text too.
    for rank, (title, path, count) in enumerate(accessed[:10], start=1):
        lines.append(f"{rank}. [{title}]({path}) - {count} accesses")
    if not accessed:
        lines.append("- No access data recorded yet.")
    lines.append("")

    # === Recent Insights section ===
    lines.append("## Recent Insights")
    lines.append("Insight-type memories:")
    lines.append("")

    insights = [entry for entry in entries.values() if entry.get("type") == "insight"]
    # Newest first
    insights.sort(key=lambda e: e.get("created") or "", reverse=True)

    for entry in insights[:10]:
        title = entry.get("title", "untitled")
        path = entry.get("path", "")
        # Collapse whitespace: a preview containing newlines would otherwise
        # break out of the markdown list item.
        preview = " ".join((entry.get("content_preview") or "").split())
        if preview:
            lines.append(f"- [{title}]({path}) - {preview[:80]}")
        else:
            lines.append(f"- [{title}]({path})")
    if not insights:
        lines.append("- No insight memories stored yet.")
    lines.append("")

    # === Consolidation History section ===
    lines.append("## Consolidation History")
    lines.append("")
    lines.append(f"- Total merges performed: {merge_count}")
    lines.append("")

    content = "\n".join(lines)

    # Write REFLECTION.md
    reflection_path = self.memory_dir / "REFLECTION.md"
    reflection_path.write_text(content, encoding="utf-8")

    self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])

    return content
# =============================================================================
# CLI INTERFACE
# =============================================================================


def _cli_split_csv(value: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated CLI value into trimmed, non-empty items (None if empty)."""
    if not value:
        return None
    items = [part.strip() for part in value.split(",")]
    return [item for item in items if item] or None


def _cli_truncate_words(text: str, limit: int) -> str:
    """Return at most `limit` chars of stripped `text`, cut back to a word boundary."""
    stripped = text.strip()
    clipped = stripped[:limit]
    if len(stripped) > limit:
        last_space = clipped.rfind(" ")
        if last_space > 0:
            clipped = clipped[:last_space]
    return clipped


def _cli_build_parser() -> Tuple[argparse.ArgumentParser, argparse.ArgumentParser]:
    """Build the CLI parser; returns (top-level parser, 'tags' subcommand parser)."""
    parser = argparse.ArgumentParser(
        description="Cognitive Memory - Markdown-based memory system with decay scoring",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # store
    sp = subparsers.add_parser("store", help="Store a new memory")
    sp.add_argument(
        "--type", "-t", required=True, choices=VALID_TYPES, help="Memory type"
    )
    sp.add_argument("--title", required=True, help="Memory title")
    sp.add_argument("--content", "-c", required=True, help="Memory content")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )
    sp.add_argument("--confidence", type=float, default=0.8, help="Confidence 0.0-1.0")
    sp.add_argument(
        "--episode",
        action="store_true",
        default=False,
        help="Also log an episode entry",
    )

    # recall
    sp = subparsers.add_parser("recall", help="Search memories by query")
    sp.add_argument("query", help="Search query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--limit", "-n", type=int, default=10, help="Max results")
    sp.add_argument(
        "--no-semantic",
        action="store_true",
        default=False,
        help="Disable semantic search (keyword-only, faster)",
    )

    # get
    sp = subparsers.add_parser("get", help="Get memory by ID")
    sp.add_argument("memory_id", help="Memory UUID")

    # relate
    sp = subparsers.add_parser("relate", help="Create relationship")
    sp.add_argument("from_id", help="Source memory UUID")
    sp.add_argument("to_id", help="Target memory UUID")
    sp.add_argument("rel_type", choices=VALID_RELATION_TYPES, help="Relationship type")
    sp.add_argument("--strength", type=float, default=0.8, help="Strength 0.0-1.0")
    sp.add_argument("--context", help="Context description")
    sp.add_argument("--description", help="Rich edge description body")

    # edge-get
    sp = subparsers.add_parser("edge-get", help="Get edge by ID")
    sp.add_argument("edge_id", help="Edge UUID")

    # edge-search
    sp = subparsers.add_parser("edge-search", help="Search edges")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated relation types")
    sp.add_argument("--from-id", help="Filter by source memory ID")
    sp.add_argument("--to-id", help="Filter by target memory ID")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")

    # edge-update
    sp = subparsers.add_parser("edge-update", help="Update an edge")
    sp.add_argument("edge_id", help="Edge UUID")
    sp.add_argument("--description", help="New description body")
    sp.add_argument("--strength", type=float, help="New strength 0.0-1.0")

    # edge-delete
    sp = subparsers.add_parser("edge-delete", help="Delete an edge")
    sp.add_argument("edge_id", help="Edge UUID")

    # search
    sp = subparsers.add_parser("search", help="Filter memories")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--min-importance", type=float, help="Minimum importance")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")

    # update
    sp = subparsers.add_parser("update", help="Update a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--title", help="New title")
    sp.add_argument("--content", help="New content")
    sp.add_argument("--tags", help="New tags (comma-separated)")
    sp.add_argument("--importance", type=float, help="New importance")

    # delete
    sp = subparsers.add_parser("delete", help="Delete a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--force", "-f", action="store_true", help="Skip confirmation")

    # related
    sp = subparsers.add_parser("related", help="Get related memories")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--types", help="Comma-separated relationship types")
    sp.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5")

    # stats
    subparsers.add_parser("stats", help="Show statistics")

    # recent
    sp = subparsers.add_parser("recent", help="Recently created memories")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")

    # decay
    subparsers.add_parser("decay", help="Recalculate all decay scores")

    # core
    subparsers.add_parser("core", help="Generate CORE.md")

    # episode
    sp = subparsers.add_parser("episode", help="Log episode entry")
    sp.add_argument("--type", "-t", required=True, help="Entry type")
    sp.add_argument("--title", required=True, help="Entry title")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--summary", "-s", help="Summary text")
    sp.add_argument("--memory-link", help="Path to related memory file")

    # reindex
    subparsers.add_parser("reindex", help="Rebuild index from files")

    # embed
    embed_parser = subparsers.add_parser(
        "embed", help="Generate embeddings for all memories via Ollama"
    )
    embed_parser.add_argument(
        "--if-changed",
        action="store_true",
        help="Skip if no memories were added or deleted since last embed",
    )

    # pin
    sp = subparsers.add_parser("pin", help="Move memory to vault (never decays)")
    sp.add_argument("memory_id", help="Memory UUID")

    # reflect
    sp = subparsers.add_parser(
        "reflect", help="Review recent memories and identify clusters"
    )
    sp.add_argument("--since", help="ISO date (YYYY-MM-DD) to review from")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview without updating state"
    )

    # merge
    sp = subparsers.add_parser(
        "merge", help="Merge two memories (absorb one into another)"
    )
    sp.add_argument("keep_id", help="Memory UUID to keep")
    sp.add_argument("absorb_id", help="Memory UUID to absorb and delete")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview merge without writing"
    )

    # reflection
    subparsers.add_parser("reflection", help="Generate REFLECTION.md summary")

    # tags (kept by reference so its help can be printed when no subcommand is given)
    tags_parser = subparsers.add_parser("tags", help="Tag analysis commands")
    tags_sub = tags_parser.add_subparsers(dest="tags_command")
    sp2 = tags_sub.add_parser("list", help="List all tags with counts")
    sp2.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp3 = tags_sub.add_parser("related", help="Find co-occurring tags")
    sp3.add_argument("tag", help="Tag to analyze")
    sp3.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp4 = tags_sub.add_parser("suggest", help="Suggest tags for a memory")
    sp4.add_argument("memory_id", help="Memory UUID")

    # procedure
    sp = subparsers.add_parser(
        "procedure", help="Store a procedure memory (convenience wrapper)"
    )
    sp.add_argument("--title", required=True, help="Procedure title")
    sp.add_argument("--content", "-c", required=True, help="Procedure description")
    sp.add_argument("--steps", help="Comma-separated ordered steps")
    sp.add_argument("--preconditions", help="Comma-separated preconditions")
    sp.add_argument("--postconditions", help="Comma-separated postconditions")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )

    # config
    sp = subparsers.add_parser("config", help="Manage embedding config")
    sp.add_argument("--show", action="store_true", help="Display current config")
    sp.add_argument(
        "--provider", choices=["ollama", "openai"], help="Set embedding provider"
    )
    sp.add_argument("--openai-key", help="Set OpenAI API key")
    sp.add_argument(
        "--ollama-model", help="Set Ollama model name (e.g. qwen3-embedding:8b)"
    )

    return parser, tags_parser


def main():
    """CLI entry point: parse arguments, run one command, print its result as JSON.

    Exits with status 1 when no command is given, when ``tags`` is invoked
    without a subcommand, or when ``merge`` rejects its arguments.
    """
    parser, tags_parser = _cli_build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    client = CognitiveMemoryClient()
    result = None
    command = args.command

    if command == "store":
        tags = _cli_split_csv(args.tags)
        memory_id = client.store(
            type=args.type,
            title=args.title,
            content=args.content,
            tags=tags,
            importance=args.importance,
            confidence=args.confidence,
        )
        result = {"success": True, "memory_id": memory_id}

        if args.episode:
            # The episode links to the stored memory via its indexed path.
            index = client._load_index()
            entry = index.get("entries", {}).get(memory_id, {})
            client.episode(
                type=args.type,
                title=args.title,
                tags=tags or [],
                summary=_cli_truncate_words(args.content, 100),
                memory_link=entry.get("path", ""),
            )
            result["episode_logged"] = True

    elif command == "recall":
        result = client.recall(
            args.query,
            memory_types=_cli_split_csv(args.types),
            limit=args.limit,
            semantic=not args.no_semantic,
        )

    elif command == "get":
        result = client.get(args.memory_id)
        if not result:
            result = {"error": "Memory not found"}

    elif command == "relate":
        edge_id = client.relate(
            args.from_id,
            args.to_id,
            args.rel_type,
            strength=args.strength,
            context=args.context,
            description=args.description,
        )
        result = {"success": bool(edge_id), "edge_id": edge_id}

    elif command == "edge-get":
        result = client.edge_get(args.edge_id)
        if not result:
            result = {"error": "Edge not found"}

    elif command == "edge-search":
        result = client.edge_search(
            query=args.query,
            types=_cli_split_csv(args.types),
            from_id=args.from_id,
            to_id=args.to_id,
            limit=args.limit,
        )

    elif command == "edge-update":
        success = client.edge_update(
            args.edge_id,
            description=args.description,
            strength=args.strength,
        )
        result = {"success": success}

    elif command == "edge-delete":
        success = client.edge_delete(args.edge_id)
        result = {"success": success}

    elif command == "search":
        result = client.search(
            query=args.query,
            memory_types=_cli_split_csv(args.types),
            tags=_cli_split_csv(args.tags),
            min_importance=args.min_importance,
            limit=args.limit,
        )

    elif command == "update":
        success = client.update(
            args.memory_id,
            title=args.title,
            content=args.content,
            tags=_cli_split_csv(args.tags),
            importance=args.importance,
        )
        result = {"success": success}

    elif command == "delete":
        if not args.force:
            # No interactive prompt: just announce what is being removed.
            mem = client.get(args.memory_id)
            if mem:
                print(f"Deleting: {mem.get('title')}", file=sys.stderr)
        success = client.delete(args.memory_id)
        result = {"success": success}

    elif command == "related":
        result = client.related(
            args.memory_id,
            rel_types=_cli_split_csv(args.types),
            max_depth=args.depth,
        )

    elif command == "stats":
        result = client.stats()

    elif command == "recent":
        result = client.recent(limit=args.limit)

    elif command == "decay":
        result = client.decay()

    elif command == "core":
        content = client.core()
        # Print path, not content (content is written to file)
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "CORE.md"),
            "chars": len(content),
        }

    elif command == "episode":
        client.episode(
            type=args.type,
            title=args.title,
            tags=_cli_split_csv(args.tags),
            summary=args.summary,
            memory_link=args.memory_link,
        )
        result = {"success": True}

    elif command == "reindex":
        count = client.reindex()
        result = {"success": True, "indexed": count}

    elif command == "embed":
        if not args.if_changed:
            print(
                "Generating embeddings (this may take a while if model needs to be pulled)...",
                file=sys.stderr,
            )
        result = client.embed(if_changed=args.if_changed)

    elif command == "pin":
        success = client.pin(args.memory_id)
        result = {"success": success}

    elif command == "reflect":
        result = client.reflect(
            since=args.since,
            dry_run=args.dry_run,
        )

    elif command == "merge":
        try:
            result = client.merge(
                keep_id=args.keep_id,
                absorb_id=args.absorb_id,
                dry_run=args.dry_run,
            )
        except ValueError as exc:
            # Bad IDs are a user error: report as JSON instead of a traceback,
            # keeping the non-zero exit status.
            print(json.dumps({"success": False, "error": str(exc)}, indent=2))
            sys.exit(1)

    elif command == "reflection":
        content = client.reflection_summary()
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "REFLECTION.md"),
            "chars": len(content),
        }

    elif command == "tags":
        if args.tags_command == "list":
            result = client.tags_list(limit=args.limit)
        elif args.tags_command == "related":
            result = client.tags_related(args.tag, limit=args.limit)
        elif args.tags_command == "suggest":
            result = client.tags_suggest(args.memory_id)
        else:
            # No subcommand given: show the tags-specific help.
            tags_parser.print_help()
            sys.exit(1)

    elif command == "procedure":
        memory_id = client.store(
            type="procedure",
            title=args.title,
            content=args.content,
            tags=_cli_split_csv(args.tags),
            importance=args.importance,
            steps=_cli_split_csv(args.steps),
            preconditions=_cli_split_csv(args.preconditions),
            postconditions=_cli_split_csv(args.postconditions),
        )
        result = {"success": True, "memory_id": memory_id}

    elif command == "config":
        config_path = MEMORY_DIR / "_config.json"
        config = _load_memory_config(config_path)
        changed = False

        if args.provider:
            config["embedding_provider"] = args.provider
            changed = True
        if args.openai_key:
            config["openai_api_key"] = args.openai_key
            changed = True
        if args.ollama_model:
            config["ollama_model"] = args.ollama_model
            changed = True

        if changed:
            config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
            # The file may hold an API key: restrict it to the owner (best-effort).
            try:
                os.chmod(config_path, 0o600)
            except OSError:
                pass
            result = {"success": True, "updated": True}
        else:
            # Nothing to change: display the config with the API key masked.
            display = dict(config)
            key = display.get("openai_api_key")
            if key and isinstance(key, str):
                # Short keys are hidden entirely; revealing 4+4 chars of an
                # 8-char key would reveal all of it.
                display["openai_api_key"] = (
                    key[:4] + "..." + key[-4:] if len(key) > 8 else "***"
                )
            result = display

    print(json.dumps(result, indent=2, default=str))


if __name__ == "__main__":
    main()