claude-configs/scripts/session-memory/session_memory.py
Cal Corum 0179143fef Optimize CLAUDE.md and add session-memory scripts
- Slim CLAUDE.md from 161 to 32 lines (80% reduction)
- Move detailed Memory Protocol docs to skill reference
- Add scripts/session-memory/ tooling
- Update settings.json and claude-pulse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 08:09:06 -06:00

375 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Session-end memory hook for Claude Code.
Reads the session transcript, extracts significant events (commits, bug fixes,
architecture decisions, new patterns, configurations), and stores them as
cognitive memories via claude-memory CLI.
"""
import json
import re
import subprocess
import sys
from pathlib import Path
def read_stdin():
"""Read the hook input JSON from stdin."""
try:
return json.loads(sys.stdin.read())
except (json.JSONDecodeError, EOFError):
return {}
def read_transcript(transcript_path: str) -> list[dict]:
"""Read JSONL transcript file into a list of message dicts."""
messages = []
path = Path(transcript_path)
if not path.exists():
return messages
with open(path) as f:
for line in f:
line = line.strip()
if line:
try:
messages.append(json.loads(line))
except json.JSONDecodeError:
continue
return messages
def find_last_memory_command_index(messages: list[dict]) -> int:
"""Find the index of the last message containing a claude-memory command.
Scans for Bash tool_use blocks where the command contains 'claude-memory'
(store, recall, episode, etc). Returns the index of that message so we can
slice the transcript to only process messages after the last memory operation,
avoiding duplicate storage.
Returns -1 if no claude-memory commands were found.
"""
last_index = -1
for i, msg in enumerate(messages):
if msg.get("role") != "assistant":
continue
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") != "tool_use":
continue
if block.get("name") != "Bash":
continue
cmd = block.get("input", {}).get("command", "")
if "claude-memory" in cmd:
last_index = i
return last_index
def extract_text_content(message: dict) -> str:
"""Extract plain text from a message's content blocks."""
content = message.get("content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for block in content:
if isinstance(block, dict):
if block.get("type") == "text":
parts.append(block.get("text", ""))
elif block.get("type") == "tool_result":
# Recurse into tool result content
sub = block.get("content", "")
if isinstance(sub, str):
parts.append(sub)
elif isinstance(sub, list):
for sb in sub:
if isinstance(sb, dict) and sb.get("type") == "text":
parts.append(sb.get("text", ""))
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts)
return ""
def extract_tool_uses(messages: list[dict]) -> list[dict]:
"""Extract all tool_use blocks from assistant messages."""
tool_uses = []
for msg in messages:
if msg.get("role") != "assistant":
continue
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tool_uses.append(block)
return tool_uses
def find_git_commits(tool_uses: list[dict]) -> list[str]:
"""Find git commit commands from Bash tool uses."""
commits = []
for tu in tool_uses:
if tu.get("name") != "Bash":
continue
cmd = tu.get("input", {}).get("command", "")
if "git commit" in cmd:
commits.append(cmd)
return commits
def find_files_edited(tool_uses: list[dict]) -> set[str]:
"""Find unique files edited via Edit/Write tools."""
files = set()
for tu in tool_uses:
name = tu.get("name", "")
if name in ("Edit", "Write", "MultiEdit"):
fp = tu.get("input", {}).get("file_path", "")
if fp:
files.add(fp)
return files
def find_errors_encountered(messages: list[dict]) -> list[str]:
"""Find error messages from tool results."""
errors = []
for msg in messages:
if msg.get("role") != "user":
continue
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") == "tool_result" and block.get("is_error"):
error_text = extract_text_content({"content": block.get("content", "")})
if error_text and len(error_text) > 10:
errors.append(error_text[:500])
return errors
def detect_project(cwd: str, files_edited: set[str]) -> str:
"""Detect project name from cwd and edited files."""
all_paths = [cwd] + list(files_edited)
project_indicators = {
"major-domo": "major-domo",
"paper-dynasty": "paper-dynasty",
"claude-home": "homelab",
"homelab": "homelab",
".claude": "claude-config",
"openclaw": "openclaw",
"tdarr": "tdarr",
}
for path in all_paths:
for indicator, project in project_indicators.items():
if indicator in path.lower():
return project
# Fall back to last directory component of cwd
return Path(cwd).name
def build_session_summary(messages: list[dict], cwd: str) -> dict | None:
"""Analyze the transcript and build a summary of storable events."""
if len(messages) < 4:
# Too short to be meaningful
return None
tool_uses = extract_tool_uses(messages)
commits = find_git_commits(tool_uses)
files_edited = find_files_edited(tool_uses)
errors = find_errors_encountered(messages)
project = detect_project(cwd, files_edited)
# Collect assistant text for topic extraction
assistant_texts = []
for msg in messages:
if msg.get("role") == "assistant":
text = extract_text_content(msg)
if text:
assistant_texts.append(text)
full_assistant_text = "\n".join(assistant_texts)
# Detect what kind of work was done
work_types = set()
if commits:
work_types.add("commit")
if errors:
work_types.add("debugging")
if any("test" in f.lower() for f in files_edited):
work_types.add("testing")
if any(kw in full_assistant_text.lower() for kw in ["bug", "fix", "error", "issue"]):
work_types.add("fix")
if any(kw in full_assistant_text.lower() for kw in ["refactor", "restructure", "reorganize"]):
work_types.add("refactoring")
if any(kw in full_assistant_text.lower() for kw in ["new feature", "implement", "add support"]):
work_types.add("feature")
if any(kw in full_assistant_text.lower() for kw in ["deploy", "production", "release"]):
work_types.add("deployment")
if any(kw in full_assistant_text.lower() for kw in ["config", "setup", "install", "configure"]):
work_types.add("configuration")
if any(kw in full_assistant_text.lower() for kw in ["hook", "script", "automat"]):
work_types.add("automation")
if not work_types and not files_edited:
# Likely a research/chat session, skip
return None
return {
"project": project,
"work_types": work_types,
"commits": commits,
"files_edited": sorted(files_edited),
"errors": errors[:5], # Cap at 5
"assistant_text_snippet": full_assistant_text[:3000],
"message_count": len(messages),
"tool_use_count": len(tool_uses),
}
def build_memory_content(summary: dict) -> str:
"""Build a concise memory content string from the summary."""
parts = []
if summary["commits"]:
parts.append(f"Commits made: {len(summary['commits'])}")
for c in summary["commits"][:3]:
# Extract commit message
match = re.search(r'-m\s+["\'](.+?)["\']', c)
if not match:
match = re.search(r"<<'?EOF'?\n(.+?)(?:\n|EOF)", c, re.DOTALL)
if match:
parts.append(f" - {match.group(1)[:200]}")
if summary["files_edited"]:
parts.append(f"Files edited ({len(summary['files_edited'])}):")
for f in summary["files_edited"][:10]:
parts.append(f" - {f}")
if summary["errors"]:
parts.append(f"Errors encountered ({len(summary['errors'])}):")
for e in summary["errors"][:3]:
parts.append(f" - {e[:150]}")
work_desc = ", ".join(sorted(summary["work_types"]))
parts.append(f"Work types: {work_desc}")
parts.append(f"Session size: {summary['message_count']} messages, {summary['tool_use_count']} tool calls")
return "\n".join(parts)
def determine_memory_type(summary: dict) -> str:
"""Pick the best memory type based on work done."""
wt = summary["work_types"]
if "fix" in wt or "debugging" in wt:
return "fix"
if "configuration" in wt:
return "configuration"
if "feature" in wt:
return "workflow"
if "refactoring" in wt:
return "code_pattern"
if "deployment" in wt:
return "workflow"
if "automation" in wt:
return "workflow"
return "general"
def build_title(summary: dict) -> str:
"""Generate a descriptive title for the memory."""
project = summary["project"]
work = ", ".join(sorted(summary["work_types"]))
if summary["commits"]:
# Try to use first commit message as title basis
first_commit = summary["commits"][0]
match = re.search(r'-m\s+["\'](.+?)["\']', first_commit)
if not match:
match = re.search(r"<<'?EOF'?\n(.+?)(?:\n|EOF)", first_commit, re.DOTALL)
if match:
msg = match.group(1).split("\n")[0][:80]
return f"[{project}] {msg}"
return f"[{project}] Session: {work}"
def store_memory(summary: dict):
"""Store the session memory via claude-memory CLI."""
title = build_title(summary)
content = build_memory_content(summary)
mem_type = determine_memory_type(summary)
importance = "0.4"
# Boost importance for commits or significant work
if summary["commits"]:
importance = "0.6"
if len(summary["files_edited"]) > 5:
importance = "0.6"
if "deployment" in summary["work_types"]:
importance = "0.7"
# Build tags
tags = [summary["project"]]
tags.extend(sorted(summary["work_types"]))
tags.append("session-log")
tag_str = ",".join(tags)
cmd = [
"claude-memory", "store",
"--type", mem_type,
"--title", title,
"--content", content,
"--tags", tag_str,
"--importance", importance,
"--episode",
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
print(f"Session memory stored: {title}", file=sys.stderr)
else:
print(f"Memory store failed: {result.stderr}", file=sys.stderr)
except subprocess.TimeoutExpired:
print("Memory store timed out", file=sys.stderr)
except Exception as e:
print(f"Memory store error: {e}", file=sys.stderr)
def main():
hook_input = read_stdin()
transcript_path = hook_input.get("transcript_path", "")
cwd = hook_input.get("cwd", "")
if not transcript_path:
print("No transcript path provided", file=sys.stderr)
sys.exit(0)
messages = read_transcript(transcript_path)
if not messages:
sys.exit(0)
# Only process messages after the last claude-memory command to avoid
# duplicating memories that were already stored during the session.
cutoff = find_last_memory_command_index(messages)
if cutoff >= 0:
messages = messages[cutoff + 1:]
if not messages:
print("No new messages after last claude-memory command", file=sys.stderr)
sys.exit(0)
summary = build_session_summary(messages, cwd)
if summary is None:
print("Session too short or no significant work detected", file=sys.stderr)
sys.exit(0)
store_memory(summary)
if __name__ == "__main__":
main()