Non-default graphs were second-class citizens — timers only maintained the default graph, git sync ignored named graphs, there was no way to create a graph without editing config manually, cross-graph edge errors were confusing, and utility scripts were hardcoded to the default graph.

- Add `graph-create` CLI command + `create_graph()` in common.py, with custom path registration written to the default graph's _config.json
- Add `scripts/maintain-all-graphs.sh` to loop decay/core/embed/reflect over all discovered graphs; update systemd services to call it
- Refactor `memory-git-sync.sh` into a sync_repo() function that iterates the default graph plus all named graphs with .git directories
- Improve the cross-graph edge ValueError to explain the same-graph constraint
- Add a --graph flag to edge-proposer.py and session_memory.py
- Update systemd/README.md with portable paths and the new architecture

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
615 lines
21 KiB
Python
Executable File
615 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Session-end memory hook for Claude Code.
|
|
|
|
Reads the session transcript, extracts significant events (commits, bug fixes,
|
|
architecture decisions, new patterns, configurations), and stores them as
|
|
cognitive memories via claude-memory CLI.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
LOG_FILE = Path("/tmp/session-memory-hook.log")
|
|
|
|
|
|
def log(msg: str):
    """Write *msg* to the hook log file, prefixed with the current timestamp."""
    stamp = datetime.now().isoformat(timespec="seconds")
    with open(LOG_FILE, "a") as handle:
        handle.write(f"{stamp} {msg}\n")
|
|
|
|
|
|
def log_separator():
    """Write a visual separator to the log for readability between sessions."""
    bar = "=" * 72
    stamp = datetime.now().isoformat(timespec="seconds")
    with open(LOG_FILE, "a") as handle:
        # Single write producing the same three lines the hook always emits.
        handle.write(f"\n{bar}\n SESSION MEMORY HOOK — {stamp}\n{bar}\n")
|
|
|
|
|
|
def read_stdin():
    """Read and parse the hook input JSON from stdin; return {} on failure."""
    try:
        payload = sys.stdin.read()
        log(f"[stdin] Raw input length: {len(payload)} chars")
        parsed = json.loads(payload)
        log(f"[stdin] Parsed keys: {list(parsed.keys())}")
    except (json.JSONDecodeError, EOFError) as exc:
        log(f"[stdin] ERROR: Failed to parse input: {exc}")
        return {}
    return parsed
|
|
|
|
|
|
def read_transcript(transcript_path: str) -> list[dict]:
    """Load a Claude Code JSONL transcript as a list of message dicts.

    Each transcript line is a wrapper of the form
    {"type": "user"|"assistant"|..., "message": {"role": ..., "content": ...}, ...}.
    The inner message dicts are unwrapped into the {"role": ..., "content": ...}
    shape the rest of the code expects; non-message entries (e.g.
    file-history-snapshot) are counted and dropped.
    """
    messages: list[dict] = []
    path = Path(transcript_path)
    if not path.exists():
        log(f"[transcript] ERROR: File does not exist: {transcript_path}")
        return messages

    file_size = path.stat().st_size
    log(f"[transcript] Reading {transcript_path} ({file_size} bytes)")

    parse_errors = 0
    skipped_types: dict[str, int] = {}
    line_num = 0  # stays 0 for an empty file; used in the final log line
    with open(path) as handle:
        for line_num, raw_line in enumerate(handle, 1):
            stripped = raw_line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError:
                parse_errors += 1
                continue

            # Wrapper format: unwrap the inner message, carrying the wrapper
            # "type" over as the role when the inner dict lacks one.
            if "message" in record and isinstance(record["message"], dict):
                unwrapped = record["message"]
                if "role" not in unwrapped:
                    unwrapped["role"] = record.get("type", "unknown")
                messages.append(unwrapped)
            elif "role" in record:
                # Already in the expected shape (future-proofing).
                messages.append(record)
            else:
                # Non-message entry — tally by type for the log.
                entry_type = record.get("type", "unknown")
                skipped_types[entry_type] = skipped_types.get(entry_type, 0) + 1

    if parse_errors:
        log(f"[transcript] WARNING: {parse_errors} lines failed to parse")
    if skipped_types:
        log(f"[transcript] Skipped non-message entries: {skipped_types}")
    log(f"[transcript] Loaded {len(messages)} messages from {line_num} lines")

    # Log role breakdown for debugging.
    role_counts: dict[str, int] = {}
    for message in messages:
        role = message.get("role", "unknown")
        role_counts[role] = role_counts.get(role, 0) + 1
    log(f"[transcript] Role breakdown: {role_counts}")

    return messages
