feat: initial commit — voice/text memory capture with kanban board

PySide6 app for capturing quick memories via voice or text, organized
on a kanban board (On Docket → In Progress → Complete). Complete column
is collapsible with 7d/30d/All date filters.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Cal Corum 2026-02-11 13:55:21 -06:00
commit 195cd443cb
16 changed files with 1711 additions and 0 deletions

13
.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# uv lock file
uv.lock

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.13

0
README.md Normal file
View File

10
my-memory.desktop Normal file
View File

@ -0,0 +1,10 @@
[Desktop Entry]
Type=Application
Name=My Memory
Comment=Low-friction thought capture (text + voice)
Exec=my-memory
Icon=document-edit
Terminal=false
Categories=Utility;
StartupNotify=false
X-GNOME-Autostart-enabled=true

27
pyproject.toml Normal file
View File

@ -0,0 +1,27 @@
[project]
name = "my-memory"
version = "0.1.0"
description = "Low-friction capture app for thoughts, text, and voice"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"PySide6>=6.7",
"sounddevice>=0.5",
"soundfile>=0.12",
"faster-whisper>=1.1",
"pydantic>=2.0",
"python-frontmatter>=1.1",
]
[project.scripts]
my-memory = "my_memory.__main__:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/my_memory"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@ -0,0 +1,3 @@
"""My-Memory: Low-friction capture app for thoughts, text, and voice."""
__version__ = "0.1.0"

106
src/my_memory/__main__.py Normal file
View File

@ -0,0 +1,106 @@
"""CLI entry point: single-instance dispatch and app launch."""
from __future__ import annotations
import argparse
import sys
def main() -> None:
parser = argparse.ArgumentParser(
prog="my-memory",
description="Low-friction capture app for thoughts, text, and voice",
)
parser.add_argument(
"--capture",
action="store_true",
help="Signal running instance to open capture window",
)
parser.add_argument(
"--board",
action="store_true",
help="Signal running instance to open the kanban board",
)
parser.add_argument(
"--download-model",
action="store_true",
help="Pre-download the Whisper model and exit",
)
args = parser.parse_args()
if args.download_model:
_download_model()
return
# Must create QApplication before using any Qt networking
from PySide6.QtWidgets import QApplication
_app = QApplication.instance() or QApplication(sys.argv)
if args.capture:
_send_capture()
return
if args.board:
_send_board()
return
_run_app()
def _send_capture() -> None:
"""Send capture signal to running instance."""
from my_memory.app import send_capture_signal
if send_capture_signal():
return
print("No running instance found. Starting new instance...")
_run_app(show_capture=True)
def _send_board() -> None:
"""Send board signal to running instance."""
from my_memory.app import send_board_signal
if send_board_signal():
return
print("No running instance found. Starting new instance...")
_run_app(show_board=True)
def _run_app(show_capture: bool = False, show_board: bool = False) -> None:
"""Start the main application."""
from my_memory.app import MyMemoryApp
from my_memory.config import Config
config = Config.load()
app = MyMemoryApp(config)
if not app.ensure_single_instance():
print("Another instance is already running.", file=sys.stderr)
sys.exit(1)
if show_capture:
app.show_capture_window()
if show_board:
app.show_board_window()
sys.exit(app.run())
def _download_model() -> None:
"""Pre-download the Whisper model."""
from my_memory.config import Config
from my_memory.transcriber import Transcriber
config = Config.load()
print(f"Downloading Whisper model '{config.whisper.model_size}'...")
transcriber = Transcriber(config.whisper)
transcriber.download_model()
print("Model downloaded successfully.")
if __name__ == "__main__":
main()

144
src/my_memory/app.py Normal file
View File

@ -0,0 +1,144 @@
"""QApplication with system tray icon and single-instance guard."""
from __future__ import annotations
import sys
from PySide6.QtCore import QByteArray
from PySide6.QtGui import QAction, QIcon
from PySide6.QtNetwork import QLocalServer, QLocalSocket
from PySide6.QtWidgets import QApplication, QMenu, QSystemTrayIcon
from my_memory.board_window import BoardWindow
from my_memory.capture_window import CaptureWindow
from my_memory.config import Config
from my_memory.schema import ensure_schema
SOCKET_NAME = "my-memory-instance"
def _send_signal(command: bytes) -> bool:
"""Send a command to the running instance. Returns True if signal was sent."""
socket = QLocalSocket()
socket.connectToServer(SOCKET_NAME)
if socket.waitForConnected(1000):
socket.write(QByteArray(command))
socket.waitForBytesWritten(1000)
socket.disconnectFromServer()
return True
return False
def send_capture_signal() -> bool:
"""Send capture signal to running instance. Returns True if signal was sent."""
return _send_signal(b"capture")
def send_board_signal() -> bool:
"""Send board signal to running instance. Returns True if signal was sent."""
return _send_signal(b"board")
class MyMemoryApp:
"""Main application with tray icon and single-instance IPC."""
def __init__(self, config: Config):
self._config = config
config.ensure_dirs()
ensure_schema(config)
self._app = QApplication.instance() or QApplication(sys.argv)
self._app.setApplicationName("My Memory")
self._app.setQuitOnLastWindowClosed(False)
self._server: QLocalServer | None = None
self._tray: QSystemTrayIcon | None = None
self._capture_window: CaptureWindow | None = None
self._board_window: BoardWindow | None = None
def ensure_single_instance(self) -> bool:
"""Set up local server for single-instance guard.
Returns True if this is the primary instance.
"""
self._server = QLocalServer()
self._server.newConnection.connect(self._on_new_connection)
if not self._server.listen(SOCKET_NAME):
# Server name in use - clean up stale socket and retry
QLocalServer.removeServer(SOCKET_NAME)
if not self._server.listen(SOCKET_NAME):
return False
return True
def _on_new_connection(self) -> None:
"""Handle incoming connection from another instance."""
client = self._server.nextPendingConnection()
if client:
client.waitForReadyRead(1000)
data = client.readAll().data().decode()
if data == "capture":
self.show_capture_window()
elif data == "board":
self.show_board_window()
client.disconnectFromServer()
def setup_tray(self) -> None:
"""Create system tray icon with context menu."""
self._tray = QSystemTrayIcon()
self._tray.setIcon(QIcon.fromTheme("document-edit", QIcon.fromTheme("accessories-text-editor")))
self._tray.setToolTip("My Memory - Capture thoughts")
menu = QMenu()
capture_action = QAction("Capture", menu)
capture_action.triggered.connect(self.show_capture_window)
menu.addAction(capture_action)
board_action = QAction("Board", menu)
board_action.triggered.connect(self.show_board_window)
menu.addAction(board_action)
menu.addSeparator()
quit_action = QAction("Quit", menu)
quit_action.triggered.connect(self._quit)
menu.addAction(quit_action)
self._tray.setContextMenu(menu)
self._tray.activated.connect(self._on_tray_activated)
self._tray.show()
def _on_tray_activated(self, reason: QSystemTrayIcon.ActivationReason) -> None:
if reason == QSystemTrayIcon.ActivationReason.Trigger:
self.show_capture_window()
def show_capture_window(self) -> None:
"""Show the capture popup window."""
if self._capture_window is None:
self._capture_window = CaptureWindow(self._config)
self._capture_window.board_requested.connect(self.show_board_window)
self._capture_window.show_centered()
def show_board_window(self) -> None:
"""Show the kanban board window."""
if self._board_window is None:
self._board_window = BoardWindow(self._config)
self._board_window.show_board()
def _quit(self) -> None:
if self._server:
self._server.close()
if self._board_window:
self._board_window.close()
if self._tray:
self._tray.hide()
QApplication.quit()
def run(self) -> int:
"""Run the application event loop."""
self.setup_tray()
return self._app.exec()

