cognitive-memory/common.py
Cal Corum 48df2a89ce Initial commit: extract cognitive-memory app from skill directory
Moved application code from ~/.claude/skills/cognitive-memory/ to its own
project directory. The skill layer (SKILL.md, SCHEMA.md) remains in the
skill directory for Claude Code to read.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 16:02:28 -06:00

582 lines
17 KiB
Python

"""
Cognitive Memory - Common Constants & Helpers
Module-level constants, YAML parsing, slug generation, decay calculation,
embedding helpers, and cosine similarity. Shared by all other modules.
"""
import json
import math
import os
import re
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import URLError
# =============================================================================
# CONSTANTS
# =============================================================================
# Data directory resolution order:
# 1. COGNITIVE_MEMORY_DIR env var (explicit override)
# 2. XDG_DATA_HOME/cognitive-memory/ (Linux standard)
# 3. ~/.local/share/cognitive-memory/ (XDG default)
_env_dir = os.environ.get("COGNITIVE_MEMORY_DIR", "")
if _env_dir:
    MEMORY_DIR = Path(_env_dir).expanduser()
else:
    _xdg_data = os.environ.get("XDG_DATA_HOME", "") or str(
        Path.home() / ".local" / "share"
    )
    MEMORY_DIR = Path(_xdg_data) / "cognitive-memory"

# Well-known metadata files inside MEMORY_DIR.
INDEX_PATH = MEMORY_DIR / "_index.json"
STATE_PATH = MEMORY_DIR / "_state.json"
EMBEDDINGS_PATH = MEMORY_DIR / "_embeddings.json"

# Local Ollama embedding endpoint settings (see _ollama_embed).
OLLAMA_URL = "http://localhost:11434"
EMBEDDING_MODEL = "nomic-embed-text"
EMBEDDING_TIMEOUT = 5  # seconds

CONFIG_PATH = MEMORY_DIR / "_config.json"

# OpenAI embedding fallback settings (see _openai_embed).
OPENAI_EMBED_URL = "https://api.openai.com/v1/embeddings"
OPENAI_MODEL_DEFAULT = "text-embedding-3-small"

# Memory type -> directory name mapping
TYPE_DIRS = {
    "solution": "solutions",
    "fix": "fixes",
    "decision": "decisions",
    "configuration": "configurations",
    "problem": "problems",
    "workflow": "workflows",
    "code_pattern": "code-patterns",
    "error": "errors",
    "general": "general",
    "procedure": "procedures",
    "insight": "insights",
}
VALID_TYPES = list(TYPE_DIRS.keys())

# Decay model type weights
# Multipliers applied in calculate_decay_score; higher = slower effective decay.
TYPE_WEIGHTS = {
    "decision": 1.3,
    "solution": 1.2,
    "insight": 1.25,
    "code_pattern": 1.1,
    "configuration": 1.1,
    "fix": 1.0,
    "workflow": 1.0,
    "problem": 0.9,
    "error": 0.8,
    "general": 0.8,
    "procedure": 1.4,
}
DECAY_LAMBDA = 0.03  # Half-life ~23 days (ln(2) / 0.03 ≈ 23.1)

# Decay score thresholds
# Buckets used by consumers of calculate_decay_score (active/fading/dormant).
THRESHOLD_ACTIVE = 0.5
THRESHOLD_FADING = 0.2
THRESHOLD_DORMANT = 0.05

# Relationship types (subset from MemoryGraph, focused on most useful)
VALID_RELATION_TYPES = [
    "SOLVES",
    "CAUSES",
    "BUILDS_ON",
    "ALTERNATIVE_TO",
    "REQUIRES",
    "FOLLOWS",
    "RELATED_TO",
]

# Edge file constants
EDGES_DIR_NAME = "edges"
# Frontmatter key order for edge files (see serialize_edge_frontmatter).
EDGE_FIELD_ORDER = [
    "id",
    "type",
    "from_id",
    "from_title",
    "to_id",
    "to_title",
    "strength",
    "created",
    "updated",
]

# Frontmatter field order for consistent output (see serialize_frontmatter).
FIELD_ORDER = [
    "id",
    "type",
    "title",
    "tags",
    "importance",
    "confidence",
    "steps",
    "preconditions",
    "postconditions",
    "created",
    "updated",
    "relations",
]

# CORE.md token budget (approximate, 1 token ~= 4 chars)
CORE_MAX_CHARS = 12000  # ~3K tokens

