cognitive-memory/client.py
Cal Corum 48df2a89ce Initial commit: extract cognitive-memory app from skill directory
Moved application code from ~/.claude/skills/cognitive-memory/ to its own
project directory. The skill layer (SKILL.md, SCHEMA.md) remains in the
skill directory for Claude Code to read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:02:28 -06:00

1092 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Cognitive Memory Client
Markdown-based memory system with decay scoring, episodic logging, and auto-curated CORE.md.
Stores memories as human-readable markdown files with YAML frontmatter.
Usage:
# CLI
python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix"
python client.py recall "timeout error"
python client.py get <memory_id>
python client.py relate <from_id> <to_id> SOLVES
python client.py core
python client.py episode --type fix --title "Fixed reconnection" --tags "discord,python"
# Python
from client import CognitiveMemoryClient
client = CognitiveMemoryClient()
memory_id = client.store(type="solution", title="...", content="...")
"""
import json
import os
import subprocess
import sys
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Import everything from common for backward compatibility
from common import (
CORE_MAX_CHARS,
CONFIG_PATH,
DECAY_LAMBDA,
EDGE_FIELD_ORDER,
EDGES_DIR_NAME,
EMBEDDING_MODEL,
EMBEDDING_TIMEOUT,
EMBEDDINGS_PATH,
FIELD_ORDER,
INDEX_PATH,
MEMORY_DIR,
OLLAMA_URL,
OPENAI_EMBED_URL,
OPENAI_MODEL_DEFAULT,
STATE_PATH,
THRESHOLD_ACTIVE,
THRESHOLD_DORMANT,
THRESHOLD_FADING,
TYPE_DIRS,
TYPE_WEIGHTS,
VALID_RELATION_TYPES,
VALID_TYPES,
_cosine_similarity,
_format_yaml_value,
_load_memory_config,
_make_edge_filename,
_needs_quoting,
_ollama_embed,
_openai_embed,
_parse_relations_block,
_parse_scalar,
_quote_yaml,
calculate_decay_score,
make_filename,
parse_frontmatter,
serialize_edge_frontmatter,
serialize_frontmatter,
slugify,
)
# Import mixins
from edges import EdgesMixin
from embeddings import EmbeddingsMixin
from analysis import AnalysisMixin
# =============================================================================
# CLIENT
# =============================================================================
class CognitiveMemoryClient(EdgesMixin, EmbeddingsMixin, AnalysisMixin):
    """Client for markdown-based cognitive memory system."""

    def __init__(self, memory_dir: Optional[Path] = None):
        """Initialize the client rooted at *memory_dir* (defaults to MEMORY_DIR)."""
        base = memory_dir or MEMORY_DIR
        self.memory_dir = base
        self.index_path = base / "_index.json"
        self.state_path = base / "_state.json"
        # In-process cache for _embeddings.json: parsed dict + mtime it was read at.
        self._embeddings_cache: Optional[Dict] = None
        self._embeddings_mtime: float = 0.0
        self._ensure_dirs()
def _ensure_dirs(self):
    """Create the graph/, edges, episodes/ and vault/ directory tree if absent."""
    graph_root = self.memory_dir / "graph"
    wanted = [graph_root / d for d in TYPE_DIRS.values()]
    wanted.append(graph_root / EDGES_DIR_NAME)
    wanted.append(self.memory_dir / "episodes")
    wanted.append(self.memory_dir / "vault")
    for directory in wanted:
        directory.mkdir(parents=True, exist_ok=True)
def _load_embeddings_cached(self) -> Optional[Dict]:
"""Load _embeddings.json with mtime-based caching.
Returns the parsed dict, or None if the file doesn't exist or fails to parse.
Only re-reads from disk when the file's mtime has changed.
"""
embeddings_path = self.memory_dir / "_embeddings.json"
if not embeddings_path.exists():
return None
try:
current_mtime = embeddings_path.stat().st_mtime
except OSError:
return None
if (
self._embeddings_cache is not None
and current_mtime == self._embeddings_mtime
):
return self._embeddings_cache
try:
data = json.loads(embeddings_path.read_text())
self._embeddings_cache = data
self._embeddings_mtime = current_mtime
return data
except (json.JSONDecodeError, OSError):
return None
# -------------------------------------------------------------------------
# Index and State management
# -------------------------------------------------------------------------
def _load_index(self) -> Dict:
"""Load _index.json, return empty structure if missing."""
if self.index_path.exists():
try:
return json.loads(self.index_path.read_text())
except (json.JSONDecodeError, OSError):
pass
return {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}
def _save_index(self, index: Dict):
"""Write _index.json."""
index["version"] = 2
index.setdefault("edges", {})
index["updated"] = datetime.now(timezone.utc).isoformat()
index["count"] = len(index.get("entries", {}))
self.index_path.write_text(json.dumps(index, indent=2, default=str))
def _load_state(self) -> Dict:
"""Load _state.json, return empty structure if missing."""
if self.state_path.exists():
try:
return json.loads(self.state_path.read_text())
except (json.JSONDecodeError, OSError):
pass
return {"version": 1, "updated": "", "entries": {}}
def _save_state(self, state: Dict):
    """Write _state.json atomically, merging top-level keys to prevent race conditions."""
    # Merge with existing state on disk so top-level keys written by a
    # concurrent process between our earlier read and this write survive.
    # Note: the merge is shallow — our keys win on conflict.
    if self.state_path.exists():
        try:
            existing = json.loads(self.state_path.read_text())
            existing.update(state)
            state = existing
        except (json.JSONDecodeError, OSError):
            pass  # Unreadable existing state: overwrite with ours.
    state["updated"] = datetime.now(timezone.utc).isoformat()
    # Atomic write: write to temp file then rename, so readers never
    # observe a partially-written file.
    fd, tmp_path = tempfile.mkstemp(dir=self.memory_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2, default=str)
        os.replace(tmp_path, self.state_path)
    except Exception:
        # Best-effort cleanup of the temp file, then re-raise.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
# -------------------------------------------------------------------------
# File I/O
# -------------------------------------------------------------------------
def _read_memory_file(self, path: Path) -> Tuple[Dict[str, Any], str]:
    """Read a memory markdown file and split it into (frontmatter, body)."""
    return parse_frontmatter(path.read_text(encoding="utf-8"))
def _write_memory_file(self, path: Path, frontmatter: Dict[str, Any], body: str):
    """Write a memory markdown file: YAML frontmatter, blank line, then body."""
    fm_str = serialize_frontmatter(frontmatter)
    trimmed = body.strip()
    if trimmed:
        content = f"{fm_str}\n\n{trimmed}\n"
    else:
        # Frontmatter-only file when the body is empty/whitespace.
        content = f"{fm_str}\n"
    path.write_text(content, encoding="utf-8")
@staticmethod
def _resolve_prefix(partial_id: str, keys) -> Optional[str]:
"""Resolve a partial UUID prefix to a full ID (git-style).
Returns the full key if exactly one match is found, None if zero
matches, or raises ValueError on ambiguous (multiple) matches.
"""
if partial_id in keys:
return partial_id
matches = [k for k in keys if k.startswith(partial_id)]
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
raise ValueError(
f"Ambiguous ID prefix '{partial_id}' matches {len(matches)} "
f"entries: {', '.join(sorted(matches)[:5])}"
)
return None
def _resolve_memory_path(self, memory_id: str) -> Optional[Path]:
    """Find the file path for a memory by ID (or unique prefix) via the index."""
    entries = self._load_index().get("entries", {})
    full_id = self._resolve_prefix(memory_id, entries)
    if full_id:
        candidate = self.memory_dir / entries[full_id]["path"]
        if candidate.exists():
            return candidate
    # Index may be stale; fall back to scanning the markdown files (slow).
    return self._scan_for_memory(memory_id)
def _scan_for_memory(self, memory_id: str) -> Optional[Path]:
    """Scan graph/ and vault/ for a memory file whose frontmatter ID matches."""
    for root in (self.memory_dir / "graph", self.memory_dir / "vault"):
        if not root.exists():
            continue
        for md_file in root.rglob("*.md"):
            try:
                fm, _ = self._read_memory_file(md_file)
            except Exception:
                continue  # Unreadable/malformed file: keep scanning.
            if fm.get("id") == memory_id:
                return md_file
    return None
# -------------------------------------------------------------------------
# Git operations
# -------------------------------------------------------------------------
def _git_commit(self, message: str, files: Optional[List[Path]] = None):
    """Stage and commit files in the memory repo.

    Best-effort: silently does nothing when the memory dir is not a git
    repo, or when git is missing or times out. *files* (absolute paths
    under memory_dir) limits staging to those paths; otherwise everything
    is staged with ``git add -A``.
    """
    try:
        # Check if it's a git repo
        result = subprocess.run(
            ["git", "rev-parse", "--git-dir"],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
        if result.returncode != 0:
            return  # Not a git repo, skip
        if files:
            for f in files:
                # Stage each file by its path relative to the repo root.
                rel = f.relative_to(self.memory_dir)
                subprocess.run(
                    ["git", "add", str(rel)],
                    cwd=str(self.memory_dir),
                    capture_output=True,
                    timeout=5,
                )
        else:
            subprocess.run(
                ["git", "add", "-A"],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        # --allow-empty: still record the operation even if staging
        # produced no diff (e.g. deletions staged elsewhere).
        subprocess.run(
            ["git", "commit", "-m", message, "--allow-empty"],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        pass  # Git operations are best-effort
# -------------------------------------------------------------------------
# Index helpers
# -------------------------------------------------------------------------
def _update_index_entry(
self,
memory_id: str,
frontmatter: Dict[str, Any],
rel_path: str,
content_preview: str = "",
):
"""Add or update an entry in the index."""
index = self._load_index()
entry = {
"title": frontmatter.get("title", ""),
"type": frontmatter.get("type", "general"),
"tags": frontmatter.get("tags", []),
"importance": frontmatter.get("importance", 0.5),
"confidence": frontmatter.get("confidence", 0.8),
"created": frontmatter.get("created", ""),
"updated": frontmatter.get("updated", ""),
"path": rel_path,
"relations": frontmatter.get("relations", []),
"content_preview": content_preview,
}
# Preserve existing content_preview if not provided
if not content_preview:
existing = index.get("entries", {}).get(memory_id, {})
entry["content_preview"] = existing.get("content_preview", "")
index.setdefault("entries", {})[memory_id] = entry
self._save_index(index)
def _remove_index_entry(self, memory_id: str):
"""Remove an entry from the index."""
index = self._load_index()
index.get("entries", {}).pop(memory_id, None)
self._save_index(index)
def _maybe_refresh_decay(self):
    """Auto-refresh all decay scores if state is older than 24 hours.

    Runs self.decay() — presumably provided by one of the mixins (not in
    this file); confirm — whenever _state.json is missing, has no usable
    "updated" timestamp, or that timestamp is more than 24 hours old.
    """
    if not self.state_path.exists():
        self.decay()
        return
    try:
        state = json.loads(self.state_path.read_text())
        updated_str = state.get("updated", "")
        if not updated_str:
            self.decay()
            return
        # Accept both "Z" and "+00:00" UTC suffixes in the stored timestamp.
        updated_dt = datetime.fromisoformat(updated_str.replace("Z", "+00:00"))
        if updated_dt.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            updated_dt = updated_dt.replace(tzinfo=timezone.utc)
        age_hours = (datetime.now(timezone.utc) - updated_dt).total_seconds() / 3600
        if age_hours > 24:
            self.decay()
    except (json.JSONDecodeError, ValueError, OSError):
        # Corrupt/unreadable state: recompute decay from scratch.
        self.decay()
# -------------------------------------------------------------------------
# Edge index helpers
# -------------------------------------------------------------------------
def _update_edge_index(
self, edge_id: str, edge_data: Dict[str, Any], rel_path: str
):
"""Add or update an edge entry in the index."""
index = self._load_index()
index.setdefault("edges", {})[edge_id] = {
"type": edge_data.get("type", ""),
"from_id": edge_data.get("from_id", ""),
"from_title": edge_data.get("from_title", ""),
"to_id": edge_data.get("to_id", ""),
"to_title": edge_data.get("to_title", ""),
"strength": edge_data.get("strength", 0.8),
"created": edge_data.get("created", ""),
"updated": edge_data.get("updated", ""),
"path": rel_path,
}
self._save_index(index)
def _remove_edge_index(self, edge_id: str):
"""Remove an edge entry from the index."""
index = self._load_index()
index.get("edges", {}).pop(edge_id, None)
self._save_index(index)
def _scan_for_edge(self, edge_id: str) -> Optional[Path]:
    """Fallback file scan for an edge by ID if index is stale."""
    edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
    if not edges_dir.exists():
        return None
    for candidate in edges_dir.glob("*.md"):
        try:
            fm, _ = self._read_memory_file(candidate)
        except Exception:
            continue  # Unreadable/malformed file: keep scanning.
        if fm.get("id") == edge_id:
            return candidate
    return None
def _resolve_edge_path(self, edge_id: str) -> Optional[Path]:
    """Find the file path for an edge by ID (or unique prefix) via the index."""
    edges = self._load_index().get("edges", {})
    full_id = self._resolve_prefix(edge_id, edges)
    if full_id:
        candidate = self.memory_dir / edges[full_id]["path"]
        if candidate.exists():
            return candidate
    # Index may be stale; fall back to scanning the edge files.
    return self._scan_for_edge(edge_id)
# =========================================================================
# PUBLIC API
# =========================================================================
def store(
    self,
    type: str,
    title: str,
    content: str,
    tags: Optional[List[str]] = None,
    importance: float = 0.5,
    confidence: float = 0.8,
    steps: Optional[List[str]] = None,
    preconditions: Optional[List[str]] = None,
    postconditions: Optional[List[str]] = None,
) -> str:
    """Store a new memory as a markdown file.

    Args:
        type: One of VALID_TYPES.
        title: Human-readable title; also used to build the filename.
        content: Markdown body of the memory.
        tags: Optional tags (lowercased and stripped).
        importance: Clamped to [0, 1]; seeds the initial decay score.
        confidence: Clamped to [0, 1].
        steps, preconditions, postconditions: Stored only for type
            "procedure".

    Returns the new memory's UUID string.
    Raises ValueError for an unknown *type*.
    """
    if type not in VALID_TYPES:
        raise ValueError(f"Invalid type: {type}. Valid: {VALID_TYPES}")
    memory_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    tags = [t.lower().strip() for t in (tags or [])]
    frontmatter = {
        "id": memory_id,
        "type": type,
        "title": title,
        "tags": tags,
        "importance": max(0.0, min(1.0, importance)),
        "confidence": max(0.0, min(1.0, confidence)),
        "created": now,
        "updated": now,
    }
    # Add procedure-specific fields when type is "procedure"
    if type == "procedure":
        if steps:
            frontmatter["steps"] = steps
        if preconditions:
            frontmatter["preconditions"] = preconditions
        if postconditions:
            frontmatter["postconditions"] = postconditions
    # Determine file path.
    # BUG FIX: the relative path previously embedded the literal string
    # "(unknown)" instead of the generated filename, so every memory of a
    # given type was written to the same file, silently overwriting the
    # previous one. Use the filename built from title + id.
    type_dir = TYPE_DIRS[type]
    filename = make_filename(title, memory_id)
    rel_path = f"graph/{type_dir}/{filename}"
    full_path = self.memory_dir / rel_path
    # Write file
    self._write_memory_file(full_path, frontmatter, content)
    # Update index with content preview (truncate at word boundary)
    preview = content.strip()[:200]
    if len(content.strip()) > 200:
        last_space = preview.rfind(" ")
        if last_space > 0:
            preview = preview[:last_space]
    self._update_index_entry(
        memory_id, frontmatter, rel_path, content_preview=preview
    )
    # Seed decay state: never accessed, score from importance x type weight.
    state = self._load_state()
    state.setdefault("entries", {})[memory_id] = {
        "access_count": 0,
        "last_accessed": now,
        "decay_score": importance * TYPE_WEIGHTS.get(type, 1.0) * 0.5,
    }
    self._save_state(state)
    # Git commit (best-effort)
    self._git_commit(f"store: {title}", [full_path])
    return memory_id
def recall(
    self,
    query: str,
    memory_types: Optional[List[str]] = None,
    limit: int = 10,
    semantic: bool = True,
) -> List[Dict[str, Any]]:
    """Search memories by query, ranked by relevance and decay score.

    Keyword pass scores term hits in title (x3) and tags (x2), falling
    back to the indexed content preview (x1); each score is then weighted
    by (1 + decay). Entries below THRESHOLD_DORMANT are skipped. When
    semantic=True and semantic_recall returns results, keyword rank (40%)
    and semantic similarity (60%) are merged. Works entirely from the
    index — no per-hit file I/O.
    """
    self._maybe_refresh_decay()
    query_lower = query.lower().strip()
    terms = query_lower.split()
    if not terms:
        return []
    index = self._load_index()
    state = self._load_state()
    results = []
    for mid, entry in index.get("entries", {}).items():
        # Filter by type
        if memory_types and entry.get("type") not in memory_types:
            continue
        # Check decay threshold - skip archived
        s = state.get("entries", {}).get(mid, {})
        decay = s.get("decay_score", 0.5)
        if decay < THRESHOLD_DORMANT:
            continue
        # Score based on term matches
        title = (entry.get("title") or "").lower()
        tags_str = " ".join(entry.get("tags") or []).lower()
        title_matches = sum(1 for t in terms if t in title) * 3
        tag_matches = sum(1 for t in terms if t in tags_str) * 2
        score = title_matches + tag_matches
        if score == 0:
            # Check content_preview from index (no file I/O needed)
            preview = (entry.get("content_preview") or "").lower()
            preview_matches = sum(1 for t in terms if t in preview)
            if preview_matches > 0:
                score = preview_matches
            else:
                continue
        # Weight by decay score
        weighted_score = score * (1 + decay)
        results.append(
            {
                "id": mid,
                "type": entry.get("type"),
                "title": entry.get("title"),
                "tags": entry.get("tags", []),
                "importance": entry.get("importance"),
                "decay_score": round(decay, 3),
                "path": entry.get("path"),
                "created": entry.get("created"),
                "_score": weighted_score,
            }
        )
    # The pop() inside the sort key strips the internal _score field while
    # sorting (sort computes the key exactly once per element).
    results.sort(key=lambda x: x.pop("_score", 0), reverse=True)
    keyword_results = results[:limit]
    # Merge with semantic results (on by default)
    # Weights: semantic 60%, keyword 40%
    # Conceptual matching dominates; keyword acts as precision boost for exact terms
    if semantic:
        sem_results = self.semantic_recall(query, limit=limit)
        if sem_results:
            score_map: Dict[str, float] = {}
            result_map: Dict[str, Dict] = {}
            # Keyword: normalize rank to 0-1 (rank 1 = 1.0, last = ~0.1)
            kw_weight = 0.4
            for i, r in enumerate(keyword_results):
                mid = r["id"]
                normalized = (limit - i) / limit
                score_map[mid] = normalized * kw_weight
                result_map[mid] = r
            # Semantic: similarity is already 0-1
            sem_weight = 0.6
            for r in sem_results:
                mid = r["id"]
                sim = r.get("similarity", 0.0)
                sem_score = sim * sem_weight
                if mid in score_map:
                    # Seen in keyword pass: add semantic contribution.
                    score_map[mid] += sem_score
                    result_map[mid]["similarity"] = sim
                else:
                    # Semantic-only hit: build a result row from the index.
                    score_map[mid] = sem_score
                    idx_entry = index.get("entries", {}).get(mid, {})
                    s = state.get("entries", {}).get(mid, {})
                    result_map[mid] = {
                        "id": mid,
                        "type": r.get("type"),
                        "title": r.get("title"),
                        "tags": r.get("tags", []),
                        "importance": idx_entry.get("importance"),
                        "decay_score": round(s.get("decay_score", 0.5), 3),
                        "similarity": sim,
                        "path": r.get("path"),
                        "created": idx_entry.get("created"),
                    }
            # Sort by merged score
            merged = sorted(
                result_map.values(),
                key=lambda x: score_map.get(x["id"], 0),
                reverse=True,
            )
            return merged[:limit]
    return keyword_results
def get(self, memory_id: str) -> Optional[Dict[str, Any]]:
    """Get a memory by ID (full content), updating its access stats.

    Side effects: bumps access_count, refreshes last_accessed, and
    recomputes this memory's decay score as freshly accessed (days=0).
    Returns None when the ID (or prefix) cannot be resolved.
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return None
    fm, body = self._read_memory_file(path)
    # Update access count in state
    state = self._load_state()
    now = datetime.now(timezone.utc).isoformat()
    # Create a default state entry on first access.
    entry = state.setdefault("entries", {}).setdefault(
        memory_id,
        {
            "access_count": 0,
            "last_accessed": now,
            "decay_score": 0.5,
        },
    )
    entry["access_count"] = entry.get("access_count", 0) + 1
    entry["last_accessed"] = now
    # Recalculate decay score for this single memory (just accessed, so days=0)
    importance = fm.get("importance", 0.5)
    mem_type = fm.get("type", "general")
    type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
    entry["decay_score"] = round(
        calculate_decay_score(importance, 0, entry["access_count"], type_weight), 4
    )
    self._save_state(state)
    # Full result: frontmatter fields + body + live access stats.
    return {
        "id": fm.get("id", memory_id),
        "type": fm.get("type"),
        "title": fm.get("title"),
        "content": body,
        "tags": fm.get("tags", []),
        "importance": fm.get("importance"),
        "confidence": fm.get("confidence"),
        "created": fm.get("created"),
        "updated": fm.get("updated"),
        "relations": fm.get("relations", []),
        "steps": fm.get("steps", []),
        "preconditions": fm.get("preconditions", []),
        "postconditions": fm.get("postconditions", []),
        "access_count": entry["access_count"],
        "decay_score": entry.get("decay_score", 0.5),
        "path": str(path.relative_to(self.memory_dir)),
    }
