#!/usr/bin/env python3 """ MemoryGraph SQLite Client Direct SQLite interface for MemoryGraph operations, bypassing MCP protocol. Provides CLI and Python API for memory storage, retrieval, and relationship management. Usage: # CLI python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix" python client.py recall "timeout error" python client.py get python client.py relate SOLVES # Python from client import MemoryGraphClient client = MemoryGraphClient() memory_id = client.store_memory(type="solution", title="...", content="...") """ import argparse import json import os import re import sqlite3 import sys import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Union # Database location DB_PATH = Path.home() / ".memorygraph" / "memory.db" # Valid memory types MEMORY_TYPES = [ "solution", "problem", "error", "fix", "code_pattern", "decision", "configuration", "workflow", "task", "project", "technology", "command", "file_context", "general" ] # Valid relationship types RELATIONSHIP_TYPES = [ # Causal "CAUSES", "TRIGGERS", "LEADS_TO", "PREVENTS", "BREAKS", # Solution "SOLVES", "ADDRESSES", "ALTERNATIVE_TO", "IMPROVES", "REPLACES", # Context "OCCURS_IN", "APPLIES_TO", "WORKS_WITH", "REQUIRES", "USED_IN", # Learning "BUILDS_ON", "CONTRADICTS", "CONFIRMS", "GENERALIZES", "SPECIALIZES", # Similarity "SIMILAR_TO", "VARIANT_OF", "RELATED_TO", "ANALOGY_TO", "OPPOSITE_OF", # Workflow "FOLLOWS", "DEPENDS_ON", "ENABLES", "BLOCKS", "PARALLEL_TO", # Quality "EFFECTIVE_FOR", "INEFFECTIVE_FOR", "PREFERRED_OVER", "DEPRECATED_BY", "VALIDATED_BY" ] class MemoryGraphClient: """Client for direct SQLite access to MemoryGraph database.""" def __init__(self, db_path: Optional[Path] = None): self.db_path = db_path or DB_PATH if not self.db_path.exists(): raise FileNotFoundError(f"Database not found: {self.db_path}") def _connect(self) -> sqlite3.Connection: """Create database connection with row factory.""" conn = sqlite3.connect(str(self.db_path)) conn.row_factory = sqlite3.Row return conn def _now_iso(self) -> str: """Get current timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() # ========================================================================= # CORE OPERATIONS (High Priority) # ========================================================================= def store_memory( self, type: str, title: str, content: str, tags: Optional[List[str]] = None, importance: float = 0.5, summary: Optional[str] = None, context: Optional[Dict[str, Any]] = None ) -> str: """ Store a new memory. Args: type: Memory type (solution, problem, error, fix, etc.) title: Short descriptive title content: Detailed content tags: List of tags for categorization importance: Importance score 0.0-1.0 (default 0.5) summary: Optional brief summary context: Optional context dict (project_path, files, etc.) Returns: memory_id: UUID of the created memory """ if type not in MEMORY_TYPES: raise ValueError(f"Invalid memory type: {type}. Valid types: {MEMORY_TYPES}") memory_id = str(uuid.uuid4()) now = self._now_iso() # Normalize tags to lowercase tags = [t.lower().strip() for t in (tags or [])] properties = { "id": memory_id, "type": type, "title": title, "content": content, "summary": summary, "tags": tags, "importance": max(0.0, min(1.0, importance)), "confidence": 0.8, "usage_count": 0, "created_at": now, "updated_at": now } if context: properties["context"] = context with self._connect() as conn: # Insert node conn.execute( "INSERT INTO nodes (id, label, properties, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", (memory_id, "Memory", json.dumps(properties), now, now) ) # Note: FTS is an external content table managed by triggers in the MCP schema. # We use JSON-based search instead, so no FTS update needed. conn.commit() return memory_id def recall_memories( self, query: str, memory_types: Optional[List[str]] = None, limit: int = 20, project_path: Optional[str] = None ) -> List[Dict[str, Any]]: """ Natural language search for memories with fuzzy matching. Args: query: Natural language search query memory_types: Optional filter by memory types limit: Maximum results (default 20) project_path: Optional filter by project path Returns: List of matching memories with relevance info """ query_lower = query.lower().strip() terms = query_lower.split() with self._connect() as conn: # Use JSON-based search (more reliable than FTS for existing data) # Search in title, content, summary, and tags rows = conn.execute( """ SELECT id, properties, created_at FROM nodes WHERE label = 'Memory' ORDER BY created_at DESC """ ).fetchall() results = [] for row in rows: props = json.loads(row["properties"]) # Filter by memory type if specified if memory_types and props.get("type") not in memory_types: continue # Filter by project path if specified if project_path: ctx = props.get("context", {}) if isinstance(ctx, dict) and ctx.get("project_path") != project_path: continue # Score based on term matches in title, content, summary, tags title = (props.get("title") or "").lower() content = (props.get("content") or "").lower() summary = (props.get("summary") or "").lower() tags = " ".join(props.get("tags") or []).lower() searchable = f"{title} {content} {summary} {tags}" # Check if any term matches matches = sum(1 for term in terms if term in searchable) if matches == 0: continue # Get relationships for context rels = self._get_relationships_for_memory(conn, row["id"]) # Calculate relevance score title_matches = sum(1 for term in terms if term in title) * 3 tag_matches = sum(1 for term in terms if term in tags) * 2 content_matches = matches score = title_matches + tag_matches + content_matches results.append({ "id": props["id"], "type": props.get("type"), "title": props.get("title"), "content": props.get("content"), "summary": props.get("summary"), "tags": props.get("tags", []), "importance": props.get("importance"), "created_at": props.get("created_at"), "relationships": rels, "match_query": query, "_score": score }) # Sort by score (higher = better match) results.sort(key=lambda x: x.pop("_score", 0), reverse=True) return results[:limit] def get_memory( self, memory_id: str, include_relationships: bool = True ) -> Optional[Dict[str, Any]]: """ Retrieve a specific memory by ID. Args: memory_id: UUID of the memory include_relationships: Whether to include related memories Returns: Memory dict or None if not found """ with self._connect() as conn: row = conn.execute( "SELECT id, properties, created_at, updated_at FROM nodes WHERE id = ? AND label = 'Memory'", (memory_id,) ).fetchone() if not row: return None props = json.loads(row["properties"]) result = { "id": props["id"], "type": props.get("type"), "title": props.get("title"), "content": props.get("content"), "summary": props.get("summary"), "tags": props.get("tags", []), "importance": props.get("importance"), "confidence": props.get("confidence"), "usage_count": props.get("usage_count", 0), "created_at": props.get("created_at"), "updated_at": props.get("updated_at"), "context": props.get("context") } if include_relationships: result["relationships"] = self._get_relationships_for_memory(conn, memory_id) # Increment usage count props["usage_count"] = props.get("usage_count", 0) + 1 conn.execute( "UPDATE nodes SET properties = ? WHERE id = ?", (json.dumps(props), memory_id) ) conn.commit() return result def create_relationship( self, from_memory_id: str, to_memory_id: str, relationship_type: str, strength: float = 0.5, confidence: float = 0.8, context: Optional[str] = None ) -> str: """ Create a relationship between two memories. Args: from_memory_id: Source memory UUID to_memory_id: Target memory UUID relationship_type: Type of relationship (SOLVES, CAUSES, etc.) strength: Relationship strength 0.0-1.0 (default 0.5) confidence: Confidence score 0.0-1.0 (default 0.8) context: Optional context description Returns: relationship_id: UUID of the created relationship """ if relationship_type not in RELATIONSHIP_TYPES: raise ValueError(f"Invalid relationship type: {relationship_type}") rel_id = str(uuid.uuid4()) now = self._now_iso() # Build context structure context_obj = { "text": context or "", "scope": None, "components": [], "conditions": [], "evidence": [], "temporal": None, "exceptions": [] } properties = { "id": rel_id, "strength": max(0.0, min(1.0, strength)), "confidence": max(0.0, min(1.0, confidence)), "context": json.dumps(context_obj), "evidence_count": 1 if context else 0, "success_rate": None, "created_at": now, "last_validated": now, "validation_count": 0, "counter_evidence_count": 0 } with self._connect() as conn: # Verify both memories exist for mid in [from_memory_id, to_memory_id]: exists = conn.execute( "SELECT 1 FROM nodes WHERE id = ? AND label = 'Memory'", (mid,) ).fetchone() if not exists: raise ValueError(f"Memory not found: {mid}") conn.execute( "INSERT INTO relationships (id, from_id, to_id, rel_type, properties, created_at) VALUES (?, ?, ?, ?, ?, ?)", (rel_id, from_memory_id, to_memory_id, relationship_type, json.dumps(properties), now) ) conn.commit() return rel_id # ========================================================================= # MEDIUM PRIORITY OPERATIONS # ========================================================================= def search_memories( self, query: Optional[str] = None, memory_types: Optional[List[str]] = None, tags: Optional[List[str]] = None, min_importance: Optional[float] = None, limit: int = 50, offset: int = 0 ) -> List[Dict[str, Any]]: """ Advanced search with fine-grained control. Args: query: Text search query (optional) memory_types: Filter by memory types tags: Filter by tags (matches any) min_importance: Minimum importance score limit: Maximum results offset: Pagination offset Returns: List of matching memories """ query_terms = query.lower().strip().split() if query else [] with self._connect() as conn: rows = conn.execute( "SELECT id, properties FROM nodes WHERE label = 'Memory' ORDER BY created_at DESC" ).fetchall() results = [] for row in rows: props = json.loads(row["properties"]) # Apply type filter if memory_types and props.get("type") not in memory_types: continue # Apply importance filter if min_importance and props.get("importance", 0) < min_importance: continue # Apply tag filter if tags: mem_tags = set(props.get("tags", [])) if not mem_tags.intersection(set(t.lower() for t in tags)): continue # Apply query filter if query_terms: title = (props.get("title") or "").lower() content = (props.get("content") or "").lower() summary = (props.get("summary") or "").lower() mem_tags_str = " ".join(props.get("tags") or []).lower() searchable = f"{title} {content} {summary} {mem_tags_str}" if not any(term in searchable for term in query_terms): continue content_preview = props.get("content") or "" if len(content_preview) > 200: content_preview = content_preview[:200] + "..." results.append({ "id": props["id"], "type": props.get("type"), "title": props.get("title"), "content": content_preview, "tags": props.get("tags", []), "importance": props.get("importance"), "created_at": props.get("created_at") }) # Apply pagination return results[offset:offset + limit] def update_memory( self, memory_id: str, title: Optional[str] = None, content: Optional[str] = None, summary: Optional[str] = None, tags: Optional[List[str]] = None, importance: Optional[float] = None ) -> bool: """ Update an existing memory. Args: memory_id: UUID of memory to update title, content, summary, tags, importance: Fields to update (None = no change) Returns: True if updated, False if not found """ with self._connect() as conn: row = conn.execute( "SELECT properties FROM nodes WHERE id = ? AND label = 'Memory'", (memory_id,) ).fetchone() if not row: return False props = json.loads(row["properties"]) now = self._now_iso() # Update fields if title is not None: props["title"] = title if content is not None: props["content"] = content if summary is not None: props["summary"] = summary if tags is not None: props["tags"] = [t.lower().strip() for t in tags] if importance is not None: props["importance"] = max(0.0, min(1.0, importance)) props["updated_at"] = now conn.execute( "UPDATE nodes SET properties = ?, updated_at = ? WHERE id = ?", (json.dumps(props), now, memory_id) ) # Note: FTS is an external content table - we use JSON-based search instead conn.commit() return True def delete_memory(self, memory_id: str) -> bool: """ Delete a memory and all its relationships. Args: memory_id: UUID of memory to delete Returns: True if deleted, False if not found """ with self._connect() as conn: # Check exists exists = conn.execute( "SELECT 1 FROM nodes WHERE id = ? AND label = 'Memory'", (memory_id,) ).fetchone() if not exists: return False # Delete relationships (CASCADE should handle but be explicit) conn.execute( "DELETE FROM relationships WHERE from_id = ? OR to_id = ?", (memory_id, memory_id) ) # Delete node (FTS is external content table - no direct deletion needed) conn.execute("DELETE FROM nodes WHERE id = ?", (memory_id,)) conn.commit() return True def get_related_memories( self, memory_id: str, relationship_types: Optional[List[str]] = None, max_depth: int = 1 ) -> List[Dict[str, Any]]: """ Find memories connected to a specific memory. Args: memory_id: UUID of the memory relationship_types: Filter by relationship types max_depth: Traversal depth (1-5) Returns: List of related memories with relationship info """ max_depth = max(1, min(5, max_depth)) with self._connect() as conn: visited = set() results = [] def traverse(mid: str, depth: int): if depth > max_depth or mid in visited: return visited.add(mid) # Get outgoing relationships rows = conn.execute( """ SELECT r.rel_type, r.properties as rel_props, n.id, n.properties FROM relationships r JOIN nodes n ON r.to_id = n.id WHERE r.from_id = ? AND n.label = 'Memory' """, (mid,) ).fetchall() for row in rows: if relationship_types and row["rel_type"] not in relationship_types: continue if row["id"] not in visited: props = json.loads(row["properties"]) rel_props = json.loads(row["rel_props"]) results.append({ "id": props["id"], "type": props.get("type"), "title": props.get("title"), "relationship": row["rel_type"], "direction": "outgoing", "strength": rel_props.get("strength"), "depth": depth }) traverse(row["id"], depth + 1) # Get incoming relationships rows = conn.execute( """ SELECT r.rel_type, r.properties as rel_props, n.id, n.properties FROM relationships r JOIN nodes n ON r.from_id = n.id WHERE r.to_id = ? AND n.label = 'Memory' """, (mid,) ).fetchall() for row in rows: if relationship_types and row["rel_type"] not in relationship_types: continue if row["id"] not in visited: props = json.loads(row["properties"]) rel_props = json.loads(row["rel_props"]) results.append({ "id": props["id"], "type": props.get("type"), "title": props.get("title"), "relationship": row["rel_type"], "direction": "incoming", "strength": rel_props.get("strength"), "depth": depth }) traverse(row["id"], depth + 1) traverse(memory_id, 1) return results # ========================================================================= # LOW PRIORITY OPERATIONS # ========================================================================= def get_memory_statistics(self) -> Dict[str, Any]: """Get database statistics.""" with self._connect() as conn: # Total memories total = conn.execute( "SELECT COUNT(*) as count FROM nodes WHERE label = 'Memory'" ).fetchone()["count"] # Count by type rows = conn.execute( """ SELECT json_extract(properties, '$.type') as type, COUNT(*) as count FROM nodes WHERE label = 'Memory' GROUP BY json_extract(properties, '$.type') """ ).fetchall() by_type = {row["type"]: row["count"] for row in rows} # Relationship count rel_count = conn.execute( "SELECT COUNT(*) as count FROM relationships" ).fetchone()["count"] # DB size db_size = self.db_path.stat().st_size return { "total_memories": total, "by_type": by_type, "total_relationships": rel_count, "database_size_bytes": db_size, "database_path": str(self.db_path) } def get_recent_activity( self, days: int = 7, project: Optional[str] = None ) -> Dict[str, Any]: """Get recent memory activity summary.""" with self._connect() as conn: cutoff = datetime.now(timezone.utc).isoformat()[:10] # Just date part for simplicity rows = conn.execute( """ SELECT id, properties FROM nodes WHERE label = 'Memory' ORDER BY created_at DESC LIMIT 50 """ ).fetchall() recent = [] by_type = {} for row in rows: props = json.loads(row["properties"]) mem_type = props.get("type", "unknown") by_type[mem_type] = by_type.get(mem_type, 0) + 1 recent.append({ "id": props["id"], "type": mem_type, "title": props.get("title"), "created_at": props.get("created_at") }) return { "period_days": days, "summary_by_type": by_type, "recent_memories": recent[:20], "total_in_period": len(recent) } # ========================================================================= # HELPER METHODS # ========================================================================= def _get_relationships_for_memory(self, conn: sqlite3.Connection, memory_id: str) -> Dict[str, List[str]]: """Get relationships grouped by type for a memory.""" rels = {} # Outgoing rows = conn.execute( "SELECT rel_type, to_id FROM relationships WHERE from_id = ?", (memory_id,) ).fetchall() for row in rows: key = f"{row['rel_type']}_to" if key not in rels: rels[key] = [] rels[key].append(row["to_id"]) # Incoming rows = conn.execute( "SELECT rel_type, from_id FROM relationships WHERE to_id = ?", (memory_id,) ).fetchall() for row in rows: key = f"{row['rel_type']}_from" if key not in rels: rels[key] = [] rels[key].append(row["from_id"]) return rels # ============================================================================= # CLI INTERFACE # ============================================================================= def main(): parser = argparse.ArgumentParser( description="MemoryGraph CLI - Direct SQLite interface for memory operations", formatter_class=argparse.RawDescriptionHelpFormatter ) subparsers = parser.add_subparsers(dest="command", help="Commands") # store store_parser = subparsers.add_parser("store", help="Store a new memory") store_parser.add_argument("--type", "-t", required=True, choices=MEMORY_TYPES, help="Memory type") store_parser.add_argument("--title", required=True, help="Memory title") store_parser.add_argument("--content", "-c", required=True, help="Memory content") store_parser.add_argument("--tags", help="Comma-separated tags") store_parser.add_argument("--importance", "-i", type=float, default=0.5, help="Importance 0.0-1.0") store_parser.add_argument("--summary", "-s", help="Brief summary") # recall recall_parser = subparsers.add_parser("recall", help="Search memories with natural language") recall_parser.add_argument("query", help="Search query") recall_parser.add_argument("--types", help="Comma-separated memory types to filter") recall_parser.add_argument("--limit", "-n", type=int, default=10, help="Max results") # get get_parser = subparsers.add_parser("get", help="Get a specific memory by ID") get_parser.add_argument("memory_id", help="Memory UUID") get_parser.add_argument("--no-rels", action="store_true", help="Exclude relationships") # relate relate_parser = subparsers.add_parser("relate", help="Create relationship between memories") relate_parser.add_argument("from_id", help="Source memory UUID") relate_parser.add_argument("to_id", help="Target memory UUID") relate_parser.add_argument("rel_type", choices=RELATIONSHIP_TYPES, help="Relationship type") relate_parser.add_argument("--context", help="Context description") relate_parser.add_argument("--strength", type=float, default=0.5, help="Strength 0.0-1.0") # search search_parser = subparsers.add_parser("search", help="Advanced search with filters") search_parser.add_argument("--query", "-q", help="Text query") search_parser.add_argument("--types", help="Comma-separated memory types") search_parser.add_argument("--tags", help="Comma-separated tags") search_parser.add_argument("--min-importance", type=float, help="Minimum importance") search_parser.add_argument("--limit", "-n", type=int, default=20, help="Max results") # update update_parser = subparsers.add_parser("update", help="Update a memory") update_parser.add_argument("memory_id", help="Memory UUID") update_parser.add_argument("--title", help="New title") update_parser.add_argument("--content", help="New content") update_parser.add_argument("--tags", help="New tags (comma-separated)") update_parser.add_argument("--importance", type=float, help="New importance") # delete delete_parser = subparsers.add_parser("delete", help="Delete a memory") delete_parser.add_argument("memory_id", help="Memory UUID") delete_parser.add_argument("--force", "-f", action="store_true", help="Skip confirmation") # related related_parser = subparsers.add_parser("related", help="Get related memories") related_parser.add_argument("memory_id", help="Memory UUID") related_parser.add_argument("--types", help="Comma-separated relationship types") related_parser.add_argument("--depth", type=int, default=1, help="Traversal depth 1-5") # stats subparsers.add_parser("stats", help="Show database statistics") # recent recent_parser = subparsers.add_parser("recent", help="Show recent activity") recent_parser.add_argument("--days", type=int, default=7, help="Days to look back") args = parser.parse_args() if not args.command: parser.print_help() sys.exit(1) try: client = MemoryGraphClient() except FileNotFoundError as e: print(json.dumps({"error": str(e)})) sys.exit(1) result = None if args.command == "store": tags = [t.strip() for t in args.tags.split(",")] if args.tags else None memory_id = client.store_memory( type=args.type, title=args.title, content=args.content, tags=tags, importance=args.importance, summary=args.summary ) result = {"success": True, "memory_id": memory_id} elif args.command == "recall": types = [t.strip() for t in args.types.split(",")] if args.types else None result = client.recall_memories(args.query, memory_types=types, limit=args.limit) elif args.command == "get": result = client.get_memory(args.memory_id, include_relationships=not args.no_rels) if not result: result = {"error": "Memory not found"} elif args.command == "relate": rel_id = client.create_relationship( args.from_id, args.to_id, args.rel_type, strength=args.strength, context=args.context ) result = {"success": True, "relationship_id": rel_id} elif args.command == "search": types = [t.strip() for t in args.types.split(",")] if args.types else None tags = [t.strip() for t in args.tags.split(",")] if args.tags else None result = client.search_memories( query=args.query, memory_types=types, tags=tags, min_importance=args.min_importance, limit=args.limit ) elif args.command == "update": tags = [t.strip() for t in args.tags.split(",")] if args.tags else None success = client.update_memory( args.memory_id, title=args.title, content=args.content, tags=tags, importance=args.importance ) result = {"success": success} elif args.command == "delete": if not args.force: # Get memory first to show what's being deleted mem = client.get_memory(args.memory_id, include_relationships=False) if mem: print(f"Deleting: {mem.get('title')}", file=sys.stderr) success = client.delete_memory(args.memory_id) result = {"success": success} elif args.command == "related": types = [t.strip() for t in args.types.split(",")] if args.types else None result = client.get_related_memories(args.memory_id, relationship_types=types, max_depth=args.depth) elif args.command == "stats": result = client.get_memory_statistics() elif args.command == "recent": result = client.get_recent_activity(days=args.days) print(json.dumps(result, indent=2, default=str)) if __name__ == "__main__": main()