From 195cd443cb63eb4050dba17c8bfae6db4c895372 Mon Sep 17 00:00:00 2001 From: Cal Corum Date: Wed, 11 Feb 2026 13:55:21 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20initial=20commit=20=E2=80=94=20voice/te?= =?UTF-8?q?xt=20memory=20capture=20with=20kanban=20board?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PySide6 app for capturing quick memories via voice or text, organized on a kanban board (On Docket → In Progress → Complete). Complete column is collapsible with 7d/30d/All date filters. Co-Authored-By: Claude Opus 4.6 --- .gitignore | 13 + .python-version | 1 + README.md | 0 my-memory.desktop | 10 + pyproject.toml | 27 ++ src/my_memory/__init__.py | 3 + src/my_memory/__main__.py | 106 +++++++ src/my_memory/app.py | 144 +++++++++ src/my_memory/audio_recorder.py | 115 ++++++++ src/my_memory/board_window.py | 504 ++++++++++++++++++++++++++++++++ src/my_memory/capture_window.py | 366 +++++++++++++++++++++++ src/my_memory/config.py | 81 +++++ src/my_memory/models.py | 65 ++++ src/my_memory/schema.py | 73 +++++ src/my_memory/storage.py | 73 +++++ src/my_memory/transcriber.py | 130 ++++++++ 16 files changed, 1711 insertions(+) create mode 100644 .gitignore create mode 100644 .python-version create mode 100644 README.md create mode 100644 my-memory.desktop create mode 100644 pyproject.toml create mode 100644 src/my_memory/__init__.py create mode 100644 src/my_memory/__main__.py create mode 100644 src/my_memory/app.py create mode 100644 src/my_memory/audio_recorder.py create mode 100644 src/my_memory/board_window.py create mode 100644 src/my_memory/capture_window.py create mode 100644 src/my_memory/config.py create mode 100644 src/my_memory/models.py create mode 100644 src/my_memory/schema.py create mode 100644 src/my_memory/storage.py create mode 100644 src/my_memory/transcriber.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..25b8699 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +# Python-generated 
files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv + +# uv lock file +uv.lock diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/my-memory.desktop b/my-memory.desktop new file mode 100644 index 0000000..6f21158 --- /dev/null +++ b/my-memory.desktop @@ -0,0 +1,10 @@ +[Desktop Entry] +Type=Application +Name=My Memory +Comment=Low-friction thought capture (text + voice) +Exec=my-memory +Icon=document-edit +Terminal=false +Categories=Utility; +StartupNotify=false +X-GNOME-Autostart-enabled=true diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5da343c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[project] +name = "my-memory" +version = "0.1.0" +description = "Low-friction capture app for thoughts, text, and voice" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [ + "PySide6>=6.7", + "sounddevice>=0.5", + "soundfile>=0.12", + "faster-whisper>=1.1", + "pydantic>=2.0", + "python-frontmatter>=1.1", +] + +[project.scripts] +my-memory = "my_memory.__main__:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/my_memory"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/src/my_memory/__init__.py b/src/my_memory/__init__.py new file mode 100644 index 0000000..420638b --- /dev/null +++ b/src/my_memory/__init__.py @@ -0,0 +1,3 @@ +"""My-Memory: Low-friction capture app for thoughts, text, and voice.""" + +__version__ = "0.1.0" diff --git a/src/my_memory/__main__.py b/src/my_memory/__main__.py new file mode 100644 index 0000000..4cbe5b1 --- /dev/null +++ b/src/my_memory/__main__.py @@ -0,0 +1,106 @@ +"""CLI entry point: single-instance dispatch and app launch.""" + +from __future__ import 
def main() -> None:
    """Parse the command line and dispatch to the requested action.

    ``--download-model`` is handled before any Qt object is created. For
    every other path a ``QApplication`` is obtained first, because the
    single-instance signalling relies on Qt networking.
    """
    cli = argparse.ArgumentParser(
        prog="my-memory",
        description="Low-friction capture app for thoughts, text, and voice",
    )
    # (flag, help text) pairs; every flag is a plain boolean switch.
    switches = (
        ("--capture", "Signal running instance to open capture window"),
        ("--board", "Signal running instance to open the kanban board"),
        ("--download-model", "Pre-download the Whisper model and exit"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)
    options = cli.parse_args()

    if options.download_model:
        _download_model()
        return

    # Must create QApplication before using any Qt networking
    from PySide6.QtWidgets import QApplication

    # Bound to a local so the application object stays referenced while
    # the dispatch below runs.
    _qt_app = QApplication.instance() or QApplication(sys.argv)

    # --capture takes precedence over --board when both are given.
    if options.capture:
        _send_capture()
    elif options.board:
        _send_board()
    else:
        _run_app()
def _run_app(show_capture: bool = False, show_board: bool = False) -> None:
    """Launch the application as the primary instance and never return.

    Exits with status 1 if the single-instance guard refuses this process.
    Otherwise it optionally opens the capture and/or board window and then
    exits with the event loop's return code.
    """
    from my_memory.app import MyMemoryApp
    from my_memory.config import Config

    application = MyMemoryApp(Config.load())

    is_primary = application.ensure_single_instance()
    if not is_primary:
        print("Another instance is already running.", file=sys.stderr)
        sys.exit(1)

    # Open whichever windows the caller asked for, capture first.
    requested = (
        (show_capture, application.show_capture_window),
        (show_board, application.show_board_window),
    )
    for wanted, open_window in requested:
        if wanted:
            open_window()

    exit_code = application.run()
    sys.exit(exit_code)
def ensure_single_instance(self) -> bool:
    """Set up the local server that acts as the single-instance guard.

    Returns:
        True if this process now owns ``SOCKET_NAME`` (it is the primary
        instance); False if another live instance already owns it or the
        name could not be claimed.
    """
    self._server = QLocalServer()
    self._server.newConnection.connect(self._on_new_connection)

    if self._server.listen(SOCKET_NAME):
        return True

    # listen() fails both when a live instance owns the name and when a
    # crashed instance left a stale socket behind. Removing the socket
    # unconditionally would hijack a live instance's endpoint and let a
    # second copy start, so check whether anything is answering first.
    probe = QLocalSocket()
    probe.connectToServer(SOCKET_NAME)
    if probe.waitForConnected(500):
        probe.disconnectFromServer()
        return False

    # Nobody answered: treat the name as stale, clean it up and retry once.
    QLocalServer.removeServer(SOCKET_NAME)
    return self._server.listen(SOCKET_NAME)
"""Show the kanban board window.""" + if self._board_window is None: + self._board_window = BoardWindow(self._config) + + self._board_window.show_board() + + def _quit(self) -> None: + if self._server: + self._server.close() + if self._board_window: + self._board_window.close() + if self._tray: + self._tray.hide() + QApplication.quit() + + def run(self) -> int: + """Run the application event loop.""" + self.setup_tray() + return self._app.exec() diff --git a/src/my_memory/audio_recorder.py b/src/my_memory/audio_recorder.py new file mode 100644 index 0000000..47424cf --- /dev/null +++ b/src/my_memory/audio_recorder.py @@ -0,0 +1,115 @@ +"""Audio recording via sounddevice with RMS level signals.""" + +from __future__ import annotations + +import time +from pathlib import Path + +import numpy as np +import sounddevice as sd +import soundfile as sf +from PySide6.QtCore import QObject, QTimer, Signal + +from my_memory.config import AudioConfig + + +class AudioRecorder(QObject): + """Records audio from the default input device. + + Emits rms_level (0.0-1.0) during recording for UI level meter. + Emits recording_finished with the saved WAV path and duration. 
def start_recording(self) -> None:
    """Start recording audio from the default input device.

    Does nothing if a recording is already in progress. If the input
    stream cannot be opened or started, any partially opened stream is
    closed, the recorder stays idle, and ``error_occurred`` is emitted
    with the reason.
    """
    if self._recording:
        return

    self._frames.clear()
    self._buffer.clear()

    stream: sd.InputStream | None = None
    try:
        stream = sd.InputStream(
            samplerate=self._config.sample_rate,
            channels=self._config.channels,
            dtype=self._config.dtype,
            callback=self._audio_callback,
        )
        stream.start()
    except Exception as e:
        # The stream may have been created but failed to start; release
        # it so the device handle is not leaked.
        if stream is not None:
            try:
                stream.close()
            except Exception:
                # Best-effort cleanup; the original failure is reported below.
                pass
        self._stream = None
        self._recording = False
        self.error_occurred.emit(f"Failed to start recording: {e}")
        return

    # Only mark the recorder as active once the stream is really running.
    self._stream = stream
    self._start_time = time.monotonic()
    self._recording = True
    self._poll_timer.start()
def _relative_time(dt: datetime, now: datetime | None = None) -> str:
    """Return a short, human-readable age for *dt*.

    Args:
        dt: The moment being described. May be naive or timezone-aware.
        now: Reference time. Defaults to the current time, taken with the
            same timezone awareness as *dt* so the subtraction never mixes
            naive and aware values. If supplied, it must match *dt*'s
            awareness.

    Returns:
        ``"just now"`` for anything under a minute (including timestamps
        in the future), then ``"Nm ago"``, ``"Nh ago"`` and ``"Nd ago"``
        up to 29 days. Older values are shown as an abbreviated date, with
        the year appended when it differs from the reference year.
    """
    if now is None:
        # datetime.now(None) is naive local time; datetime.now(tz) is aware.
        now = datetime.now(dt.tzinfo)

    seconds = int((now - dt).total_seconds())
    if seconds < 60:
        return "just now"
    minutes = seconds // 60
    if minutes < 60:
        return f"{minutes}m ago"
    hours = minutes // 60
    if hours < 24:
        return f"{hours}h ago"
    days = hours // 24
    if days < 30:
        return f"{days}d ago"
    if dt.year != now.year:
        # "Dec 25" alone is ambiguous once the entry is from another year.
        return dt.strftime("%b %d, %Y")
    return dt.strftime("%b %d")
def _move_to(self, new_status: EntryStatus) -> None:
    """Write *new_status* to this card's entry file, then notify the owner.

    The status-change callback is invoked only after the update call has
    returned.
    """
    entry_file = self._filepath
    update_entry_status(entry_file, new_status)

    notify_owner = self._on_status_change
    notify_owner()
def _label_for(status: EntryStatus) -> str:
    """Map a status to the short label shown on buttons and menus.

    Raises:
        KeyError: If *status* is not one of the three known statuses.
    """
    labels = dict(
        [
            (EntryStatus.DOCKET, "On Docket"),
            (EntryStatus.IN_PROGRESS, "In Progress"),
            (EntryStatus.COMPLETE, "Complete"),
        ]
    )
    return labels[status]
def eventFilter(self, obj, event) -> bool:
    """Toggle the column when the header label is clicked.

    A mouse-button release on the header is consumed (returns True) after
    toggling the collapsed state. Every other object/event combination is
    passed to the base-class filter.
    """
    if obj is not self._header:
        return super().eventFilter(obj, event)
    if event.type() != event.Type.MouseButtonRelease:
        return super().eventFilter(obj, event)

    self._toggle_collapsed()
    return True
def _filtered_entries(self) -> list[tuple[Path, Entry]]:
    """Return the entries that fall inside the active date filter.

    With no filter (``_filter_days`` is None) the full entry list is
    returned as-is. Otherwise only entries whose timestamp is within the
    last ``_filter_days`` days are kept.

    Timestamps may be naive or timezone-aware. Each is compared against a
    cutoff of the same kind, because comparing a naive datetime with an
    aware one raises ``TypeError``.
    NOTE(review): whether stored entries can carry aware timestamps is not
    visible from this module. Confirm against the Entry model.
    """
    if self._filter_days is None:
        return self._all_entries

    cutoff_naive = datetime.now() - timedelta(days=self._filter_days)
    # astimezone() on a naive value interprets it as local time and
    # returns the equivalent aware datetime.
    cutoff_aware = cutoff_naive.astimezone()

    def is_recent(stamp: datetime) -> bool:
        cutoff = cutoff_naive if stamp.tzinfo is None else cutoff_aware
        return stamp >= cutoff

    return [(fp, e) for fp, e in self._all_entries if is_recent(e.timestamp)]
the end) + while self._card_layout.count() > 1: + item = self._card_layout.takeAt(0) + if item.widget(): + item.widget().deleteLater() + + if self._collapsed: + return + + entries = self._filtered_entries() if self._collapsible else self._all_entries + for filepath, entry in entries: + card = EntryCard(filepath, entry, self._on_status_change, self) + self._card_layout.insertWidget(self._card_layout.count() - 1, card) + + +class BoardWindow(QWidget): + """Kanban board window for managing memory entries.""" + + def __init__(self, config: Config): + super().__init__() + self._config = config + self._columns: list[KanbanColumn] = [] + self._debounce_timer = QTimer(self) + self._debounce_timer.setSingleShot(True) + self._debounce_timer.setInterval(300) + self._debounce_timer.timeout.connect(self._refresh) + + self._setup_ui() + self._setup_watcher() + + def _setup_ui(self) -> None: + self.setWindowTitle("My Memory - Board") + self.resize(900, 600) + self.setMinimumSize(700, 400) + self.setStyleSheet(self._stylesheet()) + + layout = QVBoxLayout(self) + layout.setContentsMargins(16, 16, 16, 16) + layout.setSpacing(12) + + # Header + header = QLabel("My Memory") + header.setObjectName("boardHeader") + layout.addWidget(header) + + # Columns + columns_layout = QHBoxLayout() + columns_layout.setSpacing(10) + + for name, status, accent in _COLUMNS: + col = KanbanColumn( + name, status, accent, + collapsible=(status == EntryStatus.COMPLETE), + ) + col.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + self._columns.append(col) + columns_layout.addWidget(col, stretch=1) + + layout.addLayout(columns_layout, stretch=1) + + def _setup_watcher(self) -> None: + from PySide6.QtCore import QFileSystemWatcher + + self._watcher = QFileSystemWatcher(self) + entries_dir = self._config.entries_dir + if entries_dir.exists(): + self._watcher.addPath(str(entries_dir)) + for subdir in entries_dir.iterdir(): + if subdir.is_dir(): + self._watcher.addPath(str(subdir)) + + 
def _refresh(self) -> None:
    """Reload every entry from disk and hand each column its share.

    Entries are bucketed by status. Every status gets a bucket, so a
    column with no matching entries receives an empty list. Each column is
    also given this method as its status-change callback.
    """
    loaded = load_all_entries(self._config)

    by_status: dict[EntryStatus, list[tuple[Path, Entry]]] = {}
    for status in EntryStatus:
        by_status[status] = []

    for path, record in loaded:
        bucket = by_status[record.status]
        bucket.append((path, record))

    reload_board = self._refresh
    for column in self._columns:
        column.set_entries(by_status[column.status], reload_board)
class CaptureWindow(QWidget):
    """Frameless popup for capturing text and voice entries.

    The window owns an ``AudioRecorder`` and a ``Transcriber``. A finished
    recording is transcribed automatically and the text is appended to the
    editor; saving hands an ``Entry`` to ``save_entry``.

    Signals:
        board_requested: Emitted after the popup has been dismissed through
            the "Board" button.
    """

    board_requested = Signal()

    def __init__(self, config: Config):
        """Create the popup, its audio helpers and its widgets.

        Args:
            config: Application configuration. ``config.audio`` and
                ``config.whisper`` are passed to the recorder and the
                transcriber; ``config.entries_dir`` is where recordings go.
        """
        super().__init__()
        self._config = config
        self._entry_id = uuid4()
        self._record_start_time: float = 0.0

        # Audio captured for the *current* entry. ``None`` means "text only".
        # These are reset for every capture so a previous voice entry cannot
        # leak into the next one.
        self._wav_path: str | None = None
        self._wav_duration: float | None = None
        # True while a recording is being thrown away by ``_dismiss``.
        self._discard_recording = False

        # Audio components
        self._recorder = AudioRecorder(config.audio, parent=self)
        self._transcriber = Transcriber(config.whisper, parent=self)

        self._setup_ui()
        self._connect_signals()

    def _setup_ui(self) -> None:
        """Build the widget tree, keyboard shortcuts and the UI timer."""
        self.setWindowFlags(
            Qt.WindowType.FramelessWindowHint
            | Qt.WindowType.WindowStaysOnTopHint
            | Qt.WindowType.Tool
        )
        self.setAttribute(Qt.WidgetAttribute.WA_ShowWithoutActivating, False)
        self.setFixedSize(480, 280)
        self.setStyleSheet(self._stylesheet())

        layout = QVBoxLayout(self)
        layout.setContentsMargins(12, 12, 12, 12)
        layout.setSpacing(8)

        # Header row
        header_layout = QHBoxLayout()
        header_layout.setSpacing(8)
        header = QLabel("Capture Thought")
        header.setObjectName("header")
        header_layout.addWidget(header)
        header_layout.addStretch()
        board_btn = QPushButton("Board")
        board_btn.setObjectName("boardBtn")
        board_btn.setFixedSize(56, 24)
        board_btn.setCursor(Qt.CursorShape.PointingHandCursor)
        board_btn.clicked.connect(self._open_board)
        header_layout.addWidget(board_btn)
        layout.addLayout(header_layout)

        # Text area
        self._text_edit = QTextEdit()
        self._text_edit.setPlaceholderText(
            "Type your thought here... (Ctrl+Enter to save)"
        )
        self._text_edit.setAcceptRichText(False)
        layout.addWidget(self._text_edit)

        # Audio level meter
        self._level_bar = QProgressBar()
        self._level_bar.setRange(0, 100)
        self._level_bar.setValue(0)
        self._level_bar.setTextVisible(False)
        self._level_bar.setFixedHeight(6)
        self._level_bar.setObjectName("levelBar")
        self._level_bar.hide()
        layout.addWidget(self._level_bar)

        # Status label (for transcription progress, errors)
        self._status_label = QLabel("")
        self._status_label.setObjectName("status")
        self._status_label.hide()
        layout.addWidget(self._status_label)

        # Button row
        btn_layout = QHBoxLayout()
        btn_layout.setSpacing(8)

        self._record_btn = QPushButton("Record")
        self._record_btn.setObjectName("recordBtn")
        self._record_btn.setFixedWidth(100)
        self._record_btn.clicked.connect(self._toggle_recording)
        btn_layout.addWidget(self._record_btn)

        self._timer_label = QLabel("")
        self._timer_label.setObjectName("timer")
        btn_layout.addWidget(self._timer_label)

        btn_layout.addStretch()

        self._submit_btn = QPushButton("Save")
        self._submit_btn.setObjectName("submitBtn")
        self._submit_btn.setFixedWidth(80)
        self._submit_btn.clicked.connect(self._submit)
        btn_layout.addWidget(self._submit_btn)

        layout.addLayout(btn_layout)

        # Keyboard shortcuts
        QShortcut(QKeySequence("Ctrl+Return"), self, self._submit)
        QShortcut(QKeySequence("Escape"), self, self._dismiss)

        # Recording timer for UI updates
        self._record_timer = QTimer(self)
        self._record_timer.setInterval(100)
        self._record_timer.timeout.connect(self._update_record_timer)

    def _connect_signals(self) -> None:
        """Route recorder and transcriber signals to the UI handlers."""
        self._recorder.rms_level.connect(self._on_rms_level)
        self._recorder.recording_finished.connect(self._on_recording_finished)
        self._recorder.error_occurred.connect(self._on_error)

        self._transcriber.transcription_started.connect(
            self._on_transcription_started
        )
        self._transcriber.transcription_ready.connect(
            self._on_transcription_ready
        )
        self._transcriber.transcription_error.connect(self._on_error)

    def show_centered(self) -> None:
        """Show window centered on the screen under the cursor."""
        self._reset_state()

        screen = QApplication.screenAt(QCursor.pos())
        if screen is None:
            screen = QApplication.primaryScreen()

        geo = screen.availableGeometry()
        x = geo.x() + (geo.width() - self.width()) // 2
        y = geo.y() + (geo.height() - self.height()) // 2
        self.move(x, y)

        self.show()
        self.raise_()
        self.activateWindow()
        self._text_edit.setFocus()

    def _set_recording_visual(self, recording: bool) -> None:
        """Switch the record button between its "Record" and "Stop" looks."""
        self._record_btn.setText("Stop" if recording else "Record")
        self._record_btn.setProperty("recording", recording)
        # The stylesheet selects on the dynamic ``recording`` property, so the
        # style has to be re-applied after the property changes.
        self._record_btn.style().unpolish(self._record_btn)
        self._record_btn.style().polish(self._record_btn)

    def _reset_state(self) -> None:
        """Reset window state for a fresh capture."""
        self._entry_id = uuid4()
        # Forget audio from the previous capture; otherwise a later text-only
        # entry would be saved as a voice entry pointing at a missing WAV.
        self._wav_path = None
        self._wav_duration = None
        self._text_edit.clear()
        self._status_label.hide()
        self._level_bar.hide()
        self._level_bar.setValue(0)
        self._timer_label.setText("")
        self._set_recording_visual(False)
        self._submit_btn.setEnabled(True)
        self._record_btn.setEnabled(True)

    def _toggle_recording(self) -> None:
        """Start recording if idle, otherwise stop the running recording."""
        if self._recorder.is_recording:
            self._stop_recording()
        else:
            self._start_recording()

    def _start_recording(self) -> None:
        """Update the UI for recording and start the recorder."""
        # A stale discard request must never swallow a real recording.
        self._discard_recording = False
        self._record_start_time = time.monotonic()
        self._level_bar.show()
        self._set_recording_visual(True)
        self._record_timer.start()
        self._recorder.start_recording()

    def _stop_recording(self) -> None:
        """Stop the recorder and have it write the WAV for this entry."""
        # Function-scope import: only this method needs ``datetime``.
        from datetime import datetime

        self._record_timer.stop()

        # Save WAV alongside future entry
        # NOTE(review): the directory uses the date at stop time, while the
        # markdown file is placed by the entry timestamp elsewhere - confirm
        # the two cannot diverge around midnight.
        date_str = datetime.now().strftime("%Y-%m-%d")
        day_dir = self._config.entries_dir / date_str
        day_dir.mkdir(parents=True, exist_ok=True)
        wav_path = day_dir / f"{self._entry_id}.wav"

        self._recorder.stop_recording(wav_path)

        self._set_recording_visual(False)

    def _update_record_timer(self) -> None:
        """Refresh the ``m:ss`` elapsed-time label while recording."""
        elapsed = int(time.monotonic() - self._record_start_time)
        mins, secs = divmod(elapsed, 60)
        self._timer_label.setText(f"{mins}:{secs:02d}")

    def _on_rms_level(self, level: float) -> None:
        """Show the recorder's level on the 0-100 meter."""
        self._level_bar.setValue(int(level * 100))

    def _on_recording_finished(self, wav_path: str, duration: float) -> None:
        """Remember the finished recording and start transcribing it."""
        self._level_bar.hide()
        self._timer_label.setText("")

        if self._discard_recording:
            # The recording was thrown away by ``_dismiss``; there is nothing
            # to attach to an entry and nothing worth transcribing.
            self._discard_recording = False
            return

        self._wav_path = wav_path
        self._wav_duration = duration

        # Auto-transcribe
        self._transcriber.transcribe(wav_path)

    def _on_transcription_started(self) -> None:
        """Lock the buttons and show progress while transcription runs."""
        self._status_label.setText("Transcribing...")
        self._status_label.show()
        self._submit_btn.setEnabled(False)
        self._record_btn.setEnabled(False)

    def _on_transcription_ready(self, text: str) -> None:
        """Append the transcribed text to the editor and unlock the buttons."""
        self._status_label.hide()
        self._submit_btn.setEnabled(True)
        self._record_btn.setEnabled(True)

        # Append transcribed text
        existing = self._text_edit.toPlainText().strip()
        if existing:
            self._text_edit.setPlainText(f"{existing}\n\n{text}")
        else:
            self._text_edit.setPlainText(text)

        # Move cursor to end
        cursor = self._text_edit.textCursor()
        cursor.movePosition(cursor.MoveOperation.End)
        self._text_edit.setTextCursor(cursor)

    def _on_error(self, msg: str) -> None:
        """Show a recorder/transcriber error and return the UI to idle."""
        self._status_label.setText(f"Error: {msg}")
        self._status_label.show()
        self._submit_btn.setEnabled(True)
        self._record_btn.setEnabled(True)
        self._level_bar.hide()
        # The UI goes back to "Record", so the elapsed-time display must stop
        # as well; otherwise the timer keeps ticking after a failure.
        self._record_timer.stop()
        self._timer_label.setText("")
        self._discard_recording = False
        self._set_recording_visual(False)

    def _submit(self) -> None:
        """Save the editor content as an entry and dismiss the popup.

        Does nothing when the editor is empty, or while saving is disabled
        (a transcription is still running).
        """
        # The Ctrl+Return shortcut calls this directly and would otherwise
        # bypass the disabled Save button.
        if not self._submit_btn.isEnabled():
            return

        content = self._text_edit.toPlainText().strip()
        if not content:
            return

        has_audio = self._wav_path is not None
        entry = Entry(
            id=self._entry_id,
            source=EntrySource.VOICE if has_audio else EntrySource.TEXT,
            content=content,
            audio_file=f"{self._entry_id}.wav" if has_audio else None,
            duration_seconds=self._wav_duration if has_audio else None,
        )

        save_entry(entry, self._config)
        self._dismiss()

    def _open_board(self) -> None:
        """Request the board window and dismiss capture."""
        self._dismiss()
        self.board_requested.emit()

    def _dismiss(self) -> None:
        """Hide the window and stop any recording."""
        if self._recorder.is_recording:
            # Function-scope import: only this branch needs ``os``.
            import os

            # Throw the audio away: write it to the platform's null device
            # and tell ``_on_recording_finished`` to ignore the result.
            self._discard_recording = True
            self._recorder.stop_recording(Path(os.devnull))
            self._record_timer.stop()
        self.hide()

    @staticmethod
    def _stylesheet() -> str:
        """Return the Qt stylesheet applied to the popup."""
        return """
            CaptureWindow {
                background: #1e1e2e;
                border: 1px solid #45475a;
                border-radius: 12px;
            }
            #header {
                color: #cdd6f4;
                font-size: 14px;
                font-weight: bold;
            }
            QTextEdit {
                background: #313244;
                color: #cdd6f4;
                border: 1px solid #45475a;
                border-radius: 6px;
                padding: 8px;
                font-size: 14px;
                selection-background-color: #585b70;
            }
            #levelBar {
                background: #313244;
                border: none;
                border-radius: 3px;
            }
            #levelBar::chunk {
                background: #a6e3a1;
                border-radius: 3px;
            }
            #status {
                color: #a6adc8;
                font-size: 12px;
            }
            #timer {
                color: #f38ba8;
                font-size: 13px;
                font-weight: bold;
            }
            QPushButton {
                background: #45475a;
                color: #cdd6f4;
                border: none;
                border-radius: 6px;
                padding: 6px 16px;
                font-size: 13px;
            }
            QPushButton:hover {
                background: #585b70;
            }
            QPushButton:disabled {
                background: #313244;
                color: #6c7086;
            }
            #submitBtn {
                background: #89b4fa;
                color: #1e1e2e;
                font-weight: bold;
            }
            #submitBtn:hover {
                background: #b4d0fb;
            }
            #recordBtn[recording="true"] {
                background: #f38ba8;
                color: #1e1e2e;
                font-weight: bold;
            }
            #boardBtn {
                background: transparent;
                color: #6c7086;
                border: 1px solid #45475a;
                border-radius: 4px;
                padding: 2px 8px;
                font-size: 11px;
            }
            #boardBtn:hover {
                color: #cdd6f4;
                border-color: #585b70;
            }
        """
class EntrySource(str, Enum):
    """How an entry was created."""

    TEXT = "text"
    VOICE = "voice"


class EntryStatus(str, Enum):
    """Workflow status of an entry."""

    DOCKET = "docket"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"


class Entry(BaseModel):
    """A captured memory: frontmatter metadata plus the body text."""

    id: UUID = Field(default_factory=uuid4)
    timestamp: datetime = Field(default_factory=datetime.now)
    source: EntrySource = EntrySource.TEXT
    tags: list[str] = Field(default_factory=list)
    status: EntryStatus = EntryStatus.DOCKET
    audio_file: str | None = None
    duration_seconds: float | None = None
    content: str = ""

    def to_frontmatter_dict(self) -> dict:
        """Return dict suitable for YAML frontmatter (no content).

        ``audio_file`` and ``duration_seconds`` are included only when set.
        """
        d: dict = {
            "id": str(self.id),
            "timestamp": self.timestamp.isoformat(),
            "source": self.source.value,
            "tags": self.tags,
            "status": self.status.value,
        }
        if self.audio_file is not None:
            d["audio_file"] = self.audio_file
        if self.duration_seconds is not None:
            d["duration_seconds"] = self.duration_seconds
        return d

    @classmethod
    def from_frontmatter(cls, metadata: dict, content: str) -> Entry:
        """Reconstruct an Entry from parsed frontmatter metadata + body content.

        Args:
            metadata: Parsed frontmatter mapping. ``id``, ``timestamp`` and
                ``source`` are required; the remaining keys are optional.
            content: Body text of the entry; surrounding whitespace is
                stripped.

        Raises:
            KeyError: If a required key is missing from ``metadata``.
            ValueError: If ``id``, ``timestamp``, ``source`` or ``status``
                cannot be parsed.
        """
        # Handle legacy entries that have 'processed' instead of 'status'
        status_val = metadata.get("status", "docket")
        if "status" not in metadata and metadata.get("processed", False):
            status_val = "complete"

        # A hand-edited file may carry an unquoted timestamp, which the YAML
        # parser hands over as a datetime (or date) object rather than a
        # string; accept both instead of failing in ``fromisoformat``.
        raw_timestamp = metadata["timestamp"]
        if isinstance(raw_timestamp, datetime):
            timestamp = raw_timestamp
        else:
            timestamp = datetime.fromisoformat(str(raw_timestamp))

        return cls(
            id=UUID(str(metadata["id"])),
            timestamp=timestamp,
            source=EntrySource(metadata["source"]),
            # An empty ``tags:`` key parses as None; treat it as "no tags".
            tags=metadata.get("tags") or [],
            status=EntryStatus(status_val),
            audio_file=metadata.get("audio_file"),
            duration_seconds=metadata.get("duration_seconds"),
            content=content.strip(),
        )
def ensure_schema(config: Config) -> None:
    """Write schema.md if it doesn't exist or content has changed.

    The file is always read and written as UTF-8: the schema text contains
    non-ASCII characters, so relying on the locale's default encoding could
    fail or produce a mis-encoded file on non-UTF-8 systems.

    Args:
        config: Supplies the target path (``schema_file``) and creates the
            data directories via ``ensure_dirs``.
    """
    config.ensure_dirs()
    schema_path = config.schema_file

    try:
        current = schema_path.read_text(encoding="utf-8")
    except (FileNotFoundError, UnicodeDecodeError):
        # Missing, or written earlier in some other encoding: (re)write it.
        current = None

    if current == SCHEMA_CONTENT:
        return
    schema_path.write_text(SCHEMA_CONTENT, encoding="utf-8")
def load_entry(filepath: Path) -> Entry:
    """Load an entry from a markdown file with YAML frontmatter."""
    post = frontmatter.load(str(filepath))
    return Entry.from_frontmatter(post.metadata, post.content)


def list_entries(config: Config, date_str: str | None = None) -> list[Path]:
    """List all entry files, optionally filtered by date (YYYY-MM-DD).

    Args:
        config: Supplies ``entries_dir``, the root that is searched.
        date_str: When given, only the ``entries_dir/<date_str>`` directory
            is searched (non-recursively); otherwise the whole tree is.

    Returns:
        Paths of ``*.md`` files sorted by modification time (newest first).
        An empty list if the requested day directory does not exist.
    """
    if date_str:
        search_dir = config.entries_dir / date_str
        if not search_dir.exists():
            return []
        files = list(search_dir.glob("*.md"))
    else:
        files = list(config.entries_dir.rglob("*.md"))

    return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)


