#!/usr/bin/env python3 """ Session-end memory hook for Claude Code. Reads the session transcript, extracts significant events (commits, bug fixes, architecture decisions, new patterns, configurations), and stores them as cognitive memories via claude-memory CLI. """ import argparse import json import re import subprocess import sys from datetime import datetime from pathlib import Path LOG_FILE = Path("/tmp/session-memory-hook.log") def log(msg: str): """Append a timestamped message to the hook log file.""" with open(LOG_FILE, "a") as f: f.write(f"{datetime.now().isoformat(timespec='seconds')} {msg}\n") def log_separator(): """Write a visual separator to the log for readability between sessions.""" with open(LOG_FILE, "a") as f: f.write(f"\n{'='*72}\n") f.write( f" SESSION MEMORY HOOK — {datetime.now().isoformat(timespec='seconds')}\n" ) f.write(f"{'='*72}\n") def read_stdin(): """Read the hook input JSON from stdin.""" try: raw = sys.stdin.read() log(f"[stdin] Raw input length: {len(raw)} chars") data = json.loads(raw) log(f"[stdin] Parsed keys: {list(data.keys())}") return data except (json.JSONDecodeError, EOFError) as e: log(f"[stdin] ERROR: Failed to parse input: {e}") return {} def read_transcript(transcript_path: str) -> list[dict]: """Read JSONL transcript file into a list of normalized message dicts. Claude Code transcripts use a wrapper format where each line is: {"type": "user"|"assistant"|..., "message": {"role": ..., "content": ...}, ...} This function unwraps them into the inner {"role": ..., "content": ...} dicts that the rest of the code expects. Non-message entries (like file-history-snapshot) are filtered out. """ messages = [] path = Path(transcript_path) if not path.exists(): log(f"[transcript] ERROR: File does not exist: {transcript_path}") return messages file_size = path.stat().st_size log(f"[transcript] Reading {transcript_path} ({file_size} bytes)") parse_errors = 0 skipped_types = {} line_num = 0 with open(path) as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: raw = json.loads(line) except json.JSONDecodeError: parse_errors += 1 continue # Claude Code transcript format: wrapper with "type" and "message" keys # Unwrap to get the inner message dict with "role" and "content" if "message" in raw and isinstance(raw["message"], dict): inner = raw["message"] # Carry over the wrapper type for logging wrapper_type = raw.get("type", "unknown") if "role" not in inner: inner["role"] = wrapper_type messages.append(inner) elif "role" in raw: # Already in the expected format (future-proofing) messages.append(raw) else: # Non-message entry (file-history-snapshot, etc.) entry_type = raw.get("type", "unknown") skipped_types[entry_type] = skipped_types.get(entry_type, 0) + 1 if parse_errors: log(f"[transcript] WARNING: {parse_errors} lines failed to parse") if skipped_types: log(f"[transcript] Skipped non-message entries: {skipped_types}") log(f"[transcript] Loaded {len(messages)} messages from {line_num} lines") # Log role breakdown role_counts = {} for msg in messages: role = msg.get("role", "unknown") role_counts[role] = role_counts.get(role, 0) + 1 log(f"[transcript] Role breakdown: {role_counts}") return messages def _is_memory_tool_use(block: dict) -> str | None: """Check if a tool_use block is a memory operation. Detects both CLI calls (Bash with 'claude-memory') and MCP tool calls (mcp__cognitive-memory__memory_*). Returns a short description of the match or None. """ name = block.get("name", "") # MCP tool calls: mcp__cognitive-memory__memory_store, memory_recall, etc. if name.startswith("mcp__cognitive-memory__memory_"): return f"MCP:{name}" # Legacy/CLI: Bash commands containing 'claude-memory' if name == "Bash": cmd = block.get("input", {}).get("command", "") if "claude-memory" in cmd: return f"CLI:{cmd[:100]}" return None def find_last_memory_command_index(messages: list[dict]) -> int: """Find the index of the last message containing a memory operation. Scans for both MCP tool calls (mcp__cognitive-memory__memory_*) and Bash tool_use blocks where the command contains 'claude-memory'. Returns the index of that message so we can slice the transcript to only process messages after the last memory operation, avoiding duplicate storage. Returns -1 if no memory operations were found. """ last_index = -1 found_commands = [] for i, msg in enumerate(messages): if msg.get("role") != "assistant": continue content = msg.get("content", []) if not isinstance(content, list): continue for block in content: if not isinstance(block, dict): continue if block.get("type") != "tool_use": continue match = _is_memory_tool_use(block) if match: last_index = i found_commands.append(f"msg[{i}]: {match}") if found_commands: log(f"[cutoff] Found {len(found_commands)} memory operations:") for fc in found_commands: log(f"[cutoff] {fc}") log(f"[cutoff] Will slice after message index {last_index}") else: log("[cutoff] No memory operations found — processing full transcript") return last_index def extract_text_content(message: dict) -> str: """Extract plain text from a message's content blocks.""" content = message.get("content", "") if isinstance(content, str): return content if isinstance(content, list): parts = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": parts.append(block.get("text", "")) elif block.get("type") == "tool_result": # Recurse into tool result content sub = block.get("content", "") if isinstance(sub, str): parts.append(sub) elif isinstance(sub, list): for sb in sub: if isinstance(sb, dict) and sb.get("type") == "text": parts.append(sb.get("text", "")) elif isinstance(block, str): parts.append(block) return "\n".join(parts) return "" def extract_tool_uses(messages: list[dict]) -> list[dict]: """Extract all tool_use blocks from assistant messages.""" tool_uses = [] for msg in messages: if msg.get("role") != "assistant": continue content = msg.get("content", []) if not isinstance(content, list): continue for block in content: if isinstance(block, dict) and block.get("type") == "tool_use": tool_uses.append(block) # Log tool use breakdown tool_counts = {} for tu in tool_uses: name = tu.get("name", "unknown") tool_counts[name] = tool_counts.get(name, 0) + 1 log(f"[tools] Extracted {len(tool_uses)} tool uses: {tool_counts}") return tool_uses def find_git_commits(tool_uses: list[dict]) -> list[str]: """Find git commit commands from Bash tool uses.""" commits = [] for tu in tool_uses: if tu.get("name") != "Bash": continue cmd = tu.get("input", {}).get("command", "") if "git commit" in cmd: commits.append(cmd) log(f"[commits] Found {len(commits)} git commit commands") return commits def find_files_edited(tool_uses: list[dict]) -> set[str]: """Find unique files edited via Edit/Write tools.""" files = set() for tu in tool_uses: name = tu.get("name", "") if name in ("Edit", "Write", "MultiEdit"): fp = tu.get("input", {}).get("file_path", "") if fp: files.add(fp) log(f"[files] Found {len(files)} edited files:") for f in sorted(files): log(f"[files] {f}") return files def find_errors_encountered(messages: list[dict]) -> list[str]: """Find error messages from tool results.""" errors = [] for msg in messages: if msg.get("role") != "user": continue content = msg.get("content", []) if not isinstance(content, list): continue for block in content: if not isinstance(block, dict): continue if block.get("type") == "tool_result" and block.get("is_error"): error_text = extract_text_content({"content": block.get("content", "")}) if error_text and len(error_text) > 10: errors.append(error_text[:500]) log(f"[errors] Found {len(errors)} error tool results") return errors def detect_project(cwd: str, files_edited: set[str]) -> str: """Detect project name from cwd and edited files.""" all_paths = [cwd] + list(files_edited) project_indicators = { "major-domo": "major-domo", "paper-dynasty": "paper-dynasty", "claude-home": "homelab", "homelab": "homelab", ".claude": "claude-config", "openclaw": "openclaw", "tdarr": "tdarr", } for path in all_paths: for indicator, project in project_indicators.items(): if indicator in path.lower(): log( f"[project] Detected '{project}' from path containing '{indicator}': {path}" ) return project # Fall back to last directory component of cwd fallback = Path(cwd).name log(f"[project] No indicator matched, falling back to cwd name: {fallback}") return fallback def build_session_summary(messages: list[dict], cwd: str) -> dict | None: """Analyze the transcript and build a summary of storable events.""" log(f"[summary] Building summary from {len(messages)} messages, cwd={cwd}") if len(messages) < 4: log(f"[summary] SKIP: only {len(messages)} messages, need at least 4") return "too_short" tool_uses = extract_tool_uses(messages) commits = find_git_commits(tool_uses) files_edited = find_files_edited(tool_uses) errors = find_errors_encountered(messages) project = detect_project(cwd, files_edited) # Collect assistant text for topic extraction assistant_texts = [] for msg in messages: if msg.get("role") == "assistant": text = extract_text_content(msg) if text: assistant_texts.append(text) full_assistant_text = "\n".join(assistant_texts) log( f"[summary] Assistant text: {len(full_assistant_text)} chars from {len(assistant_texts)} messages" ) # Detect what kind of work was done work_types = set() keyword_checks = { "commit": lambda: bool(commits), "debugging": lambda: bool(errors), "testing": lambda: any("test" in f.lower() for f in files_edited), "fix": lambda: any( kw in full_assistant_text.lower() for kw in ["bug", "fix", "error", "issue"] ), "refactoring": lambda: any( kw in full_assistant_text.lower() for kw in ["refactor", "restructure", "reorganize"] ), "feature": lambda: any( kw in full_assistant_text.lower() for kw in ["new feature", "implement", "add support"] ), "deployment": lambda: any( kw in full_assistant_text.lower() for kw in ["deploy", "production", "release"] ), "configuration": lambda: any( kw in full_assistant_text.lower() for kw in ["config", "setup", "install", "configure"] ), "automation": lambda: any( kw in full_assistant_text.lower() for kw in ["hook", "script", "automat"] ), "tooling": lambda: any( kw in full_assistant_text.lower() for kw in [ "skill", "command", "slash command", "commit-push", "claude code command", ] ), "creation": lambda: any( kw in full_assistant_text.lower() for kw in ["create a ", "created", "new file", "wrote a"] ), } for work_type, check_fn in keyword_checks.items(): matched = check_fn() if matched: work_types.add(work_type) log(f"[work_type] MATCH: {work_type}") else: log(f"[work_type] no match: {work_type}") if not work_types and not files_edited: log("[summary] SKIP: no work types detected and no files edited") # Log a snippet of assistant text to help debug missed keywords snippet = full_assistant_text[:500].replace("\n", " ") log(f"[summary] Assistant text preview: {snippet}") return "no_work" log( f"[summary] Result: project={project}, work_types={sorted(work_types)}, " f"commits={len(commits)}, files={len(files_edited)}, errors={len(errors)}" ) return { "project": project, "work_types": work_types, "commits": commits, "files_edited": sorted(files_edited), "errors": errors[:5], # Cap at 5 "assistant_text_snippet": full_assistant_text[:3000], "message_count": len(messages), "tool_use_count": len(tool_uses), } def build_memory_content(summary: dict) -> str: """Build a concise memory content string from the summary.""" parts = [] if summary["commits"]: parts.append(f"Commits made: {len(summary['commits'])}") for c in summary["commits"][:3]: msg = extract_commit_message(c) if msg: parts.append(f" - {msg}") if summary["files_edited"]: parts.append(f"Files edited ({len(summary['files_edited'])}):") for f in summary["files_edited"][:10]: parts.append(f" - {f}") if summary["errors"]: parts.append(f"Errors encountered ({len(summary['errors'])}):") for e in summary["errors"][:3]: parts.append(f" - {e[:150]}") work_desc = ", ".join(sorted(summary["work_types"])) parts.append(f"Work types: {work_desc}") parts.append( f"Session size: {summary['message_count']} messages, {summary['tool_use_count']} tool calls" ) return "\n".join(parts) def determine_memory_type(summary: dict) -> str: """Pick the best memory type based on work done.""" wt = summary["work_types"] if "fix" in wt or "debugging" in wt: return "fix" if "configuration" in wt: return "configuration" if "feature" in wt: return "workflow" if "refactoring" in wt: return "code_pattern" if "deployment" in wt: return "workflow" if "automation" in wt or "tooling" in wt: return "workflow" if "creation" in wt: return "workflow" return "general" def extract_commit_message(commit_cmd: str) -> str | None: """Extract the commit message from a git commit command string. Handles both simple quoted (-m "msg") and heredoc (-m "$(cat <<'EOF'...EOF)") formats. Tries heredoc first since that's the standard Claude Code format. """ # Try heredoc format first (standard Claude Code format) match = re.search(r"<<'?EOF'?\n(.+?)(?:\nEOF|\n\s*EOF)", commit_cmd, re.DOTALL) if match: # Get first non-empty line as the message for line in match.group(1).strip().split("\n"): line = line.strip() if line and not line.startswith("Co-Authored-By:"): return line[:200] # Fall back to simple quoted message (matching same quote type) match = re.search(r'-m\s+"([^"]+)"', commit_cmd) if not match: match = re.search(r"-m\s+'([^']+)'", commit_cmd) if match: return match.group(1).split("\n")[0][:200] return None def build_title(summary: dict) -> str: """Generate a descriptive title for the memory.""" project = summary["project"] work = ", ".join(sorted(summary["work_types"])) if summary["commits"]: msg = extract_commit_message(summary["commits"][0]) if msg: return f"[{project}] {msg}" return f"[{project}] Session: {work}" def store_memory(summary: dict, graph: str | None = None): """Store the session memory via claude-memory CLI. Args: summary: Session summary dict from build_session_summary(). graph: Named memory graph to store into, or None for the default graph. """ title = build_title(summary) content = build_memory_content(summary) mem_type = determine_memory_type(summary) importance = "0.4" # Boost importance for commits or significant work if summary["commits"]: importance = "0.6" if len(summary["files_edited"]) > 5: importance = "0.6" if "deployment" in summary["work_types"]: importance = "0.7" # Build tags tags = [summary["project"]] tags.extend(sorted(summary["work_types"])) tags.append("session-log") tag_str = ",".join(tags) # Base command: optionally target a named graph cmd = ["claude-memory"] if graph: cmd += ["--graph", graph] cmd += [ "store", "--type", mem_type, "--title", title, "--content", content, "--tags", tag_str, "--importance", importance, "--episode", ] log(f"[store] Memory type: {mem_type}, importance: {importance}") log(f"[store] Title: {title}") log(f"[store] Tags: {tag_str}") log(f"[store] Content length: {len(content)} chars") log(f"[store] Command: {' '.join(cmd)}") try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.returncode == 0: log(f"[store] SUCCESS: {title}") if result.stdout.strip(): log(f"[store] stdout: {result.stdout.strip()[:200]}") else: log(f"[store] FAILED (rc={result.returncode}): {result.stderr.strip()}") if result.stdout.strip(): log(f"[store] stdout: {result.stdout.strip()[:200]}") except subprocess.TimeoutExpired: log("[store] FAILED: claude-memory timed out after 10s") except FileNotFoundError: log("[store] FAILED: claude-memory command not found in PATH") except Exception as e: log(f"[store] FAILED: {type(e).__name__}: {e}") def main(): log_separator() parser = argparse.ArgumentParser( description="Session-end memory hook: store session events as cognitive memories.", add_help=False, # keep --help available but don't conflict with hook stdin ) parser.add_argument( "--graph", default=None, metavar="NAME", help=( "Named memory graph to store memories into (default: the default graph). " "Use 'claude-memory graphs' to list available graphs." ), ) parser.add_argument( "--help", "-h", action="help", help="Show this help message and exit." ) args, _ = parser.parse_known_args() hook_input = read_stdin() transcript_path = hook_input.get("transcript_path", "") cwd = hook_input.get("cwd", "") log(f"[main] cwd: {cwd}") log(f"[main] transcript_path: {transcript_path}") if not transcript_path: log("[main] ABORT: no transcript path provided") sys.exit(0) messages = read_transcript(transcript_path) if not messages: log("[main] ABORT: empty transcript") sys.exit(0) total_messages = len(messages) # Only process messages after the last claude-memory command to avoid # duplicating memories that were already stored during the session. cutoff = find_last_memory_command_index(messages) if cutoff >= 0: messages = messages[cutoff + 1 :] log(f"[main] After cutoff: {len(messages)} of {total_messages} messages remain") if not messages: log("[main] ABORT: no new messages after last claude-memory command") sys.exit(0) else: log(f"[main] Processing all {total_messages} messages (no cutoff)") summary = build_session_summary(messages, cwd) if not isinstance(summary, dict): log(f"[main] ABORT: build_session_summary returned '{summary}'") sys.exit(0) store_memory(summary, graph=args.graph) log("[main] Done") if __name__ == "__main__": main()