"""
Cognitive Memory - Analysis Mixin

Decay scoring, CORE.md generation, tag analysis, reflection clustering,
memory merging, and REFLECTION.md generation.
"""

import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from common import (
    CORE_MAX_CHARS,
    MEMORY_DIR,
    THRESHOLD_ACTIVE,
    THRESHOLD_DORMANT,
    THRESHOLD_FADING,
    TYPE_WEIGHTS,
    calculate_decay_score,
)


class AnalysisMixin:
    """Mixin providing analysis operations for CognitiveMemoryClient."""

    def decay(self) -> Dict[str, Any]:
        """Recalculate the decay score of every indexed memory.

        Persists refreshed scores to _state.json via _save_state and returns
        a summary of how many memories fall into each decay band.
        """
        index = self._load_index()
        state = self._load_state()
        now = datetime.now(timezone.utc)
        refreshed = 0

        for mem_id, meta in index.get("entries", {}).items():
            record = state.setdefault("entries", {}).setdefault(
                mem_id,
                {
                    "access_count": 0,
                    "last_accessed": meta.get("created", now.isoformat()),
                    "decay_score": 0.5,
                },
            )

            # Age since the memory was last touched, in fractional days.
            last_seen = record.get("last_accessed", meta.get("created", ""))
            try:
                parsed = datetime.fromisoformat(last_seen.replace("Z", "+00:00"))
                if parsed.tzinfo is None:
                    parsed = parsed.replace(tzinfo=timezone.utc)
                age_days = (now - parsed).total_seconds() / 86400
            except (ValueError, AttributeError):
                age_days = 30  # unparseable timestamp: treat as a month old

            if meta.get("path", "").startswith("vault/"):
                record["decay_score"] = 999.0  # vault memories are pinned, never decay
            else:
                record["decay_score"] = round(
                    calculate_decay_score(
                        meta.get("importance", 0.5),
                        age_days,
                        record.get("access_count", 0),
                        TYPE_WEIGHTS.get(meta.get("type", "general"), 1.0),
                    ),
                    4,
                )

            refreshed += 1

        self._save_state(state)

        # Band counts for the caller's summary.
        scores = [rec.get("decay_score", 0) for rec in state.get("entries", {}).values()]
        return {
            "updated": refreshed,
            "active": sum(1 for d in scores if d >= THRESHOLD_ACTIVE),
            "fading": sum(1 for d in scores if THRESHOLD_FADING <= d < THRESHOLD_ACTIVE),
            "dormant": sum(1 for d in scores if THRESHOLD_DORMANT <= d < THRESHOLD_FADING),
            "archived": sum(1 for d in scores if 0 < d < THRESHOLD_DORMANT),
        }
