Non-default graphs were second-class citizens: timers only maintained the default graph, git sync ignored named graphs, there was no way to create a graph without editing config manually, cross-graph edge errors were confusing, and utility scripts were hardcoded to the default graph.

- Add `graph-create` CLI command + `create_graph()` in common.py, with custom path registration written to the default graph's _config.json
- Add `scripts/maintain-all-graphs.sh` to loop decay/core/embed/reflect over all discovered graphs; update systemd services to call it
- Refactor `memory-git-sync.sh` into a `sync_repo()` function that iterates the default graph plus all named graphs with `.git` directories
- Improve the cross-graph edge ValueError to explain the same-graph constraint
- Add a `--graph` flag to edge-proposer.py and session_memory.py
- Update systemd/README.md with portable paths and the new architecture

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
626 lines · 18 KiB · Python
"""
|
|
Cognitive Memory - Common Constants & Helpers
|
|
|
|
Module-level constants, YAML parsing, slug generation, decay calculation,
|
|
embedding helpers, and cosine similarity. Shared by all other modules.
|
|
"""
|
|
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
from urllib.error import URLError
|
|
|
|
# =============================================================================
# CONSTANTS
# =============================================================================

# Data directory resolution order:
# 1. COGNITIVE_MEMORY_DIR env var (explicit override)
# 2. XDG_DATA_HOME/cognitive-memory/ (Linux standard)
# 3. ~/.local/share/cognitive-memory/ (XDG default)
_env_dir = os.environ.get("COGNITIVE_MEMORY_DIR", "")
if _env_dir:
    MEMORY_DIR = Path(_env_dir).expanduser()
else:
    # Empty XDG_DATA_HOME counts as unset, per the XDG base-dir convention.
    _xdg_data = os.environ.get("XDG_DATA_HOME", "") or str(
        Path.home() / ".local" / "share"
    )
    MEMORY_DIR = Path(_xdg_data) / "cognitive-memory"

# Well-known files inside the default graph directory.
INDEX_PATH = MEMORY_DIR / "_index.json"
STATE_PATH = MEMORY_DIR / "_state.json"
EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json"

# Embedding backends: local Ollama by default, OpenAI as an alternative.
OLLAMA_URL = "http://localhost:11434"
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 5  # seconds
CONFIG_PATH = MEMORY_DIR / "_config.json"
OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings"
OPENAI_MODEL_DEFAULT = "text-embedding-3-small"

# Memory type -> directory name mapping
TYPE_DIRS = {
    "solution": "solutions",
    "fix": "fixes",
    "decision": "decisions",
    "configuration": "configurations",
    "problem": "problems",
    "workflow": "workflows",
    "code_pattern": "code-patterns",
    "error": "errors",
    "general": "general",
    "procedure": "procedures",
    "insight": "insights",
}

VALID_TYPES = list(TYPE_DIRS.keys())

# Decay model type weights (per-type multiplier in calculate_decay_score;
# higher = memory of this type decays more slowly in effective score)
TYPE_WEIGHTS = {
    "decision": 1.3,
    "solution": 1.2,
    "insight": 1.25,
    "code_pattern": 1.1,
    "configuration": 1.1,
    "fix": 1.0,
    "workflow": 1.0,
    "problem": 0.9,
    "error": 0.8,
    "general": 0.8,
    "procedure": 1.4,
}

DECAY_LAMBDA = 0.03  # Half-life ~23 days

# Decay score thresholds (buckets a memory's current decay score)
THRESHOLD_ACTIVE = 0.5
THRESHOLD_FADING = 0.2
THRESHOLD_DORMANT = 0.05

# Relationship types (subset from MemoryGraph, focused on most useful)
VALID_RELATION_TYPES = [
    "SOLVES",
    "CAUSES",
    "BUILDS_ON",
    "ALTERNATIVE_TO",
    "REQUIRES",
    "FOLLOWS",
    "RELATED_TO",
]

# Edge file constants
EDGES_DIR_NAME = "edges"
# Frontmatter key order for edge files (see serialize_edge_frontmatter)
EDGE_FIELD_ORDER = [
    "id",
    "type",
    "from_id",
    "from_title",
    "to_id",
    "to_title",
    "strength",
    "created",
    "updated",
]