# Named graphs live beside MEMORY_DIR; NOTE(review): with a
# COGNITIVE_MEMORY_DIR override this becomes a sibling of the override dir.
GRAPHS_BASE_DIR = MEMORY_DIR.parent / "cognitive-memory-graphs"
# =============================================================================
# YAML FRONTMATTER PARSING (stdlib only)
# =============================================================================
def _needs_quoting(s: str) -> bool:
"""Check if a YAML string value needs quoting."""
if not s:
return True
if any(c in s for c in ":#{}[]&*?|>!%@`"):
return True
try:
float(s)
return True
except ValueError:
pass
if s.lower() in ("true", "false", "null", "yes", "no", "on", "off"):
return True
return False
def _quote_yaml(s: str) -> str:
"""Quote a string for YAML, escaping internal quotes."""
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped}"'
def _format_yaml_value(value: Any, force_quote: bool = False) -> str:
    """Render a Python value as a YAML scalar string.

    None/bool/number map to their YAML literals; everything else is
    stringified and quoted when required (or when force_quote is set).
    """
    if value is None:
        return "null"
    # bool is checked before int/float: it is an int subclass.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    if not force_quote and not _needs_quoting(text):
        return text
    return _quote_yaml(text)
def _parse_scalar(value: str) -> Any:
"""Parse a YAML scalar value to Python type."""
v = value.strip()
if not v or v == "null":
return None
if v == "true":
return True
if v == "false":
return False
# Try numeric
try:
if "." in v:
return float(v)
return int(v)
except ValueError:
pass
# Strip quotes
if (v.startswith('"') and v.endswith('"')) or (
v.startswith("'") and v.endswith("'")
):
return v[1:-1]
return v
def serialize_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize a dict to YAML frontmatter string (between --- markers).

    Only keys present in both *data* and FIELD_ORDER are emitted, in
    FIELD_ORDER's fixed order, so repeated writes produce stable output.
    Returns the text including both ``---`` fence lines, no trailing
    newline.
    """
    lines = ["---"]
    for key in FIELD_ORDER:
        if key not in data:
            continue
        value = data[key]
        if key == "tags" and isinstance(value, list):
            # Tags always serialize as an inline flow list, even when empty.
            if value:
                items = ", ".join(_format_yaml_value(t) for t in value)
                lines.append(f"tags: [{items}]")
            else:
                lines.append("tags: []")
        elif key in ("steps", "preconditions", "postconditions") and isinstance(
            value, list
        ):
            # Procedure lists: block list of force-quoted strings; empty
            # lists are omitted entirely.
            if not value:
                continue
            lines.append(f"{key}:")
            for item in value:
                lines.append(f" - {_format_yaml_value(str(item), force_quote=True)}")
        elif key == "relations" and isinstance(value, list):
            # Relations: block list of dicts; keys emitted in a fixed order,
            # missing keys skipped. Read back by _parse_relations_block.
            if not value:
                continue
            lines.append("relations:")
            for rel in value:
                first = True
                for rk in [
                    "target",
                    "type",
                    "direction",
                    "strength",
                    "context",
                    "edge_id",
                ]:
                    if rk not in rel:
                        continue
                    rv = rel[rk]
                    # First key of an entry carries the "- " item marker;
                    # continuation keys are just indented.
                    prefix = " - " if first else " "
                    # Free-text context is always quoted.
                    force_q = rk in ("context",)
                    lines.append(
                        f"{prefix}{rk}: {_format_yaml_value(rv, force_quote=force_q)}"
                    )
                    first = False
        elif key == "title":
            # Titles are arbitrary text: always quote.
            lines.append(f"title: {_format_yaml_value(value, force_quote=True)}")
        else:
            lines.append(f"{key}: {_format_yaml_value(value)}")
    lines.append("---")
    return "\n".join(lines)
def parse_frontmatter(text: str) -> Tuple[Dict[str, Any], str]:
    """Parse YAML frontmatter and body from markdown text.

    Returns (frontmatter_dict, body_text). If *text* does not open with a
    ``---`` fence, returns ({}, text) unchanged. Supports three value
    shapes: inline scalars, inline flow lists (``[a, b]``), and block
    values (simple ``- item`` lists, or the ``relations`` list of dicts
    handled by _parse_relations_block).
    """
    if not text.startswith("---\n"):
        return {}, text
    # Find the closing fence. Searching from offset 3 prevents the opening
    # "---\n" from matching itself; match positions are shifted by 3.
    end_match = re.search(r"\n---\s*\n", text[3:])
    if not end_match:
        # No newline after the closing fence: accept a fence at end of text.
        if text.rstrip().endswith("---"):
            end_pos = text.rstrip().rfind("\n---")
            if end_pos <= 3:
                # The only "---" present is the opening fence itself.
                return {}, text
            fm_text = text[4:end_pos]
            body = ""
        else:
            return {}, text
    else:
        end_pos = end_match.start() + 3  # absolute index of "\n" before "---"
        fm_text = text[4:end_pos]
        # Absolute end of the matched "\n---\s*\n" is match.end() + 3.
        body = text[end_match.end() + 3 :]
        body = body.lstrip("\n")
    data: Dict[str, Any] = {}
    lines = fm_text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        # Skip empty lines.
        if not line.strip():
            i += 1
            continue
        # Only top-level keys (no leading whitespace) start an entry;
        # indented lines are consumed by the block handler below.
        if line[0] == " ":
            i += 1
            continue
        if ":" not in line:
            i += 1
            continue
        key, _, rest = line.partition(":")
        key = key.strip()
        rest = rest.strip()
        if not rest:
            # Block value - collect the run of indented lines that follows.
            block_lines = []
            j = i + 1
            while j < len(lines) and lines[j] and lines[j][0] == " ":
                block_lines.append(lines[j])
                j += 1
            if key == "relations":
                data["relations"] = _parse_relations_block(block_lines)
            elif block_lines and block_lines[0].strip().startswith("- "):
                # Simple list. Slice off exactly the "- " marker rather than
                # lstrip("- "), which would also eat leading dashes belonging
                # to the value itself (e.g. "- --verbose" -> "verbose").
                data[key] = [
                    _parse_scalar(bl.strip()[2:])
                    for bl in block_lines
                    if bl.strip().startswith("- ")
                ]
            else:
                # Key with neither inline value nor recognizable block.
                data[key] = None
            i = j
            continue
        # Inline flow list: [a, b, c]
        if rest.startswith("[") and rest.endswith("]"):
            inner = rest[1:-1]
            if inner.strip():
                data[key] = [
                    _parse_scalar(v.strip()) for v in inner.split(",") if v.strip()
                ]
            else:
                data[key] = []
        else:
            data[key] = _parse_scalar(rest)
        i += 1
    return data, body
def _parse_relations_block(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse an indented YAML block list of relation dicts."""
    relations: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for raw in lines:
        entry = raw.strip()
        if not entry:
            continue
        if entry.startswith("- "):
            # A "- " marker opens a new relation entry; the marker line may
            # itself carry the first key: value pair.
            current = {}
            relations.append(current)
            entry = entry[2:]
            if ":" not in entry:
                continue
        elif current is None or ":" not in entry:
            # Continuation lines before any entry, or without a colon,
            # are ignored.
            continue
        k, _, v = entry.partition(":")
        current[k.strip()] = _parse_scalar(v.strip())
    return relations
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def slugify(text: str, max_length: int = 60) -> str:
    """Convert text to a URL-friendly slug."""
    slug = text.strip().lower()
    slug = re.sub(r"[^\w\s-]", "", slug)         # drop punctuation
    slug = re.sub(r"[\s_]+", "-", slug)          # whitespace/underscores -> hyphen
    slug = re.sub(r"-+", "-", slug).strip("-")   # collapse runs, trim edges
    if len(slug) > max_length:
        slug = slug[:max_length].rstrip("-")     # avoid a trailing hyphen after cut
    return slug if slug else "untitled"