|
|
|
|
|
|
def _is_memory_tool_use(block: dict) -> str | None:
|
|
"""Check if a tool_use block is a memory operation.
|
|
|
|
Detects both CLI calls (Bash with 'claude-memory') and MCP tool calls
|
|
(mcp__cognitive-memory__memory_*). Returns a short description of the
|
|
match or None.
|
|
"""
|
|
name = block.get("name", "")
|
|
|
|
# MCP tool calls: mcp__cognitive-memory__memory_store, memory_recall, etc.
|
|
if name.startswith("mcp__cognitive-memory__memory_"):
|
|
return f"MCP:{name}"
|
|
|
|
# Legacy/CLI: Bash commands containing 'claude-memory'
|
|
if name == "Bash":
|
|
cmd = block.get("input", {}).get("command", "")
|
|
if "claude-memory" in cmd:
|
|
return f"CLI:{cmd[:100]}"
|
|
|
|
return None
|
|
|
|
|
|
def find_last_memory_command_index(messages: list[dict]) -> int:
    """Return the index of the last message containing a memory operation.

    Scans assistant messages for MCP memory tool calls and Bash tool_use
    blocks whose command mentions 'claude-memory'. The caller slices the
    transcript after this index so memories already stored during the
    session are not duplicated. Returns -1 when nothing was found.
    """
    last_index = -1
    found_commands: list[str] = []
    for idx, message in enumerate(messages):
        if message.get("role") != "assistant":
            continue
        blocks = message.get("content", [])
        if not isinstance(blocks, list):
            continue
        for entry in blocks:
            if not isinstance(entry, dict) or entry.get("type") != "tool_use":
                continue
            described = _is_memory_tool_use(entry)
            if described:
                last_index = idx
                found_commands.append(f"msg[{idx}]: {described}")

    if found_commands:
        log(f"[cutoff] Found {len(found_commands)} memory operations:")
        for entry in found_commands:
            log(f"[cutoff] {entry}")
        log(f"[cutoff] Will slice after message index {last_index}")
    else:
        log("[cutoff] No memory operations found — processing full transcript")
    return last_index
|
|
|
|
|
|
def extract_text_content(message: dict) -> str:
    """Flatten a message's content blocks into a single plain-text string.

    Handles plain-string content, lists of text blocks, bare strings inside
    the list, and tool_result blocks whose nested content is either a string
    or a list of text blocks. Anything else contributes nothing.
    """
    content = message.get("content", "")
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""

    pieces: list[str] = []
    for entry in content:
        if isinstance(entry, str):
            pieces.append(entry)
            continue
        if not isinstance(entry, dict):
            continue
        kind = entry.get("type")
        if kind == "text":
            pieces.append(entry.get("text", ""))
        elif kind == "tool_result":
            # Tool results nest their own content: a plain string or a
            # list of text blocks.
            nested = entry.get("content", "")
            if isinstance(nested, str):
                pieces.append(nested)
            elif isinstance(nested, list):
                pieces.extend(
                    item.get("text", "")
                    for item in nested
                    if isinstance(item, dict) and item.get("type") == "text"
                )
    return "\n".join(pieces)
|
|
|
|
|
|
def extract_tool_uses(messages: list[dict]) -> list[dict]:
    """Collect every tool_use block from assistant messages, in order."""
    tool_uses: list[dict] = []
    for message in messages:
        if message.get("role") != "assistant":
            continue
        blocks = message.get("content", [])
        if not isinstance(blocks, list):
            continue
        tool_uses.extend(
            entry
            for entry in blocks
            if isinstance(entry, dict) and entry.get("type") == "tool_use"
        )

    # Log a per-tool breakdown for debugging.
    tool_counts: dict[str, int] = {}
    for entry in tool_uses:
        tool_name = entry.get("name", "unknown")
        tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
    log(f"[tools] Extracted {len(tool_uses)} tool uses: {tool_counts}")

    return tool_uses
|
|
|
|
|
|
def find_git_commits(tool_uses: list[dict]) -> list[str]:
    """Return the Bash commands that contain a 'git commit' invocation."""
    commits = [
        tu.get("input", {}).get("command", "")
        for tu in tool_uses
        if tu.get("name") == "Bash"
        and "git commit" in tu.get("input", {}).get("command", "")
    ]
    log(f"[commits] Found {len(commits)} git commit commands")
    return commits
