#!/usr/bin/env python3
"""Migrate memories from the default cognitive-memory graph to a named graph.

Moves memory .md files, edges, index entries, embeddings, and state data
from the default graph to a target named graph based on tag matching.

Memories tagged for multiple projects are copied (not moved) to the target
graph; they remain in the default graph until explicitly cleaned up with
--cleanup-overlaps after all migrations are complete.

Usage:
    # Dry-run to preview what would be migrated
    python3 migrate-memories.py --tags "major-domo,sba,discord-bot" \
        --target-graph major-domo --dry-run

    # Execute the migration
    python3 migrate-memories.py --tags "major-domo,sba,discord-bot" \
        --target-graph major-domo

    # After all migrations, clean up overlap memories from default graph
    python3 migrate-memories.py --cleanup-overlaps

Gitea issues: cal/cognitive-memory #4, #5, #6
"""

import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# ---------------------------------------------------------------------------
# Path resolution (standalone, mirrors common.py without importing it)
# ---------------------------------------------------------------------------

_env_dir = os.environ.get("COGNITIVE_MEMORY_DIR", "")
if _env_dir:
    MEMORY_DIR = Path(_env_dir).expanduser()
else:
    _xdg_data = os.environ.get("XDG_DATA_HOME", "") or str(
        Path.home() / ".local" / "share"
    )
    MEMORY_DIR = Path(_xdg_data) / "cognitive-memory"

GRAPHS_BASE_DIR = MEMORY_DIR.parent / "cognitive-memory-graphs"
CONFIG_PATH = MEMORY_DIR / "_config.json"
OVERLAP_TRACKING_PATH = MEMORY_DIR / "_migration_overlaps.json"

# Frontmatter field order (from common.py) — serialize_frontmatter emits keys
# in exactly this order so migrated files diff cleanly against originals.
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]


def resolve_graph_path(graph_name: Optional[str]) -> Path:
    """Resolve graph name to directory path. None/'default' -> MEMORY_DIR.

    Prefers an explicit ``path`` from _config.json's ``graphs`` mapping;
    falls back to ``GRAPHS_BASE_DIR/<name>`` when the config is missing,
    unreadable, or does not mention the graph.
    """
    if not graph_name or graph_name == "default":
        return MEMORY_DIR
    if CONFIG_PATH.exists():
        try:
            cfg = json.loads(CONFIG_PATH.read_text())
            graphs = cfg.get("graphs", {})
            if graph_name in graphs:
                p = graphs[graph_name].get("path", "")
                if p:
                    return Path(p).expanduser()
        except (json.JSONDecodeError, OSError):
            # Corrupt/unreadable config: fall through to the conventional path.
            pass
    return GRAPHS_BASE_DIR / graph_name


# ---------------------------------------------------------------------------
# Frontmatter parsing (inlined from common.py)
# ---------------------------------------------------------------------------


def _needs_quoting(s: str) -> bool:
    """Return True if *s* would be misread as YAML without quotes."""
    if not s:
        return True
    if any(c in s for c in ":#{}[]&*?|>!%@`"):
        return True
    try:
        float(s)
        return True  # looks numeric; quote so it round-trips as a string
    except ValueError:
        pass
    if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
        return True
    return False


def _quote_yaml(s: str) -> str:
    """Double-quote *s*, escaping backslashes and embedded quotes."""
    escaped = s.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Format a scalar for YAML output, quoting only when required."""
    if value is None:
        return "null"
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    s = str(value)
    if force_quote or _needs_quoting(s):
        return _quote_yaml(s)
    return s


def _parse_scalar(value: str) -> Any:
    """Parse a YAML-ish scalar: null/bool/number/quoted-string/plain string."""
    v = value.strip()
    if not v or v == "null":
        return None
    if v == "true":
        return True
    if v == "false":
        return False
    try:
        if "." in v:
            return float(v)
        return int(v)
    except ValueError:
        pass
    if (v.startswith('"') and v.endswith('"')) or (
        v.startswith("'") and v.endswith("'")
    ):
        return v[1:-1]
    return v


def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse an indented ``relations:`` list of ``key: value`` mappings."""
    relations = []
    current = None
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        if stripped.startswith("- "):
            # New list item; first key may ride on the same line.
            current = {}
            relations.append(current)
            rest = stripped[2:]
            if ":" in rest:
                k, _, v = rest.partition(":")
                current[k.strip()] = _parse_scalar(v.strip())
        elif current is not None and ":" in stripped:
            k, _, v = stripped.partition(":")
            current[k.strip()] = _parse_scalar(v.strip())
    return relations


