refactor: split client.py into 6 focused modules using mixin composition

3,348-line monolith → 6 modules with mixin classes resolving via MRO.
client.py retains __init__, internal helpers, and core CRUD (1,091 lines).
All backward-compat imports preserved for mcp_server.py and dev/migrate.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-02-28 12:12:57 -06:00
parent 471d8709f6
commit ba0562a8ba
6 changed files with 2449 additions and 2308 deletions

View File

@ -0,0 +1,861 @@
"""
Cognitive Memory - Analysis Mixin
Decay scoring, CORE.md generation, tag analysis, reflection clustering,
memory merging, and REFLECTION.md generation.
"""
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from common import (
CORE_MAX_CHARS,
MEMORY_DIR,
THRESHOLD_ACTIVE,
THRESHOLD_DORMANT,
THRESHOLD_FADING,
TYPE_WEIGHTS,
calculate_decay_score,
)
class AnalysisMixin:
"""Mixin providing analysis operations for CognitiveMemoryClient."""
def decay(self) -> Dict[str, Any]:
    """Recalculate all decay scores. Updates _state.json.

    Walks every indexed memory, refreshes its decay score from importance,
    idle time, access count, and type weight, then persists the state file.
    Memories stored under vault/ are pinned with a sentinel score of 999.0
    so they never decay.

    Returns:
        Dict with the number of entries updated plus per-band counts
        (active / fading / dormant / archived) over all tracked state entries.
    """
    index = self._load_index()
    state = self._load_state()
    now = datetime.now(timezone.utc)
    refreshed = 0
    for mid, entry in index.get("entries", {}).items():
        record = state.setdefault("entries", {}).setdefault(
            mid,
            {
                "access_count": 0,
                "last_accessed": entry.get("created", now.isoformat()),
                "decay_score": 0.5,
            },
        )
        # Days since the memory was last touched; fall back to creation time.
        last_seen = record.get("last_accessed", entry.get("created", ""))
        try:
            parsed = datetime.fromisoformat(last_seen.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            idle_days = (now - parsed).total_seconds() / 86400
        except (ValueError, AttributeError):
            idle_days = 30  # unparseable timestamp: assume a month idle
        if entry.get("path", "").startswith("vault/"):
            # Vault entries are pinned and exempt from decay.
            record["decay_score"] = 999.0
        else:
            record["decay_score"] = round(
                calculate_decay_score(
                    entry.get("importance", 0.5),
                    idle_days,
                    record.get("access_count", 0),
                    TYPE_WEIGHTS.get(entry.get("type", "general"), 1.0),
                ),
                4,
            )
        refreshed += 1
    self._save_state(state)
    # Band counts cover everything tracked in state, not only this run.
    scores = [s.get("decay_score", 0) for s in state.get("entries", {}).values()]
    return {
        "updated": refreshed,
        "active": sum(1 for d in scores if d >= THRESHOLD_ACTIVE),
        "fading": sum(1 for d in scores if THRESHOLD_FADING <= d < THRESHOLD_ACTIVE),
        "dormant": sum(1 for d in scores if THRESHOLD_DORMANT <= d < THRESHOLD_FADING),
        "archived": sum(1 for d in scores if 0 < d < THRESHOLD_DORMANT),
    }
def core(self) -> str:
    """Generate CORE.md from top memories by decay score.

    Collects every indexed memory whose decay score is at or above
    THRESHOLD_FADING, sorts by score, groups entries under fixed category
    headings by memory type, and writes the digest (capped near
    CORE_MAX_CHARS) to CORE.md in the memory directory. The file is
    git-committed and the generated markdown is returned.
    """
    index = self._load_index()
    state = self._load_state()

    def _extract_summary(body: str, max_len: int = 80) -> str:
        """Extract first meaningful sentence from memory body."""
        if not body or not body.strip():
            return ""
        # Skip leading code blocks, headings, and list markers
        for ln in body.strip().split("\n"):
            stripped = ln.strip()
            if not stripped:
                continue
            if stripped.startswith("```") or stripped.startswith("#"):
                continue
            first_line = stripped.lstrip("- *>").strip()
            break
        else:
            # for-else: every line was blank or skipped, so nothing to summarize
            return ""
        if not first_line:
            return ""
        # Extract first sentence (split on '. ')
        dot_pos = first_line.find(". ")
        if 0 < dot_pos < max_len:
            sentence = first_line[: dot_pos + 1]
        else:
            sentence = first_line
        if len(sentence) > max_len:
            sentence = sentence[: max_len - 3].rstrip() + "..."
        return sentence

    # Collect all memories with decay scores
    memories = []
    for mid, entry in index.get("entries", {}).items():
        s = state.get("entries", {}).get(mid, {})
        decay = s.get("decay_score", 0.5)
        if decay < THRESHOLD_FADING:
            continue  # Skip low-relevance
        memories.append(
            {
                "id": mid,
                "title": entry.get("title", ""),
                "type": entry.get("type", "general"),
                "path": entry.get("path", ""),
                "importance": entry.get("importance", 0.5),
                "decay_score": decay,
                "tags": entry.get("tags", []),
            }
        )
    # Sort by decay score descending
    memories.sort(key=lambda x: x["decay_score"], reverse=True)
    # Group by type category
    categories = {
        "Critical Solutions": ["solution"],
        "Active Decisions": ["decision"],
        "Key Fixes": ["fix"],
        "Configurations": ["configuration"],
        "Key Procedures": ["procedure"],
        "Patterns & Workflows": ["code_pattern", "workflow"],
        "Known Issues": ["problem", "error"],
        "Insights": ["insight"],
        "General": ["general"],
    }
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE)
    total_count = len(index.get("entries", {}))
    lines = [
        "# Memory Core (auto-generated)",
        f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)",
        "",
    ]
    # Running character budget; newline overhead is only approximated here.
    char_count = sum(len(l) for l in lines)
    for cat_name, cat_types in categories.items():
        cat_memories = [m for m in memories if m["type"] in cat_types]
        if not cat_memories:
            continue
        section_lines = [f"## {cat_name}", ""]
        for mem in cat_memories[:10]:  # Cap per section (reduced for summaries)
            path = mem["path"]
            title = mem["title"]
            tags = ", ".join(mem["tags"][:3]) if mem["tags"] else ""
            # Read body to extract summary
            summary = ""
            mem_path = self.memory_dir / path
            if mem_path.exists():
                try:
                    _, body = self._read_memory_file(mem_path)
                    summary = _extract_summary(body)
                except Exception:
                    pass  # unreadable body: emit the link without a summary
            line = f"- [{title}]({path})"
            if summary:
                line += f" - {summary}"
            if tags:
                line += f" ({tags})"
            section_lines.append(line)
        # Check budget
        added = sum(len(l) for l in section_lines) + 2
        if char_count + added > CORE_MAX_CHARS:
            break
        section_lines.append("")
        # Second check includes the trailing blank line and join overhead.
        section_text = "\n".join(section_lines)
        if char_count + len(section_text) > CORE_MAX_CHARS:
            break
        lines.extend(section_lines)
        char_count += len(section_text)
    core_content = "\n".join(lines)
    core_path = self.memory_dir / "CORE.md"
    core_path.write_text(core_content, encoding="utf-8")
    self._git_commit("core: regenerate CORE.md", [core_path])
    return core_content
