cognitive-memory/scripts/migrate-memories.py
Cal Corum 11a046ffc3 feat: add migrate-memories.py for default-to-named-graph migration
Standalone script that moves memories between graphs based on tag
matching. Handles memory files, edges, index/embedding/state metadata,
cross-graph edge cleanup, and overlap detection (copy to multiple
graphs). Supports dry-run, idempotent re-runs, and git auto-commit.

Closes: #4, #5, #6

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 16:15:28 -06:00

1037 lines
33 KiB
Python
Executable File

#!/usr/bin/env python3
"""Migrate memories from the default cognitive-memory graph to a named graph.
Moves memory .md files, edges, index entries, embeddings, and state data
from the default graph to a target named graph based on tag matching.
Memories tagged for multiple projects are copied (not moved) to the target
graph; they remain in the default graph until explicitly cleaned up with
--cleanup-overlaps after all migrations are complete.
Usage:
# Dry-run to preview what would be migrated
python3 migrate-memories.py --tags "major-domo,sba,discord-bot" \
--target-graph major-domo --dry-run
# Execute the migration
python3 migrate-memories.py --tags "major-domo,sba,discord-bot" \
--target-graph major-domo
# After all migrations, clean up overlap memories from default graph
python3 migrate-memories.py --cleanup-overlaps
Gitea issues: cal/cognitive-memory #4, #5, #6
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# Path resolution (standalone, mirrors common.py without importing it)
# ---------------------------------------------------------------------------
# Memory directory resolution: the COGNITIVE_MEMORY_DIR env var wins;
# otherwise fall back to the XDG data dir (~/.local/share/cognitive-memory).
_env_dir = os.environ.get("COGNITIVE_MEMORY_DIR", "")
if _env_dir:
    MEMORY_DIR = Path(_env_dir).expanduser()
else:
    _xdg_data = os.environ.get("XDG_DATA_HOME", "") or str(
        Path.home() / ".local" / "share"
    )
    MEMORY_DIR = Path(_xdg_data) / "cognitive-memory"
# Named graphs live in a sibling directory of the default graph.
GRAPHS_BASE_DIR = MEMORY_DIR.parent / "cognitive-memory-graphs"
# Optional registry that maps graph names to explicit filesystem paths.
CONFIG_PATH = MEMORY_DIR / "_config.json"
# Records which overlap memories were copied where; consumed by --cleanup-overlaps.
OVERLAP_TRACKING_PATH = MEMORY_DIR / "_migration_overlaps.json"
# Frontmatter field order (from common.py) — serialize_frontmatter emits keys
# in exactly this order so rewritten files diff cleanly against originals.
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]
def resolve_graph_path(graph_name: Optional[str]) -> Path:
    """Resolve a graph name to its directory path.

    None, empty string, or "default" map to MEMORY_DIR. For named
    graphs, a path registered in CONFIG_PATH takes precedence; otherwise
    fall back to the conventional GRAPHS_BASE_DIR/<name> location.
    """
    if not graph_name or graph_name == "default":
        return MEMORY_DIR
    if CONFIG_PATH.exists():
        try:
            registered = json.loads(CONFIG_PATH.read_text()).get("graphs", {})
            configured_path = registered.get(graph_name, {}).get("path", "")
            if configured_path:
                return Path(configured_path).expanduser()
        except (json.JSONDecodeError, OSError):
            # Unreadable/corrupt config: silently use the conventional location.
            pass
    return GRAPHS_BASE_DIR / graph_name
# ---------------------------------------------------------------------------
# Frontmatter parsing (inlined from common.py)
# ---------------------------------------------------------------------------
def _needs_quoting(s: str) -> bool:
if not s:
return True
if any(c in s for c in ":#{}[]&*?|>!%@`"):
return True
try:
float(s)
return True
except ValueError:
pass
if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
return True
return False
def _quote_yaml(s: str) -> str:
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped}"'
def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Render a Python scalar as a YAML-safe token string."""
    if value is None:
        return "null"
    # bool must be tested before int/float: bool is an int subclass.
    if isinstance(value, bool):
        return str(value).lower()
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if force_quote or _needs_quoting(text):
        return _quote_yaml(text)
    return text