def search(
    self,
    query: Optional[str] = None,
    memory_types: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    min_importance: Optional[float] = None,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Filter memories by type, tags, importance. Optional text query.

    All supplied filters are AND-ed; the tag filter matches on ANY shared
    tag; the query matches if ANY term appears in title/tags/preview.
    Results are sorted by importance (descending) and truncated to *limit*.
    """
    self._maybe_refresh_decay()
    index = self._load_index()
    state = self._load_state()
    query_terms = query.lower().strip().split() if query else []
    filter_tags = set(t.lower() for t in tags) if tags else None
    results = []
    for mid, entry in index.get("entries", {}).items():
        # Type filter
        if memory_types and entry.get("type") not in memory_types:
            continue
        # Importance filter
        # NOTE(review): a truthiness test, so min_importance=0.0 disables
        # the filter; harmless since importance is clamped to >= 0, but
        # `is not None` would be more precise.
        if min_importance and entry.get("importance", 0) < min_importance:
            continue
        # Tag filter
        if filter_tags:
            mem_tags = set(t.lower() for t in entry.get("tags", []))
            if not mem_tags.intersection(filter_tags):
                continue
        # Query filter
        if query_terms:
            title = (entry.get("title") or "").lower()
            tags_str = " ".join(entry.get("tags") or []).lower()
            searchable = f"{title} {tags_str}"
            if not any(t in searchable for t in query_terms):
                # Check content_preview from index (no file I/O needed)
                preview = (entry.get("content_preview") or "").lower()
                if not any(t in preview for t in query_terms):
                    continue
        s = state.get("entries", {}).get(mid, {})
        results.append(
            {
                "id": mid,
                "type": entry.get("type"),
                "title": entry.get("title"),
                "tags": entry.get("tags", []),
                "importance": entry.get("importance"),
                "decay_score": round(s.get("decay_score", 0.5), 3),
                "path": entry.get("path"),
                "created": entry.get("created"),
            }
        )
    # Sort by importance descending
    results.sort(key=lambda x: x.get("importance", 0), reverse=True)
    return results[:limit]
def update(
    self,
    memory_id: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    importance: Optional[float] = None,
) -> bool:
    """Update fields of an existing memory.

    Only non-None arguments are applied; importance is clamped to [0, 1]
    and tags are lowercased/stripped. Always bumps the "updated"
    timestamp. Returns False when the ID cannot be resolved.
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return False
    fm, body = self._read_memory_file(path)
    now = datetime.now(timezone.utc).isoformat()
    if title is not None:
        fm["title"] = title
    if tags is not None:
        fm["tags"] = [t.lower().strip() for t in tags]
    if importance is not None:
        fm["importance"] = max(0.0, min(1.0, importance))
    if content is not None:
        body = content
    fm["updated"] = now
    self._write_memory_file(path, fm, body)
    # Update index (refresh content_preview if content changed)
    rel_path = str(path.relative_to(self.memory_dir))
    preview = ""
    if content is not None:
        # Truncate the preview at a word boundary near 200 chars.
        preview = body.strip()[:200]
        if len(body.strip()) > 200:
            last_space = preview.rfind(" ")
            if last_space > 0:
                preview = preview[:last_space]
    # An empty preview tells _update_index_entry to keep the existing one.
    self._update_index_entry(memory_id, fm, rel_path, content_preview=preview)
    self._git_commit(f"update: {fm.get('title', memory_id[:8])}", [path])
    return True
def delete(self, memory_id: str) -> bool:
    """Delete a memory file, cascade-delete edges, remove from index.

    Also strips incoming relation references from other memories (both in
    the index and, best-effort, in their on-disk frontmatter). Returns
    False when the ID cannot be resolved.
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return False
    fm, _ = self._read_memory_file(path)
    title = fm.get("title", memory_id[:8])
    # Cascade-delete edges where from_id or to_id matches
    index = self._load_index()
    edges_to_delete = []
    for eid, edata in index.get("edges", {}).items():
        if edata.get("from_id") == memory_id or edata.get("to_id") == memory_id:
            edges_to_delete.append(eid)
    for eid in edges_to_delete:
        # edge_delete — presumably provided by EdgesMixin (not in this
        # file view); confirm.
        self.edge_delete(eid)
    # Remove file
    path.unlink()
    # Remove from index and state
    self._remove_index_entry(memory_id)
    state = self._load_state()
    state.get("entries", {}).pop(memory_id, None)
    self._save_state(state)
    # Remove incoming references from other memories.
    # Re-load: the edge deletions and _remove_index_entry above already
    # rewrote the index on disk.
    index = self._load_index()
    for mid, entry in index.get("entries", {}).items():
        rels = entry.get("relations", [])
        original_count = len(rels)
        rels = [r for r in rels if r.get("target") != memory_id]
        if len(rels) != original_count:
            entry["relations"] = rels
            other_path = self._resolve_memory_path(mid)
            if other_path:
                try:
                    other_fm, other_body = self._read_memory_file(other_path)
                    other_fm["relations"] = [
                        r
                        for r in other_fm.get("relations", [])
                        if r.get("target") != memory_id
                    ]
                    self._write_memory_file(other_path, other_fm, other_body)
                except Exception:
                    pass  # Best-effort frontmatter cleanup.
    self._save_index(index)
    # Git: stage deletion
    try:
        rel_path = path.relative_to(self.memory_dir)
        subprocess.run(
            ["git", "rm", "--cached", str(rel_path)],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
    except Exception:
        pass  # Git staging is best-effort.
    self._git_commit(f"delete: {title}")
    return True
def related(
    self,
    memory_id: str,
    rel_types: Optional[List[str]] = None,
    max_depth: int = 1,
) -> List[Dict[str, Any]]:
    """Traverse relations from a memory (recursive depth-first, depth-limited).

    Note: the walk is depth-first (recursive), not BFS as previously
    documented. max_depth is clamped to [1, 5]; rel_types optionally
    restricts which relation types are followed. Each target appears at
    most once (visited-set guard).
    """
    max_depth = max(1, min(5, max_depth))
    index = self._load_index()
    visited = set()
    results = []
    # Resolve partial ID prefix to full UUID
    entries = index.get("entries", {})
    resolved = self._resolve_prefix(memory_id, entries)
    if resolved:
        memory_id = resolved

    def traverse(mid: str, depth: int):
        # depth = number of hops from the starting memory for edges
        # leaving *mid*.
        if depth > max_depth or mid in visited:
            return
        visited.add(mid)
        entry = entries.get(mid)
        if not entry:
            return
        for rel in entry.get("relations", []):
            target_id = rel.get("target")
            if not target_id or target_id in visited:
                continue
            if rel_types and rel.get("type") not in rel_types:
                continue
            target_entry = index.get("entries", {}).get(target_id)
            if target_entry:
                results.append(
                    {
                        "id": target_id,
                        "type": target_entry.get("type"),
                        "title": target_entry.get("title"),
                        "relationship": rel.get("type"),
                        "direction": rel.get("direction", "outgoing"),
                        "strength": rel.get("strength"),
                        "depth": depth,
                    }
                )
                traverse(target_id, depth + 1)

    traverse(memory_id, 1)
    return results
def stats(self) -> Dict[str, Any]:
    """Get statistics about the memory system.

    Returns counts by type and directory, vault size, a de-duplicated
    relation count, and a decay-band summary
    (active/fading/dormant/archived, bucketed by the THRESHOLD_* cutoffs).
    """
    index = self._load_index()
    state = self._load_state()
    entries = index.get("entries", {})
    by_type = {}
    total_relations = 0
    for entry in entries.values():
        t = entry.get("type", "unknown")
        by_type[t] = by_type.get(t, 0) + 1
        total_relations += len(entry.get("relations", []))
    # Count files per directory
    dir_counts = {}
    for type_dir in TYPE_DIRS.values():
        d = self.memory_dir / "graph" / type_dir
        if d.exists():
            dir_counts[type_dir] = len(list(d.glob("*.md")))
    vault_count = (
        len(list((self.memory_dir / "vault").glob("*.md")))
        if (self.memory_dir / "vault").exists()
        else 0
    )
    # Decay stats: bucket every state entry into one of four bands.
    state_entries = state.get("entries", {})
    active = sum(
        1
        for s in state_entries.values()
        if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
    )
    fading = sum(
        1
        for s in state_entries.values()
        if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
    )
    dormant = sum(
        1
        for s in state_entries.values()
        if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
    )
    archived = sum(
        1
        for s in state_entries.values()
        if s.get("decay_score", 0) < THRESHOLD_DORMANT
    )
    # Unique outgoing relations only (avoid double-counting).
    # NOTE(review): halving assumes every relation is mirrored on both
    # endpoints — confirm against the relate/edge code (not in this view).
    unique_relations = total_relations // 2 if total_relations > 0 else 0
    return {
        "total_memories": len(entries),
        "by_type": by_type,
        "dir_counts": dir_counts,
        "vault_count": vault_count,
        "total_relations": unique_relations,
        "decay_summary": {
            "active": active,
            "fading": fading,
            "dormant": dormant,
            "archived": archived,
        },
        "memory_dir": str(self.memory_dir),
    }
def recent(self, limit: int = 20) -> List[Dict[str, Any]]:
    """Get most recently created memories (newest first, up to *limit*)."""
    index = self._load_index()
    flattened = []
    for mid, entry in index.get("entries", {}).items():
        # Flatten: copy the entry dict with its ID folded in.
        flattened.append({"id": mid, **entry})
    flattened.sort(key=lambda e: e.get("created", ""), reverse=True)
    return flattened[:limit]
def episode(
    self,
    type: str,
    title: str,
    tags: Optional[List[str]] = None,
    summary: Optional[str] = None,
    memory_link: Optional[str] = None,
):
    """Append an entry to today's episode file (episodes/YYYY-MM-DD.md)."""
    now_local = datetime.now().astimezone()
    today = now_local.strftime("%Y-%m-%d")
    time_str = now_local.strftime("%H:%M")
    ep_path = self.memory_dir / "episodes" / f"{today}.md"
    tags_str = ", ".join(tags or [])
    lines = [f"## {time_str} - {title}", f"- **Type:** {type}"]
    if tags_str:
        lines.append(f"- **Tags:** {tags_str}")
    if memory_link:
        # Extract name from link path
        name = Path(memory_link).stem
        lines.append(f"- **Memory:** [{name}]({memory_link})")
    if summary:
        lines.append(f"- **Summary:** {summary}")
    entry_text = "\n".join(lines)
    # Append to today's file, or start a new one with a date heading.
    if ep_path.exists():
        prior = ep_path.read_text(encoding="utf-8").rstrip()
        new_content = f"{prior}\n\n{entry_text}\n"
    else:
        new_content = f"# {today}\n\n{entry_text}\n"
    ep_path.write_text(new_content, encoding="utf-8")
    self._git_commit(f"episode: {title}", [ep_path])
def reindex(self) -> int:
    """Rebuild _index.json from all markdown files. Recovery command.

    Scans graph/ and vault/ for memory files and the edges directory for
    edge files, rebuilding entries and edges from their frontmatter.
    Files without an "id" are skipped; unreadable files print a warning
    to stderr. Returns the number of memory entries indexed (edges are
    not counted).
    """
    index = {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}
    count = 0
    search_dirs = [
        ("graph", self.memory_dir / "graph"),
        ("vault", self.memory_dir / "vault"),
    ]
    edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
    for _prefix, search_dir in search_dirs:
        if not search_dir.exists():
            continue
        for md_file in search_dir.rglob("*.md"):
            # Skip edge files — handled separately
            if edges_dir.exists() and md_file.parent == edges_dir:
                continue
            try:
                fm, body = self._read_memory_file(md_file)
                mid = fm.get("id")
                if not mid:
                    continue
                rel_path = str(md_file.relative_to(self.memory_dir))
                # Truncate the preview at a word boundary near 200 chars.
                preview = body.strip()[:200]
                if len(body.strip()) > 200:
                    last_space = preview.rfind(" ")
                    if last_space > 0:
                        preview = preview[:last_space]
                index["entries"][mid] = {
                    "title": fm.get("title", ""),
                    "type": fm.get("type", "general"),
                    "tags": fm.get("tags", []),
                    "importance": fm.get("importance", 0.5),
                    "confidence": fm.get("confidence", 0.8),
                    "created": fm.get("created", ""),
                    "updated": fm.get("updated", ""),
                    "path": rel_path,
                    "relations": fm.get("relations", []),
                    "content_preview": preview,
                }
                count += 1
            except Exception as e:
                print(f"Warning: Failed to index {md_file}: {e}", file=sys.stderr)
    # Scan edge files
    if edges_dir.exists():
        for md_file in edges_dir.glob("*.md"):
            try:
                fm, _ = self._read_memory_file(md_file)
                eid = fm.get("id")
                if not eid:
                    continue
                rel_path = str(md_file.relative_to(self.memory_dir))
                index["edges"][eid] = {
                    "type": fm.get("type", ""),
                    "from_id": fm.get("from_id", ""),
                    "from_title": fm.get("from_title", ""),
                    "to_id": fm.get("to_id", ""),
                    "to_title": fm.get("to_title", ""),
                    "strength": fm.get("strength", 0.8),
                    "created": fm.get("created", ""),
                    "updated": fm.get("updated", ""),
                    "path": rel_path,
                }
            except Exception as e:
                print(
                    f"Warning: Failed to index edge {md_file}: {e}", file=sys.stderr
                )
    self._save_index(index)
    return count
def pin(self, memory_id: str) -> bool:
    """Move a memory to vault/ (never decays).

    Rewrites the file under vault/, removes the original, repoints the
    index entry, and pins the decay score to a 999.0 sentinel so every
    threshold check treats it as permanently active. Returns False when
    the ID cannot be resolved.

    Change: removed the unused local `old_rel` (computed but never read).
    """
    path = self._resolve_memory_path(memory_id)
    if not path:
        return False
    fm, body = self._read_memory_file(path)
    vault_dir = self.memory_dir / "vault"
    vault_dir.mkdir(parents=True, exist_ok=True)
    new_path = vault_dir / path.name
    self._write_memory_file(new_path, fm, body)
    # Remove old file
    path.unlink()
    # Update index with new path
    new_rel = str(new_path.relative_to(self.memory_dir))
    self._update_index_entry(memory_id, fm, new_rel)
    # Set decay to a sentinel "never decays" value in state
    state = self._load_state()
    state.setdefault("entries", {}).setdefault(memory_id, {})["decay_score"] = 999.0
    self._save_state(state)
    self._git_commit(f"pin: {fm.get('title', memory_id[:8])}", [new_path])
    return True
# CLI entry point: argument parsing and dispatch live in cli.main.
if __name__ == "__main__":
    from cli import main
    main()