def make_filename(title: str, memory_id: str) -> str:
    """Build a memory filename: slugified title plus a 6-char id suffix."""
    return f"{slugify(title)}-{memory_id[:6]}.md"
def calculate_decay_score(
    importance: float, days_since_access: float, access_count: int, type_weight: float
) -> float:
    """Score a memory's relevance under exponential time decay.

    decay_score = importance * e^(-lambda * days) * log2(access_count + 1) * type_weight
    """
    recency = math.exp(-DECAY_LAMBDA * days_since_access)
    if access_count > 0:
        usage = math.log2(access_count + 1)
    else:
        # Never-accessed memories keep a neutral usage factor instead of 0.
        usage = 1.0
    return importance * recency * usage * type_weight
def _ollama_embed(
    texts: List[str],
    model: str = EMBEDDING_MODEL,
    timeout: int = EMBEDDING_TIMEOUT,
) -> Optional[List[List[float]]]:
    """Get embeddings from Ollama for a list of texts.

    POSTs to the local Ollama /api/embed endpoint. Returns the list of
    embedding vectors, or None if Ollama is unavailable. All network,
    HTTP, and payload errors are deliberately swallowed so callers can
    degrade gracefully to non-semantic behavior.
    """
    try:
        payload = json.dumps({"model": model, "input": texts}).encode("utf-8")
        req = urllib.request.Request(
            f"{OLLAMA_URL}/api/embed",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = json.loads(resp.read().decode("utf-8"))
            embeddings = data.get("embeddings")
            # Minimal shape check; element types are trusted from the server.
            if embeddings and isinstance(embeddings, list):
                return embeddings
            return None
    except (
        ConnectionRefusedError,
        URLError,  # also covers HTTPError raised by urlopen on non-2xx
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        return None
def _load_memory_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Read _config.json, filling in defaults for missing keys.

    Returns the default config when the file is absent or unreadable.
    """
    defaults: Dict[str, Any] = {
        "embedding_provider": "ollama",
        "openai_api_key": None,
        "ollama_model": EMBEDDING_MODEL,
        "openai_model": OPENAI_MODEL_DEFAULT,
    }
    path = config_path or CONFIG_PATH
    if not path.exists():
        return defaults
    try:
        loaded = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError):
        # Corrupt or unreadable config: fall back to defaults silently.
        return defaults
    for key, fallback in defaults.items():
        loaded.setdefault(key, fallback)
    return loaded
def _openai_embed(
    texts: List[str],
    api_key: str,
    model: str = OPENAI_MODEL_DEFAULT,
    timeout: int = 30,
) -> Optional[List[List[float]]]:
    """Get embeddings from OpenAI API (stdlib-only, same interface as _ollama_embed).

    Returns one vector per input text, or None on any network/HTTP/payload
    failure (same swallow-everything policy as _ollama_embed).
    """
    try:
        payload = json.dumps({"input": texts, "model": model}).encode("utf-8")
        req = urllib.request.Request(
            OPENAI_EMBED_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = json.loads(resp.read().decode("utf-8"))
            items = data.get("data", [])
            if items and isinstance(items, list):
                # Sort by index to ensure order matches input; the API does
                # not guarantee response order.
                items.sort(key=lambda x: x.get("index", 0))
                return [item["embedding"] for item in items]
            return None
    except (
        ConnectionRefusedError,
        URLError,  # also covers HTTPError (e.g. 401/429) from urlopen
        TimeoutError,
        OSError,
        json.JSONDecodeError,
        ValueError,
        KeyError,  # missing "embedding" key in a response item
    ):
        return None
def _cosine_similarity(a: List[float], b: List[float]) -> float:
"""Compute cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return dot / (norm_a * norm_b)
def _make_edge_filename(
    from_title: str, rel_type: str, to_title: str, edge_id: str
) -> str:
    """Produce edge filename: {from-slug}--{TYPE}--{to-slug}-{6char}.md"""
    # Shorter slugs (30 chars) keep the combined filename manageable.
    left = slugify(from_title, max_length=30)
    right = slugify(to_title, max_length=30)
    return f"{left}--{rel_type}--{right}-{edge_id[:6]}.md"
def serialize_edge_frontmatter(data: Dict[str, Any]) -> str:
    """Serialize an edge dict to YAML frontmatter string.

    Keys are emitted in EDGE_FIELD_ORDER; missing keys are skipped.
    Title fields are always quoted since they hold arbitrary text.
    """
    out = ["---"]
    for field in EDGE_FIELD_ORDER:
        if field not in data:
            continue
        quoted = field in ("from_title", "to_title")
        out.append(f"{field}: {_format_yaml_value(data[field], force_quote=quoted)}")
    out.append("---")
    return "\n".join(out)
def load_graph_config(config_path: Optional[Path] = None) -> Dict[str, Dict[str, Any]]:
    """Return the named-graphs mapping from _config.json ('graphs' key, {} if absent)."""
    return _load_memory_config(config_path).get("graphs", {})
def resolve_graph_path(
    graph_name: Optional[str], config_path: Optional[Path] = None
) -> Path:
    """Resolve a graph name to its directory path.

    None/empty/'default' map to MEMORY_DIR; configured graphs use their
    'path' entry when set; everything else falls back to the conventional
    sibling directory under GRAPHS_BASE_DIR.
    """
    if graph_name in (None, "", "default"):
        return MEMORY_DIR
    entry = load_graph_config(config_path).get(graph_name)
    if entry:
        configured = entry.get("path", "")
        if configured:
            return Path(configured).expanduser()
    # Convention: sibling of MEMORY_DIR.
    return GRAPHS_BASE_DIR / graph_name
def list_graphs(config_path: Optional[Path] = None) -> List[Dict[str, Any]]:
    """List all known graphs: default + configured + discovered on disk.

    Each entry is {"name": ..., "path": ...}; names are de-duplicated with
    config entries taking precedence over on-disk discovery.
    """
    entries: List[Dict[str, Any]] = [{"name": "default", "path": str(MEMORY_DIR)}]
    known = {"default"}
    # Graphs declared in _config.json.
    for name, cfg in load_graph_config(config_path).items():
        if name in known:
            continue
        configured = cfg.get("path", "")
        resolved = (
            str(Path(configured).expanduser())
            if configured
            else str(GRAPHS_BASE_DIR / name)
        )
        entries.append({"name": name, "path": resolved})
        known.add(name)
    # Graphs present on disk but not configured.
    if GRAPHS_BASE_DIR.exists():
        for child in sorted(GRAPHS_BASE_DIR.iterdir()):
            if child.is_dir() and child.name not in known:
                entries.append({"name": child.name, "path": str(child)})
                known.add(child.name)
    return entries