def _parse_scalar(value: str) -> Any:
v = value.strip()
if not v or v == "null":
return None
if v == "true":
return True
if v == "false":
return False
try:
if "." in v:
return float(v)
return int(v)
except ValueError:
pass
if (v.startswith('"') and v.endswith('"')) or (
v.startswith("'") and v.endswith("'")
):
return v[1:-1]
return v
def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse an indented YAML block list of relation mappings.

    Each "- " line opens a new relation dict; subsequent "key: value"
    continuation lines attach to the most recently opened relation.
    """
    relations: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for raw_line in lines:
        text = raw_line.strip()
        if not text:
            continue
        if text.startswith("- "):
            # New list item; it may carry its first key on the same line.
            current = {}
            relations.append(current)
            remainder = text[2:]
            if ":" in remainder:
                key, _, val = remainder.partition(":")
                current[key.strip()] = _parse_scalar(val.strip())
        elif current is not None and ":" in text:
            key, _, val = text.partition(":")
            current[key.strip()] = _parse_scalar(val.strip())
    return relations
def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Split a memory file into (frontmatter dict, body text).

    Supports a limited YAML subset: scalars, inline lists ([a, b]),
    indented block lists, and the special ``relations`` block. Returns
    ({}, text) unchanged when no valid ``---`` fence pair is found.
    """
    if not text.startswith("---\n"):
        return {}, text
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # The closing fence may be the very last line with no trailing newline.
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        # Match offsets are relative to text[3:]; +3 converts to absolute.
        end_pos = end_match.start() + 3
        fm_text = text[4:end_pos]
        body = text[end_pos + end_match.end() - end_match.start() :]
        body = body.lstrip("\n")
    data = {}
    lines = fm_text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if not line.strip():
            i += 1
            continue
        if line[0] == " ":
            # Indented continuation lines are consumed by the block handler below.
            i += 1
            continue
        if ":" not in line:
            i += 1
            continue
        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()
        if not rest:
            # Bare "key:" introduces an indented block (list or relations).
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1
            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # BUGFIX: previously used lstrip("- "), which strips a
                # *character set* and mangles values that themselves begin
                # with '-' or spaces (e.g. "- -5" became "5"). Slicing off
                # the literal "- " prefix preserves the value exactly;
                # _parse_scalar strips any remaining surrounding whitespace.
                data[key] = [
                    _parse_scalar(bl.strip()[2:])
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                data[key] = None
            i = j
            continue
        if rest.startswith("[") and rest.endswith("]"):
            # Inline list: [a, b, c]
            inner = rest[1:-1]
            if inner.strip():
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)
        i += 1
    return data, body
def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize *data* as a YAML frontmatter block, fences included.

    Keys are emitted in FIELD_ORDER; keys absent from *data* are skipped,
    as are empty steps/preconditions/postconditions/relations lists.
    Output must round-trip through parse_frontmatter.
    """
    lines = ["---"]
    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]
        if key == "tags" and isinstance(value, list):
            # Tags always render as an inline list, even when empty.
            if value:
                items = ", ".join(_format_yaml_value(t) for t in value)
                lines.append(f"tags: [{items}]")
            else:
                lines.append("tags: []")
        elif key in ("steps", "preconditions", "postconditions") and isinstance(
            value, list
        ):
            if not value:
                continue
            lines.append(f"{key}:")
            for item in value:
                # Force quoting so arbitrary step text survives re-parsing.
                lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}")
        elif key == "relations" and isinstance(value, list):
            if not value:
                continue
            lines.append("relations:")
            for rel in value:
                first = True
                # Fixed sub-key order keeps relation blocks diff-stable.
                for rk in [
                    "target",
                    "type",
                    "direction",
                    "strength",
                    "context",
                    "edge_id",
                ]:
                    if rk not in rel:
                        continue
                    rv = rel[rk]
                    # First key of a relation gets the "- " list marker.
                    prefix = " - " if first else " "
                    force_q = rk in ("context",)
                    lines.append(
                        f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}"
                    )
                    first = False
        elif key == "title":
            # Titles are always quoted: free text frequently needs it.
            lines.append(f"title: {_format_yaml_value(value, force_quote=True)}")
        else:
            lines.append(f"{key}: {_format_yaml_value(value)}")
    lines.append("---")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# JSON file I/O with atomic writes
# ---------------------------------------------------------------------------
def load_json(path: Path) -> Dict[str, Any]:
    """Read a JSON file; a missing file yields an empty dict."""
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return {}
def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Atomically write *data* as pretty-printed JSON to *path*.

    Writes to a temp file in the destination directory, then swaps it
    into place with os.replace() so readers never see a partial file.
    On failure the temp file is removed (best effort) and the error
    re-raised.
    """
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        handle = os.fdopen(fd, "w", encoding="utf-8")
        try:
            json.dump(data, handle, indent=2, ensure_ascii=False)
            handle.write("\n")
        finally:
            handle.close()
        os.replace(tmp_name, str(path))
    except Exception:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