View File

@ -0,0 +1,115 @@
"""Audio recording via sounddevice with RMS level signals."""
from __future__ import annotations
import time
from pathlib import Path
import numpy as np
import sounddevice as sd
import soundfile as sf
from PySide6.QtCore import QObject, QTimer, Signal
from my_memory.config import AudioConfig
class AudioRecorder(QObject):
"""Records audio from the default input device.
Emits rms_level (0.0-1.0) during recording for UI level meter.
Emits recording_finished with the saved WAV path and duration.
"""
rms_level = Signal(float)
recording_finished = Signal(str, float) # (wav_path, duration_seconds)
error_occurred = Signal(str)
def __init__(self, audio_config: AudioConfig, parent: QObject | None = None):
super().__init__(parent)
self._config = audio_config
self._frames: list[np.ndarray] = []
self._stream: sd.InputStream | None = None
self._poll_timer = QTimer(self)
self._poll_timer.setInterval(50)
self._poll_timer.timeout.connect(self._poll_buffer)
self._recording = False
self._start_time: float = 0.0
self._buffer: list[np.ndarray] = []
@property
def is_recording(self) -> bool:
return self._recording
def start_recording(self) -> None:
"""Start recording audio from the default input device."""
if self._recording:
return
self._frames.clear()
self._buffer.clear()
self._recording = True
self._start_time = time.monotonic()
try:
self._stream = sd.InputStream(
samplerate=self._config.sample_rate,
channels=self._config.channels,
dtype=self._config.dtype,
callback=self._audio_callback,
)
self._stream.start()
self._poll_timer.start()
except Exception as e:
self._recording = False
self.error_occurred.emit(f"Failed to start recording: {e}")
def stop_recording(self, output_path: Path) -> None:
"""Stop recording and save to WAV file."""
if not self._recording:
return
self._recording = False
self._poll_timer.stop()
if self._stream is not None:
self._stream.stop()
self._stream.close()
self._stream = None
duration = time.monotonic() - self._start_time
if not self._frames:
self.error_occurred.emit("No audio data recorded")
return
try:
audio_data = np.concatenate(self._frames, axis=0)
sf.write(
str(output_path),
audio_data,
self._config.sample_rate,
subtype="PCM_16",
)
self.recording_finished.emit(str(output_path), duration)
except Exception as e:
self.error_occurred.emit(f"Failed to save recording: {e}")
def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status) -> None:
"""Called by sounddevice in audio thread - just buffer data."""
self._frames.append(indata.copy())
self._buffer.append(indata.copy())
def _poll_buffer(self) -> None:
"""Poll buffered audio from main thread, emit RMS level."""
if not self._buffer:
return
chunks = self._buffer.copy()
self._buffer.clear()
combined = np.concatenate(chunks, axis=0).astype(np.float32)
# Normalize int16 range to 0.0-1.0
rms = np.sqrt(np.mean(combined**2)) / 32768.0
# Clamp and scale for UI visibility
level = min(1.0, rms * 5.0)
self.rms_level.emit(level)

View File