# Frontmatter field order for consistent output (see serialize_frontmatter)
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]

# CORE.md token budget (approximate, 1 token ~= 4 chars)
CORE_MAX_CHARS = 12000  # ~3K tokens

# Convention location for named graphs: sibling directory of MEMORY_DIR
GRAPHS_BASE_DIR = MEMORY_DIR.parent / "cognitive-memory-graphs"
|
|
|
|
|
|
# =============================================================================
|
|
# YAML FRONTMATTER PARSING (stdlib only)
|
|
# =============================================================================
|
|
|
|
|
|
def _needs_quoting(s: str) -> bool:
|
|
"""Check if a YAML string value needs quoting."""
|
|
if not s:
|
|
return True
|
|
if any(c in s for c in ":#{}[]&*?|>!%@`"):
|
|
return True
|
|
try:
|
|
float(s)
|
|
return True
|
|
except ValueError:
|
|
pass
|
|
if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _quote_yaml(s: str) -> str:
|
|
"""Quote a string for YAML, escaping internal quotes."""
|
|
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
|
|
return f'"{escaped}"'
|
|
|
|
|
|
def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Render a Python value as a single YAML scalar string.

    None -> "null", bools -> "true"/"false", numbers verbatim; everything
    else is stringified and quoted if unsafe (or if *force_quote* is set).
    """
    if value is None:
        return "null"
    # bool must be tested before int: bool is an int subclass.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if force_quote or _needs_quoting(text):
        return _quote_yaml(text)
    return text
|
|
|
|
|
|
def _parse_scalar(value: str) -> Any:
|
|
"""Parse a YAML scalar value to Python type."""
|
|
v = value.strip()
|
|
if not v or v == "null":
|
|
return None
|
|
if v == "true":
|
|
return True
|
|
if v == "false":
|
|
return False
|
|
# Try numeric
|
|
try:
|
|
if "." in v:
|
|
return float(v)
|
|
return int(v)
|
|
except ValueError:
|
|
pass
|
|
# Strip quotes
|
|
if (v.startswith('"') and v.endswith('"')) or (
|
|
v.startswith("'") and v.endswith("'")
|
|
):
|
|
return v[1:-1]
|
|
return v
|
|
|
|
|
|
def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize *data* to a YAML frontmatter block fenced by '---' lines.

    Only keys present in FIELD_ORDER are emitted, in that fixed order.
    'tags' is rendered as an inline list, 'steps'/'preconditions'/
    'postconditions' as quoted block lists (skipped when empty), 'relations'
    as a block list of mappings, and 'title' is always quoted.
    """
    out = ["---"]
    list_keys = ("steps", "preconditions", "postconditions")
    relation_keys = ("target", "type", "direction", "strength", "context", "edge_id")

    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]

        if key == "tags" and isinstance(value, list):
            inner = ", ".join(_format_yaml_value(t) for t in value) if value else ""
            out.append(f"tags: [{inner}]")

        elif key in list_keys and isinstance(value, list):
            # Empty block lists are omitted entirely.
            if value:
                out.append(f"{key}:")
                out.extend(
                    f"  - {_format_yaml_value(str(item), force_quote=True)}"
                    for item in value
                )

        elif key == "relations" and isinstance(value, list):
            if value:
                out.append("relations:")
                for rel in value:
                    present = [rk for rk in relation_keys if rk in rel]
                    for pos, rk in enumerate(present):
                        # First key of each relation carries the "- " marker.
                        marker = "  - " if pos == 0 else "    "
                        quoted = rk == "context"
                        out.append(
                            f"{marker}{rk}: "
                            f"{_format_yaml_value(rel[rk], force_quote=quoted)}"
                        )

        elif key == "title":
            # Titles are free text; always quote them.
            out.append(f"title: {_format_yaml_value(value, force_quote=True)}")

        else:
            out.append(f"{key}: {_format_yaml_value(value)}")

    out.append("---")
    return "\n".join(out)