|
|
|
|
|
|
def find_files_edited(tool_uses: list[dict]) -> set[str]:
    """Return the unique file paths touched via Edit/Write/MultiEdit tools."""
    editing_tools = ("Edit", "Write", "MultiEdit")
    edited: set[str] = set()
    for tu in tool_uses:
        if tu.get("name", "") not in editing_tools:
            continue
        file_path = tu.get("input", {}).get("file_path", "")
        if file_path:
            edited.add(file_path)

    log(f"[files] Found {len(edited)} edited files:")
    for path in sorted(edited):
        log(f"[files] {path}")
    return edited
|
|
|
|
|
|
def find_errors_encountered(messages: list[dict]) -> list[str]:
    """Collect (truncated) error text from failing tool results.

    Tool results arrive in user-role messages; only blocks flagged
    is_error contribute, and trivially short text (<= 10 chars) is dropped.
    """
    errors: list[str] = []
    for message in messages:
        if message.get("role") != "user":
            continue
        blocks = message.get("content", [])
        if not isinstance(blocks, list):
            continue
        for entry in blocks:
            if not isinstance(entry, dict):
                continue
            if entry.get("type") != "tool_result" or not entry.get("is_error"):
                continue
            # Reuse the text extractor by wrapping the nested content in a
            # minimal message dict.
            text = extract_text_content({"content": entry.get("content", "")})
            if text and len(text) > 10:
                errors.append(text[:500])
    log(f"[errors] Found {len(errors)} error tool results")
    return errors
|
|
|
|
|
|
def detect_project(cwd: str, files_edited: set[str]) -> str:
    """Infer a project name from the working directory and edited files.

    The cwd is checked first, then each edited file path; the first path
    containing a known indicator substring wins. Falls back to the last
    component of cwd when nothing matches.
    """
    # Known substrings mapped to canonical project names.
    project_indicators = {
        "major-domo": "major-domo",
        "paper-dynasty": "paper-dynasty",
        "claude-home": "homelab",
        "homelab": "homelab",
        ".claude": "claude-config",
        "openclaw": "openclaw",
        "tdarr": "tdarr",
    }
    for candidate in [cwd, *files_edited]:
        lowered = candidate.lower()
        for indicator, project in project_indicators.items():
            if indicator in lowered:
                log(
                    f"[project] Detected '{project}' from path containing '{indicator}': {candidate}"
                )
                return project

    # Fall back to last directory component of cwd
    fallback = Path(cwd).name
    log(f"[project] No indicator matched, falling back to cwd name: {fallback}")
    return fallback
|
|
|
|
|
|
def build_session_summary(messages: list[dict], cwd: str) -> dict | str:
    """Analyze the transcript and build a summary of storable events.

    Args:
        messages: Normalized transcript messages (see read_transcript()).
        cwd: Working directory reported by the hook input.

    Returns:
        A summary dict on success, or a sentinel string describing why the
        session was skipped: "too_short" (fewer than 4 messages) or
        "no_work" (no work types detected and no files edited). Callers
        must check ``isinstance(result, dict)`` before using it — the
        previous ``dict | None`` annotation was wrong, since strings are
        returned on the skip paths.
    """
    log(f"[summary] Building summary from {len(messages)} messages, cwd={cwd}")

    if len(messages) < 4:
        log(f"[summary] SKIP: only {len(messages)} messages, need at least 4")
        return "too_short"

    tool_uses = extract_tool_uses(messages)
    commits = find_git_commits(tool_uses)
    files_edited = find_files_edited(tool_uses)
    errors = find_errors_encountered(messages)
    project = detect_project(cwd, files_edited)

    # Collect assistant text for topic extraction
    assistant_texts = []
    for msg in messages:
        if msg.get("role") == "assistant":
            text = extract_text_content(msg)
            if text:
                assistant_texts.append(text)

    full_assistant_text = "\n".join(assistant_texts)
    log(
        f"[summary] Assistant text: {len(full_assistant_text)} chars from {len(assistant_texts)} messages"
    )
    # Hoisted: lowercase once instead of inside every keyword predicate.
    lowered_text = full_assistant_text.lower()

    # Detect what kind of work was done. Each entry is a cheap predicate
    # over the evidence gathered above; matching names become work types.
    work_types = set()
    keyword_checks = {
        "commit": lambda: bool(commits),
        "debugging": lambda: bool(errors),
        "testing": lambda: any("test" in f.lower() for f in files_edited),
        "fix": lambda: any(
            kw in lowered_text for kw in ["bug", "fix", "error", "issue"]
        ),
        "refactoring": lambda: any(
            kw in lowered_text for kw in ["refactor", "restructure", "reorganize"]
        ),
        "feature": lambda: any(
            kw in lowered_text for kw in ["new feature", "implement", "add support"]
        ),
        "deployment": lambda: any(
            kw in lowered_text for kw in ["deploy", "production", "release"]
        ),
        "configuration": lambda: any(
            kw in lowered_text for kw in ["config", "setup", "install", "configure"]
        ),
        "automation": lambda: any(
            kw in lowered_text for kw in ["hook", "script", "automat"]
        ),
        "tooling": lambda: any(
            kw in lowered_text
            for kw in [
                "skill",
                "command",
                "slash command",
                "commit-push",
                "claude code command",
            ]
        ),
        "creation": lambda: any(
            kw in lowered_text
            for kw in ["create a ", "created", "new file", "wrote a"]
        ),
    }

    for work_type, check_fn in keyword_checks.items():
        matched = check_fn()
        if matched:
            work_types.add(work_type)
            log(f"[work_type] MATCH: {work_type}")
        else:
            log(f"[work_type] no match: {work_type}")

    if not work_types and not files_edited:
        log("[summary] SKIP: no work types detected and no files edited")
        # Log a snippet of assistant text to help debug missed keywords
        snippet = full_assistant_text[:500].replace("\n", " ")
        log(f"[summary] Assistant text preview: {snippet}")
        return "no_work"

    log(
        f"[summary] Result: project={project}, work_types={sorted(work_types)}, "
        f"commits={len(commits)}, files={len(files_edited)}, errors={len(errors)}"
    )

    return {
        "project": project,
        "work_types": work_types,
        "commits": commits,
        "files_edited": sorted(files_edited),
        "errors": errors[:5],  # Cap at 5
        "assistant_text_snippet": full_assistant_text[:3000],
        "message_count": len(messages),
        "tool_use_count": len(tool_uses),
    }
