#!/usr/bin/env python3
"""
Cognitive Memory Client

Markdown-based memory system with decay scoring, episodic logging, and
auto-curated CORE.md. Stores memories as human-readable markdown files
with YAML frontmatter.

Usage:
    # CLI
    python client.py store --type solution --title "Fixed X" --content "Details..." --tags "python,fix"
    python client.py recall "timeout error"
    python client.py get
    python client.py relate SOLVES
    python client.py core
    python client.py episode --type fix --title "Fixed reconnection" --tags "discord,python"

    # Python
    from client import CognitiveMemoryClient
    client = CognitiveMemoryClient()
    memory_id = client.store(type="solution", title="...", content="...")
"""

import json
import os
import subprocess
import sys
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Import everything from common for backward compatibility
from common import (
    CORE_MAX_CHARS,
    CONFIG_PATH,
    DECAY_LAMBDA,
    EDGE_FIELD_ORDER,
    EDGES_DIR_NAME,
    EMBEDDING_MODEL,
    EMBEDDING_TIMEOUT,
    EMBEDDINGS_PATH,
    FIELD_ORDER,
    INDEX_PATH,
    MEMORY_DIR,
    OLLAMA_URL,
    OPENAI_EMBED_URL,
    OPENAI_MODEL_DEFAULT,
    STATE_PATH,
    THRESHOLD_ACTIVE,
    THRESHOLD_DORMANT,
    THRESHOLD_FADING,
    TYPE_DIRS,
    TYPE_WEIGHTS,
    VALID_RELATION_TYPES,
    VALID_TYPES,
    _cosine_similarity,
    _format_yaml_value,
    _load_memory_config,
    _make_edge_filename,
    _needs_quoting,
    _ollama_embed,
    _openai_embed,
    _parse_relations_block,
    _parse_scalar,
    _quote_yaml,
    calculate_decay_score,
    make_filename,
    parse_frontmatter,
    serialize_edge_frontmatter,
    serialize_frontmatter,
    slugify,
)

# Import mixins
from edges import EdgesMixin
from embeddings import EmbeddingsMixin
from analysis import AnalysisMixin


# =============================================================================
# CLIENT
# =============================================================================