def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Split *text* into (frontmatter dict, body).

    Returns ``({}, text)`` unchanged when no ``---`` fence is present.
    Handles the degenerate case of a file that ends with ``---`` and has
    no body. Supports scalars, inline ``[a, b]`` lists, indented ``- item``
    lists, and the structured ``relations:`` block.
    """
    if not text.startswith("---\n"):
        return {}, text
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # No closing fence followed by a newline: accept a trailing "---".
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        end_pos = end_match.start() + 3
        fm_text = text[4:end_pos]
        # Search was over text[3:], so the absolute body offset is end()+3.
        body = text[end_match.end() + 3 :]
        body = body.lstrip("\n")
    data = {}
    lines = fm_text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if not line.strip():
            i += 1
            continue
        if line[0] == " ":
            # Indented continuation lines are consumed by their parent key.
            i += 1
            continue
        if ":" not in line:
            i += 1
            continue
        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()
        if not rest:
            # Value is an indented block (list or relations) or null.
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1
            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # Slice off exactly the "- " prefix; lstrip("- ") would also
                # eat leading dashes of the value itself (e.g. "- -5" -> "5").
                data[key] = [
                    _parse_scalar(bl.strip()[2:].strip())
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                data[key] = None
            i = j
            continue
        if rest.startswith("[") and rest.endswith("]"):
            inner = rest[1:-1]
            if inner.strip():
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)
        i += 1
    return data, body


def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize *data* back to a ``---`` fenced frontmatter block.

    Only keys in FIELD_ORDER are emitted, in that order; empty list fields
    (other than tags) are omitted entirely.
    """
    lines = ["---"]
    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]
        if key == "tags" and isinstance(value, list):
            if value:
                items = ", ".join(_format_yaml_value(t) for t in value)
                lines.append(f"tags: [{items}]")
            else:
                lines.append("tags: []")
        elif key in ("steps", "preconditions", "postconditions") and isinstance(
            value, list
        ):
            if not value:
                continue
            lines.append(f"{key}:")
            for item in value:
                lines.append(f"  - {_format_yaml_value(str(item), force_quote=True)}")
        elif key == "relations" and isinstance(value, list):
            if not value:
                continue
            lines.append("relations:")
            for rel in value:
                first = True
                for rk in [
                    "target",
                    "type",
                    "direction",
                    "strength",
                    "context",
                    "edge_id",
                ]:
                    if rk not in rel:
                        continue
                    rv = rel[rk]
                    prefix = "  - " if first else "    "
                    force_q = rk in ("context",)
                    lines.append(
                        f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}"
                    )
                    first = False
        elif key == "title":
            lines.append(f"title: {_format_yaml_value(value, force_quote=True)}")
        else:
            lines.append(f"{key}: {_format_yaml_value(value)}")
    lines.append("---")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# JSON file I/O with atomic writes
# ---------------------------------------------------------------------------


def load_json(path: Path) -> Dict[str, Any]:
    """Load a JSON file, returning {} when it does not exist."""
    if not path.exists():
        return {}
    return json.loads(path.read_text(encoding="utf-8"))


