claude-configs/skills/memorygraph/client.py
Cal Corum 8a1d15911f Initial commit: Claude Code configuration backup
Version control Claude Code configuration including:
- Global instructions (CLAUDE.md)
- User settings (settings.json)
- Custom agents (architect, designer, engineer, etc.)
- Custom skills (create-skill templates and workflows)

Excludes session data, secrets, cache, and temporary files per .gitignore.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-03 16:34:21 -06:00

882 lines
31 KiB
Python

#!/usr/bin/env python3
"""
MemoryGraph SQLite Client
Direct SQLite interface for MemoryGraph operations, bypassing MCP protocol.
Provides CLI and Python API for memory storage, retrieval, and relationship management.
Usage:
# CLI
python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix"
python client.py recall "timeout error"
python client.py get <memory_id>
python client.py relate <from_id> <to_id> SOLVES
# Python
from client import MemoryGraphClient
client = MemoryGraphClient()
memory_id = client.store_memory(type="solution", title="...", content="...")
"""
import argparse
import json
import os
import re
import sqlite3
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
# Database location
DB_PATH = Path.home() / ".memorygraph" / "memory.db"
# Valid memory types
MEMORY_TYPES = [
"solution", "problem", "error", "fix", "code_pattern", "decision",
"configuration", "workflow", "task", "project", "technology",
"command", "file_context", "general"
]
# Valid relationship types
RELATIONSHIP_TYPES = [
# Causal
"CAUSES", "TRIGGERS", "LEADS_TO", "PREVENTS", "BREAKS",
# Solution
"SOLVES", "ADDRESSES", "ALTERNATIVE_TO", "IMPROVES", "REPLACES",
# Context
"OCCURS_IN", "APPLIES_TO", "WORKS_WITH", "REQUIRES", "USED_IN",
# Learning
"BUILDS_ON", "CONTRADICTS", "CONFIRMS", "GENERALIZES", "SPECIALIZES",
# Similarity
"SIMILAR_TO", "VARIANT_OF", "RELATED_TO", "ANALOGY_TO", "OPPOSITE_OF",
# Workflow
"FOLLOWS", "DEPENDS_ON", "ENABLES", "BLOCKS", "PARALLEL_TO",
# Quality
"EFFECTIVE_FOR", "INEFFECTIVE_FOR", "PREFERRED_OVER", "DEPRECATED_BY", "VALIDATED_BY"
]
class MemoryGraphClient:
"""Client for direct SQLite access to MemoryGraph database."""
def __init__(self, db_path: Optional[Path] = None):
self.db_path = db_path or DB_PATH
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found: {self.db_path}")
def _connect(self) -> sqlite3.Connection:
"""Create database connection with row factory."""
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
def _now_iso(self) -> str:
"""Get current timestamp in ISO format."""
return datetime.now(timezone.utc).isoformat()
# =========================================================================
# CORE OPERATIONS (High Priority)
# =========================================================================
def store_memory(
self,
type: str,
title: str,
content: str,
tags: Optional[List[str]] = None,
importance: float = 0.5,
summary: Optional[str] = None,
context: Optional[Dict[str, Any]] = None
) -> str:
"""
Store a new memory.
Args:
type: Memory type (solution, problem, error, fix, etc.)
title: Short descriptive title
content: Detailed content
tags: List of tags for categorization
importance: Importance score 0.0-1.0 (default 0.5)
summary: Optional brief summary
context: Optional context dict (project_path, files, etc.)
Returns:
memory_id: UUID of the created memory
"""
if type not in MEMORY_TYPES:
raise ValueError(f"Invalid memory type: {type}. Valid types: {MEMORY_TYPES}")
memory_id = str(uuid.uuid4())
now = self._now_iso()
# Normalize tags to lowercase
tags = [t.lower().strip() for t in (tags or [])]
properties = {
"id": memory_id,
"type": type,
"title": title,
"content": content,
"summary": summary,
"tags": tags,
"importance": max(0.0, min(1.0, importance)),
"confidence": 0.8,
"usage_count": 0,
"created_at": now,
"updated_at": now
}
if context:
properties["context"] = context
with self._connect() as conn:
# Insert node
conn.execute(
"INSERT INTO nodes (id, label, properties, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
(memory_id, "Memory", json.dumps(properties), now, now)
)
# Note: FTS is an external content table managed by triggers in the MCP schema.
# We use JSON-based search instead, so no FTS update needed.
conn.commit()
return memory_id
def recall_memories(
self,
query: str,
memory_types: Optional[List[str]] = None,
limit: int = 20,
project_path: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Natural language search for memories with fuzzy matching.
Args:
query: Natural language search query
memory_types: Optional filter by memory types
limit: Maximum results (default 20)
project_path: Optional filter by project path
Returns:
List of matching memories with relevance info
"""
query_lower = query.lower().strip()
terms = query_lower.split()
with self._connect() as conn:
# Use JSON-based search (more reliable than FTS for existing data)
# Search in title, content, summary, and tags
rows = conn.execute(
"""
SELECT id, properties, created_at
FROM nodes
WHERE label = 'Memory'
ORDER BY created_at DESC
"""
).fetchall()
results = []
for row in rows:
props = json.loads(row["properties"])
# Filter by memory type if specified
if memory_types and props.get("type") not in memory_types:
continue
# Filter by project path if specified
if project_path:
ctx = props.get("context", {})
if isinstance(ctx, dict) and ctx.get("project_path") != project_path:
continue
# Score based on term matches in title, content, summary, tags
title = (props.get("title") or "").lower()
content = (props.get("content") or "").lower()
summary = (props.get("summary") or "").lower()
tags = " ".join(props.get("tags") or []).lower()
searchable = f"{title} {content} {summary} {tags}"
# Check if any term matches
matches = sum(1 for term in terms if term in searchable)
if matches == 0:
continue
# Get relationships for context
rels = self._get_relationships_for_memory(conn, row["id"])
# Calculate relevance score
title_matches = sum(1 for term in terms if term in title) * 3
tag_matches = sum(1 for term in terms if term in tags) * 2
content_matches = matches
score = title_matches + tag_matches + content_matches
results.append({
"id": props["id"],
"type": props.get("type"),
"title": props.get("title"),
"content": props.get("content"),
"summary": props.get("summary"),
"tags": props.get("tags", []),
"importance": props.get("importance"),
"created_at": props.get("created_at"),
"relationships": rels,
"match_query": query,
"_score": score
})
# Sort by score (higher = better match)
results.sort(key=lambda x: x.pop("_score", 0), reverse=True)
return results[:limit]
def get_memory(
self,
memory_id: str,
include_relationships: bool = True
) -> Optional[Dict[str, Any]]:
"""
Retrieve a specific memory by ID.
Args:
memory_id: UUID of the memory
include_relationships: Whether to include related memories
Returns:
Memory dict or None if not found
"""
with self._connect() as conn:
row = conn.execute(
"SELECT id, properties, created_at, updated_at FROM nodes WHERE id = ? AND label = 'Memory'",
(memory_id,)
).fetchone()
if not row:
return None
props = json.loads(row["properties"])
result = {
"id": props["id"],
"type": props.get("type"),
"title": props.get("title"),
"content": props.get("content"),
"summary": props.get("summary"),
"tags": props.get("tags", []),
"importance": props.get("importance"),
"confidence": props.get("confidence"),
"usage_count": props.get("usage_count", 0),
"created_at": props.get("created_at"),
"updated_at": props.get("updated_at"),
"context": props.get("context")
}
if include_relationships:
result["relationships"] = self._get_relationships_for_memory(conn, memory_id)
# Increment usage count
props["usage_count"] = props.get("usage_count", 0) + 1
conn.execute(
"UPDATE nodes SET properties = ? WHERE id = ?",
(json.dumps(props), memory_id)
)
conn.commit()
return result
def create_relationship(
self,
from_memory_id: str,
to_memory_id: str,
relationship_type: str,
strength: float = 0.5,
confidence: float = 0.8,
context: Optional[str] = None
) -> str:
"""
Create a relationship between two memories.
Args:
from_memory_id: Source memory UUID
to_memory_id: Target memory UUID
relationship_type: Type of relationship (SOLVES, CAUSES, etc.)
strength: Relationship strength 0.0-1.0 (default 0.5)
confidence: Confidence score 0.0-1.0 (default 0.8)
context: Optional context description
Returns:
relationship_id: UUID of the created relationship
"""
if relationship_type not in RELATIONSHIP_TYPES:
raise ValueError(f"Invalid relationship type: {relationship_type}")
rel_id = str(uuid.uuid4())
now = self._now_iso()
# Build context structure
context_obj = {
"text": context or "",
"scope": None,
"components": [],
"conditions": [],
"evidence": [],
"temporal": None,
"exceptions": []
}
properties = {
"id": rel_id,
"strength": max(0.0, min(1.0, strength)),
"confidence": max(0.0, min(1.0, confidence)),
"context": json.dumps(context_obj),
"evidence_count": 1 if context else 0,
"success_rate": None,
"created_at": now,
"last_validated": now,
"validation_count": 0,
"counter_evidence_count": 0
}
with self._connect() as conn:
# Verify both memories exist
for mid in [from_memory_id, to_memory_id]:
exists = conn.execute(
"SELECT 1 FROM nodes WHERE id = ? AND label = 'Memory'", (mid,)
).fetchone()
if not exists:
raise ValueError(f"Memory not found: {mid}")
conn.execute(
"INSERT INTO relationships (id, from_id, to_id, rel_type, properties, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, from_memory_id, to_memory_id, relationship_type, json.dumps(properties), now)
)
conn.commit()
return rel_id
# =========================================================================
# MEDIUM PRIORITY OPERATIONS
# =========================================================================
def search_memories(
self,
query: Optional[str] = None,
memory_types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
min_importance: Optional[float] = None,
limit: int = 50,
offset: int = 0
) -> List[Dict[str, Any]]:
"""
Advanced search with fine-grained control.
Args:
query: Text search query (optional)
memory_types: Filter by memory types
tags: Filter by tags (matches any)
min_importance: Minimum importance score
limit: Maximum results
offset: Pagination offset
Returns:
List of matching memories
"""
query_terms = query.lower().strip().split() if query else []
with self._connect() as conn:
rows = conn.execute(
"SELECT id, properties FROM nodes WHERE label = 'Memory' ORDER BY created_at DESC"
).fetchall()
results = []
for row in rows:
props = json.loads(row["properties"])
# Apply type filter
if memory_types and props.get("type") not in memory_types:
continue
# Apply importance filter
if min_importance and props.get("importance", 0) < min_importance:
continue
# Apply tag filter
if tags:
mem_tags = set(props.get("tags", []))
if not mem_tags.intersection(set(t.lower() for t in tags)):
continue
# Apply query filter
if query_terms:
title = (props.get("title") or "").lower()
content = (props.get("content") or "").lower()
summary = (props.get("summary") or "").lower()
mem_tags_str = " ".join(props.get("tags") or []).lower()
searchable = f"{title} {content} {summary} {mem_tags_str}"
if not any(term in searchable for term in query_terms):
continue
content_preview = props.get("content") or ""
if len(content_preview) > 200:
content_preview = content_preview[:200] + "..."
results.append({
"id": props["id"],
"type": props.get("type"),
"title": props.get("title"),
"content": content_preview,
"tags": props.get("tags", []),
"importance": props.get("importance"),
"created_at": props.get("created_at")
})
# Apply pagination
return results[offset:offset + limit]
def update_memory(
self,
memory_id: str,
title: Optional[str] = None,
content: Optional[str] = None,
summary: Optional[str] = None,
tags: Optional[List[str]] = None,
importance: Optional[float] = None
) -> bool:
"""
Update an existing memory.
Args:
memory_id: UUID of memory to update
title, content, summary, tags, importance: Fields to update (None = no change)
Returns:
True if updated, False if not found
"""
with self._connect() as conn:
row = conn.execute(
"SELECT properties FROM nodes WHERE id = ? AND label = 'Memory'",
(memory_id,)
).fetchone()
if not row:
return False
props = json.loads(row["properties"])
now = self._now_iso()
# Update fields
if title is not None:
props["title"] = title
if content is not None:
props["content"] = content
if summary is not None:
props["summary"] = summary
if tags is not None:
props["tags"] = [t.lower().strip() for t in tags]
if importance is not None:
props["importance"] = max(0.0, min(1.0, importance))
props["updated_at"] = now
conn.execute(
"UPDATE nodes SET properties = ?, updated_at = ? WHERE id = ?",
(json.dumps(props), now, memory_id)
)
# Note: FTS is an external content table - we use JSON-based search instead
conn.commit()
return True
def delete_memory(self, memory_id: str) -> bool:
"""
Delete a memory and all its relationships.
Args:
memory_id: UUID of memory to delete
Returns:
True if deleted, False if not found
"""
with self._connect() as conn:
# Check exists
exists = conn.execute(
"SELECT 1 FROM nodes WHERE id = ? AND label = 'Memory'", (memory_id,)
).fetchone()
if not exists:
return False
# Delete relationships (CASCADE should handle but be explicit)
conn.execute(
"DELETE FROM relationships WHERE from_id = ? OR to_id = ?",
(memory_id, memory_id)
)
# Delete node (FTS is external content table - no direct deletion needed)
conn.execute("DELETE FROM nodes WHERE id = ?", (memory_id,))
conn.commit()
return True
def get_related_memories(
self,
memory_id: str,
relationship_types: Optional[List[str]] = None,
max_depth: int = 1
) -> List[Dict[str, Any]]:
"""
Find memories connected to a specific memory.
Args:
memory_id: UUID of the memory
relationship_types: Filter by relationship types
max_depth: Traversal depth (1-5)
Returns:
List of related memories with relationship info
"""
max_depth = max(1, min(5, max_depth))
with self._connect() as conn:
visited = set()
results = []
def traverse(mid: str, depth: int):
if depth > max_depth or mid in visited:
return
visited.add(mid)
# Get outgoing relationships
rows = conn.execute(
"""
SELECT r.rel_type, r.properties as rel_props, n.id, n.properties
FROM relationships r
JOIN nodes n ON r.to_id = n.id
WHERE r.from_id = ? AND n.label = 'Memory'
""",
(mid,)
).fetchall()
for row in rows:
if relationship_types and row["rel_type"] not in relationship_types:
continue
if row["id"] not in visited:
props = json.loads(row["properties"])
rel_props = json.loads(row["rel_props"])
results.append({
"id": props["id"],
"type": props.get("type"),
"title": props.get("title"),
"relationship": row["rel_type"],
"direction": "outgoing",
"strength": rel_props.get("strength"),
"depth": depth
})
traverse(row["id"], depth + 1)
# Get incoming relationships
rows = conn.execute(
"""
SELECT r.rel_type, r.properties as rel_props, n.id, n.properties
FROM relationships r
JOIN nodes n ON r.from_id = n.id
WHERE r.to_id = ? AND n.label = 'Memory'
""",
(mid,)
).fetchall()
for row in rows:
if relationship_types and row["rel_type"] not in relationship_types:
continue
if row["id"] not in visited:
props = json.loads(row["properties"])
rel_props = json.loads(row["rel_props"])
results.append({
"id": props["id"],
"type": props.get("type"),
"title": props.get("title"),
"relationship": row["rel_type"],
"direction": "incoming",
"strength": rel_props.get("strength"),
"depth": depth
})
traverse(row["id"], depth + 1)
traverse(memory_id, 1)
return results
# =========================================================================
# LOW PRIORITY OPERATIONS
# =========================================================================
def get_memory_statistics(self) -> Dict[str, Any]:
"""Get database statistics."""
with self._connect() as conn:
# Total memories
total = conn.execute(
"SELECT COUNT(*) as count FROM nodes WHERE label = 'Memory'"
).fetchone()["count"]
# Count by type
rows = conn.execute(
"""
SELECT json_extract(properties, '$.type') as type, COUNT(*) as count
FROM nodes WHERE label = 'Memory'
GROUP BY json_extract(properties, '$.type')
"""
).fetchall()
by_type = {row["type"]: row["count"] for row in rows}
# Relationship count
rel_count = conn.execute(
"SELECT COUNT(*) as count FROM relationships"
).fetchone()["count"]
# DB size
db_size = self.db_path.stat().st_size
return {
"total_memories": total,
"by_type": by_type,
"total_relationships": rel_count,
"database_size_bytes": db_size,
"database_path": str(self.db_path)
}
def get_recent_activity(
self,
days: int = 7,
project: Optional[str] = None
) -> Dict[str, Any]:
"""Get recent memory activity summary."""
with self._connect() as conn:
cutoff = datetime.now(timezone.utc).isoformat()[:10] # Just date part for simplicity
rows = conn.execute(
"""
SELECT id, properties FROM nodes
WHERE label = 'Memory'
ORDER BY created_at DESC
LIMIT 50
"""
).fetchall()
recent = []
by_type = {}
for row in rows:
props = json.loads(row["properties"])
mem_type = props.get("type", "unknown")
by_type[mem_type] = by_type.get(mem_type, 0) + 1
recent.append({
"id": props["id"],
"type": mem_type,
"title": props.get("title"),
"created_at": props.get("created_at")
})
return {
"period_days": days,
"summary_by_type": by_type,
"recent_memories": recent[:20],
"total_in_period": len(recent)
}
# =========================================================================
# HELPER METHODS
# =========================================================================
def _get_relationships_for_memory(self, conn: sqlite3.Connection, memory_id: str) -> Dict[str, List[str]]:
"""Get relationships grouped by type for a memory."""
rels = {}
# Outgoing
rows = conn.execute(
"SELECT rel_type, to_id FROM relationships WHERE from_id = ?",
(memory_id,)
).fetchall()
for row in rows:
key = f"{row['rel_type']}_to"
if key not in rels:
rels[key] = []
rels[key].append(row["to_id"])
# Incoming
rows = conn.execute(
"SELECT rel_type, from_id FROM relationships WHERE to_id = ?",
(memory_id,)
).fetchall()
for row in rows:
key = f"{row['rel_type']}_from"
if key not in rels:
rels[key] = []
rels[key].append(row["from_id"])
return rels
# =============================================================================
# CLI INTERFACE
# =============================================================================
def main():
parser = argparse.ArgumentParser(
description="MemoryGraph CLI - Direct SQLite interface for memory operations",
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# store
store_parser = subparsers.add_parser("store", help="Store a new memory")
store_parser.add_argument("--type", "-t", required=True, choices=MEMORY_TYPES, help="Memory type")
store_parser.add_argument("--title", required=True, help="Memory title")
store_parser.add_argument("--content", "-c", required=True, help="Memory content")
store_parser.add_argument("--tags", help="Comma-separated tags")
store_parser.add_argument("--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0")
store_parser.add_argument("--summary", "-s", help="Brief summary")
# recall
recall_parser = subparsers.add_parser("recall", help="Search memories with natural language")
recall_parser.add_argument("query", help="Search query")
recall_parser.add_argument("--types", help="Comma-separated memory types to filter")
recall_parser.add_argument("--limit", "-n", type=int, default=10, help="Max results")
# get
get_parser = subparsers.add_parser("get", help="Get a specific memory by ID")
get_parser.add_argument("memory_id", help="Memory UUID")
get_parser.add_argument("--no-rels", action="store_true", help="Exclude relationships")
# relate
relate_parser = subparsers.add_parser("relate", help="Create relationship between memories")
relate_parser.add_argument("from_id", help="Source memory UUID")
relate_parser.add_argument("to_id", help="Target memory UUID")
relate_parser.add_argument("rel_type", choices=RELATIONSHIP_TYPES, help="Relationship type")
relate_parser.add_argument("--context", help="Context description")
relate_parser.add_argument("--strength", type=float, default=0.5, help="Strength 0.0-1.0")
# search
search_parser = subparsers.add_parser("search", help="Advanced search with filters")
search_parser.add_argument("--query", "-q", help="Text query")
search_parser.add_argument("--types", help="Comma-separated memory types")
search_parser.add_argument("--tags", help="Comma-separated tags")
search_parser.add_argument("--min-importance", type=float, help="Minimum importance")
search_parser.add_argument("--limit", "-n", type=int, default=20, help="Max results")
# update
update_parser = subparsers.add_parser("update", help="Update a memory")
update_parser.add_argument("memory_id", help="Memory UUID")
update_parser.add_argument("--title", help="New title")
update_parser.add_argument("--content", help="New content")
update_parser.add_argument("--tags", help="New tags (comma-separated)")
update_parser.add_argument("--importance", type=float, help="New importance")
# delete
delete_parser = subparsers.add_parser("delete", help="Delete a memory")
delete_parser.add_argument("memory_id", help="Memory UUID")
delete_parser.add_argument("--force", "-f", action="store_true", help="Skip confirmation")
# related
related_parser = subparsers.add_parser("related", help="Get related memories")
related_parser.add_argument("memory_id", help="Memory UUID")
related_parser.add_argument("--types", help="Comma-separated relationship types")
related_parser.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5")
# stats
subparsers.add_parser("stats", help="Show database statistics")
# recent
recent_parser = subparsers.add_parser("recent", help="Show recent activity")
recent_parser.add_argument("--days", type=int, default=7, help="Days to look back")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
try:
client = MemoryGraphClient()
except FileNotFoundError as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
result = None
if args.command == "store":
tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
memory_id = client.store_memory(
type=args.type,
title=args.title,
content=args.content,
tags=tags,
importance=args.importance,
summary=args.summary
)
result = {"success": True, "memory_id": memory_id}
elif args.command == "recall":
types = [t.strip() for t in args.types.split(",")] if args.types else None
result = client.recall_memories(args.query, memory_types=types, limit=args.limit)
elif args.command == "get":
result = client.get_memory(args.memory_id, include_relationships=not args.no_rels)
if not result:
result = {"error": "Memory not found"}
elif args.command == "relate":
rel_id = client.create_relationship(
args.from_id, args.to_id, args.rel_type,
strength=args.strength, context=args.context
)
result = {"success": True, "relationship_id": rel_id}
elif args.command == "search":
types = [t.strip() for t in args.types.split(",")] if args.types else None
tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
result = client.search_memories(
query=args.query,
memory_types=types,
tags=tags,
min_importance=args.min_importance,
limit=args.limit
)
elif args.command == "update":
tags = [t.strip() for t in args.tags.split(",")] if args.tags else None
success = client.update_memory(
args.memory_id,
title=args.title,
content=args.content,
tags=tags,
importance=args.importance
)
result = {"success": success}
elif args.command == "delete":
if not args.force:
# Get memory first to show what's being deleted
mem = client.get_memory(args.memory_id, include_relationships=False)
if mem:
print(f"Deleting: {mem.get('title')}", file=sys.stderr)
success = client.delete_memory(args.memory_id)
result = {"success": success}
elif args.command == "related":
types = [t.strip() for t in args.types.split(",")] if args.types else None
result = client.get_related_memories(args.memory_id, relationship_types=types, max_depth=args.depth)
elif args.command == "stats":
result = client.get_memory_statistics()
elif args.command == "recent":
result = client.get_recent_activity(days=args.days)
print(json.dumps(result, indent=2, default=str))
if __name__ == "__main__":
main()