@ -0,0 +1,504 @@
"""Kanban board window for managing memory entry lifecycle."""
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path
from PySide6.QtCore import QTimer, Qt
from PySide6.QtWidgets import (
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QMenu,
QPushButton,
QScrollArea,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from my_memory.config import Config
from my_memory.models import Entry, EntrySource, EntryStatus
from my_memory.storage import load_all_entries, update_entry_status
# Column definitions: (display name, status, accent color)
_COLUMNS: list[tuple[str, EntryStatus, str]] = [
("On Docket", EntryStatus.DOCKET, "#f9e2af"),
("In Progress", EntryStatus.IN_PROGRESS, "#cba6f7"),
("Complete", EntryStatus.COMPLETE, "#a6e3a1"),
]
# Adjacent moves for inline buttons
_STATUS_ORDER = [EntryStatus.DOCKET, EntryStatus.IN_PROGRESS, EntryStatus.COMPLETE]
def _relative_time(dt: datetime) -> str:
"""Return a human-readable relative timestamp."""
delta = datetime.now() - dt
seconds = int(delta.total_seconds())
if seconds < 60:
return "just now"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m ago"
hours = minutes // 60
if hours < 24:
return f"{hours}h ago"
days = hours // 24
if days < 30:
return f"{days}d ago"
return dt.strftime("%b %d")
class EntryCard(QFrame):
"""A single entry card in a kanban column."""
def __init__(
self,
filepath: Path,
entry: Entry,
on_status_change: callable,
parent: QWidget | None = None,
):
super().__init__(parent)
self._filepath = filepath
self._entry = entry
self._on_status_change = on_status_change
self.setObjectName("entryCard")
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self._show_context_menu)
self._setup_ui()
def _setup_ui(self) -> None:
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 8, 10, 8)
layout.setSpacing(6)
# Content preview (first ~120 chars, max 3 lines)
preview_text = self._entry.content[:120]
if len(self._entry.content) > 120:
preview_text += "..."
content_label = QLabel(preview_text)
content_label.setObjectName("cardContent")
content_label.setWordWrap(True)
content_label.setMaximumHeight(54)
layout.addWidget(content_label)
# Meta row: source + timestamp + tags
meta_parts: list[str] = []
source_icon = "\U0001f3a4" if self._entry.source == EntrySource.VOICE else "\u270d"
meta_parts.append(source_icon)
meta_parts.append(_relative_time(self._entry.timestamp))
if self._entry.tags:
meta_parts.append(" ".join(f"#{t}" for t in self._entry.tags[:3]))
meta_label = QLabel(" ".join(meta_parts))
meta_label.setObjectName("cardMeta")
layout.addWidget(meta_label)
# Action buttons row
btn_layout = QHBoxLayout()
btn_layout.setSpacing(4)
btn_layout.addStretch()
idx = _STATUS_ORDER.index(self._entry.status)
if idx > 0:
prev_status = _STATUS_ORDER[idx - 1]
left_btn = QPushButton(f"\u2190 {_label_for(prev_status)}")
left_btn.setObjectName("moveBtn")
left_btn.setCursor(Qt.CursorShape.PointingHandCursor)
left_btn.clicked.connect(lambda: self._move_to(prev_status))
btn_layout.addWidget(left_btn)
if idx < len(_STATUS_ORDER) - 1:
next_status = _STATUS_ORDER[idx + 1]
right_btn = QPushButton(f"{_label_for(next_status)} \u2192")
right_btn.setObjectName("moveBtn")
right_btn.setCursor(Qt.CursorShape.PointingHandCursor)
right_btn.clicked.connect(lambda: self._move_to(next_status))
btn_layout.addWidget(right_btn)
layout.addLayout(btn_layout)
def _move_to(self, new_status: EntryStatus) -> None:
update_entry_status(self._filepath, new_status)
self._on_status_change()
def _show_context_menu(self, pos) -> None:
menu = QMenu(self)
menu.setStyleSheet("""
QMenu {
background: #313244;
color: #cdd6f4;
border: 1px solid #45475a;
border-radius: 4px;
padding: 4px;
}
QMenu::item:selected {
background: #45475a;
}
""")
for status in EntryStatus:
if status == self._entry.status:
continue
action = menu.addAction(f"Move to {_label_for(status)}")
action.triggered.connect(lambda _checked, s=status: self._move_to(s))
menu.exec(self.mapToGlobal(pos))
def _label_for(status: EntryStatus) -> str:
"""Short display label for a status."""
return {
EntryStatus.DOCKET: "On Docket",
EntryStatus.IN_PROGRESS: "In Progress",
EntryStatus.COMPLETE: "Complete",
}[status]
class KanbanColumn(QWidget):
"""A single column in the kanban board."""
def __init__(
self,
name: str,
status: EntryStatus,
accent: str,
collapsible: bool = False,
default_filter_days: int = 7,
parent: QWidget | None = None,
):
super().__init__(parent)
self._name = name
self._status = status
self._accent = accent
self._collapsible = collapsible
self._collapsed = collapsible # start collapsed if collapsible
self._filter_days: int | None = default_filter_days if collapsible else None
self._all_entries: list[tuple[Path, Entry]] = []
self._on_status_change: callable | None = None
self._setup_ui()
def _setup_ui(self) -> None:
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Column header with count badge
self._header = QLabel(self._name)
self._header.setObjectName("columnHeader")
self._header.setStyleSheet(f"""
#columnHeader {{
color: #cdd6f4;
font-size: 14px;
font-weight: bold;
padding: 10px 12px;
border-left: 3px solid {self._accent};
background: #181825;
border-top-left-radius: 8px;
border-top-right-radius: 8px;
}}
""")
if self._collapsible:
self._header.setCursor(Qt.CursorShape.PointingHandCursor)
self._header.installEventFilter(self)
layout.addWidget(self._header)
# Filter bar (only for collapsible columns)
self._filter_bar = None
if self._collapsible:
self._filter_bar = QWidget()
self._filter_bar.setStyleSheet("background: #181825;")
filter_layout = QHBoxLayout(self._filter_bar)
filter_layout.setContentsMargins(12, 0, 12, 6)
filter_layout.setSpacing(4)
self._filter_buttons: dict[int | None, QPushButton] = {}
for label, days in [("7d", 7), ("30d", 30), ("All", None)]:
btn = QPushButton(label)
btn.setCursor(Qt.CursorShape.PointingHandCursor)
btn.setFixedHeight(22)
btn.clicked.connect(lambda _checked, d=days: self._set_filter(d))
filter_layout.addWidget(btn)
self._filter_buttons[days] = btn
filter_layout.addStretch()
layout.addWidget(self._filter_bar)
self._filter_bar.setVisible(False)
self._update_filter_styles()
# Scroll area for cards
self._scroll = QScrollArea()
self._scroll.setObjectName("columnScroll")
self._scroll.setWidgetResizable(True)
self._scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
self._scroll.setStyleSheet("""
#columnScroll {
background: #181825;
border: none;
border-bottom-left-radius: 8px;
border-bottom-right-radius: 8px;
}
QScrollBar:vertical {
background: #181825;
width: 6px;
margin: 0;
}
QScrollBar::handle:vertical {
background: #45475a;
border-radius: 3px;
min-height: 20px;
}
QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {
height: 0;
}
""")
self._card_container = QWidget()
self._card_layout = QVBoxLayout(self._card_container)
self._card_layout.setContentsMargins(6, 6, 6, 6)
self._card_layout.setSpacing(6)
self._card_layout.addStretch()
self._scroll.setWidget(self._card_container)
layout.addWidget(self._scroll)
# Scroll area stays visible even when collapsed (shows empty dark bg)
# -- Collapse / filter logic ------------------------------------------
def eventFilter(self, obj, event) -> bool:
if obj is self._header and event.type() == event.Type.MouseButtonRelease:
self._toggle_collapsed()
return True
return super().eventFilter(obj, event)
def _toggle_collapsed(self) -> None:
self._collapsed = not self._collapsed
if self._filter_bar is not None:
self._filter_bar.setVisible(not self._collapsed)
self._update_filter_styles()
self._update_header()
self._render_cards()
def _set_filter(self, days: int | None) -> None:
self._filter_days = days
self._update_filter_styles()
self._render_cards()
self._update_header()
def _update_filter_styles(self) -> None:
if self._filter_bar is None:
return
for days, btn in self._filter_buttons.items():
if days == self._filter_days:
btn.setStyleSheet(f"""
background: {self._accent};
color: #1e1e2e;
border: none;
border-radius: 4px;
padding: 2px 8px;
font-size: 11px;
font-weight: bold;
""")
else:
btn.setStyleSheet("""
background: #45475a;
color: #bac2de;
border: none;
border-radius: 4px;
padding: 2px 8px;
font-size: 11px;
""")
def _update_header(self) -> None:
total = len(self._all_entries)
if not self._collapsible:
self._header.setText(f"{self._name} ({total})")
return
if self._collapsed:
self._header.setText(f"\u25b8 {self._name} ({total})")
else:
filtered = self._filtered_entries()
if len(filtered) < total:
self._header.setText(
f"\u25be {self._name} ({len(filtered)} of {total})"
)
else:
self._header.setText(f"\u25be {self._name} ({total})")
def _filtered_entries(self) -> list[tuple[Path, Entry]]:
if self._filter_days is None:
return self._all_entries
cutoff = datetime.now() - timedelta(days=self._filter_days)
return [(fp, e) for fp, e in self._all_entries if e.timestamp >= cutoff]
# -- Public API -------------------------------------------------------
@property
def status(self) -> EntryStatus:
return self._status
def set_entries(
self,
entries: list[tuple[Path, Entry]],
on_status_change: callable,
) -> None:
"""Replace all cards in this column."""
self._all_entries = entries
self._on_status_change = on_status_change
self._update_header()
self._render_cards()
def _render_cards(self) -> None:
"""Clear and rebuild card widgets, applying date filter if collapsible."""
# Clear existing cards (keep the stretch at the end)
while self._card_layout.count() > 1:
item = self._card_layout.takeAt(0)
if item.widget():
item.widget().deleteLater()
if self._collapsed:
return
entries = self._filtered_entries() if self._collapsible else self._all_entries
for filepath, entry in entries:
card = EntryCard(filepath, entry, self._on_status_change, self)
self._card_layout.insertWidget(self._card_layout.count() - 1, card)
class BoardWindow(QWidget):
"""Kanban board window for managing memory entries."""
def __init__(self, config: Config):
super().__init__()
self._config = config
self._columns: list[KanbanColumn] = []
self._debounce_timer = QTimer(self)
self._debounce_timer.setSingleShot(True)
self._debounce_timer.setInterval(300)
self._debounce_timer.timeout.connect(self._refresh)
self._setup_ui()
self._setup_watcher()
def _setup_ui(self) -> None:
self.setWindowTitle("My Memory - Board")
self.resize(900, 600)
self.setMinimumSize(700, 400)
self.setStyleSheet(self._stylesheet())
layout = QVBoxLayout(self)
layout.setContentsMargins(16, 16, 16, 16)
layout.setSpacing(12)
# Header
header = QLabel("My Memory")
header.setObjectName("boardHeader")
layout.addWidget(header)
# Columns
columns_layout = QHBoxLayout()
columns_layout.setSpacing(10)
for name, status, accent in _COLUMNS:
col = KanbanColumn(
name, status, accent,
collapsible=(status == EntryStatus.COMPLETE),
)
col.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self._columns.append(col)
columns_layout.addWidget(col, stretch=1)
layout.addLayout(columns_layout, stretch=1)
def _setup_watcher(self) -> None:
from PySide6.QtCore import QFileSystemWatcher
self._watcher = QFileSystemWatcher(self)
entries_dir = self._config.entries_dir
if entries_dir.exists():
self._watcher.addPath(str(entries_dir))
for subdir in entries_dir.iterdir():
if subdir.is_dir():
self._watcher.addPath(str(subdir))
self._watcher.directoryChanged.connect(self._on_directory_changed)
def _on_directory_changed(self, _path: str) -> None:
# Re-watch new date directories that may have appeared
entries_dir = self._config.entries_dir
if entries_dir.exists():
current = set(self._watcher.directories())
for subdir in entries_dir.iterdir():
if subdir.is_dir() and str(subdir) not in current:
self._watcher.addPath(str(subdir))
self._debounce_timer.start()
def show_board(self) -> None:
"""Show the board window and refresh entries."""
self._refresh()
self.show()
self.raise_()
self.activateWindow()
def _refresh(self) -> None:
"""Reload all entries from disk and repopulate columns."""
all_entries = load_all_entries(self._config)
grouped: dict[EntryStatus, list[tuple[Path, Entry]]] = {
s: [] for s in EntryStatus
}
for filepath, entry in all_entries:
grouped[entry.status].append((filepath, entry))
for col in self._columns:
col.set_entries(grouped[col.status], self._refresh)
@staticmethod
def _stylesheet() -> str:
return """
BoardWindow {
background: #1e1e2e;
}
#boardHeader {
color: #cdd6f4;
font-size: 18px;
font-weight: bold;
}
#entryCard {
background: #313244;
border: 1px solid #45475a;
border-radius: 8px;
}
#entryCard:hover {
border-color: #585b70;
}
#cardContent {
color: #cdd6f4;
font-size: 13px;
line-height: 1.3;
}
#cardMeta {
color: #6c7086;
font-size: 11px;
}
#moveBtn {
background: #45475a;
color: #bac2de;
border: none;
border-radius: 4px;
padding: 3px 8px;
font-size: 11px;
}
#moveBtn:hover {
background: #585b70;
color: #cdd6f4;
}
"""