def update_entry_status(filepath: Path, new_status: EntryStatus) -> None:
    """Update only the status field in an entry's frontmatter on disk.

    Args:
        filepath: Markdown entry file to rewrite in place.
        new_status: Status value stored under the ``status`` key.
    """
    post = frontmatter.load(str(filepath))
    post.metadata["status"] = new_status.value
    # Remove legacy 'processed' key if present
    post.metadata.pop("processed", None)
    # Write UTF-8 explicitly instead of the locale default, so non-ASCII
    # entry text survives the rewrite on non-UTF-8 systems.
    # NOTE(review): assumes frontmatter.load reads UTF-8 by default - confirm.
    filepath.write_text(frontmatter.dumps(post) + "\n", encoding="utf-8")
def _load_model(config: WhisperConfig):
    """Return the shared faster-whisper model for *config*, loading if needed.

    The model is kept in module-level state and reused as long as a model is
    loaded and *config* equals the configuration it was loaded with. With
    ``config.device == "auto"`` CUDA is tried first; if that raises, the load
    is retried on CPU with ``config.cpu_fallback_compute_type``.
    """
    global _model, _model_config

    cache_hit = _model is not None and _model_config == config
    if cache_hit:
        return _model

    # Import deferred until a model actually has to be built.
    from faster_whisper import WhisperModel

    model_name = config.model_size
    target_device, precision = config.device, config.compute_type

    if target_device == "auto":
        # Try CUDA first, fall back to CPU
        try:
            logger.info("Loading Whisper model '%s' on CUDA...", model_name)
            _model = WhisperModel(
                model_name, device="cuda", compute_type=precision
            )
            _model_config = config
            logger.info("Whisper model loaded on CUDA")
            return _model
        except Exception as exc:
            logger.warning("CUDA failed (%s), falling back to CPU", exc)
            target_device, precision = "cpu", config.cpu_fallback_compute_type

    logger.info("Loading Whisper model '%s' on %s...", model_name, target_device)
    _model = WhisperModel(
        model_name, device=target_device, compute_type=precision
    )
    _model_config = config
    logger.info("Whisper model loaded on %s", target_device)
    return _model