def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]:
"""List all tags with occurrence counts across all memories.
Returns list of {"tag": str, "count": int} sorted by count descending.
"""
index = self._load_index()
tag_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
for tag in entry.get("tags", []):
normalized = tag.lower().strip()
if normalized:
tag_counts[normalized] = tag_counts.get(normalized, 0) + 1
results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
results.sort(key=lambda x: x["count"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]:
"""Find tags that co-occur with the given tag.
Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int}
sorted by co_occurrences descending.
"""
tag = tag.lower().strip()
index = self._load_index()
co_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
entry_tags = [t.lower().strip() for t in entry.get("tags", [])]
if tag not in entry_tags:
continue
for other_tag in entry_tags:
if other_tag != tag and other_tag:
co_counts[other_tag] = co_counts.get(other_tag, 0) + 1
results = [
{"tag": t, "co_occurrences": count, "memories_with_both": count}
for t, count in co_counts.items()
]
results.sort(key=lambda x: x["co_occurrences"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]:
"""Suggest tags for a memory based on co-occurrence patterns.
Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}.
"""
index = self._load_index()
entry = index.get("entries", {}).get(memory_id)
if not entry:
return []
existing_tags = set(t.lower().strip() for t in entry.get("tags", []))
if not existing_tags:
return []
# For each existing tag, find co-occurring tags and accumulate scores
candidate_scores: Dict[str, int] = {}
candidate_sources: Dict[str, set] = {}
for existing_tag in existing_tags:
co_tags = self.tags_related(existing_tag)
for co_entry in co_tags:
candidate = co_entry["tag"]
if candidate in existing_tags:
continue
candidate_scores[candidate] = (
candidate_scores.get(candidate, 0) + co_entry["co_occurrences"]
)
if candidate not in candidate_sources:
candidate_sources[candidate] = set()
candidate_sources[candidate].add(existing_tag)
results = []
for candidate, score in candidate_scores.items():
sources = sorted(candidate_sources[candidate])
reason = "co-occurs with: " + ", ".join(sources)
results.append({"tag": candidate, "score": score, "reason": reason})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:10]
def reflect(
self,
since: Optional[str] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
"""Review recent memories, identify tag-based clusters, and output consolidation recommendations.
Clusters memories that share 2+ tags using iterative union-find merging.
Does NOT auto-create insights; the agent reads output and decides what to store.
Args:
since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to
_state.json's last_reflection, then 30 days ago.
dry_run: If True, return analysis without updating _state.json or logging episode.
Returns:
Dict with clusters, total_memories_reviewed, clusters_found, and since date.
"""
now = datetime.now(timezone.utc)
state = self._load_state()
# Determine time window
if since:
since_date = since
elif state.get("last_reflection"):
# last_reflection may be a full ISO timestamp; extract date portion
since_date = state["last_reflection"][:10]
else:
since_dt = now - timedelta(days=30)
since_date = since_dt.strftime("%Y-%m-%d")
# Load recent memories from index
index = self._load_index()
recent_memories = []
for mid, entry in index.get("entries", {}).items():
created = entry.get("created", "")
# Compare date portion only (YYYY-MM-DD)
created_date = created[:10] if created else ""
if created_date >= since_date:
recent_memories.append(
{
"id": mid,
"title": entry.get("title", ""),
"type": entry.get("type", "general"),
"tags": [t.lower().strip() for t in entry.get("tags", [])],
}
)
total_reviewed = len(recent_memories)
# Union-find clustering by tag overlap >= 2
# parent[i] = index of parent in recent_memories list
n = len(recent_memories)
parent = list(range(n))
def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]] # path compression
x = parent[x]
return x
def union(a: int, b: int):
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
# Compare all pairs, check tag overlap
for i in range(n):
tags_i = set(recent_memories[i]["tags"])
for j in range(i + 1, n):
tags_j = set(recent_memories[j]["tags"])
if len(tags_i & tags_j) >= 2:
union(i, j)
# Group by root
clusters_map: Dict[int, List[int]] = {}
for i in range(n):
root = find(i)
clusters_map.setdefault(root, []).append(i)
# Build output clusters (only clusters with 2+ members)
clusters = []
cluster_id = 0
for indices in clusters_map.values():
if len(indices) < 2:
continue
cluster_id += 1
members = []
all_tag_sets = []
types_seen = set()
for idx in indices:
mem = recent_memories[idx]
members.append(
{
"id": mem["id"],
"title": mem["title"],
"type": mem["type"],
"tags": mem["tags"],
}
)
all_tag_sets.append(set(mem["tags"]))
types_seen.add(mem["type"])
# Common tags = intersection of ALL members
common_tags = (
sorted(set.intersection(*all_tag_sets)) if all_tag_sets else []
)
# Shared tags = tags appearing in 2+ members
tag_counts: Dict[str, int] = {}
for ts in all_tag_sets:
for t in ts:
tag_counts[t] = tag_counts.get(t, 0) + 1
shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2)
# Suggested topic
tag_label = (
", ".join(common_tags[:4])
if common_tags
else ", ".join(shared_tags[:4])
)
type_label = ", ".join(sorted(types_seen))
suggested_topic = f"Pattern: {tag_label} across {type_label}"
clusters.append(
{
"cluster_id": cluster_id,
"members": members,
"common_tags": common_tags,
"shared_tags": shared_tags,
"suggested_topic": suggested_topic,
"member_count": len(members),
}
)
# Sort clusters by member count descending
clusters.sort(key=lambda c: c["member_count"], reverse=True)
# Re-number after sorting
for i, c in enumerate(clusters):
c["cluster_id"] = i + 1
result = {
"clusters": clusters,
"total_memories_reviewed": total_reviewed,
"clusters_found": len(clusters),
"since": since_date,
}
if not dry_run:
# Update state with last_reflection timestamp
state["last_reflection"] = now.isoformat()
self._save_state(state)
# Log an episode entry
self.episode(
type="reflection",
title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories",
tags=["reflection", "cognitive-memory"],
summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters",
)
# Regenerate REFLECTION.md
self.reflection_summary()
return result
def merge(
    self,
    keep_id: str,
    absorb_id: str,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Merge two memories: absorb one into another.

    The 'keep' memory absorbs the content, tags, and relations from
    the 'absorb' memory. The absorb memory is then deleted.
    All other memories referencing absorb_id are updated to point to keep_id.
    If dry_run=True, returns what would change without writing anything.

    Args:
        keep_id: ID of the memory that survives and absorbs the other.
        absorb_id: ID of the memory whose content is folded in and deleted.
        dry_run: When True, report the would-be changes without writing.

    Returns:
        Dict summarizing the merge (ids, titles, merged tags/importance,
        relation count, and how many other memories were re-pointed).

    Raises:
        ValueError: If either memory ID cannot be resolved to a file.
    """
    # Resolve both memory file paths
    keep_path = self._resolve_memory_path(keep_id)
    if not keep_path:
        raise ValueError(f"Keep memory not found: {keep_id}")
    absorb_path = self._resolve_memory_path(absorb_id)
    if not absorb_path:
        raise ValueError(f"Absorb memory not found: {absorb_id}")
    # Read both memory files (frontmatter dict + markdown body)
    keep_fm, keep_body = self._read_memory_file(keep_path)
    absorb_fm, absorb_body = self._read_memory_file(absorb_path)
    keep_title = keep_fm.get("title", keep_id[:8])
    absorb_title = absorb_fm.get("title", absorb_id[:8])
    # Combine content: append absorbed body under a provenance divider
    merged_body = keep_body
    if absorb_body.strip():
        merged_body = (
            keep_body.rstrip()
            + f"\n\n---\n*Merged from: {absorb_title}*\n\n"
            + absorb_body.strip()
        )
    # Merge tags (sorted, deduplicated)
    keep_tags = keep_fm.get("tags", [])
    absorb_tags = absorb_fm.get("tags", [])
    merged_tags = sorted(list(set(keep_tags + absorb_tags)))
    # importance = max of both
    merged_importance = max(
        keep_fm.get("importance", 0.5),
        absorb_fm.get("importance", 0.5),
    )
    # Merge relations: combine both relation lists
    keep_rels = list(keep_fm.get("relations", []))
    absorb_rels = list(absorb_fm.get("relations", []))
    combined_rels = keep_rels + absorb_rels
    # Replace any relation targeting absorb_id with keep_id
    for rel in combined_rels:
        if rel.get("target") == absorb_id:
            rel["target"] = keep_id
    # Deduplicate by (target, type) tuple, keeping first occurrence
    seen = set()
    deduped_rels = []
    for rel in combined_rels:
        key = (rel.get("target"), rel.get("type"))
        if key not in seen:
            seen.add(key)
            deduped_rels.append(rel)
    # Remove self-referential relations (target == keep_id)
    merged_rels = [r for r in deduped_rels if r.get("target") != keep_id]
    # Scan all other memories for relations targeting absorb_id
    index = self._load_index()
    updated_others = []
    for mid, entry in index.get("entries", {}).items():
        if mid in (keep_id, absorb_id):
            continue
        rels = entry.get("relations", [])
        needs_update = any(r.get("target") == absorb_id for r in rels)
        if needs_update:
            updated_others.append(mid)
    if dry_run:
        # Report-only mode: nothing has been written at this point.
        return {
            "dry_run": True,
            "keep_id": keep_id,
            "keep_title": keep_title,
            "absorb_id": absorb_id,
            "absorb_title": absorb_title,
            "merged_tags": merged_tags,
            "merged_importance": merged_importance,
            "merged_relations_count": len(merged_rels),
            "other_memories_updated": len(updated_others),
            "other_memory_ids": updated_others,
        }
    # Write updated keep memory file
    now = datetime.now(timezone.utc).isoformat()
    keep_fm["tags"] = merged_tags
    keep_fm["importance"] = merged_importance
    keep_fm["relations"] = merged_rels
    keep_fm["updated"] = now
    self._write_memory_file(keep_path, keep_fm, merged_body)
    # Update index for keep memory (refresh content_preview with merged body)
    keep_rel_path = str(keep_path.relative_to(self.memory_dir))
    preview = merged_body.strip()[:200]
    if len(merged_body.strip()) > 200:
        # Avoid cutting mid-word when truncating the preview
        last_space = preview.rfind(" ")
        if last_space > 0:
            preview = preview[:last_space]
    self._update_index_entry(
        keep_id, keep_fm, keep_rel_path, content_preview=preview
    )
    # Update all other memories that reference absorb_id
    for mid in updated_others:
        other_path = self._resolve_memory_path(mid)
        if not other_path:
            continue
        try:
            other_fm, other_body = self._read_memory_file(other_path)
            other_rels = other_fm.get("relations", [])
            for rel in other_rels:
                if rel.get("target") == absorb_id:
                    rel["target"] = keep_id
            # Deduplicate after replacement
            seen_other = set()
            deduped_other = []
            for rel in other_rels:
                key = (rel.get("target"), rel.get("type"))
                if key not in seen_other:
                    seen_other.add(key)
                    deduped_other.append(rel)
            # Remove self-referential
            other_fm["relations"] = [
                r for r in deduped_other if r.get("target") != mid
            ]
            other_fm["updated"] = now
            self._write_memory_file(other_path, other_fm, other_body)
            other_rel_path = str(other_path.relative_to(self.memory_dir))
            self._update_index_entry(mid, other_fm, other_rel_path)
        except Exception:
            # Best-effort: a single unreadable memory must not abort the merge
            pass
    # Delete absorb memory file and remove from index/state
    absorb_path.unlink()
    self._remove_index_entry(absorb_id)
    state = self._load_state()
    state.get("entries", {}).pop(absorb_id, None)
    self._save_state(state)
    # Git: stage absorb deletion (best-effort; commit below records the merge)
    try:
        absorb_rel = absorb_path.relative_to(self.memory_dir)
        subprocess.run(
            ["git", "rm", "--cached", str(absorb_rel)],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
    except Exception:
        pass
    self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")
    return {
        "success": True,
        "keep_id": keep_id,
        "keep_title": keep_title,
        "absorb_id": absorb_id,
        "absorb_title": absorb_title,
        "merged_tags": merged_tags,
        "merged_importance": merged_importance,
        "merged_relations_count": len(merged_rels),
        "other_memories_updated": len(updated_others),
    }
def reflection_summary(self) -> str:
    """Generate REFLECTION.md with patterns and insights from the memory system.

    Builds five sections — Themes (tag co-occurrence pairs), Cross-Project
    Patterns, Most Accessed, Recent Insights, Consolidation History —
    from the index, state, and episode files.

    Writes to ~/.claude/memory/REFLECTION.md and git-commits the file.
    Returns the generated content.
    """
    index = self._load_index()
    state = self._load_state()
    entries = index.get("entries", {})
    state_entries = state.get("entries", {})
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # Last reflection date
    last_reflection = state.get("last_reflection", "")
    last_reflection_date = last_reflection[:10] if last_reflection else "never"
    # Count total reflections from episode files
    total_reflections = 0
    episodes_dir = self.memory_dir / "episodes"
    if episodes_dir.exists():
        for ep_file in sorted(episodes_dir.glob("*.md")):
            try:
                text = ep_file.read_text(encoding="utf-8")
                # Count lines that look like reflection episode entries
                for line in text.split("\n"):
                    if "Reflection:" in line and line.strip().startswith("## "):
                        total_reflections += 1
            except OSError:
                pass  # skip unreadable episode files
    lines = [
        "# Reflection Summary (auto-generated)",
        f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
        "",
    ]
    # === Themes section ===
    # Get top tags, then find co-occurrence pairs
    lines.append("## Themes")
    lines.append("Top tag co-occurrences revealing recurring themes:")
    lines.append("")
    top_tags = self.tags_list(limit=10)
    # Build co-occurrence pairs across top tags
    pair_data: Dict[Tuple[str, str], List[str]] = {}  # (tag1, tag2) -> [titles]
    for tag_info in top_tags[:5]:
        tag = tag_info["tag"]
        co_tags = self.tags_related(tag, limit=5)
        for co in co_tags:
            other = co["tag"]
            # Canonical pair ordering
            pair = tuple(sorted([tag, other]))
            if pair in pair_data:
                continue  # pair already collected from the other direction
            # Find memories with both tags and collect titles
            titles = []
            for mid, entry in entries.items():
                mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
                if pair[0] in mem_tags and pair[1] in mem_tags:
                    titles.append(entry.get("title", "untitled"))
            if titles:
                pair_data[pair] = titles
    # Sort pairs by memory count descending, show top 8
    sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
    for (tag_a, tag_b), titles in sorted_pairs[:8]:
        example_titles = ", ".join(f'"{t}"' for t in titles[:3])
        lines.append(
            f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
        )
    if not sorted_pairs:
        lines.append("- No co-occurrence data available yet.")
    lines.append("")
    # === Cross-Project Patterns section ===
    lines.append("## Cross-Project Patterns")
    lines.append("Tags that span multiple projects:")
    lines.append("")
    # NOTE(review): project names are hard-coded; new projects won't be
    # detected here until this list is updated.
    known_projects = [
        "major-domo",
        "paper-dynasty",
        "homelab",
        "vagabond-rpg",
        "foundryvtt",
        "strat-gameplay-webapp",
    ]
    # Build tag -> {project -> count} mapping
    tag_project_map: Dict[str, Dict[str, int]] = {}
    for mid, entry in entries.items():
        mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
        # Identify which projects this memory belongs to
        mem_projects = [t for t in mem_tags if t in known_projects]
        # For non-project tags, record which projects they appear in
        non_project_tags = [t for t in mem_tags if t not in known_projects]
        for tag in non_project_tags:
            if tag not in tag_project_map:
                tag_project_map[tag] = {}
            for proj in mem_projects:
                tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1
    # Filter to tags spanning 2+ projects, sort by project count then total
    cross_project = []
    for tag, projects in tag_project_map.items():
        if len(projects) >= 2:
            total = sum(projects.values())
            cross_project.append((tag, projects, total))
    cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)
    for tag, projects, total in cross_project[:10]:
        proj_parts = ", ".join(
            f"{p} ({c})"
            for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
        )
        lines.append(f"- **{tag}**: appears in {proj_parts}")
    if not cross_project:
        lines.append("- No cross-project patterns detected yet.")
    lines.append("")
    # === Most Accessed section ===
    lines.append("## Most Accessed")
    lines.append("Top 10 memories by access count:")
    lines.append("")
    # Sort state entries by access_count descending
    accessed = []
    for mid, s in state_entries.items():
        count = s.get("access_count", 0)
        if count > 0:
            entry = entries.get(mid)
            if entry:
                accessed.append(
                    (
                        mid,
                        entry.get("title", "untitled"),
                        entry.get("path", ""),
                        count,
                    )
                )
    accessed.sort(key=lambda x: x[3], reverse=True)
    for mid, title, path, count in accessed[:10]:
        # Markdown renumbers repeated "1." items into an ordered list
        lines.append(f"1. [{title}]({path}) - {count} accesses")
    if not accessed:
        lines.append("- No access data recorded yet.")
    lines.append("")
    # === Recent Insights section ===
    lines.append("## Recent Insights")
    lines.append("Insight-type memories:")
    lines.append("")
    insights = []
    for mid, entry in entries.items():
        if entry.get("type") == "insight":
            insights.append((mid, entry))
    # Sort by created date descending
    insights.sort(key=lambda x: x[1].get("created", ""), reverse=True)
    for mid, entry in insights[:10]:
        title = entry.get("title", "untitled")
        path = entry.get("path", "")
        preview = entry.get("content_preview", "")
        if preview:
            lines.append(f"- [{title}]({path}) - {preview[:80]}")
        else:
            lines.append(f"- [{title}]({path})")
    if not insights:
        lines.append("- No insight memories stored yet.")
    lines.append("")
    # === Consolidation History section ===
    lines.append("## Consolidation History")
    lines.append("")
    # Count merge episodes by scanning episode headings
    merge_count = 0
    if episodes_dir.exists():
        for ep_file in sorted(episodes_dir.glob("*.md")):
            try:
                text = ep_file.read_text(encoding="utf-8")
                for line_text in text.split("\n"):
                    stripped = line_text.strip()
                    if stripped.startswith("## ") and "merge" in stripped.lower():
                        merge_count += 1
            except OSError:
                pass  # skip unreadable episode files
    lines.append(f"- Total merges performed: {merge_count}")
    lines.append("")
    content = "\n".join(lines)
    # Write REFLECTION.md
    reflection_path = self.memory_dir / "REFLECTION.md"
    reflection_path.write_text(content, encoding="utf-8")
    self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])
    return content

View File

@ -0,0 +1,486 @@
#!/usr/bin/env python3
"""
Cognitive Memory - CLI Interface
Command-line interface for the cognitive memory system.
"""
import argparse
import json
import sys
from client import CognitiveMemoryClient
from common import (
MEMORY_DIR,
VALID_RELATION_TYPES,
VALID_TYPES,
_load_memory_config,
)
def main():
    """CLI entry point for the cognitive memory system.

    Builds the argparse command tree (one subcommand per client operation),
    dispatches to CognitiveMemoryClient, and prints the result as JSON on
    stdout. Exits with status 1 when no command (or no ``tags`` subcommand)
    is supplied.
    """
    parser = argparse.ArgumentParser(
        description="Cognitive Memory - Markdown-based memory system with decay scoring",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    # store
    sp = subparsers.add_parser("store", help="Store a new memory")
    sp.add_argument(
        "--type", "-t", required=True, choices=VALID_TYPES, help="Memory type"
    )
    sp.add_argument("--title", required=True, help="Memory title")
    sp.add_argument("--content", "-c", required=True, help="Memory content")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )
    sp.add_argument("--confidence", type=float, default=0.8, help="Confidence 0.0-1.0")
    sp.add_argument(
        "--episode",
        action="store_true",
        default=False,
        help="Also log an episode entry",
    )
    # recall
    sp = subparsers.add_parser("recall", help="Search memories by query")
    sp.add_argument("query", help="Search query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--limit", "-n", type=int, default=10, help="Max results")
    sp.add_argument(
        "--no-semantic",
        action="store_true",
        default=False,
        help="Disable semantic search (keyword-only, faster)",
    )
    # get
    sp = subparsers.add_parser("get", help="Get memory by ID")
    sp.add_argument("memory_id", help="Memory UUID")
    # relate
    sp = subparsers.add_parser("relate", help="Create relationship")
    sp.add_argument("from_id", help="Source memory UUID")
    sp.add_argument("to_id", help="Target memory UUID")
    sp.add_argument("rel_type", choices=VALID_RELATION_TYPES, help="Relationship type")
    sp.add_argument("--strength", type=float, default=0.8, help="Strength 0.0-1.0")
    sp.add_argument("--context", help="Context description")
    sp.add_argument("--description", help="Rich edge description body")
    # edge-get
    sp = subparsers.add_parser("edge-get", help="Get edge by ID")
    sp.add_argument("edge_id", help="Edge UUID")
    # edge-search
    sp = subparsers.add_parser("edge-search", help="Search edges")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated relation types")
    sp.add_argument("--from-id", help="Filter by source memory ID")
    sp.add_argument("--to-id", help="Filter by target memory ID")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # edge-update
    sp = subparsers.add_parser("edge-update", help="Update an edge")
    sp.add_argument("edge_id", help="Edge UUID")
    sp.add_argument("--description", help="New description body")
    sp.add_argument("--strength", type=float, help="New strength 0.0-1.0")
    # edge-delete
    sp = subparsers.add_parser("edge-delete", help="Delete an edge")
    sp.add_argument("edge_id", help="Edge UUID")
    # search
    sp = subparsers.add_parser("search", help="Filter memories")
    sp.add_argument("--query", "-q", help="Text query")
    sp.add_argument("--types", help="Comma-separated memory types")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--min-importance", type=float, help="Minimum importance")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # update
    sp = subparsers.add_parser("update", help="Update a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--title", help="New title")
    sp.add_argument("--content", help="New content")
    sp.add_argument("--tags", help="New tags (comma-separated)")
    sp.add_argument("--importance", type=float, help="New importance")
    # delete
    sp = subparsers.add_parser("delete", help="Delete a memory")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--force", "-f", action="store_true", help="Skip confirmation")
    # related
    sp = subparsers.add_parser("related", help="Get related memories")
    sp.add_argument("memory_id", help="Memory UUID")
    sp.add_argument("--types", help="Comma-separated relationship types")
    sp.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5")
    # stats
    subparsers.add_parser("stats", help="Show statistics")
    # recent
    sp = subparsers.add_parser("recent", help="Recently created memories")
    sp.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    # decay
    subparsers.add_parser("decay", help="Recalculate all decay scores")
    # core
    subparsers.add_parser("core", help="Generate CORE.md")
    # episode
    sp = subparsers.add_parser("episode", help="Log episode entry")
    sp.add_argument("--type", "-t", required=True, help="Entry type")
    sp.add_argument("--title", required=True, help="Entry title")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument("--summary", "-s", help="Summary text")
    sp.add_argument("--memory-link", help="Path to related memory file")
    # reindex
    subparsers.add_parser("reindex", help="Rebuild index from files")
    # embed
    embed_parser = subparsers.add_parser(
        "embed", help="Generate embeddings for all memories via Ollama"
    )
    embed_parser.add_argument(
        "--if-changed",
        action="store_true",
        help="Skip if no memories were added or deleted since last embed",
    )
    # pin
    sp = subparsers.add_parser("pin", help="Move memory to vault (never decays)")
    sp.add_argument("memory_id", help="Memory UUID")
    # reflect
    sp = subparsers.add_parser(
        "reflect", help="Review recent memories and identify clusters"
    )
    sp.add_argument("--since", help="ISO date (YYYY-MM-DD) to review from")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview without updating state"
    )
    # merge
    sp = subparsers.add_parser(
        "merge", help="Merge two memories (absorb one into another)"
    )
    sp.add_argument("keep_id", help="Memory UUID to keep")
    sp.add_argument("absorb_id", help="Memory UUID to absorb and delete")
    sp.add_argument(
        "--dry-run", action="store_true", help="Preview merge without writing"
    )
    # reflection
    subparsers.add_parser("reflection", help="Generate REFLECTION.md summary")
    # tags
    # Keep a direct reference to this subparser so the dispatch code below can
    # print its help without reaching into argparse's private internals.
    tags_parser = subparsers.add_parser("tags", help="Tag analysis commands")
    tags_sub = tags_parser.add_subparsers(dest="tags_command")
    sp2 = tags_sub.add_parser("list", help="List all tags with counts")
    sp2.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp3 = tags_sub.add_parser("related", help="Find co-occurring tags")
    sp3.add_argument("tag", help="Tag to analyze")
    sp3.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)")
    sp4 = tags_sub.add_parser("suggest", help="Suggest tags for a memory")
    sp4.add_argument("memory_id", help="Memory UUID")
    # procedure
    sp = subparsers.add_parser(
        "procedure", help="Store a procedure memory (convenience wrapper)"
    )
    sp.add_argument("--title", required=True, help="Procedure title")
    sp.add_argument("--content", "-c", required=True, help="Procedure description")
    sp.add_argument("--steps", help="Comma-separated ordered steps")
    sp.add_argument("--preconditions", help="Comma-separated preconditions")
    sp.add_argument("--postconditions", help="Comma-separated postconditions")
    sp.add_argument("--tags", help="Comma-separated tags")
    sp.add_argument(
        "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0"
    )
    # config
    sp = subparsers.add_parser("config", help="Manage embedding config")
    sp.add_argument("--show", action="store_true", help="Display current config")
    sp.add_argument(
        "--provider", choices=["ollama", "openai"], help="Set embedding provider"
    )
    sp.add_argument("--openai-key", help="Set OpenAI API key")
    sp.add_argument(
        "--ollama-model", help="Set Ollama model name (e.g. qwen3-embedding:8b)"
    )
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)
    client = CognitiveMemoryClient()
    result = None
    if args.command == "store":
        tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
        memory_id = client.store(
            type=args.type,
            title=args.title,
            content=args.content,
            tags=tags,
            importance=args.importance,
            confidence=args.confidence,
        )
        result = {"success": True, "memory_id": memory_id}
        if args.episode:
            # Get the relative path from the index for memory_link
            index = client._load_index()
            entry = index.get("entries", {}).get(memory_id, {})
            rel_path = entry.get("path", "")
            # Truncate content at word boundary for summary (max 100 chars)
            summary = args.content.strip()[:100]
            if len(args.content.strip()) > 100:
                last_space = summary.rfind(" ")
                if last_space > 0:
                    summary = summary[:last_space]
            client.episode(
                type=args.type,
                title=args.title,
                tags=tags or [],
                summary=summary,
                memory_link=rel_path,
            )
            result["episode_logged"] = True
    elif args.command == "recall":
        types = [t.strip() for t in args.types.split(",")] if args.types else None
        result = client.recall(
            args.query,
            memory_types=types,
            limit=args.limit,
            semantic=not args.no_semantic,
        )
    elif args.command == "get":
        result = client.get(args.memory_id)
        if not result:
            result = {"error": "Memory not found"}
    elif args.command == "relate":
        edge_id = client.relate(
            args.from_id,
            args.to_id,
            args.rel_type,
            strength=args.strength,
            context=args.context,
            description=args.description,
        )
        # relate() returns "" for a duplicate edge, hence the bool() cast.
        result = {"success": bool(edge_id), "edge_id": edge_id}
    elif args.command == "edge-get":
        result = client.edge_get(args.edge_id)
        if not result:
            result = {"error": "Edge not found"}
    elif args.command == "edge-search":
        types = [t.strip() for t in args.types.split(",")] if args.types else None
        # argparse maps --from-id/--to-id to from_id/to_id attributes.
        result = client.edge_search(
            query=args.query,
            types=types,
            from_id=args.from_id,
            to_id=args.to_id,
            limit=args.limit,
        )
    elif args.command == "edge-update":
        success = client.edge_update(
            args.edge_id,
            description=args.description,
            strength=args.strength,
        )
        result = {"success": success}
    elif args.command == "edge-delete":
        success = client.edge_delete(args.edge_id)
        result = {"success": success}
    elif args.command == "search":
        types = [t.strip() for t in args.types.split(",")] if args.types else None
        tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
        result = client.search(
            query=args.query,
            memory_types=types,
            tags=tags,
            min_importance=args.min_importance,
            limit=args.limit,
        )
    elif args.command == "update":
        tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
        success = client.update(
            args.memory_id,
            title=args.title,
            content=args.content,
            tags=tags,
            importance=args.importance,
        )
        result = {"success": success}
    elif args.command == "delete":
        if not args.force:
            # Informational only: echo what is about to be deleted to stderr.
            mem = client.get(args.memory_id)
            if mem:
                print(f"Deleting: {mem.get('title')}", file=sys.stderr)
        success = client.delete(args.memory_id)
        result = {"success": success}
    elif args.command == "related":
        types = [t.strip() for t in args.types.split(",")] if args.types else None
        result = client.related(args.memory_id, rel_types=types, max_depth=args.depth)
    elif args.command == "stats":
        result = client.stats()
    elif args.command == "recent":
        result = client.recent(limit=args.limit)
    elif args.command == "decay":
        result = client.decay()
    elif args.command == "core":
        content = client.core()
        # Print path, not content (content is written to file)
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "CORE.md"),
            "chars": len(content),
        }
    elif args.command == "episode":
        tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
        client.episode(
            type=args.type,
            title=args.title,
            tags=tags,
            summary=args.summary,
            memory_link=args.memory_link,
        )
        result = {"success": True}
    elif args.command == "reindex":
        count = client.reindex()
        result = {"success": True, "indexed": count}
    elif args.command == "embed":
        if not args.if_changed:
            print(
                "Generating embeddings (this may take a while if model needs to be pulled)...",
                file=sys.stderr,
            )
        result = client.embed(if_changed=args.if_changed)
    elif args.command == "pin":
        success = client.pin(args.memory_id)
        result = {"success": success}
    elif args.command == "reflect":
        result = client.reflect(
            since=args.since,
            dry_run=args.dry_run,
        )
    elif args.command == "merge":
        result = client.merge(
            keep_id=args.keep_id,
            absorb_id=args.absorb_id,
            dry_run=args.dry_run,
        )
    elif args.command == "reflection":
        content = client.reflection_summary()
        result = {
            "success": True,
            "path": str(MEMORY_DIR / "REFLECTION.md"),
            "chars": len(content),
        }
    elif args.command == "tags":
        if args.tags_command == "list":
            result = client.tags_list(limit=args.limit)
        elif args.tags_command == "related":
            result = client.tags_related(args.tag, limit=args.limit)
        elif args.tags_command == "suggest":
            result = client.tags_suggest(args.memory_id)
        else:
            # No subcommand given: print the tags subparser's help directly
            # (previously done by walking argparse's private _subparsers).
            tags_parser.print_help()
            sys.exit(1)
    elif args.command == "procedure":
        tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
        steps = [s.strip() for s in args.steps.split(",")] if args.steps else None
        preconditions = (
            [p.strip() for p in args.preconditions.split(",")]
            if args.preconditions
            else None
        )
        postconditions = (
            [p.strip() for p in args.postconditions.split(",")]
            if args.postconditions
            else None
        )
        memory_id = client.store(
            type="procedure",
            title=args.title,
            content=args.content,
            tags=tags,
            importance=args.importance,
            steps=steps,
            preconditions=preconditions,
            postconditions=postconditions,
        )
        result = {"success": True, "memory_id": memory_id}
    elif args.command == "config":
        config_path = MEMORY_DIR / "_config.json"
        config = _load_memory_config(config_path)
        changed = False
        if args.provider:
            config["embedding_provider"] = args.provider
            changed = True
        if args.openai_key:
            config["openai_api_key"] = args.openai_key
            changed = True
        if args.ollama_model:
            config["ollama_model"] = args.ollama_model
            changed = True
        if changed:
            config_path.write_text(json.dumps(config, indent=2))
            result = {"success": True, "updated": True}
        elif args.show or not changed:
            # Mask API key for display
            display = dict(config)
            key = display.get("openai_api_key")
            if key and isinstance(key, str) and len(key) > 8:
                display["openai_api_key"] = key[:4] + "..." + key[-4:]
            result = display
    print(json.dumps(result, indent=2, default=str))
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,521 @@
"""
Cognitive Memory - Common Constants & Helpers
Module-level constants, YAML parsing, slug generation, decay calculation,
embedding helpers, and cosine similarity. Shared by all other modules.
"""
import json
import math
import re
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import URLError
# =============================================================================
# CONSTANTS
# =============================================================================
# All persistent state lives under ~/.claude/memory.
MEMORY_DIR = Path.home() / ".claude" / "memory"
INDEX_PATH = MEMORY_DIR / "_index.json"
STATE_PATH = MEMORY_DIR / "_state.json"
EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json"
# Local Ollama server used for optional semantic embeddings.
OLLAMA_URL = "http://localhost:11434"
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 5  # seconds
CONFIG_PATH = MEMORY_DIR / "_config.json"
OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings"
OPENAI_MODEL_DEFAULT = "text-embedding-3-small"
# Memory type -> directory name mapping
TYPE_DIRS = {
    "solution": "solutions",
    "fix": "fixes",
    "decision": "decisions",
    "configuration": "configurations",
    "problem": "problems",
    "workflow": "workflows",
    "code_pattern": "code-patterns",
    "error": "errors",
    "general": "general",
    "procedure": "procedures",
    "insight": "insights",
}
VALID_TYPES = list(TYPE_DIRS.keys())
# Decay model type weights: multipliers used by calculate_decay_score.
# Higher-weight types (procedures, decisions) fade more slowly.
TYPE_WEIGHTS = {
    "decision": 1.3,
    "solution": 1.2,
    "insight": 1.25,
    "code_pattern": 1.1,
    "configuration": 1.1,
    "fix": 1.0,
    "workflow": 1.0,
    "problem": 0.9,
    "error": 0.8,
    "general": 0.8,
    "procedure": 1.4,
}
DECAY_LAMBDA = 0.03  # Half-life ~23 days (ln 2 / 0.03)
# Decay score thresholds used to bucket memories by liveness.
THRESHOLD_ACTIVE = 0.5
THRESHOLD_FADING = 0.2
THRESHOLD_DORMANT = 0.05
# Relationship types (subset from MemoryGraph, focused on most useful)
VALID_RELATION_TYPES = [
    "SOLVES",
    "CAUSES",
    "BUILDS_ON",
    "ALTERNATIVE_TO",
    "REQUIRES",
    "FOLLOWS",
    "RELATED_TO",
]
# Edge file constants
EDGES_DIR_NAME = "edges"
# Frontmatter key order for edge files (see serialize_edge_frontmatter).
EDGE_FIELD_ORDER = [
    "id",
    "type",
    "from_id",
    "from_title",
    "to_id",
    "to_title",
    "strength",
    "created",
    "updated",
]
# Frontmatter field order for consistent output (see serialize_frontmatter).
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]
# CORE.md token budget (approximate, 1 token ~= 4 chars)
CORE_MAX_CHARS = 12000  # ~3K tokens
# =============================================================================
# YAML FRONTMATTER PARSING (stdlib only)
# =============================================================================
def _needs_quoting(s: str) -> bool:
"""Check if a YAML string value needs quoting."""
if not s:
return True
if any(c in s for c in ":#{}[]&*?|>!%@`"):
return True
try:
float(s)
return True
except ValueError:
pass
if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
return True
return False
def _quote_yaml(s: str) -> str:
"""Quote a string for YAML, escaping internal quotes."""
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped}"'
def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Render a Python value as a YAML scalar string.

    None and booleans map to their YAML literals, numbers pass through
    str(), and everything else is stringified and quoted when
    *force_quote* is set or the text is unsafe per _needs_quoting.
    """
    if value is None:
        return "null"
    # Booleans must be handled before the numeric branch, since bool is
    # an int subclass.
    if value is True:
        return "true"
    if value is False:
        return "false"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    return _quote_yaml(text) if (force_quote or _needs_quoting(text)) else text
def _parse_scalar(value: str) -> Any:
"""Parse a YAML scalar value to Python type."""
v = value.strip()
if not v or v == "null":
return None
if v == "true":
return True
if v == "false":
return False
# Try numeric
try:
if "." in v:
return float(v)
return int(v)
except ValueError:
pass
# Strip quotes
if (v.startswith('"') and v.endswith('"')) or (
v.startswith("'") and v.endswith("'")
):
return v[1:-1]
return v
def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize a dict to YAML frontmatter string (between --- markers).

    Emits only the keys present in FIELD_ORDER, in that fixed order, so
    memory files produce stable, diff-friendly output. Hand-rolled
    (stdlib-only) emitter; must stay in sync with parse_frontmatter,
    which reads exactly this subset of YAML.
    """
    lines = ["---"]
    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]
        if key == "tags" and isinstance(value, list):
            # Tags use inline flow style: tags: [a, b] (or [] when empty).
            if value:
                items = ", ".join(_format_yaml_value(t) for t in value)
                lines.append(f"tags: [{items}]")
            else:
                lines.append("tags: []")
        elif key in ("steps", "preconditions", "postconditions") and isinstance(
            value, list
        ):
            # Procedure lists use block style; empty lists are omitted.
            if not value:
                continue
            lines.append(f"{key}:")
            for item in value:
                lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}")
        elif key == "relations" and isinstance(value, list):
            # Relations are a block list of small mappings; the keys inside
            # each relation are also emitted in a fixed order.
            if not value:
                continue
            lines.append("relations:")
            for rel in value:
                first = True
                for rk in [
                    "target",
                    "type",
                    "direction",
                    "strength",
                    "context",
                    "edge_id",
                ]:
                    if rk not in rel:
                        continue
                    rv = rel[rk]
                    # "- " opens a new relation entry; continuation lines
                    # are indented so they stay inside the block.
                    prefix = " - " if first else "   "
                    force_q = rk in ("context",)
                    lines.append(
                        f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}"
                    )
                    first = False
        elif key == "title":
            # Titles are free-form user text; always quote them.
            lines.append(f"title: {_format_yaml_value(value, force_quote=True)}")
        else:
            lines.append(f"{key}: {_format_yaml_value(value)}")
    lines.append("---")
    return "\n".join(lines)
def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Parse YAML frontmatter and body from markdown text.
    Returns (frontmatter_dict, body_text).

    Hand-rolled parser for the subset produced by serialize_frontmatter:
    scalars, inline lists, simple block lists, and the nested "relations"
    block. Text without a leading "---" line is returned as ({}, text).
    """
    if not text.startswith("---\n"):
        return {}, text
    # Find closing ---
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # Try end of string
        # (covers files whose closing "---" is the very last line, with
        # no newline after it)
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                # The only "---" found is the opening marker itself.
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        # Match positions are relative to text[3:]; shift back by 3 to
        # index into the full string.
        end_pos = end_match.start() + 3  # Offset from text[3:]
        fm_text = text[4:end_pos]
        body = text[end_pos + end_match.end() - end_match.start() :]
        body = body.lstrip("\n")
    data = {}
    lines = fm_text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        # Skip empty lines
        if not line.strip():
            i += 1
            continue
        # Must be a top-level key (no leading whitespace)
        if line[0] == " ":
            i += 1
            continue
        if ":" not in line:
            i += 1
            continue
        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()
        if not rest:
            # Block value - collect indented lines
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1
            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # Simple list
                data[key] = [
                    _parse_scalar(bl.strip().lstrip("- "))
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                # Key with no value and no indented block.
                data[key] = None
            i = j
            continue
        # Inline list: [a, b, c]
        if rest.startswith("[") and rest.endswith("]"):
            inner = rest[1:-1]
            if inner.strip():
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)
        i += 1
    return data, body
def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse a YAML block list of relation dicts.

    Each "- " line opens a new relation; subsequent "key: value" lines
    attach fields to the most recent one. Blank lines, and key/value
    lines appearing before the first "- " entry, are ignored.
    """
    result: List[Dict[str, Any]] = []

    def _add_field(target: Dict[str, Any], text: str) -> None:
        # Split on the first colon; values go through the scalar parser.
        key, _, raw = text.partition(":")
        target[key.strip()] = _parse_scalar(raw.strip())

    for raw_line in lines:
        text = raw_line.strip()
        if not text:
            continue
        if text.startswith("- "):
            result.append({})
            remainder = text[2:]
            if ":" in remainder:
                _add_field(result[-1], remainder)
        elif result and ":" in text:
            _add_field(result[-1], text)
    return result
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def slugify(text: str, max_length: int = 60) -> str:
    """Lowercase *text* and reduce it to a hyphen-separated slug.

    Non-word characters are dropped, whitespace/underscore runs become a
    single hyphen, and the result is trimmed to *max_length* without
    leaving a trailing hyphen. Falls back to "untitled" for empty input.
    """
    slug = text.strip().lower()
    slug = re.sub(r"[^\w\s-]", "", slug)
    slug = re.sub(r"[\s_]+", "-", slug)
    slug = re.sub(r"-+", "-", slug)
    slug = slug.strip("-")
    if len(slug) > max_length:
        slug = slug[:max_length].rstrip("-")
    return slug if slug else "untitled"
def make_filename(title: str, memory_id: str) -> str:
    """Build a memory filename: "<title-slug>-<first 6 UUID chars>.md"."""
    return f"{slugify(title)}-{memory_id[:6]}.md"
def calculate_decay_score(
    importance: float, days_since_access: float, access_count: int, type_weight: float
) -> float:
    """Score how "alive" a memory is.

    decay_score = importance * e^(-DECAY_LAMBDA * days) * usage * type_weight,
    where usage is log2(access_count + 1); never-accessed memories get a
    usage factor of 1.0 so they are not zeroed out immediately.
    """
    recency = math.exp(-DECAY_LAMBDA * days_since_access)
    if access_count > 0:
        usage = math.log2(access_count + 1)
    else:
        usage = 1.0
    return importance * recency * usage * type_weight
def _ollama_embed(
    texts: List[str],
    model: str = EMBEDDING_MODEL,
    timeout: int = EMBEDDING_TIMEOUT,
) -> Optional[List[List[float]]]:
    """Request embeddings for *texts* from a local Ollama server.

    POSTs to /api/embed and returns one vector per input text, or None on
    any failure (server down, timeout, malformed response) so callers can
    degrade gracefully to keyword-only search.
    """
    try:
        body = json.dumps({"model": model, "input": texts}).encode("utf-8")
        request = urllib.request.Request(
            f"{OLLAMA_URL}/api/embed",
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            parsed = json.loads(resp.read().decode("utf-8"))
        vectors = parsed.get("embeddings")
        if vectors and isinstance(vectors, list):
            return vectors
        return None
    except (
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        # Embeddings are strictly best-effort; any failure means "unavailable".
        return None
def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Load _config.json, filling in defaults for any missing keys.

    Returns pure defaults when the file is absent or unreadable; never
    raises for I/O or parse failures, since embedding config is
    best-effort.
    """
    defaults = {
        "embedding_provider": "ollama",
        "openai_api_key": None,
        "ollama_model": EMBEDDING_MODEL,
        "openai_model": OPENAI_MODEL_DEFAULT,
    }
    target = config_path if config_path is not None else CONFIG_PATH
    if not target.exists():
        return defaults
    try:
        loaded = json.loads(target.read_text())
    except (json.JSONDecodeError, OSError):
        return defaults
    for key, fallback in defaults.items():
        loaded.setdefault(key, fallback)
    return loaded
def _openai_embed(
    texts: List[str],
    api_key: str,
    model: str = OPENAI_MODEL_DEFAULT,
    timeout: int = 30,
) -> Optional[List[List[float]]]:
    """Fetch embeddings from the OpenAI API (stdlib-only HTTP client).

    Mirrors _ollama_embed's contract: one vector per input text, in input
    order, or None on any network/response failure.
    """
    try:
        body = json.dumps({"input": texts, "model": model}).encode("utf-8")
        request = urllib.request.Request(
            OPENAI_EMBED_URL,
            data=body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            payload = json.loads(resp.read().decode("utf-8"))
        rows = payload.get("data", [])
        if not (rows and isinstance(rows, list)):
            return None
        # The API may return items out of order; restore input order.
        rows.sort(key=lambda item: item.get("index", 0))
        return [row["embedding"] for row in rows]
    except (
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        return None
def _cosine_similarity(a: List[float], b: List[float]) -> float:
"""Compute cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)
def _make_edge_filename(
    from_title: str, rel_type: str, to_title: str, edge_id: str
) -> str:
    """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md"""
    left = slugify(from_title, max_length=30)
    right = slugify(to_title, max_length=30)
    return f"{left}--{rel_type}--{right}-{edge_id[:6]}.md"
def serialize_edge_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize an edge dict to YAML frontmatter (--- delimited).

    Fields are emitted in EDGE_FIELD_ORDER; the two title fields are
    always quoted since they carry free-form user text.
    """
    out = ["---"]
    for field in EDGE_FIELD_ORDER:
        if field not in data:
            continue
        quoted = field in ("from_title", "to_title")
        out.append(f"{field}: {_format_yaml_value(data[field], force_quote=quoted)}")
    out.append("---")
    return "\n".join(out)

View File

@ -0,0 +1,297 @@
"""EdgesMixin: edge/relationship operations for CognitiveMemoryClient.
This module is part of the client.py mixin refactor. EdgesMixin provides all
edge CRUD and search operations. It relies on helper methods resolved at runtime
via MRO from the composed CognitiveMemoryClient class.
"""
import uuid
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
from common import (
VALID_RELATION_TYPES,
EDGES_DIR_NAME,
_make_edge_filename,
serialize_edge_frontmatter,
)
class EdgesMixin:
"""Mixin providing edge/relationship operations for CognitiveMemoryClient."""
    def relate(
        self,
        from_id: str,
        to_id: str,
        rel_type: str,
        strength: float = 0.8,
        context: Optional[str] = None,
        description: Optional[str] = None,
    ) -> str:
        """Create a relationship between two memories with an edge file.
        Returns edge_id string, or empty string if duplicate.

        Side effects: writes a new edge file under graph/edges/, appends an
        outgoing relation to the source memory's frontmatter (and an
        incoming one to the target's), refreshes both memory index entries
        plus the edge index, and commits all touched files to git.

        Raises:
            ValueError: if rel_type is not in VALID_RELATION_TYPES, or if
                either memory ID cannot be resolved to a file.
        """
        if rel_type not in VALID_RELATION_TYPES:
            raise ValueError(
                f"Invalid relation type: {rel_type}. Valid: {VALID_RELATION_TYPES}"
            )
        from_path = self._resolve_memory_path(from_id)
        to_path = self._resolve_memory_path(to_id)
        if not from_path or not to_path:
            raise ValueError(f"Memory not found: {from_id if not from_path else to_id}")
        # Read source memory
        fm, body = self._read_memory_file(from_path)
        relations = fm.get("relations", [])
        # Check for duplicate (same target + same relation type)
        for r in relations:
            if r.get("target") == to_id and r.get("type") == rel_type:
                return ""  # Already exists
        # Read target memory for title
        to_fm, to_body = self._read_memory_file(to_path)
        # Create edge file
        edge_id = str(uuid.uuid4())
        now = datetime.now(timezone.utc).isoformat()
        # Fall back to the ID prefix if a memory has no title.
        from_title = fm.get("title", from_id[:8])
        to_title = to_fm.get("title", to_id[:8])
        clamped_strength = max(0.0, min(1.0, strength))
        edge_data = {
            "id": edge_id,
            "type": rel_type,
            "from_id": from_id,
            "from_title": from_title,
            "to_id": to_id,
            "to_title": to_title,
            "strength": clamped_strength,
            "created": now,
            "updated": now,
        }
        edge_filename = _make_edge_filename(from_title, rel_type, to_title, edge_id)
        edge_path = self.memory_dir / "graph" / EDGES_DIR_NAME / edge_filename
        edge_fm_str = serialize_edge_frontmatter(edge_data)
        edge_body = description.strip() if description else ""
        edge_content = (
            f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n"
        )
        edge_path.write_text(edge_content, encoding="utf-8")
        # Update source memory frontmatter with edge_id
        new_rel = {
            "target": to_id,
            "type": rel_type,
            "direction": "outgoing",
            "strength": clamped_strength,
            "edge_id": edge_id,
        }
        if context:
            new_rel["context"] = context
        relations.append(new_rel)
        fm["relations"] = relations
        fm["updated"] = now
        self._write_memory_file(from_path, fm, body)
        # Add incoming relation to target with edge_id (skipped when an
        # equivalent incoming relation is already present)
        to_relations = to_fm.get("relations", [])
        has_incoming = any(
            r.get("target") == from_id
            and r.get("type") == rel_type
            and r.get("direction") == "incoming"
            for r in to_relations
        )
        if not has_incoming:
            incoming_rel = {
                "target": from_id,
                "type": rel_type,
                "direction": "incoming",
                "strength": clamped_strength,
                "edge_id": edge_id,
            }
            if context:
                incoming_rel["context"] = context
            to_relations.append(incoming_rel)
            to_fm["relations"] = to_relations
            to_fm["updated"] = now
            self._write_memory_file(to_path, to_fm, to_body)
        # Update memory index
        rel_from = str(from_path.relative_to(self.memory_dir))
        rel_to = str(to_path.relative_to(self.memory_dir))
        self._update_index_entry(from_id, fm, rel_from)
        self._update_index_entry(to_id, to_fm, rel_to)
        # Update edge index
        self._update_edge_index(
            edge_id, edge_data, f"graph/{EDGES_DIR_NAME}/{edge_filename}"
        )
        self._git_commit(
            f"relate: {from_id[:8]} --{rel_type}--> {to_id[:8]}",
            [from_path, to_path, edge_path],
        )
        return edge_id
def edge_get(self, edge_id: str) -> Optional[Dict[str, Any]]:
"""Read full edge file (frontmatter + body)."""
path = self._resolve_edge_path(edge_id)
if not path:
return None
fm, body = self._read_memory_file(path)
return {
"id": fm.get("id", edge_id),
"type": fm.get("type", ""),
"from_id": fm.get("from_id", ""),
"from_title": fm.get("from_title", ""),
"to_id": fm.get("to_id", ""),
"to_title": fm.get("to_title", ""),
"strength": fm.get("strength", 0.8),
"created": fm.get("created", ""),
"updated": fm.get("updated", ""),
"description": body.strip(),
"path": str(path.relative_to(self.memory_dir)),
}
def edge_search(
self,
query: Optional[str] = None,
types: Optional[List[str]] = None,
from_id: Optional[str] = None,
to_id: Optional[str] = None,
limit: int = 20,
) -> List[Dict[str, Any]]:
"""Search edges via index."""
index = self._load_index()
results = []
query_lower = query.lower().strip() if query else ""
# Resolve partial IDs to full UUIDs via prefix match
if from_id:
entries = index.get("entries", {})
resolved = self._resolve_prefix(from_id, entries)
from_id = resolved or from_id
if to_id:
entries = index.get("entries", {})
resolved = self._resolve_prefix(to_id, entries)
to_id = resolved or to_id
for eid, entry in index.get("edges", {}).items():
if types and entry.get("type") not in types:
continue
if from_id and entry.get("from_id") != from_id:
continue
if to_id and entry.get("to_id") != to_id:
continue
if query_lower:
searchable = f"{entry.get('from_title', '')} {entry.get('to_title', '')} {entry.get('type', '')}".lower()
if query_lower not in searchable:
continue
results.append({"id": eid, **entry})
results.sort(key=lambda x: x.get("created", ""), reverse=True)
return results[:limit]
def edge_update(
    self,
    edge_id: str,
    description: Optional[str] = None,
    strength: Optional[float] = None,
) -> bool:
    """Update an edge's body text and/or strength.

    Rewrites the edge file with a fresh "updated" timestamp. When strength
    changes, the new value (clamped to [0, 1]) is synced into the matching
    ``relations`` entries of both endpoint memories' frontmatter.

    Fixes vs. previous version: touched memory files are now included in
    the git commit (previously only the edge file was committed, unlike
    relate/edge_delete which stage every touched file), and each memory
    file is rewritten at most once instead of once per matching relation.

    Args:
        edge_id: Full or partial edge identifier.
        description: New body text; None leaves the body unchanged.
        strength: New strength in [0, 1]; None leaves it unchanged.

    Returns:
        True if the edge was found and updated, False otherwise.
    """
    path = self._resolve_edge_path(edge_id)
    if not path:
        return False
    fm, body = self._read_memory_file(path)
    now = datetime.now(timezone.utc).isoformat()
    if description is not None:
        body = description
    if strength is not None:
        fm["strength"] = max(0.0, min(1.0, strength))
    fm["updated"] = now
    # Rewrite the edge file (frontmatter, blank line, optional body).
    edge_fm_str = serialize_edge_frontmatter(fm)
    edge_body = body.strip() if body else ""
    edge_content = (
        f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n"
    )
    path.write_text(edge_content, encoding="utf-8")
    files_to_commit: List[Path] = [path]
    # Sync the clamped strength into both endpoint memories' frontmatter.
    if strength is not None:
        for mid_key in ("from_id", "to_id"):
            mid = fm.get(mid_key)
            if not mid:
                continue
            mem_path = self._resolve_memory_path(mid)
            if not mem_path:
                continue
            mem_fm, mem_body = self._read_memory_file(mem_path)
            changed = False
            for rel in mem_fm.get("relations", []):
                if rel.get("edge_id") == edge_id:
                    rel["strength"] = fm["strength"]
                    changed = True
            if changed:
                mem_fm["updated"] = now
                self._write_memory_file(mem_path, mem_fm, mem_body)
                files_to_commit.append(mem_path)
    # Refresh the edge index and commit every touched file.
    rel_path = str(path.relative_to(self.memory_dir))
    self._update_edge_index(edge_id, fm, rel_path)
    self._git_commit(f"edge-update: {edge_id[:8]}", files_to_commit)
    return True
def edge_delete(self, edge_id: str) -> bool:
    """Delete an edge file and scrub its references from both endpoint memories.

    Removes matching ``relations`` entries from the from/to memories'
    frontmatter, deletes the edge file, drops it from the edge index,
    stages the deletion, and commits.

    Fix: ``files_to_commit`` was previously collected but never passed to
    ``_git_commit``, so cleaned memory files were left out of the commit
    (relate() commits every touched file).

    Returns:
        True if the edge existed and was deleted, False otherwise.
    """
    path = self._resolve_edge_path(edge_id)
    if not path:
        return False
    fm, _ = self._read_memory_file(path)
    now = datetime.now(timezone.utc).isoformat()
    files_to_commit: List[Path] = []
    # Remove this edge's relation entries from both endpoint memories.
    for mid_key in ("from_id", "to_id"):
        mid = fm.get(mid_key)
        if not mid:
            continue
        mem_path = self._resolve_memory_path(mid)
        if not mem_path:
            continue
        mem_fm, mem_body = self._read_memory_file(mem_path)
        original_rels = mem_fm.get("relations", [])
        mem_fm["relations"] = [
            r for r in original_rels if r.get("edge_id") != edge_id
        ]
        # Only rewrite/commit the memory when a relation was actually removed.
        if len(mem_fm["relations"]) != len(original_rels):
            mem_fm["updated"] = now
            self._write_memory_file(mem_path, mem_fm, mem_body)
            rel_p = str(mem_path.relative_to(self.memory_dir))
            self._update_index_entry(mid, mem_fm, rel_p)
            files_to_commit.append(mem_path)
    # Remove the edge file itself and its index entry.
    path.unlink()
    self._remove_edge_index(edge_id)
    # Best-effort: unstage the deleted file (the repo may not be initialized).
    try:
        rel_path = path.relative_to(self.memory_dir)
        subprocess.run(
            ["git", "rm", "--cached", str(rel_path)],
            cwd=str(self.memory_dir),
            capture_output=True,
            timeout=5,
        )
    except Exception:
        pass
    # Commit the cleaned memory files; fall back to a plain commit when no
    # memory file changed (preserves the original no-argument call shape).
    if files_to_commit:
        self._git_commit(f"edge-delete: {edge_id[:8]}", files_to_commit)
    else:
        self._git_commit(f"edge-delete: {edge_id[:8]}")
    return True

View File

@ -0,0 +1,233 @@
"""EmbeddingsMixin for CognitiveMemoryClient.
Provides embedding generation and semantic search capabilities. Extracted from
client.py as part of the mixin refactor. Methods rely on shared state (memory_dir,
_load_index, _load_embeddings_cached) provided by the base class via MRO.
"""
import json
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from common import (
EMBEDDING_MODEL,
EMBEDDINGS_PATH,
OPENAI_MODEL_DEFAULT,
_cosine_similarity,
_load_memory_config,
_ollama_embed,
_openai_embed,
)
class EmbeddingsMixin:
"""Mixin providing embedding generation and semantic recall for memory clients."""
def _get_embedding_provider(self) -> Dict[str, Any]:
    """Return the embedding configuration parsed from this store's _config.json."""
    config_path = self.memory_dir / "_config.json"
    return _load_memory_config(config_path)
def _embed_texts_with_fallback(
    self,
    texts: List[str],
    timeout: int = 300,
) -> Tuple[Optional[List[List[float]]], str, str]:
    """Embed texts, trying the configured provider first, then the other.

    The provider order is taken from _config.json ("embedding_provider",
    defaulting to ollama). OpenAI is attempted only when an API key is
    configured.

    Returns:
        (vectors, provider_used, model_used); (None, "", "") when every
        provider in the chain fails.
    """
    config = self._get_embedding_provider()
    preferred = config.get("embedding_provider", "ollama")
    chain = ("openai", "ollama") if preferred == "openai" else ("ollama", "openai")
    for name in chain:
        if name == "openai":
            api_key = config.get("openai_api_key")
            model = config.get("openai_model", OPENAI_MODEL_DEFAULT)
            if not api_key:
                continue  # no credentials: skip straight to the fallback
            vectors = _openai_embed(texts, api_key, model, timeout=timeout)
        else:
            model = config.get("ollama_model", EMBEDDING_MODEL)
            vectors = _ollama_embed(texts, model=model, timeout=timeout)
        if vectors is not None:
            return vectors, name, model
    return None, "", ""
def embed(self, if_changed: bool = False) -> Dict[str, Any]:
    """Generate embeddings for every indexed memory.

    Uses the configured provider (with fallback) and stores vectors in
    _embeddings.json inside the memory directory (not git-tracked). When
    the provider configured in _config.json differs from the one that
    produced the stored vectors, everything is re-embedded to avoid mixing
    vector dimensions.

    Fix: the empty-index early return now reports the per-instance
    embeddings path (self.memory_dir / "_embeddings.json") instead of the
    module-level EMBEDDINGS_PATH constant, which can disagree with
    self.memory_dir — every other path in this method already used the
    instance path.

    Args:
        if_changed: If True, skip embedding when the set of memory IDs is
            unchanged since the last run (no new or deleted memories).

    Returns:
        Summary dict with "embedded" count, "provider", "model", and
        "path"; includes "skipped"/"reason" on a no-op or "error" when all
        providers fail.
    """
    index = self._load_index()
    entries = index.get("entries", {})
    embeddings_path = self.memory_dir / "_embeddings.json"
    if not entries:
        return {
            "embedded": 0,
            "provider": "none",
            "model": "",
            "path": str(embeddings_path),
        }
    # Detect a provider switch since the last embedding run.
    old_provider = ""
    if embeddings_path.exists():
        try:
            old_data = json.loads(embeddings_path.read_text())
            old_provider = old_data.get("provider", "ollama")
        except (json.JSONDecodeError, OSError):
            pass
    config = self._get_embedding_provider()
    new_provider = config.get("embedding_provider", "ollama")
    provider_changed = old_provider and old_provider != new_provider
    if provider_changed:
        print(
            f"Provider changed ({old_provider} -> {new_provider}), re-embedding all memories...",
            file=sys.stderr,
        )
    # Skip if the ID set is unchanged (unless the provider switched).
    if if_changed and not provider_changed and embeddings_path.exists():
        try:
            old_data = json.loads(embeddings_path.read_text())
            embedded_ids = set(old_data.get("entries", {}).keys())
            if embedded_ids == set(entries.keys()):
                return {
                    "embedded": 0,
                    "skipped": True,
                    "reason": "no new or deleted memories",
                    "path": str(embeddings_path),
                }
        except (json.JSONDecodeError, OSError):
            pass  # Can't read old data, re-embed
    # Each memory is embedded as "title. content_preview".
    memory_ids = list(entries.keys())
    texts = [
        f"{entries[mid].get('title', '')}. {entries[mid].get('content_preview', '')}"
        for mid in memory_ids
    ]
    # Batch in groups of 50 to bound per-request payload size.
    all_embeddings: Dict[str, List[float]] = {}
    batch_size = 50
    provider_used = ""
    model_used = ""
    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i : i + batch_size]
        batch_ids = memory_ids[i : i + batch_size]
        vectors, provider_used, model_used = self._embed_texts_with_fallback(
            batch_texts,
            timeout=300,
        )
        if vectors is None:
            # Partial progress is reported but not persisted.
            return {
                "error": "All embedding providers unavailable",
                "embedded": len(all_embeddings),
            }
        all_embeddings.update(zip(batch_ids, vectors))
    # Persist vectors with provider info so recall can embed queries
    # with the matching provider/model.
    embeddings_data = {
        "provider": provider_used,
        "model": model_used,
        "updated": datetime.now(timezone.utc).isoformat(),
        "entries": all_embeddings,
    }
    embeddings_path.write_text(json.dumps(embeddings_data, default=str))
    return {
        "embedded": len(all_embeddings),
        "provider": provider_used,
        "model": model_used,
        "path": str(embeddings_path),
    }
def semantic_recall(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Rank memories by cosine similarity between the query and stored vectors.

    The query is embedded with the same provider/model that produced the
    stored vectors; a full fallback chain is tried as a last resort.
    Stored vectors whose dimension differs from the query vector are
    skipped as a safety guard.

    Args:
        query: Free-text search query.
        limit: Maximum number of results.

    Returns:
        Memories with id/title/type/tags/similarity/path, most similar first;
        empty list when no embeddings exist or no provider is reachable.
    """
    emb_data = self._load_embeddings_cached()
    if emb_data is None:
        return []
    stored_vectors = emb_data.get("entries", {})
    if not stored_vectors:
        return []
    # Embed the query with the provider that produced the stored vectors.
    stored_provider = emb_data.get("provider", "ollama")
    config = self._get_embedding_provider()
    query_vec: Optional[List[float]] = None
    if stored_provider == "openai":
        api_key = config.get("openai_api_key")
        if api_key:
            model = emb_data.get("model", OPENAI_MODEL_DEFAULT)
            embedded = _openai_embed([query], api_key, model)
            if embedded:
                query_vec = embedded[0]
    if query_vec is None and stored_provider == "ollama":
        embedded = _ollama_embed([query], model=emb_data.get("model", EMBEDDING_MODEL))
        if embedded:
            query_vec = embedded[0]
    if query_vec is None:
        # Last resort: any provider in the fallback chain.
        embedded, _, _ = self._embed_texts_with_fallback([query], timeout=30)
        if embedded:
            query_vec = embedded[0]
    if query_vec is None:
        return []
    query_dim = len(query_vec)
    index_entries = self._load_index().get("entries", {})
    ranked: List[Dict[str, Any]] = []
    for mid, vec in stored_vectors.items():
        if len(vec) != query_dim:
            continue  # dimension mismatch: vector from a different provider
        meta = index_entries.get(mid)
        if not meta:
            continue
        ranked.append(
            {
                "id": mid,
                "title": meta.get("title", ""),
                "type": meta.get("type", "general"),
                "tags": meta.get("tags", []),
                "similarity": round(_cosine_similarity(query_vec, vec), 4),
                "path": meta.get("path", ""),
            }
        )
    ranked.sort(key=lambda item: item["similarity"], reverse=True)
    return ranked[:limit]