View File

@ -0,0 +1,366 @@
"""Capture popup window: text area, voice recording, level meter, submit."""
from __future__ import annotations
import time
from pathlib import Path
from uuid import uuid4
from PySide6.QtCore import QTimer, Qt, Signal
from PySide6.QtGui import QCursor, QKeySequence, QShortcut
from PySide6.QtWidgets import (
QApplication,
QHBoxLayout,
QLabel,
QProgressBar,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from my_memory.audio_recorder import AudioRecorder
from my_memory.config import Config
from my_memory.models import Entry, EntrySource
from my_memory.storage import save_entry
from my_memory.transcriber import Transcriber
class CaptureWindow(QWidget):
"""Frameless popup for capturing text and voice entries."""
board_requested = Signal()
def __init__(self, config: Config):
super().__init__()
self._config = config
self._entry_id = uuid4()
self._record_start_time: float = 0.0
# Audio components
self._recorder = AudioRecorder(config.audio, parent=self)
self._transcriber = Transcriber(config.whisper, parent=self)
self._setup_ui()
self._connect_signals()
def _setup_ui(self) -> None:
self.setWindowFlags(
Qt.WindowType.FramelessWindowHint
| Qt.WindowType.WindowStaysOnTopHint
| Qt.WindowType.Tool
)
self.setAttribute(Qt.WidgetAttribute.WA_ShowWithoutActivating, False)
self.setFixedSize(480, 280)
self.setStyleSheet(self._stylesheet())
layout = QVBoxLayout(self)
layout.setContentsMargins(12, 12, 12, 12)
layout.setSpacing(8)
# Header row
header_layout = QHBoxLayout()
header_layout.setSpacing(8)
header = QLabel("Capture Thought")
header.setObjectName("header")
header_layout.addWidget(header)
header_layout.addStretch()
board_btn = QPushButton("Board")
board_btn.setObjectName("boardBtn")
board_btn.setFixedSize(56, 24)
board_btn.setCursor(Qt.CursorShape.PointingHandCursor)
board_btn.clicked.connect(self._open_board)
header_layout.addWidget(board_btn)
layout.addLayout(header_layout)
# Text area
self._text_edit = QTextEdit()
self._text_edit.setPlaceholderText("Type your thought here... (Ctrl+Enter to save)")
self._text_edit.setAcceptRichText(False)
layout.addWidget(self._text_edit)
# Audio level meter
self._level_bar = QProgressBar()
self._level_bar.setRange(0, 100)
self._level_bar.setValue(0)
self._level_bar.setTextVisible(False)
self._level_bar.setFixedHeight(6)
self._level_bar.setObjectName("levelBar")
self._level_bar.hide()
layout.addWidget(self._level_bar)
# Status label (for transcription progress, errors)
self._status_label = QLabel("")
self._status_label.setObjectName("status")
self._status_label.hide()
layout.addWidget(self._status_label)
# Button row
btn_layout = QHBoxLayout()
btn_layout.setSpacing(8)
self._record_btn = QPushButton("Record")
self._record_btn.setObjectName("recordBtn")
self._record_btn.setFixedWidth(100)
self._record_btn.clicked.connect(self._toggle_recording)
btn_layout.addWidget(self._record_btn)
self._timer_label = QLabel("")
self._timer_label.setObjectName("timer")
btn_layout.addWidget(self._timer_label)
btn_layout.addStretch()
self._submit_btn = QPushButton("Save")
self._submit_btn.setObjectName("submitBtn")
self._submit_btn.setFixedWidth(80)
self._submit_btn.clicked.connect(self._submit)
btn_layout.addWidget(self._submit_btn)
layout.addLayout(btn_layout)
# Keyboard shortcuts
QShortcut(QKeySequence("Ctrl+Return"), self, self._submit)
QShortcut(QKeySequence("Escape"), self, self._dismiss)
# Recording timer for UI updates
self._record_timer = QTimer(self)
self._record_timer.setInterval(100)
self._record_timer.timeout.connect(self._update_record_timer)
def _connect_signals(self) -> None:
self._recorder.rms_level.connect(self._on_rms_level)
self._recorder.recording_finished.connect(self._on_recording_finished)
self._recorder.error_occurred.connect(self._on_error)
self._transcriber.transcription_started.connect(self._on_transcription_started)
self._transcriber.transcription_ready.connect(self._on_transcription_ready)
self._transcriber.transcription_error.connect(self._on_error)
def show_centered(self) -> None:
"""Show window centered on the screen under the cursor."""
self._reset_state()
screen = QApplication.screenAt(QCursor.pos())
if screen is None:
screen = QApplication.primaryScreen()
geo = screen.availableGeometry()
x = geo.x() + (geo.width() - self.width()) // 2
y = geo.y() + (geo.height() - self.height()) // 2
self.move(x, y)
self.show()
self.raise_()
self.activateWindow()
self._text_edit.setFocus()
def _reset_state(self) -> None:
"""Reset window state for a fresh capture."""
self._entry_id = uuid4()
self._text_edit.clear()
self._status_label.hide()
self._level_bar.hide()
self._level_bar.setValue(0)
self._timer_label.setText("")
self._record_btn.setText("Record")
self._record_btn.setProperty("recording", False)
self._record_btn.style().unpolish(self._record_btn)
self._record_btn.style().polish(self._record_btn)
self._submit_btn.setEnabled(True)
self._record_btn.setEnabled(True)
def _toggle_recording(self) -> None:
if self._recorder.is_recording:
self._stop_recording()
else:
self._start_recording()
def _start_recording(self) -> None:
self._record_start_time = time.monotonic()
self._level_bar.show()
self._record_btn.setText("Stop")
self._record_btn.setProperty("recording", True)
self._record_btn.style().unpolish(self._record_btn)
self._record_btn.style().polish(self._record_btn)
self._record_timer.start()
self._recorder.start_recording()
def _stop_recording(self) -> None:
self._record_timer.stop()
# Save WAV alongside future entry
date_str = __import__("datetime").datetime.now().strftime("%Y-%m-%d")
day_dir = self._config.entries_dir / date_str
day_dir.mkdir(parents=True, exist_ok=True)
wav_path = day_dir / f"{self._entry_id}.wav"
self._recorder.stop_recording(wav_path)
self._record_btn.setText("Record")
self._record_btn.setProperty("recording", False)
self._record_btn.style().unpolish(self._record_btn)
self._record_btn.style().polish(self._record_btn)
def _update_record_timer(self) -> None:
elapsed = time.monotonic() - self._record_start_time
mins = int(elapsed) // 60
secs = int(elapsed) % 60
self._timer_label.setText(f"{mins}:{secs:02d}")
def _on_rms_level(self, level: float) -> None:
self._level_bar.setValue(int(level * 100))
def _on_recording_finished(self, wav_path: str, duration: float) -> None:
self._level_bar.hide()
self._timer_label.setText("")
self._wav_path = wav_path
self._wav_duration = duration
# Auto-transcribe
self._transcriber.transcribe(wav_path)
def _on_transcription_started(self) -> None:
self._status_label.setText("Transcribing...")
self._status_label.show()
self._submit_btn.setEnabled(False)
self._record_btn.setEnabled(False)
def _on_transcription_ready(self, text: str) -> None:
self._status_label.hide()
self._submit_btn.setEnabled(True)
self._record_btn.setEnabled(True)
# Append transcribed text
existing = self._text_edit.toPlainText().strip()
if existing:
self._text_edit.setPlainText(f"{existing}\n\n{text}")
else:
self._text_edit.setPlainText(text)
# Move cursor to end
cursor = self._text_edit.textCursor()
cursor.movePosition(cursor.MoveOperation.End)
self._text_edit.setTextCursor(cursor)
def _on_error(self, msg: str) -> None:
self._status_label.setText(f"Error: {msg}")
self._status_label.show()
self._submit_btn.setEnabled(True)
self._record_btn.setEnabled(True)
self._level_bar.hide()
self._record_btn.setText("Record")
self._record_btn.setProperty("recording", False)
self._record_btn.style().unpolish(self._record_btn)
self._record_btn.style().polish(self._record_btn)
def _submit(self) -> None:
content = self._text_edit.toPlainText().strip()
if not content:
return
has_audio = hasattr(self, "_wav_path")
entry = Entry(
id=self._entry_id,
source=EntrySource.VOICE if has_audio else EntrySource.TEXT,
content=content,
audio_file=f"{self._entry_id}.wav" if has_audio else None,
duration_seconds=self._wav_duration if has_audio else None,
)
save_entry(entry, self._config)
self._dismiss()
def _open_board(self) -> None:
"""Request the board window and dismiss capture."""
self._dismiss()
self.board_requested.emit()
def _dismiss(self) -> None:
"""Hide the window and stop any recording."""
if self._recorder.is_recording:
self._recorder.stop_recording(Path("/dev/null"))
self._record_timer.stop()
self.hide()
@staticmethod
def _stylesheet() -> str:
return """
CaptureWindow {
background: #1e1e2e;
border: 1px solid #45475a;
border-radius: 12px;
}
#header {
color: #cdd6f4;
font-size: 14px;
font-weight: bold;
}
QTextEdit {
background: #313244;
color: #cdd6f4;
border: 1px solid #45475a;
border-radius: 6px;
padding: 8px;
font-size: 14px;
selection-background-color: #585b70;
}
#levelBar {
background: #313244;
border: none;
border-radius: 3px;
}
#levelBar::chunk {
background: #a6e3a1;
border-radius: 3px;
}
#status {
color: #a6adc8;
font-size: 12px;
}
#timer {
color: #f38ba8;
font-size: 13px;
font-weight: bold;
}
QPushButton {
background: #45475a;
color: #cdd6f4;
border: none;
border-radius: 6px;
padding: 6px 16px;
font-size: 13px;
}
QPushButton:hover {
background: #585b70;
}
QPushButton:disabled {
background: #313244;
color: #6c7086;
}
#submitBtn {
background: #89b4fa;
color: #1e1e2e;
font-weight: bold;
}
#submitBtn:hover {
background: #b4d0fb;
}
#recordBtn[recording="true"] {
background: #f38ba8;
color: #1e1e2e;
font-weight: bold;
}
#boardBtn {
background: transparent;
color: #6c7086;
border: 1px solid #45475a;
border-radius: 4px;
padding: 2px 8px;
font-size: 11px;
}
#boardBtn:hover {
color: #cdd6f4;
border-color: #585b70;
}
"""

81
src/my_memory/config.py Normal file
View File

@ -0,0 +1,81 @@
"""Configuration management with optional TOML overrides."""
from __future__ import annotations
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
def _default_base_dir() -> Path:
return Path.home() / ".my-memory"
@dataclass
class WhisperConfig:
model_size: str = "base"
device: str = "auto"
compute_type: str = "float16"
cpu_fallback_compute_type: str = "int8"
@dataclass
class AudioConfig:
sample_rate: int = 16000
channels: int = 1
dtype: str = "int16"
@dataclass
class Config:
base_dir: Path = field(default_factory=_default_base_dir)
whisper: WhisperConfig = field(default_factory=WhisperConfig)
audio: AudioConfig = field(default_factory=AudioConfig)
@property
def entries_dir(self) -> Path:
return self.base_dir / "entries"
@property
def config_file(self) -> Path:
return self.base_dir / "config.toml"
@property
def schema_file(self) -> Path:
return self.base_dir / "schema.md"
def ensure_dirs(self) -> None:
self.base_dir.mkdir(parents=True, exist_ok=True)
self.entries_dir.mkdir(parents=True, exist_ok=True)
@classmethod
def load(cls, base_dir: Path | None = None) -> Config:
"""Load config, applying TOML overrides if config.toml exists."""
config = cls()
if base_dir is not None:
config.base_dir = base_dir
if config.config_file.exists():
with open(config.config_file, "rb") as f:
data = tomllib.load(f)
_apply_overrides(config, data)
return config
def _apply_overrides(config: Config, data: dict) -> None:
"""Apply TOML overrides onto the config dataclass."""
if "base_dir" in data:
config.base_dir = Path(data["base_dir"]).expanduser()
if "whisper" in data:
w = data["whisper"]
for key in ("model_size", "device", "compute_type", "cpu_fallback_compute_type"):
if key in w:
setattr(config.whisper, key, w[key])
if "audio" in data:
a = data["audio"]
for key in ("sample_rate", "channels", "dtype"):
if key in a:
setattr(config.audio, key, a[key])

65
src/my_memory/models.py Normal file
View File

@ -0,0 +1,65 @@
"""Pydantic data models for memory entries."""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
class EntrySource(str, Enum):
TEXT = "text"
VOICE = "voice"
class EntryStatus(str, Enum):
DOCKET = "docket"
IN_PROGRESS = "in_progress"
COMPLETE = "complete"
class Entry(BaseModel):
id: UUID = Field(default_factory=uuid4)
timestamp: datetime = Field(default_factory=datetime.now)
source: EntrySource = EntrySource.TEXT
tags: list[str] = Field(default_factory=list)
status: EntryStatus = EntryStatus.DOCKET
audio_file: str | None = None
duration_seconds: float | None = None
content: str = ""
def to_frontmatter_dict(self) -> dict:
"""Return dict suitable for YAML frontmatter (no content)."""
d: dict = {
"id": str(self.id),
"timestamp": self.timestamp.isoformat(),
"source": self.source.value,
"tags": self.tags,
"status": self.status.value,
}
if self.audio_file is not None:
d["audio_file"] = self.audio_file
if self.duration_seconds is not None:
d["duration_seconds"] = self.duration_seconds
return d
@classmethod
def from_frontmatter(cls, metadata: dict, content: str) -> Entry:
"""Reconstruct an Entry from parsed frontmatter metadata + body content."""
# Handle legacy entries that have 'processed' instead of 'status'
status_val = metadata.get("status", "docket")
if "status" not in metadata and metadata.get("processed", False):
status_val = "complete"
return cls(
id=UUID(metadata["id"]),
timestamp=datetime.fromisoformat(metadata["timestamp"]),
source=EntrySource(metadata["source"]),
tags=metadata.get("tags", []),
status=EntryStatus(status_val),
audio_file=metadata.get("audio_file"),
duration_seconds=metadata.get("duration_seconds"),
content=content.strip(),
)

73
src/my_memory/schema.py Normal file
View File

@ -0,0 +1,73 @@
"""Auto-generate schema.md for AI agent discovery."""
from __future__ import annotations
from my_memory.config import Config
SCHEMA_CONTENT = """\
# My-Memory Entry Schema
This directory contains captured thoughts stored as markdown files with YAML frontmatter.
## Directory Structure
```
~/.my-memory/
config.toml # Optional configuration overrides
schema.md # This file (auto-generated)
entries/
YYYY-MM-DD/
{uuid}.md # Entry file
{uuid}.wav # Optional audio recording
```
## Entry Format
Each `.md` file contains YAML frontmatter followed by the entry content:
```yaml
---
id: "550e8400-e29b-41d4-a716-446655440000" # UUID v4
timestamp: "2026-02-11T14:30:00" # ISO 8601
source: "text" # "text" or "voice"
tags: [] # User-defined tags
status: "docket" # Workflow status (see below)
audio_file: "550e8400-...440000.wav" # Present only for voice entries
duration_seconds: 12.5 # Present only for voice entries
---
The captured thought content here.
```
## Fields
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `id` | UUID string | Yes | Unique identifier |
| `timestamp` | ISO 8601 string | Yes | When the entry was captured |
| `source` | `"text"` or `"voice"` | Yes | How the entry was created |
| `tags` | list of strings | Yes | User tags (may be empty) |
| `status` | `"docket"`, `"in_progress"`, or `"complete"` | Yes | Workflow status for kanban board |
| `audio_file` | string or absent | No | Filename of WAV recording (voice entries only) |
| `duration_seconds` | float or absent | No | Recording duration in seconds (voice entries only) |
## Audio Format
WAV files are 16kHz mono PCM16 (standard for speech processing).
## Status Lifecycle
Entries move through the kanban board:
1. **docket** - Newly captured, waiting to be acted on
2. **in_progress** - Currently being worked on
3. **complete** - Done
"""
def ensure_schema(config: Config) -> None:
"""Write schema.md if it doesn't exist or content has changed."""
config.ensure_dirs()
schema_path = config.schema_file
if schema_path.exists() and schema_path.read_text() == SCHEMA_CONTENT:
return
schema_path.write_text(SCHEMA_CONTENT)

73
src/my_memory/storage.py Normal file
View File

@ -0,0 +1,73 @@
"""Markdown + YAML frontmatter file I/O for memory entries."""
from __future__ import annotations
from pathlib import Path
import frontmatter
from my_memory.config import Config
from my_memory.models import Entry, EntryStatus
def save_entry(entry: Entry, config: Config) -> Path:
"""Save an entry as a markdown file with YAML frontmatter.
Returns the path to the saved file.
"""
date_str = entry.timestamp.strftime("%Y-%m-%d")
day_dir = config.entries_dir / date_str
day_dir.mkdir(parents=True, exist_ok=True)
filename = f"{entry.id}.md"
filepath = day_dir / filename
post = frontmatter.Post(entry.content, **entry.to_frontmatter_dict())
filepath.write_text(frontmatter.dumps(post) + "\n")
return filepath
def load_entry(filepath: Path) -> Entry:
"""Load an entry from a markdown file with YAML frontmatter."""
post = frontmatter.load(str(filepath))
return Entry.from_frontmatter(post.metadata, post.content)
def list_entries(config: Config, date_str: str | None = None) -> list[Path]:
"""List all entry files, optionally filtered by date (YYYY-MM-DD).
Returns paths sorted by modification time (newest first).
"""
if date_str:
search_dir = config.entries_dir / date_str
if not search_dir.exists():
return []
files = list(search_dir.glob("*.md"))
else:
files = list(config.entries_dir.rglob("*.md"))
return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)
def update_entry_status(filepath: Path, new_status: EntryStatus) -> None:
"""Update only the status field in an entry's frontmatter on disk."""
post = frontmatter.load(str(filepath))
post.metadata["status"] = new_status.value
# Remove legacy 'processed' key if present
post.metadata.pop("processed", None)
filepath.write_text(frontmatter.dumps(post) + "\n")
def load_all_entries(config: Config) -> list[tuple[Path, Entry]]:
"""Load all entries with their file paths.
Returns list of (filepath, entry) tuples sorted by timestamp (newest first).
"""
results: list[tuple[Path, Entry]] = []
for filepath in config.entries_dir.rglob("*.md"):
if filepath.name == "schema.md":
continue
entry = load_entry(filepath)
results.append((filepath, entry))
return sorted(results, key=lambda t: t[1].timestamp, reverse=True)