+ return sentence + + # Collect all memories with decay scores + memories = [] + for mid, entry in index.get("entries", {}).items(): + s = state.get("entries", {}).get(mid, {}) + decay = s.get("decay_score", 0.5) + if decay < THRESHOLD_FADING: + continue # Skip low-relevance + memories.append( + { + "id": mid, + "title": entry.get("title", ""), + "type": entry.get("type", "general"), + "path": entry.get("path", ""), + "importance": entry.get("importance", 0.5), + "decay_score": decay, + "tags": entry.get("tags", []), + } + ) + + # Sort by decay score descending + memories.sort(key=lambda x: x["decay_score"], reverse=True) + + # Group by type category + categories = { + "Critical Solutions": ["solution"], + "Active Decisions": ["decision"], + "Key Fixes": ["fix"], + "Configurations": ["configuration"], + "Key Procedures": ["procedure"], + "Patterns & Workflows": ["code_pattern", "workflow"], + "Known Issues": ["problem", "error"], + "Insights": ["insight"], + "General": ["general"], + } + + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE) + total_count = len(index.get("entries", {})) + + lines = [ + "# Memory Core (auto-generated)", + f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)", + "", + ] + + char_count = sum(len(l) for l in lines) + + for cat_name, cat_types in categories.items(): + cat_memories = [m for m in memories if m["type"] in cat_types] + if not cat_memories: + continue + + section_lines = [f"## {cat_name}", ""] + for mem in cat_memories[:10]: # Cap per section (reduced for summaries) + path = mem["path"] + title = mem["title"] + tags = ", ".join(mem["tags"][:3]) if mem["tags"] else "" + + # Read body to extract summary + summary = "" + mem_path = self.memory_dir / path + if mem_path.exists(): + try: + _, body = self._read_memory_file(mem_path) + summary = _extract_summary(body) + except Exception: 
+ pass + + line = f"- [{title}]({path})" + if summary: + line += f" - {summary}" + if tags: + line += f" ({tags})" + section_lines.append(line) + + # Check budget + added = sum(len(l) for l in section_lines) + 2 + if char_count + added > CORE_MAX_CHARS: + break + + section_lines.append("") + section_text = "\n".join(section_lines) + if char_count + len(section_text) > CORE_MAX_CHARS: + break + lines.extend(section_lines) + char_count += len(section_text) + + core_content = "\n".join(lines) + core_path = self.memory_dir / "CORE.md" + core_path.write_text(core_content, encoding="utf-8") + self._git_commit("core: regenerate CORE.md", [core_path]) + + return core_content + + def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]: + """List all tags with occurrence counts across all memories. + + Returns list of {"tag": str, "count": int} sorted by count descending. + """ + index = self._load_index() + tag_counts: Dict[str, int] = {} + + for entry in index.get("entries", {}).values(): + for tag in entry.get("tags", []): + normalized = tag.lower().strip() + if normalized: + tag_counts[normalized] = tag_counts.get(normalized, 0) + 1 + + results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()] + results.sort(key=lambda x: x["count"], reverse=True) + + if limit > 0: + results = results[:limit] + return results + + def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]: + """Find tags that co-occur with the given tag. + + Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int} + sorted by co_occurrences descending. 
+ """ + tag = tag.lower().strip() + index = self._load_index() + co_counts: Dict[str, int] = {} + + for entry in index.get("entries", {}).values(): + entry_tags = [t.lower().strip() for t in entry.get("tags", [])] + if tag not in entry_tags: + continue + for other_tag in entry_tags: + if other_tag != tag and other_tag: + co_counts[other_tag] = co_counts.get(other_tag, 0) + 1 + + results = [ + {"tag": t, "co_occurrences": count, "memories_with_both": count} + for t, count in co_counts.items() + ] + results.sort(key=lambda x: x["co_occurrences"], reverse=True) + + if limit > 0: + results = results[:limit] + return results + + def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]: + """Suggest tags for a memory based on co-occurrence patterns. + + Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}. + """ + index = self._load_index() + entry = index.get("entries", {}).get(memory_id) + if not entry: + return [] + + existing_tags = set(t.lower().strip() for t in entry.get("tags", [])) + if not existing_tags: + return [] + + # For each existing tag, find co-occurring tags and accumulate scores + candidate_scores: Dict[str, int] = {} + candidate_sources: Dict[str, set] = {} + + for existing_tag in existing_tags: + co_tags = self.tags_related(existing_tag) + for co_entry in co_tags: + candidate = co_entry["tag"] + if candidate in existing_tags: + continue + candidate_scores[candidate] = ( + candidate_scores.get(candidate, 0) + co_entry["co_occurrences"] + ) + if candidate not in candidate_sources: + candidate_sources[candidate] = set() + candidate_sources[candidate].add(existing_tag) + + results = [] + for candidate, score in candidate_scores.items(): + sources = sorted(candidate_sources[candidate]) + reason = "co-occurs with: " + ", ".join(sources) + results.append({"tag": candidate, "score": score, "reason": reason}) + + results.sort(key=lambda x: x["score"], reverse=True) + return results[:10] + + def reflect( + self, + since: Optional[str] 
= None, + dry_run: bool = False, + ) -> Dict[str, Any]: + """Review recent memories, identify tag-based clusters, and output consolidation recommendations. + + Clusters memories that share 2+ tags using iterative union-find merging. + Does NOT auto-create insights; the agent reads output and decides what to store. + + Args: + since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to + _state.json's last_reflection, then 30 days ago. + dry_run: If True, return analysis without updating _state.json or logging episode. + + Returns: + Dict with clusters, total_memories_reviewed, clusters_found, and since date. + """ + now = datetime.now(timezone.utc) + state = self._load_state() + + # Determine time window + if since: + since_date = since + elif state.get("last_reflection"): + # last_reflection may be a full ISO timestamp; extract date portion + since_date = state["last_reflection"][:10] + else: + since_dt = now - timedelta(days=30) + since_date = since_dt.strftime("%Y-%m-%d") + + # Load recent memories from index + index = self._load_index() + recent_memories = [] + for mid, entry in index.get("entries", {}).items(): + created = entry.get("created", "") + # Compare date portion only (YYYY-MM-DD) + created_date = created[:10] if created else "" + if created_date >= since_date: + recent_memories.append( + { + "id": mid, + "title": entry.get("title", ""), + "type": entry.get("type", "general"), + "tags": [t.lower().strip() for t in entry.get("tags", [])], + } + ) + + total_reviewed = len(recent_memories) + + # Union-find clustering by tag overlap >= 2 + # parent[i] = index of parent in recent_memories list + n = len(recent_memories) + parent = list(range(n)) + + def find(x: int) -> int: + while parent[x] != x: + parent[x] = parent[parent[x]] # path compression + x = parent[x] + return x + + def union(a: int, b: int): + ra, rb = find(a), find(b) + if ra != rb: + parent[rb] = ra + + # Compare all pairs, check tag overlap + for i in range(n): + tags_i = 
set(recent_memories[i]["tags"]) + for j in range(i + 1, n): + tags_j = set(recent_memories[j]["tags"]) + if len(tags_i & tags_j) >= 2: + union(i, j) + + # Group by root + clusters_map: Dict[int, List[int]] = {} + for i in range(n): + root = find(i) + clusters_map.setdefault(root, []).append(i) + + # Build output clusters (only clusters with 2+ members) + clusters = [] + cluster_id = 0 + for indices in clusters_map.values(): + if len(indices) < 2: + continue + cluster_id += 1 + members = [] + all_tag_sets = [] + types_seen = set() + for idx in indices: + mem = recent_memories[idx] + members.append( + { + "id": mem["id"], + "title": mem["title"], + "type": mem["type"], + "tags": mem["tags"], + } + ) + all_tag_sets.append(set(mem["tags"])) + types_seen.add(mem["type"]) + + # Common tags = intersection of ALL members + common_tags = ( + sorted(set.intersection(*all_tag_sets)) if all_tag_sets else [] + ) + + # Shared tags = tags appearing in 2+ members + tag_counts: Dict[str, int] = {} + for ts in all_tag_sets: + for t in ts: + tag_counts[t] = tag_counts.get(t, 0) + 1 + shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2) + + # Suggested topic + tag_label = ( + ", ".join(common_tags[:4]) + if common_tags + else ", ".join(shared_tags[:4]) + ) + type_label = ", ".join(sorted(types_seen)) + suggested_topic = f"Pattern: {tag_label} across {type_label}" + + clusters.append( + { + "cluster_id": cluster_id, + "members": members, + "common_tags": common_tags, + "shared_tags": shared_tags, + "suggested_topic": suggested_topic, + "member_count": len(members), + } + ) + + # Sort clusters by member count descending + clusters.sort(key=lambda c: c["member_count"], reverse=True) + # Re-number after sorting + for i, c in enumerate(clusters): + c["cluster_id"] = i + 1 + + result = { + "clusters": clusters, + "total_memories_reviewed": total_reviewed, + "clusters_found": len(clusters), + "since": since_date, + } + + if not dry_run: + # Update state with last_reflection 
timestamp + state["last_reflection"] = now.isoformat() + self._save_state(state) + + # Log an episode entry + self.episode( + type="reflection", + title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories", + tags=["reflection", "cognitive-memory"], + summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters", + ) + + # Regenerate REFLECTION.md + self.reflection_summary() + + return result + + def merge( + self, + keep_id: str, + absorb_id: str, + dry_run: bool = False, + ) -> Dict[str, Any]: + """Merge two memories: absorb one into another. + + The 'keep' memory absorbs the content, tags, and relations from + the 'absorb' memory. The absorb memory is then deleted. + All other memories referencing absorb_id are updated to point to keep_id. + + If dry_run=True, returns what would change without writing anything. + """ + # Resolve both memory file paths + keep_path = self._resolve_memory_path(keep_id) + if not keep_path: + raise ValueError(f"Keep memory not found: {keep_id}") + absorb_path = self._resolve_memory_path(absorb_id) + if not absorb_path: + raise ValueError(f"Absorb memory not found: {absorb_id}") + + # Read both memory files + keep_fm, keep_body = self._read_memory_file(keep_path) + absorb_fm, absorb_body = self._read_memory_file(absorb_path) + + keep_title = keep_fm.get("title", keep_id[:8]) + absorb_title = absorb_fm.get("title", absorb_id[:8]) + + # Combine content + merged_body = keep_body + if absorb_body.strip(): + merged_body = ( + keep_body.rstrip() + + f"\n\n---\n*Merged from: {absorb_title}*\n\n" + + absorb_body.strip() + ) + + # Merge tags (sorted, deduplicated) + keep_tags = keep_fm.get("tags", []) + absorb_tags = absorb_fm.get("tags", []) + merged_tags = sorted(list(set(keep_tags + absorb_tags))) + + # importance = max of both + merged_importance = max( + keep_fm.get("importance", 0.5), + absorb_fm.get("importance", 0.5), + ) + + # Merge relations: combine both relation lists + keep_rels 
= list(keep_fm.get("relations", [])) + absorb_rels = list(absorb_fm.get("relations", [])) + combined_rels = keep_rels + absorb_rels + + # Replace any relation targeting absorb_id with keep_id + for rel in combined_rels: + if rel.get("target") == absorb_id: + rel["target"] = keep_id + + # Deduplicate by (target, type) tuple + seen = set() + deduped_rels = [] + for rel in combined_rels: + key = (rel.get("target"), rel.get("type")) + if key not in seen: + seen.add(key) + deduped_rels.append(rel) + + # Remove self-referential relations (target == keep_id) + merged_rels = [r for r in deduped_rels if r.get("target") != keep_id] + + # Scan all other memories for relations targeting absorb_id + index = self._load_index() + updated_others = [] + for mid, entry in index.get("entries", {}).items(): + if mid in (keep_id, absorb_id): + continue + rels = entry.get("relations", []) + needs_update = any(r.get("target") == absorb_id for r in rels) + if needs_update: + updated_others.append(mid) + + if dry_run: + return { + "dry_run": True, + "keep_id": keep_id, + "keep_title": keep_title, + "absorb_id": absorb_id, + "absorb_title": absorb_title, + "merged_tags": merged_tags, + "merged_importance": merged_importance, + "merged_relations_count": len(merged_rels), + "other_memories_updated": len(updated_others), + "other_memory_ids": updated_others, + } + + # Write updated keep memory file + now = datetime.now(timezone.utc).isoformat() + keep_fm["tags"] = merged_tags + keep_fm["importance"] = merged_importance + keep_fm["relations"] = merged_rels + keep_fm["updated"] = now + self._write_memory_file(keep_path, keep_fm, merged_body) + + # Update index for keep memory (refresh content_preview with merged body) + keep_rel_path = str(keep_path.relative_to(self.memory_dir)) + preview = merged_body.strip()[:200] + if len(merged_body.strip()) > 200: + last_space = preview.rfind(" ") + if last_space > 0: + preview = preview[:last_space] + self._update_index_entry( + keep_id, keep_fm, 
keep_rel_path, content_preview=preview + ) + + # Update all other memories that reference absorb_id + for mid in updated_others: + other_path = self._resolve_memory_path(mid) + if not other_path: + continue + try: + other_fm, other_body = self._read_memory_file(other_path) + other_rels = other_fm.get("relations", []) + for rel in other_rels: + if rel.get("target") == absorb_id: + rel["target"] = keep_id + # Deduplicate after replacement + seen_other = set() + deduped_other = [] + for rel in other_rels: + key = (rel.get("target"), rel.get("type")) + if key not in seen_other: + seen_other.add(key) + deduped_other.append(rel) + # Remove self-referential + other_fm["relations"] = [ + r for r in deduped_other if r.get("target") != mid + ] + other_fm["updated"] = now + self._write_memory_file(other_path, other_fm, other_body) + other_rel_path = str(other_path.relative_to(self.memory_dir)) + self._update_index_entry(mid, other_fm, other_rel_path) + except Exception: + pass + + # Delete absorb memory file and remove from index/state + absorb_path.unlink() + self._remove_index_entry(absorb_id) + state = self._load_state() + state.get("entries", {}).pop(absorb_id, None) + self._save_state(state) + + # Git: stage absorb deletion + try: + absorb_rel = absorb_path.relative_to(self.memory_dir) + subprocess.run( + ["git", "rm", "--cached", str(absorb_rel)], + cwd=str(self.memory_dir), + capture_output=True, + timeout=5, + ) + except Exception: + pass + + self._git_commit(f"merge: {keep_title} absorbed {absorb_title}") + + return { + "success": True, + "keep_id": keep_id, + "keep_title": keep_title, + "absorb_id": absorb_id, + "absorb_title": absorb_title, + "merged_tags": merged_tags, + "merged_importance": merged_importance, + "merged_relations_count": len(merged_rels), + "other_memories_updated": len(updated_others), + } + + def reflection_summary(self) -> str: + """Generate REFLECTION.md with patterns and insights from the memory system. 
+ + Writes to ~/.claude/memory/REFLECTION.md and git-commits the file. + Returns the generated content. + """ + index = self._load_index() + state = self._load_state() + entries = index.get("entries", {}) + state_entries = state.get("entries", {}) + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + # Last reflection date + last_reflection = state.get("last_reflection", "") + last_reflection_date = last_reflection[:10] if last_reflection else "never" + + # Count total reflections from episode files + total_reflections = 0 + episodes_dir = self.memory_dir / "episodes" + if episodes_dir.exists(): + for ep_file in sorted(episodes_dir.glob("*.md")): + try: + text = ep_file.read_text(encoding="utf-8") + # Count lines that look like reflection episode entries + for line in text.split("\n"): + if "Reflection:" in line and line.strip().startswith("## "): + total_reflections += 1 + except OSError: + pass + + lines = [ + "# Reflection Summary (auto-generated)", + f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}", + "", + ] + + # === Themes section === + # Get top tags, then find co-occurrence pairs + lines.append("## Themes") + lines.append("Top tag co-occurrences revealing recurring themes:") + lines.append("") + + top_tags = self.tags_list(limit=10) + # Build co-occurrence pairs across top tags + pair_data: Dict[Tuple[str, str], List[str]] = {} # (tag1, tag2) -> [titles] + for tag_info in top_tags[:5]: + tag = tag_info["tag"] + co_tags = self.tags_related(tag, limit=5) + for co in co_tags: + other = co["tag"] + # Canonical pair ordering + pair = tuple(sorted([tag, other])) + if pair in pair_data: + continue + # Find memories with both tags and collect titles + titles = [] + for mid, entry in entries.items(): + mem_tags = [t.lower().strip() for t in entry.get("tags", [])] + if pair[0] in mem_tags and pair[1] in mem_tags: + titles.append(entry.get("title", "untitled")) + if titles: + pair_data[pair] = 
titles + + # Sort pairs by memory count descending, show top 8 + sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True) + for (tag_a, tag_b), titles in sorted_pairs[:8]: + example_titles = ", ".join(f'"{t}"' for t in titles[:3]) + lines.append( + f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})" + ) + + if not sorted_pairs: + lines.append("- No co-occurrence data available yet.") + lines.append("") + + # === Cross-Project Patterns section === + lines.append("## Cross-Project Patterns") + lines.append("Tags that span multiple projects:") + lines.append("") + + known_projects = [ + "major-domo", + "paper-dynasty", + "homelab", + "vagabond-rpg", + "foundryvtt", + "strat-gameplay-webapp", + ] + # Build tag -> {project -> count} mapping + tag_project_map: Dict[str, Dict[str, int]] = {} + for mid, entry in entries.items(): + mem_tags = [t.lower().strip() for t in entry.get("tags", [])] + # Identify which projects this memory belongs to + mem_projects = [t for t in mem_tags if t in known_projects] + # For non-project tags, record which projects they appear in + non_project_tags = [t for t in mem_tags if t not in known_projects] + for tag in non_project_tags: + if tag not in tag_project_map: + tag_project_map[tag] = {} + for proj in mem_projects: + tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1 + + # Filter to tags spanning 2+ projects, sort by project count then total + cross_project = [] + for tag, projects in tag_project_map.items(): + if len(projects) >= 2: + total = sum(projects.values()) + cross_project.append((tag, projects, total)) + cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True) + + for tag, projects, total in cross_project[:10]: + proj_parts = ", ".join( + f"{p} ({c})" + for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True) + ) + lines.append(f"- **{tag}**: appears in {proj_parts}") + + if not cross_project: + lines.append("- No cross-project patterns detected 
yet.") + lines.append("") + + # === Most Accessed section === + lines.append("## Most Accessed") + lines.append("Top 10 memories by access count:") + lines.append("") + + # Sort state entries by access_count descending + accessed = [] + for mid, s in state_entries.items(): + count = s.get("access_count", 0) + if count > 0: + entry = entries.get(mid) + if entry: + accessed.append( + ( + mid, + entry.get("title", "untitled"), + entry.get("path", ""), + count, + ) + ) + accessed.sort(key=lambda x: x[3], reverse=True) + + for mid, title, path, count in accessed[:10]: + lines.append(f"1. [{title}]({path}) - {count} accesses") + + if not accessed: + lines.append("- No access data recorded yet.") + lines.append("") + + # === Recent Insights section === + lines.append("## Recent Insights") + lines.append("Insight-type memories:") + lines.append("") + + insights = [] + for mid, entry in entries.items(): + if entry.get("type") == "insight": + insights.append((mid, entry)) + # Sort by created date descending + insights.sort(key=lambda x: x[1].get("created", ""), reverse=True) + + for mid, entry in insights[:10]: + title = entry.get("title", "untitled") + path = entry.get("path", "") + preview = entry.get("content_preview", "") + if preview: + lines.append(f"- [{title}]({path}) - {preview[:80]}") + else: + lines.append(f"- [{title}]({path})") + + if not insights: + lines.append("- No insight memories stored yet.") + lines.append("") + + # === Consolidation History section === + lines.append("## Consolidation History") + lines.append("") + + merge_count = 0 + if episodes_dir.exists(): + for ep_file in sorted(episodes_dir.glob("*.md")): + try: + text = ep_file.read_text(encoding="utf-8") + for line_text in text.split("\n"): + stripped = line_text.strip() + if stripped.startswith("## ") and "merge" in stripped.lower(): + merge_count += 1 + except OSError: + pass + + lines.append(f"- Total merges performed: {merge_count}") + lines.append("") + + content = "\n".join(lines) + + # 
Write REFLECTION.md + reflection_path = self.memory_dir / "REFLECTION.md" + reflection_path.write_text(content, encoding="utf-8") + self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path]) + + return content diff --git a/skills/cognitive-memory/cli.py b/skills/cognitive-memory/cli.py new file mode 100644 index 0000000..8403556 --- /dev/null +++ b/skills/cognitive-memory/cli.py @@ -0,0 +1,486 @@ +#!/usr/bin/env python3 +""" +Cognitive Memory - CLI Interface + +Command-line interface for the cognitive memory system. +""" + +import argparse +import json +import sys + +from client import CognitiveMemoryClient +from common import ( + MEMORY_DIR, + VALID_RELATION_TYPES, + VALID_TYPES, + _load_memory_config, +) + + +def main(): + parser = argparse.ArgumentParser( + description="Cognitive Memory - Markdown-based memory system with decay scoring", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # store + sp = subparsers.add_parser("store", help="Store a new memory") + sp.add_argument( + "--type", "-t", required=True, choices=VALID_TYPES, help="Memory type" + ) + sp.add_argument("--title", required=True, help="Memory title") + sp.add_argument("--content", "-c", required=True, help="Memory content") + sp.add_argument("--tags", help="Comma-separated tags") + sp.add_argument( + "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0" + ) + sp.add_argument("--confidence", type=float, default=0.8, help="Confidence 0.0-1.0") + sp.add_argument( + "--episode", + action="store_true", + default=False, + help="Also log an episode entry", + ) + + # recall + sp = subparsers.add_parser("recall", help="Search memories by query") + sp.add_argument("query", help="Search query") + sp.add_argument("--types", help="Comma-separated memory types") + sp.add_argument("--limit", "-n", type=int, default=10, help="Max results") + sp.add_argument( + "--no-semantic", + 
action="store_true", + default=False, + help="Disable semantic search (keyword-only, faster)", + ) + + # get + sp = subparsers.add_parser("get", help="Get memory by ID") + sp.add_argument("memory_id", help="Memory UUID") + + # relate + sp = subparsers.add_parser("relate", help="Create relationship") + sp.add_argument("from_id", help="Source memory UUID") + sp.add_argument("to_id", help="Target memory UUID") + sp.add_argument("rel_type", choices=VALID_RELATION_TYPES, help="Relationship type") + sp.add_argument("--strength", type=float, default=0.8, help="Strength 0.0-1.0") + sp.add_argument("--context", help="Context description") + sp.add_argument("--description", help="Rich edge description body") + + # edge-get + sp = subparsers.add_parser("edge-get", help="Get edge by ID") + sp.add_argument("edge_id", help="Edge UUID") + + # edge-search + sp = subparsers.add_parser("edge-search", help="Search edges") + sp.add_argument("--query", "-q", help="Text query") + sp.add_argument("--types", help="Comma-separated relation types") + sp.add_argument("--from-id", help="Filter by source memory ID") + sp.add_argument("--to-id", help="Filter by target memory ID") + sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") + + # edge-update + sp = subparsers.add_parser("edge-update", help="Update an edge") + sp.add_argument("edge_id", help="Edge UUID") + sp.add_argument("--description", help="New description body") + sp.add_argument("--strength", type=float, help="New strength 0.0-1.0") + + # edge-delete + sp = subparsers.add_parser("edge-delete", help="Delete an edge") + sp.add_argument("edge_id", help="Edge UUID") + + # search + sp = subparsers.add_parser("search", help="Filter memories") + sp.add_argument("--query", "-q", help="Text query") + sp.add_argument("--types", help="Comma-separated memory types") + sp.add_argument("--tags", help="Comma-separated tags") + sp.add_argument("--min-importance", type=float, help="Minimum importance") + 
sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") + + # update + sp = subparsers.add_parser("update", help="Update a memory") + sp.add_argument("memory_id", help="Memory UUID") + sp.add_argument("--title", help="New title") + sp.add_argument("--content", help="New content") + sp.add_argument("--tags", help="New tags (comma-separated)") + sp.add_argument("--importance", type=float, help="New importance") + + # delete + sp = subparsers.add_parser("delete", help="Delete a memory") + sp.add_argument("memory_id", help="Memory UUID") + sp.add_argument("--force", "-f", action="store_true", help="Skip confirmation") + + # related + sp = subparsers.add_parser("related", help="Get related memories") + sp.add_argument("memory_id", help="Memory UUID") + sp.add_argument("--types", help="Comma-separated relationship types") + sp.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5") + + # stats + subparsers.add_parser("stats", help="Show statistics") + + # recent + sp = subparsers.add_parser("recent", help="Recently created memories") + sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") + + # decay + subparsers.add_parser("decay", help="Recalculate all decay scores") + + # core + subparsers.add_parser("core", help="Generate CORE.md") + + # episode + sp = subparsers.add_parser("episode", help="Log episode entry") + sp.add_argument("--type", "-t", required=True, help="Entry type") + sp.add_argument("--title", required=True, help="Entry title") + sp.add_argument("--tags", help="Comma-separated tags") + sp.add_argument("--summary", "-s", help="Summary text") + sp.add_argument("--memory-link", help="Path to related memory file") + + # reindex + subparsers.add_parser("reindex", help="Rebuild index from files") + + # embed + embed_parser = subparsers.add_parser( + "embed", help="Generate embeddings for all memories via Ollama" + ) + embed_parser.add_argument( + "--if-changed", + action="store_true", + help="Skip if no 
memories were added or deleted since last embed", + ) + + # pin + sp = subparsers.add_parser("pin", help="Move memory to vault (never decays)") + sp.add_argument("memory_id", help="Memory UUID") + + # reflect + sp = subparsers.add_parser( + "reflect", help="Review recent memories and identify clusters" + ) + sp.add_argument("--since", help="ISO date (YYYY-MM-DD) to review from") + sp.add_argument( + "--dry-run", action="store_true", help="Preview without updating state" + ) + + # merge + sp = subparsers.add_parser( + "merge", help="Merge two memories (absorb one into another)" + ) + sp.add_argument("keep_id", help="Memory UUID to keep") + sp.add_argument("absorb_id", help="Memory UUID to absorb and delete") + sp.add_argument( + "--dry-run", action="store_true", help="Preview merge without writing" + ) + + # reflection + subparsers.add_parser("reflection", help="Generate REFLECTION.md summary") + + # tags + sp = subparsers.add_parser("tags", help="Tag analysis commands") + tags_sub = sp.add_subparsers(dest="tags_command") + sp2 = tags_sub.add_parser("list", help="List all tags with counts") + sp2.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)") + sp3 = tags_sub.add_parser("related", help="Find co-occurring tags") + sp3.add_argument("tag", help="Tag to analyze") + sp3.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)") + sp4 = tags_sub.add_parser("suggest", help="Suggest tags for a memory") + sp4.add_argument("memory_id", help="Memory UUID") + + # procedure + sp = subparsers.add_parser( + "procedure", help="Store a procedure memory (convenience wrapper)" + ) + sp.add_argument("--title", required=True, help="Procedure title") + sp.add_argument("--content", "-c", required=True, help="Procedure description") + sp.add_argument("--steps", help="Comma-separated ordered steps") + sp.add_argument("--preconditions", help="Comma-separated preconditions") + sp.add_argument("--postconditions", help="Comma-separated 
postconditions") + sp.add_argument("--tags", help="Comma-separated tags") + sp.add_argument( + "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0" + ) + + # config + sp = subparsers.add_parser("config", help="Manage embedding config") + sp.add_argument("--show", action="store_true", help="Display current config") + sp.add_argument( + "--provider", choices=["ollama", "openai"], help="Set embedding provider" + ) + sp.add_argument("--openai-key", help="Set OpenAI API key") + sp.add_argument( + "--ollama-model", help="Set Ollama model name (e.g. qwen3-embedding:8b)" + ) + + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + + client = CognitiveMemoryClient() + result = None + + if args.command == "store": + tags = [t.strip() for t in args.tags.split(",")] if args.tags else None + memory_id = client.store( + type=args.type, + title=args.title, + content=args.content, + tags=tags, + importance=args.importance, + confidence=args.confidence, + ) + result = {"success": True, "memory_id": memory_id} + + if args.episode: + # Get the relative path from the index for memory_link + index = client._load_index() + entry = index.get("entries", {}).get(memory_id, {}) + rel_path = entry.get("path", "") + + # Truncate content at word boundary for summary (max 100 chars) + summary = args.content.strip()[:100] + if len(args.content.strip()) > 100: + last_space = summary.rfind(" ") + if last_space > 0: + summary = summary[:last_space] + + client.episode( + type=args.type, + title=args.title, + tags=tags or [], + summary=summary, + memory_link=rel_path, + ) + result["episode_logged"] = True + + elif args.command == "recall": + types = [t.strip() for t in args.types.split(",")] if args.types else None + result = client.recall( + args.query, + memory_types=types, + limit=args.limit, + semantic=not args.no_semantic, + ) + + elif args.command == "get": + result = client.get(args.memory_id) + if not result: + result = {"error": 
"Memory not found"} + + elif args.command == "relate": + edge_id = client.relate( + args.from_id, + args.to_id, + args.rel_type, + strength=args.strength, + context=args.context, + description=args.description, + ) + result = {"success": bool(edge_id), "edge_id": edge_id} + + elif args.command == "edge-get": + result = client.edge_get(args.edge_id) + if not result: + result = {"error": "Edge not found"} + + elif args.command == "edge-search": + types = [t.strip() for t in args.types.split(",")] if args.types else None + result = client.edge_search( + query=args.query, + types=types, + from_id=getattr(args, "from_id", None), + to_id=getattr(args, "to_id", None), + limit=args.limit, + ) + + elif args.command == "edge-update": + success = client.edge_update( + args.edge_id, + description=args.description, + strength=args.strength, + ) + result = {"success": success} + + elif args.command == "edge-delete": + success = client.edge_delete(args.edge_id) + result = {"success": success} + + elif args.command == "search": + types = [t.strip() for t in args.types.split(",")] if args.types else None + tags = [t.strip() for t in args.tags.split(",")] if args.tags else None + result = client.search( + query=args.query, + memory_types=types, + tags=tags, + min_importance=args.min_importance, + limit=args.limit, + ) + + elif args.command == "update": + tags = [t.strip() for t in args.tags.split(",")] if args.tags else None + success = client.update( + args.memory_id, + title=args.title, + content=args.content, + tags=tags, + importance=args.importance, + ) + result = {"success": success} + + elif args.command == "delete": + if not args.force: + mem = client.get(args.memory_id) + if mem: + print(f"Deleting: {mem.get('title')}", file=sys.stderr) + success = client.delete(args.memory_id) + result = {"success": success} + + elif args.command == "related": + types = [t.strip() for t in args.types.split(",")] if args.types else None + result = client.related(args.memory_id, 
rel_types=types, max_depth=args.depth) + + elif args.command == "stats": + result = client.stats() + + elif args.command == "recent": + result = client.recent(limit=args.limit) + + elif args.command == "decay": + result = client.decay() + + elif args.command == "core": + content = client.core() + # Print path, not content (content is written to file) + result = { + "success": True, + "path": str(MEMORY_DIR / "CORE.md"), + "chars": len(content), + } + + elif args.command == "episode": + tags = [t.strip() for t in args.tags.split(",")] if args.tags else None + client.episode( + type=args.type, + title=args.title, + tags=tags, + summary=args.summary, + memory_link=args.memory_link, + ) + result = {"success": True} + + elif args.command == "reindex": + count = client.reindex() + result = {"success": True, "indexed": count} + + elif args.command == "embed": + if_changed = getattr(args, "if_changed", False) + if not if_changed: + print( + "Generating embeddings (this may take a while if model needs to be pulled)...", + file=sys.stderr, + ) + result = client.embed(if_changed=if_changed) + + elif args.command == "pin": + success = client.pin(args.memory_id) + result = {"success": success} + + elif args.command == "reflect": + result = client.reflect( + since=args.since, + dry_run=args.dry_run, + ) + + elif args.command == "merge": + result = client.merge( + keep_id=args.keep_id, + absorb_id=args.absorb_id, + dry_run=args.dry_run, + ) + + elif args.command == "reflection": + content = client.reflection_summary() + result = { + "success": True, + "path": str(MEMORY_DIR / "REFLECTION.md"), + "chars": len(content), + } + + elif args.command == "tags": + if args.tags_command == "list": + result = client.tags_list(limit=args.limit) + elif args.tags_command == "related": + result = client.tags_related(args.tag, limit=args.limit) + elif args.tags_command == "suggest": + result = client.tags_suggest(args.memory_id) + else: + # No subcommand given, print tags help + # Re-parse to 
get the tags subparser for help output + for action in parser._subparsers._actions: + if isinstance(action, argparse._SubParsersAction): + tags_parser = action.choices.get("tags") + if tags_parser: + tags_parser.print_help() + break + sys.exit(1) + + elif args.command == "procedure": + tags = [t.strip() for t in args.tags.split(",")] if args.tags else None + steps = [s.strip() for s in args.steps.split(",")] if args.steps else None + preconditions = ( + [p.strip() for p in args.preconditions.split(",")] + if args.preconditions + else None + ) + postconditions = ( + [p.strip() for p in args.postconditions.split(",")] + if args.postconditions + else None + ) + memory_id = client.store( + type="procedure", + title=args.title, + content=args.content, + tags=tags, + importance=args.importance, + steps=steps, + preconditions=preconditions, + postconditions=postconditions, + ) + result = {"success": True, "memory_id": memory_id} + + elif args.command == "config": + config_path = MEMORY_DIR / "_config.json" + config = _load_memory_config(config_path) + changed = False + + if args.provider: + config["embedding_provider"] = args.provider + changed = True + if args.openai_key: + config["openai_api_key"] = args.openai_key + changed = True + if args.ollama_model: + config["ollama_model"] = args.ollama_model + changed = True + + if changed: + config_path.write_text(json.dumps(config, indent=2)) + result = {"success": True, "updated": True} + elif args.show or not changed: + # Mask API key for display + display = dict(config) + key = display.get("openai_api_key") + if key and isinstance(key, str) and len(key) > 8: + display["openai_api_key"] = key[:4] + "..." 
+ key[-4:] + result = display + + print(json.dumps(result, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/skills/cognitive-memory/client.py b/skills/cognitive-memory/client.py index 5da0732..c26885a 100644 --- a/skills/cognitive-memory/client.py +++ b/skills/cognitive-memory/client.py @@ -20,533 +20,69 @@ Usage: memory_id = client.store(type="solution", title="...", content="...") """ -import argparse import json -import math import os -import re import subprocess import sys -import urllib.request +import tempfile import uuid -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -from urllib.error import URLError - -# ============================================================================= -# CONSTANTS -# ============================================================================= - -MEMORY_DIR = Path.home() / ".claude" / "memory" -INDEX_PATH = MEMORY_DIR / "_index.json" -STATE_PATH = MEMORY_DIR / "_state.json" -EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json" -OLLAMA_URL = "http://localhost:11434" -EMBEDDING_MODEL = "nomic-embed-text" -EMBEDDING_TIMEOUT = 5 # seconds -CONFIG_PATH = MEMORY_DIR / "_config.json" -OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings" -OPENAI_MODEL_DEFAULT = "text-embedding-3-small" - -# Memory type -> directory name mapping -TYPE_DIRS = { - "solution": "solutions", - "fix": "fixes", - "decision": "decisions", - "configuration": "configurations", - "problem": "problems", - "workflow": "workflows", - "code_pattern": "code-patterns", - "error": "errors", - "general": "general", - "procedure": "procedures", - "insight": "insights", -} - -VALID_TYPES = list(TYPE_DIRS.keys()) - -# Decay model type weights -TYPE_WEIGHTS = { - "decision": 1.3, - "solution": 1.2, - "insight": 1.25, - "code_pattern": 1.1, - "configuration": 1.1, - "fix": 1.0, - "workflow": 1.0, - "problem": 0.9, - "error": 
0.8, - "general": 0.8, - "procedure": 1.4, -} - -DECAY_LAMBDA = 0.03 # Half-life ~23 days - -# Decay score thresholds -THRESHOLD_ACTIVE = 0.5 -THRESHOLD_FADING = 0.2 -THRESHOLD_DORMANT = 0.05 - -# Relationship types (subset from MemoryGraph, focused on most useful) -VALID_RELATION_TYPES = [ - "SOLVES", - "CAUSES", - "BUILDS_ON", - "ALTERNATIVE_TO", - "REQUIRES", - "FOLLOWS", - "RELATED_TO", -] - -# Edge file constants -EDGES_DIR_NAME = "edges" -EDGE_FIELD_ORDER = [ - "id", - "type", - "from_id", - "from_title", - "to_id", - "to_title", - "strength", - "created", - "updated", -] - -# Frontmatter field order for consistent output -FIELD_ORDER = [ - "id", - "type", - "title", - "tags", - "importance", - "confidence", - "steps", - "preconditions", - "postconditions", - "created", - "updated", - "relations", -] - -# CORE.md token budget (approximate, 1 token ~= 4 chars) -CORE_MAX_CHARS = 12000 # ~3K tokens - - -# ============================================================================= -# YAML FRONTMATTER PARSING (stdlib only) -# ============================================================================= - - -def _needs_quoting(s: str) -> bool: - """Check if a YAML string value needs quoting.""" - if not s: - return True - if any(c in s for c in ":#{}[]&*?|>!%@`"): - return True - try: - float(s) - return True - except ValueError: - pass - if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"): - return True - return False - - -def _quote_yaml(s: str) -> str: - """Quote a string for YAML, escaping internal quotes.""" - escaped = s.replace("\\", "\\\\").replace('"', '\\"') - return f'"{escaped}"' - - -def _format_yaml_value(value: Any, force_quote: bool = False) -> str: - """Format a Python value for YAML output.""" - if value is None: - return "null" - if isinstance(value, bool): - return "true" if value else "false" - if isinstance(value, (int, float)): - return str(value) - s = str(value) - if force_quote or _needs_quoting(s): - return 
_quote_yaml(s) - return s - - -def _parse_scalar(value: str) -> Any: - """Parse a YAML scalar value to Python type.""" - v = value.strip() - if not v or v == "null": - return None - if v == "true": - return True - if v == "false": - return False - # Try numeric - try: - if "." in v: - return float(v) - return int(v) - except ValueError: - pass - # Strip quotes - if (v.startswith('"') and v.endswith('"')) or ( - v.startswith("'") and v.endswith("'") - ): - return v[1:-1] - return v - - -def serialize_frontmatter(data: Dict[str, Any]) -> str: - """Serialize a dict to YAML frontmatter string (between --- markers).""" - lines = ["---"] - - for key in FIELD_ORDER: - if key not in data: - continue - value = data[key] - - if key == "tags" and isinstance(value, list): - if value: - items = ", ".join(_format_yaml_value(t) for t in value) - lines.append(f"tags: [{items}]") - else: - lines.append("tags: []") - - elif key in ("steps", "preconditions", "postconditions") and isinstance( - value, list - ): - if not value: - continue - lines.append(f"{key}:") - for item in value: - lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}") - - elif key == "relations" and isinstance(value, list): - if not value: - continue - lines.append("relations:") - for rel in value: - first = True - for rk in [ - "target", - "type", - "direction", - "strength", - "context", - "edge_id", - ]: - if rk not in rel: - continue - rv = rel[rk] - prefix = " - " if first else " " - force_q = rk in ("context",) - lines.append( - f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}" - ) - first = False - - elif key == "title": - lines.append(f"title: {_format_yaml_value(value, force_quote=True)}") - - else: - lines.append(f"{key}: {_format_yaml_value(value)}") - - lines.append("---") - return "\n".join(lines) - - -def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]: - """Parse YAML frontmatter and body from markdown text. - - Returns (frontmatter_dict, body_text). 
- """ - if not text.startswith("---\n"): - return {}, text - - # Find closing --- - end_match = re.search(r"\n---\s*\n", text[3:]) - if not end_match: - # Try end of string - if text.rstrip().endswith("---"): - end_pos = text.rstrip().rfind("\n---") - if end_pos <= 3: - return {}, text - fm_text = text[4:end_pos] - body = "" - else: - return {}, text - else: - end_pos = end_match.start() + 3 # Offset from text[3:] - fm_text = text[4:end_pos] - body = text[end_pos + end_match.end() - end_match.start() :] - - body = body.lstrip("\n") - data = {} - lines = fm_text.split("\n") - i = 0 - - while i < len(lines): - line = lines[i] - - # Skip empty lines - if not line.strip(): - i += 1 - continue - - # Must be a top-level key (no leading whitespace) - if line[0] == " ": - i += 1 - continue - - if ":" not in line: - i += 1 - continue - - key, _, rest = line.partition(":") - key = key.strip() - rest = rest.strip() - - if not rest: - # Block value - collect indented lines - block_lines = [] - j = i + 1 - while j < len(lines) and lines[j] and lines[j][0] == " ": - block_lines.append(lines[j]) - j += 1 - - if key == "relations": - data["relations"] = _parse_relations_block(block_lines) - elif block_lines and block_lines[0].strip().startswith("- "): - # Simple list - data[key] = [ - _parse_scalar(bl.strip().lstrip("- ")) - for bl in block_lines - if bl.strip().startswith("- ") - ] - else: - data[key] = None - i = j - continue - - # Inline list: [a, b, c] - if rest.startswith("[") and rest.endswith("]"): - inner = rest[1:-1] - if inner.strip(): - data[key] = [ - _parse_scalar(v.strip()) for v in inner.split(",") if v.strip() - ] - else: - data[key] = [] - else: - data[key] = _parse_scalar(rest) - - i += 1 - - return data, body - - -def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]: - """Parse a YAML block list of relation dicts.""" - relations = [] - current = None - - for line in lines: - stripped = line.strip() - if not stripped: - continue - - if 
stripped.startswith("- "): - # New relation entry - current = {} - relations.append(current) - # Parse key:value on same line as - - rest = stripped[2:] - if ":" in rest: - k, _, v = rest.partition(":") - current[k.strip()] = _parse_scalar(v.strip()) - elif current is not None and ":" in stripped: - k, _, v = stripped.partition(":") - current[k.strip()] = _parse_scalar(v.strip()) - - return relations - - -# ============================================================================= -# HELPER FUNCTIONS -# ============================================================================= - - -def slugify(text: str, max_length: int = 60) -> str: - """Convert text to a URL-friendly slug.""" - text = text.lower().strip() - text = re.sub(r"[^\w\s-]", "", text) - text = re.sub(r"[\s_]+", "-", text) - text = re.sub(r"-+", "-", text) - text = text.strip("-") - if len(text) > max_length: - text = text[:max_length].rstrip("-") - return text or "untitled" - - -def make_filename(title: str, memory_id: str) -> str: - """Create a filename from title and UUID suffix.""" - slug = slugify(title) - suffix = memory_id[:6] - return f"{slug}-{suffix}.md" - - -def calculate_decay_score( - importance: float, days_since_access: float, access_count: int, type_weight: float -) -> float: - """Calculate decay score for a memory. - - decay_score = importance * e^(-lambda * days) * log2(access_count + 1) * type_weight - """ - time_factor = math.exp(-DECAY_LAMBDA * days_since_access) - usage_factor = math.log2(access_count + 1) if access_count > 0 else 1.0 - return importance * time_factor * usage_factor * type_weight - - -def _ollama_embed( - texts: List[str], - model: str = EMBEDDING_MODEL, - timeout: int = EMBEDDING_TIMEOUT, -) -> Optional[List[List[float]]]: - """Get embeddings from Ollama for a list of texts. - - Returns list of embedding vectors, or None if Ollama is unavailable. 
- """ - try: - payload = json.dumps({"model": model, "input": texts}).encode("utf-8") - req = urllib.request.Request( - f"{OLLAMA_URL}/api/embed", - data=payload, - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=timeout) as resp: - if resp.status != 200: - return None - data = json.loads(resp.read().decode("utf-8")) - embeddings = data.get("embeddings") - if embeddings and isinstance(embeddings, list): - return embeddings - return None - except ( - ConnectionRefusedError, - URLError, - TimeoutError, - OSError, - json.JSONDecodeError, - ValueError, - KeyError, - ): - return None - - -def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]: - """Read _config.json, return defaults if missing.""" - path = config_path or CONFIG_PATH - defaults = { - "embedding_provider": "ollama", - "openai_api_key": None, - "ollama_model": EMBEDDING_MODEL, - "openai_model": OPENAI_MODEL_DEFAULT, - } - if path.exists(): - try: - data = json.loads(path.read_text()) - for k, v in defaults.items(): - data.setdefault(k, v) - return data - except (json.JSONDecodeError, OSError): - pass - return defaults - - -def _openai_embed( - texts: List[str], - api_key: str, - model: str = OPENAI_MODEL_DEFAULT, - timeout: int = 30, -) -> Optional[List[List[float]]]: - """Get embeddings from OpenAI API (stdlib-only, same interface as _ollama_embed).""" - try: - payload = json.dumps({"input": texts, "model": model}).encode("utf-8") - req = urllib.request.Request( - OPENAI_EMBED_URL, - data=payload, - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - }, - method="POST", - ) - with urllib.request.urlopen(req, timeout=timeout) as resp: - if resp.status != 200: - return None - data = json.loads(resp.read().decode("utf-8")) - items = data.get("data", []) - if items and isinstance(items, list): - # Sort by index to ensure order matches input - items.sort(key=lambda x: x.get("index", 0)) - 
return [item["embedding"] for item in items] - return None - except ( - ConnectionRefusedError, - URLError, - TimeoutError, - OSError, - json.JSONDecodeError, - ValueError, - KeyError, - ): - return None - - -def _cosine_similarity(a: List[float], b: List[float]) -> float: - """Compute cosine similarity between two vectors.""" - dot = sum(x * y for x, y in zip(a, b)) - norm_a = math.sqrt(sum(x * x for x in a)) - norm_b = math.sqrt(sum(x * x for x in b)) - if norm_a == 0.0 or norm_b == 0.0: - return 0.0 - return dot / (norm_a * norm_b) - - -def _make_edge_filename( - from_title: str, rel_type: str, to_title: str, edge_id: str -) -> str: - """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md""" - from_slug = slugify(from_title, max_length=30) - to_slug = slugify(to_title, max_length=30) - suffix = edge_id[:6] - return f"{from_slug}--{rel_type}--{to_slug}-{suffix}.md" - - -def serialize_edge_frontmatter(data: Dict[str, Any]) -> str: - """Serialize an edge dict to YAML frontmatter string.""" - lines = ["---"] - for key in EDGE_FIELD_ORDER: - if key not in data: - continue - value = data[key] - if key in ("from_title", "to_title"): - lines.append(f"{key}: {_format_yaml_value(value, force_quote=True)}") - else: - lines.append(f"{key}: {_format_yaml_value(value)}") - lines.append("---") - return "\n".join(lines) +# Import everything from common for backward compatibility +from common import ( + CORE_MAX_CHARS, + CONFIG_PATH, + DECAY_LAMBDA, + EDGE_FIELD_ORDER, + EDGES_DIR_NAME, + EMBEDDING_MODEL, + EMBEDDING_TIMEOUT, + EMBEDDINGS_PATH, + FIELD_ORDER, + INDEX_PATH, + MEMORY_DIR, + OLLAMA_URL, + OPENAI_EMBED_URL, + OPENAI_MODEL_DEFAULT, + STATE_PATH, + THRESHOLD_ACTIVE, + THRESHOLD_DORMANT, + THRESHOLD_FADING, + TYPE_DIRS, + TYPE_WEIGHTS, + VALID_RELATION_TYPES, + VALID_TYPES, + _cosine_similarity, + _format_yaml_value, + _load_memory_config, + _make_edge_filename, + _needs_quoting, + _ollama_embed, + _openai_embed, + _parse_relations_block, + _parse_scalar, 
+ _quote_yaml, + calculate_decay_score, + make_filename, + parse_frontmatter, + serialize_edge_frontmatter, + serialize_frontmatter, + slugify, +) + +# Import mixins +from edges import EdgesMixin +from embeddings import EmbeddingsMixin +from analysis import AnalysisMixin # ============================================================================= # CLIENT # ============================================================================= -class CognitiveMemoryClient: +class CognitiveMemoryClient(EdgesMixin, EmbeddingsMixin, AnalysisMixin): """Client for markdown-based cognitive memory system.""" def __init__(self, memory_dir: Optional[Path] = None): @@ -623,8 +159,6 @@ class CognitiveMemoryClient: def _save_state(self, state: Dict): """Write _state.json atomically, merging top-level keys to prevent race conditions.""" - import tempfile - # Merge with existing state to preserve keys written by concurrent processes if self.state_path.exists(): try: @@ -1116,280 +650,6 @@ class CognitiveMemoryClient: "path": str(path.relative_to(self.memory_dir)), } - def relate( - self, - from_id: str, - to_id: str, - rel_type: str, - strength: float = 0.8, - context: Optional[str] = None, - description: Optional[str] = None, - ) -> str: - """Create a relationship between two memories with an edge file. - - Returns edge_id string, or empty string if duplicate. - """ - if rel_type not in VALID_RELATION_TYPES: - raise ValueError( - f"Invalid relation type: {rel_type}. 
Valid: {VALID_RELATION_TYPES}" - ) - - from_path = self._resolve_memory_path(from_id) - to_path = self._resolve_memory_path(to_id) - if not from_path or not to_path: - raise ValueError(f"Memory not found: {from_id if not from_path else to_id}") - - # Read source memory - fm, body = self._read_memory_file(from_path) - relations = fm.get("relations", []) - - # Check for duplicate - for r in relations: - if r.get("target") == to_id and r.get("type") == rel_type: - return "" # Already exists - - # Read target memory for title - to_fm, to_body = self._read_memory_file(to_path) - - # Create edge file - edge_id = str(uuid.uuid4()) - now = datetime.now(timezone.utc).isoformat() - from_title = fm.get("title", from_id[:8]) - to_title = to_fm.get("title", to_id[:8]) - clamped_strength = max(0.0, min(1.0, strength)) - - edge_data = { - "id": edge_id, - "type": rel_type, - "from_id": from_id, - "from_title": from_title, - "to_id": to_id, - "to_title": to_title, - "strength": clamped_strength, - "created": now, - "updated": now, - } - edge_filename = _make_edge_filename(from_title, rel_type, to_title, edge_id) - edge_path = self.memory_dir / "graph" / EDGES_DIR_NAME / edge_filename - edge_fm_str = serialize_edge_frontmatter(edge_data) - edge_body = description.strip() if description else "" - edge_content = ( - f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n" - ) - edge_path.write_text(edge_content, encoding="utf-8") - - # Update source memory frontmatter with edge_id - new_rel = { - "target": to_id, - "type": rel_type, - "direction": "outgoing", - "strength": clamped_strength, - "edge_id": edge_id, - } - if context: - new_rel["context"] = context - relations.append(new_rel) - fm["relations"] = relations - fm["updated"] = now - self._write_memory_file(from_path, fm, body) - - # Add incoming relation to target with edge_id - to_relations = to_fm.get("relations", []) - has_incoming = any( - r.get("target") == from_id - and r.get("type") == rel_type - and 
r.get("direction") == "incoming" - for r in to_relations - ) - if not has_incoming: - incoming_rel = { - "target": from_id, - "type": rel_type, - "direction": "incoming", - "strength": clamped_strength, - "edge_id": edge_id, - } - if context: - incoming_rel["context"] = context - to_relations.append(incoming_rel) - to_fm["relations"] = to_relations - to_fm["updated"] = now - self._write_memory_file(to_path, to_fm, to_body) - - # Update memory index - rel_from = str(from_path.relative_to(self.memory_dir)) - rel_to = str(to_path.relative_to(self.memory_dir)) - self._update_index_entry(from_id, fm, rel_from) - self._update_index_entry(to_id, to_fm, rel_to) - - # Update edge index - self._update_edge_index( - edge_id, edge_data, f"graph/{EDGES_DIR_NAME}/{edge_filename}" - ) - - self._git_commit( - f"relate: {from_id[:8]} --{rel_type}--> {to_id[:8]}", - [from_path, to_path, edge_path], - ) - return edge_id - - def edge_get(self, edge_id: str) -> Optional[Dict[str, Any]]: - """Read full edge file (frontmatter + body).""" - path = self._resolve_edge_path(edge_id) - if not path: - return None - fm, body = self._read_memory_file(path) - return { - "id": fm.get("id", edge_id), - "type": fm.get("type", ""), - "from_id": fm.get("from_id", ""), - "from_title": fm.get("from_title", ""), - "to_id": fm.get("to_id", ""), - "to_title": fm.get("to_title", ""), - "strength": fm.get("strength", 0.8), - "created": fm.get("created", ""), - "updated": fm.get("updated", ""), - "description": body.strip(), - "path": str(path.relative_to(self.memory_dir)), - } - - def edge_search( - self, - query: Optional[str] = None, - types: Optional[List[str]] = None, - from_id: Optional[str] = None, - to_id: Optional[str] = None, - limit: int = 20, - ) -> List[Dict[str, Any]]: - """Search edges via index.""" - index = self._load_index() - results = [] - query_lower = query.lower().strip() if query else "" - - # Resolve partial IDs to full UUIDs via prefix match - if from_id: - entries = 
index.get("entries", {}) - resolved = self._resolve_prefix(from_id, entries) - from_id = resolved or from_id - if to_id: - entries = index.get("entries", {}) - resolved = self._resolve_prefix(to_id, entries) - to_id = resolved or to_id - - for eid, entry in index.get("edges", {}).items(): - if types and entry.get("type") not in types: - continue - if from_id and entry.get("from_id") != from_id: - continue - if to_id and entry.get("to_id") != to_id: - continue - if query_lower: - searchable = f"{entry.get('from_title', '')} {entry.get('to_title', '')} {entry.get('type', '')}".lower() - if query_lower not in searchable: - continue - results.append({"id": eid, **entry}) - - results.sort(key=lambda x: x.get("created", ""), reverse=True) - return results[:limit] - - def edge_update( - self, - edge_id: str, - description: Optional[str] = None, - strength: Optional[float] = None, - ) -> bool: - """Update edge body/metadata, sync strength to memory frontmatter.""" - path = self._resolve_edge_path(edge_id) - if not path: - return False - - fm, body = self._read_memory_file(path) - now = datetime.now(timezone.utc).isoformat() - - if description is not None: - body = description - if strength is not None: - fm["strength"] = max(0.0, min(1.0, strength)) - fm["updated"] = now - - # Write edge file - edge_fm_str = serialize_edge_frontmatter(fm) - edge_body = body.strip() if body else "" - edge_content = ( - f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n" - ) - path.write_text(edge_content, encoding="utf-8") - - # Sync strength to memory frontmatter if changed - if strength is not None: - for mid_key in ("from_id", "to_id"): - mid = fm.get(mid_key) - if not mid: - continue - mem_path = self._resolve_memory_path(mid) - if not mem_path: - continue - mem_fm, mem_body = self._read_memory_file(mem_path) - for rel in mem_fm.get("relations", []): - if rel.get("edge_id") == edge_id: - rel["strength"] = fm["strength"] - mem_fm["updated"] = now - 
self._write_memory_file(mem_path, mem_fm, mem_body) - - # Update edge index - rel_path = str(path.relative_to(self.memory_dir)) - self._update_edge_index(edge_id, fm, rel_path) - self._git_commit(f"edge-update: {edge_id[:8]}", [path]) - return True - - def edge_delete(self, edge_id: str) -> bool: - """Remove edge file and clean frontmatter refs from both memories.""" - path = self._resolve_edge_path(edge_id) - if not path: - return False - - fm, _ = self._read_memory_file(path) - now = datetime.now(timezone.utc).isoformat() - files_to_commit: List[Path] = [] - - # Clean edge_id references from both memories - for mid_key in ("from_id", "to_id"): - mid = fm.get(mid_key) - if not mid: - continue - mem_path = self._resolve_memory_path(mid) - if not mem_path: - continue - mem_fm, mem_body = self._read_memory_file(mem_path) - original_rels = mem_fm.get("relations", []) - mem_fm["relations"] = [ - r for r in original_rels if r.get("edge_id") != edge_id - ] - if len(mem_fm["relations"]) != len(original_rels): - mem_fm["updated"] = now - self._write_memory_file(mem_path, mem_fm, mem_body) - rel_p = str(mem_path.relative_to(self.memory_dir)) - self._update_index_entry(mid, mem_fm, rel_p) - files_to_commit.append(mem_path) - - # Remove edge file - path.unlink() - self._remove_edge_index(edge_id) - - # Git stage deletion - try: - rel_path = path.relative_to(self.memory_dir) - subprocess.run( - ["git", "rm", "--cached", str(rel_path)], - cwd=str(self.memory_dir), - capture_output=True, - timeout=5, - ) - except Exception: - pass - self._git_commit(f"edge-delete: {edge_id[:8]}") - return True - def search( self, query: Optional[str] = None, @@ -1680,204 +940,6 @@ class CognitiveMemoryClient: entries.sort(key=lambda x: x.get("created", ""), reverse=True) return entries[:limit] - def decay(self) -> Dict[str, Any]: - """Recalculate all decay scores. 
Updates _state.json.""" - index = self._load_index() - state = self._load_state() - now = datetime.now(timezone.utc) - updated_count = 0 - - for mid, entry in index.get("entries", {}).items(): - s = state.setdefault("entries", {}).setdefault( - mid, - { - "access_count": 0, - "last_accessed": entry.get("created", now.isoformat()), - "decay_score": 0.5, - }, - ) - - # Calculate days since last access - last_str = s.get("last_accessed", entry.get("created", "")) - try: - last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00")) - if last_dt.tzinfo is None: - last_dt = last_dt.replace(tzinfo=timezone.utc) - days = (now - last_dt).total_seconds() / 86400 - except (ValueError, AttributeError): - days = 30 # Default to 30 days if unparseable - - importance = entry.get("importance", 0.5) - mem_type = entry.get("type", "general") - type_weight = TYPE_WEIGHTS.get(mem_type, 1.0) - access_count = s.get("access_count", 0) - - # Check if pinned (vault) - path = entry.get("path", "") - if path.startswith("vault/"): - s["decay_score"] = 999.0 # Pinned memories never decay - else: - s["decay_score"] = round( - calculate_decay_score(importance, days, access_count, type_weight), - 4, - ) - - updated_count += 1 - - self._save_state(state) - - # Summary - state_entries = state.get("entries", {}) - return { - "updated": updated_count, - "active": sum( - 1 - for s in state_entries.values() - if s.get("decay_score", 0) >= THRESHOLD_ACTIVE - ), - "fading": sum( - 1 - for s in state_entries.values() - if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE - ), - "dormant": sum( - 1 - for s in state_entries.values() - if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING - ), - "archived": sum( - 1 - for s in state_entries.values() - if 0 < s.get("decay_score", 0) < THRESHOLD_DORMANT - ), - } - - def core(self) -> str: - """Generate CORE.md from top memories by decay score.""" - index = self._load_index() - state = self._load_state() - - def 
_extract_summary(body: str, max_len: int = 80) -> str: - """Extract first meaningful sentence from memory body.""" - if not body or not body.strip(): - return "" - # Skip leading code blocks, headings, and list markers - for ln in body.strip().split("\n"): - stripped = ln.strip() - if not stripped: - continue - if stripped.startswith("```") or stripped.startswith("#"): - continue - first_line = stripped.lstrip("- *>").strip() - break - else: - return "" - if not first_line: - return "" - # Extract first sentence (split on '. ') - dot_pos = first_line.find(". ") - if 0 < dot_pos < max_len: - sentence = first_line[: dot_pos + 1] - else: - sentence = first_line - if len(sentence) > max_len: - sentence = sentence[: max_len - 3].rstrip() + "..." - return sentence - - # Collect all memories with decay scores - memories = [] - for mid, entry in index.get("entries", {}).items(): - s = state.get("entries", {}).get(mid, {}) - decay = s.get("decay_score", 0.5) - if decay < THRESHOLD_FADING: - continue # Skip low-relevance - memories.append( - { - "id": mid, - "title": entry.get("title", ""), - "type": entry.get("type", "general"), - "path": entry.get("path", ""), - "importance": entry.get("importance", 0.5), - "decay_score": decay, - "tags": entry.get("tags", []), - } - ) - - # Sort by decay score descending - memories.sort(key=lambda x: x["decay_score"], reverse=True) - - # Group by type category - categories = { - "Critical Solutions": ["solution"], - "Active Decisions": ["decision"], - "Key Fixes": ["fix"], - "Configurations": ["configuration"], - "Key Procedures": ["procedure"], - "Patterns & Workflows": ["code_pattern", "workflow"], - "Known Issues": ["problem", "error"], - "Insights": ["insight"], - "General": ["general"], - } - - now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") - active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE) - total_count = len(index.get("entries", {})) - - lines = [ - "# Memory Core (auto-generated)", - f"> 
Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)", - "", - ] - - char_count = sum(len(l) for l in lines) - - for cat_name, cat_types in categories.items(): - cat_memories = [m for m in memories if m["type"] in cat_types] - if not cat_memories: - continue - - section_lines = [f"## {cat_name}", ""] - for mem in cat_memories[:10]: # Cap per section (reduced for summaries) - path = mem["path"] - title = mem["title"] - tags = ", ".join(mem["tags"][:3]) if mem["tags"] else "" - - # Read body to extract summary - summary = "" - mem_path = self.memory_dir / path - if mem_path.exists(): - try: - _, body = self._read_memory_file(mem_path) - summary = _extract_summary(body) - except Exception: - pass - - line = f"- [{title}]({path})" - if summary: - line += f" - {summary}" - if tags: - line += f" ({tags})" - section_lines.append(line) - - # Check budget - added = sum(len(l) for l in section_lines) + 2 - if char_count + added > CORE_MAX_CHARS: - break - - section_lines.append("") - section_text = "\n".join(section_lines) - if char_count + len(section_text) > CORE_MAX_CHARS: - break - lines.extend(section_lines) - char_count += len(section_text) - - core_content = "\n".join(lines) - core_path = self.memory_dir / "CORE.md" - core_path.write_text(core_content, encoding="utf-8") - self._git_commit("core: regenerate CORE.md", [core_path]) - - return core_content - def episode( self, type: str, @@ -2022,1327 +1084,8 @@ class CognitiveMemoryClient: self._git_commit(f"pin: {fm.get('title', memory_id[:8])}", [new_path]) return True - # ------------------------------------------------------------------------- - # Tag analysis - # ------------------------------------------------------------------------- - - def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]: - """List all tags with occurrence counts across all memories. - - Returns list of {"tag": str, "count": int} sorted by count descending. 
- """ - index = self._load_index() - tag_counts: Dict[str, int] = {} - - for entry in index.get("entries", {}).values(): - for tag in entry.get("tags", []): - normalized = tag.lower().strip() - if normalized: - tag_counts[normalized] = tag_counts.get(normalized, 0) + 1 - - results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()] - results.sort(key=lambda x: x["count"], reverse=True) - - if limit > 0: - results = results[:limit] - return results - - def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]: - """Find tags that co-occur with the given tag. - - Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int} - sorted by co_occurrences descending. - """ - tag = tag.lower().strip() - index = self._load_index() - co_counts: Dict[str, int] = {} - - for entry in index.get("entries", {}).values(): - entry_tags = [t.lower().strip() for t in entry.get("tags", [])] - if tag not in entry_tags: - continue - for other_tag in entry_tags: - if other_tag != tag and other_tag: - co_counts[other_tag] = co_counts.get(other_tag, 0) + 1 - - results = [ - {"tag": t, "co_occurrences": count, "memories_with_both": count} - for t, count in co_counts.items() - ] - results.sort(key=lambda x: x["co_occurrences"], reverse=True) - - if limit > 0: - results = results[:limit] - return results - - def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]: - """Suggest tags for a memory based on co-occurrence patterns. - - Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}. 
- """ - index = self._load_index() - entry = index.get("entries", {}).get(memory_id) - if not entry: - return [] - - existing_tags = set(t.lower().strip() for t in entry.get("tags", [])) - if not existing_tags: - return [] - - # For each existing tag, find co-occurring tags and accumulate scores - candidate_scores: Dict[str, int] = {} - candidate_sources: Dict[str, set] = {} - - for existing_tag in existing_tags: - co_tags = self.tags_related(existing_tag) - for co_entry in co_tags: - candidate = co_entry["tag"] - if candidate in existing_tags: - continue - candidate_scores[candidate] = ( - candidate_scores.get(candidate, 0) + co_entry["co_occurrences"] - ) - if candidate not in candidate_sources: - candidate_sources[candidate] = set() - candidate_sources[candidate].add(existing_tag) - - results = [] - for candidate, score in candidate_scores.items(): - sources = sorted(candidate_sources[candidate]) - reason = "co-occurs with: " + ", ".join(sources) - results.append({"tag": candidate, "score": score, "reason": reason}) - - results.sort(key=lambda x: x["score"], reverse=True) - return results[:10] - - # ------------------------------------------------------------------------- - # Embedding-based semantic search (hybrid: Ollama + OpenAI) - # ------------------------------------------------------------------------- - - def _get_embedding_provider(self) -> Dict[str, Any]: - """Load embedding config from _config.json.""" - return _load_memory_config(self.memory_dir / "_config.json") - - def _embed_texts_with_fallback( - self, - texts: List[str], - timeout: int = 300, - ) -> Tuple[Optional[List[List[float]]], str, str]: - """Embed texts with fallback chain. 
Returns (vectors, provider_used, model_used).""" - config = self._get_embedding_provider() - provider = config.get("embedding_provider", "ollama") - - # Try configured provider first - if provider == "openai": - api_key = config.get("openai_api_key") - model = config.get("openai_model", OPENAI_MODEL_DEFAULT) - if api_key: - vectors = _openai_embed(texts, api_key, model, timeout=timeout) - if vectors is not None: - return vectors, "openai", model - # Fallback to ollama - ollama_model = config.get("ollama_model", EMBEDDING_MODEL) - vectors = _ollama_embed(texts, model=ollama_model, timeout=timeout) - if vectors is not None: - return vectors, "ollama", ollama_model - else: - # ollama first - ollama_model = config.get("ollama_model", EMBEDDING_MODEL) - vectors = _ollama_embed(texts, model=ollama_model, timeout=timeout) - if vectors is not None: - return vectors, "ollama", ollama_model - # Fallback to openai - api_key = config.get("openai_api_key") - model = config.get("openai_model", OPENAI_MODEL_DEFAULT) - if api_key: - vectors = _openai_embed(texts, api_key, model, timeout=timeout) - if vectors is not None: - return vectors, "openai", model - - return None, "", "" - - def embed(self, if_changed: bool = False) -> Dict[str, Any]: - """Generate embeddings for all memories using configured provider. - - Detects provider changes and re-embeds everything (dimension mismatch safety). - Stores vectors in _embeddings.json (not git-tracked). - - Args: - if_changed: If True, skip embedding if the set of memory IDs hasn't - changed since last run (no new/deleted memories). 
- """ - index = self._load_index() - entries = index.get("entries", {}) - if not entries: - return { - "embedded": 0, - "provider": "none", - "model": "", - "path": str(EMBEDDINGS_PATH), - } - - # Check for provider change - embeddings_path = self.memory_dir / "_embeddings.json" - old_provider = "" - if embeddings_path.exists(): - try: - old_data = json.loads(embeddings_path.read_text()) - old_provider = old_data.get("provider", "ollama") - except (json.JSONDecodeError, OSError): - pass - - config = self._get_embedding_provider() - new_provider = config.get("embedding_provider", "ollama") - provider_changed = old_provider and old_provider != new_provider - if provider_changed: - print( - f"Provider changed ({old_provider} -> {new_provider}), re-embedding all memories...", - file=sys.stderr, - ) - - # Skip if nothing changed (unless provider switched) - if if_changed and not provider_changed and embeddings_path.exists(): - try: - old_data = json.loads(embeddings_path.read_text()) - embedded_ids = set(old_data.get("entries", {}).keys()) - index_ids = set(entries.keys()) - if embedded_ids == index_ids: - return { - "embedded": 0, - "skipped": True, - "reason": "no new or deleted memories", - "path": str(embeddings_path), - } - except (json.JSONDecodeError, OSError): - pass # Can't read old data, re-embed - - # Build texts to embed - memory_ids = list(entries.keys()) - texts = [] - for mid in memory_ids: - entry = entries[mid] - title = entry.get("title", "") - preview = entry.get("content_preview", "") - texts.append(f"{title}. 
{preview}") - - # Batch embed in groups of 50 - all_embeddings: Dict[str, List[float]] = {} - batch_size = 50 - provider_used = "" - model_used = "" - for i in range(0, len(texts), batch_size): - batch_texts = texts[i : i + batch_size] - batch_ids = memory_ids[i : i + batch_size] - vectors, provider_used, model_used = self._embed_texts_with_fallback( - batch_texts, - timeout=300, - ) - if vectors is None: - return { - "error": "All embedding providers unavailable", - "embedded": len(all_embeddings), - } - for mid, vec in zip(batch_ids, vectors): - all_embeddings[mid] = vec - - # Write embeddings file with provider info - embeddings_data = { - "provider": provider_used, - "model": model_used, - "updated": datetime.now(timezone.utc).isoformat(), - "entries": all_embeddings, - } - embeddings_path.write_text(json.dumps(embeddings_data, default=str)) - - return { - "embedded": len(all_embeddings), - "provider": provider_used, - "model": model_used, - "path": str(embeddings_path), - } - - def semantic_recall(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: - """Search memories by semantic similarity using embeddings. - - Uses the same provider that generated stored embeddings to embed the query. - Skips vectors with dimension mismatch as safety guard. 
- """ - emb_data = self._load_embeddings_cached() - if emb_data is None: - return [] - - stored = emb_data.get("entries", {}) - if not stored: - return [] - - # Embed query with matching provider - stored_provider = emb_data.get("provider", "ollama") - config = self._get_embedding_provider() - query_vec = None - - if stored_provider == "openai": - api_key = config.get("openai_api_key") - model = emb_data.get("model", OPENAI_MODEL_DEFAULT) - if api_key: - vecs = _openai_embed([query], api_key, model) - if vecs: - query_vec = vecs[0] - if query_vec is None and stored_provider == "ollama": - stored_model = emb_data.get("model", EMBEDDING_MODEL) - vecs = _ollama_embed([query], model=stored_model) - if vecs: - query_vec = vecs[0] - # Last resort: try any available provider - if query_vec is None: - vecs, _, _ = self._embed_texts_with_fallback([query], timeout=30) - if vecs: - query_vec = vecs[0] - - if query_vec is None: - return [] - - query_dim = len(query_vec) - - # Score all memories by cosine similarity - index = self._load_index() - scored = [] - for mid, vec in stored.items(): - # Skip dimension mismatch - if len(vec) != query_dim: - continue - sim = _cosine_similarity(query_vec, vec) - entry = index.get("entries", {}).get(mid) - if entry: - scored.append( - { - "id": mid, - "title": entry.get("title", ""), - "type": entry.get("type", "general"), - "tags": entry.get("tags", []), - "similarity": round(sim, 4), - "path": entry.get("path", ""), - } - ) - - scored.sort(key=lambda x: x["similarity"], reverse=True) - return scored[:limit] - - def reflect( - self, - since: Optional[str] = None, - dry_run: bool = False, - ) -> Dict[str, Any]: - """Review recent memories, identify tag-based clusters, and output consolidation recommendations. - - Clusters memories that share 2+ tags using iterative union-find merging. - Does NOT auto-create insights; the agent reads output and decides what to store. - - Args: - since: ISO date string (YYYY-MM-DD) to filter memories from. 
Falls back to - _state.json's last_reflection, then 30 days ago. - dry_run: If True, return analysis without updating _state.json or logging episode. - - Returns: - Dict with clusters, total_memories_reviewed, clusters_found, and since date. - """ - now = datetime.now(timezone.utc) - state = self._load_state() - - # Determine time window - if since: - since_date = since - elif state.get("last_reflection"): - # last_reflection may be a full ISO timestamp; extract date portion - since_date = state["last_reflection"][:10] - else: - since_dt = now - timedelta(days=30) - since_date = since_dt.strftime("%Y-%m-%d") - - # Load recent memories from index - index = self._load_index() - recent_memories = [] - for mid, entry in index.get("entries", {}).items(): - created = entry.get("created", "") - # Compare date portion only (YYYY-MM-DD) - created_date = created[:10] if created else "" - if created_date >= since_date: - recent_memories.append( - { - "id": mid, - "title": entry.get("title", ""), - "type": entry.get("type", "general"), - "tags": [t.lower().strip() for t in entry.get("tags", [])], - } - ) - - total_reviewed = len(recent_memories) - - # Union-find clustering by tag overlap >= 2 - # parent[i] = index of parent in recent_memories list - n = len(recent_memories) - parent = list(range(n)) - - def find(x: int) -> int: - while parent[x] != x: - parent[x] = parent[parent[x]] # path compression - x = parent[x] - return x - - def union(a: int, b: int): - ra, rb = find(a), find(b) - if ra != rb: - parent[rb] = ra - - # Compare all pairs, check tag overlap - for i in range(n): - tags_i = set(recent_memories[i]["tags"]) - for j in range(i + 1, n): - tags_j = set(recent_memories[j]["tags"]) - if len(tags_i & tags_j) >= 2: - union(i, j) - - # Group by root - clusters_map: Dict[int, List[int]] = {} - for i in range(n): - root = find(i) - clusters_map.setdefault(root, []).append(i) - - # Build output clusters (only clusters with 2+ members) - clusters = [] - cluster_id = 0 - 
for indices in clusters_map.values(): - if len(indices) < 2: - continue - cluster_id += 1 - members = [] - all_tag_sets = [] - types_seen = set() - for idx in indices: - mem = recent_memories[idx] - members.append( - { - "id": mem["id"], - "title": mem["title"], - "type": mem["type"], - "tags": mem["tags"], - } - ) - all_tag_sets.append(set(mem["tags"])) - types_seen.add(mem["type"]) - - # Common tags = intersection of ALL members - common_tags = ( - sorted(set.intersection(*all_tag_sets)) if all_tag_sets else [] - ) - - # Shared tags = tags appearing in 2+ members - tag_counts: Dict[str, int] = {} - for ts in all_tag_sets: - for t in ts: - tag_counts[t] = tag_counts.get(t, 0) + 1 - shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2) - - # Suggested topic - tag_label = ( - ", ".join(common_tags[:4]) - if common_tags - else ", ".join(shared_tags[:4]) - ) - type_label = ", ".join(sorted(types_seen)) - suggested_topic = f"Pattern: {tag_label} across {type_label}" - - clusters.append( - { - "cluster_id": cluster_id, - "members": members, - "common_tags": common_tags, - "shared_tags": shared_tags, - "suggested_topic": suggested_topic, - "member_count": len(members), - } - ) - - # Sort clusters by member count descending - clusters.sort(key=lambda c: c["member_count"], reverse=True) - # Re-number after sorting - for i, c in enumerate(clusters): - c["cluster_id"] = i + 1 - - result = { - "clusters": clusters, - "total_memories_reviewed": total_reviewed, - "clusters_found": len(clusters), - "since": since_date, - } - - if not dry_run: - # Update state with last_reflection timestamp - state["last_reflection"] = now.isoformat() - self._save_state(state) - - # Log an episode entry - self.episode( - type="reflection", - title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories", - tags=["reflection", "cognitive-memory"], - summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters", - ) - - # Regenerate 
REFLECTION.md - self.reflection_summary() - - return result - - def merge( - self, - keep_id: str, - absorb_id: str, - dry_run: bool = False, - ) -> Dict[str, Any]: - """Merge two memories: absorb one into another. - - The 'keep' memory absorbs the content, tags, and relations from - the 'absorb' memory. The absorb memory is then deleted. - All other memories referencing absorb_id are updated to point to keep_id. - - If dry_run=True, returns what would change without writing anything. - """ - # Resolve both memory file paths - keep_path = self._resolve_memory_path(keep_id) - if not keep_path: - raise ValueError(f"Keep memory not found: {keep_id}") - absorb_path = self._resolve_memory_path(absorb_id) - if not absorb_path: - raise ValueError(f"Absorb memory not found: {absorb_id}") - - # Read both memory files - keep_fm, keep_body = self._read_memory_file(keep_path) - absorb_fm, absorb_body = self._read_memory_file(absorb_path) - - keep_title = keep_fm.get("title", keep_id[:8]) - absorb_title = absorb_fm.get("title", absorb_id[:8]) - - # Combine content - merged_body = keep_body - if absorb_body.strip(): - merged_body = ( - keep_body.rstrip() - + f"\n\n---\n*Merged from: {absorb_title}*\n\n" - + absorb_body.strip() - ) - - # Merge tags (sorted, deduplicated) - keep_tags = keep_fm.get("tags", []) - absorb_tags = absorb_fm.get("tags", []) - merged_tags = sorted(list(set(keep_tags + absorb_tags))) - - # importance = max of both - merged_importance = max( - keep_fm.get("importance", 0.5), - absorb_fm.get("importance", 0.5), - ) - - # Merge relations: combine both relation lists - keep_rels = list(keep_fm.get("relations", [])) - absorb_rels = list(absorb_fm.get("relations", [])) - combined_rels = keep_rels + absorb_rels - - # Replace any relation targeting absorb_id with keep_id - for rel in combined_rels: - if rel.get("target") == absorb_id: - rel["target"] = keep_id - - # Deduplicate by (target, type) tuple - seen = set() - deduped_rels = [] - for rel in combined_rels: 
- key = (rel.get("target"), rel.get("type")) - if key not in seen: - seen.add(key) - deduped_rels.append(rel) - - # Remove self-referential relations (target == keep_id) - merged_rels = [r for r in deduped_rels if r.get("target") != keep_id] - - # Scan all other memories for relations targeting absorb_id - index = self._load_index() - updated_others = [] - for mid, entry in index.get("entries", {}).items(): - if mid in (keep_id, absorb_id): - continue - rels = entry.get("relations", []) - needs_update = any(r.get("target") == absorb_id for r in rels) - if needs_update: - updated_others.append(mid) - - if dry_run: - return { - "dry_run": True, - "keep_id": keep_id, - "keep_title": keep_title, - "absorb_id": absorb_id, - "absorb_title": absorb_title, - "merged_tags": merged_tags, - "merged_importance": merged_importance, - "merged_relations_count": len(merged_rels), - "other_memories_updated": len(updated_others), - "other_memory_ids": updated_others, - } - - # Write updated keep memory file - now = datetime.now(timezone.utc).isoformat() - keep_fm["tags"] = merged_tags - keep_fm["importance"] = merged_importance - keep_fm["relations"] = merged_rels - keep_fm["updated"] = now - self._write_memory_file(keep_path, keep_fm, merged_body) - - # Update index for keep memory (refresh content_preview with merged body) - keep_rel_path = str(keep_path.relative_to(self.memory_dir)) - preview = merged_body.strip()[:200] - if len(merged_body.strip()) > 200: - last_space = preview.rfind(" ") - if last_space > 0: - preview = preview[:last_space] - self._update_index_entry( - keep_id, keep_fm, keep_rel_path, content_preview=preview - ) - - # Update all other memories that reference absorb_id - for mid in updated_others: - other_path = self._resolve_memory_path(mid) - if not other_path: - continue - try: - other_fm, other_body = self._read_memory_file(other_path) - other_rels = other_fm.get("relations", []) - for rel in other_rels: - if rel.get("target") == absorb_id: - rel["target"] 
= keep_id - # Deduplicate after replacement - seen_other = set() - deduped_other = [] - for rel in other_rels: - key = (rel.get("target"), rel.get("type")) - if key not in seen_other: - seen_other.add(key) - deduped_other.append(rel) - # Remove self-referential - other_fm["relations"] = [ - r for r in deduped_other if r.get("target") != mid - ] - other_fm["updated"] = now - self._write_memory_file(other_path, other_fm, other_body) - other_rel_path = str(other_path.relative_to(self.memory_dir)) - self._update_index_entry(mid, other_fm, other_rel_path) - except Exception: - pass - - # Delete absorb memory file and remove from index/state - absorb_path.unlink() - self._remove_index_entry(absorb_id) - state = self._load_state() - state.get("entries", {}).pop(absorb_id, None) - self._save_state(state) - - # Git: stage absorb deletion - try: - absorb_rel = absorb_path.relative_to(self.memory_dir) - subprocess.run( - ["git", "rm", "--cached", str(absorb_rel)], - cwd=str(self.memory_dir), - capture_output=True, - timeout=5, - ) - except Exception: - pass - - self._git_commit(f"merge: {keep_title} absorbed {absorb_title}") - - return { - "success": True, - "keep_id": keep_id, - "keep_title": keep_title, - "absorb_id": absorb_id, - "absorb_title": absorb_title, - "merged_tags": merged_tags, - "merged_importance": merged_importance, - "merged_relations_count": len(merged_rels), - "other_memories_updated": len(updated_others), - } - - def reflection_summary(self) -> str: - """Generate REFLECTION.md with patterns and insights from the memory system. - - Writes to ~/.claude/memory/REFLECTION.md and git-commits the file. - Returns the generated content. 
- """ - index = self._load_index() - state = self._load_state() - entries = index.get("entries", {}) - state_entries = state.get("entries", {}) - now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") - - # Last reflection date - last_reflection = state.get("last_reflection", "") - last_reflection_date = last_reflection[:10] if last_reflection else "never" - - # Count total reflections from episode files - total_reflections = 0 - episodes_dir = self.memory_dir / "episodes" - if episodes_dir.exists(): - for ep_file in sorted(episodes_dir.glob("*.md")): - try: - text = ep_file.read_text(encoding="utf-8") - # Count lines that look like reflection episode entries - for line in text.split("\n"): - if "Reflection:" in line and line.strip().startswith("## "): - total_reflections += 1 - except OSError: - pass - - lines = [ - "# Reflection Summary (auto-generated)", - f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}", - "", - ] - - # === Themes section === - # Get top tags, then find co-occurrence pairs - lines.append("## Themes") - lines.append("Top tag co-occurrences revealing recurring themes:") - lines.append("") - - top_tags = self.tags_list(limit=10) - # Build co-occurrence pairs across top tags - pair_data: Dict[Tuple[str, str], List[str]] = {} # (tag1, tag2) -> [titles] - for tag_info in top_tags[:5]: - tag = tag_info["tag"] - co_tags = self.tags_related(tag, limit=5) - for co in co_tags: - other = co["tag"] - # Canonical pair ordering - pair = tuple(sorted([tag, other])) - if pair in pair_data: - continue - # Find memories with both tags and collect titles - titles = [] - for mid, entry in entries.items(): - mem_tags = [t.lower().strip() for t in entry.get("tags", [])] - if pair[0] in mem_tags and pair[1] in mem_tags: - titles.append(entry.get("title", "untitled")) - if titles: - pair_data[pair] = titles - - # Sort pairs by memory count descending, show top 8 - sorted_pairs = sorted(pair_data.items(), 
key=lambda x: len(x[1]), reverse=True) - for (tag_a, tag_b), titles in sorted_pairs[:8]: - example_titles = ", ".join(f'"{t}"' for t in titles[:3]) - lines.append( - f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})" - ) - - if not sorted_pairs: - lines.append("- No co-occurrence data available yet.") - lines.append("") - - # === Cross-Project Patterns section === - lines.append("## Cross-Project Patterns") - lines.append("Tags that span multiple projects:") - lines.append("") - - known_projects = [ - "major-domo", - "paper-dynasty", - "homelab", - "vagabond-rpg", - "foundryvtt", - "strat-gameplay-webapp", - ] - # Build tag -> {project -> count} mapping - tag_project_map: Dict[str, Dict[str, int]] = {} - for mid, entry in entries.items(): - mem_tags = [t.lower().strip() for t in entry.get("tags", [])] - # Identify which projects this memory belongs to - mem_projects = [t for t in mem_tags if t in known_projects] - # For non-project tags, record which projects they appear in - non_project_tags = [t for t in mem_tags if t not in known_projects] - for tag in non_project_tags: - if tag not in tag_project_map: - tag_project_map[tag] = {} - for proj in mem_projects: - tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1 - - # Filter to tags spanning 2+ projects, sort by project count then total - cross_project = [] - for tag, projects in tag_project_map.items(): - if len(projects) >= 2: - total = sum(projects.values()) - cross_project.append((tag, projects, total)) - cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True) - - for tag, projects, total in cross_project[:10]: - proj_parts = ", ".join( - f"{p} ({c})" - for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True) - ) - lines.append(f"- **{tag}**: appears in {proj_parts}") - - if not cross_project: - lines.append("- No cross-project patterns detected yet.") - lines.append("") - - # === Most Accessed section === - lines.append("## Most Accessed") - 
lines.append("Top 10 memories by access count:") - lines.append("") - - # Sort state entries by access_count descending - accessed = [] - for mid, s in state_entries.items(): - count = s.get("access_count", 0) - if count > 0: - entry = entries.get(mid) - if entry: - accessed.append( - ( - mid, - entry.get("title", "untitled"), - entry.get("path", ""), - count, - ) - ) - accessed.sort(key=lambda x: x[3], reverse=True) - - for mid, title, path, count in accessed[:10]: - lines.append(f"1. [{title}]({path}) - {count} accesses") - - if not accessed: - lines.append("- No access data recorded yet.") - lines.append("") - - # === Recent Insights section === - lines.append("## Recent Insights") - lines.append("Insight-type memories:") - lines.append("") - - insights = [] - for mid, entry in entries.items(): - if entry.get("type") == "insight": - insights.append((mid, entry)) - # Sort by created date descending - insights.sort(key=lambda x: x[1].get("created", ""), reverse=True) - - for mid, entry in insights[:10]: - title = entry.get("title", "untitled") - path = entry.get("path", "") - preview = entry.get("content_preview", "") - if preview: - lines.append(f"- [{title}]({path}) - {preview[:80]}") - else: - lines.append(f"- [{title}]({path})") - - if not insights: - lines.append("- No insight memories stored yet.") - lines.append("") - - # === Consolidation History section === - lines.append("## Consolidation History") - lines.append("") - - merge_count = 0 - if episodes_dir.exists(): - for ep_file in sorted(episodes_dir.glob("*.md")): - try: - text = ep_file.read_text(encoding="utf-8") - for line_text in text.split("\n"): - stripped = line_text.strip() - if stripped.startswith("## ") and "merge" in stripped.lower(): - merge_count += 1 - except OSError: - pass - - lines.append(f"- Total merges performed: {merge_count}") - lines.append("") - - content = "\n".join(lines) - - # Write REFLECTION.md - reflection_path = self.memory_dir / "REFLECTION.md" - 
reflection_path.write_text(content, encoding="utf-8") - self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path]) - - return content - - -# ============================================================================= -# CLI INTERFACE -# ============================================================================= - - -def main(): - parser = argparse.ArgumentParser( - description="Cognitive Memory - Markdown-based memory system with decay scoring", - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - subparsers = parser.add_subparsers(dest="command", help="Commands") - - # store - sp = subparsers.add_parser("store", help="Store a new memory") - sp.add_argument( - "--type", "-t", required=True, choices=VALID_TYPES, help="Memory type" - ) - sp.add_argument("--title", required=True, help="Memory title") - sp.add_argument("--content", "-c", required=True, help="Memory content") - sp.add_argument("--tags", help="Comma-separated tags") - sp.add_argument( - "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0" - ) - sp.add_argument("--confidence", type=float, default=0.8, help="Confidence 0.0-1.0") - sp.add_argument( - "--episode", - action="store_true", - default=False, - help="Also log an episode entry", - ) - - # recall - sp = subparsers.add_parser("recall", help="Search memories by query") - sp.add_argument("query", help="Search query") - sp.add_argument("--types", help="Comma-separated memory types") - sp.add_argument("--limit", "-n", type=int, default=10, help="Max results") - sp.add_argument( - "--no-semantic", - action="store_true", - default=False, - help="Disable semantic search (keyword-only, faster)", - ) - - # get - sp = subparsers.add_parser("get", help="Get memory by ID") - sp.add_argument("memory_id", help="Memory UUID") - - # relate - sp = subparsers.add_parser("relate", help="Create relationship") - sp.add_argument("from_id", help="Source memory UUID") - sp.add_argument("to_id", help="Target memory UUID") - 
sp.add_argument("rel_type", choices=VALID_RELATION_TYPES, help="Relationship type") - sp.add_argument("--strength", type=float, default=0.8, help="Strength 0.0-1.0") - sp.add_argument("--context", help="Context description") - sp.add_argument("--description", help="Rich edge description body") - - # edge-get - sp = subparsers.add_parser("edge-get", help="Get edge by ID") - sp.add_argument("edge_id", help="Edge UUID") - - # edge-search - sp = subparsers.add_parser("edge-search", help="Search edges") - sp.add_argument("--query", "-q", help="Text query") - sp.add_argument("--types", help="Comma-separated relation types") - sp.add_argument("--from-id", help="Filter by source memory ID") - sp.add_argument("--to-id", help="Filter by target memory ID") - sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") - - # edge-update - sp = subparsers.add_parser("edge-update", help="Update an edge") - sp.add_argument("edge_id", help="Edge UUID") - sp.add_argument("--description", help="New description body") - sp.add_argument("--strength", type=float, help="New strength 0.0-1.0") - - # edge-delete - sp = subparsers.add_parser("edge-delete", help="Delete an edge") - sp.add_argument("edge_id", help="Edge UUID") - - # search - sp = subparsers.add_parser("search", help="Filter memories") - sp.add_argument("--query", "-q", help="Text query") - sp.add_argument("--types", help="Comma-separated memory types") - sp.add_argument("--tags", help="Comma-separated tags") - sp.add_argument("--min-importance", type=float, help="Minimum importance") - sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") - - # update - sp = subparsers.add_parser("update", help="Update a memory") - sp.add_argument("memory_id", help="Memory UUID") - sp.add_argument("--title", help="New title") - sp.add_argument("--content", help="New content") - sp.add_argument("--tags", help="New tags (comma-separated)") - sp.add_argument("--importance", type=float, help="New importance") - 
- # delete - sp = subparsers.add_parser("delete", help="Delete a memory") - sp.add_argument("memory_id", help="Memory UUID") - sp.add_argument("--force", "-f", action="store_true", help="Skip confirmation") - - # related - sp = subparsers.add_parser("related", help="Get related memories") - sp.add_argument("memory_id", help="Memory UUID") - sp.add_argument("--types", help="Comma-separated relationship types") - sp.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5") - - # stats - subparsers.add_parser("stats", help="Show statistics") - - # recent - sp = subparsers.add_parser("recent", help="Recently created memories") - sp.add_argument("--limit", "-n", type=int, default=20, help="Max results") - - # decay - subparsers.add_parser("decay", help="Recalculate all decay scores") - - # core - subparsers.add_parser("core", help="Generate CORE.md") - - # episode - sp = subparsers.add_parser("episode", help="Log episode entry") - sp.add_argument("--type", "-t", required=True, help="Entry type") - sp.add_argument("--title", required=True, help="Entry title") - sp.add_argument("--tags", help="Comma-separated tags") - sp.add_argument("--summary", "-s", help="Summary text") - sp.add_argument("--memory-link", help="Path to related memory file") - - # reindex - subparsers.add_parser("reindex", help="Rebuild index from files") - - # embed - embed_parser = subparsers.add_parser( - "embed", help="Generate embeddings for all memories via Ollama" - ) - embed_parser.add_argument( - "--if-changed", - action="store_true", - help="Skip if no memories were added or deleted since last embed", - ) - - # pin - sp = subparsers.add_parser("pin", help="Move memory to vault (never decays)") - sp.add_argument("memory_id", help="Memory UUID") - - # reflect - sp = subparsers.add_parser( - "reflect", help="Review recent memories and identify clusters" - ) - sp.add_argument("--since", help="ISO date (YYYY-MM-DD) to review from") - sp.add_argument( - "--dry-run", 
action="store_true", help="Preview without updating state" - ) - - # merge - sp = subparsers.add_parser( - "merge", help="Merge two memories (absorb one into another)" - ) - sp.add_argument("keep_id", help="Memory UUID to keep") - sp.add_argument("absorb_id", help="Memory UUID to absorb and delete") - sp.add_argument( - "--dry-run", action="store_true", help="Preview merge without writing" - ) - - # reflection - subparsers.add_parser("reflection", help="Generate REFLECTION.md summary") - - # tags - sp = subparsers.add_parser("tags", help="Tag analysis commands") - tags_sub = sp.add_subparsers(dest="tags_command") - sp2 = tags_sub.add_parser("list", help="List all tags with counts") - sp2.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)") - sp3 = tags_sub.add_parser("related", help="Find co-occurring tags") - sp3.add_argument("tag", help="Tag to analyze") - sp3.add_argument("--limit", "-n", type=int, default=0, help="Max results (0=all)") - sp4 = tags_sub.add_parser("suggest", help="Suggest tags for a memory") - sp4.add_argument("memory_id", help="Memory UUID") - - # procedure - sp = subparsers.add_parser( - "procedure", help="Store a procedure memory (convenience wrapper)" - ) - sp.add_argument("--title", required=True, help="Procedure title") - sp.add_argument("--content", "-c", required=True, help="Procedure description") - sp.add_argument("--steps", help="Comma-separated ordered steps") - sp.add_argument("--preconditions", help="Comma-separated preconditions") - sp.add_argument("--postconditions", help="Comma-separated postconditions") - sp.add_argument("--tags", help="Comma-separated tags") - sp.add_argument( - "--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0" - ) - - # config - sp = subparsers.add_parser("config", help="Manage embedding config") - sp.add_argument("--show", action="store_true", help="Display current config") - sp.add_argument( - "--provider", choices=["ollama", "openai"], help="Set embedding 
provider" - ) - sp.add_argument("--openai-key", help="Set OpenAI API key") - sp.add_argument( - "--ollama-model", help="Set Ollama model name (e.g. qwen3-embedding:8b)" - ) - - args = parser.parse_args() - - if not args.command: - parser.print_help() - sys.exit(1) - - client = CognitiveMemoryClient() - result = None - - if args.command == "store": - tags = [t.strip() for t in args.tags.split(",")] if args.tags else None - memory_id = client.store( - type=args.type, - title=args.title, - content=args.content, - tags=tags, - importance=args.importance, - confidence=args.confidence, - ) - result = {"success": True, "memory_id": memory_id} - - if args.episode: - # Get the relative path from the index for memory_link - index = client._load_index() - entry = index.get("entries", {}).get(memory_id, {}) - rel_path = entry.get("path", "") - - # Truncate content at word boundary for summary (max 100 chars) - summary = args.content.strip()[:100] - if len(args.content.strip()) > 100: - last_space = summary.rfind(" ") - if last_space > 0: - summary = summary[:last_space] - - client.episode( - type=args.type, - title=args.title, - tags=tags or [], - summary=summary, - memory_link=rel_path, - ) - result["episode_logged"] = True - - elif args.command == "recall": - types = [t.strip() for t in args.types.split(",")] if args.types else None - result = client.recall( - args.query, - memory_types=types, - limit=args.limit, - semantic=not args.no_semantic, - ) - - elif args.command == "get": - result = client.get(args.memory_id) - if not result: - result = {"error": "Memory not found"} - - elif args.command == "relate": - edge_id = client.relate( - args.from_id, - args.to_id, - args.rel_type, - strength=args.strength, - context=args.context, - description=args.description, - ) - result = {"success": bool(edge_id), "edge_id": edge_id} - - elif args.command == "edge-get": - result = client.edge_get(args.edge_id) - if not result: - result = {"error": "Edge not found"} - - elif 
args.command == "edge-search": - types = [t.strip() for t in args.types.split(",")] if args.types else None - result = client.edge_search( - query=args.query, - types=types, - from_id=getattr(args, "from_id", None), - to_id=getattr(args, "to_id", None), - limit=args.limit, - ) - - elif args.command == "edge-update": - success = client.edge_update( - args.edge_id, - description=args.description, - strength=args.strength, - ) - result = {"success": success} - - elif args.command == "edge-delete": - success = client.edge_delete(args.edge_id) - result = {"success": success} - - elif args.command == "search": - types = [t.strip() for t in args.types.split(",")] if args.types else None - tags = [t.strip() for t in args.tags.split(",")] if args.tags else None - result = client.search( - query=args.query, - memory_types=types, - tags=tags, - min_importance=args.min_importance, - limit=args.limit, - ) - - elif args.command == "update": - tags = [t.strip() for t in args.tags.split(",")] if args.tags else None - success = client.update( - args.memory_id, - title=args.title, - content=args.content, - tags=tags, - importance=args.importance, - ) - result = {"success": success} - - elif args.command == "delete": - if not args.force: - mem = client.get(args.memory_id) - if mem: - print(f"Deleting: {mem.get('title')}", file=sys.stderr) - success = client.delete(args.memory_id) - result = {"success": success} - - elif args.command == "related": - types = [t.strip() for t in args.types.split(",")] if args.types else None - result = client.related(args.memory_id, rel_types=types, max_depth=args.depth) - - elif args.command == "stats": - result = client.stats() - - elif args.command == "recent": - result = client.recent(limit=args.limit) - - elif args.command == "decay": - result = client.decay() - - elif args.command == "core": - content = client.core() - # Print path, not content (content is written to file) - result = { - "success": True, - "path": str(MEMORY_DIR / "CORE.md"), - 
"chars": len(content), - } - - elif args.command == "episode": - tags = [t.strip() for t in args.tags.split(",")] if args.tags else None - client.episode( - type=args.type, - title=args.title, - tags=tags, - summary=args.summary, - memory_link=args.memory_link, - ) - result = {"success": True} - - elif args.command == "reindex": - count = client.reindex() - result = {"success": True, "indexed": count} - - elif args.command == "embed": - if_changed = getattr(args, "if_changed", False) - if not if_changed: - print( - "Generating embeddings (this may take a while if model needs to be pulled)...", - file=sys.stderr, - ) - result = client.embed(if_changed=if_changed) - - elif args.command == "pin": - success = client.pin(args.memory_id) - result = {"success": success} - - elif args.command == "reflect": - result = client.reflect( - since=args.since, - dry_run=args.dry_run, - ) - - elif args.command == "merge": - result = client.merge( - keep_id=args.keep_id, - absorb_id=args.absorb_id, - dry_run=args.dry_run, - ) - - elif args.command == "reflection": - content = client.reflection_summary() - result = { - "success": True, - "path": str(MEMORY_DIR / "REFLECTION.md"), - "chars": len(content), - } - - elif args.command == "tags": - if args.tags_command == "list": - result = client.tags_list(limit=args.limit) - elif args.tags_command == "related": - result = client.tags_related(args.tag, limit=args.limit) - elif args.tags_command == "suggest": - result = client.tags_suggest(args.memory_id) - else: - # No subcommand given, print tags help - # Re-parse to get the tags subparser for help output - for action in parser._subparsers._actions: - if isinstance(action, argparse._SubParsersAction): - tags_parser = action.choices.get("tags") - if tags_parser: - tags_parser.print_help() - break - sys.exit(1) - - elif args.command == "procedure": - tags = [t.strip() for t in args.tags.split(",")] if args.tags else None - steps = [s.strip() for s in args.steps.split(",")] if args.steps 
else None - preconditions = ( - [p.strip() for p in args.preconditions.split(",")] - if args.preconditions - else None - ) - postconditions = ( - [p.strip() for p in args.postconditions.split(",")] - if args.postconditions - else None - ) - memory_id = client.store( - type="procedure", - title=args.title, - content=args.content, - tags=tags, - importance=args.importance, - steps=steps, - preconditions=preconditions, - postconditions=postconditions, - ) - result = {"success": True, "memory_id": memory_id} - - elif args.command == "config": - config_path = MEMORY_DIR / "_config.json" - config = _load_memory_config(config_path) - changed = False - - if args.provider: - config["embedding_provider"] = args.provider - changed = True - if args.openai_key: - config["openai_api_key"] = args.openai_key - changed = True - if args.ollama_model: - config["ollama_model"] = args.ollama_model - changed = True - - if changed: - config_path.write_text(json.dumps(config, indent=2)) - result = {"success": True, "updated": True} - elif args.show or not changed: - # Mask API key for display - display = dict(config) - key = display.get("openai_api_key") - if key and isinstance(key, str) and len(key) > 8: - display["openai_api_key"] = key[:4] + "..." + key[-4:] - result = display - - print(json.dumps(result, indent=2, default=str)) - if __name__ == "__main__": + from cli import main + main() diff --git a/skills/cognitive-memory/common.py b/skills/cognitive-memory/common.py new file mode 100644 index 0000000..6590f45 --- /dev/null +++ b/skills/cognitive-memory/common.py @@ -0,0 +1,521 @@ +""" +Cognitive Memory - Common Constants & Helpers + +Module-level constants, YAML parsing, slug generation, decay calculation, +embedding helpers, and cosine similarity. Shared by all other modules. 
+""" + +import json +import math +import re +import urllib.request +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.error import URLError + +# ============================================================================= +# CONSTANTS +# ============================================================================= + +MEMORY_DIR = Path.home() / ".claude" / "memory" +INDEX_PATH = MEMORY_DIR / "_index.json" +STATE_PATH = MEMORY_DIR / "_state.json" +EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json" +OLLAMA_URL = "http://localhost:11434" +EMBEDDING_MODEL = "nomic-embed-text" +EMBEDDING_TIMEOUT = 5 # seconds +CONFIG_PATH = MEMORY_DIR / "_config.json" +OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings" +OPENAI_MODEL_DEFAULT = "text-embedding-3-small" + +# Memory type -> directory name mapping +TYPE_DIRS = { + "solution": "solutions", + "fix": "fixes", + "decision": "decisions", + "configuration": "configurations", + "problem": "problems", + "workflow": "workflows", + "code_pattern": "code-patterns", + "error": "errors", + "general": "general", + "procedure": "procedures", + "insight": "insights", +} + +VALID_TYPES = list(TYPE_DIRS.keys()) + +# Decay model type weights +TYPE_WEIGHTS = { + "decision": 1.3, + "solution": 1.2, + "insight": 1.25, + "code_pattern": 1.1, + "configuration": 1.1, + "fix": 1.0, + "workflow": 1.0, + "problem": 0.9, + "error": 0.8, + "general": 0.8, + "procedure": 1.4, +} + +DECAY_LAMBDA = 0.03 # Half-life ~23 days + +# Decay score thresholds +THRESHOLD_ACTIVE = 0.5 +THRESHOLD_FADING = 0.2 +THRESHOLD_DORMANT = 0.05 + +# Relationship types (subset from MemoryGraph, focused on most useful) +VALID_RELATION_TYPES = [ + "SOLVES", + "CAUSES", + "BUILDS_ON", + "ALTERNATIVE_TO", + "REQUIRES", + "FOLLOWS", + "RELATED_TO", +] + +# Edge file constants +EDGES_DIR_NAME = "edges" +EDGE_FIELD_ORDER = [ + "id", + "type", + "from_id", + "from_title", + "to_id", + "to_title", + 
"strength", + "created", + "updated", +] + +# Frontmatter field order for consistent output +FIELD_ORDER = [ + "id", + "type", + "title", + "tags", + "importance", + "confidence", + "steps", + "preconditions", + "postconditions", + "created", + "updated", + "relations", +] + +# CORE.md token budget (approximate, 1 token ~= 4 chars) +CORE_MAX_CHARS = 12000 # ~3K tokens + + +# ============================================================================= +# YAML FRONTMATTER PARSING (stdlib only) +# ============================================================================= + + +def _needs_quoting(s: str) -> bool: + """Check if a YAML string value needs quoting.""" + if not s: + return True + if any(c in s for c in ":#{}[]&*?|>!%@`"): + return True + try: + float(s) + return True + except ValueError: + pass + if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"): + return True + return False + + +def _quote_yaml(s: str) -> str: + """Quote a string for YAML, escaping internal quotes.""" + escaped = s.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + + +def _format_yaml_value(value: Any, force_quote: bool = False) -> str: + """Format a Python value for YAML output.""" + if value is None: + return "null" + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return str(value) + s = str(value) + if force_quote or _needs_quoting(s): + return _quote_yaml(s) + return s + + +def _parse_scalar(value: str) -> Any: + """Parse a YAML scalar value to Python type.""" + v = value.strip() + if not v or v == "null": + return None + if v == "true": + return True + if v == "false": + return False + # Try numeric + try: + if "." 
in v: + return float(v) + return int(v) + except ValueError: + pass + # Strip quotes + if (v.startswith('"') and v.endswith('"')) or ( + v.startswith("'") and v.endswith("'") + ): + return v[1:-1] + return v + + +def serialize_frontmatter(data: Dict[str, Any]) -> str: + """Serialize a dict to YAML frontmatter string (between --- markers).""" + lines = ["---"] + + for key in FIELD_ORDER: + if key not in data: + continue + value = data[key] + + if key == "tags" and isinstance(value, list): + if value: + items = ", ".join(_format_yaml_value(t) for t in value) + lines.append(f"tags: [{items}]") + else: + lines.append("tags: []") + + elif key in ("steps", "preconditions", "postconditions") and isinstance( + value, list + ): + if not value: + continue + lines.append(f"{key}:") + for item in value: + lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}") + + elif key == "relations" and isinstance(value, list): + if not value: + continue + lines.append("relations:") + for rel in value: + first = True + for rk in [ + "target", + "type", + "direction", + "strength", + "context", + "edge_id", + ]: + if rk not in rel: + continue + rv = rel[rk] + prefix = " - " if first else " " + force_q = rk in ("context",) + lines.append( + f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}" + ) + first = False + + elif key == "title": + lines.append(f"title: {_format_yaml_value(value, force_quote=True)}") + + else: + lines.append(f"{key}: {_format_yaml_value(value)}") + + lines.append("---") + return "\n".join(lines) + + +def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]: + """Parse YAML frontmatter and body from markdown text. + + Returns (frontmatter_dict, body_text). 
+ """ + if not text.startswith("---\n"): + return {}, text + + # Find closing --- + end_match = re.search(r"\n---\s*\n", text[3:]) + if not end_match: + # Try end of string + if text.rstrip().endswith("---"): + end_pos = text.rstrip().rfind("\n---") + if end_pos <= 3: + return {}, text + fm_text = text[4:end_pos] + body = "" + else: + return {}, text + else: + end_pos = end_match.start() + 3 # Offset from text[3:] + fm_text = text[4:end_pos] + body = text[end_pos + end_match.end() - end_match.start() :] + + body = body.lstrip("\n") + data = {} + lines = fm_text.split("\n") + i = 0 + + while i < len(lines): + line = lines[i] + + # Skip empty lines + if not line.strip(): + i += 1 + continue + + # Must be a top-level key (no leading whitespace) + if line[0] == " ": + i += 1 + continue + + if ":" not in line: + i += 1 + continue + + key, _, rest = line.partition(":") + key = key.strip() + rest = rest.strip() + + if not rest: + # Block value - collect indented lines + block_lines = [] + j = i + 1 + while j < len(lines) and lines[j] and lines[j][0] == " ": + block_lines.append(lines[j]) + j += 1 + + if key == "relations": + data["relations"] = _parse_relations_block(block_lines) + elif block_lines and block_lines[0].strip().startswith("- "): + # Simple list + data[key] = [ + _parse_scalar(bl.strip().lstrip("- ")) + for bl in block_lines + if bl.strip().startswith("- ") + ] + else: + data[key] = None + i = j + continue + + # Inline list: [a, b, c] + if rest.startswith("[") and rest.endswith("]"): + inner = rest[1:-1] + if inner.strip(): + data[key] = [ + _parse_scalar(v.strip()) for v in inner.split(",") if v.strip() + ] + else: + data[key] = [] + else: + data[key] = _parse_scalar(rest) + + i += 1 + + return data, body + + +def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]: + """Parse a YAML block list of relation dicts.""" + relations = [] + current = None + + for line in lines: + stripped = line.strip() + if not stripped: + continue + + if 
stripped.startswith("- "): + # New relation entry + current = {} + relations.append(current) + # Parse key:value on same line as - + rest = stripped[2:] + if ":" in rest: + k, _, v = rest.partition(":") + current[k.strip()] = _parse_scalar(v.strip()) + elif current is not None and ":" in stripped: + k, _, v = stripped.partition(":") + current[k.strip()] = _parse_scalar(v.strip()) + + return relations + + +# ============================================================================= +# HELPER FUNCTIONS +# ============================================================================= + + +def slugify(text: str, max_length: int = 60) -> str: + """Convert text to a URL-friendly slug.""" + text = text.lower().strip() + text = re.sub(r"[^\w\s-]", "", text) + text = re.sub(r"[\s_]+", "-", text) + text = re.sub(r"-+", "-", text) + text = text.strip("-") + if len(text) > max_length: + text = text[:max_length].rstrip("-") + return text or "untitled" + + +def make_filename(title: str, memory_id: str) -> str: + """Create a filename from title and UUID suffix.""" + slug = slugify(title) + suffix = memory_id[:6] + return f"{slug}-{suffix}.md" + + +def calculate_decay_score( + importance: float, days_since_access: float, access_count: int, type_weight: float +) -> float: + """Calculate decay score for a memory. + + decay_score = importance * e^(-lambda * days) * log2(access_count + 1) * type_weight + """ + time_factor = math.exp(-DECAY_LAMBDA * days_since_access) + usage_factor = math.log2(access_count + 1) if access_count > 0 else 1.0 + return importance * time_factor * usage_factor * type_weight + + +def _ollama_embed( + texts: List[str], + model: str = EMBEDDING_MODEL, + timeout: int = EMBEDDING_TIMEOUT, +) -> Optional[List[List[float]]]: + """Get embeddings from Ollama for a list of texts. + + Returns list of embedding vectors, or None if Ollama is unavailable. 
+ """ + try: + payload = json.dumps({"model": model, "input": texts}).encode("utf-8") + req = urllib.request.Request( + f"{OLLAMA_URL}/api/embed", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + if resp.status != 200: + return None + data = json.loads(resp.read().decode("utf-8")) + embeddings = data.get("embeddings") + if embeddings and isinstance(embeddings, list): + return embeddings + return None + except ( + ConnectionRefusedError, + URLError, + TimeoutError, + OSError, + json.JSONDecodeError, + ValueError, + KeyError, + ): + return None + + +def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]: + """Read _config.json, return defaults if missing.""" + path = config_path or CONFIG_PATH + defaults = { + "embedding_provider": "ollama", + "openai_api_key": None, + "ollama_model": EMBEDDING_MODEL, + "openai_model": OPENAI_MODEL_DEFAULT, + } + if path.exists(): + try: + data = json.loads(path.read_text()) + for k, v in defaults.items(): + data.setdefault(k, v) + return data + except (json.JSONDecodeError, OSError): + pass + return defaults + + +def _openai_embed( + texts: List[str], + api_key: str, + model: str = OPENAI_MODEL_DEFAULT, + timeout: int = 30, +) -> Optional[List[List[float]]]: + """Get embeddings from OpenAI API (stdlib-only, same interface as _ollama_embed).""" + try: + payload = json.dumps({"input": texts, "model": model}).encode("utf-8") + req = urllib.request.Request( + OPENAI_EMBED_URL, + data=payload, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + }, + method="POST", + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + if resp.status != 200: + return None + data = json.loads(resp.read().decode("utf-8")) + items = data.get("data", []) + if items and isinstance(items, list): + # Sort by index to ensure order matches input + items.sort(key=lambda x: x.get("index", 0)) + 
return [item["embedding"] for item in items] + return None + except ( + ConnectionRefusedError, + URLError, + TimeoutError, + OSError, + json.JSONDecodeError, + ValueError, + KeyError, + ): + return None + + +def _cosine_similarity(a: List[float], b: List[float]) -> float: + """Compute cosine similarity between two vectors.""" + dot = sum(x * y for x, y in zip(a, b)) + norm_a = math.sqrt(sum(x * x for x in a)) + norm_b = math.sqrt(sum(x * x for x in b)) + if norm_a == 0.0 or norm_b == 0.0: + return 0.0 + return dot / (norm_a * norm_b) + + +def _make_edge_filename( + from_title: str, rel_type: str, to_title: str, edge_id: str +) -> str: + """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md""" + from_slug = slugify(from_title, max_length=30) + to_slug = slugify(to_title, max_length=30) + suffix = edge_id[:6] + return f"{from_slug}--{rel_type}--{to_slug}-{suffix}.md" + + +def serialize_edge_frontmatter(data: Dict[str, Any]) -> str: + """Serialize an edge dict to YAML frontmatter string.""" + lines = ["---"] + for key in EDGE_FIELD_ORDER: + if key not in data: + continue + value = data[key] + if key in ("from_title", "to_title"): + lines.append(f"{key}: {_format_yaml_value(value, force_quote=True)}") + else: + lines.append(f"{key}: {_format_yaml_value(value)}") + lines.append("---") + return "\n".join(lines) diff --git a/skills/cognitive-memory/edges.py b/skills/cognitive-memory/edges.py new file mode 100644 index 0000000..033acdb --- /dev/null +++ b/skills/cognitive-memory/edges.py @@ -0,0 +1,297 @@ +"""EdgesMixin: edge/relationship operations for CognitiveMemoryClient. + +This module is part of the client.py mixin refactor. EdgesMixin provides all +edge CRUD and search operations. It relies on helper methods resolved at runtime +via MRO from the composed CognitiveMemoryClient class. 
+""" + +import uuid +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional, List, Dict, Any + +from common import ( + VALID_RELATION_TYPES, + EDGES_DIR_NAME, + _make_edge_filename, + serialize_edge_frontmatter, +) + + +class EdgesMixin: + """Mixin providing edge/relationship operations for CognitiveMemoryClient.""" + + def relate( + self, + from_id: str, + to_id: str, + rel_type: str, + strength: float = 0.8, + context: Optional[str] = None, + description: Optional[str] = None, + ) -> str: + """Create a relationship between two memories with an edge file. + + Returns edge_id string, or empty string if duplicate. + """ + if rel_type not in VALID_RELATION_TYPES: + raise ValueError( + f"Invalid relation type: {rel_type}. Valid: {VALID_RELATION_TYPES}" + ) + + from_path = self._resolve_memory_path(from_id) + to_path = self._resolve_memory_path(to_id) + if not from_path or not to_path: + raise ValueError(f"Memory not found: {from_id if not from_path else to_id}") + + # Read source memory + fm, body = self._read_memory_file(from_path) + relations = fm.get("relations", []) + + # Check for duplicate + for r in relations: + if r.get("target") == to_id and r.get("type") == rel_type: + return "" # Already exists + + # Read target memory for title + to_fm, to_body = self._read_memory_file(to_path) + + # Create edge file + edge_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + from_title = fm.get("title", from_id[:8]) + to_title = to_fm.get("title", to_id[:8]) + clamped_strength = max(0.0, min(1.0, strength)) + + edge_data = { + "id": edge_id, + "type": rel_type, + "from_id": from_id, + "from_title": from_title, + "to_id": to_id, + "to_title": to_title, + "strength": clamped_strength, + "created": now, + "updated": now, + } + edge_filename = _make_edge_filename(from_title, rel_type, to_title, edge_id) + edge_path = self.memory_dir / "graph" / EDGES_DIR_NAME / edge_filename + edge_fm_str = 
serialize_edge_frontmatter(edge_data) + edge_body = description.strip() if description else "" + edge_content = ( + f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n" + ) + edge_path.write_text(edge_content, encoding="utf-8") + + # Update source memory frontmatter with edge_id + new_rel = { + "target": to_id, + "type": rel_type, + "direction": "outgoing", + "strength": clamped_strength, + "edge_id": edge_id, + } + if context: + new_rel["context"] = context + relations.append(new_rel) + fm["relations"] = relations + fm["updated"] = now + self._write_memory_file(from_path, fm, body) + + # Add incoming relation to target with edge_id + to_relations = to_fm.get("relations", []) + has_incoming = any( + r.get("target") == from_id + and r.get("type") == rel_type + and r.get("direction") == "incoming" + for r in to_relations + ) + if not has_incoming: + incoming_rel = { + "target": from_id, + "type": rel_type, + "direction": "incoming", + "strength": clamped_strength, + "edge_id": edge_id, + } + if context: + incoming_rel["context"] = context + to_relations.append(incoming_rel) + to_fm["relations"] = to_relations + to_fm["updated"] = now + self._write_memory_file(to_path, to_fm, to_body) + + # Update memory index + rel_from = str(from_path.relative_to(self.memory_dir)) + rel_to = str(to_path.relative_to(self.memory_dir)) + self._update_index_entry(from_id, fm, rel_from) + self._update_index_entry(to_id, to_fm, rel_to) + + # Update edge index + self._update_edge_index( + edge_id, edge_data, f"graph/{EDGES_DIR_NAME}/{edge_filename}" + ) + + self._git_commit( + f"relate: {from_id[:8]} --{rel_type}--> {to_id[:8]}", + [from_path, to_path, edge_path], + ) + return edge_id + + def edge_get(self, edge_id: str) -> Optional[Dict[str, Any]]: + """Read full edge file (frontmatter + body).""" + path = self._resolve_edge_path(edge_id) + if not path: + return None + fm, body = self._read_memory_file(path) + return { + "id": fm.get("id", edge_id), + "type": 
# NOTE(review): this chunk arrived as a whitespace-mangled git diff; the code
# below is the reconstructed Python. The edge_* methods belong to a mixin class
# whose header lies above this chunk — the real class name is not visible here,
# so a placeholder name is used. Confirm against the full file before merging.


class EdgesMixin:
    """Edge (relationship) operations: search, update, and delete.

    Relies on shared state and helpers provided by the base client via MRO:
    ``memory_dir``, ``_load_index``, ``_resolve_prefix``, ``_resolve_edge_path``,
    ``_resolve_memory_path``, ``_read_memory_file``, ``_write_memory_file``,
    ``_update_edge_index``, ``_remove_edge_index``, ``_update_index_entry``,
    and ``_git_commit``.
    """

    def edge_search(
        self,
        query: Optional[str] = None,
        types: Optional[List[str]] = None,
        from_id: Optional[str] = None,
        to_id: Optional[str] = None,
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """Search edges via the index.

        Args:
            query: Case-insensitive substring matched against the edge's
                from/to titles and its type.
            types: If given, keep only edges whose type is in this list.
            from_id: Filter by source memory ID. Partial IDs are resolved to
                full UUIDs by prefix match against the memory index.
            to_id: Filter by target memory ID (prefix-resolved like from_id).
            limit: Maximum number of results returned.

        Returns:
            Matching edge entries (``id`` plus index fields), newest first.
        """
        index = self._load_index()
        results = []
        query_lower = query.lower().strip() if query else ""

        # Resolve partial endpoint IDs to full UUIDs via prefix match, falling
        # back to the caller's value when no prefix resolves.
        if from_id:
            entries = index.get("entries", {})
            resolved = self._resolve_prefix(from_id, entries)
            from_id = resolved or from_id
        if to_id:
            entries = index.get("entries", {})
            resolved = self._resolve_prefix(to_id, entries)
            to_id = resolved or to_id

        for eid, entry in index.get("edges", {}).items():
            if types and entry.get("type") not in types:
                continue
            if from_id and entry.get("from_id") != from_id:
                continue
            if to_id and entry.get("to_id") != to_id:
                continue
            if query_lower:
                # Text match runs against endpoint titles and the edge type.
                searchable = f"{entry.get('from_title', '')} {entry.get('to_title', '')} {entry.get('type', '')}".lower()
                if query_lower not in searchable:
                    continue
            results.append({"id": eid, **entry})

        results.sort(key=lambda x: x.get("created", ""), reverse=True)
        return results[:limit]

    def edge_update(
        self,
        edge_id: str,
        description: Optional[str] = None,
        strength: Optional[float] = None,
    ) -> bool:
        """Update an edge's body/metadata and sync strength to memory frontmatter.

        Args:
            edge_id: Edge identifier (resolvable via ``_resolve_edge_path``).
            description: New edge body text; ``None`` leaves the body unchanged.
            strength: New strength, clamped to [0.0, 1.0]; ``None`` leaves it
                unchanged.

        Returns:
            True on success, False when the edge cannot be resolved.
        """
        path = self._resolve_edge_path(edge_id)
        if not path:
            return False

        fm, body = self._read_memory_file(path)
        now = datetime.now(timezone.utc).isoformat()

        if description is not None:
            body = description
        if strength is not None:
            # Clamp into the valid strength range.
            fm["strength"] = max(0.0, min(1.0, strength))
        # NOTE(review): the mangled source does not show whether this timestamp
        # bump was inside the strength branch; bumping on every update seems
        # intended — confirm against the original file.
        fm["updated"] = now

        # Write edge file (frontmatter, blank line, body — or frontmatter only).
        edge_fm_str = serialize_edge_frontmatter(fm)
        edge_body = body.strip() if body else ""
        edge_content = (
            f"{edge_fm_str}\n\n{edge_body}\n" if edge_body else f"{edge_fm_str}\n"
        )
        path.write_text(edge_content, encoding="utf-8")

        # Sync the new strength into both endpoint memories' frontmatter.
        if strength is not None:
            for mid_key in ("from_id", "to_id"):
                mid = fm.get(mid_key)
                if not mid:
                    continue
                mem_path = self._resolve_memory_path(mid)
                if not mem_path:
                    continue
                mem_fm, mem_body = self._read_memory_file(mem_path)
                for rel in mem_fm.get("relations", []):
                    if rel.get("edge_id") == edge_id:
                        rel["strength"] = fm["strength"]
                mem_fm["updated"] = now
                self._write_memory_file(mem_path, mem_fm, mem_body)

        # Update edge index and commit.
        rel_path = str(path.relative_to(self.memory_dir))
        self._update_edge_index(edge_id, fm, rel_path)
        self._git_commit(f"edge-update: {edge_id[:8]}", [path])
        return True

    def edge_delete(self, edge_id: str) -> bool:
        """Remove an edge file and clean frontmatter refs from both memories.

        Returns:
            True on success, False when the edge cannot be resolved.
        """
        path = self._resolve_edge_path(edge_id)
        if not path:
            return False

        fm, _ = self._read_memory_file(path)
        now = datetime.now(timezone.utc).isoformat()
        files_to_commit: List[Path] = []

        # Strip this edge_id from the `relations` list of both endpoint memories.
        for mid_key in ("from_id", "to_id"):
            mid = fm.get(mid_key)
            if not mid:
                continue
            mem_path = self._resolve_memory_path(mid)
            if not mem_path:
                continue
            mem_fm, mem_body = self._read_memory_file(mem_path)
            original_rels = mem_fm.get("relations", [])
            mem_fm["relations"] = [
                r for r in original_rels if r.get("edge_id") != edge_id
            ]
            # Only rewrite/reindex when a relation was actually removed.
            if len(mem_fm["relations"]) != len(original_rels):
                mem_fm["updated"] = now
                self._write_memory_file(mem_path, mem_fm, mem_body)
                rel_p = str(mem_path.relative_to(self.memory_dir))
                self._update_index_entry(mid, mem_fm, rel_p)
                files_to_commit.append(mem_path)

        # Remove edge file and its index entry.
        path.unlink()
        self._remove_edge_index(edge_id)

        # Best-effort: stage the deletion so the commit below records it.
        try:
            rel_path = path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(rel_path)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            pass
        # NOTE(review): files_to_commit is collected above but never passed to
        # _git_commit — the modified memory files may go uncommitted. Looks like
        # a latent bug; confirm _git_commit's no-args behavior before changing.
        self._git_commit(f"edge-delete: {edge_id[:8]}")
        return True


# --- skills/cognitive-memory/embeddings.py -----------------------------------

"""EmbeddingsMixin for CognitiveMemoryClient.

