"""
Cognitive Memory - Analysis Mixin

Decay scoring, CORE.md generation, tag analysis, reflection clustering,
memory merging, and REFLECTION.md generation.
"""

import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from common import (
    CORE_MAX_CHARS,
    MEMORY_DIR,
    THRESHOLD_ACTIVE,
    THRESHOLD_DORMANT,
    THRESHOLD_FADING,
    TYPE_WEIGHTS,
    calculate_decay_score,
)


class AnalysisMixin:
    """Mixin providing analysis operations for CognitiveMemoryClient.

    The host class is expected to supply everything these methods reach for
    on ``self``: ``memory_dir``, ``_load_index``, ``_load_state``,
    ``_save_state``, ``_read_memory_file``, ``_write_memory_file``,
    ``_resolve_memory_path``, ``_update_index_entry``, ``_remove_index_entry``,
    ``_git_commit`` and ``episode``.
    """

    def decay(self) -> Dict[str, Any]:
        """Recalculate all decay scores. Updates _state.json.

        Every entry in the index gets a state record (created with defaults
        when missing) and a fresh ``decay_score``. Entries whose path starts
        with ``vault/`` are pinned to a score of 999.0.

        Returns:
            Dict with ``updated`` (number of index entries rescored) and the
            number of state entries per bucket: ``active``, ``fading``,
            ``dormant`` and ``archived``. A score of exactly 0 falls into no
            bucket.
        """
        index = self._load_index()
        state = self._load_state()
        now = datetime.now(timezone.utc)
        state_entries = state.setdefault("entries", {})
        updated_count = 0

        for mid, entry in index.get("entries", {}).items():
            s = state_entries.setdefault(
                mid,
                {
                    "access_count": 0,
                    "last_accessed": entry.get("created", now.isoformat()),
                    "decay_score": 0.5,
                },
            )

            # Calculate days since last access
            last_str = s.get("last_accessed", entry.get("created", ""))
            try:
                last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00"))
                if last_dt.tzinfo is None:
                    # Treat naive timestamps as UTC so the subtraction is valid
                    last_dt = last_dt.replace(tzinfo=timezone.utc)
                days = (now - last_dt).total_seconds() / 86400
            except (ValueError, AttributeError):
                days = 30  # Default to 30 days if unparseable

            importance = entry.get("importance", 0.5)
            mem_type = entry.get("type", "general")
            type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
            access_count = s.get("access_count", 0)

            # Check if pinned (vault)
            path = entry.get("path", "")
            if path.startswith("vault/"):
                s["decay_score"] = 999.0  # Pinned memories never decay
            else:
                s["decay_score"] = round(
                    calculate_decay_score(importance, days, access_count, type_weight),
                    4,
                )
            updated_count += 1

        self._save_state(state)

        # Summary over every state entry (not only the ones rescored above)
        scores = [s.get("decay_score", 0) for s in state_entries.values()]
        return {
            "updated": updated_count,
            "active": sum(1 for sc in scores if sc >= THRESHOLD_ACTIVE),
            "fading": sum(
                1 for sc in scores if THRESHOLD_FADING <= sc < THRESHOLD_ACTIVE
            ),
            "dormant": sum(
                1 for sc in scores if THRESHOLD_DORMANT <= sc < THRESHOLD_FADING
            ),
            "archived": sum(1 for sc in scores if 0 < sc < THRESHOLD_DORMANT),
        }

    def core(self) -> str:
        """Generate CORE.md from top memories by decay score.

        Memories scoring below ``THRESHOLD_FADING`` are skipped. The rest are
        sorted by decay score (descending), grouped into fixed categories by
        type, and listed with at most 10 items per category. The document is
        capped at ``CORE_MAX_CHARS`` characters: when the next item would
        exceed the cap, the current section is truncated at that point and no
        further sections are emitted.

        Writes ``CORE.md`` into ``self.memory_dir`` and commits it via
        ``self._git_commit``.

        Returns:
            The generated markdown content.
        """
        index = self._load_index()
        state = self._load_state()

        def _extract_summary(body: str, max_len: int = 80) -> str:
            """Extract first meaningful sentence from memory body."""
            if not body or not body.strip():
                return ""
            # Use the first line that is not blank, a code fence, or a heading
            first_line = ""
            for ln in body.strip().split("\n"):
                stripped = ln.strip()
                if not stripped or stripped.startswith(("```", "#")):
                    continue
                first_line = stripped.lstrip("- *>").strip()
                break
            if not first_line:
                return ""
            # Extract first sentence (split on '. ')
            dot_pos = first_line.find(". ")
            if 0 < dot_pos < max_len:
                sentence = first_line[: dot_pos + 1]
            else:
                sentence = first_line
            if len(sentence) > max_len:
                sentence = sentence[: max_len - 3].rstrip() + "..."
            return sentence

        # Collect all memories with decay scores
        state_entries = state.get("entries", {})
        memories = []
        for mid, entry in index.get("entries", {}).items():
            decay = state_entries.get(mid, {}).get("decay_score", 0.5)
            if decay < THRESHOLD_FADING:
                continue  # Skip low-relevance
            memories.append(
                {
                    "id": mid,
                    "title": entry.get("title", ""),
                    "type": entry.get("type", "general"),
                    "path": entry.get("path", ""),
                    "importance": entry.get("importance", 0.5),
                    "decay_score": decay,
                    "tags": entry.get("tags", []),
                }
            )

        # Sort by decay score descending
        memories.sort(key=lambda x: x["decay_score"], reverse=True)

        # Group by type category
        categories = {
            "Critical Solutions": ["solution"],
            "Active Decisions": ["decision"],
            "Key Fixes": ["fix"],
            "Configurations": ["configuration"],
            "Key Procedures": ["procedure"],
            "Patterns & Workflows": ["code_pattern", "workflow"],
            "Known Issues": ["problem", "error"],
            "Insights": ["insight"],
            "General": ["general"],
        }

        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE)
        total_count = len(index.get("entries", {}))

        lines = [
            "# Memory Core (auto-generated)",
            f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)",
            "",
        ]
        # Exact length of "\n".join(lines), so CORE_MAX_CHARS is a real bound
        # on the written file rather than an estimate that ignores newlines.
        char_count = len("\n".join(lines))

        for cat_name, cat_types in categories.items():
            cat_memories = [m for m in memories if m["type"] in cat_types]
            if not cat_memories:
                continue

            section_lines = [f"## {cat_name}", ""]
            budget_exhausted = False
            for mem in cat_memories[:10]:  # Cap per section (reduced for summaries)
                path = mem["path"]
                title = mem["title"]
                tags = ", ".join(mem["tags"][:3]) if mem["tags"] else ""

                # Read body to extract summary
                summary = ""
                mem_path = self.memory_dir / path
                if mem_path.exists():
                    try:
                        _, body = self._read_memory_file(mem_path)
                        summary = _extract_summary(body)
                    except Exception:
                        # Deliberate best-effort: an unreadable memory is
                        # still listed, just without a summary.
                        pass

                line = f"- [{title}]({path})"
                if summary:
                    line += f" - {summary}"
                if tags:
                    line += f" ({tags})"

                # Check budget BEFORE accepting the line: the section as it
                # would be emitted (with its trailing blank line), plus the
                # newline that joins it to the preceding content.
                projected = "\n".join(section_lines + [line, ""])
                if char_count + 1 + len(projected) > CORE_MAX_CHARS:
                    budget_exhausted = True
                    break
                section_lines.append(line)

            # Emit the section only if at least one item fit under the cap
            if len(section_lines) > 2:
                section_lines.append("")
                lines.extend(section_lines)
                char_count += 1 + len("\n".join(section_lines))

            if budget_exhausted:
                break

        core_content = "\n".join(lines)
        core_path = self.memory_dir / "CORE.md"
        core_path.write_text(core_content, encoding="utf-8")
        self._git_commit("core: regenerate CORE.md", [core_path])

        return core_content

    def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]:
        """List all tags with occurrence counts across all memories.

        Tags are lower-cased and stripped before counting; empty tags are
        ignored.

        Args:
            limit: Maximum number of results; 0 (or negative) means no limit.

        Returns:
            List of {"tag": str, "count": int} sorted by count descending.
        """
        index = self._load_index()
        tag_counts: Dict[str, int] = {}

        for entry in index.get("entries", {}).values():
            for tag in entry.get("tags", []):
                normalized = tag.lower().strip()
                if normalized:
                    tag_counts[normalized] = tag_counts.get(normalized, 0) + 1

        results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
        results.sort(key=lambda x: x["count"], reverse=True)

        if limit > 0:
            results = results[:limit]
        return results

    def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]:
        """Find tags that co-occur with the given tag.

        Tags are compared lower-cased and stripped. Each memory contributes at
        most once per co-occurring tag, even if its tag list contains
        duplicates.

        Args:
            tag: The tag to find companions for.
            limit: Maximum number of results; 0 (or negative) means no limit.

        Returns:
            List of {"tag": str, "co_occurrences": int, "memories_with_both": int}
            sorted by co_occurrences descending.
        """
        tag = tag.lower().strip()
        index = self._load_index()
        co_counts: Dict[str, int] = {}

        for entry in index.get("entries", {}).values():
            # dict.fromkeys de-duplicates while keeping first-seen order, so a
            # repeated tag inside one memory cannot inflate the counts and the
            # ordering of tied results stays deterministic.
            entry_tags = list(
                dict.fromkeys(t.lower().strip() for t in entry.get("tags", []))
            )
            if tag not in entry_tags:
                continue
            for other_tag in entry_tags:
                if other_tag != tag and other_tag:
                    co_counts[other_tag] = co_counts.get(other_tag, 0) + 1

        results = [
            {"tag": t, "co_occurrences": count, "memories_with_both": count}
            for t, count in co_counts.items()
        ]
        results.sort(key=lambda x: x["co_occurrences"], reverse=True)

        if limit > 0:
            results = results[:limit]
        return results

    def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]:
        """Suggest tags for a memory based on co-occurrence patterns.

        For every tag the memory already has, the co-occurring tags (see
        ``tags_related``) are collected and their co-occurrence counts summed.

        Args:
            memory_id: Index key of the memory to suggest tags for.

        Returns:
            Top 10 suggestions as {"tag": str, "score": int, "reason": str},
            sorted by score descending. Empty if the memory is not in the
            index or has no tags.
        """
        index = self._load_index()
        entry = index.get("entries", {}).get(memory_id)
        if not entry:
            return []

        existing_tags = set(t.lower().strip() for t in entry.get("tags", []))
        if not existing_tags:
            return []

        # For each existing tag, find co-occurring tags and accumulate scores
        candidate_scores: Dict[str, int] = {}
        candidate_sources: Dict[str, set] = {}

        for existing_tag in existing_tags:
            for co_entry in self.tags_related(existing_tag):
                candidate = co_entry["tag"]
                if candidate in existing_tags:
                    continue
                candidate_scores[candidate] = (
                    candidate_scores.get(candidate, 0) + co_entry["co_occurrences"]
                )
                candidate_sources.setdefault(candidate, set()).add(existing_tag)

        results = []
        for candidate, score in candidate_scores.items():
            sources = sorted(candidate_sources[candidate])
            reason = "co-occurs with: " + ", ".join(sources)
            results.append({"tag": candidate, "score": score, "reason": reason})

        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:10]

    def reflect(
        self,
        since: Optional[str] = None,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Review recent memories, identify tag-based clusters, and output
        consolidation recommendations.

        Clusters memories that share 2+ tags using iterative union-find merging.
        Does NOT auto-create insights; the agent reads output and decides what
        to store.

        Args:
            since: ISO date string (YYYY-MM-DD) to filter memories from. A full
                ISO timestamp is accepted too; only its date portion is used.
                Falls back to _state.json's last_reflection, then 30 days ago.
            dry_run: If True, return analysis without updating _state.json or
                logging episode.

        Returns:
            Dict with clusters, total_memories_reviewed, clusters_found, and
            since date.
        """
        now = datetime.now(timezone.utc)
        state = self._load_state()

        # Determine time window
        if since:
            # Only the date portion is compared below, so trim a full
            # timestamp; otherwise "2024-01-01" >= "2024-01-01T00:00:00" is
            # False and memories created on that day would be dropped.
            since_date = since[:10]
        elif state.get("last_reflection"):
            # last_reflection may be a full ISO timestamp; extract date portion
            since_date = state["last_reflection"][:10]
        else:
            since_dt = now - timedelta(days=30)
            since_date = since_dt.strftime("%Y-%m-%d")

        # Load recent memories from index
        index = self._load_index()
        recent_memories = []
        for mid, entry in index.get("entries", {}).items():
            created = entry.get("created", "")
            # Compare date portion only (YYYY-MM-DD)
            created_date = created[:10] if created else ""
            if created_date >= since_date:
                recent_memories.append(
                    {
                        "id": mid,
                        "title": entry.get("title", ""),
                        "type": entry.get("type", "general"),
                        "tags": [t.lower().strip() for t in entry.get("tags", [])],
                    }
                )

        total_reviewed = len(recent_memories)

        # Union-find clustering by tag overlap >= 2
        # parent[i] = index of parent in recent_memories list
        n = len(recent_memories)
        parent = list(range(n))

        def find(x: int) -> int:
            while parent[x] != x:
                parent[x] = parent[parent[x]]  # path compression
                x = parent[x]
            return x

        def union(a: int, b: int):
            ra, rb = find(a), find(b)
            if ra != rb:
                parent[rb] = ra

        # Compare all pairs, check tag overlap (tag sets built once up front)
        tag_sets = [set(mem["tags"]) for mem in recent_memories]
        for i in range(n):
            for j in range(i + 1, n):
                if len(tag_sets[i] & tag_sets[j]) >= 2:
                    union(i, j)

        # Group by root
        clusters_map: Dict[int, List[int]] = {}
        for i in range(n):
            clusters_map.setdefault(find(i), []).append(i)

        # Build output clusters (only clusters with 2+ members)
        clusters = []
        for indices in clusters_map.values():
            if len(indices) < 2:
                continue

            members = []
            all_tag_sets = []
            types_seen = set()
            for idx in indices:
                mem = recent_memories[idx]
                members.append(
                    {
                        "id": mem["id"],
                        "title": mem["title"],
                        "type": mem["type"],
                        "tags": mem["tags"],
                    }
                )
                all_tag_sets.append(tag_sets[idx])
                types_seen.add(mem["type"])

            # Common tags = intersection of ALL members
            common_tags = (
                sorted(set.intersection(*all_tag_sets)) if all_tag_sets else []
            )

            # Shared tags = tags appearing in 2+ members
            tag_counts: Dict[str, int] = {}
            for ts in all_tag_sets:
                for t in ts:
                    tag_counts[t] = tag_counts.get(t, 0) + 1
            shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2)

            # Suggested topic
            tag_label = (
                ", ".join(common_tags[:4])
                if common_tags
                else ", ".join(shared_tags[:4])
            )
            type_label = ", ".join(sorted(types_seen))
            suggested_topic = f"Pattern: {tag_label} across {type_label}"

            clusters.append(
                {
                    # Provisional id; renumbered after sorting below
                    "cluster_id": len(clusters) + 1,
                    "members": members,
                    "common_tags": common_tags,
                    "shared_tags": shared_tags,
                    "suggested_topic": suggested_topic,
                    "member_count": len(members),
                }
            )

        # Sort clusters by member count descending
        clusters.sort(key=lambda c: c["member_count"], reverse=True)
        # Re-number after sorting
        for i, c in enumerate(clusters):
            c["cluster_id"] = i + 1

        result = {
            "clusters": clusters,
            "total_memories_reviewed": total_reviewed,
            "clusters_found": len(clusters),
            "since": since_date,
        }

        if not dry_run:
            # Update state with last_reflection timestamp
            state["last_reflection"] = now.isoformat()
            self._save_state(state)

            # Log an episode entry
            self.episode(
                type="reflection",
                title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories",
                tags=["reflection", "cognitive-memory"],
                summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters",
            )

            # Regenerate REFLECTION.md
            self.reflection_summary()

        return result

    def merge(
        self,
        keep_id: str,
        absorb_id: str,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Merge two memories: absorb one into another.

        The 'keep' memory absorbs the content, tags, and relations from
        the 'absorb' memory. The absorb memory is then deleted.
        All other memories referencing absorb_id are updated to point to keep_id.

        If dry_run=True, returns what would change without writing anything.

        Args:
            keep_id: ID of the memory that survives the merge.
            absorb_id: ID of the memory that is folded in and deleted.
            dry_run: If True, only report what would change.

        Returns:
            Dict describing the merge. Contains ``"dry_run": True`` and
            ``other_memory_ids`` for a dry run, ``"success": True`` otherwise.

        Raises:
            ValueError: If either memory cannot be resolved, or if both IDs
                refer to the same memory.
        """
        # Resolve both memory file paths
        keep_path = self._resolve_memory_path(keep_id)
        if not keep_path:
            raise ValueError(f"Keep memory not found: {keep_id}")
        absorb_path = self._resolve_memory_path(absorb_id)
        if not absorb_path:
            raise ValueError(f"Absorb memory not found: {absorb_id}")

        # Merging a memory into itself would write the file and then delete
        # it (plus its index/state entries), destroying the memory.
        if keep_id == absorb_id or keep_path == absorb_path:
            raise ValueError(f"Cannot merge a memory into itself: {keep_id}")

        # Read both memory files
        keep_fm, keep_body = self._read_memory_file(keep_path)
        absorb_fm, absorb_body = self._read_memory_file(absorb_path)

        keep_title = keep_fm.get("title", keep_id[:8])
        absorb_title = absorb_fm.get("title", absorb_id[:8])

        # Combine content
        merged_body = keep_body
        if absorb_body.strip():
            merged_body = (
                keep_body.rstrip()
                + f"\n\n---\n*Merged from: {absorb_title}*\n\n"
                + absorb_body.strip()
            )

        # Merge tags (sorted, deduplicated)
        keep_tags = keep_fm.get("tags", [])
        absorb_tags = absorb_fm.get("tags", [])
        merged_tags = sorted(set(keep_tags + absorb_tags))

        # importance = max of both
        merged_importance = max(
            keep_fm.get("importance", 0.5),
            absorb_fm.get("importance", 0.5),
        )

        # Merge relations: combine both relation lists
        keep_rels = list(keep_fm.get("relations", []))
        absorb_rels = list(absorb_fm.get("relations", []))
        combined_rels = keep_rels + absorb_rels

        # Replace any relation targeting absorb_id with keep_id
        for rel in combined_rels:
            if rel.get("target") == absorb_id:
                rel["target"] = keep_id

        # Deduplicate by (target, type) tuple
        seen = set()
        deduped_rels = []
        for rel in combined_rels:
            key = (rel.get("target"), rel.get("type"))
            if key not in seen:
                seen.add(key)
                deduped_rels.append(rel)

        # Remove self-referential relations (target == keep_id)
        merged_rels = [r for r in deduped_rels if r.get("target") != keep_id]

        # Scan all other memories for relations targeting absorb_id
        index = self._load_index()
        updated_others = []
        for mid, entry in index.get("entries", {}).items():
            if mid in (keep_id, absorb_id):
                continue
            rels = entry.get("relations", [])
            if any(r.get("target") == absorb_id for r in rels):
                updated_others.append(mid)

        if dry_run:
            return {
                "dry_run": True,
                "keep_id": keep_id,
                "keep_title": keep_title,
                "absorb_id": absorb_id,
                "absorb_title": absorb_title,
                "merged_tags": merged_tags,
                "merged_importance": merged_importance,
                "merged_relations_count": len(merged_rels),
                "other_memories_updated": len(updated_others),
                "other_memory_ids": updated_others,
            }

        # Write updated keep memory file
        now = datetime.now(timezone.utc).isoformat()
        keep_fm["tags"] = merged_tags
        keep_fm["importance"] = merged_importance
        keep_fm["relations"] = merged_rels
        keep_fm["updated"] = now
        self._write_memory_file(keep_path, keep_fm, merged_body)

        # Update index for keep memory (refresh content_preview with merged body)
        keep_rel_path = str(keep_path.relative_to(self.memory_dir))
        preview = merged_body.strip()[:200]
        if len(merged_body.strip()) > 200:
            # Cut back to the last word boundary so the preview ends cleanly
            last_space = preview.rfind(" ")
            if last_space > 0:
                preview = preview[:last_space]
        self._update_index_entry(
            keep_id, keep_fm, keep_rel_path, content_preview=preview
        )

        # Update all other memories that reference absorb_id
        for mid in updated_others:
            other_path = self._resolve_memory_path(mid)
            if not other_path:
                continue
            try:
                other_fm, other_body = self._read_memory_file(other_path)
                other_rels = other_fm.get("relations", [])
                for rel in other_rels:
                    if rel.get("target") == absorb_id:
                        rel["target"] = keep_id
                # Deduplicate after replacement
                seen_other = set()
                deduped_other = []
                for rel in other_rels:
                    key = (rel.get("target"), rel.get("type"))
                    if key not in seen_other:
                        seen_other.add(key)
                        deduped_other.append(rel)
                # Remove self-referential
                other_fm["relations"] = [
                    r for r in deduped_other if r.get("target") != mid
                ]
                other_fm["updated"] = now
                self._write_memory_file(other_path, other_fm, other_body)
                other_rel_path = str(other_path.relative_to(self.memory_dir))
                self._update_index_entry(mid, other_fm, other_rel_path)
            except Exception:
                # Deliberate best-effort: one unreadable/unwritable referrer
                # must not abort a merge whose keep file is already written.
                pass

        # Delete absorb memory file and remove from index/state
        absorb_path.unlink()
        self._remove_index_entry(absorb_id)
        state = self._load_state()
        state.get("entries", {}).pop(absorb_id, None)
        self._save_state(state)

        # Git: stage absorb deletion
        try:
            absorb_rel = absorb_path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(absorb_rel)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            # Best-effort: a git failure must not undo a completed merge
            pass
        self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")

        return {
            "success": True,
            "keep_id": keep_id,
            "keep_title": keep_title,
            "absorb_id": absorb_id,
            "absorb_title": absorb_title,
            "merged_tags": merged_tags,
            "merged_importance": merged_importance,
            "merged_relations_count": len(merged_rels),
            "other_memories_updated": len(updated_others),
        }

    def reflection_summary(self) -> str:
        """Generate REFLECTION.md with patterns and insights from the memory system.

        Sections: Themes (top tag co-occurrence pairs), Cross-Project Patterns,
        Most Accessed, Recent Insights, and Consolidation History.

        Writes to REFLECTION.md in the memory data directory and git-commits
        the file.

        Returns:
            The generated content.
        """
        index = self._load_index()
        state = self._load_state()
        entries = index.get("entries", {})
        state_entries = state.get("entries", {})
        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        # Last reflection date
        last_reflection = state.get("last_reflection", "")
        last_reflection_date = last_reflection[:10] if last_reflection else "never"

        # Scan episode files once, counting "## " heading lines that look like
        # reflection entries and those that mention a merge.
        total_reflections = 0
        merge_count = 0
        episodes_dir = self.memory_dir / "episodes"
        if episodes_dir.exists():
            for ep_file in sorted(episodes_dir.glob("*.md")):
                try:
                    text = ep_file.read_text(encoding="utf-8")
                except OSError:
                    continue
                for raw_line in text.split("\n"):
                    heading = raw_line.strip()
                    if not heading.startswith("## "):
                        continue
                    if "Reflection:" in heading:
                        total_reflections += 1
                    if "merge" in heading.lower():
                        merge_count += 1

        # Normalised tag list per memory, computed once and reused below
        tags_by_id: Dict[str, List[str]] = {
            mid: [t.lower().strip() for t in entry.get("tags", [])]
            for mid, entry in entries.items()
        }

        lines = [
            "# Reflection Summary (auto-generated)",
            f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
            "",
        ]

        # === Themes section ===
        # Get top tags, then find co-occurrence pairs
        lines.append("## Themes")
        lines.append("Top tag co-occurrences revealing recurring themes:")
        lines.append("")

        top_tags = self.tags_list(limit=10)
        # Build co-occurrence pairs across top tags
        pair_data: Dict[Tuple[str, str], List[str]] = {}  # (tag1, tag2) -> [titles]
        for tag_info in top_tags[:5]:
            tag = tag_info["tag"]
            for co in self.tags_related(tag, limit=5):
                other = co["tag"]
                # Canonical pair ordering
                pair = tuple(sorted([tag, other]))
                if pair in pair_data:
                    continue
                # Find memories with both tags and collect titles
                titles = [
                    entry.get("title", "untitled")
                    for mid, entry in entries.items()
                    if pair[0] in tags_by_id[mid] and pair[1] in tags_by_id[mid]
                ]
                if titles:
                    pair_data[pair] = titles

        # Sort pairs by memory count descending, show top 8
        sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
        for (tag_a, tag_b), titles in sorted_pairs[:8]:
            example_titles = ", ".join(f'"{t}"' for t in titles[:3])
            lines.append(
                f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
            )

        if not sorted_pairs:
            lines.append("- No co-occurrence data available yet.")
        lines.append("")

        # === Cross-Project Patterns section ===
        lines.append("## Cross-Project Patterns")
        lines.append("Tags that span multiple projects:")
        lines.append("")

        # Auto-detect project tags: tags with high fanout (appear in 5+ memories
        # and co-occur with 4+ distinct other tags) are likely project identifiers
        tag_counts: Dict[str, int] = {}
        tag_cooccur: Dict[str, set] = {}
        for mem_tags in tags_by_id.values():
            for t in mem_tags:
                tag_counts[t] = tag_counts.get(t, 0) + 1
                tag_cooccur.setdefault(t, set()).update(
                    tt for tt in mem_tags if tt != t
                )

        # A set: only membership is ever tested against it
        known_projects = {
            t
            for t, count in tag_counts.items()
            if count >= 5 and len(tag_cooccur.get(t, set())) >= 4
        }

        # Build tag -> {project -> count} mapping
        tag_project_map: Dict[str, Dict[str, int]] = {}
        for mem_tags in tags_by_id.values():
            # Identify which projects this memory belongs to
            mem_projects = [t for t in mem_tags if t in known_projects]
            # For non-project tags, record which projects they appear in
            non_project_tags = [t for t in mem_tags if t not in known_projects]
            for tag in non_project_tags:
                per_project = tag_project_map.setdefault(tag, {})
                for proj in mem_projects:
                    per_project[proj] = per_project.get(proj, 0) + 1

        # Filter to tags spanning 2+ projects, sort by project count then total
        cross_project = []
        for tag, projects in tag_project_map.items():
            if len(projects) >= 2:
                total = sum(projects.values())
                cross_project.append((tag, projects, total))
        cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)

        for tag, projects, total in cross_project[:10]:
            proj_parts = ", ".join(
                f"{p} ({c})"
                for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
            )
            lines.append(f"- **{tag}**: appears in {proj_parts}")

        if not cross_project:
            lines.append("- No cross-project patterns detected yet.")
        lines.append("")

        # === Most Accessed section ===
        lines.append("## Most Accessed")
        lines.append("Top 10 memories by access count:")
        lines.append("")

        # Sort state entries by access_count descending
        accessed = []
        for mid, s in state_entries.items():
            count = s.get("access_count", 0)
            if count > 0:
                entry = entries.get(mid)
                if entry:
                    accessed.append(
                        (
                            mid,
                            entry.get("title", "untitled"),
                            entry.get("path", ""),
                            count,
                        )
                    )
        accessed.sort(key=lambda x: x[3], reverse=True)

        for _mid, title, path, count in accessed[:10]:
            # Every item is emitted as "1."; presumably this relies on Markdown
            # renderers numbering ordered-list items sequentially.
            lines.append(f"1. [{title}]({path}) - {count} accesses")

        if not accessed:
            lines.append("- No access data recorded yet.")
        lines.append("")

        # === Recent Insights section ===
        lines.append("## Recent Insights")
        lines.append("Insight-type memories:")
        lines.append("")

        insights = [
            (mid, entry)
            for mid, entry in entries.items()
            if entry.get("type") == "insight"
        ]
        # Sort by created date descending
        insights.sort(key=lambda x: x[1].get("created", ""), reverse=True)

        for _mid, entry in insights[:10]:
            title = entry.get("title", "untitled")
            path = entry.get("path", "")
            preview = entry.get("content_preview", "")
            if preview:
                lines.append(f"- [{title}]({path}) - {preview[:80]}")
            else:
                lines.append(f"- [{title}]({path})")

        if not insights:
            lines.append("- No insight memories stored yet.")
        lines.append("")

        # === Consolidation History section ===
        lines.append("## Consolidation History")
        lines.append("")
        lines.append(f"- Total merges performed: {merge_count}")
        lines.append("")

        content = "\n".join(lines)

        # Write REFLECTION.md
        reflection_path = self.memory_dir / "REFLECTION.md"
        reflection_path.write_text(content, encoding="utf-8")
        self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])

        return content