# ---------------------------------------------------------------------------
# All known migration tag groups (for overlap detection)
# ---------------------------------------------------------------------------
# Maps each target graph name to the tag set that routes memories to it.
# NOTE(review): hard-coded for this one-off migration — extend this dict
# before migrating any additional graphs, or overlap detection will miss them.
ALL_TAG_GROUPS = {
    "major-domo": {"major-domo", "sba", "discord-bot"},
    "paper-dynasty": {"paper-dynasty", "card-game", "gauntlet"},
    "strat-gameplay": {"strat-gameplay", "sba-scout", "tui", "ratatui"},
}
def detect_overlap(mem_tags: Set[str], current_group_tags: Set[str]) -> bool:
    """Check if a memory's tags match any OTHER migration group besides the current one."""
    return any(
        mem_tags & group_tags
        for group_tags in ALL_TAG_GROUPS.values()
        if group_tags != current_group_tags
    )
# ---------------------------------------------------------------------------
# Core migration logic
# ---------------------------------------------------------------------------
def select_candidates(
    index_entries: Dict[str, Any],
    filter_tags: Set[str],
    already_in_target: Set[str],
) -> Tuple[List[str], List[str], List[str]]:
    """Partition memories matching *filter_tags* into three buckets.

    Returns (move_ids, copy_ids, skipped_ids):
    - move_ids: match only this group -> will be moved
    - copy_ids: also match another migration group -> will be copied
    - skipped_ids: already present in the target graph (idempotent re-run)
    """
    move_ids: List[str] = []
    copy_ids: List[str] = []
    skipped_ids: List[str] = []
    for mem_id, entry in index_entries.items():
        tags = set(entry.get("tags", []))
        if tags.isdisjoint(filter_tags):
            continue
        if mem_id in already_in_target:
            skipped_ids.append(mem_id)
        elif detect_overlap(tags, filter_tags):
            copy_ids.append(mem_id)
        else:
            move_ids.append(mem_id)
    return move_ids, copy_ids, skipped_ids