View File

@ -0,0 +1,130 @@
"""Lazy-loaded faster-whisper transcription with QThread worker."""
from __future__ import annotations
import logging
from PySide6.QtCore import QObject, QThread, Signal
from my_memory.config import WhisperConfig
logger = logging.getLogger(__name__)
# Module-level lazy singleton
_model = None
_model_config: WhisperConfig | None = None
def _load_model(config: WhisperConfig):
"""Load the faster-whisper model, falling back to CPU if CUDA fails."""
global _model, _model_config
if _model is not None and _model_config == config:
return _model
from faster_whisper import WhisperModel
device = config.device
compute_type = config.compute_type
if device == "auto":
# Try CUDA first, fall back to CPU
try:
logger.info("Loading Whisper model '%s' on CUDA...", config.model_size)
_model = WhisperModel(
config.model_size, device="cuda", compute_type=compute_type
)
_model_config = config
logger.info("Whisper model loaded on CUDA")
return _model
except Exception as e:
logger.warning("CUDA failed (%s), falling back to CPU", e)
device = "cpu"
compute_type = config.cpu_fallback_compute_type
logger.info("Loading Whisper model '%s' on %s...", config.model_size, device)
_model = WhisperModel(config.model_size, device=device, compute_type=compute_type)
_model_config = config
logger.info("Whisper model loaded on %s", device)
return _model
def transcribe_file(wav_path: str, config: WhisperConfig) -> str:
"""Transcribe a WAV file and return the text."""
model = _load_model(config)
segments, _info = model.transcribe(wav_path, beam_size=5)
return " ".join(segment.text.strip() for segment in segments).strip()
class TranscriptionWorker(QObject):
"""Worker that runs transcription in a background thread."""
finished = Signal(str) # transcribed text
error = Signal(str) # error message
def __init__(self, wav_path: str, config: WhisperConfig):
super().__init__()
self._wav_path = wav_path
self._config = config
def run(self) -> None:
try:
text = transcribe_file(self._wav_path, self._config)
self.finished.emit(text)
except Exception as e:
self.error.emit(f"Transcription failed: {e}")
class Transcriber(QObject):
"""Manages background transcription via QThread."""
transcription_ready = Signal(str) # transcribed text
transcription_error = Signal(str) # error message
transcription_started = Signal()
def __init__(self, whisper_config: WhisperConfig, parent: QObject | None = None):
super().__init__(parent)
self._config = whisper_config
self._thread: QThread | None = None
self._worker: TranscriptionWorker | None = None
@property
def is_busy(self) -> bool:
return self._thread is not None and self._thread.isRunning()
def transcribe(self, wav_path: str) -> None:
"""Start transcription in a background thread."""
if self.is_busy:
self.transcription_error.emit("Transcription already in progress")
return
self.transcription_started.emit()
self._thread = QThread()
self._worker = TranscriptionWorker(wav_path, self._config)
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run)
self._worker.finished.connect(self._on_finished)
self._worker.error.connect(self._on_error)
self._thread.start()
def _on_finished(self, text: str) -> None:
self._cleanup_thread()
self.transcription_ready.emit(text)
def _on_error(self, error_msg: str) -> None:
self._cleanup_thread()
self.transcription_error.emit(error_msg)
def _cleanup_thread(self) -> None:
if self._thread is not None:
self._thread.quit()
self._thread.wait()
self._thread = None
self._worker = None
def download_model(self) -> None:
"""Pre-download the Whisper model (blocking call for CLI use)."""
_load_model(self._config)