cognitive-memory/analysis.py
Cal Corum 48df2a89ce Initial commit: extract cognitive-memory app from skill directory
Moved application code from ~/.claude/skills/cognitive-memory/ to its own
project directory. The skill layer (SKILL.md, SCHEMA.md) remains in the
skill directory for Claude Code to read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:02:28 -06:00

862 lines
32 KiB
Python

"""
Cognitive Memory - Analysis Mixin
Decay scoring, CORE.md generation, tag analysis, reflection clustering,
memory merging, and REFLECTION.md generation.
"""
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from common import (
CORE_MAX_CHARS,
MEMORY_DIR,
THRESHOLD_ACTIVE,
THRESHOLD_DORMANT,
THRESHOLD_FADING,
TYPE_WEIGHTS,
calculate_decay_score,
)
class AnalysisMixin:
"""Mixin providing analysis operations for CognitiveMemoryClient."""
def decay(self) -> Dict[str, Any]:
"""Recalculate all decay scores. Updates _state.json."""
index = self._load_index()
state = self._load_state()
now = datetime.now(timezone.utc)
updated_count = 0
for mid, entry in index.get("entries", {}).items():
s = state.setdefault("entries", {}).setdefault(
mid,
{
"access_count": 0,
"last_accessed": entry.get("created", now.isoformat()),
"decay_score": 0.5,
},
)
# Calculate days since last access
last_str = s.get("last_accessed", entry.get("created", ""))
try:
last_dt = datetime.fromisoformat(last_str.replace("Z", "+00:00"))
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=timezone.utc)
days = (now - last_dt).total_seconds() / 86400
except (ValueError, AttributeError):
days = 30 # Default to 30 days if unparseable
importance = entry.get("importance", 0.5)
mem_type = entry.get("type", "general")
type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
access_count = s.get("access_count", 0)
# Check if pinned (vault)
path = entry.get("path", "")
if path.startswith("vault/"):
s["decay_score"] = 999.0 # Pinned memories never decay
else:
s["decay_score"] = round(
calculate_decay_score(importance, days, access_count, type_weight),
4,
)
updated_count += 1
self._save_state(state)
# Summary
state_entries = state.get("entries", {})
return {
"updated": updated_count,
"active": sum(
1
for s in state_entries.values()
if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
),
"fading": sum(
1
for s in state_entries.values()
if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
),
"dormant": sum(
1
for s in state_entries.values()
if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
),
"archived": sum(
1
for s in state_entries.values()
if 0 < s.get("decay_score", 0) < THRESHOLD_DORMANT
),
}
def core(self) -> str:
"""Generate CORE.md from top memories by decay score."""
index = self._load_index()
state = self._load_state()
def _extract_summary(body: str, max_len: int = 80) -> str:
"""Extract first meaningful sentence from memory body."""
if not body or not body.strip():
return ""
# Skip leading code blocks, headings, and list markers
for ln in body.strip().split("\n"):
stripped = ln.strip()
if not stripped:
continue
if stripped.startswith("```") or stripped.startswith("#"):
continue
first_line = stripped.lstrip("- *>").strip()
break
else:
return ""
if not first_line:
return ""
# Extract first sentence (split on '. ')
dot_pos = first_line.find(". ")
if 0 < dot_pos < max_len:
sentence = first_line[: dot_pos + 1]
else:
sentence = first_line
if len(sentence) > max_len:
sentence = sentence[: max_len - 3].rstrip() + "..."
return sentence
# Collect all memories with decay scores
memories = []
for mid, entry in index.get("entries", {}).items():
s = state.get("entries", {}).get(mid, {})
decay = s.get("decay_score", 0.5)
if decay < THRESHOLD_FADING:
continue # Skip low-relevance
memories.append(
{
"id": mid,
"title": entry.get("title", ""),
"type": entry.get("type", "general"),
"path": entry.get("path", ""),
"importance": entry.get("importance", 0.5),
"decay_score": decay,
"tags": entry.get("tags", []),
}
)
# Sort by decay score descending
memories.sort(key=lambda x: x["decay_score"], reverse=True)
# Group by type category
categories = {
"Critical Solutions": ["solution"],
"Active Decisions": ["decision"],
"Key Fixes": ["fix"],
"Configurations": ["configuration"],
"Key Procedures": ["procedure"],
"Patterns & Workflows": ["code_pattern", "workflow"],
"Known Issues": ["problem", "error"],
"Insights": ["insight"],
"General": ["general"],
}
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
active_count = sum(1 for m in memories if m["decay_score"] >= THRESHOLD_ACTIVE)
total_count = len(index.get("entries", {}))
lines = [
"# Memory Core (auto-generated)",
f"> Last updated: {now_str} | Active memories: {active_count}/{total_count} | Next refresh: daily (systemd timer)",
"",
]
char_count = sum(len(l) for l in lines)
for cat_name, cat_types in categories.items():
cat_memories = [m for m in memories if m["type"] in cat_types]
if not cat_memories:
continue
section_lines = [f"## {cat_name}", ""]
for mem in cat_memories[:10]: # Cap per section (reduced for summaries)
path = mem["path"]
title = mem["title"]
tags = ", ".join(mem["tags"][:3]) if mem["tags"] else ""
# Read body to extract summary
summary = ""
mem_path = self.memory_dir / path
if mem_path.exists():
try:
_, body = self._read_memory_file(mem_path)
summary = _extract_summary(body)
except Exception:
pass
line = f"- [{title}]({path})"
if summary:
line += f" - {summary}"
if tags:
line += f" ({tags})"
section_lines.append(line)
# Check budget
added = sum(len(l) for l in section_lines) + 2
if char_count + added > CORE_MAX_CHARS:
break
section_lines.append("")
section_text = "\n".join(section_lines)
if char_count + len(section_text) > CORE_MAX_CHARS:
break
lines.extend(section_lines)
char_count += len(section_text)
core_content = "\n".join(lines)
core_path = self.memory_dir / "CORE.md"
core_path.write_text(core_content, encoding="utf-8")
self._git_commit("core: regenerate CORE.md", [core_path])
return core_content
def tags_list(self, limit: int = 0) -> List[Dict[str, Any]]:
"""List all tags with occurrence counts across all memories.
Returns list of {"tag": str, "count": int} sorted by count descending.
"""
index = self._load_index()
tag_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
for tag in entry.get("tags", []):
normalized = tag.lower().strip()
if normalized:
tag_counts[normalized] = tag_counts.get(normalized, 0) + 1
results = [{"tag": tag, "count": count} for tag, count in tag_counts.items()]
results.sort(key=lambda x: x["count"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_related(self, tag: str, limit: int = 0) -> List[Dict[str, Any]]:
"""Find tags that co-occur with the given tag.
Returns list of {"tag": str, "co_occurrences": int, "memories_with_both": int}
sorted by co_occurrences descending.
"""
tag = tag.lower().strip()
index = self._load_index()
co_counts: Dict[str, int] = {}
for entry in index.get("entries", {}).values():
entry_tags = [t.lower().strip() for t in entry.get("tags", [])]
if tag not in entry_tags:
continue
for other_tag in entry_tags:
if other_tag != tag and other_tag:
co_counts[other_tag] = co_counts.get(other_tag, 0) + 1
results = [
{"tag": t, "co_occurrences": count, "memories_with_both": count}
for t, count in co_counts.items()
]
results.sort(key=lambda x: x["co_occurrences"], reverse=True)
if limit > 0:
results = results[:limit]
return results
def tags_suggest(self, memory_id: str) -> List[Dict[str, Any]]:
"""Suggest tags for a memory based on co-occurrence patterns.
Returns top 10 suggestions as {"tag": str, "score": int, "reason": str}.
"""
index = self._load_index()
entry = index.get("entries", {}).get(memory_id)
if not entry:
return []
existing_tags = set(t.lower().strip() for t in entry.get("tags", []))
if not existing_tags:
return []
# For each existing tag, find co-occurring tags and accumulate scores
candidate_scores: Dict[str, int] = {}
candidate_sources: Dict[str, set] = {}
for existing_tag in existing_tags:
co_tags = self.tags_related(existing_tag)
for co_entry in co_tags:
candidate = co_entry["tag"]
if candidate in existing_tags:
continue
candidate_scores[candidate] = (
candidate_scores.get(candidate, 0) + co_entry["co_occurrences"]
)
if candidate not in candidate_sources:
candidate_sources[candidate] = set()
candidate_sources[candidate].add(existing_tag)
results = []
for candidate, score in candidate_scores.items():
sources = sorted(candidate_sources[candidate])
reason = "co-occurs with: " + ", ".join(sources)
results.append({"tag": candidate, "score": score, "reason": reason})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:10]
def reflect(
self,
since: Optional[str] = None,
dry_run: bool = False,
) -> Dict[str, Any]:
"""Review recent memories, identify tag-based clusters, and output consolidation recommendations.
Clusters memories that share 2+ tags using iterative union-find merging.
Does NOT auto-create insights; the agent reads output and decides what to store.
Args:
since: ISO date string (YYYY-MM-DD) to filter memories from. Falls back to
_state.json's last_reflection, then 30 days ago.
dry_run: If True, return analysis without updating _state.json or logging episode.
Returns:
Dict with clusters, total_memories_reviewed, clusters_found, and since date.
"""
now = datetime.now(timezone.utc)
state = self._load_state()
# Determine time window
if since:
since_date = since
elif state.get("last_reflection"):
# last_reflection may be a full ISO timestamp; extract date portion
since_date = state["last_reflection"][:10]
else:
since_dt = now - timedelta(days=30)
since_date = since_dt.strftime("%Y-%m-%d")
# Load recent memories from index
index = self._load_index()
recent_memories = []
for mid, entry in index.get("entries", {}).items():
created = entry.get("created", "")
# Compare date portion only (YYYY-MM-DD)
created_date = created[:10] if created else ""
if created_date >= since_date:
recent_memories.append(
{
"id": mid,
"title": entry.get("title", ""),
"type": entry.get("type", "general"),
"tags": [t.lower().strip() for t in entry.get("tags", [])],
}
)
total_reviewed = len(recent_memories)
# Union-find clustering by tag overlap >= 2
# parent[i] = index of parent in recent_memories list
n = len(recent_memories)
parent = list(range(n))
def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]] # path compression
x = parent[x]
return x
def union(a: int, b: int):
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
# Compare all pairs, check tag overlap
for i in range(n):
tags_i = set(recent_memories[i]["tags"])
for j in range(i + 1, n):
tags_j = set(recent_memories[j]["tags"])
if len(tags_i & tags_j) >= 2:
union(i, j)
# Group by root
clusters_map: Dict[int, List[int]] = {}
for i in range(n):
root = find(i)
clusters_map.setdefault(root, []).append(i)
# Build output clusters (only clusters with 2+ members)
clusters = []
cluster_id = 0
for indices in clusters_map.values():
if len(indices) < 2:
continue
cluster_id += 1
members = []
all_tag_sets = []
types_seen = set()
for idx in indices:
mem = recent_memories[idx]
members.append(
{
"id": mem["id"],
"title": mem["title"],
"type": mem["type"],
"tags": mem["tags"],
}
)
all_tag_sets.append(set(mem["tags"]))
types_seen.add(mem["type"])
# Common tags = intersection of ALL members
common_tags = (
sorted(set.intersection(*all_tag_sets)) if all_tag_sets else []
)
# Shared tags = tags appearing in 2+ members
tag_counts: Dict[str, int] = {}
for ts in all_tag_sets:
for t in ts:
tag_counts[t] = tag_counts.get(t, 0) + 1
shared_tags = sorted(t for t, c in tag_counts.items() if c >= 2)
# Suggested topic
tag_label = (
", ".join(common_tags[:4])
if common_tags
else ", ".join(shared_tags[:4])
)
type_label = ", ".join(sorted(types_seen))
suggested_topic = f"Pattern: {tag_label} across {type_label}"
clusters.append(
{
"cluster_id": cluster_id,
"members": members,
"common_tags": common_tags,
"shared_tags": shared_tags,
"suggested_topic": suggested_topic,
"member_count": len(members),
}
)
# Sort clusters by member count descending
clusters.sort(key=lambda c: c["member_count"], reverse=True)
# Re-number after sorting
for i, c in enumerate(clusters):
c["cluster_id"] = i + 1
result = {
"clusters": clusters,
"total_memories_reviewed": total_reviewed,
"clusters_found": len(clusters),
"since": since_date,
}
if not dry_run:
# Update state with last_reflection timestamp
state["last_reflection"] = now.isoformat()
self._save_state(state)
# Log an episode entry
self.episode(
type="reflection",
title=f"Reflection: {len(clusters)} clusters from {total_reviewed} memories",
tags=["reflection", "cognitive-memory"],
summary=f"Reviewed {total_reviewed} memories since {since_date}, found {len(clusters)} clusters",
)
# Regenerate REFLECTION.md
self.reflection_summary()
return result
def merge(
self,
keep_id: str,
absorb_id: str,
dry_run: bool = False,
) -> Dict[str, Any]:
"""Merge two memories: absorb one into another.
The 'keep' memory absorbs the content, tags, and relations from
the 'absorb' memory. The absorb memory is then deleted.
All other memories referencing absorb_id are updated to point to keep_id.
If dry_run=True, returns what would change without writing anything.
"""
# Resolve both memory file paths
keep_path = self._resolve_memory_path(keep_id)
if not keep_path:
raise ValueError(f"Keep memory not found: {keep_id}")
absorb_path = self._resolve_memory_path(absorb_id)
if not absorb_path:
raise ValueError(f"Absorb memory not found: {absorb_id}")
# Read both memory files
keep_fm, keep_body = self._read_memory_file(keep_path)
absorb_fm, absorb_body = self._read_memory_file(absorb_path)
keep_title = keep_fm.get("title", keep_id[:8])
absorb_title = absorb_fm.get("title", absorb_id[:8])
# Combine content
merged_body = keep_body
if absorb_body.strip():
merged_body = (
keep_body.rstrip()
+ f"\n\n---\n*Merged from: {absorb_title}*\n\n"
+ absorb_body.strip()
)
# Merge tags (sorted, deduplicated)
keep_tags = keep_fm.get("tags", [])
absorb_tags = absorb_fm.get("tags", [])
merged_tags = sorted(list(set(keep_tags + absorb_tags)))
# importance = max of both
merged_importance = max(
keep_fm.get("importance", 0.5),
absorb_fm.get("importance", 0.5),
)
# Merge relations: combine both relation lists
keep_rels = list(keep_fm.get("relations", []))
absorb_rels = list(absorb_fm.get("relations", []))
combined_rels = keep_rels + absorb_rels
# Replace any relation targeting absorb_id with keep_id
for rel in combined_rels:
if rel.get("target") == absorb_id:
rel["target"] = keep_id
# Deduplicate by (target, type) tuple
seen = set()
deduped_rels = []
for rel in combined_rels:
key = (rel.get("target"), rel.get("type"))
if key not in seen:
seen.add(key)
deduped_rels.append(rel)
# Remove self-referential relations (target == keep_id)
merged_rels = [r for r in deduped_rels if r.get("target") != keep_id]
# Scan all other memories for relations targeting absorb_id
index = self._load_index()
updated_others = []
for mid, entry in index.get("entries", {}).items():
if mid in (keep_id, absorb_id):
continue
rels = entry.get("relations", [])
needs_update = any(r.get("target") == absorb_id for r in rels)
if needs_update:
updated_others.append(mid)
if dry_run:
return {
"dry_run": True,
"keep_id": keep_id,
"keep_title": keep_title,
"absorb_id": absorb_id,
"absorb_title": absorb_title,
"merged_tags": merged_tags,
"merged_importance": merged_importance,
"merged_relations_count": len(merged_rels),
"other_memories_updated": len(updated_others),
"other_memory_ids": updated_others,
}
# Write updated keep memory file
now = datetime.now(timezone.utc).isoformat()
keep_fm["tags"] = merged_tags
keep_fm["importance"] = merged_importance
keep_fm["relations"] = merged_rels
keep_fm["updated"] = now
self._write_memory_file(keep_path, keep_fm, merged_body)
# Update index for keep memory (refresh content_preview with merged body)
keep_rel_path = str(keep_path.relative_to(self.memory_dir))
preview = merged_body.strip()[:200]
if len(merged_body.strip()) > 200:
last_space = preview.rfind(" ")
if last_space > 0:
preview = preview[:last_space]
self._update_index_entry(
keep_id, keep_fm, keep_rel_path, content_preview=preview
)
# Update all other memories that reference absorb_id
for mid in updated_others:
other_path = self._resolve_memory_path(mid)
if not other_path:
continue
try:
other_fm, other_body = self._read_memory_file(other_path)
other_rels = other_fm.get("relations", [])
for rel in other_rels:
if rel.get("target") == absorb_id:
rel["target"] = keep_id
# Deduplicate after replacement
seen_other = set()
deduped_other = []
for rel in other_rels:
key = (rel.get("target"), rel.get("type"))
if key not in seen_other:
seen_other.add(key)
deduped_other.append(rel)
# Remove self-referential
other_fm["relations"] = [
r for r in deduped_other if r.get("target") != mid
]
other_fm["updated"] = now
self._write_memory_file(other_path, other_fm, other_body)
other_rel_path = str(other_path.relative_to(self.memory_dir))
self._update_index_entry(mid, other_fm, other_rel_path)
except Exception:
pass
# Delete absorb memory file and remove from index/state
absorb_path.unlink()
self._remove_index_entry(absorb_id)
state = self._load_state()
state.get("entries", {}).pop(absorb_id, None)
self._save_state(state)
# Git: stage absorb deletion
try:
absorb_rel = absorb_path.relative_to(self.memory_dir)
subprocess.run(
["git", "rm", "--cached", str(absorb_rel)],
cwd=str(self.memory_dir),
capture_output=True,
timeout=5,
)
except Exception:
pass
self._git_commit(f"merge: {keep_title} absorbed {absorb_title}")
return {
"success": True,
"keep_id": keep_id,
"keep_title": keep_title,
"absorb_id": absorb_id,
"absorb_title": absorb_title,
"merged_tags": merged_tags,
"merged_importance": merged_importance,
"merged_relations_count": len(merged_rels),
"other_memories_updated": len(updated_others),
}
def reflection_summary(self) -> str:
"""Generate REFLECTION.md with patterns and insights from the memory system.
Writes to REFLECTION.md in the memory data directory and git-commits the file.
Returns the generated content.
"""
index = self._load_index()
state = self._load_state()
entries = index.get("entries", {})
state_entries = state.get("entries", {})
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
# Last reflection date
last_reflection = state.get("last_reflection", "")
last_reflection_date = last_reflection[:10] if last_reflection else "never"
# Count total reflections from episode files
total_reflections = 0
episodes_dir = self.memory_dir / "episodes"
if episodes_dir.exists():
for ep_file in sorted(episodes_dir.glob("*.md")):
try:
text = ep_file.read_text(encoding="utf-8")
# Count lines that look like reflection episode entries
for line in text.split("\n"):
if "Reflection:" in line and line.strip().startswith("## "):
total_reflections += 1
except OSError:
pass
lines = [
"# Reflection Summary (auto-generated)",
f"> Last updated: {now_str} | Last reflection: {last_reflection_date} | Total reflections: {total_reflections}",
"",
]
# === Themes section ===
# Get top tags, then find co-occurrence pairs
lines.append("## Themes")
lines.append("Top tag co-occurrences revealing recurring themes:")
lines.append("")
top_tags = self.tags_list(limit=10)
# Build co-occurrence pairs across top tags
pair_data: Dict[Tuple[str, str], List[str]] = {} # (tag1, tag2) -> [titles]
for tag_info in top_tags[:5]:
tag = tag_info["tag"]
co_tags = self.tags_related(tag, limit=5)
for co in co_tags:
other = co["tag"]
# Canonical pair ordering
pair = tuple(sorted([tag, other]))
if pair in pair_data:
continue
# Find memories with both tags and collect titles
titles = []
for mid, entry in entries.items():
mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
if pair[0] in mem_tags and pair[1] in mem_tags:
titles.append(entry.get("title", "untitled"))
if titles:
pair_data[pair] = titles
# Sort pairs by memory count descending, show top 8
sorted_pairs = sorted(pair_data.items(), key=lambda x: len(x[1]), reverse=True)
for (tag_a, tag_b), titles in sorted_pairs[:8]:
example_titles = ", ".join(f'"{t}"' for t in titles[:3])
lines.append(
f"- **{tag_a} + {tag_b}**: {len(titles)} memories ({example_titles})"
)
if not sorted_pairs:
lines.append("- No co-occurrence data available yet.")
lines.append("")
# === Cross-Project Patterns section ===
lines.append("## Cross-Project Patterns")
lines.append("Tags that span multiple projects:")
lines.append("")
known_projects = [
"major-domo",
"paper-dynasty",
"homelab",
"vagabond-rpg",
"foundryvtt",
"strat-gameplay-webapp",
]
# Build tag -> {project -> count} mapping
tag_project_map: Dict[str, Dict[str, int]] = {}
for mid, entry in entries.items():
mem_tags = [t.lower().strip() for t in entry.get("tags", [])]
# Identify which projects this memory belongs to
mem_projects = [t for t in mem_tags if t in known_projects]
# For non-project tags, record which projects they appear in
non_project_tags = [t for t in mem_tags if t not in known_projects]
for tag in non_project_tags:
if tag not in tag_project_map:
tag_project_map[tag] = {}
for proj in mem_projects:
tag_project_map[tag][proj] = tag_project_map[tag].get(proj, 0) + 1
# Filter to tags spanning 2+ projects, sort by project count then total
cross_project = []
for tag, projects in tag_project_map.items():
if len(projects) >= 2:
total = sum(projects.values())
cross_project.append((tag, projects, total))
cross_project.sort(key=lambda x: (len(x[1]), x[2]), reverse=True)
for tag, projects, total in cross_project[:10]:
proj_parts = ", ".join(
f"{p} ({c})"
for p, c in sorted(projects.items(), key=lambda x: x[1], reverse=True)
)
lines.append(f"- **{tag}**: appears in {proj_parts}")
if not cross_project:
lines.append("- No cross-project patterns detected yet.")
lines.append("")
# === Most Accessed section ===
lines.append("## Most Accessed")
lines.append("Top 10 memories by access count:")
lines.append("")
# Sort state entries by access_count descending
accessed = []
for mid, s in state_entries.items():
count = s.get("access_count", 0)
if count > 0:
entry = entries.get(mid)
if entry:
accessed.append(
(
mid,
entry.get("title", "untitled"),
entry.get("path", ""),
count,
)
)
accessed.sort(key=lambda x: x[3], reverse=True)
for mid, title, path, count in accessed[:10]:
lines.append(f"1. [{title}]({path}) - {count} accesses")
if not accessed:
lines.append("- No access data recorded yet.")
lines.append("")
# === Recent Insights section ===
lines.append("## Recent Insights")
lines.append("Insight-type memories:")
lines.append("")
insights = []
for mid, entry in entries.items():
if entry.get("type") == "insight":
insights.append((mid, entry))
# Sort by created date descending
insights.sort(key=lambda x: x[1].get("created", ""), reverse=True)
for mid, entry in insights[:10]:
title = entry.get("title", "untitled")
path = entry.get("path", "")
preview = entry.get("content_preview", "")
if preview:
lines.append(f"- [{title}]({path}) - {preview[:80]}")
else:
lines.append(f"- [{title}]({path})")
if not insights:
lines.append("- No insight memories stored yet.")
lines.append("")
# === Consolidation History section ===
lines.append("## Consolidation History")
lines.append("")
merge_count = 0
if episodes_dir.exists():
for ep_file in sorted(episodes_dir.glob("*.md")):
try:
text = ep_file.read_text(encoding="utf-8")
for line_text in text.split("\n"):
stripped = line_text.strip()
if stripped.startswith("## ") and "merge" in stripped.lower():
merge_count += 1
except OSError:
pass
lines.append(f"- Total merges performed: {merge_count}")
lines.append("")
content = "\n".join(lines)
# Write REFLECTION.md
reflection_path = self.memory_dir / "REFLECTION.md"
reflection_path.write_text(content, encoding="utf-8")
self._git_commit("reflection: regenerate REFLECTION.md", [reflection_path])
return content