def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Atomic write: write to temp file then os.replace()."""
    # Temp file lives in the destination directory so the replace is
    # same-filesystem and therefore atomic.
    fd, tmp = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")
        os.replace(tmp, str(path))
    except Exception:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


# ---------------------------------------------------------------------------
# All known migration tag groups (for overlap detection)
# ---------------------------------------------------------------------------

ALL_TAG_GROUPS = {
    "major-domo": {"major-domo", "sba", "discord-bot"},
    "paper-dynasty": {"paper-dynasty", "card-game", "gauntlet"},
    "strat-gameplay": {"strat-gameplay", "sba-scout", "tui", "ratatui"},
}


def detect_overlap(mem_tags: Set[str], current_group_tags: Set[str]) -> bool:
    """Check if a memory's tags match any OTHER migration group besides the current one.

    NOTE(review): the current group is identified by exact set equality with
    an ALL_TAG_GROUPS entry; if --tags is a custom set that equals no known
    group, every known group counts as "other". Intentional per the known
    migration plans above, but worth confirming for ad-hoc tag sets.
    """
    for _, group_tags in ALL_TAG_GROUPS.items():
        if group_tags == current_group_tags:
            continue
        if mem_tags & group_tags:
            return True
    return False


# ---------------------------------------------------------------------------
# Core migration logic
# ---------------------------------------------------------------------------


def select_candidates(
    index_entries: Dict[str, Any],
    filter_tags: Set[str],
    already_in_target: Set[str],
) -> Tuple[List[str], List[str], List[str]]:
    """Select memories whose tags intersect filter_tags.

    Returns (move_ids, copy_ids, skipped_ids):
    - move_ids: memories that only match this group (will be moved)
    - copy_ids: memories that match multiple groups (will be copied)
    - skipped_ids: already in target (idempotent skip)
    """
    move_ids = []
    copy_ids = []
    skipped_ids = []

    for mem_id, entry in index_entries.items():
        mem_tags = set(entry.get("tags", []))
        if not (mem_tags & filter_tags):
            continue
        if mem_id in already_in_target:
            skipped_ids.append(mem_id)
            continue
        if detect_overlap(mem_tags, filter_tags):
            copy_ids.append(mem_id)
        else:
            move_ids.append(mem_id)

    return move_ids, copy_ids, skipped_ids


def classify_edges(
    source_edges: Dict[str, Any],
    candidate_ids: Set[str],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Classify edges relative to a set of candidate memory IDs.

    Returns (migrate_edges, orphan_edges):
    - migrate_edges: both endpoints in candidate set
    - orphan_edges: only one endpoint in candidate set (cross-graph)

    Edges with neither endpoint in the candidate set are untouched.
    """
    migrate = {}
    orphan = {}

    for edge_id, edge_data in source_edges.items():
        from_id = edge_data.get("from_id", "")
        to_id = edge_data.get("to_id", "")
        from_in = from_id in candidate_ids
        to_in = to_id in candidate_ids

        if from_in and to_in:
            migrate[edge_id] = edge_data
        elif from_in or to_in:
            orphan[edge_id] = edge_data

    return migrate, orphan


def strip_edge_from_frontmatter(mem_path: Path, edge_ids: Set[str]) -> bool:
    """Remove relation entries matching edge_ids from a memory file's frontmatter.

    Bumps the ``updated`` timestamp when anything was stripped.
    Returns True if the file was modified.
    """
    if not mem_path.exists():
        return False

    text = mem_path.read_text(encoding="utf-8")
    fm, body = parse_frontmatter(text)
    # "relations:" with no items parses to None — normalize to a list so a
    # null relations block doesn't raise TypeError below.
    original_rels = fm.get("relations") or []
    filtered_rels = [r for r in original_rels if r.get("edge_id") not in edge_ids]

    if len(filtered_rels) == len(original_rels):
        return False

    fm["relations"] = filtered_rels
    fm["updated"] = datetime.now(timezone.utc).isoformat()
    new_fm = serialize_frontmatter(fm)
    content = f"{new_fm}\n\n{body}\n" if body else f"{new_fm}\n"
    mem_path.write_text(content, encoding="utf-8")
    return True


def resolve_memory_path(graph_dir: Path, index_entry: Dict[str, Any]) -> Optional[Path]:
    """Resolve the filesystem path for a memory from its index entry."""
    rel_path = index_entry.get("path", "")
    if rel_path:
        return graph_dir / rel_path
    return None