|
|
|
|
|
|
def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Parse YAML frontmatter and body from markdown text.

    Supports scalar values, inline lists (``[a, b]``), simple block lists,
    and a ``relations:`` block of mappings. Text without a valid frontmatter
    fence is returned unchanged as the body with an empty dict.

    Returns (frontmatter_dict, body_text).
    """
    if not text.startswith("---\n"):
        return {}, text

    # Find closing --- . The search runs over text[3:] so the opening fence's
    # trailing newline can also serve as the leading newline of the match.
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # Try end of string: a closing fence with no trailing newline.
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                # The "closing" fence is the opening one; no frontmatter.
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        end_pos = end_match.start() + 3  # Offset from text[3:]
        fm_text = text[4:end_pos]
        # end_pos + (match length) lands just past the closing fence line.
        body = text[end_pos + end_match.end() - end_match.start() :]

    body = body.lstrip("\n")
    data = {}
    lines = fm_text.split("\n")
    i = 0

    while i < len(lines):
        line = lines[i]

        # Skip empty lines
        if not line.strip():
            i += 1
            continue

        # Must be a top-level key (no leading whitespace); stray indented
        # lines outside a recognized block are ignored.
        if line[0] == " ":
            i += 1
            continue

        if ":" not in line:
            i += 1
            continue

        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()

        if not rest:
            # Block value - collect the run of indented lines that follows.
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1

            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # Simple list
                data[key] = [
                    _parse_scalar(bl.strip().lstrip("- "))
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                # Key with no inline value and no recognizable block.
                data[key] = None
            i = j
            continue

        # Inline list: [a, b, c]
        if rest.startswith("[") and rest.endswith("]"):
            inner = rest[1:-1]
            if inner.strip():
                # NOTE: splits on every comma, so quoted items containing
                # commas are not supported here.
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)

        i += 1

    return data, body
|
|
|
|
|
|
def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse the indented block under a 'relations:' key into dicts.

    Each line starting with '- ' opens a new relation entry; any following
    'key: value' lines attach to the most recent entry. Lines before the
    first '- ' (or without a colon) are ignored.
    """
    result: List[Dict[str, Any]] = []
    entry: Optional[Dict[str, Any]] = None

    for raw in lines:
        text = raw.strip()
        if not text:
            continue

        if text.startswith("- "):
            # Open a new relation; the same line may carry its first pair.
            entry = {}
            result.append(entry)
            text = text[2:]
            if ":" not in text:
                continue
        elif entry is None or ":" not in text:
            continue

        k, _, v = text.partition(":")
        entry[k.strip()] = _parse_scalar(v.strip())

    return result
|
|
|
|
|
|
# =============================================================================
|
|
# HELPER FUNCTIONS
|
|
# =============================================================================
|
|
|
|
|
|
def slugify(text: str, max_length: int = 60) -> str:
    """Reduce *text* to a lowercase, hyphen-separated slug.

    Punctuation is dropped, whitespace/underscores become hyphens, hyphen
    runs collapse, and the result is trimmed to *max_length* (never ending
    in a hyphen). Falls back to "untitled" when nothing survives.
    """
    slug = text.lower().strip()
    slug = re.sub(r"[^\w\s-]", "", slug)   # drop punctuation
    slug = re.sub(r"[\s_]+", "-", slug)    # whitespace / underscores -> hyphen
    slug = re.sub(r"-+", "-", slug)        # collapse hyphen runs
    slug = slug.strip("-")
    if len(slug) > max_length:
        slug = slug[:max_length].rstrip("-")
    return slug if slug else "untitled"
|
|
|
|
|
|
def make_filename(title: str, memory_id: str) -> str:
    """Build a memory filename: '<title-slug>-<first-6-of-id>.md'."""
    return "{}-{}.md".format(slugify(title), memory_id[:6])