class CognitiveMemoryClient(EdgesMixin, EmbeddingsMixin, AnalysisMixin):
    """Client for markdown-based cognitive memory system."""

    def __init__(self, memory_dir: Optional[Path] = None):
        self.memory_dir = memory_dir or MEMORY_DIR
        self.index_path = self.memory_dir / "_index.json"
        self.state_path = self.memory_dir / "_state.json"
        self._embeddings_cache: Optional[Dict] = None
        self._embeddings_mtime: float = 0.0
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create directory structure if needed."""
        for type_dir in TYPE_DIRS.values():
            (self.memory_dir / "graph" / type_dir).mkdir(parents=True, exist_ok=True)
        (self.memory_dir / "graph" / EDGES_DIR_NAME).mkdir(parents=True, exist_ok=True)
        (self.memory_dir / "episodes").mkdir(parents=True, exist_ok=True)
        (self.memory_dir / "vault").mkdir(parents=True, exist_ok=True)

    def _load_embeddings_cached(self) -> Optional[Dict]:
        """Load _embeddings.json with mtime-based caching.

        Returns the parsed dict, or None if the file doesn't exist or fails
        to parse. Only re-reads from disk when the file's mtime has changed.
        """
        embeddings_path = self.memory_dir / "_embeddings.json"
        if not embeddings_path.exists():
            return None
        try:
            current_mtime = embeddings_path.stat().st_mtime
        except OSError:
            return None
        if (
            self._embeddings_cache is not None
            and current_mtime == self._embeddings_mtime
        ):
            return self._embeddings_cache
        try:
            data = json.loads(embeddings_path.read_text())
            self._embeddings_cache = data
            self._embeddings_mtime = current_mtime
            return data
        except (json.JSONDecodeError, OSError):
            return None

    # -------------------------------------------------------------------------
    # Index and State management
    # -------------------------------------------------------------------------

    def _load_index(self) -> Dict:
        """Load _index.json, return empty structure if missing."""
        if self.index_path.exists():
            try:
                return json.loads(self.index_path.read_text())
            except (json.JSONDecodeError, OSError):
                pass
        return {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}

    def _save_index(self, index: Dict):
        """Write _index.json."""
        index["version"] = 2
        index.setdefault("edges", {})
        index["updated"] = datetime.now(timezone.utc).isoformat()
        index["count"] = len(index.get("entries", {}))
        self.index_path.write_text(json.dumps(index, indent=2, default=str))

    def _load_state(self) -> Dict:
        """Load _state.json, return empty structure if missing."""
        if self.state_path.exists():
            try:
                return json.loads(self.state_path.read_text())
            except (json.JSONDecodeError, OSError):
                pass
        return {"version": 1, "updated": "", "entries": {}}

    def _save_state(self, state: Dict):
        """Write _state.json atomically, merging top-level keys to prevent
        race conditions."""
        # Merge with existing state to preserve keys written by concurrent
        # processes.
        if self.state_path.exists():
            try:
                existing = json.loads(self.state_path.read_text())
                existing.update(state)
                state = existing
            except (json.JSONDecodeError, OSError):
                pass
        state["updated"] = datetime.now(timezone.utc).isoformat()
        # Atomic write: write to temp file then rename.
        fd, tmp_path = tempfile.mkstemp(dir=self.memory_dir, suffix=".tmp")
        try:
            with os.fdopen(fd, "w") as f:
                json.dump(state, f, indent=2, default=str)
            os.replace(tmp_path, self.state_path)
        except Exception:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    # -------------------------------------------------------------------------
    # File I/O
    # -------------------------------------------------------------------------

    def _read_memory_file(self, path: Path) -> Tuple[Dict[str, Any], str]:
        """Read a memory markdown file, return (frontmatter, body)."""
        text = path.read_text(encoding="utf-8")
        return parse_frontmatter(text)

    def _write_memory_file(self, path: Path, frontmatter: Dict[str, Any], body: str):
        """Write a memory markdown file with frontmatter and body."""
        fm_str = serialize_frontmatter(frontmatter)
        content = f"{fm_str}\n\n{body.strip()}\n" if body.strip() else f"{fm_str}\n"
        path.write_text(content, encoding="utf-8")

    @staticmethod
    def _resolve_prefix(partial_id: str, keys) -> Optional[str]:
        """Resolve a partial UUID prefix to a full ID (git-style).

        Returns the full key if exactly one match is found, None if zero
        matches, or raises ValueError on ambiguous (multiple) matches.
        """
        if partial_id in keys:
            return partial_id
        matches = [k for k in keys if k.startswith(partial_id)]
        if len(matches) == 1:
            return matches[0]
        if len(matches) > 1:
            raise ValueError(
                f"Ambiguous ID prefix '{partial_id}' matches {len(matches)} "
                f"entries: {', '.join(sorted(matches)[:5])}"
            )
        return None

    def _resolve_memory_path(self, memory_id: str) -> Optional[Path]:
        """Find the file path for a memory by ID using the index."""
        index = self._load_index()
        entries = index.get("entries", {})
        full_id = self._resolve_prefix(memory_id, entries)
        if full_id:
            entry = entries[full_id]
            path = self.memory_dir / entry["path"]
            if path.exists():
                return path
        # Fallback: scan files (slow but reliable)
        return self._scan_for_memory(memory_id)

    def _scan_for_memory(self, memory_id: str) -> Optional[Path]:
        """Scan graph/ and vault/ directories for a memory file by ID."""
        search_dirs = [self.memory_dir / "graph", self.memory_dir / "vault"]
        for search_dir in search_dirs:
            if not search_dir.exists():
                continue
            for md_file in search_dir.rglob("*.md"):
                try:
                    fm, _ = self._read_memory_file(md_file)
                    if fm.get("id") == memory_id:
                        return md_file
                except Exception:
                    continue
        return None

    # -------------------------------------------------------------------------
    # Git operations
    # -------------------------------------------------------------------------

    def _git_commit(self, message: str, files: Optional[List[Path]] = None):
        """Stage and commit files in the memory repo."""
        try:
            # Check if it's a git repo
            result = subprocess.run(
                ["git", "rev-parse", "--git-dir"],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
            if result.returncode != 0:
                return  # Not a git repo, skip
            if files:
                for f in files:
                    rel = f.relative_to(self.memory_dir)
                    subprocess.run(
                        ["git", "add", str(rel)],
                        cwd=str(self.memory_dir),
                        capture_output=True,
                        timeout=5,
                    )
            else:
                subprocess.run(
                    ["git", "add", "-A"],
                    cwd=str(self.memory_dir),
                    capture_output=True,
                    timeout=5,
                )
            subprocess.run(
                ["git", "commit", "-m", message, "--allow-empty"],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=10,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            pass  # Git operations are best-effort

    # -------------------------------------------------------------------------
    # Index helpers
    # -------------------------------------------------------------------------

    def _update_index_entry(
        self,
        memory_id: str,
        frontmatter: Dict[str, Any],
        rel_path: str,
        content_preview: str = "",
    ):
        """Add or update an entry in the index."""
        index = self._load_index()
        entry = {
            "title": frontmatter.get("title", ""),
            "type": frontmatter.get("type", "general"),
            "tags": frontmatter.get("tags", []),
            "importance": frontmatter.get("importance", 0.5),
            "confidence": frontmatter.get("confidence", 0.8),
            "created": frontmatter.get("created", ""),
            "updated": frontmatter.get("updated", ""),
            "path": rel_path,
            "relations": frontmatter.get("relations", []),
            "content_preview": content_preview,
        }
        # Preserve existing content_preview if not provided
        if not content_preview:
            existing = index.get("entries", {}).get(memory_id, {})
            entry["content_preview"] = existing.get("content_preview", "")
        index.setdefault("entries", {})[memory_id] = entry
        self._save_index(index)

    def _remove_index_entry(self, memory_id: str):
        """Remove an entry from the index."""
        index = self._load_index()
        index.get("entries", {}).pop(memory_id, None)
        self._save_index(index)

    def _maybe_refresh_decay(self):
        """Auto-refresh all decay scores if state is older than 24 hours."""
        if not self.state_path.exists():
            self.decay()
            return
        try:
            state = json.loads(self.state_path.read_text())
            updated_str = state.get("updated", "")
            if not updated_str:
                self.decay()
                return
            updated_dt = datetime.fromisoformat(updated_str.replace("Z", "+00:00"))
            if updated_dt.tzinfo is None:
                updated_dt = updated_dt.replace(tzinfo=timezone.utc)
            age_hours = (datetime.now(timezone.utc) - updated_dt).total_seconds() / 3600
            if age_hours > 24:
                self.decay()
        except (json.JSONDecodeError, ValueError, OSError):
            self.decay()

    # -------------------------------------------------------------------------
    # Edge index helpers
    # -------------------------------------------------------------------------

    def _update_edge_index(
        self, edge_id: str, edge_data: Dict[str, Any], rel_path: str
    ):
        """Add or update an edge entry in the index."""
        index = self._load_index()
        index.setdefault("edges", {})[edge_id] = {
            "type": edge_data.get("type", ""),
            "from_id": edge_data.get("from_id", ""),
            "from_title": edge_data.get("from_title", ""),
            "to_id": edge_data.get("to_id", ""),
            "to_title": edge_data.get("to_title", ""),
            "strength": edge_data.get("strength", 0.8),
            "created": edge_data.get("created", ""),
            "updated": edge_data.get("updated", ""),
            "path": rel_path,
        }
        self._save_index(index)

    def _remove_edge_index(self, edge_id: str):
        """Remove an edge entry from the index."""
        index = self._load_index()
        index.get("edges", {}).pop(edge_id, None)
        self._save_index(index)

    def _scan_for_edge(self, edge_id: str) -> Optional[Path]:
        """Fallback file scan for an edge by ID if index is stale."""
        edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
        if not edges_dir.exists():
            return None
        for md_file in edges_dir.glob("*.md"):
            try:
                fm, _ = self._read_memory_file(md_file)
                if fm.get("id") == edge_id:
                    return md_file
            except Exception:
                continue
        return None

    def _resolve_edge_path(self, edge_id: str) -> Optional[Path]:
        """Find the file path for an edge by ID using the index."""
        index = self._load_index()
        edges = index.get("edges", {})
        full_id = self._resolve_prefix(edge_id, edges)
        if full_id:
            entry = edges[full_id]
            path = self.memory_dir / entry["path"]
            if path.exists():
                return path
        return self._scan_for_edge(edge_id)

    # =========================================================================
    # PUBLIC API
    # =========================================================================

    def store(
        self,
        type: str,
        title: str,
        content: str,
        tags: Optional[List[str]] = None,
        importance: float = 0.5,
        confidence: float = 0.8,
        steps: Optional[List[str]] = None,
        preconditions: Optional[List[str]] = None,
        postconditions: Optional[List[str]] = None,
    ) -> str:
        """Store a new memory as a markdown file.

        Returns the memory UUID.
        """
        if type not in VALID_TYPES:
            raise ValueError(f"Invalid type: {type}. Valid: {VALID_TYPES}")
        memory_id = str(uuid.uuid4())
        now = datetime.now(timezone.utc).isoformat()
        tags = [t.lower().strip() for t in (tags or [])]
        frontmatter = {
            "id": memory_id,
            "type": type,
            "title": title,
            "tags": tags,
            "importance": max(0.0, min(1.0, importance)),
            "confidence": max(0.0, min(1.0, confidence)),
            "created": now,
            "updated": now,
        }
        # Add procedure-specific fields when type is "procedure"
        if type == "procedure":
            if steps:
                frontmatter["steps"] = steps
            if preconditions:
                frontmatter["preconditions"] = preconditions
            if postconditions:
                frontmatter["postconditions"] = postconditions
        # Determine file path
        type_dir = TYPE_DIRS[type]
        filename = make_filename(title, memory_id)
        # BUG FIX: the generated filename was previously never interpolated —
        # the literal text "(unknown)" was written into every memory path.
        rel_path = f"graph/{type_dir}/{filename}"
        full_path = self.memory_dir / rel_path
        # Write file
        self._write_memory_file(full_path, frontmatter, content)
        # Update index with content preview (truncate at word boundary)
        preview = content.strip()[:200]
        if len(content.strip()) > 200:
            last_space = preview.rfind(" ")
            if last_space > 0:
                preview = preview[:last_space]
        self._update_index_entry(
            memory_id, frontmatter, rel_path, content_preview=preview
        )
        # Init state entry
        state = self._load_state()
        state.setdefault("entries", {})[memory_id] = {
            "access_count": 0,
            "last_accessed": now,
            "decay_score": importance * TYPE_WEIGHTS.get(type, 1.0) * 0.5,
        }
        self._save_state(state)
        # Git commit
        self._git_commit(f"store: {title}", [full_path])
        return memory_id

    def recall(
        self,
        query: str,
        memory_types: Optional[List[str]] = None,
        limit: int = 10,
        semantic: bool = True,
    ) -> List[Dict[str, Any]]:
        """Search memories by query, ranked by relevance and decay score.

        When semantic=True and embeddings exist, merges keyword and semantic
        results.
        """
        self._maybe_refresh_decay()
        query_lower = query.lower().strip()
        terms = query_lower.split()
        if not terms:
            return []
        index = self._load_index()
        state = self._load_state()
        results = []
        for mid, entry in index.get("entries", {}).items():
            # Filter by type
            if memory_types and entry.get("type") not in memory_types:
                continue
            # Check decay threshold - skip archived
            s = state.get("entries", {}).get(mid, {})
            decay = s.get("decay_score", 0.5)
            if decay < THRESHOLD_DORMANT:
                continue
            # Score based on term matches
            title = (entry.get("title") or "").lower()
            tags_str = " ".join(entry.get("tags") or []).lower()
            title_matches = sum(1 for t in terms if t in title) * 3
            tag_matches = sum(1 for t in terms if t in tags_str) * 2
            score = title_matches + tag_matches
            if score == 0:
                # Check content_preview from index (no file I/O needed)
                preview = (entry.get("content_preview") or "").lower()
                preview_matches = sum(1 for t in terms if t in preview)
                if preview_matches > 0:
                    score = preview_matches
                else:
                    continue
            # Weight by decay score
            weighted_score = score * (1 + decay)
            results.append(
                {
                    "id": mid,
                    "type": entry.get("type"),
                    "title": entry.get("title"),
                    "tags": entry.get("tags", []),
                    "importance": entry.get("importance"),
                    "decay_score": round(decay, 3),
                    "path": entry.get("path"),
                    "created": entry.get("created"),
                    "_score": weighted_score,
                }
            )
        results.sort(key=lambda x: x.pop("_score", 0), reverse=True)
        keyword_results = results[:limit]
        # Merge with semantic results (on by default)
        # Weights: semantic 60%, keyword 40%
        # Conceptual matching dominates; keyword acts as precision boost for
        # exact terms
        if semantic:
            sem_results = self.semantic_recall(query, limit=limit)
            if sem_results:
                score_map: Dict[str, float] = {}
                result_map: Dict[str, Dict] = {}
                # Keyword: normalize rank to 0-1 (rank 1 = 1.0, last = ~0.1)
                kw_weight = 0.4
                for i, r in enumerate(keyword_results):
                    mid = r["id"]
                    normalized = (limit - i) / limit
                    score_map[mid] = normalized * kw_weight
                    result_map[mid] = r
                # Semantic: similarity is already 0-1
                sem_weight = 0.6
                for r in sem_results:
                    mid = r["id"]
                    sim = r.get("similarity", 0.0)
                    sem_score = sim * sem_weight
                    if mid in score_map:
                        score_map[mid] += sem_score
                        result_map[mid]["similarity"] = sim
                    else:
                        score_map[mid] = sem_score
                        idx_entry = index.get("entries", {}).get(mid, {})
                        s = state.get("entries", {}).get(mid, {})
                        result_map[mid] = {
                            "id": mid,
                            "type": r.get("type"),
                            "title": r.get("title"),
                            "tags": r.get("tags", []),
                            "importance": idx_entry.get("importance"),
                            "decay_score": round(s.get("decay_score", 0.5), 3),
                            "similarity": sim,
                            "path": r.get("path"),
                            "created": idx_entry.get("created"),
                        }
                # Sort by merged score
                merged = sorted(
                    result_map.values(),
                    key=lambda x: score_map.get(x["id"], 0),
                    reverse=True,
                )
                return merged[:limit]
        return keyword_results

    def get(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """Get a memory by ID, update access count."""
        path = self._resolve_memory_path(memory_id)
        if not path:
            return None
        fm, body = self._read_memory_file(path)
        # FIX: normalize a possibly-partial ID prefix to the full UUID from
        # the file, so state entries are always keyed by the full ID.
        memory_id = fm.get("id", memory_id)
        # Update access count in state
        state = self._load_state()
        now = datetime.now(timezone.utc).isoformat()
        entry = state.setdefault("entries", {}).setdefault(
            memory_id,
            {
                "access_count": 0,
                "last_accessed": now,
                "decay_score": 0.5,
            },
        )
        entry["access_count"] = entry.get("access_count", 0) + 1
        entry["last_accessed"] = now
        # Recalculate decay score for this single memory (just accessed, so
        # days=0)
        importance = fm.get("importance", 0.5)
        mem_type = fm.get("type", "general")
        type_weight = TYPE_WEIGHTS.get(mem_type, 1.0)
        entry["decay_score"] = round(
            calculate_decay_score(importance, 0, entry["access_count"], type_weight), 4
        )
        self._save_state(state)
        return {
            "id": fm.get("id", memory_id),
            "type": fm.get("type"),
            "title": fm.get("title"),
            "content": body,
            "tags": fm.get("tags", []),
            "importance": fm.get("importance"),
            "confidence": fm.get("confidence"),
            "created": fm.get("created"),
            "updated": fm.get("updated"),
            "relations": fm.get("relations", []),
            "steps": fm.get("steps", []),
            "preconditions": fm.get("preconditions", []),
            "postconditions": fm.get("postconditions", []),
            "access_count": entry["access_count"],
            "decay_score": entry.get("decay_score", 0.5),
            "path": str(path.relative_to(self.memory_dir)),
        }

    def search(
        self,
        query: Optional[str] = None,
        memory_types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        min_importance: Optional[float] = None,
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """Filter memories by type, tags, importance. Optional text query."""
        self._maybe_refresh_decay()
        index = self._load_index()
        state = self._load_state()
        query_terms = query.lower().strip().split() if query else []
        filter_tags = set(t.lower() for t in tags) if tags else None
        results = []
        for mid, entry in index.get("entries", {}).items():
            # Type filter
            if memory_types and entry.get("type") not in memory_types:
                continue
            # Importance filter (explicit None check: 0.0 is a valid minimum)
            if min_importance is not None and entry.get("importance", 0) < min_importance:
                continue
            # Tag filter
            if filter_tags:
                mem_tags = set(t.lower() for t in entry.get("tags", []))
                if not mem_tags.intersection(filter_tags):
                    continue
            # Query filter
            if query_terms:
                title = (entry.get("title") or "").lower()
                tags_str = " ".join(entry.get("tags") or []).lower()
                searchable = f"{title} {tags_str}"
                if not any(t in searchable for t in query_terms):
                    # Check content_preview from index (no file I/O needed)
                    preview = (entry.get("content_preview") or "").lower()
                    if not any(t in preview for t in query_terms):
                        continue
            s = state.get("entries", {}).get(mid, {})
            results.append(
                {
                    "id": mid,
                    "type": entry.get("type"),
                    "title": entry.get("title"),
                    "tags": entry.get("tags", []),
                    "importance": entry.get("importance"),
                    "decay_score": round(s.get("decay_score", 0.5), 3),
                    "path": entry.get("path"),
                    "created": entry.get("created"),
                }
            )
        # Sort by importance descending
        results.sort(key=lambda x: x.get("importance", 0), reverse=True)
        return results[:limit]

    def update(
        self,
        memory_id: str,
        title: Optional[str] = None,
        content: Optional[str] = None,
        tags: Optional[List[str]] = None,
        importance: Optional[float] = None,
    ) -> bool:
        """Update fields of an existing memory."""
        path = self._resolve_memory_path(memory_id)
        if not path:
            return False
        fm, body = self._read_memory_file(path)
        # FIX: normalize a possibly-partial ID prefix to the full UUID so the
        # index is updated under the correct key (no duplicate entries).
        memory_id = fm.get("id", memory_id)
        now = datetime.now(timezone.utc).isoformat()
        if title is not None:
            fm["title"] = title
        if tags is not None:
            fm["tags"] = [t.lower().strip() for t in tags]
        if importance is not None:
            fm["importance"] = max(0.0, min(1.0, importance))
        if content is not None:
            body = content
        fm["updated"] = now
        self._write_memory_file(path, fm, body)
        # Update index (refresh content_preview if content changed)
        rel_path = str(path.relative_to(self.memory_dir))
        preview = ""
        if content is not None:
            preview = body.strip()[:200]
            if len(body.strip()) > 200:
                last_space = preview.rfind(" ")
                if last_space > 0:
                    preview = preview[:last_space]
        self._update_index_entry(memory_id, fm, rel_path, content_preview=preview)
        self._git_commit(f"update: {fm.get('title', memory_id[:8])}", [path])
        return True

    def delete(self, memory_id: str) -> bool:
        """Delete a memory file, cascade-delete edges, remove from index."""
        path = self._resolve_memory_path(memory_id)
        if not path:
            return False
        fm, _ = self._read_memory_file(path)
        # FIX: normalize a possibly-partial ID prefix to the full UUID so
        # edge/relation comparisons and index/state removal use the real key.
        memory_id = fm.get("id", memory_id)
        title = fm.get("title", memory_id[:8])
        # Cascade-delete edges where from_id or to_id matches
        index = self._load_index()
        edges_to_delete = []
        for eid, edata in index.get("edges", {}).items():
            if edata.get("from_id") == memory_id or edata.get("to_id") == memory_id:
                edges_to_delete.append(eid)
        for eid in edges_to_delete:
            self.edge_delete(eid)
        # Remove file
        path.unlink()
        # Remove from index and state
        self._remove_index_entry(memory_id)
        state = self._load_state()
        state.get("entries", {}).pop(memory_id, None)
        self._save_state(state)
        # Remove incoming references from other memories
        index = self._load_index()
        for mid, entry in index.get("entries", {}).items():
            rels = entry.get("relations", [])
            original_count = len(rels)
            rels = [r for r in rels if r.get("target") != memory_id]
            if len(rels) != original_count:
                entry["relations"] = rels
                other_path = self._resolve_memory_path(mid)
                if other_path:
                    try:
                        other_fm, other_body = self._read_memory_file(other_path)
                        other_fm["relations"] = [
                            r
                            for r in other_fm.get("relations", [])
                            if r.get("target") != memory_id
                        ]
                        self._write_memory_file(other_path, other_fm, other_body)
                    except Exception:
                        pass
        self._save_index(index)
        # Git: stage deletion
        try:
            rel_path = path.relative_to(self.memory_dir)
            subprocess.run(
                ["git", "rm", "--cached", str(rel_path)],
                cwd=str(self.memory_dir),
                capture_output=True,
                timeout=5,
            )
        except Exception:
            pass
        self._git_commit(f"delete: {title}")
        return True

    def related(
        self,
        memory_id: str,
        rel_types: Optional[List[str]] = None,
        max_depth: int = 1,
    ) -> List[Dict[str, Any]]:
        """Traverse relations from a memory, depth-limited BFS."""
        max_depth = max(1, min(5, max_depth))
        index = self._load_index()
        visited = set()
        results = []
        # Resolve partial ID prefix to full UUID
        entries = index.get("entries", {})
        resolved = self._resolve_prefix(memory_id, entries)
        if resolved:
            memory_id = resolved

        def traverse(mid: str, depth: int):
            if depth > max_depth or mid in visited:
                return
            visited.add(mid)
            entry = entries.get(mid)
            if not entry:
                return
            for rel in entry.get("relations", []):
                target_id = rel.get("target")
                if not target_id or target_id in visited:
                    continue
                if rel_types and rel.get("type") not in rel_types:
                    continue
                target_entry = index.get("entries", {}).get(target_id)
                if target_entry:
                    results.append(
                        {
                            "id": target_id,
                            "type": target_entry.get("type"),
                            "title": target_entry.get("title"),
                            "relationship": rel.get("type"),
                            "direction": rel.get("direction", "outgoing"),
                            "strength": rel.get("strength"),
                            "depth": depth,
                        }
                    )
                    traverse(target_id, depth + 1)

        traverse(memory_id, 1)
        return results

    def stats(self) -> Dict[str, Any]:
        """Get statistics about the memory system."""
        index = self._load_index()
        state = self._load_state()
        entries = index.get("entries", {})
        by_type = {}
        total_relations = 0
        for entry in entries.values():
            t = entry.get("type", "unknown")
            by_type[t] = by_type.get(t, 0) + 1
            total_relations += len(entry.get("relations", []))
        # Count files per directory
        dir_counts = {}
        for type_dir in TYPE_DIRS.values():
            d = self.memory_dir / "graph" / type_dir
            if d.exists():
                dir_counts[type_dir] = len(list(d.glob("*.md")))
        vault_count = (
            len(list((self.memory_dir / "vault").glob("*.md")))
            if (self.memory_dir / "vault").exists()
            else 0
        )
        # Decay stats
        state_entries = state.get("entries", {})
        active = sum(
            1
            for s in state_entries.values()
            if s.get("decay_score", 0) >= THRESHOLD_ACTIVE
        )
        fading = sum(
            1
            for s in state_entries.values()
            if THRESHOLD_FADING <= s.get("decay_score", 0) < THRESHOLD_ACTIVE
        )
        dormant = sum(
            1
            for s in state_entries.values()
            if THRESHOLD_DORMANT <= s.get("decay_score", 0) < THRESHOLD_FADING
        )
        archived = sum(
            1
            for s in state_entries.values()
            if s.get("decay_score", 0) < THRESHOLD_DORMANT
        )
        # Unique outgoing relations only (avoid double-counting)
        unique_relations = total_relations // 2 if total_relations > 0 else 0
        return {
            "total_memories": len(entries),
            "by_type": by_type,
            "dir_counts": dir_counts,
            "vault_count": vault_count,
            "total_relations": unique_relations,
            "decay_summary": {
                "active": active,
                "fading": fading,
                "dormant": dormant,
                "archived": archived,
            },
            "memory_dir": str(self.memory_dir),
        }

    def recent(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get most recently created memories."""
        index = self._load_index()
        entries = [
            {"id": mid, **entry} for mid, entry in index.get("entries", {}).items()
        ]
        entries.sort(key=lambda x: x.get("created", ""), reverse=True)
        return entries[:limit]

    def episode(
        self,
        type: str,
        title: str,
        tags: Optional[List[str]] = None,
        summary: Optional[str] = None,
        memory_link: Optional[str] = None,
    ):
        """Append an entry to today's episode file."""
        now_local = datetime.now().astimezone()
        today = now_local.strftime("%Y-%m-%d")
        time_str = now_local.strftime("%H:%M")
        ep_path = self.memory_dir / "episodes" / f"{today}.md"
        tags = tags or []
        tags_str = ", ".join(tags)
        entry_lines = [
            f"## {time_str} - {title}",
            f"- **Type:** {type}",
        ]
        if tags_str:
            entry_lines.append(f"- **Tags:** {tags_str}")
        if memory_link:
            # Extract name from link path
            name = Path(memory_link).stem
            entry_lines.append(f"- **Memory:** [{name}]({memory_link})")
        if summary:
            entry_lines.append(f"- **Summary:** {summary}")
        entry_text = "\n".join(entry_lines)
        if ep_path.exists():
            existing = ep_path.read_text(encoding="utf-8")
            new_content = f"{existing.rstrip()}\n\n{entry_text}\n"
        else:
            new_content = f"# {today}\n\n{entry_text}\n"
        ep_path.write_text(new_content, encoding="utf-8")
        self._git_commit(f"episode: {title}", [ep_path])

    def reindex(self) -> int:
        """Rebuild _index.json from all markdown files. Recovery command."""
        index = {"version": 2, "updated": "", "count": 0, "entries": {}, "edges": {}}
        count = 0
        search_dirs = [
            ("graph", self.memory_dir / "graph"),
            ("vault", self.memory_dir / "vault"),
        ]
        edges_dir = self.memory_dir / "graph" / EDGES_DIR_NAME
        for _prefix, search_dir in search_dirs:
            if not search_dir.exists():
                continue
            for md_file in search_dir.rglob("*.md"):
                # Skip edge files — handled separately
                if edges_dir.exists() and md_file.parent == edges_dir:
                    continue
                try:
                    fm, body = self._read_memory_file(md_file)
                    mid = fm.get("id")
                    if not mid:
                        continue
                    rel_path = str(md_file.relative_to(self.memory_dir))
                    preview = body.strip()[:200]
                    if len(body.strip()) > 200:
                        last_space = preview.rfind(" ")
                        if last_space > 0:
                            preview = preview[:last_space]
                    index["entries"][mid] = {
                        "title": fm.get("title", ""),
                        "type": fm.get("type", "general"),
                        "tags": fm.get("tags", []),
                        "importance": fm.get("importance", 0.5),
                        "confidence": fm.get("confidence", 0.8),
                        "created": fm.get("created", ""),
                        "updated": fm.get("updated", ""),
                        "path": rel_path,
                        "relations": fm.get("relations", []),
                        "content_preview": preview,
                    }
                    count += 1
                except Exception as e:
                    print(f"Warning: Failed to index {md_file}: {e}", file=sys.stderr)
        # Scan edge files
        if edges_dir.exists():
            for md_file in edges_dir.glob("*.md"):
                try:
                    fm, _ = self._read_memory_file(md_file)
                    eid = fm.get("id")
                    if not eid:
                        continue
                    rel_path = str(md_file.relative_to(self.memory_dir))
                    index["edges"][eid] = {
                        "type": fm.get("type", ""),
                        "from_id": fm.get("from_id", ""),
                        "from_title": fm.get("from_title", ""),
                        "to_id": fm.get("to_id", ""),
                        "to_title": fm.get("to_title", ""),
                        "strength": fm.get("strength", 0.8),
                        "created": fm.get("created", ""),
                        "updated": fm.get("updated", ""),
                        "path": rel_path,
                    }
                except Exception as e:
                    print(
                        f"Warning: Failed to index edge {md_file}: {e}", file=sys.stderr
                    )
        self._save_index(index)
        return count

    def pin(self, memory_id: str) -> bool:
        """Move a memory to vault/ (never decays)."""
        path = self._resolve_memory_path(memory_id)
        if not path:
            return False
        fm, body = self._read_memory_file(path)
        # FIX: normalize a possibly-partial ID prefix to the full UUID so the
        # index/state entries are updated under the correct key.
        memory_id = fm.get("id", memory_id)
        vault_dir = self.memory_dir / "vault"
        vault_dir.mkdir(parents=True, exist_ok=True)
        new_path = vault_dir / path.name
        self._write_memory_file(new_path, fm, body)
        # Remove old file
        path.unlink()
        # Update index with new path
        new_rel = str(new_path.relative_to(self.memory_dir))
        self._update_index_entry(memory_id, fm, new_rel)
        # Set decay to infinity in state
        state = self._load_state()
        state.setdefault("entries", {}).setdefault(memory_id, {})["decay_score"] = 999.0
        self._save_state(state)
        self._git_commit(f"pin: {fm.get('title', memory_id[:8])}", [new_path])
        return True


if __name__ == "__main__":
    from cli import main

    main()