class TranscriptionWorker(QObject):
    """Runs one transcription and reports the outcome through signals."""

    finished = Signal(str)  # transcribed text
    error = Signal(str)  # error message

    def __init__(self, wav_path: str, config: WhisperConfig):
        """Remember which file to transcribe and with which settings."""
        super().__init__()
        self._wav_path = wav_path
        self._config = config

    def run(self) -> None:
        """Transcribe the file; emit ``finished`` or, on failure, ``error``."""
        try:
            transcript = transcribe_file(self._wav_path, self._config)
            self.finished.emit(transcript)
        except Exception as exc:
            self.error.emit(f"Transcription failed: {exc}")


class Transcriber(QObject):
    """Starts transcriptions on a ``QThread`` and relays their results."""

    transcription_ready = Signal(str)  # transcribed text
    transcription_error = Signal(str)  # error message
    transcription_started = Signal()

    def __init__(self, whisper_config: WhisperConfig, parent: QObject | None = None):
        """Store the Whisper settings; no thread is created until needed."""
        super().__init__(parent)
        self._config = whisper_config
        self._thread: QThread | None = None
        self._worker: TranscriptionWorker | None = None

    @property
    def is_busy(self) -> bool:
        """True while a transcription thread exists and is running."""
        active_thread = self._thread
        if active_thread is None:
            return False
        return active_thread.isRunning()

    def transcribe(self, wav_path: str) -> None:
        """Start transcription in a background thread.

        Emits ``transcription_error`` and returns if a transcription is
        already running; otherwise emits ``transcription_started`` first.
        """
        if self.is_busy:
            self.transcription_error.emit("Transcription already in progress")
            return

        self.transcription_started.emit()

        thread = QThread()
        worker = TranscriptionWorker(wav_path, self._config)
        self._thread = thread
        self._worker = worker
        worker.moveToThread(thread)

        # The worker starts as soon as the thread does; its result signals
        # come back to this object's handlers.
        thread.started.connect(worker.run)
        worker.finished.connect(self._on_finished)
        worker.error.connect(self._on_error)

        thread.start()

    def _on_finished(self, text: str) -> None:
        """Tear down the thread, then publish the transcribed text."""
        self._cleanup_thread()
        self.transcription_ready.emit(text)

    def _on_error(self, error_msg: str) -> None:
        """Tear down the thread, then publish the error message."""
        self._cleanup_thread()
        self.transcription_error.emit(error_msg)

    def _cleanup_thread(self) -> None:
        """Stop the worker thread, wait for it, and drop both references."""
        thread = self._thread
        if thread is not None:
            thread.quit()
            thread.wait()
            self._thread = None
        # Released only after the thread has been waited for.
        self._worker = None

    def download_model(self) -> None:
        """Pre-download the Whisper model (blocking call for CLI use)."""
        _load_model(self._config)