Provides embedding generation and semantic search capabilities. Extracted from
client.py as part of the mixin refactor. Methods rely on shared state
(memory_dir, _load_index, _load_embeddings_cached) provided by the base class
via MRO.
"""

import json
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from common import (
    EMBEDDING_MODEL,
    EMBEDDINGS_PATH,
    OPENAI_MODEL_DEFAULT,
    _cosine_similarity,
    _load_memory_config,
    _ollama_embed,
    _openai_embed,
)


class EmbeddingsMixin:
    """Mixin providing embedding generation and semantic recall for memory clients."""

    def _get_embedding_provider(self) -> Dict[str, Any]:
        """Load embedding config from _config.json."""
        return _load_memory_config(self.memory_dir / "_config.json")

    def _embed_texts_with_fallback(
        self,
        texts: List[str],
        timeout: int = 300,
    ) -> Tuple[Optional[List[List[float]]], str, str]:
        """Embed texts with a provider fallback chain.

        Tries the configured provider first (openai or ollama), then falls back
        to the other one.

        Returns:
            ``(vectors, provider_used, model_used)``; ``(None, "", "")`` when
            every provider fails.
        """
        config = self._get_embedding_provider()
        provider = config.get("embedding_provider", "ollama")

        if provider == "openai":
            # OpenAI first (requires an API key), then fall back to ollama.
            api_key = config.get("openai_api_key")
            model = config.get("openai_model", OPENAI_MODEL_DEFAULT)
            if api_key:
                vectors = _openai_embed(texts, api_key, model, timeout=timeout)
                if vectors is not None:
                    return vectors, "openai", model
            ollama_model = config.get("ollama_model", EMBEDDING_MODEL)
            vectors = _ollama_embed(texts, model=ollama_model, timeout=timeout)
            if vectors is not None:
                return vectors, "ollama", ollama_model
        else:
            # Ollama first, then fall back to OpenAI if a key is configured.
            ollama_model = config.get("ollama_model", EMBEDDING_MODEL)
            vectors = _ollama_embed(texts, model=ollama_model, timeout=timeout)
            if vectors is not None:
                return vectors, "ollama", ollama_model
            api_key = config.get("openai_api_key")
            model = config.get("openai_model", OPENAI_MODEL_DEFAULT)
            if api_key:
                vectors = _openai_embed(texts, api_key, model, timeout=timeout)
                if vectors is not None:
                    return vectors, "openai", model

        return None, "", ""

    def embed(self, if_changed: bool = False) -> Dict[str, Any]:
        """Generate embeddings for all memories using the configured provider.

        Detects provider changes and re-embeds everything (dimension mismatch
        safety). Stores vectors in _embeddings.json (not git-tracked).

        Args:
            if_changed: If True, skip embedding when the set of memory IDs has
                not changed since the last run (no new/deleted memories).

        Returns:
            Summary dict with ``embedded`` count, provider/model used and the
            embeddings file path, or an ``error`` key when all providers fail.
        """
        index = self._load_index()
        entries = index.get("entries", {})
        embeddings_path = self.memory_dir / "_embeddings.json"
        if not entries:
            return {
                "embedded": 0,
                "provider": "none",
                "model": "",
                # Fixed: report the instance path used everywhere else in this
                # method, not the module-level EMBEDDINGS_PATH constant.
                "path": str(embeddings_path),
            }

        # Detect a provider switch since the last embedding run.
        old_provider = ""
        if embeddings_path.exists():
            try:
                old_data = json.loads(embeddings_path.read_text())
                old_provider = old_data.get("provider", "ollama")
            except (json.JSONDecodeError, OSError):
                pass

        config = self._get_embedding_provider()
        new_provider = config.get("embedding_provider", "ollama")
        provider_changed = old_provider and old_provider != new_provider
        if provider_changed:
            print(
                f"Provider changed ({old_provider} -> {new_provider}), re-embedding all memories...",
                file=sys.stderr,
            )

        # Skip if nothing changed (unless the provider switched).
        if if_changed and not provider_changed and embeddings_path.exists():
            try:
                old_data = json.loads(embeddings_path.read_text())
                embedded_ids = set(old_data.get("entries", {}).keys())
                index_ids = set(entries.keys())
                if embedded_ids == index_ids:
                    return {
                        "embedded": 0,
                        "skipped": True,
                        "reason": "no new or deleted memories",
                        "path": str(embeddings_path),
                    }
            except (json.JSONDecodeError, OSError):
                pass  # Can't read old data, re-embed

        # Build the text to embed for each memory: "<title>. <preview>".
        memory_ids = list(entries.keys())
        texts = []
        for mid in memory_ids:
            entry = entries[mid]
            title = entry.get("title", "")
            preview = entry.get("content_preview", "")
            texts.append(f"{title}. {preview}")

        # Batch embed in groups of 50.
        all_embeddings: Dict[str, List[float]] = {}
        batch_size = 50
        provider_used = ""
        model_used = ""
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i : i + batch_size]
            batch_ids = memory_ids[i : i + batch_size]
            vectors, provider_used, model_used = self._embed_texts_with_fallback(
                batch_texts,
                timeout=300,
            )
            if vectors is None:
                # Abort mid-run; report how many were embedded before failure.
                return {
                    "error": "All embedding providers unavailable",
                    "embedded": len(all_embeddings),
                }
            for mid, vec in zip(batch_ids, vectors):
                all_embeddings[mid] = vec

        # Write the embeddings file along with the provider that produced it.
        embeddings_data = {
            "provider": provider_used,
            "model": model_used,
            "updated": datetime.now(timezone.utc).isoformat(),
            "entries": all_embeddings,
        }
        embeddings_path.write_text(json.dumps(embeddings_data, default=str))

        return {
            "embedded": len(all_embeddings),
            "provider": provider_used,
            "model": model_used,
            "path": str(embeddings_path),
        }

    def semantic_recall(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search memories by semantic similarity using stored embeddings.

        Uses the same provider that generated the stored embeddings to embed
        the query, and skips vectors with a dimension mismatch as a safety
        guard.

        Args:
            query: Free-text query to embed and compare.
            limit: Maximum number of scored results.

        Returns:
            Memories sorted by cosine similarity (descending), each with
            ``id``, ``title``, ``type``, ``tags``, ``similarity`` and ``path``.
        """
        emb_data = self._load_embeddings_cached()
        if emb_data is None:
            return []

        stored = emb_data.get("entries", {})
        if not stored:
            return []

        # Embed the query with the provider that produced the stored vectors.
        stored_provider = emb_data.get("provider", "ollama")
        config = self._get_embedding_provider()
        query_vec = None

        if stored_provider == "openai":
            api_key = config.get("openai_api_key")
            model = emb_data.get("model", OPENAI_MODEL_DEFAULT)
            if api_key:
                vecs = _openai_embed([query], api_key, model)
                if vecs:
                    query_vec = vecs[0]
        if query_vec is None and stored_provider == "ollama":
            stored_model = emb_data.get("model", EMBEDDING_MODEL)
            vecs = _ollama_embed([query], model=stored_model)
            if vecs:
                query_vec = vecs[0]
        # Last resort: try any available provider (dimension guard below
        # protects against cross-provider mismatches).
        if query_vec is None:
            vecs, _, _ = self._embed_texts_with_fallback([query], timeout=30)
            if vecs:
                query_vec = vecs[0]

        if query_vec is None:
            return []

        query_dim = len(query_vec)

        # Score every stored memory by cosine similarity to the query.
        index = self._load_index()
        scored = []
        for mid, vec in stored.items():
            # Skip vectors whose dimension doesn't match the query embedding.
            if len(vec) != query_dim:
                continue
            sim = _cosine_similarity(query_vec, vec)
            entry = index.get("entries", {}).get(mid)
            if entry:
                scored.append(
                    {
                        "id": mid,
                        "title": entry.get("title", ""),
                        "type": entry.get("type", "general"),
                        "tags": entry.get("tags", []),
                        "similarity": round(sim, 4),
                        "path": entry.get("path", ""),
                    }
                )

        scored.sort(key=lambda x: x["similarity"], reverse=True)
        return scored[:limit]