def resolve_edge_path(graph_dir: Path, edge_entry: Dict[str, Any]) -> Optional[Path]:
    """Resolve the filesystem path for an edge from its index entry."""
    rel_path = edge_entry.get("path", "")
    if rel_path:
        return graph_dir / rel_path
    return None


def execute_migration(
    source_dir: Path,
    target_dir: Path,
    move_ids: List[str],
    copy_ids: List[str],
    migrate_edges: Dict[str, Any],
    orphan_edges: Dict[str, Any],
    source_index: Dict[str, Any],
    source_embeddings: Dict[str, Any],
    source_state: Dict[str, Any],
    target_index: Dict[str, Any],
    target_embeddings: Dict[str, Any],
    target_state: Dict[str, Any],
    target_graph: str,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Execute the migration. Returns a report dict.

    Mutates the passed-in index/embedding/state dicts in place, moves/copies
    the files on disk, then persists all six JSON metadata files atomically.
    With ``dry_run=True`` only the counts are filled in; nothing is touched.
    """
    report = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "target_graph": target_graph,
        "dry_run": dry_run,
        "memories_moved": 0,
        "memories_copied": 0,
        "edges_migrated": 0,
        "edges_orphaned": 0,
        "frontmatters_cleaned": 0,
        "errors": [],
        "warnings": [],
        "moved_ids": [],
        "copied_ids": [],
    }

    if dry_run:
        report["memories_moved"] = len(move_ids)
        report["memories_copied"] = len(copy_ids)
        report["edges_migrated"] = len(migrate_edges)
        report["edges_orphaned"] = len(orphan_edges)
        return report

    src_entries = source_index.get("entries", {})
    src_edges = source_index.get("edges", {})
    src_embed = source_embeddings.get("entries", {})
    src_state_entries = source_state.get("entries", {})

    tgt_entries = target_index.setdefault("entries", {})
    tgt_edges = target_index.setdefault("edges", {})
    tgt_embed = target_embeddings.setdefault("entries", {})
    tgt_state_entries = target_state.setdefault("entries", {})

    # --- Move memory files ---
    for mem_id in move_ids:
        entry = src_entries.get(mem_id)
        if not entry:
            report["errors"].append(f"Memory {mem_id} not found in source index")
            continue

        src_path = resolve_memory_path(source_dir, entry)
        if not src_path or not src_path.exists():
            report["errors"].append(
                f"Memory file missing: {mem_id} at {entry.get('path', '?')}"
            )
            continue

        dst_path = target_dir / entry["path"]
        dst_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            shutil.move(str(src_path), str(dst_path))
        except Exception as e:
            report["errors"].append(f"Failed to move {mem_id}: {e}")
            continue

        # Transfer index entry
        tgt_entries[mem_id] = entry
        del src_entries[mem_id]

        # Transfer embedding
        if mem_id in src_embed:
            tgt_embed[mem_id] = src_embed.pop(mem_id)

        # Transfer state
        if mem_id in src_state_entries:
            tgt_state_entries[mem_id] = src_state_entries.pop(mem_id)

        report["memories_moved"] += 1
        report["moved_ids"].append(mem_id)

    # --- Copy memory files (overlaps) ---
    overlap_tracking = []
    for mem_id in copy_ids:
        entry = src_entries.get(mem_id)
        if not entry:
            report["errors"].append(
                f"Overlap memory {mem_id} not found in source index"
            )
            continue

        src_path = resolve_memory_path(source_dir, entry)
        if not src_path or not src_path.exists():
            report["errors"].append(f"Overlap memory file missing: {mem_id}")
            continue

        dst_path = target_dir / entry["path"]
        dst_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            shutil.copy2(str(src_path), str(dst_path))
        except Exception as e:
            report["errors"].append(f"Failed to copy {mem_id}: {e}")
            continue

        # Copy (don't remove) index entry
        tgt_entries[mem_id] = dict(entry)

        # Copy embedding
        if mem_id in src_embed:
            tgt_embed[mem_id] = src_embed[mem_id]  # don't pop

        # Copy state
        if mem_id in src_state_entries:
            tgt_state_entries[mem_id] = dict(src_state_entries[mem_id])

        report["memories_copied"] += 1
        report["copied_ids"].append(mem_id)
        overlap_tracking.append(
            {
                "id": mem_id,
                "title": entry.get("title", ""),
                "tags": entry.get("tags", []),
                "copied_to": target_graph,
            }
        )

    # --- Migrate edges (both endpoints in candidate set) ---
    for edge_id, edge_data in migrate_edges.items():
        src_edge_path = resolve_edge_path(source_dir, edge_data)
        if not src_edge_path or not src_edge_path.exists():
            report["warnings"].append(f"Edge file missing: {edge_id}")
            # Still transfer index entry if file is gone
            tgt_edges[edge_id] = edge_data
            if edge_id in src_edges:
                del src_edges[edge_id]
            report["edges_migrated"] += 1
            continue

        dst_edge_path = target_dir / edge_data["path"]
        dst_edge_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            shutil.move(str(src_edge_path), str(dst_edge_path))
        except Exception as e:
            report["errors"].append(f"Failed to move edge {edge_id}: {e}")
            continue

        tgt_edges[edge_id] = edge_data
        if edge_id in src_edges:
            del src_edges[edge_id]
        report["edges_migrated"] += 1

    # --- Clean up orphan edges (cross-graph) ---
    for edge_id, edge_data in orphan_edges.items():
        # Strip relations from both endpoint memories
        for mid_key in ("from_id", "to_id"):
            mid = edge_data.get(mid_key, "")
            if not mid:
                continue
            # Check source graph
            if mid in src_entries:
                mem_path = resolve_memory_path(source_dir, src_entries[mid])
                if mem_path and strip_edge_from_frontmatter(mem_path, {edge_id}):
                    report["frontmatters_cleaned"] += 1
            # Check if it was already moved to target
            if mid in tgt_entries:
                mem_path = resolve_memory_path(target_dir, tgt_entries[mid])
                if mem_path and strip_edge_from_frontmatter(mem_path, {edge_id}):
                    report["frontmatters_cleaned"] += 1

        # Delete the orphan edge file
        src_edge_path = resolve_edge_path(source_dir, edge_data)
        if src_edge_path and src_edge_path.exists():
            src_edge_path.unlink()

        # Remove from source index
        if edge_id in src_edges:
            del src_edges[edge_id]

        report["edges_orphaned"] += 1

    # --- Update counts and timestamps ---
    now = datetime.now(timezone.utc).isoformat()

    source_index["count"] = len(src_entries)
    source_index["updated"] = now
    source_embeddings["updated"] = now
    source_state["updated"] = now

    target_index["count"] = len(tgt_entries)
    target_index["updated"] = now
    target_embeddings["updated"] = now
    target_state["updated"] = now

    # --- Write all JSON files atomically ---
    save_json(source_dir / "_index.json", source_index)
    save_json(source_dir / "_embeddings.json", source_embeddings)
    save_json(source_dir / "_state.json", source_state)
    save_json(target_dir / "_index.json", target_index)
    save_json(target_dir / "_embeddings.json", target_embeddings)
    save_json(target_dir / "_state.json", target_state)

    # --- Track overlaps for later cleanup ---
    if overlap_tracking:
        existing: Dict[str, Any] = {"overlaps": []}
        if OVERLAP_TRACKING_PATH.exists():
            try:
                existing = json.loads(OVERLAP_TRACKING_PATH.read_text())
            except (json.JSONDecodeError, OSError):
                pass
            existing.setdefault("overlaps", []).extend(overlap_tracking)
        save_json(OVERLAP_TRACKING_PATH, existing)

    return report


def cleanup_overlaps(dry_run: bool = False, no_git: bool = False) -> Dict[str, Any]:
    """Remove overlap memories from the default graph that have been copied to all target graphs.

    Memories whose copies are not yet present in every target graph are kept,
    and their tracking entries are preserved so a later run can finish the job.
    """
    report = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": "cleanup_overlaps",
        "dry_run": dry_run,
        "removed": 0,
        "kept": 0,
        "details": [],
    }

    if not OVERLAP_TRACKING_PATH.exists():
        print("No overlap tracking file found. Nothing to clean up.")
        return report

    tracking_data = json.loads(OVERLAP_TRACKING_PATH.read_text())
    tracking = tracking_data.get("overlaps", [])

    # Group by memory ID to see which graphs each overlap was copied to
    copies_by_id: Dict[str, List[str]] = {}
    info_by_id: Dict[str, Dict] = {}
    for entry in tracking:
        mid = entry["id"]
        copies_by_id.setdefault(mid, []).append(entry["copied_to"])
        info_by_id[mid] = entry

    source_index = load_json(MEMORY_DIR / "_index.json")
    source_embeddings = load_json(MEMORY_DIR / "_embeddings.json")
    source_state = load_json(MEMORY_DIR / "_state.json")

    src_entries = source_index.get("entries", {})
    src_embed = source_embeddings.get("entries", {})
    src_state_entries = source_state.get("entries", {})

    for mid, copied_to_graphs in copies_by_id.items():
        # Verify the memory exists in all target graphs before removing from default
        all_present = True
        for graph_name in set(copied_to_graphs):
            tgt_dir = resolve_graph_path(graph_name)
            tgt_index = load_json(tgt_dir / "_index.json")
            if mid not in tgt_index.get("entries", {}):
                all_present = False
                break

        info = info_by_id.get(mid, {})

        if not all_present:
            report["kept"] += 1
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "kept",
                    "reason": "not yet in all target graphs",
                }
            )
            continue

        if mid not in src_entries:
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "skipped",
                    "reason": "already removed from default",
                }
            )
            continue

        if dry_run:
            report["removed"] += 1
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "would_remove",
                }
            )
            continue

        # Remove the file
        entry = src_entries[mid]
        mem_path = resolve_memory_path(MEMORY_DIR, entry)
        if mem_path and mem_path.exists():
            mem_path.unlink()

        # Remove from indexes
        del src_entries[mid]
        src_embed.pop(mid, None)
        src_state_entries.pop(mid, None)

        report["removed"] += 1
        report["details"].append(
            {
                "id": mid,
                "title": info.get("title", ""),
                "action": "removed",
            }
        )

    if not dry_run and report["removed"] > 0:
        now = datetime.now(timezone.utc).isoformat()
        source_index["count"] = len(src_entries)
        source_index["updated"] = now
        source_embeddings["updated"] = now
        source_state["updated"] = now

        save_json(MEMORY_DIR / "_index.json", source_index)
        save_json(MEMORY_DIR / "_embeddings.json", source_embeddings)
        save_json(MEMORY_DIR / "_state.json", source_state)

        # Prune the tracking file instead of deleting it outright: entries
        # for memories we KEPT (not yet present in all target graphs) must
        # survive, or a later cleanup run would have nothing to act on.
        kept_ids = {d["id"] for d in report["details"] if d["action"] == "kept"}
        if kept_ids:
            remaining = [e for e in tracking if e["id"] in kept_ids]
            save_json(OVERLAP_TRACKING_PATH, {"overlaps": remaining})
        else:
            OVERLAP_TRACKING_PATH.unlink(missing_ok=True)

        if not no_git:
            git_commit(
                MEMORY_DIR,
                f"cleanup: removed {report['removed']} overlap memories from default graph",
            )

    return report


def git_commit(graph_dir: Path, message: str) -> bool:
    """Stage all changes and commit in a graph directory.

    Best-effort: returns False (never raises) when git is missing, the
    directory is not a repo, staging fails, or there is nothing to commit.
    """
    try:
        staged = subprocess.run(
            ["git", "add", "-A"],
            cwd=str(graph_dir),
            capture_output=True,
            timeout=10,
        )
        # A failed stage (not a repo, index lock, ...) makes the commit
        # meaningless — bail out instead of committing a partial tree.
        if staged.returncode != 0:
            return False
        result = subprocess.run(
            ["git", "commit", "-m", message],
            cwd=str(graph_dir),
            capture_output=True,
            timeout=10,
        )
        return result.returncode == 0
    except Exception:
        return False
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def print_dry_run_report(
    target_graph: str,
    filter_tags: Set[str],
    move_ids: List[str],
    copy_ids: List[str],
    skipped_ids: List[str],
    migrate_edges: Dict[str, Any],
    orphan_edges: Dict[str, Any],
    source_index: Dict[str, Any],
) -> None:
    """Print a human-readable preview of a pending migration (no side effects)."""
    entries = source_index.get("entries", {})

    print(f"\n{'='*60}")
    print(f"  DRY RUN: Migration to '{target_graph}'")
    print(f"  Filter tags: {', '.join(sorted(filter_tags))}")
    print(f"{'='*60}\n")

    print(f"  Memories to MOVE:  {len(move_ids)}")
    print(f"  Memories to COPY:  {len(copy_ids)} (overlap with other groups)")
    print(f"  Already in target: {len(skipped_ids)} (idempotent skip)")
    print(f"  Edges to migrate:  {len(migrate_edges)}")
    print(f"  Edges to orphan:   {len(orphan_edges)} (cross-graph, will be cleaned)")
    print()

    if move_ids:
        print("  Memories to move:")
        # Sort by title for a stable, scannable listing.
        for mid in sorted(move_ids, key=lambda x: entries.get(x, {}).get("title", "")):
            e = entries.get(mid, {})
            print(
                f"    - {e.get('title', mid[:8])} [{e.get('type', '?')}] ({', '.join(e.get('tags', []))})"
            )
        print()

    if copy_ids:
        print("  Memories to copy (overlap):")
        for mid in sorted(copy_ids, key=lambda x: entries.get(x, {}).get("title", "")):
            e = entries.get(mid, {})
            print(
                f"    - {e.get('title', mid[:8])} [{e.get('type', '?')}] ({', '.join(e.get('tags', []))})"
            )
        print()

    if orphan_edges:
        print("  Cross-graph edges to clean up:")
        for edata in orphan_edges.values():
            print(
                f"    - {edata.get('from_title', '?')} --{edata.get('type', '?')}--> {edata.get('to_title', '?')}"
            )
        print()

    total = len(move_ids) + len(copy_ids)
    print(f"  Total: {total} memories would be migrated to '{target_graph}'")
    print("  Run without --dry-run to execute.\n")


def main():
    """CLI entry point: parse args, then run migration or overlap cleanup."""
    parser = argparse.ArgumentParser(
        description="Migrate memories from default graph to a named graph by tag.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --tags "major-domo,sba,discord-bot" --target-graph major-domo --dry-run
  %(prog)s --tags "paper-dynasty,card-game,gauntlet" --target-graph paper-dynasty
  %(prog)s --cleanup-overlaps
        """,
    )
    parser.add_argument("--tags", help="Comma-separated tags to match for migration")
    parser.add_argument("--target-graph", help="Name of the destination graph")
    parser.add_argument(
        "--dry-run", action="store_true", help="Preview without making changes"
    )
    parser.add_argument(
        "--no-git", action="store_true", help="Skip git commits after migration"
    )
    parser.add_argument(
        "--cleanup-overlaps",
        action="store_true",
        help="Remove overlap memories from default graph after all migrations",
    )

    args = parser.parse_args()

    # Cleanup mode
    if args.cleanup_overlaps:
        report = cleanup_overlaps(dry_run=args.dry_run, no_git=args.no_git)
        if args.dry_run:
            print(
                f"\nDRY RUN: Would remove {report['removed']} overlap memories from default graph"
            )
            for d in report["details"]:
                if d["action"] == "would_remove":
                    print(f"  - {d['title']} ({d['id'][:8]})")
        else:
            print(
                f"\nRemoved {report['removed']} overlap memories, kept {report['kept']}"
            )
        return

    # Migration mode
    if not args.tags or not args.target_graph:
        parser.error(
            "--tags and --target-graph are required (unless using --cleanup-overlaps)"
        )

    # Drop empty fragments from trailing/doubled commas ("a,,b", "a,b,").
    filter_tags = {t.strip() for t in args.tags.split(",") if t.strip()}
    if not filter_tags:
        parser.error("--tags must contain at least one non-empty tag")
    target_graph = args.target_graph

    source_dir = MEMORY_DIR
    target_dir = resolve_graph_path(target_graph)

    if not source_dir.exists():
        print(f"Error: Default graph not found at {source_dir}", file=sys.stderr)
        sys.exit(1)

    if not target_dir.exists():
        print(
            f"Error: Target graph '{target_graph}' not found at {target_dir}",
            file=sys.stderr,
        )
        print(
            f"Create it first with: claude-memory --graph {target_graph} store ...",
            file=sys.stderr,
        )
        sys.exit(1)

    # Load data
    source_index = load_json(source_dir / "_index.json")
    source_embeddings = load_json(source_dir / "_embeddings.json")
    source_state = load_json(source_dir / "_state.json")

    target_index = load_json(target_dir / "_index.json")
    target_embeddings = load_json(target_dir / "_embeddings.json")
    target_state = load_json(target_dir / "_state.json")

    already_in_target = set(target_index.get("entries", {}).keys())

    # Select candidates
    move_ids, copy_ids, skipped_ids = select_candidates(
        source_index.get("entries", {}), filter_tags, already_in_target
    )

    if not move_ids and not copy_ids:
        print(f"\nNo memories to migrate for tags: {', '.join(sorted(filter_tags))}")
        if skipped_ids:
            print(f"  ({len(skipped_ids)} already in target graph)")
        return

    # Classify edges
    all_candidate_ids = set(move_ids) | set(copy_ids)
    migrate_edges, orphan_edges = classify_edges(
        source_index.get("edges", {}), all_candidate_ids
    )

    # Dry run
    if args.dry_run:
        print_dry_run_report(
            target_graph,
            filter_tags,
            move_ids,
            copy_ids,
            skipped_ids,
            migrate_edges,
            orphan_edges,
            source_index,
        )
        return

    # Execute
    print(f"\nMigrating to '{target_graph}'...")
    report = execute_migration(
        source_dir=source_dir,
        target_dir=target_dir,
        move_ids=move_ids,
        copy_ids=copy_ids,
        migrate_edges=migrate_edges,
        orphan_edges=orphan_edges,
        source_index=source_index,
        source_embeddings=source_embeddings,
        source_state=source_state,
        target_index=target_index,
        target_embeddings=target_embeddings,
        target_state=target_state,
        target_graph=target_graph,
    )

    # Git commits
    if not args.no_git:
        total = report["memories_moved"] + report["memories_copied"]
        git_commit(
            source_dir,
            f"migrate: {report['memories_moved']} memories moved to {target_graph}",
        )
        git_commit(target_dir, f"migrate: received {total} memories from default graph")

    # Print summary
    print(f"\n  Moved:  {report['memories_moved']} memories")
    print(f"  Copied: {report['memories_copied']} memories (overlaps)")
    print(f"  Edges migrated: {report['edges_migrated']}")
    print(f"  Edges orphaned: {report['edges_orphaned']}")
    print(f"  Frontmatters cleaned: {report['frontmatters_cleaned']}")

    if report["errors"]:
        print(f"\n  Errors ({len(report['errors'])}):")
        for err in report["errors"]:
            print(f"    - {err}")

    if report["warnings"]:
        print(f"\n  Warnings ({len(report['warnings'])}):")
        for warn in report["warnings"]:
            print(f"    - {warn}")

    # Write log
    log_dir = Path.home() / ".claude" / "tmp"
    log_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_path = log_dir / f"migration-{target_graph}-{ts}.json"
    save_json(log_path, report)
    print(f"\n  Log: {log_path}")

    # Remind about post-migration steps
    print("\n  Next steps:")
    print(f"    claude-memory --graph {target_graph} core  # regenerate CORE.md")
    print(
        f"    claude-memory --graph default core        # regenerate default CORE.md"
    )


if __name__ == "__main__":
    main()