|
|
|
|
|
|
def calculate_decay_score(
    importance: float, days_since_access: float, access_count: int, type_weight: float
) -> float:
    """Calculate decay score for a memory.

    decay_score = importance * e^(-DECAY_LAMBDA * days)
                  * log2(access_count + 1) * type_weight

    A zero access count uses a usage factor of 1.0 (not log2(1) == 0), so
    never-accessed memories are not zeroed out immediately.
    """
    recency = math.exp(-DECAY_LAMBDA * days_since_access)
    usage = math.log2(access_count + 1) if access_count > 0 else 1.0
    return importance * recency * usage * type_weight
|
|
|
|
|
|
def _ollama_embed(
    texts: List[str],
    model: str = EMBEDDING_MODEL,
    timeout: int = EMBEDDING_TIMEOUT,
) -> Optional[List[List[float]]]:
    """Get embeddings from Ollama for a list of texts.

    POSTs to {OLLAMA_URL}/api/embed and reads the 'embeddings' field from
    the JSON response. Returns the list of vectors, or None on any network,
    timeout, HTTP, or decoding failure (Ollama being down is expected).
    """
    body = json.dumps({"model": model, "input": texts}).encode("utf-8")
    request = urllib.request.Request(
        f"{OLLAMA_URL}/api/embed",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            parsed = json.loads(resp.read().decode("utf-8"))
    except (
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        # Treat every failure mode as "embeddings unavailable".
        return None
    vectors = parsed.get("embeddings")
    if vectors and isinstance(vectors, list):
        return vectors
    return None
|
|
|
|
|
|
def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Load _config.json merged over built-in defaults.

    Missing keys are filled in from the defaults; a missing, unreadable,
    or malformed file yields the defaults alone (best-effort, no raise).
    """
    defaults: Dict[str, Any] = {
        "embedding_provider": "ollama",
        "openai_api_key": None,
        "ollama_model": EMBEDDING_MODEL,
        "openai_model": OPENAI_MODEL_DEFAULT,
    }
    target = config_path or CONFIG_PATH
    if not target.exists():
        return defaults
    try:
        loaded = json.loads(target.read_text())
    except (json.JSONDecodeError, OSError):
        return defaults
    for key, fallback in defaults.items():
        loaded.setdefault(key, fallback)
    return loaded
|
|
|
|
|
|
def _openai_embed(
    texts: List[str],
    api_key: str,
    model: str = OPENAI_MODEL_DEFAULT,
    timeout: int = 30,
) -> Optional[List[List[float]]]:
    """Get embeddings from the OpenAI API (stdlib-only, same interface as
    _ollama_embed).

    Returns a list of vectors ordered to match *texts*, or None on any
    network, HTTP, or decoding failure.
    """
    try:
        body = json.dumps({"input": texts, "model": model}).encode("utf-8")
        request = urllib.request.Request(
            OPENAI_EMBED_URL,
            data=body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            payload = json.loads(resp.read().decode("utf-8"))
        rows = payload.get("data", [])
        if not rows or not isinstance(rows, list):
            return None
        # The API may return items out of order; sort by index to realign
        # vectors with their input texts.
        rows.sort(key=lambda r: r.get("index", 0))
        return [row["embedding"] for row in rows]
    except (
        ConnectionRefusedError,
        URLError,
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        return None
|
|
|
|
|
|
def _cosine_similarity(a: List[float], b: List[float]) -> float:
|
|
"""Compute cosine similarity between two vectors."""
|
|
dot = sum(x * y for x, y in zip(a, b))
|
|
norm_a = math.sqrt(sum(x * x for x in a))
|
|
norm_b = math.sqrt(sum(x * x for x in b))
|
|
if norm_a == 0.0 or norm_b == 0.0:
|
|
return 0.0
|
|
return dot / (norm_a * norm_b)
|
|
|
|
|
|
def _make_edge_filename(
    from_title: str, rel_type: str, to_title: str, edge_id: str
) -> str:
    """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md"""
    source_slug = slugify(from_title, max_length=30)
    target_slug = slugify(to_title, max_length=30)
    return f"{source_slug}--{rel_type}--{target_slug}-{edge_id[:6]}.md"
|
|
|
|
|
|
def serialize_edge_frontmatter(data: Dict[str, Any]) -> str:
    """Render an edge record as '---' fenced YAML, keys in EDGE_FIELD_ORDER.

    Title fields are force-quoted since they carry free text; everything
    else uses the default quoting rules. Keys absent from *data* are skipped.
    """
    always_quote = ("from_title", "to_title")
    rendered = ["---"]
    for key in EDGE_FIELD_ORDER:
        if key in data:
            scalar = _format_yaml_value(data[key], force_quote=key in always_quote)
            rendered.append(f"{key}: {scalar}")
    rendered.append("---")
    return "\n".join(rendered)
|
|
|
|
|
|
def load_graph_config(config_path: Optional[Path] = None) -> Dict[str, Dict[str, Any]]:
    """Return the 'graphs' mapping from _config.json ({} when absent)."""
    return _load_memory_config(config_path).get("graphs", {})
|
|
|
|
|
|
def resolve_graph_path(
    graph_name: Optional[str], config_path: Optional[Path] = None
) -> Path:
    """Resolve a graph name to its directory path.

    None/''/'default' map to MEMORY_DIR. A graph registered in config with
    a non-empty path uses that path; anything else falls back to the
    convention location GRAPHS_BASE_DIR/<name>.
    """
    if not graph_name or graph_name == "default":
        return MEMORY_DIR
    entry = load_graph_config(config_path).get(graph_name)
    if entry:
        custom = entry.get("path", "")
        if custom:
            return Path(custom).expanduser()
    # Convention: sibling of MEMORY_DIR
    return GRAPHS_BASE_DIR / graph_name
|
|
|
|
|
|
def create_graph(name: str, path: Optional[Path] = None) -> Dict[str, Any]:
    """Create the directory layout for a new named graph.

    With no *path*, the graph lives at the convention location
    (GRAPHS_BASE_DIR / name) which resolve_graph_path() finds without any
    config entry, so no config file is touched. With a custom *path*, the
    mapping is registered in the default graph's _config.json under
    graphs.<name>.path so resolve_graph_path() can locate it later.

    Returns a dict with keys: name, path, created (bool), registered (bool).
    """
    register = path is not None
    graph_path = Path(path).expanduser() if register else GRAPHS_BASE_DIR / name

    # Record newness before mkdir so "created" reflects a genuinely new graph.
    is_new = not graph_path.exists()

    # Standard subdirectory layout expected by CognitiveMemoryClient.
    graph_root = graph_path / "graph"
    for subdir in [*TYPE_DIRS.values(), EDGES_DIR_NAME]:
        (graph_root / subdir).mkdir(parents=True, exist_ok=True)
    for extra in ("episodes", "vault"):
        (graph_path / extra).mkdir(parents=True, exist_ok=True)

    # Register the custom path in the default graph's _config.json.
    if register:
        cfg = _load_memory_config(CONFIG_PATH)
        cfg.setdefault("graphs", {})[name] = {"path": str(graph_path)}
        CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
        CONFIG_PATH.write_text(json.dumps(cfg, indent=2))

    return {
        "name": name,
        "path": str(graph_path),
        "created": is_new,
        "registered": register,
    }
|
|
|
|
|
|
def list_graphs(config_path: Optional[Path] = None) -> List[Dict[str, Any]]:
    """List all known graphs: default + configured + discovered on disk.

    Order is: default first, then config entries in declaration order, then
    convention-path directories (sorted) not already seen. Names are deduped
    with first occurrence winning.
    """
    # dict preserves insertion order and dedupes by name in one structure.
    known: Dict[str, str] = {"default": str(MEMORY_DIR)}

    # From config
    for name, entry in load_graph_config(config_path).items():
        if name in known:
            continue
        custom = entry.get("path", "")
        if custom:
            known[name] = str(Path(custom).expanduser())
        else:
            known[name] = str(GRAPHS_BASE_DIR / name)

    # Discover convention-path graphs on disk
    if GRAPHS_BASE_DIR.exists():
        for candidate in sorted(GRAPHS_BASE_DIR.iterdir()):
            if candidate.is_dir() and candidate.name not in known:
                known[candidate.name] = str(candidate)

    return [{"name": n, "path": p} for n, p in known.items()]
|