|
|
|
|
|
|
def build_memory_content(summary: dict) -> str:
    """Render the session summary as a concise multi-line memory body."""
    lines: list[str] = []

    commits = summary["commits"]
    if commits:
        lines.append(f"Commits made: {len(commits)}")
        for cmd in commits[:3]:
            message = extract_commit_message(cmd)
            if message:
                lines.append(f" - {message}")

    edited = summary["files_edited"]
    if edited:
        lines.append(f"Files edited ({len(edited)}):")
        lines.extend(f" - {path}" for path in edited[:10])

    errors = summary["errors"]
    if errors:
        lines.append(f"Errors encountered ({len(errors)}):")
        lines.extend(f" - {err[:150]}" for err in errors[:3])

    lines.append(f"Work types: {', '.join(sorted(summary['work_types']))}")
    lines.append(
        f"Session size: {summary['message_count']} messages, {summary['tool_use_count']} tool calls"
    )

    return "\n".join(lines)
|
|
|
|
|
|
def determine_memory_type(summary: dict) -> str:
    """Map the detected work types onto a memory type.

    Rules are checked most-specific first; the first rule with any
    matching work type wins, and 'general' is the fallback.
    """
    work_types = summary["work_types"]
    rules = [
        ({"fix", "debugging"}, "fix"),
        ({"configuration"}, "configuration"),
        ({"feature"}, "workflow"),
        ({"refactoring"}, "code_pattern"),
        ({"deployment"}, "workflow"),
        ({"automation", "tooling"}, "workflow"),
        ({"creation"}, "workflow"),
    ]
    for triggers, memory_type in rules:
        if triggers & work_types:
            return memory_type
    return "general"
|
|
|
|
|
|
def extract_commit_message(commit_cmd: str) -> str | None:
|
|
"""Extract the commit message from a git commit command string.
|
|
|
|
Handles both simple quoted (-m "msg") and heredoc (-m "$(cat <<'EOF'...EOF)")
|
|
formats. Tries heredoc first since that's the standard Claude Code format.
|
|
"""
|
|
# Try heredoc format first (standard Claude Code format)
|
|
match = re.search(r"<<'?EOF'?\n(.+?)(?:\nEOF|\n\s*EOF)", commit_cmd, re.DOTALL)
|
|
if match:
|
|
# Get first non-empty line as the message
|
|
for line in match.group(1).strip().split("\n"):
|
|
line = line.strip()
|
|
if line and not line.startswith("Co-Authored-By:"):
|
|
return line[:200]
|
|
|
|
# Fall back to simple quoted message (matching same quote type)
|
|
match = re.search(r'-m\s+"([^"]+)"', commit_cmd)
|
|
if not match:
|
|
match = re.search(r"-m\s+'([^']+)'", commit_cmd)
|
|
if match:
|
|
return match.group(1).split("\n")[0][:200]
|
|
|
|
return None
|
|
|
|
|
|
def build_title(summary: dict) -> str:
    """Compose a descriptive memory title from the session summary.

    Prefers the first commit's message; otherwise falls back to listing
    the detected work types.
    """
    project = summary["project"]
    if summary["commits"]:
        commit_msg = extract_commit_message(summary["commits"][0])
        if commit_msg:
            return f"[{project}] {commit_msg}"
    work = ", ".join(sorted(summary["work_types"]))
    return f"[{project}] Session: {work}"
