Moved application code from ~/.claude/skills/cognitive-memory/ to its own project directory. The skill layer (SKILL.md, SCHEMA.md) remains in the skill directory for Claude Code to read. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
862 lines
32 KiB
Python
862 lines
32 KiB
Python
"""
|
|
Cognitive Memory - Analysis Mixin
|
|
|
|
Decay scoring, CORE.md generation, tag analysis, reflection clustering,
|
|
memory merging, and REFLECTION.md generation.
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from common import (
|
|
CORE_MAX_CHARS,
|
|
MEMORY_DIR,
|
|
THRESHOLD_ACTIVE,
|
|
THRESHOLD_DORMANT,
|
|
THRESHOLD_FADING,
|
|
TYPE_WEIGHTS,
|
|
calculate_decay_score,
|
|
)
|
|
|
|
|
|
class AnalysisMixin:
    """Mixin providing analysis operations for CognitiveMemoryClient.

    Supplies decay scoring (``decay``), CORE.md generation (``core``),
    tag analytics (``tags_list`` / ``tags_related`` / ``tags_suggest``),
    reflection clustering (``reflect``), memory merging (``merge``), and
    REFLECTION.md generation (``reflection_summary``).

    NOTE(review): relies on helpers not defined in this mixin —
    ``_load_index``, ``_load_state``, ``_save_state``, ``_read_memory_file``,
    ``_write_memory_file``, ``_resolve_memory_path``, ``_update_index_entry``,
    ``_remove_index_entry``, ``_git_commit``, ``episode``, and the
    ``memory_dir`` attribute — presumably provided by the host client class.
    """
|
def decay(self) -> Dict[str, Any]:
|
|
"""Recalculate all decay scores. Updates _state.json."""
|
|
index = self._load_index()
|
|
state = self._load_state()
|
|
now = datetime.now(timezone.utc)
|
|
updated_count = 0
|
|
|
|
for mid, entry in index.get("entries", {}).items():
|
|
s = state.setdefault("entries", {}).setdefault(
|
|
mid,
|
|
{
|
|
"access_count": 0,
|
|
"last_accessed": entry.get("created", now.isoformat()),
|
|
"decay_score": 0.5,
|
|
},
|
|
)
|
|
|
|
# Calculate days since last access
|
|
last_str = s.get("last_accessed", entry.get("created", ""))
|
|
try:
|
|
last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00"))
|
|
if last_dt.tzinfo is None:
|
|
last_dt = last_dt.replace(tzinfo=timezone.utc)
|
|
days = (now - last_dt).total_seconds() / 86400
|
|
except (ValueError, AttributeError):
|
|
days = 30 # Default to 30 days if unparseable
|
|
|
|
importance = entry.get("importance", 0.5)
|
|
mem_type = entry.get("type", "general")
|
|
type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
|
|
access_count = s.get("access_count", 0)
|
|
|
|
# Check if pinned (vault)
|
|
path = entry.get("path", "")
|
|
if path.startswith("vault/"):
|
|
s["decay_score"] = 999.0 # Pinned memories never decay
|
|
else:
|
|
s["decay_score"] = round(
|
|
calculate_decay_score(importance, days, access_count, type_weight),
|
|
4,
|
|
)
|
|
|
|
updated_count += 1
|
|
|
|
self._save_state(state)
|
|
|
|
# Summary
|
|
state_entries = state.get("entries", {})
|
|
return {
|
|
"updated": updated_count,
|
|
"active": sum(
|
|
1
|
|
for s in state_entries.values()
|
|
if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
|
|
),
|
|
"fading": sum(
|
|
1
|
|
for s in state_entries.values()
|
|
if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
|
|
),
|
|
"dormant": sum(
|
|
1
|
|
for s in state_entries.values()
|
|
if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
|
|
),
|
|
"archived": sum(
|
|
1
|
|
for s in state_entries.values()
|
|
if 0 < s.get("decay_score", 0) < THRESHOLD_DORMANT
|
|
),
|
|
}
|
|
|
|
    def core(self) -> str:
        """Generate CORE.md from top memories by decay score.

        Builds a markdown digest of memories whose decay score is at or above
        THRESHOLD_FADING, grouped into fixed type categories and capped at a
        CORE_MAX_CHARS character budget. Writes the result to CORE.md inside
        the memory directory and git-commits it.

        Returns:
            The generated CORE.md content as a string.
        """
        index = self._load_index()
        state = self._load_state()

        def _extract_summary(body: str, max_len: int = 80) -> str:
            """Extract first meaningful sentence from memory body."""
            if not body or not body.strip():
                return ""
            # Skip leading code blocks, headings, and list markers
            for ln in body.strip().split("\n"):
                stripped = ln.strip()
                if not stripped:
                    continue
                if stripped.startswith("```") or stripped.startswith("#"):
                    continue
                first_line = stripped.lstrip("- *>").strip()
                break
            else:
                # Every line was blank, a fence, or a heading — nothing to summarize.
                return ""
            if not first_line:
                return ""
            # Extract first sentence (split on '. ')
            dot_pos = first_line.find(". ")
            if 0 < dot_pos < max_len:
                sentence = first_line[: dot_pos + 1]
            else:
                sentence = first_line
            if len(sentence) > max_len:
                sentence = sentence[: max_len - 3].rstrip() + "..."
            return sentence

        # Collect all memories with decay scores
        memories = []
        for mid, entry in index.get("entries", {}).items():
            s = state.get("entries", {}).get(mid, {})
            decay = s.get("decay_score", 0.5)
            if decay < THRESHOLD_FADING:
                continue  # Skip low-relevance
            memories.append(
                {
                    "id": mid,
                    "title": entry.get("title", ""),
                    "type": entry.get("type", "general"),
                    "path": entry.get("path", ""),
                    "importance": entry.get("importance", 0.5),
                    "decay_score": decay,
                    "tags": entry.get("tags", []),
                }
            )

        # Sort by decay score descending
        memories.sort(key=lambda x: x["decay_score"], reverse=True)

        # Group by type category; a memory's "type" selects which section it lands in.
        categories = {
            "Critical Solutions": ["solution"],
            "Active Decisions": ["decision"],
            "Key Fixes": ["fix"],
            "Configurations": ["configuration"],
            "Key Procedures": ["procedure"],
            "Patterns & Workflows": ["code_pattern", "workflow"],
            "Known Issues": ["problem", "error"],
            "Insights": ["insight"],
            "General": ["general"],
        }

        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE)
        total_count = len(index.get("entries", {}))

        lines = [
            "# Memory Core (auto-generated)",
            f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)",
            "",
        ]

        # Budget is approximate: counts characters of each line but not the
        # joining newlines.
        char_count = sum(len(l) for l in lines)

        for cat_name, cat_types in categories.items():
            cat_memories = [m for m in memories if m["type"] in cat_types]
            if not cat_memories:
                continue

            section_lines = [f"## {cat_name}", ""]
            for mem in cat_memories[:10]:  # Cap per section (reduced for summaries)
                path = mem["path"]
                title = mem["title"]
                tags = ", ".join(mem["tags"][:3]) if mem["tags"] else ""

                # Read body to extract summary
                summary = ""
                mem_path = self.memory_dir / path
                if mem_path.exists():
                    try:
                        _, body = self._read_memory_file(mem_path)
                        summary = _extract_summary(body)
                    except Exception:
                        # Best-effort: an unreadable body just means no summary.
                        pass

                line = f"- [{title}]({path})"
                if summary:
                    line += f" - {summary}"
                if tags:
                    line += f" ({tags})"
                section_lines.append(line)

            # Check budget; a section that would overflow is dropped entirely
            # and generation stops (categories are ordered by priority).
            added = sum(len(l) for l in section_lines) + 2
            if char_count + added > CORE_MAX_CHARS:
                break

            section_lines.append("")
            section_text = "\n".join(section_lines)
            # Second check after the trailing blank line is added.
            if char_count + len(section_text) > CORE_MAX_CHARS:
                break
            lines.extend(section_lines)
            char_count += len(section_text)

        core_content = "\n".join(lines)
        core_path = self.memory_dir / "CORE.md"
        core_path.write_text(core_content, encoding="utf-8")
        self._git_commit("core: regenerate CORE.md", [core_path])

        return core_content
|
|
|
|
def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]:
|
|
"""List all tags with occurrence counts across all memories.
|
|
|
|
Returns list of {"tag": str, "count": int} sorted by count descending.
|
|
"""
|
|
index = self._load_index()
|
|
tag_counts: Dict[str, int] = {}
|
|
|
|
for entry in index.get("entries", {}).values():
|
|
for tag in entry.get("tags", []):
|
|
normalized = tag.lower().strip()
|
|
if normalized:
|
|
tag_counts[normalized] = tag_counts.get(normalized, 0) + 1
|
|
|
|
results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
|
|
results.sort(key=lambda x: x["count"], reverse=True)
|
|
|
|
if limit > 0:
|
|
results = results[:limit]
|
|
return results
|
|
|
|
def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]:
|
|
"""Find tags that co-occur with the given tag.
|
|
|
|
Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int}
|
|
sorted by co_occurrences descending.
|
|
"""
|
|
tag = tag.lower().strip()
|
|
index = self._load_index()
|
|
co_counts: Dict[str, int] = {}
|
|
|
|
for entry in index.get("entries", {}).values():
|
|
entry_tags = [t.lower().strip() for t in entry.get("tags", [])]
|
|
if tag not in entry_tags:
|
|
continue
|
|
for other_tag in entry_tags:
|
|
if other_tag != tag and other_tag:
|
|
co_counts[other_tag] = co_counts.get(other_tag, 0) + 1
|
|
|
|
results = [
|
|
{"tag": t, "co_occurrences": count, "memories_with_both": count}
|
|
for t, count in co_counts.items()
|
|
]
|
|
results.sort(key=lambda x: x["co_occurrences"], reverse=True)
|
|
|
|
if limit > 0:
|
|
results = results[:limit]
|
|
return results
|
|
|
|
def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]:
|
|
"""Suggest tags for a memory based on co-occurrence patterns.
|
|
|
|
Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}.
|
|
"""
|
|
index = self._load_index()
|
|
entry = index.get("entries", {}).get(memory_id)
|
|
if not entry:
|
|
return []
|
|
|
|
existing_tags = set(t.lower().strip() for t in entry.get("tags", []))
|
|
if not existing_tags:
|
|
return []
|
|
|
|
# For each existing tag, find co-occurring tags and accumulate scores
|
|
candidate_scores: Dict[str, int] = {}
|
|
candidate_sources: Dict[str, set] = {}
|
|
|
|
for existing_tag in existing_tags:
|
|
co_tags = self.tags_related(existing_tag)
|
|
for co_entry in co_tags:
|
|
candidate = co_entry["tag"]
|
|
if candidate in existing_tags:
|
|
continue
|
|
candidate_scores[candidate] = (
|
|
candidate_scores.get(candidate, 0) + co_entry["co_occurrences"]
|
|
)
|
|
if candidate not in candidate_sources:
|
|
candidate_sources[candidate] = set()
|
|
candidate_sources[candidate].add(existing_tag)
|
|
|
|
results = []
|
|
for candidate, score in candidate_scores.items():
|
|
sources = sorted(candidate_sources[candidate])
|
|
reason = "co-occurs with: " + ", ".join(sources)
|
|
results.append({"tag": candidate, "score": score, "reason": reason})
|
|
|
|
results.sort(key=lambda x: x["score"], reverse=True)
|
|
return results[:10]
|
|
|
|
    def reflect(
        self,
        since: Optional[str] = None,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Review recent memories, identify tag-based clusters, and output consolidation recommendations.

        Clusters memories that share 2+ tags using iterative union-find merging.
        Does NOT auto-create insights; the agent reads output and decides what to store.

        Args:
            since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to
                _state.json's last_reflection, then 30 days ago.
            dry_run: If True, return analysis without updating _state.json or logging episode.

        Returns:
            Dict with clusters, total_memories_reviewed, clusters_found, and since date.
        """
        now = datetime.now(timezone.utc)
        state = self._load_state()

        # Determine time window
        if since:
            since_date = since
        elif state.get("last_reflection"):
            # last_reflection may be a full ISO timestamp; extract date portion
            since_date = state["last_reflection"][:10]
        else:
            since_dt = now - timedelta(days=30)
            since_date = since_dt.strftime("%Y-%m-%d")

        # Load recent memories from index
        index = self._load_index()
        recent_memories = []
        for mid, entry in index.get("entries", {}).items():
            created = entry.get("created", "")
            # Compare date portion only (YYYY-MM-DD); lexicographic compare is
            # valid for ISO dates. Entries without a created date are skipped.
            created_date = created[:10] if created else ""
            if created_date >= since_date:
                recent_memories.append(
                    {
                        "id": mid,
                        "title": entry.get("title", ""),
                        "type": entry.get("type", "general"),
                        "tags": [t.lower().strip() for t in entry.get("tags", [])],
                    }
                )

        total_reviewed = len(recent_memories)

        # Union-find clustering by tag overlap >= 2
        # parent[i] = index of parent in recent_memories list
        n = len(recent_memories)
        parent = list(range(n))

        def find(x: int) -> int:
            # Root lookup with path halving.
            while parent[x] != x:
                parent[x] = parent[parent[x]]  # path compression
                x = parent[x]
            return x

        def union(a: int, b: int) -> None:
            ra, rb = find(a), find(b)
            if ra != rb:
                parent[rb] = ra

        # Compare all pairs, check tag overlap (O(n^2) over the window)
        for i in range(n):
            tags_i = set(recent_memories[i]["tags"])
            for j in range(i + 1, n):
                tags_j = set(recent_memories[j]["tags"])
                if len(tags_i & tags_j) >= 2:
                    union(i, j)

        # Group by root
        clusters_map: Dict[int, List[int]] = {}
        for i in range(n):
            root = find(i)
            clusters_map.setdefault(root, []).append(i)

        # Build output clusters (only clusters with 2+ members)
        clusters = []
        cluster_id = 0
        for indices in clusters_map.values():
            if len(indices) < 2:
                continue
            cluster_id += 1
            members = []
            all_tag_sets = []
            types_seen = set()
            for idx in indices:
                mem = recent_memories[idx]
                members.append(
                    {
                        "id": mem["id"],
                        "title": mem["title"],
                        "type": mem["type"],
                        "tags": mem["tags"],
                    }
                )
                all_tag_sets.append(set(mem["tags"]))
                types_seen.add(mem["type"])

            # Common tags = intersection of ALL members
            common_tags = (
                sorted(set.intersection(*all_tag_sets)) if all_tag_sets else []
            )

            # Shared tags = tags appearing in 2+ members
            tag_counts: Dict[str, int] = {}
            for ts in all_tag_sets:
                for t in ts:
                    tag_counts[t] = tag_counts.get(t, 0) + 1
            shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2)

            # Suggested topic: prefer tags common to every member, fall back
            # to tags shared by at least two members.
            tag_label = (
                ", ".join(common_tags[:4])
                if common_tags
                else ", ".join(shared_tags[:4])
            )
            type_label = ", ".join(sorted(types_seen))
            suggested_topic = f"Pattern: {tag_label} across {type_label}"

            clusters.append(
                {
                    "cluster_id": cluster_id,
                    "members": members,
                    "common_tags": common_tags,
                    "shared_tags": shared_tags,
                    "suggested_topic": suggested_topic,
                    "member_count": len(members),
                }
            )

        # Sort clusters by member count descending
        clusters.sort(key=lambda c: c["member_count"], reverse=True)
        # Re-number after sorting
        for i, c in enumerate(clusters):
            c["cluster_id"] = i + 1

        result = {
            "clusters": clusters,
            "total_memories_reviewed": total_reviewed,
            "clusters_found": len(clusters),
            "since": since_date,
        }

        if not dry_run:
            # Update state with last_reflection timestamp
            state["last_reflection"] = now.isoformat()
            self._save_state(state)

            # Log an episode entry
            self.episode(
                type="reflection",
                title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories",
                tags=["reflection", "cognitive-memory"],
                summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters",
            )

            # Regenerate REFLECTION.md
            self.reflection_summary()

        return result