def classify_edges(
    source_edges: Dict[str, Any],
    candidate_ids: Set[str],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split edges by how many endpoints fall inside *candidate_ids*.

    Returns (migrate_edges, orphan_edges): both endpoints inside the
    candidate set -> migrate; exactly one inside -> orphan (would span
    graphs after migration); neither -> left untouched.
    """
    migrate: Dict[str, Any] = {}
    orphan: Dict[str, Any] = {}
    for edge_id, edge_data in source_edges.items():
        endpoints_inside = sum(
            1
            for endpoint_key in ("from_id", "to_id")
            if edge_data.get(endpoint_key, "") in candidate_ids
        )
        if endpoints_inside == 2:
            migrate[edge_id] = edge_data
        elif endpoints_inside == 1:
            orphan[edge_id] = edge_data
    return migrate, orphan
def strip_edge_from_frontmatter(mem_path: Path, edge_ids: Set[str]) -> bool:
    """Drop relations whose edge_id is in *edge_ids* from a memory file.

    Rewrites the file (and bumps its ``updated`` timestamp) only when at
    least one relation was removed. Returns True if the file changed.
    """
    if not mem_path.exists():
        return False
    fm, body = parse_frontmatter(mem_path.read_text(encoding="utf-8"))
    relations = fm.get("relations", [])
    kept = [rel for rel in relations if rel.get("edge_id") not in edge_ids]
    if len(kept) == len(relations):
        return False  # nothing matched; leave the file untouched
    fm["relations"] = kept
    fm["updated"] = datetime.now(timezone.utc).isoformat()
    header = serialize_frontmatter(fm)
    rewritten = f"{header}\n\n{body}\n" if body else f"{header}\n"
    mem_path.write_text(rewritten, encoding="utf-8")
    return True
def resolve_memory_path(graph_dir: Path, index_entry: Dict[str, Any]) -> Optional[Path]:
    """Resolve the filesystem path for a memory from its index entry.

    Returns None when the entry carries no relative path.
    """
    rel = index_entry.get("path", "")
    return graph_dir / rel if rel else None
def resolve_edge_path(graph_dir: Path, edge_entry: Dict[str, Any]) -> Optional[Path]:
    """Resolve the filesystem path for an edge from its index entry.

    Returns None when the entry carries no relative path.
    """
    rel = edge_entry.get("path", "")
    return graph_dir / rel if rel else None
def execute_migration(
    source_dir: Path,
    target_dir: Path,
    move_ids: List[str],
    copy_ids: List[str],
    migrate_edges: Dict[str, Any],
    orphan_edges: Dict[str, Any],
    source_index: Dict[str, Any],
    source_embeddings: Dict[str, Any],
    source_state: Dict[str, Any],
    target_index: Dict[str, Any],
    target_embeddings: Dict[str, Any],
    target_state: Dict[str, Any],
    target_graph: str,
    dry_run: bool = False,
) -> Dict[str, Any]:
    """Execute the migration. Returns a report dict.

    Moves/copies memory files and edge files from source_dir to
    target_dir, mutates the passed-in index/embedding/state dicts for
    both graphs in place, strips cross-graph relations from affected
    frontmatters, writes all six metadata JSON files atomically, and
    appends copied overlaps to OVERLAP_TRACKING_PATH for the later
    --cleanup-overlaps pass. Per-item failures are collected in the
    report's "errors"/"warnings" lists rather than raised.
    """
    report = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "target_graph": target_graph,
        "dry_run": dry_run,
        "memories_moved": 0,
        "memories_copied": 0,
        "edges_migrated": 0,
        "edges_orphaned": 0,
        "frontmatters_cleaned": 0,
        "errors": [],
        "warnings": [],
        "moved_ids": [],
        "copied_ids": [],
    }
    if dry_run:
        # Dry-run: report projected counts without touching the filesystem.
        report["memories_moved"] = len(move_ids)
        report["memories_copied"] = len(copy_ids)
        report["edges_migrated"] = len(migrate_edges)
        report["edges_orphaned"] = len(orphan_edges)
        return report
    # These aliases refer into the caller's dicts; mutations here are
    # persisted by the save_json calls at the end.
    src_entries = source_index.get("entries", {})
    src_edges = source_index.get("edges", {})
    src_embed = source_embeddings.get("entries", {})
    src_state_entries = source_state.get("entries", {})
    tgt_entries = target_index.setdefault("entries", {})
    tgt_edges = target_index.setdefault("edges", {})
    tgt_embed = target_embeddings.setdefault("entries", {})
    tgt_state_entries = target_state.setdefault("entries", {})
    # --- Move memory files ---
    for mem_id in move_ids:
        entry = src_entries.get(mem_id)
        if not entry:
            report["errors"].append(f"Memory {mem_id} not found in source index")
            continue
        src_path = resolve_memory_path(source_dir, entry)
        if not src_path or not src_path.exists():
            report["errors"].append(
                f"Memory file missing: {mem_id} at {entry.get('path', '?')}"
            )
            continue
        # Same relative path inside the target graph directory.
        dst_path = target_dir / entry["path"]
        dst_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.move(str(src_path), str(dst_path))
        except Exception as e:
            report["errors"].append(f"Failed to move {mem_id}: {e}")
            continue
        # Transfer index entry
        tgt_entries[mem_id] = entry
        del src_entries[mem_id]
        # Transfer embedding
        if mem_id in src_embed:
            tgt_embed[mem_id] = src_embed.pop(mem_id)
        # Transfer state
        if mem_id in src_state_entries:
            tgt_state_entries[mem_id] = src_state_entries.pop(mem_id)
        report["memories_moved"] += 1
        report["moved_ids"].append(mem_id)
    # --- Copy memory files (overlaps) ---
    overlap_tracking = []
    for mem_id in copy_ids:
        entry = src_entries.get(mem_id)
        if not entry:
            report["errors"].append(
                f"Overlap memory {mem_id} not found in source index"
            )
            continue
        src_path = resolve_memory_path(source_dir, entry)
        if not src_path or not src_path.exists():
            report["errors"].append(f"Overlap memory file missing: {mem_id}")
            continue
        dst_path = target_dir / entry["path"]
        dst_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # copy2 preserves timestamps; source copy stays until cleanup.
            shutil.copy2(str(src_path), str(dst_path))
        except Exception as e:
            report["errors"].append(f"Failed to copy {mem_id}: {e}")
            continue
        # Copy (don't remove) index entry
        tgt_entries[mem_id] = dict(entry)
        # Copy embedding
        if mem_id in src_embed:
            tgt_embed[mem_id] = src_embed[mem_id]  # don't pop
        # Copy state
        if mem_id in src_state_entries:
            tgt_state_entries[mem_id] = dict(src_state_entries[mem_id])
        report["memories_copied"] += 1
        report["copied_ids"].append(mem_id)
        overlap_tracking.append(
            {
                "id": mem_id,
                "title": entry.get("title", ""),
                "tags": entry.get("tags", []),
                "copied_to": target_graph,
            }
        )
    # --- Migrate edges (both endpoints in candidate set) ---
    for edge_id, edge_data in migrate_edges.items():
        src_edge_path = resolve_edge_path(source_dir, edge_data)
        if not src_edge_path or not src_edge_path.exists():
            report["warnings"].append(f"Edge file missing: {edge_id}")
            # Still transfer index entry if file is gone
            tgt_edges[edge_id] = edge_data
            if edge_id in src_edges:
                del src_edges[edge_id]
            report["edges_migrated"] += 1
            continue
        dst_edge_path = target_dir / edge_data["path"]
        dst_edge_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.move(str(src_edge_path), str(dst_edge_path))
        except Exception as e:
            report["errors"].append(f"Failed to move edge {edge_id}: {e}")
            continue
        tgt_edges[edge_id] = edge_data
        if edge_id in src_edges:
            del src_edges[edge_id]
        report["edges_migrated"] += 1
    # --- Clean up orphan edges (cross-graph) ---
    for edge_id, edge_data in orphan_edges.items():
        # Strip relations from both endpoint memories
        for mid_key in ("from_id", "to_id"):
            mid = edge_data.get(mid_key, "")
            if not mid:
                continue
            # Check source graph
            if mid in src_entries:
                mem_path = resolve_memory_path(source_dir, src_entries[mid])
                if mem_path and strip_edge_from_frontmatter(mem_path, {edge_id}):
                    report["frontmatters_cleaned"] += 1
            # Check if it was already moved to target
            if mid in tgt_entries:
                mem_path = resolve_memory_path(target_dir, tgt_entries[mid])
                if mem_path and strip_edge_from_frontmatter(mem_path, {edge_id}):
                    report["frontmatters_cleaned"] += 1
        # Delete the orphan edge file
        src_edge_path = resolve_edge_path(source_dir, edge_data)
        if src_edge_path and src_edge_path.exists():
            src_edge_path.unlink()
        # Remove from source index
        if edge_id in src_edges:
            del src_edges[edge_id]
        report["edges_orphaned"] += 1
    # --- Update counts and timestamps ---
    now = datetime.now(timezone.utc).isoformat()
    source_index["count"] = len(src_entries)
    source_index["updated"] = now
    source_embeddings["updated"] = now
    source_state["updated"] = now
    target_index["count"] = len(tgt_entries)
    target_index["updated"] = now
    target_embeddings["updated"] = now
    target_state["updated"] = now
    # --- Write all JSON files atomically ---
    save_json(source_dir / "_index.json", source_index)
    save_json(source_dir / "_embeddings.json", source_embeddings)
    save_json(source_dir / "_state.json", source_state)
    save_json(target_dir / "_index.json", target_index)
    save_json(target_dir / "_embeddings.json", target_embeddings)
    save_json(target_dir / "_state.json", target_state)
    # --- Track overlaps for later cleanup ---
    if overlap_tracking:
        existing: Dict[str, Any] = {"overlaps": []}
        if OVERLAP_TRACKING_PATH.exists():
            try:
                existing = json.loads(OVERLAP_TRACKING_PATH.read_text())
            except (json.JSONDecodeError, OSError):
                # Corrupt tracking file: start over rather than abort the run.
                pass
        existing.setdefault("overlaps", []).extend(overlap_tracking)
        save_json(OVERLAP_TRACKING_PATH, existing)
    return report
def cleanup_overlaps(dry_run: bool = False, no_git: bool = False) -> Dict[str, Any]:
    """Remove overlap memories from the default graph that have been copied to all target graphs.

    Reads OVERLAP_TRACKING_PATH, verifies each recorded memory is present
    in every graph it was copied to, and only then deletes the file plus
    its index/embedding/state entries from the default graph. When
    anything was actually removed, persists the metadata, deletes the
    tracking file, and (unless no_git) git-commits the default graph.
    """
    report = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": "cleanup_overlaps",
        "dry_run": dry_run,
        "removed": 0,
        "kept": 0,
        "details": [],
    }
    if not OVERLAP_TRACKING_PATH.exists():
        print("No overlap tracking file found. Nothing to clean up.")
        return report
    tracking_data = json.loads(OVERLAP_TRACKING_PATH.read_text())
    tracking = tracking_data.get("overlaps", [])
    # Group by memory ID to see which graphs each overlap was copied to
    copies_by_id: Dict[str, List[str]] = {}
    info_by_id: Dict[str, Dict] = {}
    for entry in tracking:
        mid = entry["id"]
        copies_by_id.setdefault(mid, []).append(entry["copied_to"])
        info_by_id[mid] = entry
    source_index = load_json(MEMORY_DIR / "_index.json")
    source_embeddings = load_json(MEMORY_DIR / "_embeddings.json")
    source_state = load_json(MEMORY_DIR / "_state.json")
    src_entries = source_index.get("entries", {})
    src_embed = source_embeddings.get("entries", {})
    src_state_entries = source_state.get("entries", {})
    for mid, copied_to_graphs in copies_by_id.items():
        # Verify the memory exists in all target graphs before removing from default
        all_present = True
        for graph_name in set(copied_to_graphs):
            tgt_dir = resolve_graph_path(graph_name)
            tgt_index = load_json(tgt_dir / "_index.json")
            if mid not in tgt_index.get("entries", {}):
                all_present = False
                break
        info = info_by_id.get(mid, {})
        if not all_present:
            # Safety: a copy is missing somewhere; keep the default-graph original.
            report["kept"] += 1
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "kept",
                    "reason": "not yet in all target graphs",
                }
            )
            continue
        if mid not in src_entries:
            # Idempotency: a previous cleanup run already deleted this one.
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "skipped",
                    "reason": "already removed from default",
                }
            )
            continue
        if dry_run:
            report["removed"] += 1
            report["details"].append(
                {
                    "id": mid,
                    "title": info.get("title", ""),
                    "action": "would_remove",
                }
            )
            continue
        # Remove the file
        entry = src_entries[mid]
        mem_path = resolve_memory_path(MEMORY_DIR, entry)
        if mem_path and mem_path.exists():
            mem_path.unlink()
        # Remove from indexes
        del src_entries[mid]
        src_embed.pop(mid, None)
        src_state_entries.pop(mid, None)
        report["removed"] += 1
        report["details"].append(
            {
                "id": mid,
                "title": info.get("title", ""),
                "action": "removed",
            }
        )
    if not dry_run and report["removed"] > 0:
        # Persist metadata only when something actually changed.
        now = datetime.now(timezone.utc).isoformat()
        source_index["count"] = len(src_entries)
        source_index["updated"] = now
        source_embeddings["updated"] = now
        source_state["updated"] = now
        save_json(MEMORY_DIR / "_index.json", source_index)
        save_json(MEMORY_DIR / "_embeddings.json", source_embeddings)
        save_json(MEMORY_DIR / "_state.json", source_state)
        # Clean up tracking file
        OVERLAP_TRACKING_PATH.unlink(missing_ok=True)
        if not no_git:
            git_commit(
                MEMORY_DIR,
                f"cleanup: removed {report['removed']} overlap memories from default graph",
            )
    return report
def git_commit(graph_dir: Path, message: str) -> bool:
    """Best-effort `git add -A` + `git commit` inside *graph_dir*.

    Returns True only when the commit succeeds. Any failure — not a git
    repo, nothing staged, git missing, or a timeout — yields False
    rather than raising, since commits are a convenience here.
    """
    add_cmd = ["git", "add", "-A"]
    commit_cmd = ["git", "commit", "-m", message]
    try:
        subprocess.run(add_cmd, cwd=str(graph_dir), capture_output=True, timeout=10)
        result = subprocess.run(
            commit_cmd, cwd=str(graph_dir), capture_output=True, timeout=10
        )
    except Exception:
        return False
    return result.returncode == 0
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def print_dry_run_report(
    target_graph: str,
    filter_tags: Set[str],
    move_ids: List[str],
    copy_ids: List[str],
    skipped_ids: List[str],
    migrate_edges: Dict[str, Any],
    orphan_edges: Dict[str, Any],
    source_index: Dict[str, Any],
) -> None:
    """Print a human-readable preview of what a migration would do."""
    entries = source_index.get("entries", {})

    def _memory_line(mem_id: str) -> str:
        # One summary line per memory: title [type] (tags).
        e = entries.get(mem_id, {})
        return f" - {e.get('title', mem_id[:8])} [{e.get('type', '?')}] ({', '.join(e.get('tags', []))})"

    def _memory_section(header: str, ids: List[str]) -> None:
        # Shared renderer for the move/copy listings, sorted by title.
        print(header)
        for mem_id in sorted(ids, key=lambda m: entries.get(m, {}).get("title", "")):
            print(_memory_line(mem_id))
        print()

    banner = "=" * 60
    print(f"\n{banner}")
    print(f" DRY RUN: Migration to '{target_graph}'")
    print(f" Filter tags: {', '.join(sorted(filter_tags))}")
    print(f"{banner}\n")
    print(f" Memories to MOVE: {len(move_ids)}")
    print(f" Memories to COPY: {len(copy_ids)} (overlap with other groups)")
    print(f" Already in target: {len(skipped_ids)} (idempotent skip)")
    print(f" Edges to migrate: {len(migrate_edges)}")
    print(f" Edges to orphan: {len(orphan_edges)} (cross-graph, will be cleaned)")
    print()
    if move_ids:
        _memory_section(" Memories to move:", move_ids)
    if copy_ids:
        _memory_section(" Memories to copy (overlap):", copy_ids)
    if orphan_edges:
        print(" Cross-graph edges to clean up:")
        for edata in orphan_edges.values():
            print(
                f" - {edata.get('from_title', '?')} --{edata.get('type', '?')}--> {edata.get('to_title', '?')}"
            )
        print()
    total = len(move_ids) + len(copy_ids)
    print(f" Total: {total} memories would be migrated to '{target_graph}'")
    print(f" Run without --dry-run to execute.\n")
def main():
    """CLI entry point: parse arguments, then run cleanup or migration mode."""
    parser = argparse.ArgumentParser(
        description="Migrate memories from default graph to a named graph by tag.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --tags "major-domo,sba,discord-bot" --target-graph major-domo --dry-run
%(prog)s --tags "paper-dynasty,card-game,gauntlet" --target-graph paper-dynasty
%(prog)s --cleanup-overlaps
""",
    )
    parser.add_argument("--tags", help="Comma-separated tags to match for migration")
    parser.add_argument("--target-graph", help="Name of the destination graph")
    parser.add_argument(
        "--dry-run", action="store_true", help="Preview without making changes"
    )
    parser.add_argument(
        "--no-git", action="store_true", help="Skip git commits after migration"
    )
    parser.add_argument(
        "--cleanup-overlaps",
        action="store_true",
        help="Remove overlap memories from default graph after all migrations",
    )
    args = parser.parse_args()
    # Cleanup mode
    if args.cleanup_overlaps:
        report = cleanup_overlaps(dry_run=args.dry_run, no_git=args.no_git)
        if args.dry_run:
            print(
                f"\nDRY RUN: Would remove {report['removed']} overlap memories from default graph"
            )
            for d in report["details"]:
                if d["action"] == "would_remove":
                    print(f" - {d['title']} ({d['id'][:8]})")
        else:
            print(
                f"\nRemoved {report['removed']} overlap memories, kept {report['kept']}"
            )
        return
    # Migration mode
    if not args.tags or not args.target_graph:
        # parser.error() prints usage and exits with status 2.
        parser.error(
            "--tags and --target-graph are required (unless using --cleanup-overlaps)"
        )
    filter_tags = set(t.strip() for t in args.tags.split(","))
    target_graph = args.target_graph
    source_dir = MEMORY_DIR
    target_dir = resolve_graph_path(target_graph)
    if not source_dir.exists():
        print(f"Error: Default graph not found at {source_dir}", file=sys.stderr)
        sys.exit(1)
    if not target_dir.exists():
        # The target graph must be initialized elsewhere; this script never creates it.
        print(
            f"Error: Target graph '{target_graph}' not found at {target_dir}",
            file=sys.stderr,
        )
        print(
            f"Create it first with: claude-memory --graph {target_graph} store ...",
            file=sys.stderr,
        )
        sys.exit(1)
    # Load data
    source_index = load_json(source_dir / "_index.json")
    source_embeddings = load_json(source_dir / "_embeddings.json")
    source_state = load_json(source_dir / "_state.json")
    target_index = load_json(target_dir / "_index.json")
    target_embeddings = load_json(target_dir / "_embeddings.json")
    target_state = load_json(target_dir / "_state.json")
    # IDs already in the target make re-runs idempotent.
    already_in_target = set(target_index.get("entries", {}).keys())
    # Select candidates
    move_ids, copy_ids, skipped_ids = select_candidates(
        source_index.get("entries", {}), filter_tags, already_in_target
    )
    if not move_ids and not copy_ids:
        print(f"\nNo memories to migrate for tags: {', '.join(sorted(filter_tags))}")
        if skipped_ids:
            print(f" ({len(skipped_ids)} already in target graph)")
        return
    # Classify edges
    all_candidate_ids = set(move_ids) | set(copy_ids)
    migrate_edges, orphan_edges = classify_edges(
        source_index.get("edges", {}), all_candidate_ids
    )
    # Dry run
    if args.dry_run:
        print_dry_run_report(
            target_graph,
            filter_tags,
            move_ids,
            copy_ids,
            skipped_ids,
            migrate_edges,
            orphan_edges,
            source_index,
        )
        return
    # Execute
    print(f"\nMigrating to '{target_graph}'...")
    report = execute_migration(
        source_dir=source_dir,
        target_dir=target_dir,
        move_ids=move_ids,
        copy_ids=copy_ids,
        migrate_edges=migrate_edges,
        orphan_edges=orphan_edges,
        source_index=source_index,
        source_embeddings=source_embeddings,
        source_state=source_state,
        target_index=target_index,
        target_embeddings=target_embeddings,
        target_state=target_state,
        target_graph=target_graph,
    )
    # Git commits
    if not args.no_git:
        total = report["memories_moved"] + report["memories_copied"]
        git_commit(
            source_dir,
            f"migrate: {report['memories_moved']} memories moved to {target_graph}",
        )
        git_commit(target_dir, f"migrate: received {total} memories from default graph")
    # Print summary
    print(f"\n Moved: {report['memories_moved']} memories")
    print(f" Copied: {report['memories_copied']} memories (overlaps)")
    print(f" Edges migrated: {report['edges_migrated']}")
    print(f" Edges orphaned: {report['edges_orphaned']}")
    print(f" Frontmatters cleaned: {report['frontmatters_cleaned']}")
    if report["errors"]:
        print(f"\n Errors ({len(report['errors'])}):")
        for err in report["errors"]:
            print(f" - {err}")
    if report["warnings"]:
        print(f"\n Warnings ({len(report['warnings'])}):")
        for warn in report["warnings"]:
            print(f" - {warn}")
    # Write log
    log_dir = Path.home() / ".claude" / "tmp"
    log_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_path = log_dir / f"migration-{target_graph}-{ts}.json"
    save_json(log_path, report)
    print(f"\n Log: {log_path}")
    # Remind about post-migration steps
    print(f"\n Next steps:")
    print(f" claude-memory --graph {target_graph} core # regenerate CORE.md")
    print(
        f" claude-memory --graph default core # regenerate default CORE.md"
    )
if __name__ == "__main__":
    main()  # script entry point