From 3bf3ead3967389f3d3dbf5498fab1e36f57991bd Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 13:29:37 +0800 Subject: [PATCH 01/22] fix(memos-local-plugin): prevent orphan episode scan from closing active sessions; add episode delete API Two related fixes: 1. Orphan episode protection (2 files): - core/pipeline/memory-core.ts: init() now checks session.meta.closedAt before treating open episodes as orphans. Episodes from sessions that haven't been explicitly closed are no longer abandoned on bridge restart. - core/session/manager.ts: closeSession() now stamps session.meta.closedAt so future init() calls can distinguish 'explicitly closed' from 'crashed and might reconnect'. 2. WebUI Tasks page bulk delete (3 files): - core/storage/repos/episodes.ts: added deleteById() method - core/pipeline/memory-core.ts: added deleteEpisode() / deleteEpisodes() - agent-contract/memory-core.ts: added interface signatures - server/routes/session.ts: DELETE /api/v1/episodes now calls deleteEpisode (actually removes the row + cascading traces) instead of closeEpisode (no-op on already-closed episodes). Added POST /api/v1/episodes/delete for bulk operations. --- .../agent-contract/memory-core.ts | 2 ++ .../core/pipeline/memory-core.ts | 30 +++++++++++++++++-- .../core/session/manager.ts | 8 +++++ .../core/storage/repos/episodes.ts | 7 +++++ .../server/routes/session.ts | 23 ++++++++------ 5 files changed, 59 insertions(+), 11 deletions(-) diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index fbc4c2fc3..e8449503c 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -91,6 +91,8 @@ export interface MemoryCore { userMessage?: string; }): Promise; closeEpisode(episodeId: EpisodeId): Promise; + deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }>; + deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; // ── pipeline (per turn) ── /** Called *before* the agent acts. Returns the context to inject. */ diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 8f5c1d60b..b7952aaf4 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -332,9 +332,19 @@ export function createMemoryCore( // to `closed` + sets `closeReason: "abandoned"` without touching // trace_ids_json. try { - const orphans = handle.repos.episodes.list({ status: "open", limit: 500 }); + const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 }); + // Only treat an open episode as an orphan if its session has been + // explicitly closed (meta.closedAt is set) or no longer exists. + // Otherwise the session might reconnect — leave it alone. + const orphans = openEpisodes.filter((ep) => { + const session = handle.repos.sessions.getById(ep.sessionId); + if (!session) return true; // session row gone — orphan + if (session.meta?.closedAt != null) return true; // explicitly closed — orphan + return false; // session exists and not closed — skip, might reconnect + }); if (orphans.length > 0) { - log.info("init.orphan_episodes.close", { count: orphans.length }); + const skipped = openEpisodes.length - orphans.length; + log.info("init.orphan_episodes.close", { count: orphans.length, skipped }); const endedAt = Date.now(); for (const ep of orphans) { try { @@ -632,6 +642,20 @@ export function createMemoryCore( handle.sessionManager.finalizeEpisode(episodeId); } + async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { + ensureLive(); + return { deleted: handle.repos.episodes.deleteById(episodeId) }; + } + + async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> { + ensureLive(); + let deleted = 0; + for (const id of ids) { + if (handle.repos.episodes.deleteById(id)) deleted++; + } + return { deleted }; + } + // ─── Pipeline (per turn) ── async function onTurnStart( turn: Parameters[0], @@ -1947,6 +1971,8 @@ export function createMemoryCore( closeSession, openEpisode, closeEpisode, + deleteEpisode, + deleteEpisodes, onTurnStart, onTurnEnd, submitFeedback, diff --git a/apps/memos-local-plugin/core/session/manager.ts b/apps/memos-local-plugin/core/session/manager.ts index f92356219..547d1f0db 100644 --- a/apps/memos-local-plugin/core/session/manager.ts +++ b/apps/memos-local-plugin/core/session/manager.ts @@ -143,6 +143,14 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { for (const ep of epm.listForSession(id)) { if (ep.status === "open") epm.abandon(ep.id, `session_closed:${reason}`); } + // Stamp the session row with closedAt so future bridge init() + // can distinguish "explicitly closed" from "might reconnect". + try { + const row = deps.sessionsRepo.getById(id); + if (row) { + deps.sessionsRepo.touchLastSeen(id, Date.now(), { closedAt: Date.now() }); + } + } catch { /* best-effort */ } live.delete(id); log.info("session.closed", { sessionId: id, reason }); bus.emit({ kind: "session.closed", sessionId: id, reason }); diff --git a/apps/memos-local-plugin/core/storage/repos/episodes.ts b/apps/memos-local-plugin/core/storage/repos/episodes.ts index ccff7eed9..dceb638e9 100644 --- a/apps/memos-local-plugin/core/storage/repos/episodes.ts +++ b/apps/memos-local-plugin/core/storage/repos/episodes.ts @@ -90,6 +90,13 @@ export function makeEpisodesRepo(db: StorageDb) { ).run({ id }); }, + deleteById(id: EpisodeId): boolean { + const r = db.prepare<{ id: string }>( + `DELETE FROM episodes WHERE id=@id`, + ).run({ id }); + return r.changes > 0; + }, + setRTask(id: EpisodeId, rTask: number): void { db.prepare<{ id: string; r: number }>( `UPDATE episodes SET r_task=@r WHERE id=@id`, diff --git a/apps/memos-local-plugin/server/routes/session.ts b/apps/memos-local-plugin/server/routes/session.ts index 0e45717b9..9cc598634 100644 --- a/apps/memos-local-plugin/server/routes/session.ts +++ b/apps/memos-local-plugin/server/routes/session.ts @@ -51,8 +51,20 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { writeError(ctx, 400, "invalid_argument", "episodeId is required"); return; } - await deps.core.closeEpisode(id as EpisodeId); - return { ok: true }; + const result = await deps.core.deleteEpisode(id as EpisodeId); + return result; + }); + + routes.set("POST /api/v1/episodes/delete", async (ctx) => { + const body = parseJson<{ ids?: unknown }>(ctx); + const ids = Array.isArray(body.ids) + ? body.ids.filter((v): v is string => typeof v === "string" && v.length > 0) + : []; + if (ids.length === 0) { + writeError(ctx, 400, "invalid_argument", "ids[] is required"); + return; + } + return await deps.core.deleteEpisodes(ids as EpisodeId[]); }); routes.set("GET /api/v1/episodes", async (ctx) => { @@ -61,10 +73,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { const rawOffset = numberOrUndefined(ctx.url.searchParams.get("offset")); const limit = rawLimit && rawLimit > 0 ? rawLimit : 50; const offset = rawOffset && rawOffset >= 0 ? rawOffset : 0; - // Return the rich row shape — the viewer's task list needs - // session id / status / turn count / preview. The old `ids`-only - // variant is still available under the `episode.list` JSON-RPC - // method and via `?shape=ids`. if (ctx.url.searchParams.get("shape") === "ids") { const episodeIds = await deps.core.listEpisodes({ sessionId, limit, offset }); return { @@ -83,9 +91,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { }; }); - // Backward-compat: legacy `/api/v1/episodes/timeline?episodeId=…` - // still works; the preferred path `/api/v1/episodes/:id/timeline` - // is registered in `trace.ts`. routes.set("GET /api/v1/episodes/timeline", async (ctx) => { const episodeId = ctx.url.searchParams.get("episodeId"); if (!episodeId) { From 328d1a5d6ea88aec26fd93284e3409fcb9a38e83 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 13:40:10 +0800 Subject: [PATCH 02/22] fix(memos-local-plugin): prevent bridge process flood and crash on Hermes session restart Three fixes for the Hermes bridge adapter: 1. Use tsx runtime instead of node --experimental-strip-types 2. PID file to prevent duplicate bridge processes 3. Bridge lifetime tracking via register_bridge() --- .../hermes/memos_provider/bridge_client.py | 16 +++- .../hermes/memos_provider/daemon_manager.py | 91 ++++++++++++++++++- 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index b63b190fe..290d638ff 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -22,6 +22,9 @@ from pathlib import Path from typing import TYPE_CHECKING, Any +# Ensure at-most-one bridge instance via PID file. +from daemon_manager import kill_existing_bridge, register_bridge + if TYPE_CHECKING: from collections.abc import Callable @@ -72,9 +75,16 @@ def __init__( script = bridge_path or str( Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" ) + tsx_bin = str( + Path(__file__).resolve().parent.parent.parent.parent / "node_modules" / ".bin" / "tsx" + ) + runtime = tsx_bin if Path(tsx_bin).exists() else node + runtime_args = [] if Path(tsx_bin).exists() else ["--experimental-strip-types"] env = {**os.environ, **(extra_env or {})} + # Kill any previously-running bridge before spawning a new one. + kill_existing_bridge() self._proc = subprocess.Popen( - [node, "--experimental-strip-types", script, f"--agent={agent}"], + [runtime, *runtime_args, script, f"--agent={agent}"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -82,6 +92,8 @@ def __init__( bufsize=1, env=env, ) + # Register the new process so daemon_manager can track it. + register_bridge(self._proc) self._reader = threading.Thread( target=self._read_loop, daemon=True, @@ -163,6 +175,8 @@ def close(self) -> None: self._proc.wait(timeout=5.0) except subprocess.TimeoutExpired: self._proc.kill() + # Clear PID file so a future incarnation starts fresh. + register_bridge(None) # unblock any pending waiters with self._lock: for entry in list(self._pending.values()): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index 62810cc5b..d859aaa13 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -5,6 +5,8 @@ - Probe Node.js availability so ``MemTensorProvider.is_available`` can answer cheaply at plugin-startup time. - Graceful shutdown helpers invoked from ``MemTensorProvider.shutdown``. +- PID file management to prevent duplicate bridge processes across + Hermes session restarts. This file intentionally has **no runtime dependency** on the client; the provider instantiates its own client. Keeping these concerns split means @@ -17,9 +19,12 @@ from __future__ import annotations import logging +import os import shutil +import signal import subprocess import threading +import time from pathlib import Path @@ -28,6 +33,41 @@ _lock = threading.Lock() _bridge_ok: bool | None = None +_ACTIVE_BRIDGE_PROC: subprocess.Popen | None = None + + +# ─── PID file helpers ──────────────────────────────────────────────────── + + +def _pid_path() -> Path: + """Path to the singleton PID file under the plugin data directory.""" + return Path(__file__).resolve().parent.parent.parent.parent / "data" / "bridge.pid" + + +def _read_pid() -> int | None: + try: + return int(_pid_path().read_text().strip()) + except (FileNotFoundError, ValueError): + return None + + +def _write_pid(pid: int) -> None: + _pid_path().write_text(str(pid)) + + +def _clean_pid() -> None: + _pid_path().unlink(missing_ok=True) + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except (OSError, PermissionError): + return False + + +# ─── Bridge lifecycle ──────────────────────────────────────────────────── def _bridge_script() -> Path: @@ -69,8 +109,55 @@ def ensure_bridge_running(*, probe_only: bool = False) -> bool: return True +def kill_existing_bridge() -> None: + """Kill any previously-running bridge process recorded in the PID file. + + Called **before** spawning a new bridge to guarantee at-most-one + instance. Safe to call even when no stale PID exists. + """ + pid = _read_pid() + if pid is not None and _pid_alive(pid): + logger.info("MemOS: killing stale bridge (pid=%d)", pid) + try: + os.kill(pid, signal.SIGTERM) + for _ in range(25): # wait up to 2.5 s + if not _pid_alive(pid): + break + time.sleep(0.1) + else: + os.kill(pid, signal.SIGKILL) + except (OSError, ProcessLookupError): + pass + _clean_pid() + + +def register_bridge(proc: subprocess.Popen | None) -> None: + """Record the current running bridge process. + + Pass ``None`` (e.g. on close) to clear the registration and PID file. + """ + global _ACTIVE_BRIDGE_PROC + _ACTIVE_BRIDGE_PROC = proc + if proc is not None: + _write_pid(proc.pid) + else: + _clean_pid() + + def shutdown_bridge() -> None: - """Best-effort cleanup; each client owns its own subprocess.""" - global _bridge_ok + """Gracefully shut down the tracked bridge subprocess and clean PID file.""" + global _bridge_ok, _ACTIVE_BRIDGE_PROC with _lock: _bridge_ok = None + if _ACTIVE_BRIDGE_PROC is not None: + try: + _ACTIVE_BRIDGE_PROC.terminate() + _ACTIVE_BRIDGE_PROC.wait(timeout=5.0) + logger.info("MemOS: bridge terminated (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except subprocess.TimeoutExpired: + _ACTIVE_BRIDGE_PROC.kill() + logger.warning("MemOS: bridge killed after timeout (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except Exception: + pass + _ACTIVE_BRIDGE_PROC = None + _clean_pid() From 27d64eb59c701e92cac456f2aa99d76bcf0c135c Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 13:44:23 +0800 Subject: [PATCH 03/22] fix(memos-local-plugin): read bridge version from package.json instead of hardcoding Previously the bridge reported '2.0.0-alpha.1' regardless of the actual package version. Now it reads from package.json at startup. --- apps/memos-local-plugin/bridge.cts | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 62750c3f8..8b15c885f 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -15,6 +15,7 @@ */ // eslint-disable-next-line @typescript-eslint/no-require-imports const path = require("node:path") as typeof import("node:path"); +const pkgVersion: string = (require(path.resolve(__dirname, "package.json")) as { version: string }).version; interface BridgeArgs { daemon: boolean; @@ -53,7 +54,7 @@ async function main(): Promise { const { core, config, home } = await bootstrapMemoryCoreFull({ agent: args.agent, - pkgVersion: "2.0.0-alpha.1", + pkgVersion, }); await core.init(); @@ -108,15 +109,20 @@ async function main(): Promise { process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); - // Keep the process alive until stdin ends. - await stdio.done; - try { - if (viewer) await viewer.close(); - } catch { - /* best-effort */ + // In daemon mode, run indefinitely (stdin is /dev/null — EOF is normal). + // In stdio mode, run until the calling process closes stdin. + if (args.daemon) { + await new Promise(() => {}); // never resolve → process lives forever + } else { + await stdio.done; + try { + if (viewer) await viewer.close(); + } catch { + /* best-effort */ + } + await core.shutdown(); + process.exit(0); } - await core.shutdown(); - process.exit(0); } async function tryHubRegister(opts: { @@ -127,7 +133,7 @@ async function tryHubRegister(opts: { const body = JSON.stringify({ agent: opts.selfAgent, port: opts.selfPort, - version: "2.0.0-alpha.1", + version: pkgVersion, }); for (let i = 0; i < 6; i++) { try { From f2571e7e728d4aa417c10eb1e1d0a5e1a62eef53 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 14:19:35 +0800 Subject: [PATCH 04/22] Update apps/memos-local-plugin/agent-contract/memory-core.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- apps/memos-local-plugin/agent-contract/memory-core.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index e8449503c..6626c2624 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -91,8 +91,8 @@ export interface MemoryCore { userMessage?: string; }): Promise; closeEpisode(episodeId: EpisodeId): Promise; - deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }>; - deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; + deleteEpisode?(episodeId: EpisodeId): Promise<{ deleted: boolean }>; + deleteEpisodes?(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; // ── pipeline (per turn) ── /** Called *before* the agent acts. Returns the context to inject. */ From 68bfe1656c324be6673f1f7f6c0358166c4f27f4 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 14:19:52 +0800 Subject: [PATCH 05/22] Update apps/memos-local-plugin/core/session/manager.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- apps/memos-local-plugin/core/session/manager.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/memos-local-plugin/core/session/manager.ts b/apps/memos-local-plugin/core/session/manager.ts index 547d1f0db..bc8c96744 100644 --- a/apps/memos-local-plugin/core/session/manager.ts +++ b/apps/memos-local-plugin/core/session/manager.ts @@ -148,7 +148,8 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { try { const row = deps.sessionsRepo.getById(id); if (row) { - deps.sessionsRepo.touchLastSeen(id, Date.now(), { closedAt: Date.now() }); + const ts = now(); + deps.sessionsRepo.touchLastSeen(id, ts, { closedAt: ts }); } } catch { /* best-effort */ } live.delete(id); From bfee04d79b0127447ab8f7059f0ed56eb2847a6d Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Mon, 27 Apr 2026 14:22:17 +0800 Subject: [PATCH 06/22] fix(memos-local-plugin): reject deletion of open episodes (409 conflict) Prevent deleting episodes that are currently open/in-flight, which could cause FK violations and corrupt in-memory pipeline state. Co-Authored-By: Copilot --- .../core/pipeline/memory-core.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index b7952aaf4..d99b0dc16 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -642,13 +642,27 @@ export function createMemoryCore( handle.sessionManager.finalizeEpisode(episodeId); } + function assertEpisodeDeletable(episodeId: EpisodeId): void { + const snap = handle.sessionManager.getEpisode(episodeId); + if (snap?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId}`, + ); + } + } + async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { ensureLive(); + assertEpisodeDeletable(episodeId); return { deleted: handle.repos.episodes.deleteById(episodeId) }; } async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> { ensureLive(); + for (const id of ids) { + assertEpisodeDeletable(id); + } let deleted = 0; for (const id of ids) { if (handle.repos.episodes.deleteById(id)) deleted++; From 23d0e895c68990add17d5ff96d7083f53d3bd7a3 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 14:05:14 +0800 Subject: [PATCH 07/22] fix(memos-local-plugin): connect Hermes provider to daemon bridge via TCP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes: - bridge_client.py: add TCP transport mode — connect to daemon bridge at 127.0.0.1:18911 instead of spawning a subprocess. Supports transparent reconnection on connection loss. Falls back to stdio subprocess mode when TCP params are not provided. - __init__.py (MemTensorProvider): connect via TCP to daemon bridge; on Hermes CLI exit, skip episode.close + session.close so the daemon bridge owns the session and the pipeline can finalize episodes naturally. Why: previously each Hermes CLI session spawned its own bridge subprocess, which died on CLI exit → all open episodes abandoned as 'session_closed:client'. With TCP mode the daemon bridge (persistent background process) keeps the session alive, and episodes get properly finalized by the pipeline. --- .../hermes/memos_provider/__init__.py | 13 +- .../hermes/memos_provider/bridge_client.py | 343 +++++++++++++----- 2 files changed, 263 insertions(+), 93 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index c70f5fea9..c1a290bc2 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -142,7 +142,7 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov logger.warning("MemOS: failed to start bridge — %s", err) return try: - self._bridge = MemosBridgeClient() + self._bridge = MemosBridgeClient(tcp_host="127.0.0.1", tcp_port=18911) resp = self._bridge.request( "session.open", { @@ -455,10 +455,13 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor if pending: with contextlib.suppress(Exception): self._turn_end(*pending) - with contextlib.suppress(Exception): - self._bridge.request("episode.close", {"episodeId": self._episode_id}) - with contextlib.suppress(Exception): - self._bridge.request("session.close", {"sessionId": self._session_id}) + # In TCP mode the daemon bridge owns the session — don't close it. + # The pipeline will finalize the episode naturally on the daemon side. + if not self._bridge._tcp_mode: + with contextlib.suppress(Exception): + self._bridge.request("episode.close", {"episodeId": self._episode_id}) + with contextlib.suppress(Exception): + self._bridge.request("session.close", {"sessionId": self._session_id}) def shutdown(self) -> None: # type: ignore[override] if self._prefetch_thread and self._prefetch_thread.is_alive(): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 290d638ff..1b33e49ca 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -1,12 +1,12 @@ -"""JSON-RPC 2.0 over stdio client for the MemOS bridge. +"""JSON-RPC 2.0 client for the MemOS bridge. -Spawns ``node bridge.cts --agent=hermes`` as a subprocess and communicates -via line-delimited JSON messages on its stdin/stdout. Responses are -matched by ``id``. Notifications (events + logs) are forwarded to -registered callbacks on a reader thread. +Two transport modes: +- **TCP** (recommended): connects to an existing daemon bridge via ``host:port``. + Hermes CLI exits without disrupting the daemon's session → episodes finalize properly. +- **stdio** (fallback): spawns ``node bridge.cts --agent=hermes`` as a subprocess. -The client is *blocking* by design — callers wanting async behaviour -should wrap requests in a thread pool. +Responses are matched by ``id``. Notifications (events + logs) are forwarded to +registered callbacks on a reader thread. Thread-safe. """ from __future__ import annotations @@ -15,14 +15,16 @@ import json import logging import os +import socket as _socket import shutil import subprocess import threading +import time from pathlib import Path from typing import TYPE_CHECKING, Any -# Ensure at-most-one bridge instance via PID file. +# Ensure at-most-one bridge instance via PID file (stdio mode only). from daemon_manager import kill_existing_bridge, register_bridge @@ -32,6 +34,11 @@ logger = logging.getLogger(__name__) +DEFAULT_TCP_HOST = "127.0.0.1" +DEFAULT_TCP_PORT = 18911 +_TCP_RECONNECT_DELAY = 1.0 # seconds between reconnection attempts +_TCP_MAX_RECONNECT = 3 + class BridgeError(RuntimeError): """Raised when the bridge returns a JSON-RPC error object.""" @@ -43,17 +50,52 @@ def __init__(self, code: str, message: str, data: Any = None) -> None: self.data = data +class _SocketTransport: + """TCP socket wrapper with line-delimited JSON read/write.""" + + def __init__(self, host: str, port: int) -> None: + self._sock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) + self._sock.settimeout(15.0) + self._sock.connect((host, port)) + self._sock.settimeout(None) # blocking reads after connect + self._rfile = self._sock.makefile("r", buffering=1, encoding="utf-8") + + def write_line(self, text: str) -> None: + self._sock.sendall((text + "\n").encode("utf-8")) + + def read_line(self) -> str | None: + line = self._rfile.readline() + return line if line else None + + def close(self) -> None: + try: + self._sock.shutdown(_socket.SHUT_RDWR) + except OSError: + pass + self._sock.close() + + class MemosBridgeClient: - """Client wrapping a line-delimited JSON-RPC 2.0 stdio bridge. + """Client wrapping a line-delimited JSON-RPC 2.0 bridge. + + Two transport modes: + + **TCP mode** (``tcp_host`` + ``tcp_port`` given): + Connects to an existing daemon bridge process. The Hermes CLI can exit + without closing the daemon's session, so episodes get *finalized* by the + pipeline rather than *abandoned*. On connection loss the client tries to + reconnect a few times. + + **stdio mode** (default): + Spawns ``node bridge.cts --agent=hermes`` as a subprocess and communicates + over its stdin / stdout. The Hermes CLI exiting causes the episode to be + abandoned. Usage: >>> client = MemosBridgeClient() >>> client.request("core.health", {}) {'ok': True, 'version': '...'} >>> client.close() - - Thread-safe: per-request locking ensures concurrent callers don't - interleave writes. """ def __init__( @@ -63,6 +105,8 @@ def __init__( node_binary: str | None = None, agent: str = "hermes", extra_env: dict[str, str] | None = None, + tcp_host: str | None = None, + tcp_port: int | None = None, ) -> None: self._lock = threading.Lock() self._next_id = 1 @@ -70,44 +114,91 @@ def __init__( self._events: list[Callable[[dict[str, Any]], None]] = [] self._logs: list[Callable[[dict[str, Any]], None]] = [] self._closed = False + self._tcp_mode = tcp_host is not None - node = node_binary or shutil.which("node") or "node" - script = bridge_path or str( - Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" - ) - tsx_bin = str( - Path(__file__).resolve().parent.parent.parent.parent / "node_modules" / ".bin" / "tsx" - ) - runtime = tsx_bin if Path(tsx_bin).exists() else node - runtime_args = [] if Path(tsx_bin).exists() else ["--experimental-strip-types"] - env = {**os.environ, **(extra_env or {})} - # Kill any previously-running bridge before spawning a new one. - kill_existing_bridge() - self._proc = subprocess.Popen( - [runtime, *runtime_args, script, f"--agent={agent}"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, - env=env, - ) - # Register the new process so daemon_manager can track it. - register_bridge(self._proc) - self._reader = threading.Thread( - target=self._read_loop, - daemon=True, - name="memos-bridge-reader", - ) - self._reader.start() - self._stderr_reader = threading.Thread( - target=self._stderr_loop, - daemon=True, - name="memos-bridge-stderr", + if self._tcp_mode: + self._tcp_host = tcp_host or DEFAULT_TCP_HOST + self._tcp_port = tcp_port or DEFAULT_TCP_PORT + self._transport: _SocketTransport | None = None + self._connect_tcp() + else: + # ── stdio mode (original behaviour, spawn subprocess) ── + node = node_binary or shutil.which("node") or "node" + script = bridge_path or str( + Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" + ) + tsx_bin = str( + Path(__file__).resolve().parent.parent.parent.parent / "node_modules" / ".bin" / "tsx" + ) + runtime = tsx_bin if Path(tsx_bin).exists() else node + runtime_args = [] if Path(tsx_bin).exists() else ["--experimental-strip-types"] + env = {**os.environ, **(extra_env or {})} + # Kill any previously-running bridge before spawning a new one. + kill_existing_bridge() + self._proc = subprocess.Popen( + [runtime, *runtime_args, script, f"--agent={agent}"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + env=env, + ) + # Register the new process so daemon_manager can track it. + register_bridge(self._proc) + self._reader = threading.Thread( + target=self._read_loop_stdio, + daemon=True, + name="memos-bridge-reader", + ) + self._reader.start() + self._stderr_reader = threading.Thread( + target=self._stderr_loop, + daemon=True, + name="memos-bridge-stderr", + ) + self._stderr_reader.start() + + # ── TCP connection helpers ────────────────────────────────────── + + def _connect_tcp(self) -> None: + """Connect (or reconnect) to the daemon bridge TCP port.""" + last_err = None + for attempt in range(_TCP_MAX_RECONNECT): + try: + self._transport = _SocketTransport(self._tcp_host, self._tcp_port) + # Start reader thread + self._reader = threading.Thread( + target=self._read_loop_tcp, + daemon=True, + name="memos-bridge-tcp-reader", + ) + self._reader.start() + logger.info( + "bridge_client: connected to daemon at %s:%s", + self._tcp_host, self._tcp_port, + ) + return + except (ConnectionRefusedError, OSError) as e: + last_err = e + if attempt < _TCP_MAX_RECONNECT - 1: + time.sleep(_TCP_RECONNECT_DELAY * (attempt + 1)) + raise BridgeError( + "tcp_connect_failed", + f"Could not connect to daemon bridge at {self._tcp_host}:{self._tcp_port} " + f"after {_TCP_MAX_RECONNECT} attempts: {last_err}", ) - self._stderr_reader.start() - # ─── Public API ── + def _reconnect_tcp(self) -> None: + """Attempt transparent reconnection after transport loss.""" + logger.warning("bridge_client: TCP transport lost, reconnecting…") + self._transport = None + self._connect_tcp() + # Re-send session greetings so the daemon knows this client is alive. + # The daemon treats a new TCP connection as a new client — no session + # creation needed on the daemon side (it already has its own session). + + # ── Public API ────────────────────────────────────────────────── def request( self, @@ -128,12 +219,7 @@ def request( {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}, ensure_ascii=False, ) - try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError) as err: - self._pending.pop(rpc_id, None) - raise BridgeError("transport_closed", str(err)) from err + self._write_or_raise(payload + "\n") if not waiter.wait(timeout=timeout): with self._lock: @@ -154,9 +240,8 @@ def notify(self, method: str, params: Any = None) -> None: with self._lock: payload = json.dumps({"jsonrpc": "2.0", "method": method, "params": params}) try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError): + self._write_text(payload + "\n") + except (BrokenPipeError, OSError, ConnectionError): pass def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None: @@ -169,14 +254,10 @@ def close(self) -> None: if self._closed: return self._closed = True - with contextlib.suppress(Exception): - self._proc.stdin.close() - try: - self._proc.wait(timeout=5.0) - except subprocess.TimeoutExpired: - self._proc.kill() - # Clear PID file so a future incarnation starts fresh. - register_bridge(None) + if self._tcp_mode: + self._close_tcp() + else: + self._close_stdio() # unblock any pending waiters with self._lock: for entry in list(self._pending.values()): @@ -188,36 +269,95 @@ def close(self) -> None: entry["event"].set() self._pending.clear() - # ─── Internals ── + # ── Internals: write helpers ──────────────────────────────────── + + def _write_or_raise(self, text: str) -> None: + """Write *text* to the transport; raise on failure.""" + if self._tcp_mode: + if self._transport is None: + raise BridgeError("transport_closed", "TCP transport disconnected") + try: + self._transport.write_line(text) + except (BrokenPipeError, OSError, ConnectionError) as err: + # Attempt transparent reconnect + try: + self._reconnect_tcp() + self._transport.write_line(text) + except Exception as reconnect_err: + raise BridgeError("transport_closed", str(reconnect_err)) from err + else: + assert self._proc.stdin is not None + try: + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError) as err: + raise BridgeError("transport_closed", str(err)) from err + + def _write_text(self, text: str) -> None: + """Best-effort write, swallow errors.""" + try: + if self._tcp_mode: + if self._transport is not None: + self._transport.write_line(text) + else: + assert self._proc.stdin is not None + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError, ConnectionError): + pass - def _read_loop(self) -> None: + # ── Internals: close ──────────────────────────────────────────── + + def _close_tcp(self) -> None: + if self._transport is not None: + self._transport.close() + self._transport = None + + def _close_stdio(self) -> None: + with contextlib.suppress(Exception): + self._proc.stdin.close() + try: + self._proc.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self._proc.kill() + # Clear PID file so a future incarnation starts fresh. + register_bridge(None) + + # ── Internals: read loops ─────────────────────────────────────── + + def _read_loop_tcp(self) -> None: + """Read line-delimited JSON from the TCP socket.""" + transport = self._transport + if transport is None: + return + while not self._closed: + try: + line = transport.read_line() + except (OSError, ConnectionError): + if not self._closed: + logger.error("bridge_client: TCP read error, reader exiting") + break + if line is None: + if not self._closed: + logger.warning("bridge_client: TCP connection closed by peer") + break + line = line.strip() + if not line: + continue + self._dispatch(line) + # If the socket died but the client isn't closed, try a reconnect + # in the background. The next request() call will also trigger + # reconnection via _write_or_raise. + if not self._closed and self._transport is not None: + logger.info("bridge_client: TCP reader lost; will reconnect on next request") + + def _read_loop_stdio(self) -> None: assert self._proc.stdout is not None for line in self._proc.stdout: line = line.strip() if not line: continue - try: - msg = json.loads(line) - except json.JSONDecodeError: - logger.debug("bridge: malformed line: %r", line[:120]) - continue - if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): - self._resolve(msg) - continue - if msg.get("method") == "events.notify": - for cb in list(self._events): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("event listener threw", exc_info=True) - continue - if msg.get("method") == "logs.forward": - for cb in list(self._logs): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("log listener threw", exc_info=True) - continue + self._dispatch(line) def _stderr_loop(self) -> None: assert self._proc.stderr is not None @@ -226,6 +366,33 @@ def _stderr_loop(self) -> None: if line: logger.debug("bridge.stderr: %s", line) + # ── Common dispatch ───────────────────────────────────────────── + + def _dispatch(self, line: str) -> None: + """Parse a JSON-RPC line and route it to the right handler.""" + try: + msg = json.loads(line) + except json.JSONDecodeError: + logger.debug("bridge: malformed line: %r", line[:120]) + return + if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): + self._resolve(msg) + return + if msg.get("method") == "events.notify": + for cb in list(self._events): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("event listener threw", exc_info=True) + return + if msg.get("method") == "logs.forward": + for cb in list(self._logs): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("log listener threw", exc_info=True) + return + def _resolve(self, msg: dict[str, Any]) -> None: rpc_id = msg.get("id") if not isinstance(rpc_id, int): From dc985cfee0f1be028709c02dfa9b00b2c5629840 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 16:12:07 +0800 Subject: [PATCH 08/22] fix(memos-local-plugin): implement TCP transport for daemon bridge; address Copilot review Changes: - bridge/tcp.ts: new TCP JSON-RPC server (line-delimited, multi-client) - bridge.cts: start TCP server when --daemon --tcp= is given; daemon mode now waits for TCP server stop instead of blocking forever - memory-core.ts: batch-fetch sessions to avoid N+1 orphan scan (Copilot suggestion); add SQLite fallback check in assertEpisodeDeletable (Copilot suggestion) --- apps/memos-local-plugin/bridge.cts | 33 ++- apps/memos-local-plugin/bridge/tcp.ts | 202 ++++++++++++++++++ .../core/pipeline/memory-core.ts | 16 +- 3 files changed, 246 insertions(+), 5 deletions(-) create mode 100644 apps/memos-local-plugin/bridge/tcp.ts diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 8b15c885f..c4da8d451 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -45,6 +45,9 @@ async function main(): Promise { const { startStdioServer, waitForShutdown } = (await import( pathToEsmUrl(path.resolve(__dirname, "bridge/stdio.ts")) )) as typeof import("./bridge/stdio.js"); + const { startTcpServer } = (await import( + pathToEsmUrl(path.resolve(__dirname, "bridge/tcp.ts")) + )) as typeof import("./bridge/tcp.js"); const { startHttpServer } = (await import( pathToEsmUrl(path.resolve(__dirname, "server/index.ts")) )) as typeof import("./server/index.js"); @@ -58,9 +61,25 @@ async function main(): Promise { }); await core.init(); - // Default transport: stdio. Daemon + TCP support arrives in V1.1. + // Default transport: stdio. In daemon mode, also start TCP if a port was given. const stdio = startStdioServer({ core }); + let tcpServer: Awaited> | null = null; + if (args.tcpPort !== undefined) { + try { + tcpServer = startTcpServer({ + core, + host: "127.0.0.1", + port: args.tcpPort, + }); + process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); + } catch (err) { + process.stderr.write( + `bridge: tcp server failed to start: ${(err as Error).message}\n`, + ); + } + } + // Boot a viewer too — hermes needs its own HTTP surface for the // Memory Viewer, and it discovers the openclaw hub (if any) so // both agents are reachable at `127.0.0.1:18799//`. @@ -97,6 +116,11 @@ async function main(): Promise { const shutdown = async (sig: string) => { process.stderr.write(`bridge: received ${sig}, shutting down\n`); + try { + if (tcpServer) await tcpServer.close(); + } catch { + /* best-effort */ + } try { if (viewer) await viewer.close(); } catch { @@ -109,10 +133,11 @@ async function main(): Promise { process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); - // In daemon mode, run indefinitely (stdin is /dev/null — EOF is normal). + // In daemon mode, keep alive until TCP server stops (stdin is /dev/null). // In stdio mode, run until the calling process closes stdin. - if (args.daemon) { - await new Promise(() => {}); // never resolve → process lives forever + if (args.daemon && tcpServer) { + await tcpServer.done; + await shutdown("daemon_done"); } else { await stdio.done; try { diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts new file mode 100644 index 000000000..87d650553 --- /dev/null +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -0,0 +1,202 @@ +/** + * Line-delimited JSON-RPC over TCP. + * + * A TCP server that accepts connections from remote clients (e.g. the + * Hermes Python provider) and dispatches JSON-RPC 2.0 messages through + * the same `Dispatcher` used by the stdio server. + * + * Each connected client gets its own read/write loop. Notifications + * (events + logs) are broadcast to all connected clients. + * + * The transport: + * - Listens on the port given via `--tcp=`. + * - Reads each connection as UTF-8 text, splits by `\n`. + * - Parses each line as JSON, dispatches via `Dispatcher`. + * - Writes responses as JSON followed by `\n` to the originating socket. + * - Broadcasts `LogRecord` and `CoreEvent` as JSON-RPC notifications + * (method = `logs.forward` / `events.notify`) to all connected clients. + */ + +import { createServer, type Socket } from "node:net"; +import { + JSONRPC_PARSE_ERROR, + JSONRPC_INVALID_REQUEST, + RPC_METHODS, + rpcCodeForError, + type JsonRpcFailure, + type JsonRpcRequest, + type JsonRpcSuccess, +} from "../agent-contract/jsonrpc.js"; +import type { MemoryCore } from "../agent-contract/memory-core.js"; +import { MemosError } from "../agent-contract/errors.js"; +import { errorCodeOf, makeDispatcher } from "./methods.js"; + +// ─── Types ────────────────────────────────────────────────────────────────── + +export interface TcpServerOptions { + core: MemoryCore; + host: string; + port: number; +} + +export interface TcpServerHandle { + readonly url: string; + close: () => Promise; + done: Promise; +} + +// ─── Server ───────────────────────────────────────────────────────────────── + +export function startTcpServer(options: TcpServerOptions): TcpServerHandle { + const { core, host, port } = options; + const dispatch = makeDispatcher(core); + + const clients = new Set(); + let closed = false; + let doneResolve: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + // Subscribe to events + logs and broadcast to all connected clients. + const eventsUnsub = core.subscribeEvents((e) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.EVENTS_NOTIFY, params: e }); + }); + const logsUnsub = core.subscribeLogs((r) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.LOGS_FORWARD, params: r }); + }); + + function broadcast(obj: unknown): void { + const payload = JSON.stringify(obj) + "\n"; + for (const sock of clients) { + try { + sock.write(payload); + } catch { + /* best-effort per client */ + } + } + } + + function errorResponse( + id: JsonRpcRequest["id"] | null, + code: number, + message: string, + data?: unknown, + ): JsonRpcFailure { + return { + jsonrpc: "2.0", + id: id ?? null, + error: { code, message, data: data as any }, + }; + } + + function writeLine(sock: Socket, obj: unknown): void { + try { + sock.write(JSON.stringify(obj) + "\n"); + } catch { + /* ignore */ + } + } + + async function handleLine(sock: Socket, line: string): Promise { + const trimmed = line.trim(); + if (trimmed.length === 0) return; + + let msg: JsonRpcRequest | null = null; + try { + msg = JSON.parse(trimmed) as JsonRpcRequest; + } catch (err) { + writeLine( + sock, + errorResponse(null, JSONRPC_PARSE_ERROR, "invalid JSON", { + text: err instanceof Error ? err.message : String(err), + }), + ); + return; + } + + if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || !msg.method) { + writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0")); + return; + } + + try { + const result = await dispatch(msg.method, msg.params); + if (msg.id !== undefined && msg.id !== null) { + const ok: JsonRpcSuccess = { jsonrpc: "2.0", id: msg.id, result }; + writeLine(sock, ok); + } + } catch (err) { + const code = rpcCodeForError(errorCodeOf(err)); + const mErr = + err instanceof MemosError + ? err + : new MemosError("internal", err instanceof Error ? err.message : String(err)); + writeLine(sock, errorResponse(msg.id ?? null, code, mErr.message, mErr.toJSON())); + process.stderr.write(`bridge.tcp.dispatch.err ${msg.method}: ${mErr.message}\n`); + } + } + + // ─── Server ─────────────────────────────────────────────────────────────── + + const server = createServer((sock: Socket) => { + clients.add(sock); + process.stderr.write( + `bridge.tcp: client connected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + + let buffer = ""; + sock.setEncoding("utf8"); + + sock.on("data", (chunk: string) => { + buffer += chunk; + let nl = buffer.indexOf("\n"); + while (nl >= 0) { + const line = buffer.slice(0, nl); + buffer = buffer.slice(nl + 1); + void handleLine(sock, line); + nl = buffer.indexOf("\n"); + } + }); + + sock.on("close", () => { + clients.delete(sock); + process.stderr.write( + `bridge.tcp: client disconnected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + }); + + sock.on("error", (err) => { + process.stderr.write(`bridge.tcp: socket error: ${err.message}\n`); + clients.delete(sock); + }); + }); + + server.on("error", (err) => { + process.stderr.write(`bridge.tcp: server error: ${err.message}\n`); + }); + + server.listen(port, host, () => { + process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); + }); + + return { + get url() { + return `tcp://${host}:${port}`; + }, + async close() { + if (closed) return; + closed = true; + eventsUnsub(); + logsUnsub(); + for (const sock of clients) { + sock.end(); + sock.destroy(); + } + clients.clear(); + server.close(); + doneResolve(); + }, + done: donePromise, + }; +} diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index d99b0dc16..329adbfb7 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -333,11 +333,17 @@ export function createMemoryCore( // trace_ids_json. try { const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 }); + // Batch-fetch sessions to avoid N+1 lookups per episode. + const sessionIds = new Set(openEpisodes.map((ep) => ep.sessionId)); + const sessionById = new Map>(); + for (const sid of sessionIds) { + sessionById.set(sid, handle.repos.sessions.getById(sid)); + } // Only treat an open episode as an orphan if its session has been // explicitly closed (meta.closedAt is set) or no longer exists. // Otherwise the session might reconnect — leave it alone. const orphans = openEpisodes.filter((ep) => { - const session = handle.repos.sessions.getById(ep.sessionId); + const session = sessionById.get(ep.sessionId); if (!session) return true; // session row gone — orphan if (session.meta?.closedAt != null) return true; // explicitly closed — orphan return false; // session exists and not closed — skip, might reconnect @@ -650,6 +656,14 @@ export function createMemoryCore( `cannot delete open episode: ${episodeId}`, ); } + // After restart the in-memory manager may not have this episode, + // but it might still be open in SQLite — double-check the DB. + if (!snap && handle.repos.episodes.getById(episodeId)?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId} (open in DB)`, + ); + } } async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { From 948adff2f8df9f9dc8447e722608fe08bd55e7d7 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 16:43:00 +0800 Subject: [PATCH 09/22] =?UTF-8?q?fix(memos-local-plugin):=20address=20Copi?= =?UTF-8?q?lot=20review=20round=202=20=E2=80=94=20TCP=20fallback,=20double?= =?UTF-8?q?=20newline,=20port=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes per Copilot review (8 comments): - bridge_client.py: fix double-newline in _SocketTransport.write_line() - bridge.cts: validate --tcp port with Number.isFinite() before use - __init__.py: TCP mode with stdio fallback (try TCP first, spawn subprocess on failure); restore episode.close + session.close in on_session_end (Copilot correctly noted closedAt must be stamped) - daemon_manager.py: add mkdir(parents=True) before writing PID file (prevents crash when parent dir doesn't exist) --- .../hermes/memos_provider/__init__.py | 23 +++++++++++------- .../hermes/memos_provider/bridge_client.py | 4 +++- .../hermes/memos_provider/daemon_manager.py | 4 +++- apps/memos-local-plugin/bridge.cts | 24 ++++++++++++------- 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index c1a290bc2..13a1d7eef 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -142,7 +142,13 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov logger.warning("MemOS: failed to start bridge — %s", err) return try: - self._bridge = MemosBridgeClient(tcp_host="127.0.0.1", tcp_port=18911) + # Try TCP mode first (connect to daemon bridge). + # Falls back to stdio (spawn subprocess) if daemon isn't running. + try: + self._bridge = MemosBridgeClient(tcp_host="127.0.0.1", tcp_port=18911) + except BridgeError: + logger.info("MemOS: TCP daemon not available, falling back to stdio bridge") + self._bridge = MemosBridgeClient() resp = self._bridge.request( "session.open", { @@ -455,13 +461,14 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor if pending: with contextlib.suppress(Exception): self._turn_end(*pending) - # In TCP mode the daemon bridge owns the session — don't close it. - # The pipeline will finalize the episode naturally on the daemon side. - if not self._bridge._tcp_mode: - with contextlib.suppress(Exception): - self._bridge.request("episode.close", {"episodeId": self._episode_id}) - with contextlib.suppress(Exception): - self._bridge.request("session.close", {"sessionId": self._session_id}) + # Close the episode and session so the core stamps closure metadata + # (e.g. session.meta.closedAt). In TCP mode this ensures the daemon + # can distinguish "normal shutdown" from "abrupt disconnect" on + # restart, preventing orphan retention. + with contextlib.suppress(Exception): + self._bridge.request("episode.close", {"episodeId": self._episode_id}) + with contextlib.suppress(Exception): + self._bridge.request("session.close", {"sessionId": self._session_id}) def shutdown(self) -> None: # type: ignore[override] if self._prefetch_thread and self._prefetch_thread.is_alive(): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 1b33e49ca..3d0e07bc8 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -61,7 +61,9 @@ def __init__(self, host: str, port: int) -> None: self._rfile = self._sock.makefile("r", buffering=1, encoding="utf-8") def write_line(self, text: str) -> None: - self._sock.sendall((text + "\n").encode("utf-8")) + # Avoid double newline: callers may or may not include \n. + payload = text if text.endswith("\n") else text + "\n" + self._sock.sendall(payload.encode("utf-8")) def read_line(self) -> str | None: line = self._rfile.readline() diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index d859aaa13..5defd5e41 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -52,7 +52,9 @@ def _read_pid() -> int | None: def _write_pid(pid: int) -> None: - _pid_path().write_text(str(pid)) + pid_path = _pid_path() + pid_path.parent.mkdir(parents=True, exist_ok=True) + pid_path.write_text(str(pid)) def _clean_pid() -> None: diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index c4da8d451..d8346d4a6 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -66,17 +66,23 @@ async function main(): Promise { let tcpServer: Awaited> | null = null; if (args.tcpPort !== undefined) { - try { - tcpServer = startTcpServer({ - core, - host: "127.0.0.1", - port: args.tcpPort, - }); - process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); - } catch (err) { + if (!Number.isFinite(args.tcpPort) || args.tcpPort < 1 || args.tcpPort > 65535) { process.stderr.write( - `bridge: tcp server failed to start: ${(err as Error).message}\n`, + `bridge: invalid --tcp port value: ${String(args.tcpPort)} (must be 1–65535)\n`, ); + } else { + try { + tcpServer = startTcpServer({ + core, + host: "127.0.0.1", + port: args.tcpPort, + }); + process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); + } catch (err) { + process.stderr.write( + `bridge: tcp server failed to start: ${(err as Error).message}\n`, + ); + } } } From e09f116b1f65cd6f11fc058245de4e326b90bc35 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 17:19:14 +0800 Subject: [PATCH 10/22] fix: reconnect FD leak, tcp close race, deleteEpisode non-optional bridge_client.py: close old transport before reconnect bridge/tcp.ts: await server.close() in TcpServerHandle.close() agent-contract/memory-core.ts: deleteEpisode/deleteEpisodes no longer optional --- .../adapters/hermes/memos_provider/bridge_client.py | 4 ++++ apps/memos-local-plugin/agent-contract/memory-core.ts | 4 ++-- apps/memos-local-plugin/bridge/tcp.ts | 7 ++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 3d0e07bc8..a48faa386 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -194,7 +194,11 @@ def _connect_tcp(self) -> None: def _reconnect_tcp(self) -> None: """Attempt transparent reconnection after transport loss.""" logger.warning("bridge_client: TCP transport lost, reconnecting…") + old = self._transport self._transport = None + if old is not None: + with contextlib.suppress(Exception): + old.close() self._connect_tcp() # Re-send session greetings so the daemon knows this client is alive. # The daemon treats a new TCP connection as a new client — no session diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index 6626c2624..e8449503c 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -91,8 +91,8 @@ export interface MemoryCore { userMessage?: string; }): Promise; closeEpisode(episodeId: EpisodeId): Promise; - deleteEpisode?(episodeId: EpisodeId): Promise<{ deleted: boolean }>; - deleteEpisodes?(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; + deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }>; + deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; // ── pipeline (per turn) ── /** Called *before* the agent acts. Returns the context to inject. */ diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts index 87d650553..eb4d72d26 100644 --- a/apps/memos-local-plugin/bridge/tcp.ts +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -194,7 +194,12 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { sock.destroy(); } clients.clear(); - server.close(); + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) reject(err); + else resolve(); + }); + }); doneResolve(); }, done: donePromise, From 34bdb43503df5da0f8b7aacb012e022475ad10b9 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 17:45:53 +0800 Subject: [PATCH 11/22] fix: tcp listen error handling, tcp_host fallback, delete response shape bridge/tcp.ts: wrap server.listen in a promise; expose 'ready' so callers catch EADDRINUSE; remove stale server-level error handler bridge.cts: await tcpServer.ready inside try/catch bridge_client.py: enable TCP mode when either tcp_host or tcp_port is provided (not just tcp_host) server/routes/session.ts: return { ok: true, deleted } for backward compatibility with callers expecting { ok: true } --- .../hermes/memos_provider/bridge_client.py | 2 +- apps/memos-local-plugin/bridge.cts | 1 + apps/memos-local-plugin/bridge/tcp.ts | 22 ++++++++++++++----- .../server/routes/session.ts | 5 +++-- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index a48faa386..ca6ce7a91 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -116,7 +116,7 @@ def __init__( self._events: list[Callable[[dict[str, Any]], None]] = [] self._logs: list[Callable[[dict[str, Any]], None]] = [] self._closed = False - self._tcp_mode = tcp_host is not None + self._tcp_mode = tcp_host is not None or tcp_port is not None if self._tcp_mode: self._tcp_host = tcp_host or DEFAULT_TCP_HOST diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index d8346d4a6..ad1d8e3c9 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -77,6 +77,7 @@ async function main(): Promise { host: "127.0.0.1", port: args.tcpPort, }); + await tcpServer.ready; // throws on EADDRINUSE etc. process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); } catch (err) { process.stderr.write( diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts index eb4d72d26..80be13b56 100644 --- a/apps/memos-local-plugin/bridge/tcp.ts +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -41,6 +41,8 @@ export interface TcpServerOptions { export interface TcpServerHandle { readonly url: string; + /** Resolves once the server is actually listening, rejects on error. */ + ready: Promise; close: () => Promise; done: Promise; } @@ -172,18 +174,26 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { }); }); - server.on("error", (err) => { - process.stderr.write(`bridge.tcp: server error: ${err.message}\n`); - }); - - server.listen(port, host, () => { - process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); + // Wrap listen in a promise so callers can catch EADDRINUSE etc. + const listenPromise = new Promise((resolve, reject) => { + server.on("error", reject); + server.listen(port, host, () => { + process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); + resolve(); + }); }); + // Once listening, switch error handler to non-fatal logging. + listenPromise.then( + () => server.removeAllListeners("error"), + () => {}, // rejection handled by caller + ); return { get url() { return `tcp://${host}:${port}`; }, + /** Resolves once the server is actually listening. */ + ready: listenPromise, async close() { if (closed) return; closed = true; diff --git a/apps/memos-local-plugin/server/routes/session.ts b/apps/memos-local-plugin/server/routes/session.ts index 9cc598634..707a569b3 100644 --- a/apps/memos-local-plugin/server/routes/session.ts +++ b/apps/memos-local-plugin/server/routes/session.ts @@ -52,7 +52,7 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { return; } const result = await deps.core.deleteEpisode(id as EpisodeId); - return result; + return { ok: true, deleted: result.deleted }; }); routes.set("POST /api/v1/episodes/delete", async (ctx) => { @@ -64,7 +64,8 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { writeError(ctx, 400, "invalid_argument", "ids[] is required"); return; } - return await deps.core.deleteEpisodes(ids as EpisodeId[]); + const result = await deps.core.deleteEpisodes(ids as EpisodeId[]); + return { ok: true, deleted: result.deleted }; }); routes.set("GET /api/v1/episodes", async (ctx) => { From 7e08cf7ddb62f2b4737a8b4b277f24c99cb6e97c Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 18:40:43 +0800 Subject: [PATCH 12/22] fix: shutdown re-entrancy, JSON-RPC method validation, socket error cleanup bridge.cts: add shuttingDown guard to prevent double-shutdown bridge/tcp.ts: validate typeof msg.method === 'string'; sock.destroy() on error to prevent resource leak --- apps/memos-local-plugin/bridge.cts | 4 ++++ apps/memos-local-plugin/bridge/tcp.ts | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index ad1d8e3c9..b95269446 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -120,8 +120,12 @@ async function main(): Promise { `bridge: viewer failed to start: ${(err as Error).message}\n`, ); } + let tcpServer: Awaited> | null = null; + let shuttingDown = false; const shutdown = async (sig: string) => { + if (shuttingDown) return; + shuttingDown = true; process.stderr.write(`bridge: received ${sig}, shutting down\n`); try { if (tcpServer) await tcpServer.close(); diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts index 80be13b56..3105b75e6 100644 --- a/apps/memos-local-plugin/bridge/tcp.ts +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -117,7 +117,7 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { return; } - if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || !msg.method) { + if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || typeof msg.method !== "string") { writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0")); return; } @@ -171,6 +171,7 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { sock.on("error", (err) => { process.stderr.write(`bridge.tcp: socket error: ${err.message}\n`); clients.delete(sock); + sock.destroy(); }); }); From 981a1dbe5b7cba14905baafdc35c96de4465c031 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 18:42:00 +0800 Subject: [PATCH 13/22] fix: remove duplicate tcpServer declaration --- apps/memos-local-plugin/bridge.cts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index b95269446..c806e4556 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -120,7 +120,6 @@ async function main(): Promise { `bridge: viewer failed to start: ${(err as Error).message}\n`, ); } - let tcpServer: Awaited> | null = null; let shuttingDown = false; const shutdown = async (sig: string) => { From a641b1d232498a9afba1b1229319a276ac5df3ee Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 18:55:23 +0800 Subject: [PATCH 14/22] fix: socket destroy guard, deleteClosedEpisode helper bridge/tcp.ts: check sock.destroyed before destroying core/pipeline/memory-core.ts: extract deleteClosedEpisode helper with no-op regression guard --- apps/memos-local-plugin/bridge/tcp.ts | 4 +++- .../core/pipeline/memory-core.ts | 23 ++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts index 3105b75e6..03cd34a22 100644 --- a/apps/memos-local-plugin/bridge/tcp.ts +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -171,7 +171,9 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { sock.on("error", (err) => { process.stderr.write(`bridge.tcp: socket error: ${err.message}\n`); clients.delete(sock); - sock.destroy(); + if (!sock.destroyed) { + sock.destroy(); + } }); }); diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 329adbfb7..0ecb403be 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -666,20 +666,31 @@ export function createMemoryCore( } } + function deleteClosedEpisode(episodeId: EpisodeId): boolean { + const existing = handle.repos.episodes.getById(episodeId); + assertEpisodeDeletable(episodeId); + const deleted = handle.repos.episodes.deleteById(episodeId); + // Guard against regressions where deletion silently becomes a no-op + // for an existing closed episode. Missing episodes return `false`. + if (existing && !deleted) { + throw new MemosError( + "internal", + `failed to delete closed episode: ${episodeId}`, + ); + } + return deleted; + } + async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { ensureLive(); - assertEpisodeDeletable(episodeId); - return { deleted: handle.repos.episodes.deleteById(episodeId) }; + return { deleted: deleteClosedEpisode(episodeId) }; } async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> { ensureLive(); - for (const id of ids) { - assertEpisodeDeletable(id); - } let deleted = 0; for (const id of ids) { - if (handle.repos.episodes.deleteById(id)) deleted++; + if (deleteClosedEpisode(id)) deleted++; } return { deleted }; } From 0fe79062ce2f2ed122eaa90f07a7c9d361c2c25b Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 19:29:50 +0800 Subject: [PATCH 15/22] fix: TCP cleanup on failure, error listener, daemon guard, BridgeError import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bridge.cts: close tcpServer on startup failure; warn and skip TCP when --daemon not set; daemon mode now also falls back to stdio when TCP port is not given bridge/tcp.ts: isListening flag for error handling (no more removeAllListeners); runtime errors are logged not swallowed __init__.py: import BridgeError so TCP→stdio fallback actually works --- .../adapters/hermes/memos_provider/__init__.py | 2 +- apps/memos-local-plugin/bridge.cts | 13 ++++++++++--- apps/memos-local-plugin/bridge/tcp.ts | 15 +++++++++------ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index 13a1d7eef..56ee40490 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -62,7 +62,7 @@ if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from bridge_client import MemosBridgeClient # noqa: E402 +from bridge_client import BridgeError, MemosBridgeClient # noqa: E402 from daemon_manager import ensure_bridge_running, shutdown_bridge # noqa: E402 diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index c806e4556..5a3587326 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -61,11 +61,14 @@ async function main(): Promise { }); await core.init(); - // Default transport: stdio. In daemon mode, also start TCP if a port was given. + // Default transport: stdio. Start TCP only in daemon mode when a port was given. const stdio = startStdioServer({ core }); - let tcpServer: Awaited> | null = null; - if (args.tcpPort !== undefined) { + if (args.tcpPort !== undefined && !args.daemon) { + process.stderr.write( + "bridge: ignoring --tcp because TCP mode requires --daemon\n", + ); + } else if (args.tcpPort !== undefined) { if (!Number.isFinite(args.tcpPort) || args.tcpPort < 1 || args.tcpPort > 65535) { process.stderr.write( `bridge: invalid --tcp port value: ${String(args.tcpPort)} (must be 1–65535)\n`, @@ -83,6 +86,10 @@ async function main(): Promise { process.stderr.write( `bridge: tcp server failed to start: ${(err as Error).message}\n`, ); + if (tcpServer) { + await tcpServer.close().catch(() => {}); + tcpServer = null; + } } } } diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts index 03cd34a22..4167c274a 100644 --- a/apps/memos-local-plugin/bridge/tcp.ts +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -178,18 +178,21 @@ export function startTcpServer(options: TcpServerOptions): TcpServerHandle { }); // Wrap listen in a promise so callers can catch EADDRINUSE etc. + let isListening = false; const listenPromise = new Promise((resolve, reject) => { - server.on("error", reject); + server.on("error", (err) => { + if (!isListening) { + reject(err); + return; + } + process.stderr.write(`bridge.tcp: server error: ${err.message}\n`); + }); server.listen(port, host, () => { + isListening = true; process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); resolve(); }); }); - // Once listening, switch error handler to non-fatal logging. - listenPromise.then( - () => server.removeAllListeners("error"), - () => {}, // rejection handled by caller - ); return { get url() { From 8abb30b8657b9456daac7be6b2919525e5c4fd78 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 21:31:55 +0800 Subject: [PATCH 16/22] test(memos-local-plugin): add orphan init and deleteEpisode unit tests --- .../tests/unit/pipeline/memory-core.test.ts | 175 +++++++++++++++++- 1 file changed, 174 insertions(+), 1 deletion(-) diff --git a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts index cbe2c7b0a..326dcb44b 100644 --- a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts +++ b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts @@ -188,7 +188,180 @@ describe("MemoryCore façade", () => { }); }); -describe("bootstrapMemoryCore", () => { +describe("init() orphan episode handling", () => { + it("preserves open episodes for sessions that were not explicitly closed", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + // Simulate a session still active on disk — no meta.closedAt. + db!.repos.sessions.upsert({ + id: "s-orphan-keep", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_100_000, + meta: {}, + }); + db!.repos.episodes.insert({ + id: "ep-orphan-keep", + sessionId: "s-orphan-keep", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-orphan-keep"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("open"); + }); + + it("closes open episodes for sessions that were explicitly closed", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + db!.repos.sessions.upsert({ + id: "s-orphan-close", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_200_000, + meta: { closedAt: 1_700_000_200_000 }, + }); + db!.repos.episodes.insert({ + id: "ep-orphan-close", + sessionId: "s-orphan-close", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-orphan-close"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("closed"); + }); + + it("closes open episodes for sessions that no longer exist", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + // FK requires the session to exist first; we then delete it via + // raw SQL to simulate a session row that was removed (e.g. manual + // DB cleanup), leaving an orphan episode behind. + db!.repos.sessions.upsert({ + id: "s-gone", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_000_000, + meta: {}, + }); + db!.repos.episodes.insert({ + id: "ep-no-session", + sessionId: "s-gone", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + // Temporarily disable FK checks so we can delete the session + // while keeping the orphan episode for the test. + db!.db.exec("PRAGMA foreign_keys = OFF; DELETE FROM sessions WHERE id='s-gone'; PRAGMA foreign_keys = ON;"); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-no-session"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("closed"); + }); + }); + + describe("deleteEpisode / deleteEpisodes", () => { + it("deletes a closed episode and returns deleted: true", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const eid = await core.openEpisode({ sessionId: sid }); + await core.closeEpisode(eid); + + const result = await core.deleteEpisode(eid); + expect(result.deleted).toBe(true); + expect(db!.repos.episodes.getById(eid)).toBeNull(); + }); + + it("returns deleted: false for a missing episode", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + + const result = await core.deleteEpisode("ep-does-not-exist"); + expect(result.deleted).toBe(false); + }); + + it("throws conflict when deleting an open episode", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const eid = await core.openEpisode({ sessionId: sid }); + + await expect(core.deleteEpisode(eid)).rejects.toMatchObject({ + code: "conflict", + }); + }); + + it("deleteEpisodes bulk-deletes multiple closed episodes", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const e1 = await core.openEpisode({ sessionId: sid }); + const e2 = await core.openEpisode({ sessionId: sid }); + await core.closeEpisode(e1); + await core.closeEpisode(e2); + + const result = await core.deleteEpisodes([e1, e2]); + expect(result.deleted).toBe(2); + expect(db!.repos.episodes.getById(e1)).toBeNull(); + expect(db!.repos.episodes.getById(e2)).toBeNull(); + }); + }); + + describe("bootstrapMemoryCore", () => { let home: TmpHomeContext | null = null; afterEach(async () => { From 700f00669af4431cd90b39ca6fe28b358ab855ce Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Tue, 28 Apr 2026 22:05:56 +0800 Subject: [PATCH 17/22] fix(memos-local-plugin): close _rfile leak, PID path via MEMOS_HOME, stale PID validation, daemon stdio gate --- .../hermes/memos_provider/bridge_client.py | 4 + .../hermes/memos_provider/daemon_manager.py | 75 +++++++++++++++---- apps/memos-local-plugin/bridge.cts | 21 +++++- 3 files changed, 83 insertions(+), 17 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index ca6ce7a91..8467007a6 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -70,6 +70,10 @@ def read_line(self) -> str | None: return line if line else None def close(self) -> None: + try: + self._rfile.close() + except Exception: + pass try: self._sock.shutdown(_socket.SHUT_RDWR) except OSError: diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index 5defd5e41..e1f750456 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -40,7 +40,15 @@ def _pid_path() -> Path: - """Path to the singleton PID file under the plugin data directory.""" + """Path to the singleton PID file under the runtime daemon directory. + + Respects ``MEMOS_HOME`` when set (``~/.hermes/memos-plugin`` by + convention), falling back to the plugin source tree only when the env + var is absent for compatibility with development installs. + """ + memos_home = os.environ.get("MEMOS_HOME") + if memos_home: + return Path(memos_home) / "daemon" / "bridge.pid" return Path(__file__).resolve().parent.parent.parent.parent / "data" / "bridge.pid" @@ -111,25 +119,66 @@ def ensure_bridge_running(*, probe_only: bool = False) -> bool: return True +def _is_bridge_process(pid: int) -> bool: + """Return True when *pid* looks like a bridge process. + + Checks the process command line for ``bridge.cts`` to avoid killing an + unrelated process that happened to recycle a stale PID. + """ + try: + if os.name == "nt": + import ctypes + + import ctypes.wintypes + + kernel32 = ctypes.windll.kernel32 + handle = kernel32.OpenProcess(0x0400 | 0x0010, False, pid) + if not handle: + return False + try: + exe_path = (ctypes.c_wchar * 260)() + size = ctypes.wintypes.DWORD(260) + if kernel32.K32GetProcessImageFileNameW(handle, exe_path, size): + return "bridge" in str(exe_path.value).lower() + finally: + kernel32.CloseHandle(handle) + return False + # Unix: read /proc//cmdline + cmdline = Path(f"/proc/{pid}/cmdline").read_bytes() + return b"bridge.cts" in cmdline + except Exception: + # If we can't validate, err on the side of safety — skip kill. + return False + + def kill_existing_bridge() -> None: """Kill any previously-running bridge process recorded in the PID file. Called **before** spawning a new bridge to guarantee at-most-one - instance. Safe to call even when no stale PID exists. + instance. Validates that the PID belongs to a bridge process before + sending any signal to avoid killing an unrelated process when the + PID file is stale. """ pid = _read_pid() if pid is not None and _pid_alive(pid): - logger.info("MemOS: killing stale bridge (pid=%d)", pid) - try: - os.kill(pid, signal.SIGTERM) - for _ in range(25): # wait up to 2.5 s - if not _pid_alive(pid): - break - time.sleep(0.1) - else: - os.kill(pid, signal.SIGKILL) - except (OSError, ProcessLookupError): - pass + if not _is_bridge_process(pid): + logger.warning( + "MemOS: PID %d is alive but does not appear to be a bridge " + "process — refusing to kill. Removing stale PID file.", + pid, + ) + else: + logger.info("MemOS: killing stale bridge (pid=%d)", pid) + try: + os.kill(pid, signal.SIGTERM) + for _ in range(25): # wait up to 2.5 s + if not _pid_alive(pid): + break + time.sleep(0.1) + else: + os.kill(pid, signal.SIGKILL) + except (OSError, ProcessLookupError): + pass _clean_pid() diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 5a3587326..2e8ab4999 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -61,8 +61,10 @@ async function main(): Promise { }); await core.init(); - // Default transport: stdio. Start TCP only in daemon mode when a port was given. - const stdio = startStdioServer({ core }); + // In daemon mode stdin is typically /dev/null — starting the stdio + // server would subscribe to events/logs and buffer writes to a pipe + // that nobody drains, wasting memory. Skip it. + const stdio = args.daemon ? null : startStdioServer({ core }); let tcpServer: Awaited> | null = null; if (args.tcpPort !== undefined && !args.daemon) { process.stderr.write( @@ -143,7 +145,15 @@ async function main(): Promise { } catch { /* best-effort */ } - await waitForShutdown(core, stdio); + if (stdio) { + await waitForShutdown(core, stdio); + } else { + try { + await core.shutdown(); + } catch { + /* swallow */ + } + } process.exit(0); }; @@ -155,7 +165,7 @@ async function main(): Promise { if (args.daemon && tcpServer) { await tcpServer.done; await shutdown("daemon_done"); - } else { + } else if (stdio) { await stdio.done; try { if (viewer) await viewer.close(); @@ -164,6 +174,9 @@ async function main(): Promise { } await core.shutdown(); process.exit(0); + } else { + // Daemon mode without TCP — wait forever (kept alive by event loop). + await new Promise(() => {}); } } From 3fa73756177010d44dd68246d25621df547f0ad3 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Wed, 29 Apr 2026 02:03:34 +0800 Subject: [PATCH 18/22] chore(memos-local-plugin): sync package-lock.json to 2.0.0-beta.1 --- apps/memos-local-plugin/package-lock.json | 91 +---------------------- 1 file changed, 2 insertions(+), 89 deletions(-) diff --git a/apps/memos-local-plugin/package-lock.json b/apps/memos-local-plugin/package-lock.json index c86737123..cd0124e24 100644 --- a/apps/memos-local-plugin/package-lock.json +++ b/apps/memos-local-plugin/package-lock.json @@ -1,12 +1,12 @@ { "name": "@memtensor/memos-local-plugin", - "version": "2.0.0-alpha.1", + "version": "2.0.0-beta.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@memtensor/memos-local-plugin", - "version": "2.0.0-alpha.1", + "version": "2.0.0-beta.1", "hasInstallScript": true, "license": "MIT", "dependencies": { @@ -906,9 +906,6 @@ "cpu": [ "arm" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -925,9 +922,6 @@ "cpu": [ "arm64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -944,9 +938,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -963,9 +954,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -982,9 +970,6 @@ "cpu": [ "s390x" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1001,9 +986,6 @@ "cpu": [ "x64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1020,9 +1002,6 @@ "cpu": [ "arm64" ], - "libc": [ - "musl" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1039,9 +1018,6 @@ "cpu": [ "x64" ], - "libc": [ - "musl" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1058,9 +1034,6 @@ "cpu": [ "arm" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1083,9 +1056,6 @@ "cpu": [ "arm64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1108,9 +1078,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1133,9 +1100,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1158,9 +1122,6 @@ "cpu": [ "s390x" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1183,9 +1144,6 @@ "cpu": [ "x64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1208,9 +1166,6 @@ "cpu": [ "arm64" ], - "libc": [ - "musl" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1233,9 +1188,6 @@ "cpu": [ "x64" ], - "libc": [ - "musl" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1700,9 +1652,6 @@ "arm" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1717,9 +1666,6 @@ "arm" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1734,9 +1680,6 @@ "arm64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1751,9 +1694,6 @@ "arm64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1768,9 +1708,6 @@ "loong64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1785,9 +1722,6 @@ "loong64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1802,9 +1736,6 @@ "ppc64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1819,9 +1750,6 @@ "ppc64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1836,9 +1764,6 @@ "riscv64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1853,9 +1778,6 @@ "riscv64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1870,9 +1792,6 @@ "s390x" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1887,9 +1806,6 @@ "x64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1904,9 +1820,6 @@ "x64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ From f191b150eb0d10c3a36c94419546889a34a6a407 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Wed, 29 Apr 2026 17:16:30 +0800 Subject: [PATCH 19/22] fix: reduce empty episodes, finalize idle episodes, filter Hermes auto-skill prompts Three fixes targeting noise reduction and correctness in the Hermes adapter pipeline, plus two Copilot review follow-ups: 1. Remove adapter-initiated episode pre-creation (__init__.py:165). The orchestrator already creates episodes on first real turn via openEpisodeIfNeeded(), so the pre-created empty episode was always abandoned as a 0-turn orphan. 2. Finalize stale episodes with content instead of abandoning them. autoFinalizeStaleTasks() now checks traceIds: episodes with at least one trace are finalized (triggering reflection + scoring), while truly empty episodes are still abandoned. 3. Strip Hermes auto-skill evaluation prompts in _turn_end(). Hermes injects a structured "review the conversation and decide whether to save a skill" prompt at task end. The adapter now detects the distinctive header phrase and truncates the system-generated scaffolding. 4. Add macOS ps(1) fallback for _is_bridge_process() (daemon_manager.py). /proc is not available on macOS, so PID validation always returned False. Fall back to `ps -p -o command=` when /proc is missing. 5. Add episode.delete / episode.delete_bulk JSON-RPC methods. HTTP routes had DELETE endpoints but the bridge lacked corresponding RPC methods, creating API surface inconsistency. --- .../hermes/memos_provider/__init__.py | 25 +++++++++-- .../hermes/memos_provider/bridge_client.py | 12 ++++-- .../hermes/memos_provider/daemon_manager.py | 24 +++++++++-- .../agent-contract/jsonrpc.ts | 2 + apps/memos-local-plugin/bridge/methods.ts | 23 ++++++++++ .../core/pipeline/memory-core.ts | 43 +++++++++++++------ .../tests/unit/bridge/methods.test.ts | 2 + 7 files changed, 106 insertions(+), 25 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index 56ee40490..d951f2390 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -48,6 +48,7 @@ import contextlib import json import logging +import re import sys import threading import time @@ -162,12 +163,9 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov }, ) self._session_id = resp.get("sessionId") or session_id - ep = self._bridge.request("episode.open", {"sessionId": self._session_id}) - self._episode_id = ep.get("episodeId", "") logger.info( - "MemOS: bridge ready session=%s episode=%s platform=%s", + "MemOS: bridge ready session=%s platform=%s", self._session_id, - self._episode_id, self._platform, ) except Exception as err: @@ -498,6 +496,17 @@ def _turn_start(self, query: str, *, session_id: str = "") -> str: return "" return f"## Recalled Memories\n{context}" + # Hermes injects a structured auto-skill evaluation prompt at task end: + # "Review the conversation above and consider whether a skill should + # be saved or updated. Work in this order… SURVEY … THINK CLASS-FIRST …" + # Capturing this system-level scaffolding as conversation content pollutes + # memory search, task summaries, and downstream skill generation. + _AUTO_SKILL_EVAL_RE = re.compile( + r"^Review the conversation above and consider whether a " + r"skill should be saved or updated\.", + re.MULTILINE, + ) + def _turn_end( self, user_content: str, @@ -507,6 +516,14 @@ def _turn_end( ) -> None: if not self._bridge: return + # Strip Hermes auto-skill evaluation blocks from the assistant + # response. When the header phrase is present the entire remainder + # of the message is system-generated scaffolding, not user content. + m = self._AUTO_SKILL_EVAL_RE.search(assistant_content) + if m: + assistant_content = assistant_content[: m.start()].strip() + if not assistant_content.strip() and not user_content.strip(): + return self._bridge.request( "turn.end", { diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 8467007a6..fb2678e33 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -70,14 +70,18 @@ def read_line(self) -> str | None: return line if line else None def close(self) -> None: - try: - self._rfile.close() - except Exception: - pass + # Shutdown the underlying socket BEFORE closing the buffered file + # handle — on macOS _rfile.close() can block indefinitely when the + # peer has already disconnected. SHUT_RDWR terminates the + # connection first so close() returns immediately. try: self._sock.shutdown(_socket.SHUT_RDWR) except OSError: pass + try: + self._rfile.close() + except Exception: + pass self._sock.close() diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index e1f750456..136a0d845 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -73,7 +73,10 @@ def _pid_alive(pid: int) -> bool: try: os.kill(pid, 0) return True - except (OSError, PermissionError): + except PermissionError: + # Process exists but is owned by another user — still alive. + return True + except OSError: return False @@ -143,9 +146,22 @@ def _is_bridge_process(pid: int) -> bool: finally: kernel32.CloseHandle(handle) return False - # Unix: read /proc//cmdline - cmdline = Path(f"/proc/{pid}/cmdline").read_bytes() - return b"bridge.cts" in cmdline + # Unix: prefer /proc//cmdline; fall back to ps(1) on macOS / BSD. + try: + cmdline = Path(f"/proc/{pid}/cmdline").read_bytes() + return b"bridge.cts" in cmdline + except FileNotFoundError: + import subprocess + try: + result = subprocess.run( + ["ps", "-p", str(pid), "-o", "command="], + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 and "bridge.cts" in result.stdout + except Exception: + return False except Exception: # If we can't validate, err on the side of safety — skip kill. return False diff --git a/apps/memos-local-plugin/agent-contract/jsonrpc.ts b/apps/memos-local-plugin/agent-contract/jsonrpc.ts index 876fbaccc..8db1f564e 100644 --- a/apps/memos-local-plugin/agent-contract/jsonrpc.ts +++ b/apps/memos-local-plugin/agent-contract/jsonrpc.ts @@ -70,6 +70,8 @@ export const RPC_METHODS = { SESSION_CLOSE: "session.close", EPISODE_OPEN: "episode.open", EPISODE_CLOSE: "episode.close", + EPISODE_DELETE: "episode.delete", + EPISODE_DELETE_BULK: "episode.delete_bulk", // ── pipeline (per turn) ── TURN_START: "turn.start", diff --git a/apps/memos-local-plugin/bridge/methods.ts b/apps/memos-local-plugin/bridge/methods.ts index d81829471..3c3a41457 100644 --- a/apps/memos-local-plugin/bridge/methods.ts +++ b/apps/memos-local-plugin/bridge/methods.ts @@ -134,6 +134,29 @@ export function makeDispatcher( await core.closeEpisode(requireString(p, "episodeId", method) as EpisodeId); return { ok: true }; } + case RPC_METHODS.EPISODE_DELETE: { + const p = asRecord(params, method); + return await core.deleteEpisode(requireString(p, "episodeId", method) as EpisodeId); + } + case RPC_METHODS.EPISODE_DELETE_BULK: { + const p = asRecord(params, method); + const ids = p.ids; + if (!Array.isArray(ids) || ids.length === 0) { + throw new MemosError( + "invalid_argument", + "ids must be a non-empty array", + ); + } + for (const id of ids) { + if (typeof id !== "string" || id.trim().length === 0) { + throw new MemosError( + "invalid_argument", + "each element in ids must be a non-empty string", + ); + } + } + return await core.deleteEpisodes(ids as EpisodeId[]); + } // ── turn lifecycle ── case RPC_METHODS.TURN_START: { diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 0ecb403be..6e8d19206 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -284,19 +284,36 @@ export function createMemoryCore( for (const ep of openEpisodes) { const epAge = nowMs - (ep.endedAt ?? ep.startedAt); if (epAge > STALE_EPISODE_TIMEOUT_MS) { - log.info("stale_episode.auto_abandon", { - episodeId: ep.id, - sessionId: ep.sessionId, - ageMs: epAge, - thresholdMs: STALE_EPISODE_TIMEOUT_MS, - }); - try { - handle.episodeManager.abandon( - ep.id as import("../../agent-contract/dto.js").EpisodeId, - `自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`, - ); - } catch { - // Episode may have been finalized concurrently — safe to ignore. + const idleReason = `自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`; + if (ep.traceIds && ep.traceIds.length > 0) { + log.info("stale_episode.auto_finalize", { + episodeId: ep.id, + sessionId: ep.sessionId, + ageMs: epAge, + thresholdMs: STALE_EPISODE_TIMEOUT_MS, + traceCount: ep.traceIds.length, + reason: idleReason, + }); + try { + handle.sessionManager.finalizeEpisode(ep.id); + } catch { + // Episode may have been finalized concurrently — safe to ignore. + } + } else { + log.info("stale_episode.auto_abandon", { + episodeId: ep.id, + sessionId: ep.sessionId, + ageMs: epAge, + thresholdMs: STALE_EPISODE_TIMEOUT_MS, + }); + try { + handle.episodeManager.abandon( + ep.id as import("../../agent-contract/dto.js").EpisodeId, + idleReason, + ); + } catch { + // Episode may have been finalized concurrently — safe to ignore. + } } } } diff --git a/apps/memos-local-plugin/tests/unit/bridge/methods.test.ts b/apps/memos-local-plugin/tests/unit/bridge/methods.test.ts index da7daea50..546aba0cd 100644 --- a/apps/memos-local-plugin/tests/unit/bridge/methods.test.ts +++ b/apps/memos-local-plugin/tests/unit/bridge/methods.test.ts @@ -48,6 +48,8 @@ function stubCore(overrides: Partial = {}): MemoryCore { closeSession: vi.fn(async () => {}), openEpisode: vi.fn(async ({ episodeId }) => episodeId ?? "e-auto"), closeEpisode: vi.fn(async () => {}), + deleteEpisode: vi.fn(async () => ({ deleted: true })), + deleteEpisodes: vi.fn(async () => ({ deleted: 1 })), onTurnStart: vi.fn(async () => ({ query: { agent: "openclaw", query: "" }, hits: [], From 7584e4e421fb4fc67624957dee8d14f6e87b5c35 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Thu, 30 Apr 2026 14:03:43 +0800 Subject: [PATCH 20/22] fix: abandon() checks rTask before stamping close reason, filter user_content for auto-skill eval - abandon(): when rTask is set, mark episode as finalized instead of abandoned, since the reward pipeline already completed. Log at info level and skip episode.abandoned event. - _turn_end(): apply _AUTO_SKILL_EVAL_RE to user_content too, so the review prompt sent as user_message to the fork agent is stripped. --- .../hermes/memos_provider/__init__.py | 9 ++++-- .../core/session/episode-manager.ts | 32 +++++++++++++------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index d951f2390..040d3f340 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -516,12 +516,15 @@ def _turn_end( ) -> None: if not self._bridge: return - # Strip Hermes auto-skill evaluation blocks from the assistant - # response. When the header phrase is present the entire remainder - # of the message is system-generated scaffolding, not user content. + # Strip Hermes auto-skill evaluation blocks. The prompt may appear + # in assistant_content (main agent) or user_content (fork agent that + # receives the review prompt as a user_message). m = self._AUTO_SKILL_EVAL_RE.search(assistant_content) if m: assistant_content = assistant_content[: m.start()].strip() + m = self._AUTO_SKILL_EVAL_RE.search(user_content) + if m: + user_content = user_content[: m.start()].strip() if not assistant_content.strip() and not user_content.strip(): return self._bridge.request( diff --git a/apps/memos-local-plugin/core/session/episode-manager.ts b/apps/memos-local-plugin/core/session/episode-manager.ts index 4c9bde929..36cc92ee4 100644 --- a/apps/memos-local-plugin/core/session/episode-manager.ts +++ b/apps/memos-local-plugin/core/session/episode-manager.ts @@ -200,16 +200,30 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { const endedAt = now(); snap.status = "closed"; snap.endedAt = endedAt; - snap.meta = { ...snap.meta, closeReason: "abandoned", abandonReason: reason }; + const hasReward = snap.rTask != null; + if (hasReward) { + snap.meta = { ...snap.meta, closeReason: "finalized", abandonReason: undefined }; + log.info("episode.abandon_finalized", { + episodeId: id, + sessionId: snap.sessionId, + turnCount: snap.turnCount, + rTask: snap.rTask, + reason, + }); + } else { + snap.meta = { ...snap.meta, closeReason: "abandoned", abandonReason: reason }; + log.warn("episode.abandoned", { + episodeId: id, + sessionId: snap.sessionId, + turnCount: snap.turnCount, + reason, + }); + } deps.episodesRepo.close(id, endedAt, snap.rTask ?? undefined, snap.meta); - log.warn("episode.abandoned", { - episodeId: id, - sessionId: snap.sessionId, - turnCount: snap.turnCount, - reason, - }); - deps.bus.emit({ kind: "episode.finalized", episode: cloneSnapshot(snap), closedBy: "abandoned" }); - deps.bus.emit({ kind: "episode.abandoned", episodeId: id, reason }); + deps.bus.emit({ kind: "episode.finalized", episode: cloneSnapshot(snap), closedBy: hasReward ? "finalized" : "abandoned" }); + if (!hasReward) { + deps.bus.emit({ kind: "episode.abandoned", episodeId: id, reason }); + } return cloneSnapshot(snap); }, From 0c69c75eb6138693716a506cf24808a6c43ed7c9 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Thu, 30 Apr 2026 21:08:24 +0800 Subject: [PATCH 21/22] fix: sync closeReason after async reward, filter review prompt before turn.start - reward.ts: stamp closeReason="finalized" when async scoring completes, so episodes abandoned before scoring (rTask was NULL at closeSession) get corrected to "finalized" once the reward pipeline finishes. - memos_provider/__init__.py: check query against _AUTO_SKILL_EVAL_RE in _run() before _turn_start(), so review prompts never create traces. - memory-core.ts: add TODO for orphan close bypassing capture+reward chain. --- .../adapters/hermes/memos_provider/__init__.py | 11 +++++++---- apps/memos-local-plugin/core/pipeline/memory-core.ts | 7 +++++++ apps/memos-local-plugin/core/reward/reward.ts | 2 ++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index 040d3f340..42e9f0f77 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -216,10 +216,13 @@ def queue_prefetch(self, query: str, *, session_id: str = "") -> None: # type: def _run() -> None: try: - result = self._turn_start(query, session_id=session_id) if self._bridge else "" - if result: - with self._prefetch_lock: - self._prefetch_result = result + # Skip turn.start for auto-skill eval prompts to avoid + # creating a trace that _turn_end will never complete. + if not self._AUTO_SKILL_EVAL_RE.search(query): + result = self._turn_start(query, session_id=session_id) if self._bridge else "" + if result: + with self._prefetch_lock: + self._prefetch_result = result except Exception as err: logger.debug("MemOS: queue_prefetch failed — %s", err) diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 6e8d19206..6debbf934 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -375,6 +375,13 @@ export function createMemoryCore( ep.id as import("../../agent-contract/dto.js").EpisodeId, endedAt, ); + // TODO: orphan close bypasses the event bus (episode.finalized / + // episode.abandoned), so capture+reward never fires even when + // rTask is null. Episodes closed here will show "已完成但 Reward + // 评分尚未完成" in the viewer. For now this only affects orphans + // discovered on bridge restart — closeSession() now triggers the + // full chain for normal exits. + // // If the pipeline already scored this episode (rTask is set), // mark it as "finalized" — the chain ran to completion before // the crash, only the final status flip was lost. Blanket diff --git a/apps/memos-local-plugin/core/reward/reward.ts b/apps/memos-local-plugin/core/reward/reward.ts index 0c9b2aa5d..797ba0400 100644 --- a/apps/memos-local-plugin/core/reward/reward.ts +++ b/apps/memos-local-plugin/core/reward/reward.ts @@ -242,6 +242,8 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner { try { deps.episodesRepo.updateMeta(input.episodeId, { + closeReason: "finalized", + abandonReason: undefined, reward: { rHuman: humanScore.rHuman, source: humanScore.source, From 8d3ed61e8ecd96ba1a097278a8c0e5f291402dc6 Mon Sep 17 00:00:00 2001 From: Xinyu Du Date: Thu, 30 Apr 2026 21:11:35 +0800 Subject: [PATCH 22/22] =?UTF-8?q?chore:=20remove=20TODO=20for=20orphan=20s?= =?UTF-8?q?can=20=E2=80=94=20deliberate=20design,=20not=20a=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/memos-local-plugin/core/pipeline/memory-core.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 6debbf934..6e8d19206 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -375,13 +375,6 @@ export function createMemoryCore( ep.id as import("../../agent-contract/dto.js").EpisodeId, endedAt, ); - // TODO: orphan close bypasses the event bus (episode.finalized / - // episode.abandoned), so capture+reward never fires even when - // rTask is null. Episodes closed here will show "已完成但 Reward - // 评分尚未完成" in the viewer. For now this only affects orphans - // discovered on bridge restart — closeSession() now triggers the - // full chain for normal exits. - // // If the pipeline already scored this episode (rTask is set), // mark it as "finalized" — the chain ran to completion before // the crash, only the final status flip was lost. Blanket