|
|
|
|
    def merge(
        self,
        keep_id: str,
        absorb_id: str,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Merge two memories: absorb one into another.

        The 'keep' memory absorbs the content, tags, and relations from
        the 'absorb' memory. The absorb memory is then deleted.
        All other memories referencing absorb_id are updated to point to keep_id.

        If dry_run=True, returns what would change without writing anything.

        Args:
            keep_id: Id of the memory that survives and absorbs content.
            absorb_id: Id of the memory that is merged in and then deleted.
            dry_run: If True, report planned changes without writing files,
                index, state, or git history.

        Returns:
            Dict describing the merge: ids, titles, merged tags/importance,
            merged relation count, and how many other memories were updated
            (plus their ids when dry_run).

        Raises:
            ValueError: If either memory id cannot be resolved to a file.
        """
        # Resolve both memory file paths
        keep_path = self._resolve_memory_path(keep_id)
        if not keep_path:
            raise ValueError(f"Keep memory not found: {keep_id}")
        absorb_path = self._resolve_memory_path(absorb_id)
        if not absorb_path:
            raise ValueError(f"Absorb memory not found: {absorb_id}")

        # Read both memory files
        keep_fm, keep_body = self._read_memory_file(keep_path)
        absorb_fm, absorb_body = self._read_memory_file(absorb_path)

        # Fall back to a truncated id when a title is missing.
        keep_title = keep_fm.get("title", keep_id[:8])
        absorb_title = absorb_fm.get("title", absorb_id[:8])

        # Combine content: absorbed body is appended under a provenance divider.
        merged_body = keep_body
        if absorb_body.strip():
            merged_body = (
                keep_body.rstrip()
                + f"\n\n---\n*Merged from: {absorb_title}*\n\n"
                + absorb_body.strip()
            )

        # Merge tags (sorted, deduplicated)
        keep_tags = keep_fm.get("tags", [])
        absorb_tags = absorb_fm.get("tags", [])
        merged_tags = sorted(list(set(keep_tags + absorb_tags)))

        # importance = max of both
        merged_importance = max(
            keep_fm.get("importance", 0.5),
            absorb_fm.get("importance", 0.5),
        )

        # Merge relations: combine both relation lists
        keep_rels = list(keep_fm.get("relations", []))
        absorb_rels = list(absorb_fm.get("relations", []))
        combined_rels = keep_rels + absorb_rels

        # Replace any relation targeting absorb_id with keep_id
        for rel in combined_rels:
            if rel.get("target") == absorb_id:
                rel["target"] = keep_id

        # Deduplicate by (target, type) tuple, keeping first occurrence
        seen = set()
        deduped_rels = []
        for rel in combined_rels:
            key = (rel.get("target"), rel.get("type"))
            if key not in seen:
                seen.add(key)
                deduped_rels.append(rel)

        # Remove self-referential relations (target == keep_id)
        merged_rels = [r for r in deduped_rels if r.get("target") != keep_id]

        # Scan all other memories for relations targeting absorb_id
        index = self._load_index()
        updated_others = []
        for mid, entry in index.get("entries", {}).items():
            if mid in (keep_id, absorb_id):
                continue
            rels = entry.get("relations", [])
            needs_update = any(r.get("target") == absorb_id for r in rels)
            if needs_update:
                updated_others.append(mid)

        if dry_run:
            # Report-only path: nothing has been written at this point.
            return {
                "dry_run": True,
                "keep_id": keep_id,
                "keep_title": keep_title,
                "absorb_id": absorb_id,
                "absorb_title": absorb_title,
                "merged_tags": merged_tags,
                "merged_importance": merged_importance,
                "merged_relations_count": len(merged_rels),
                "other_memories_updated": len(updated_others),
                "other_memory_ids": updated_others,
            }

        # Write updated keep memory file
        now = datetime.now(timezone.utc).isoformat()
        keep_fm["tags"] = merged_tags
        keep_fm["importance"] = merged_importance
        keep_fm["relations"] = merged_rels
        keep_fm["updated"] = now
        self._write_memory_file(keep_path, keep_fm, merged_body)

        # Update index for keep memory (refresh content_preview with merged body)
        keep_rel_path = str(keep_path.relative_to(self.memory_dir))
        preview = merged_body.strip()[:200]
        if len(merged_body.strip()) > 200:
            # Avoid cutting a word in half at the 200-char boundary.
            last_space = preview.rfind(" ")
            if last_space > 0:
                preview = preview[:last_space]
        self._update_index_entry(
            keep_id, keep_fm, keep_rel_path, content_preview=preview
        )

        # Update all other memories that reference absorb_id
        for mid in updated_others:
            other_path = self._resolve_memory_path(mid)
            if not other_path:
                continue
            try:
                other_fm, other_body = self._read_memory_file(other_path)
                other_rels = other_fm.get("relations", [])
                for rel in other_rels:
                    if rel.get("target") == absorb_id:
                        rel["target"] = keep_id
                # Deduplicate after replacement
                seen_other = set()
                deduped_other = []
                for rel in other_rels:
                    key = (rel.get("target"), rel.get("type"))
                    if key not in seen_other:
                        seen_other.add(key)
                        deduped_other.append(rel)
                # Remove self-referential
                other_fm["relations"] = [
                    r for r in deduped_other if r.get("target") != mid
                ]
                other_fm["updated"] = now
                self._write_memory_file(other_path, other_fm, other_body)
                other_rel_path = str(other_path.relative_to(self.memory_dir))
                self._update_index_entry(mid, other_fm, other_rel_path)
            except Exception:
                # NOTE(review): best-effort — a memory that cannot be
                # rewritten is silently skipped; consider logging here.
                pass

        # Delete absorb memory file and remove from index/state
        absorb_path.unlink()
        self._remove_index_entry(absorb_id)
        state = self._load_state()
        state.get("entries", {}).pop(absorb_id, None)
        self._save_state(state)

        # Git: stage absorb deletion (--cached only touches the git index;
        # the file itself was already unlinked above)
        try:
            absorb_rel = absorb_path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(absorb_rel)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            # Best-effort: a failed git staging does not abort the merge.
            pass

        self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")

        return {
            "success": True,
            "keep_id": keep_id,
            "keep_title": keep_title,
            "absorb_id": absorb_id,
            "absorb_title": absorb_title,
            "merged_tags": merged_tags,
            "merged_importance": merged_importance,
            "merged_relations_count": len(merged_rels),
            "other_memories_updated": len(updated_others),
        }
|
|
|
|
    def reflection_summary(self) -> str:
        """Generate REFLECTION.md with patterns and insights from the memory system.

        Writes to REFLECTION.md in the memory data directory and git-commits the file.
        Returns the generated content.

        Sections produced: Themes (tag co-occurrence pairs), Cross-Project
        Patterns (tags spanning 2+ known projects), Most Accessed, Recent
        Insights, and Consolidation History (merge count from episode logs).
        """
        index = self._load_index()
        state = self._load_state()
        entries = index.get("entries", {})
        state_entries = state.get("entries", {})
        now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        # Last reflection date
        last_reflection = state.get("last_reflection", "")
        last_reflection_date = last_reflection[:10] if last_reflection else "never"

        # Count total reflections from episode files
        total_reflections = 0
        episodes_dir = self.memory_dir / "episodes"
        if episodes_dir.exists():
            for ep_file in sorted(episodes_dir.glob("*.md")):
                try:
                    text = ep_file.read_text(encoding="utf-8")
                    # Count lines that look like reflection episode entries
                    for line in text.split("\n"):
                        if "Reflection:" in line and line.strip().startswith("## "):
                            total_reflections += 1
                except OSError:
                    # Unreadable episode files are skipped.
                    pass

        lines = [
            "# Reflection Summary (auto-generated)",
            f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
            "",
        ]

        # === Themes section ===
        # Get top tags, then find co-occurrence pairs
        lines.append("## Themes")
        lines.append("Top tag co-occurrences revealing recurring themes:")
        lines.append("")

        top_tags = self.tags_list(limit=10)
        # Build co-occurrence pairs across top tags
        pair_data: Dict[Tuple[str, str], List[str]] = {}  # (tag1, tag2) -> [titles]
        for tag_info in top_tags[:5]:
            tag = tag_info["tag"]
            co_tags = self.tags_related(tag, limit=5)
            for co in co_tags:
                other = co["tag"]
                # Canonical pair ordering so (a,b) and (b,a) collapse
                pair = tuple(sorted([tag, other]))
                if pair in pair_data:
                    continue
                # Find memories with both tags and collect titles
                titles = []
                for mid, entry in entries.items():
                    mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
                    if pair[0] in mem_tags and pair[1] in mem_tags:
                        titles.append(entry.get("title", "untitled"))
                if titles:
                    pair_data[pair] = titles

        # Sort pairs by memory count descending, show top 8
        sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
        for (tag_a, tag_b), titles in sorted_pairs[:8]:
            example_titles = ", ".join(f'"{t}"' for t in titles[:3])
            lines.append(
                f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
            )

        if not sorted_pairs:
            lines.append("- No co-occurrence data available yet.")
        lines.append("")

        # === Cross-Project Patterns section ===
        lines.append("## Cross-Project Patterns")
        lines.append("Tags that span multiple projects:")
        lines.append("")

        # NOTE(review): hard-coded project list — new projects must be added
        # here to be recognized.
        known_projects = [
            "major-domo",
            "paper-dynasty",
            "homelab",
            "vagabond-rpg",
            "foundryvtt",
            "strat-gameplay-webapp",
        ]
        # Build tag -> {project -> count} mapping
        tag_project_map: Dict[str, Dict[str, int]] = {}
        for mid, entry in entries.items():
            mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
            # Identify which projects this memory belongs to
            mem_projects = [t for t in mem_tags if t in known_projects]
            # For non-project tags, record which projects they appear in
            non_project_tags = [t for t in mem_tags if t not in known_projects]
            for tag in non_project_tags:
                if tag not in tag_project_map:
                    tag_project_map[tag] = {}
                for proj in mem_projects:
                    tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1

        # Filter to tags spanning 2+ projects, sort by project count then total
        cross_project = []
        for tag, projects in tag_project_map.items():
            if len(projects) >= 2:
                total = sum(projects.values())
                cross_project.append((tag, projects, total))
        cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)

        for tag, projects, total in cross_project[:10]:
            proj_parts = ", ".join(
                f"{p} ({c})"
                for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
            )
            lines.append(f"- **{tag}**: appears in {proj_parts}")

        if not cross_project:
            lines.append("- No cross-project patterns detected yet.")
        lines.append("")

        # === Most Accessed section ===
        lines.append("## Most Accessed")
        lines.append("Top 10 memories by access count:")
        lines.append("")

        # Sort state entries by access_count descending
        accessed = []
        for mid, s in state_entries.items():
            count = s.get("access_count", 0)
            if count > 0:
                entry = entries.get(mid)
                if entry:
                    accessed.append(
                        (
                            mid,
                            entry.get("title", "untitled"),
                            entry.get("path", ""),
                            count,
                        )
                    )
        accessed.sort(key=lambda x: x[3], reverse=True)

        # "1." on every line is intentional: markdown renumbers ordered lists.
        for mid, title, path, count in accessed[:10]:
            lines.append(f"1. [{title}]({path}) - {count} accesses")

        if not accessed:
            lines.append("- No access data recorded yet.")
        lines.append("")

        # === Recent Insights section ===
        lines.append("## Recent Insights")
        lines.append("Insight-type memories:")
        lines.append("")

        insights = []
        for mid, entry in entries.items():
            if entry.get("type") == "insight":
                insights.append((mid, entry))
        # Sort by created date descending
        insights.sort(key=lambda x: x[1].get("created", ""), reverse=True)

        for mid, entry in insights[:10]:
            title = entry.get("title", "untitled")
            path = entry.get("path", "")
            preview = entry.get("content_preview", "")
            if preview:
                lines.append(f"- [{title}]({path}) - {preview[:80]}")
            else:
                lines.append(f"- [{title}]({path})")

        if not insights:
            lines.append("- No insight memories stored yet.")
        lines.append("")

        # === Consolidation History section ===
        lines.append("## Consolidation History")
        lines.append("")

        # Count merge operations by scanning episode headings
        merge_count = 0
        if episodes_dir.exists():
            for ep_file in sorted(episodes_dir.glob("*.md")):
                try:
                    text = ep_file.read_text(encoding="utf-8")
                    for line_text in text.split("\n"):
                        stripped = line_text.strip()
                        if stripped.startswith("## ") and "merge" in stripped.lower():
                            merge_count += 1
                except OSError:
                    # Unreadable episode files are skipped.
                    pass

        lines.append(f"- Total merges performed: {merge_count}")
        lines.append("")

        content = "\n".join(lines)

        # Write REFLECTION.md
        reflection_path = self.memory_dir / "REFLECTION.md"
        reflection_path.write_text(content, encoding="utf-8")
        self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])

        return content
|