claude-configs/skills/cognitive-memory/dev/migrate.py
Cal Corum f0f075461e Reorganize cognitive-memory skill: consolidate scripts, systemd, dev subdirs
- Move session_memory.py, ensure-symlinks.sh into skills/cognitive-memory/scripts/
- Copy systemd units into skills/cognitive-memory/systemd/ with README
- Move PROJECT_PLAN.json, migrate.py into skills/cognitive-memory/dev/
- Add mtime-based embeddings cache to client.py (6x faster semantic recall)
- Default recall to semantic+keyword merge (was keyword-only)
- Update settings.json SessionEnd hook path, MCP allow entry
- Update SKILL.md, feature.json, mcp_server.py docs for new defaults

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 16:02:20 -06:00

532 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Cognitive Memory Migration Script
Migrates all memories from MemoryGraph SQLite database to markdown-based
cognitive memory system. Idempotent - skips files that already exist.
Usage:
python migrate.py # Run migration
python migrate.py --dry-run # Preview without writing
python migrate.py --verify # Verify post-migration integrity
"""
import json
import os
import re
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
# Import from sibling module
sys.path.insert(0, str(Path(__file__).parent))
from client import (
CognitiveMemoryClient,
MEMORY_DIR,
TYPE_DIRS,
TYPE_WEIGHTS,
VALID_TYPES,
calculate_decay_score,
make_filename,
parse_frontmatter,
serialize_frontmatter,
slugify,
)
# MemoryGraph database location (source of the migration).
MEMORYGRAPH_DB = Path.home() / ".memorygraph" / "memory.db"

# Memory type mapping: MemoryGraph types -> cognitive-memory types.
# MemoryGraph has more types; map extras to closest cognitive-memory
# equivalent. Any type absent from this map falls back to "general"
# at the lookup sites (TYPE_MAP.get(..., "general")).
TYPE_MAP = {
    # One-to-one mappings: both systems share these types.
    "solution": "solution",
    "problem": "problem",
    "error": "error",
    "fix": "fix",
    "code_pattern": "code_pattern",
    "decision": "decision",
    "configuration": "configuration",
    "workflow": "workflow",
    "general": "general",
    # MemoryGraph-only types mapped to closest equivalents.
    "task": "general",
    "project": "general",
    "technology": "general",
    "command": "general",
    "file_context": "general",
}
def load_sqlite_memories(db_path: Path) -> list:
    """Load all memories from MemoryGraph SQLite database.

    Selects every node labelled 'Memory' and merges its JSON `properties`
    payload with the row-level timestamps (properties win when present).

    Args:
        db_path: Path to the MemoryGraph SQLite file.

    Returns:
        List of dicts with keys: id, type, title, content, summary, tags,
        importance, confidence, usage_count, created_at, updated_at.
        Missing properties get the documented defaults (e.g. type="general",
        importance=0.5, confidence=0.8).
    """
    conn = sqlite3.connect(str(db_path))
    # Ensure the connection is closed even if a row's properties JSON is
    # malformed and json.loads raises (the original leaked it in that case).
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id, properties, created_at, updated_at FROM nodes WHERE label = 'Memory'"
        ).fetchall()
        memories = []
        for row in rows:
            props = json.loads(row["properties"])
            memories.append({
                "id": props.get("id", row["id"]),
                "type": props.get("type", "general"),
                "title": props.get("title", "Untitled"),
                "content": props.get("content", ""),
                "summary": props.get("summary"),
                "tags": props.get("tags", []),
                "importance": props.get("importance", 0.5),
                "confidence": props.get("confidence", 0.8),
                "usage_count": props.get("usage_count", 0),
                "created_at": props.get("created_at", row["created_at"]),
                "updated_at": props.get("updated_at", row["updated_at"]),
            })
        return memories
    finally:
        conn.close()
def load_sqlite_relationships(db_path: Path) -> list:
    """Load all relationships from MemoryGraph SQLite database.

    The `context` property may be double-encoded: a JSON string inside the
    JSON `properties` blob. When it decodes to a dict, the "text" field is
    extracted; otherwise the decoded (or raw) value is stringified.

    Args:
        db_path: Path to the MemoryGraph SQLite file.

    Returns:
        List of dicts with keys: id, from_id, to_id, rel_type,
        strength (default 0.5), context (plain text, "" when absent).
    """
    conn = sqlite3.connect(str(db_path))
    # Close the connection on all paths, including a raised JSONDecodeError
    # from a corrupt properties blob (the original leaked it in that case).
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id, from_id, to_id, rel_type, properties, created_at FROM relationships"
        ).fetchall()
        relationships = []
        for row in rows:
            props = json.loads(row["properties"])
            # Parse context - may be a JSON string within JSON
            context_raw = props.get("context", "")
            context_text = ""
            if context_raw:
                try:
                    ctx = json.loads(context_raw) if isinstance(context_raw, str) else context_raw
                    if isinstance(ctx, dict):
                        context_text = ctx.get("text", "")
                    else:
                        context_text = str(ctx)
                except (json.JSONDecodeError, TypeError):
                    # Not JSON after all; keep the raw value as text.
                    context_text = str(context_raw)
            relationships.append({
                "id": row["id"],
                "from_id": row["from_id"],
                "to_id": row["to_id"],
                "rel_type": row["rel_type"],
                "strength": props.get("strength", 0.5),
                "context": context_text,
            })
        return relationships
    finally:
        conn.close()
def migrate(dry_run: bool = False):
    """Run the full migration from MemoryGraph to cognitive-memory.

    Phases:
      1. Write one markdown file per memory (idempotent: existing files skipped).
      2. Embed relationships into frontmatter (outgoing on source, incoming on target).
      3. Rebuild the index (_index.json).
      4. Seed _state.json with usage counts and initial decay scores.
      5. Commit everything to the memory dir's git repo (best-effort).
      6. Archive the source SQLite database (copy, never delete).
      7. Regenerate CORE.md.

    Args:
        dry_run: When True, only print the type distribution and relationship
            count, then return without writing anything.
    """
    if not MEMORYGRAPH_DB.exists():
        print(f"Error: MemoryGraph database not found at {MEMORYGRAPH_DB}")
        sys.exit(1)
    print(f"Loading memories from {MEMORYGRAPH_DB}...")
    memories = load_sqlite_memories(MEMORYGRAPH_DB)
    relationships = load_sqlite_relationships(MEMORYGRAPH_DB)
    print(f"Found {len(memories)} memories and {len(relationships)} relationships")
    if dry_run:
        print("\n--- DRY RUN ---")
        by_type = {}
        for mem in memories:
            t = TYPE_MAP.get(mem["type"], "general")
            by_type[t] = by_type.get(t, 0) + 1
        print("Type distribution after mapping:")
        # Most populous types first.
        for t, count in sorted(by_type.items(), key=lambda x: -x[1]):
            dir_name = TYPE_DIRS.get(t, "general")
            print(f" graph/{dir_name}/: {count}")
        print(f"\nRelationships to embed: {len(relationships)}")
        return
    # Initialize client (creates directories)
    client = CognitiveMemoryClient()
    # Build memory ID -> file path mapping
    id_to_path = {}
    created_count = 0
    skipped_count = 0
    print("\nPhase 1: Creating markdown files...")
    for i, mem in enumerate(memories, 1):
        memory_id = mem["id"]
        mem_type = TYPE_MAP.get(mem["type"], "general")
        type_dir = TYPE_DIRS.get(mem_type, "general")
        # Create filename
        filename = make_filename(mem["title"], memory_id)
        # BUG FIX: the path previously interpolated a literal placeholder
        # instead of the computed filename, so every memory of a given type
        # collided on one path. Use the generated filename.
        rel_path = f"graph/{type_dir}/{filename}"
        full_path = MEMORY_DIR / rel_path
        # Check if already exists (idempotent)
        if full_path.exists():
            id_to_path[memory_id] = (full_path, rel_path)
            skipped_count += 1
            continue
        # Build frontmatter
        frontmatter = {
            "id": memory_id,
            "type": mem_type,
            "title": mem["title"],
            "tags": mem.get("tags", []),
            "importance": mem.get("importance", 0.5),
            "confidence": mem.get("confidence", 0.8),
            "created": mem.get("created_at", ""),
            "updated": mem.get("updated_at", ""),
        }
        # Build content body
        content = mem.get("content", "")
        if mem.get("summary"):
            content = f"{content}\n\n**Summary:** {mem['summary']}"
        # Write file
        client._write_memory_file(full_path, frontmatter, content)
        id_to_path[memory_id] = (full_path, rel_path)
        created_count += 1
        if i % 50 == 0:
            print(f" {i}/{len(memories)} files created...")
    print(f" Created: {created_count}, Skipped (existing): {skipped_count}")
    # Phase 2: Embed relationships into frontmatter
    print("\nPhase 2: Embedding relationships into frontmatter...")
    rel_count = 0
    # Group relationships by source memory
    from_rels = {}  # from_id -> list of (to_id, type, strength, context)
    for rel in relationships:
        from_rels.setdefault(rel["from_id"], []).append(rel)
    for from_id, rels in from_rels.items():
        if from_id not in id_to_path:
            print(f" Warning: Source memory {from_id[:8]} not found, skipping {len(rels)} relationships")
            continue
        full_path, rel_path = id_to_path[from_id]
        # Read current frontmatter
        fm, body = client._read_memory_file(full_path)
        existing_rels = fm.get("relations", [])
        existing_targets = {(r.get("target"), r.get("type")) for r in existing_rels}
        added = 0
        for rel in rels:
            to_id = rel["to_id"]
            if to_id not in id_to_path:
                continue
            if (to_id, rel["rel_type"]) in existing_targets:
                continue  # Already exists
            # Normalize relation type to valid set
            rel_type = rel["rel_type"]
            if rel_type not in ("SOLVES", "CAUSES", "BUILDS_ON", "ALTERNATIVE_TO",
                                "REQUIRES", "FOLLOWS", "RELATED_TO"):
                rel_type = "RELATED_TO"  # Map unknown types to RELATED_TO
            new_rel = {
                "target": to_id,
                "type": rel_type,
                "direction": "outgoing",
                "strength": rel.get("strength", 0.5),
            }
            if rel.get("context"):
                new_rel["context"] = rel["context"]
            existing_rels.append(new_rel)
            added += 1
        if added > 0:
            fm["relations"] = existing_rels
            client._write_memory_file(full_path, fm, body)
            rel_count += added
        # Also add incoming relations to target memories
        for rel in rels:
            to_id = rel["to_id"]
            if to_id not in id_to_path:
                continue
            to_path, to_rel = id_to_path[to_id]
            # Read-modify-write per relationship so repeated targets
            # accumulate correctly across iterations.
            to_fm, to_body = client._read_memory_file(to_path)
            to_rels = to_fm.get("relations", [])
            # Check for existing incoming
            has_incoming = any(
                r.get("target") == from_id and r.get("direction") == "incoming"
                for r in to_rels
            )
            if has_incoming:
                continue
            rel_type = rel["rel_type"]
            if rel_type not in ("SOLVES", "CAUSES", "BUILDS_ON", "ALTERNATIVE_TO",
                                "REQUIRES", "FOLLOWS", "RELATED_TO"):
                rel_type = "RELATED_TO"
            incoming = {
                "target": from_id,
                "type": rel_type,
                "direction": "incoming",
                "strength": rel.get("strength", 0.5),
            }
            if rel.get("context"):
                incoming["context"] = rel["context"]
            to_rels.append(incoming)
            to_fm["relations"] = to_rels
            client._write_memory_file(to_path, to_fm, to_body)
    print(f" Embedded {rel_count} outgoing relationships")
    # Phase 3: Build _index.json
    print("\nPhase 3: Building index...")
    indexed = client.reindex()
    print(f" Indexed {indexed} memories")
    # Phase 4: Initialize _state.json with usage data
    print("\nPhase 4: Initializing state with usage data...")
    state = client._load_state()
    now = datetime.now(timezone.utc)
    for mem in memories:
        mid = mem["id"]
        usage_count = mem.get("usage_count", 0)
        created_str = mem.get("created_at", "")
        # Calculate initial decay
        try:
            created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
            if created_dt.tzinfo is None:
                created_dt = created_dt.replace(tzinfo=timezone.utc)
            days = (now - created_dt).total_seconds() / 86400
        except (ValueError, AttributeError):
            # Unparseable/missing timestamp: assume a month old.
            days = 30
        mem_type = TYPE_MAP.get(mem["type"], "general")
        type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
        importance = mem.get("importance", 0.5)
        decay_score = calculate_decay_score(importance, days, usage_count, type_weight)
        state.setdefault("entries", {})[mid] = {
            "access_count": usage_count,
            "last_accessed": mem.get("updated_at", mem.get("created_at", now.isoformat())),
            "decay_score": round(decay_score, 4),
        }
    client._save_state(state)
    print(f" Initialized state for {len(state.get('entries', {}))} memories")
    # Phase 5: Git commit all migrated files (best-effort; failure is non-fatal)
    print("\nPhase 5: Git commit...")
    try:
        import subprocess
        subprocess.run(
            ["git", "add", "-A"],
            cwd=str(MEMORY_DIR),
            capture_output=True, timeout=30
        )
        subprocess.run(
            ["git", "commit", "-m",
             f"migrate: {len(memories)} memories from MemoryGraph\n\n"
             f"- {created_count} new markdown files created\n"
             f"- {rel_count} relationships embedded\n"
             f"- {indexed} entries indexed\n"
             f"- State initialized with usage data"],
            cwd=str(MEMORY_DIR),
            capture_output=True, timeout=30
        )
        print(" Committed to git")
    except Exception as e:
        print(f" Warning: Git commit failed: {e}")
    # Phase 6: Archive MemoryGraph database (copy only; source is kept)
    print("\nPhase 6: Archiving MemoryGraph database...")
    archive_path = MEMORYGRAPH_DB.with_suffix(".db.archive")
    if not archive_path.exists():
        import shutil
        shutil.copy2(str(MEMORYGRAPH_DB), str(archive_path))
        print(f" Archived to {archive_path}")
    else:
        print(f" Archive already exists at {archive_path}")
    # Generate CORE.md
    print("\nPhase 7: Generating CORE.md...")
    client.core()
    print(" CORE.md generated")
    # Summary
    print("\n" + "=" * 60)
    print("Migration Complete!")
    print("=" * 60)
    print(f" Memories migrated: {len(memories)}")
    print(f" Files created: {created_count}")
    print(f" Files skipped: {skipped_count}")
    print(f" Relations embedded: {rel_count}")
    print(f" Index entries: {indexed}")
    print(f" Memory dir: {MEMORY_DIR}")
    print(f" Archive: {archive_path}")
def verify():
    """Verify migration integrity.

    Compares the MemoryGraph SQLite database (or its archive, if the live db
    is gone) against the markdown index/state: counts, file existence, a
    random spot check of titles, relationship totals, git cleanliness, and
    CORE.md presence. Prints [OK]/ERROR/WARNING lines; exits 1 only when no
    source database can be found at all.
    """
    print("Verifying migration integrity...\n")
    if not MEMORYGRAPH_DB.exists():
        # Try archive
        archive = MEMORYGRAPH_DB.with_suffix(".db.archive")
        if archive.exists():
            db_path = archive
        else:
            print("Error: No MemoryGraph database found for verification")
            sys.exit(1)
    else:
        db_path = MEMORYGRAPH_DB
    # Load SQLite data
    memories = load_sqlite_memories(db_path)
    relationships = load_sqlite_relationships(db_path)
    client = CognitiveMemoryClient()
    index = client._load_index()
    state = client._load_state()
    errors = []
    warnings = []
    # Check 1: Count match
    sqlite_count = len(memories)
    md_count = len(index.get("entries", {}))
    if sqlite_count != md_count:
        errors.append(f"Count mismatch: SQLite={sqlite_count}, Index={md_count}")
    else:
        print(f"[OK] Memory count matches: {sqlite_count}")
    # Check 2: All memories have files
    missing_files = 0
    for mid, entry in index.get("entries", {}).items():
        path = MEMORY_DIR / entry.get("path", "")
        if not path.exists():
            missing_files += 1
            # Cap per-file detail at 5 to keep output readable.
            if missing_files <= 5:
                errors.append(f"Missing file: {entry.get('path')} ({entry.get('title', '')[:40]})")
    if missing_files == 0:
        print(f"[OK] All {md_count} files exist on disk")
    else:
        errors.append(f"Total missing files: {missing_files}")
    # Check 3: State entries
    state_count = len(state.get("entries", {}))
    if state_count != sqlite_count:
        warnings.append(f"State entry count mismatch: expected={sqlite_count}, got={state_count}")
    else:
        print(f"[OK] State entries match: {state_count}")
    # Check 4: Spot check up to 5 random memories
    import random
    sample = random.sample(memories, min(5, len(memories)))
    spot_ok = 0
    for mem in sample:
        path = client._resolve_memory_path(mem["id"])
        if path:
            fm, body = client._read_memory_file(path)
            if fm.get("title") == mem["title"]:
                spot_ok += 1
            else:
                warnings.append(
                    f"Title mismatch for {mem['id'][:8]}: "
                    f"SQLite='{mem['title'][:40]}', MD='{fm.get('title', '')[:40]}'"
                )
        else:
            errors.append(f"Memory {mem['id'][:8]} not found in markdown: {mem['title'][:40]}")
    # FIX: report against the actual sample size, not a hard-coded 5
    # (the sample is min(5, len(memories)) and may be smaller).
    print(f"[OK] Spot check: {spot_ok}/{len(sample)} memories match")
    # Check 5: Relationships
    rel_in_index = sum(
        len(entry.get("relations", []))
        for entry in index.get("entries", {}).values()
    )
    # Each relationship creates 2 entries (outgoing + incoming)
    expected_rel_entries = len(relationships) * 2
    if rel_in_index < len(relationships):
        warnings.append(
            f"Relation count may be low: SQLite={len(relationships)}, "
            f"Index entries={rel_in_index} (expected ~{expected_rel_entries})"
        )
    else:
        print(f"[OK] Relationships: {len(relationships)} original, {rel_in_index} index entries")
    # Check 6: Git status
    try:
        import subprocess
        result = subprocess.run(
            ["git", "status", "--porcelain"],
            cwd=str(MEMORY_DIR),
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            # Tracked-but-modified entries only: "??" (untracked) lines are
            # excluded, so the name reflects what the list actually holds.
            modified = [l for l in result.stdout.strip().split("\n") if l.strip() and not l.startswith("??")]
            if modified:
                warnings.append(f"Uncommitted changes in memory repo: {len(modified)} files")
            else:
                print("[OK] Git repo clean")
        else:
            warnings.append("Not a git repo or git error")
    except Exception:
        warnings.append("Could not check git status")
    # Check 7: CORE.md exists
    core_path = MEMORY_DIR / "CORE.md"
    if core_path.exists():
        content = core_path.read_text()
        print(f"[OK] CORE.md exists ({len(content)} chars)")
    else:
        warnings.append("CORE.md not found")
    # Report
    print()
    if errors:
        print(f"ERRORS ({len(errors)}):")
        for e in errors:
            print(f" [!] {e}")
    if warnings:
        print(f"WARNINGS ({len(warnings)}):")
        for w in warnings:
            print(f" [?] {w}")
    if not errors and not warnings:
        print("All checks passed!")
    elif not errors:
        print(f"\nMigration OK with {len(warnings)} warning(s)")
    else:
        print(f"\nMigration has {len(errors)} error(s) that need attention")
if __name__ == "__main__":
    import argparse

    # CLI: default action is migration; --dry-run previews it, --verify
    # runs the post-migration integrity checks instead.
    arg_parser = argparse.ArgumentParser(
        description="Migrate MemoryGraph to Cognitive Memory"
    )
    arg_parser.add_argument(
        "--dry-run", action="store_true", help="Preview without writing"
    )
    arg_parser.add_argument(
        "--verify", action="store_true", help="Verify migration integrity"
    )
    cli = arg_parser.parse_args()

    verify() if cli.verify else migrate(dry_run=cli.dry_run)