|
|
|
|
|
|
def store_memory(summary: dict, graph: str | None = None):
    """Store the session memory via the claude-memory CLI.

    Args:
        summary: Session summary dict from build_session_summary().
        graph: Named memory graph to store into, or None for the default graph.
    """
    title = build_title(summary)
    content = build_memory_content(summary)
    mem_type = determine_memory_type(summary)

    # Importance starts low; boosted for commits, wide edits, and deploys.
    importance = "0.4"
    if summary["commits"]:
        importance = "0.6"
    if len(summary["files_edited"]) > 5:
        importance = "0.6"
    if "deployment" in summary["work_types"]:
        importance = "0.7"

    # Tags: project, then the work types, then the session-log marker.
    tag_str = ",".join(
        [summary["project"], *sorted(summary["work_types"]), "session-log"]
    )

    # Base command: optionally target a named graph.
    cmd = ["claude-memory"]
    if graph:
        cmd += ["--graph", graph]
    cmd += [
        "store",
        "--type", mem_type,
        "--title", title,
        "--content", content,
        "--tags", tag_str,
        "--importance", importance,
        "--episode",
    ]

    log(f"[store] Memory type: {mem_type}, importance: {importance}")
    log(f"[store] Title: {title}")
    log(f"[store] Tags: {tag_str}")
    log(f"[store] Content length: {len(content)} chars")
    log(f"[store] Command: {' '.join(cmd)}")

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            log(f"[store] SUCCESS: {title}")
        else:
            log(f"[store] FAILED (rc={result.returncode}): {result.stderr.strip()}")
        # Both branches surface any stdout the CLI produced.
        if result.stdout.strip():
            log(f"[store] stdout: {result.stdout.strip()[:200]}")
    except subprocess.TimeoutExpired:
        log("[store] FAILED: claude-memory timed out after 10s")
    except FileNotFoundError:
        log("[store] FAILED: claude-memory command not found in PATH")
    except Exception as e:
        # Boundary handler: this hook must never crash the session end.
        log(f"[store] FAILED: {type(e).__name__}: {e}")
|
|
|
|
|
|
def main():
    """Hook entry point: parse flags, read stdin, summarize, store."""
    log_separator()

    parser = argparse.ArgumentParser(
        description="Session-end memory hook: store session events as cognitive memories.",
        add_help=False,  # keep --help available but don't conflict with hook stdin
    )
    parser.add_argument(
        "--graph",
        default=None,
        metavar="NAME",
        help=(
            "Named memory graph to store memories into (default: the default graph). "
            "Use 'claude-memory graphs' to list available graphs."
        ),
    )
    parser.add_argument(
        "--help", "-h", action="help", help="Show this help message and exit."
    )
    # Unknown args are ignored so the hook tolerates extra flags.
    args, _ = parser.parse_known_args()

    hook_input = read_stdin()
    transcript_path = hook_input.get("transcript_path", "")
    cwd = hook_input.get("cwd", "")

    log(f"[main] cwd: {cwd}")
    log(f"[main] transcript_path: {transcript_path}")

    if not transcript_path:
        log("[main] ABORT: no transcript path provided")
        sys.exit(0)

    messages = read_transcript(transcript_path)
    if not messages:
        log("[main] ABORT: empty transcript")
        sys.exit(0)

    total_messages = len(messages)

    # Only process messages after the last claude-memory command to avoid
    # duplicating memories that were already stored during the session.
    cutoff = find_last_memory_command_index(messages)
    if cutoff < 0:
        log(f"[main] Processing all {total_messages} messages (no cutoff)")
    else:
        messages = messages[cutoff + 1 :]
        log(f"[main] After cutoff: {len(messages)} of {total_messages} messages remain")
        if not messages:
            log("[main] ABORT: no new messages after last claude-memory command")
            sys.exit(0)

    summary = build_session_summary(messages, cwd)
    if not isinstance(summary, dict):
        # Skip-path sentinels ("too_short" / "no_work") land here.
        log(f"[main] ABORT: build_session_summary returned '{summary}'")
        sys.exit(0)

    store_memory(summary, graph=args.graph)
    log("[main] Done")
|
|
|
|
|
|
# Run as a script (Claude Code session-end hook): reads hook JSON on stdin.
if __name__ == "__main__":
    main()
|