diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index c70f5fea9..56ee40490 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -62,7 +62,7 @@ if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from bridge_client import MemosBridgeClient # noqa: E402 +from bridge_client import BridgeError, MemosBridgeClient # noqa: E402 from daemon_manager import ensure_bridge_running, shutdown_bridge # noqa: E402 @@ -142,7 +142,13 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov logger.warning("MemOS: failed to start bridge — %s", err) return try: - self._bridge = MemosBridgeClient() + # Try TCP mode first (connect to daemon bridge). + # Falls back to stdio (spawn subprocess) if daemon isn't running. + try: + self._bridge = MemosBridgeClient(tcp_host="127.0.0.1", tcp_port=18911) + except BridgeError: + logger.info("MemOS: TCP daemon not available, falling back to stdio bridge") + self._bridge = MemosBridgeClient() resp = self._bridge.request( "session.open", { @@ -455,6 +461,10 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor if pending: with contextlib.suppress(Exception): self._turn_end(*pending) + # Close the episode and session so the core stamps closure metadata + # (e.g. session.meta.closedAt). In TCP mode this ensures the daemon + # can distinguish "normal shutdown" from "abrupt disconnect" on + # restart, preventing orphan retention. with contextlib.suppress(Exception): self._bridge.request("episode.close", {"episodeId": self._episode_id}) with contextlib.suppress(Exception): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index b63b190fe..8467007a6 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -1,12 +1,12 @@ -"""JSON-RPC 2.0 over stdio client for the MemOS bridge. +"""JSON-RPC 2.0 client for the MemOS bridge. -Spawns ``node bridge.cts --agent=hermes`` as a subprocess and communicates -via line-delimited JSON messages on its stdin/stdout. Responses are -matched by ``id``. Notifications (events + logs) are forwarded to -registered callbacks on a reader thread. +Two transport modes: +- **TCP** (recommended): connects to an existing daemon bridge via ``host:port``. + Hermes CLI exits without disrupting the daemon's session → episodes finalize properly. +- **stdio** (fallback): spawns ``node bridge.cts --agent=hermes`` as a subprocess. -The client is *blocking* by design — callers wanting async behaviour -should wrap requests in a thread pool. +Responses are matched by ``id``. Notifications (events + logs) are forwarded to +registered callbacks on a reader thread. Thread-safe. """ from __future__ import annotations @@ -15,13 +15,18 @@ import json import logging import os +import socket as _socket import shutil import subprocess import threading +import time from pathlib import Path from typing import TYPE_CHECKING, Any +# Ensure at-most-one bridge instance via PID file (stdio mode only). +from daemon_manager import kill_existing_bridge, register_bridge + if TYPE_CHECKING: from collections.abc import Callable @@ -29,6 +34,11 @@ logger = logging.getLogger(__name__) +DEFAULT_TCP_HOST = "127.0.0.1" +DEFAULT_TCP_PORT = 18911 +_TCP_RECONNECT_DELAY = 1.0 # seconds between reconnection attempts +_TCP_MAX_RECONNECT = 3 + class BridgeError(RuntimeError): """Raised when the bridge returns a JSON-RPC error object.""" @@ -40,17 +50,58 @@ def __init__(self, code: str, message: str, data: Any = None) -> None: self.data = data +class _SocketTransport: + """TCP socket wrapper with line-delimited JSON read/write.""" + + def __init__(self, host: str, port: int) -> None: + self._sock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) + self._sock.settimeout(15.0) + self._sock.connect((host, port)) + self._sock.settimeout(None) # blocking reads after connect + self._rfile = self._sock.makefile("r", buffering=1, encoding="utf-8") + + def write_line(self, text: str) -> None: + # Avoid double newline: callers may or may not include \n. + payload = text if text.endswith("\n") else text + "\n" + self._sock.sendall(payload.encode("utf-8")) + + def read_line(self) -> str | None: + line = self._rfile.readline() + return line if line else None + + def close(self) -> None: + try: + self._rfile.close() + except Exception: + pass + try: + self._sock.shutdown(_socket.SHUT_RDWR) + except OSError: + pass + self._sock.close() + + class MemosBridgeClient: - """Client wrapping a line-delimited JSON-RPC 2.0 stdio bridge. + """Client wrapping a line-delimited JSON-RPC 2.0 bridge. + + Two transport modes: + + **TCP mode** (``tcp_host`` + ``tcp_port`` given): + Connects to an existing daemon bridge process. The Hermes CLI can exit + without closing the daemon's session, so episodes get *finalized* by the + pipeline rather than *abandoned*. On connection loss the client tries to + reconnect a few times. + + **stdio mode** (default): + Spawns ``node bridge.cts --agent=hermes`` as a subprocess and communicates + over its stdin / stdout. The Hermes CLI exiting causes the episode to be + abandoned. Usage: >>> client = MemosBridgeClient() >>> client.request("core.health", {}) {'ok': True, 'version': '...'} >>> client.close() - - Thread-safe: per-request locking ensures concurrent callers don't - interleave writes. """ def __init__( @@ -60,6 +111,8 @@ def __init__( node_binary: str | None = None, agent: str = "hermes", extra_env: dict[str, str] | None = None, + tcp_host: str | None = None, + tcp_port: int | None = None, ) -> None: self._lock = threading.Lock() self._next_id = 1 @@ -67,35 +120,95 @@ def __init__( self._events: list[Callable[[dict[str, Any]], None]] = [] self._logs: list[Callable[[dict[str, Any]], None]] = [] self._closed = False + self._tcp_mode = tcp_host is not None or tcp_port is not None - node = node_binary or shutil.which("node") or "node" - script = bridge_path or str( - Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" - ) - env = {**os.environ, **(extra_env or {})} - self._proc = subprocess.Popen( - [node, "--experimental-strip-types", script, f"--agent={agent}"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, - env=env, - ) - self._reader = threading.Thread( - target=self._read_loop, - daemon=True, - name="memos-bridge-reader", - ) - self._reader.start() - self._stderr_reader = threading.Thread( - target=self._stderr_loop, - daemon=True, - name="memos-bridge-stderr", + if self._tcp_mode: + self._tcp_host = tcp_host or DEFAULT_TCP_HOST + self._tcp_port = tcp_port or DEFAULT_TCP_PORT + self._transport: _SocketTransport | None = None + self._connect_tcp() + else: + # ── stdio mode (original behaviour, spawn subprocess) ── + node = node_binary or shutil.which("node") or "node" + script = bridge_path or str( + Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" + ) + tsx_bin = str( + Path(__file__).resolve().parent.parent.parent.parent / "node_modules" / ".bin" / "tsx" + ) + runtime = tsx_bin if Path(tsx_bin).exists() else node + runtime_args = [] if Path(tsx_bin).exists() else ["--experimental-strip-types"] + env = {**os.environ, **(extra_env or {})} + # Kill any previously-running bridge before spawning a new one. + kill_existing_bridge() + self._proc = subprocess.Popen( + [runtime, *runtime_args, script, f"--agent={agent}"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + env=env, + ) + # Register the new process so daemon_manager can track it. + register_bridge(self._proc) + self._reader = threading.Thread( + target=self._read_loop_stdio, + daemon=True, + name="memos-bridge-reader", + ) + self._reader.start() + self._stderr_reader = threading.Thread( + target=self._stderr_loop, + daemon=True, + name="memos-bridge-stderr", + ) + self._stderr_reader.start() + + # ── TCP connection helpers ────────────────────────────────────── + + def _connect_tcp(self) -> None: + """Connect (or reconnect) to the daemon bridge TCP port.""" + last_err = None + for attempt in range(_TCP_MAX_RECONNECT): + try: + self._transport = _SocketTransport(self._tcp_host, self._tcp_port) + # Start reader thread + self._reader = threading.Thread( + target=self._read_loop_tcp, + daemon=True, + name="memos-bridge-tcp-reader", + ) + self._reader.start() + logger.info( + "bridge_client: connected to daemon at %s:%s", + self._tcp_host, self._tcp_port, + ) + return + except (ConnectionRefusedError, OSError) as e: + last_err = e + if attempt < _TCP_MAX_RECONNECT - 1: + time.sleep(_TCP_RECONNECT_DELAY * (attempt + 1)) + raise BridgeError( + "tcp_connect_failed", + f"Could not connect to daemon bridge at {self._tcp_host}:{self._tcp_port} " + f"after {_TCP_MAX_RECONNECT} attempts: {last_err}", ) - self._stderr_reader.start() - # ─── Public API ── + def _reconnect_tcp(self) -> None: + """Attempt transparent reconnection after transport loss.""" + logger.warning("bridge_client: TCP transport lost, reconnecting…") + old = self._transport + self._transport = None + if old is not None: + with contextlib.suppress(Exception): + old.close() + self._connect_tcp() + # Re-send session greetings so the daemon knows this client is alive. + # The daemon treats a new TCP connection as a new client — no session + # creation needed on the daemon side (it already has its own session). + + # ── Public API ────────────────────────────────────────────────── def request( self, @@ -116,12 +229,7 @@ def request( {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}, ensure_ascii=False, ) - try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError) as err: - self._pending.pop(rpc_id, None) - raise BridgeError("transport_closed", str(err)) from err + self._write_or_raise(payload + "\n") if not waiter.wait(timeout=timeout): with self._lock: @@ -142,9 +250,8 @@ def notify(self, method: str, params: Any = None) -> None: with self._lock: payload = json.dumps({"jsonrpc": "2.0", "method": method, "params": params}) try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError): + self._write_text(payload + "\n") + except (BrokenPipeError, OSError, ConnectionError): pass def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None: @@ -157,12 +264,10 @@ def close(self) -> None: if self._closed: return self._closed = True - with contextlib.suppress(Exception): - self._proc.stdin.close() - try: - self._proc.wait(timeout=5.0) - except subprocess.TimeoutExpired: - self._proc.kill() + if self._tcp_mode: + self._close_tcp() + else: + self._close_stdio() # unblock any pending waiters with self._lock: for entry in list(self._pending.values()): @@ -174,36 +279,95 @@ def close(self) -> None: entry["event"].set() self._pending.clear() - # ─── Internals ── + # ── Internals: write helpers ──────────────────────────────────── + + def _write_or_raise(self, text: str) -> None: + """Write *text* to the transport; raise on failure.""" + if self._tcp_mode: + if self._transport is None: + raise BridgeError("transport_closed", "TCP transport disconnected") + try: + self._transport.write_line(text) + except (BrokenPipeError, OSError, ConnectionError) as err: + # Attempt transparent reconnect + try: + self._reconnect_tcp() + self._transport.write_line(text) + except Exception as reconnect_err: + raise BridgeError("transport_closed", str(reconnect_err)) from err + else: + assert self._proc.stdin is not None + try: + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError) as err: + raise BridgeError("transport_closed", str(err)) from err + + def _write_text(self, text: str) -> None: + """Best-effort write, swallow errors.""" + try: + if self._tcp_mode: + if self._transport is not None: + self._transport.write_line(text) + else: + assert self._proc.stdin is not None + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError, ConnectionError): + pass + + # ── Internals: close ──────────────────────────────────────────── + + def _close_tcp(self) -> None: + if self._transport is not None: + self._transport.close() + self._transport = None + + def _close_stdio(self) -> None: + with contextlib.suppress(Exception): + self._proc.stdin.close() + try: + self._proc.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self._proc.kill() + # Clear PID file so a future incarnation starts fresh. + register_bridge(None) - def _read_loop(self) -> None: + # ── Internals: read loops ─────────────────────────────────────── + + def _read_loop_tcp(self) -> None: + """Read line-delimited JSON from the TCP socket.""" + transport = self._transport + if transport is None: + return + while not self._closed: + try: + line = transport.read_line() + except (OSError, ConnectionError): + if not self._closed: + logger.error("bridge_client: TCP read error, reader exiting") + break + if line is None: + if not self._closed: + logger.warning("bridge_client: TCP connection closed by peer") + break + line = line.strip() + if not line: + continue + self._dispatch(line) + # If the socket died but the client isn't closed, try a reconnect + # in the background. The next request() call will also trigger + # reconnection via _write_or_raise. + if not self._closed and self._transport is not None: + logger.info("bridge_client: TCP reader lost; will reconnect on next request") + + def _read_loop_stdio(self) -> None: assert self._proc.stdout is not None for line in self._proc.stdout: line = line.strip() if not line: continue - try: - msg = json.loads(line) - except json.JSONDecodeError: - logger.debug("bridge: malformed line: %r", line[:120]) - continue - if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): - self._resolve(msg) - continue - if msg.get("method") == "events.notify": - for cb in list(self._events): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("event listener threw", exc_info=True) - continue - if msg.get("method") == "logs.forward": - for cb in list(self._logs): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("log listener threw", exc_info=True) - continue + self._dispatch(line) def _stderr_loop(self) -> None: assert self._proc.stderr is not None @@ -212,6 +376,33 @@ def _stderr_loop(self) -> None: if line: logger.debug("bridge.stderr: %s", line) + # ── Common dispatch ───────────────────────────────────────────── + + def _dispatch(self, line: str) -> None: + """Parse a JSON-RPC line and route it to the right handler.""" + try: + msg = json.loads(line) + except json.JSONDecodeError: + logger.debug("bridge: malformed line: %r", line[:120]) + return + if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): + self._resolve(msg) + return + if msg.get("method") == "events.notify": + for cb in list(self._events): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("event listener threw", exc_info=True) + return + if msg.get("method") == "logs.forward": + for cb in list(self._logs): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("log listener threw", exc_info=True) + return + def _resolve(self, msg: dict[str, Any]) -> None: rpc_id = msg.get("id") if not isinstance(rpc_id, int): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index 62810cc5b..e1f750456 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -5,6 +5,8 @@ - Probe Node.js availability so ``MemTensorProvider.is_available`` can answer cheaply at plugin-startup time. - Graceful shutdown helpers invoked from ``MemTensorProvider.shutdown``. +- PID file management to prevent duplicate bridge processes across + Hermes session restarts. This file intentionally has **no runtime dependency** on the client; the provider instantiates its own client. Keeping these concerns split means @@ -17,9 +19,12 @@ from __future__ import annotations import logging +import os import shutil +import signal import subprocess import threading +import time from pathlib import Path @@ -28,6 +33,51 @@ _lock = threading.Lock() _bridge_ok: bool | None = None +_ACTIVE_BRIDGE_PROC: subprocess.Popen | None = None + + +# ─── PID file helpers ──────────────────────────────────────────────────── + + +def _pid_path() -> Path: + """Path to the singleton PID file under the runtime daemon directory. + + Respects ``MEMOS_HOME`` when set (``~/.hermes/memos-plugin`` by + convention), falling back to the plugin source tree only when the env + var is absent for compatibility with development installs. + """ + memos_home = os.environ.get("MEMOS_HOME") + if memos_home: + return Path(memos_home) / "daemon" / "bridge.pid" + return Path(__file__).resolve().parent.parent.parent.parent / "data" / "bridge.pid" + + +def _read_pid() -> int | None: + try: + return int(_pid_path().read_text().strip()) + except (FileNotFoundError, ValueError): + return None + + +def _write_pid(pid: int) -> None: + pid_path = _pid_path() + pid_path.parent.mkdir(parents=True, exist_ok=True) + pid_path.write_text(str(pid)) + + +def _clean_pid() -> None: + _pid_path().unlink(missing_ok=True) + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except (OSError, PermissionError): + return False + + +# ─── Bridge lifecycle ──────────────────────────────────────────────────── def _bridge_script() -> Path: @@ -69,8 +119,96 @@ def ensure_bridge_running(*, probe_only: bool = False) -> bool: return True +def _is_bridge_process(pid: int) -> bool: + """Return True when *pid* looks like a bridge process. + + Checks the process command line for ``bridge.cts`` to avoid killing an + unrelated process that happened to recycle a stale PID. + """ + try: + if os.name == "nt": + import ctypes + + import ctypes.wintypes + + kernel32 = ctypes.windll.kernel32 + handle = kernel32.OpenProcess(0x0400 | 0x0010, False, pid) + if not handle: + return False + try: + exe_path = (ctypes.c_wchar * 260)() + size = ctypes.wintypes.DWORD(260) + if kernel32.K32GetProcessImageFileNameW(handle, exe_path, size): + return "bridge" in str(exe_path.value).lower() + finally: + kernel32.CloseHandle(handle) + return False + # Unix: read /proc//cmdline + cmdline = Path(f"/proc/{pid}/cmdline").read_bytes() + return b"bridge.cts" in cmdline + except Exception: + # If we can't validate, err on the side of safety — skip kill. + return False + + +def kill_existing_bridge() -> None: + """Kill any previously-running bridge process recorded in the PID file. + + Called **before** spawning a new bridge to guarantee at-most-one + instance. Validates that the PID belongs to a bridge process before + sending any signal to avoid killing an unrelated process when the + PID file is stale. + """ + pid = _read_pid() + if pid is not None and _pid_alive(pid): + if not _is_bridge_process(pid): + logger.warning( + "MemOS: PID %d is alive but does not appear to be a bridge " + "process — refusing to kill. Removing stale PID file.", + pid, + ) + else: + logger.info("MemOS: killing stale bridge (pid=%d)", pid) + try: + os.kill(pid, signal.SIGTERM) + for _ in range(25): # wait up to 2.5 s + if not _pid_alive(pid): + break + time.sleep(0.1) + else: + os.kill(pid, signal.SIGKILL) + except (OSError, ProcessLookupError): + pass + _clean_pid() + + +def register_bridge(proc: subprocess.Popen | None) -> None: + """Record the current running bridge process. + + Pass ``None`` (e.g. on close) to clear the registration and PID file. + """ + global _ACTIVE_BRIDGE_PROC + _ACTIVE_BRIDGE_PROC = proc + if proc is not None: + _write_pid(proc.pid) + else: + _clean_pid() + + def shutdown_bridge() -> None: - """Best-effort cleanup; each client owns its own subprocess.""" - global _bridge_ok + """Gracefully shut down the tracked bridge subprocess and clean PID file.""" + global _bridge_ok, _ACTIVE_BRIDGE_PROC with _lock: _bridge_ok = None + if _ACTIVE_BRIDGE_PROC is not None: + try: + _ACTIVE_BRIDGE_PROC.terminate() + _ACTIVE_BRIDGE_PROC.wait(timeout=5.0) + logger.info("MemOS: bridge terminated (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except subprocess.TimeoutExpired: + _ACTIVE_BRIDGE_PROC.kill() + logger.warning("MemOS: bridge killed after timeout (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except Exception: + pass + _ACTIVE_BRIDGE_PROC = None + _clean_pid() diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index fbc4c2fc3..e8449503c 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -91,6 +91,8 @@ export interface MemoryCore { userMessage?: string; }): Promise; closeEpisode(episodeId: EpisodeId): Promise; + deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }>; + deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }>; // ── pipeline (per turn) ── /** Called *before* the agent acts. Returns the context to inject. */ diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 62750c3f8..2e8ab4999 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -15,6 +15,7 @@ */ // eslint-disable-next-line @typescript-eslint/no-require-imports const path = require("node:path") as typeof import("node:path"); +const pkgVersion: string = (require(path.resolve(__dirname, "package.json")) as { version: string }).version; interface BridgeArgs { daemon: boolean; @@ -44,6 +45,9 @@ async function main(): Promise { const { startStdioServer, waitForShutdown } = (await import( pathToEsmUrl(path.resolve(__dirname, "bridge/stdio.ts")) )) as typeof import("./bridge/stdio.js"); + const { startTcpServer } = (await import( + pathToEsmUrl(path.resolve(__dirname, "bridge/tcp.ts")) + )) as typeof import("./bridge/tcp.js"); const { startHttpServer } = (await import( pathToEsmUrl(path.resolve(__dirname, "server/index.ts")) )) as typeof import("./server/index.js"); @@ -53,12 +57,44 @@ async function main(): Promise { const { core, config, home } = await bootstrapMemoryCoreFull({ agent: args.agent, - pkgVersion: "2.0.0-alpha.1", + pkgVersion, }); await core.init(); - // Default transport: stdio. Daemon + TCP support arrives in V1.1. - const stdio = startStdioServer({ core }); + // In daemon mode stdin is typically /dev/null — starting the stdio + // server would subscribe to events/logs and buffer writes to a pipe + // that nobody drains, wasting memory. Skip it. + const stdio = args.daemon ? null : startStdioServer({ core }); + let tcpServer: Awaited> | null = null; + if (args.tcpPort !== undefined && !args.daemon) { + process.stderr.write( + "bridge: ignoring --tcp because TCP mode requires --daemon\n", + ); + } else if (args.tcpPort !== undefined) { + if (!Number.isFinite(args.tcpPort) || args.tcpPort < 1 || args.tcpPort > 65535) { + process.stderr.write( + `bridge: invalid --tcp port value: ${String(args.tcpPort)} (must be 1–65535)\n`, + ); + } else { + try { + tcpServer = startTcpServer({ + core, + host: "127.0.0.1", + port: args.tcpPort, + }); + await tcpServer.ready; // throws on EADDRINUSE etc. + process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); + } catch (err) { + process.stderr.write( + `bridge: tcp server failed to start: ${(err as Error).message}\n`, + ); + if (tcpServer) { + await tcpServer.close().catch(() => {}); + tcpServer = null; + } + } + } + } // Boot a viewer too — hermes needs its own HTTP surface for the // Memory Viewer, and it discovers the openclaw hub (if any) so @@ -93,30 +129,55 @@ async function main(): Promise { `bridge: viewer failed to start: ${(err as Error).message}\n`, ); } + let shuttingDown = false; const shutdown = async (sig: string) => { + if (shuttingDown) return; + shuttingDown = true; process.stderr.write(`bridge: received ${sig}, shutting down\n`); + try { + if (tcpServer) await tcpServer.close(); + } catch { + /* best-effort */ + } try { if (viewer) await viewer.close(); } catch { /* best-effort */ } - await waitForShutdown(core, stdio); + if (stdio) { + await waitForShutdown(core, stdio); + } else { + try { + await core.shutdown(); + } catch { + /* swallow */ + } + } process.exit(0); }; process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); - // Keep the process alive until stdin ends. - await stdio.done; - try { - if (viewer) await viewer.close(); - } catch { - /* best-effort */ + // In daemon mode, keep alive until TCP server stops (stdin is /dev/null). + // In stdio mode, run until the calling process closes stdin. + if (args.daemon && tcpServer) { + await tcpServer.done; + await shutdown("daemon_done"); + } else if (stdio) { + await stdio.done; + try { + if (viewer) await viewer.close(); + } catch { + /* best-effort */ + } + await core.shutdown(); + process.exit(0); + } else { + // Daemon mode without TCP — wait forever (kept alive by event loop). + await new Promise(() => {}); } - await core.shutdown(); - process.exit(0); } async function tryHubRegister(opts: { @@ -127,7 +188,7 @@ async function tryHubRegister(opts: { const body = JSON.stringify({ agent: opts.selfAgent, port: opts.selfPort, - version: "2.0.0-alpha.1", + version: pkgVersion, }); for (let i = 0; i < 6; i++) { try { diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts new file mode 100644 index 000000000..4167c274a --- /dev/null +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -0,0 +1,223 @@ +/** + * Line-delimited JSON-RPC over TCP. + * + * A TCP server that accepts connections from remote clients (e.g. the + * Hermes Python provider) and dispatches JSON-RPC 2.0 messages through + * the same `Dispatcher` used by the stdio server. + * + * Each connected client gets its own read/write loop. Notifications + * (events + logs) are broadcast to all connected clients. + * + * The transport: + * - Listens on the port given via `--tcp=`. + * - Reads each connection as UTF-8 text, splits by `\n`. + * - Parses each line as JSON, dispatches via `Dispatcher`. + * - Writes responses as JSON followed by `\n` to the originating socket. + * - Broadcasts `LogRecord` and `CoreEvent` as JSON-RPC notifications + * (method = `logs.forward` / `events.notify`) to all connected clients. + */ + +import { createServer, type Socket } from "node:net"; +import { + JSONRPC_PARSE_ERROR, + JSONRPC_INVALID_REQUEST, + RPC_METHODS, + rpcCodeForError, + type JsonRpcFailure, + type JsonRpcRequest, + type JsonRpcSuccess, +} from "../agent-contract/jsonrpc.js"; +import type { MemoryCore } from "../agent-contract/memory-core.js"; +import { MemosError } from "../agent-contract/errors.js"; +import { errorCodeOf, makeDispatcher } from "./methods.js"; + +// ─── Types ────────────────────────────────────────────────────────────────── + +export interface TcpServerOptions { + core: MemoryCore; + host: string; + port: number; +} + +export interface TcpServerHandle { + readonly url: string; + /** Resolves once the server is actually listening, rejects on error. */ + ready: Promise; + close: () => Promise; + done: Promise; +} + +// ─── Server ───────────────────────────────────────────────────────────────── + +export function startTcpServer(options: TcpServerOptions): TcpServerHandle { + const { core, host, port } = options; + const dispatch = makeDispatcher(core); + + const clients = new Set(); + let closed = false; + let doneResolve: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + // Subscribe to events + logs and broadcast to all connected clients. + const eventsUnsub = core.subscribeEvents((e) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.EVENTS_NOTIFY, params: e }); + }); + const logsUnsub = core.subscribeLogs((r) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.LOGS_FORWARD, params: r }); + }); + + function broadcast(obj: unknown): void { + const payload = JSON.stringify(obj) + "\n"; + for (const sock of clients) { + try { + sock.write(payload); + } catch { + /* best-effort per client */ + } + } + } + + function errorResponse( + id: JsonRpcRequest["id"] | null, + code: number, + message: string, + data?: unknown, + ): JsonRpcFailure { + return { + jsonrpc: "2.0", + id: id ?? null, + error: { code, message, data: data as any }, + }; + } + + function writeLine(sock: Socket, obj: unknown): void { + try { + sock.write(JSON.stringify(obj) + "\n"); + } catch { + /* ignore */ + } + } + + async function handleLine(sock: Socket, line: string): Promise { + const trimmed = line.trim(); + if (trimmed.length === 0) return; + + let msg: JsonRpcRequest | null = null; + try { + msg = JSON.parse(trimmed) as JsonRpcRequest; + } catch (err) { + writeLine( + sock, + errorResponse(null, JSONRPC_PARSE_ERROR, "invalid JSON", { + text: err instanceof Error ? err.message : String(err), + }), + ); + return; + } + + if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || typeof msg.method !== "string") { + writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0")); + return; + } + + try { + const result = await dispatch(msg.method, msg.params); + if (msg.id !== undefined && msg.id !== null) { + const ok: JsonRpcSuccess = { jsonrpc: "2.0", id: msg.id, result }; + writeLine(sock, ok); + } + } catch (err) { + const code = rpcCodeForError(errorCodeOf(err)); + const mErr = + err instanceof MemosError + ? err + : new MemosError("internal", err instanceof Error ? err.message : String(err)); + writeLine(sock, errorResponse(msg.id ?? null, code, mErr.message, mErr.toJSON())); + process.stderr.write(`bridge.tcp.dispatch.err ${msg.method}: ${mErr.message}\n`); + } + } + + // ─── Server ─────────────────────────────────────────────────────────────── + + const server = createServer((sock: Socket) => { + clients.add(sock); + process.stderr.write( + `bridge.tcp: client connected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + + let buffer = ""; + sock.setEncoding("utf8"); + + sock.on("data", (chunk: string) => { + buffer += chunk; + let nl = buffer.indexOf("\n"); + while (nl >= 0) { + const line = buffer.slice(0, nl); + buffer = buffer.slice(nl + 1); + void handleLine(sock, line); + nl = buffer.indexOf("\n"); + } + }); + + sock.on("close", () => { + clients.delete(sock); + process.stderr.write( + `bridge.tcp: client disconnected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + }); + + sock.on("error", (err) => { + process.stderr.write(`bridge.tcp: socket error: ${err.message}\n`); + clients.delete(sock); + if (!sock.destroyed) { + sock.destroy(); + } + }); + }); + + // Wrap listen in a promise so callers can catch EADDRINUSE etc. + let isListening = false; + const listenPromise = new Promise((resolve, reject) => { + server.on("error", (err) => { + if (!isListening) { + reject(err); + return; + } + process.stderr.write(`bridge.tcp: server error: ${err.message}\n`); + }); + server.listen(port, host, () => { + isListening = true; + process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); + resolve(); + }); + }); + + return { + get url() { + return `tcp://${host}:${port}`; + }, + /** Resolves once the server is actually listening. */ + ready: listenPromise, + async close() { + if (closed) return; + closed = true; + eventsUnsub(); + logsUnsub(); + for (const sock of clients) { + sock.end(); + sock.destroy(); + } + clients.clear(); + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) reject(err); + else resolve(); + }); + }); + doneResolve(); + }, + done: donePromise, + }; +} diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 8f5c1d60b..0ecb403be 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -332,9 +332,25 @@ export function createMemoryCore( // to `closed` + sets `closeReason: "abandoned"` without touching // trace_ids_json. try { - const orphans = handle.repos.episodes.list({ status: "open", limit: 500 }); + const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 }); + // Batch-fetch sessions to avoid N+1 lookups per episode. + const sessionIds = new Set(openEpisodes.map((ep) => ep.sessionId)); + const sessionById = new Map>(); + for (const sid of sessionIds) { + sessionById.set(sid, handle.repos.sessions.getById(sid)); + } + // Only treat an open episode as an orphan if its session has been + // explicitly closed (meta.closedAt is set) or no longer exists. + // Otherwise the session might reconnect — leave it alone. + const orphans = openEpisodes.filter((ep) => { + const session = sessionById.get(ep.sessionId); + if (!session) return true; // session row gone — orphan + if (session.meta?.closedAt != null) return true; // explicitly closed — orphan + return false; // session exists and not closed — skip, might reconnect + }); if (orphans.length > 0) { - log.info("init.orphan_episodes.close", { count: orphans.length }); + const skipped = openEpisodes.length - orphans.length; + log.info("init.orphan_episodes.close", { count: orphans.length, skipped }); const endedAt = Date.now(); for (const ep of orphans) { try { @@ -632,6 +648,53 @@ export function createMemoryCore( handle.sessionManager.finalizeEpisode(episodeId); } + function assertEpisodeDeletable(episodeId: EpisodeId): void { + const snap = handle.sessionManager.getEpisode(episodeId); + if (snap?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId}`, + ); + } + // After restart the in-memory manager may not have this episode, + // but it might still be open in SQLite — double-check the DB. + if (!snap && handle.repos.episodes.getById(episodeId)?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId} (open in DB)`, + ); + } + } + + function deleteClosedEpisode(episodeId: EpisodeId): boolean { + const existing = handle.repos.episodes.getById(episodeId); + assertEpisodeDeletable(episodeId); + const deleted = handle.repos.episodes.deleteById(episodeId); + // Guard against regressions where deletion silently becomes a no-op + // for an existing closed episode. Missing episodes return `false`. + if (existing && !deleted) { + throw new MemosError( + "internal", + `failed to delete closed episode: ${episodeId}`, + ); + } + return deleted; + } + + async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { + ensureLive(); + return { deleted: deleteClosedEpisode(episodeId) }; + } + + async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> { + ensureLive(); + let deleted = 0; + for (const id of ids) { + if (deleteClosedEpisode(id)) deleted++; + } + return { deleted }; + } + // ─── Pipeline (per turn) ── async function onTurnStart( turn: Parameters[0], @@ -1947,6 +2010,8 @@ export function createMemoryCore( closeSession, openEpisode, closeEpisode, + deleteEpisode, + deleteEpisodes, onTurnStart, onTurnEnd, submitFeedback, diff --git a/apps/memos-local-plugin/core/session/manager.ts b/apps/memos-local-plugin/core/session/manager.ts index f92356219..bc8c96744 100644 --- a/apps/memos-local-plugin/core/session/manager.ts +++ b/apps/memos-local-plugin/core/session/manager.ts @@ -143,6 +143,15 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { for (const ep of epm.listForSession(id)) { if (ep.status === "open") epm.abandon(ep.id, `session_closed:${reason}`); } + // Stamp the session row with closedAt so future bridge init() + // can distinguish "explicitly closed" from "might reconnect". + try { + const row = deps.sessionsRepo.getById(id); + if (row) { + const ts = now(); + deps.sessionsRepo.touchLastSeen(id, ts, { closedAt: ts }); + } + } catch { /* best-effort */ } live.delete(id); log.info("session.closed", { sessionId: id, reason }); bus.emit({ kind: "session.closed", sessionId: id, reason }); diff --git a/apps/memos-local-plugin/core/storage/repos/episodes.ts b/apps/memos-local-plugin/core/storage/repos/episodes.ts index ccff7eed9..dceb638e9 100644 --- a/apps/memos-local-plugin/core/storage/repos/episodes.ts +++ b/apps/memos-local-plugin/core/storage/repos/episodes.ts @@ -90,6 +90,13 @@ export function makeEpisodesRepo(db: StorageDb) { ).run({ id }); }, + deleteById(id: EpisodeId): boolean { + const r = db.prepare<{ id: string }>( + `DELETE FROM episodes WHERE id=@id`, + ).run({ id }); + return r.changes > 0; + }, + setRTask(id: EpisodeId, rTask: number): void { db.prepare<{ id: string; r: number }>( `UPDATE episodes SET r_task=@r WHERE id=@id`, diff --git a/apps/memos-local-plugin/package-lock.json b/apps/memos-local-plugin/package-lock.json index c86737123..cd0124e24 100644 --- a/apps/memos-local-plugin/package-lock.json +++ b/apps/memos-local-plugin/package-lock.json @@ -1,12 +1,12 @@ { "name": "@memtensor/memos-local-plugin", - "version": "2.0.0-alpha.1", + "version": "2.0.0-beta.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@memtensor/memos-local-plugin", - "version": "2.0.0-alpha.1", + "version": "2.0.0-beta.1", "hasInstallScript": true, "license": "MIT", "dependencies": { @@ -906,9 +906,6 @@ "cpu": [ "arm" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -925,9 +922,6 @@ "cpu": [ "arm64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -944,9 +938,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -963,9 +954,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -982,9 +970,6 @@ "cpu": [ "s390x" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1001,9 +986,6 @@ "cpu": [ "x64" ], - "libc": [ - "glibc" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1020,9 +1002,6 @@ "cpu": [ "arm64" ], - "libc": [ - "musl" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1039,9 +1018,6 @@ "cpu": [ "x64" ], - "libc": [ - "musl" - ], "license": "LGPL-3.0-or-later", "optional": true, "os": [ @@ -1058,9 +1034,6 @@ "cpu": [ "arm" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1083,9 +1056,6 @@ "cpu": [ "arm64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1108,9 +1078,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1133,9 +1100,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1158,9 +1122,6 @@ "cpu": [ "s390x" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1183,9 +1144,6 @@ "cpu": [ "x64" ], - "libc": [ - "glibc" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1208,9 +1166,6 @@ "cpu": [ "arm64" ], - "libc": [ - "musl" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1233,9 +1188,6 @@ "cpu": [ "x64" ], - "libc": [ - "musl" - ], "license": "Apache-2.0", "optional": true, "os": [ @@ -1700,9 +1652,6 @@ "arm" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1717,9 +1666,6 @@ "arm" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1734,9 +1680,6 @@ "arm64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1751,9 +1694,6 @@ "arm64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1768,9 +1708,6 @@ "loong64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1785,9 +1722,6 @@ "loong64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1802,9 +1736,6 @@ "ppc64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1819,9 +1750,6 @@ "ppc64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1836,9 +1764,6 @@ "riscv64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1853,9 +1778,6 @@ "riscv64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -1870,9 +1792,6 @@ "s390x" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1887,9 +1806,6 @@ "x64" ], "dev": true, - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -1904,9 +1820,6 @@ "x64" ], "dev": true, - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ diff --git a/apps/memos-local-plugin/server/routes/session.ts b/apps/memos-local-plugin/server/routes/session.ts index 0e45717b9..707a569b3 100644 --- a/apps/memos-local-plugin/server/routes/session.ts +++ b/apps/memos-local-plugin/server/routes/session.ts @@ -51,8 +51,21 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { writeError(ctx, 400, "invalid_argument", "episodeId is required"); return; } - await deps.core.closeEpisode(id as EpisodeId); - return { ok: true }; + const result = await deps.core.deleteEpisode(id as EpisodeId); + return { ok: true, deleted: result.deleted }; + }); + + routes.set("POST /api/v1/episodes/delete", async (ctx) => { + const body = parseJson<{ ids?: unknown }>(ctx); + const ids = Array.isArray(body.ids) + ? body.ids.filter((v): v is string => typeof v === "string" && v.length > 0) + : []; + if (ids.length === 0) { + writeError(ctx, 400, "invalid_argument", "ids[] is required"); + return; + } + const result = await deps.core.deleteEpisodes(ids as EpisodeId[]); + return { ok: true, deleted: result.deleted }; }); routes.set("GET /api/v1/episodes", async (ctx) => { @@ -61,10 +74,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { const rawOffset = numberOrUndefined(ctx.url.searchParams.get("offset")); const limit = rawLimit && rawLimit > 0 ? rawLimit : 50; const offset = rawOffset && rawOffset >= 0 ? rawOffset : 0; - // Return the rich row shape — the viewer's task list needs - // session id / status / turn count / preview. The old `ids`-only - // variant is still available under the `episode.list` JSON-RPC - // method and via `?shape=ids`. if (ctx.url.searchParams.get("shape") === "ids") { const episodeIds = await deps.core.listEpisodes({ sessionId, limit, offset }); return { @@ -83,9 +92,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void { }; }); - // Backward-compat: legacy `/api/v1/episodes/timeline?episodeId=…` - // still works; the preferred path `/api/v1/episodes/:id/timeline` - // is registered in `trace.ts`. routes.set("GET /api/v1/episodes/timeline", async (ctx) => { const episodeId = ctx.url.searchParams.get("episodeId"); if (!episodeId) { diff --git a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts index cbe2c7b0a..326dcb44b 100644 --- a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts +++ b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts @@ -188,7 +188,180 @@ describe("MemoryCore façade", () => { }); }); -describe("bootstrapMemoryCore", () => { +describe("init() orphan episode handling", () => { + it("preserves open episodes for sessions that were not explicitly closed", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + // Simulate a session still active on disk — no meta.closedAt. + db!.repos.sessions.upsert({ + id: "s-orphan-keep", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_100_000, + meta: {}, + }); + db!.repos.episodes.insert({ + id: "ep-orphan-keep", + sessionId: "s-orphan-keep", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-orphan-keep"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("open"); + }); + + it("closes open episodes for sessions that were explicitly closed", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + db!.repos.sessions.upsert({ + id: "s-orphan-close", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_200_000, + meta: { closedAt: 1_700_000_200_000 }, + }); + db!.repos.episodes.insert({ + id: "ep-orphan-close", + sessionId: "s-orphan-close", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-orphan-close"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("closed"); + }); + + it("closes open episodes for sessions that no longer exist", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + // FK requires the session to exist first; we then delete it via + // raw SQL to simulate a session row that was removed (e.g. manual + // DB cleanup), leaving an orphan episode behind. + db!.repos.sessions.upsert({ + id: "s-gone", + agent: "openclaw", + startedAt: 1_700_000_000_000, + lastSeenAt: 1_700_000_000_000, + meta: {}, + }); + db!.repos.episodes.insert({ + id: "ep-no-session", + sessionId: "s-gone", + startedAt: 1_700_000_000_000, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + meta: {}, + }); + // Temporarily disable FK checks so we can delete the session + // while keeping the orphan episode for the test. + db!.db.exec("PRAGMA foreign_keys = OFF; DELETE FROM sessions WHERE id='s-gone'; PRAGMA foreign_keys = ON;"); + + await core.init(); + + const ep = db!.repos.episodes.getById("ep-no-session"); + expect(ep).not.toBeNull(); + expect(ep!.status).toBe("closed"); + }); + }); + + describe("deleteEpisode / deleteEpisodes", () => { + it("deletes a closed episode and returns deleted: true", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const eid = await core.openEpisode({ sessionId: sid }); + await core.closeEpisode(eid); + + const result = await core.deleteEpisode(eid); + expect(result.deleted).toBe(true); + expect(db!.repos.episodes.getById(eid)).toBeNull(); + }); + + it("returns deleted: false for a missing episode", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + + const result = await core.deleteEpisode("ep-does-not-exist"); + expect(result.deleted).toBe(false); + }); + + it("throws conflict when deleting an open episode", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const eid = await core.openEpisode({ sessionId: sid }); + + await expect(core.deleteEpisode(eid)).rejects.toMatchObject({ + code: "conflict", + }); + }); + + it("deleteEpisodes bulk-deletes multiple closed episodes", async () => { + pipeline = createPipeline(buildDeps(db!)); + core = createMemoryCore( + pipeline, + resolveHome("openclaw", "/tmp/memos-mc-test"), + "test", + ); + await core.init(); + const sid = await core.openSession({ agent: "openclaw" }); + const e1 = await core.openEpisode({ sessionId: sid }); + const e2 = await core.openEpisode({ sessionId: sid }); + await core.closeEpisode(e1); + await core.closeEpisode(e2); + + const result = await core.deleteEpisodes([e1, e2]); + expect(result.deleted).toBe(2); + expect(db!.repos.episodes.getById(e1)).toBeNull(); + expect(db!.repos.episodes.getById(e2)).toBeNull(); + }); + }); + + describe("bootstrapMemoryCore", () => { let home: TmpHomeContext | null = null; afterEach(async () => {