Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

# Ensure at-most-one bridge instance via PID file.
from daemon_manager import kill_existing_bridge, register_bridge


if TYPE_CHECKING:
from collections.abc import Callable
Expand Down Expand Up @@ -72,16 +75,25 @@ def __init__(
script = bridge_path or str(
Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts"
)
tsx_bin = str(
Path(__file__).resolve().parent.parent.parent.parent / "node_modules" / ".bin" / "tsx"
)
runtime = tsx_bin if Path(tsx_bin).exists() else node
runtime_args = [] if Path(tsx_bin).exists() else ["--experimental-strip-types"]
env = {**os.environ, **(extra_env or {})}
# Kill any previously-running bridge before spawning a new one.
kill_existing_bridge()
self._proc = subprocess.Popen(
[node, "--experimental-strip-types", script, f"--agent={agent}"],
[runtime, *runtime_args, script, f"--agent={agent}"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
# Register the new process so daemon_manager can track it.
register_bridge(self._proc)
self._reader = threading.Thread(
target=self._read_loop,
daemon=True,
Expand Down Expand Up @@ -163,6 +175,8 @@ def close(self) -> None:
self._proc.wait(timeout=5.0)
except subprocess.TimeoutExpired:
self._proc.kill()
# Clear PID file so a future incarnation starts fresh.
register_bridge(None)
# unblock any pending waiters
with self._lock:
for entry in list(self._pending.values()):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Probe Node.js availability so ``MemTensorProvider.is_available`` can
answer cheaply at plugin-startup time.
- Graceful shutdown helpers invoked from ``MemTensorProvider.shutdown``.
- PID file management to prevent duplicate bridge processes across
Hermes session restarts.

This file intentionally has **no runtime dependency** on the client; the
provider instantiates its own client. Keeping these concerns split means
Expand All @@ -17,9 +19,12 @@
from __future__ import annotations

import logging
import os
import shutil
import signal
import subprocess
import threading
import time

from pathlib import Path

Expand All @@ -28,6 +33,41 @@

_lock = threading.Lock()
_bridge_ok: bool | None = None
_ACTIVE_BRIDGE_PROC: subprocess.Popen | None = None


# ─── PID file helpers ────────────────────────────────────────────────────


def _pid_path() -> Path:
"""Path to the singleton PID file under the plugin data directory."""
return Path(__file__).resolve().parent.parent.parent.parent / "data" / "bridge.pid"


def _read_pid() -> int | None:
    """Return the PID recorded in the PID file, or ``None`` when the file
    is missing or does not contain a parseable integer."""
    try:
        raw = _pid_path().read_text()
    except FileNotFoundError:
        return None
    try:
        return int(raw.strip())
    except ValueError:
        return None


def _write_pid(pid: int) -> None:
    """Persist *pid* to the singleton PID file.

    Creates the ``data/`` directory on first use — otherwise
    ``write_text`` raises ``FileNotFoundError`` on a fresh install and
    the at-most-one-bridge guarantee silently breaks.
    """
    path = _pid_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(str(pid))


def _clean_pid() -> None:
    """Remove the PID file; a missing file is not an error."""
    try:
        _pid_path().unlink()
    except FileNotFoundError:
        pass


def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except (OSError, PermissionError):
return False


# ─── Bridge lifecycle ────────────────────────────────────────────────────


def _bridge_script() -> Path:
Expand Down Expand Up @@ -69,8 +109,55 @@ def ensure_bridge_running(*, probe_only: bool = False) -> bool:
return True


def kill_existing_bridge() -> None:
    """Kill any previously-running bridge process recorded in the PID file.

    Called **before** spawning a new bridge to guarantee at-most-one
    instance. Safe to call even when no stale PID exists.
    """
    stale = _read_pid()
    if stale is not None and _pid_alive(stale):
        logger.info("MemOS: killing stale bridge (pid=%d)", stale)
        try:
            os.kill(stale, signal.SIGTERM)
            # Poll up to ~2.5 s (25 × 100 ms) for a graceful exit.
            polls_left = 25
            while polls_left > 0 and _pid_alive(stale):
                time.sleep(0.1)
                polls_left -= 1
            if _pid_alive(stale):
                # Still up after the grace period — force it down.
                os.kill(stale, signal.SIGKILL)
        except (OSError, ProcessLookupError):
            pass
    _clean_pid()


def register_bridge(proc: subprocess.Popen | None) -> None:
    """Record the current running bridge process.

    Pass ``None`` (e.g. on close) to clear the registration and PID file.
    """
    global _ACTIVE_BRIDGE_PROC
    _ACTIVE_BRIDGE_PROC = proc
    if proc is None:
        _clean_pid()
    else:
        _write_pid(proc.pid)


def shutdown_bridge() -> None:
    """Gracefully shut down the tracked bridge subprocess and clean PID file."""
    # NOTE: the previous version carried a leftover second "docstring"
    # string statement and a duplicate ``global`` line (merge artifact);
    # both removed here.
    global _bridge_ok, _ACTIVE_BRIDGE_PROC
    with _lock:
        _bridge_ok = None
        proc = _ACTIVE_BRIDGE_PROC
        if proc is not None:
            try:
                proc.terminate()
                proc.wait(timeout=5.0)
                logger.info("MemOS: bridge terminated (pid=%d)", proc.pid)
            except subprocess.TimeoutExpired:
                proc.kill()
                # Reap the SIGKILLed child so it does not linger as a zombie.
                proc.wait()
                logger.warning("MemOS: bridge killed after timeout (pid=%d)", proc.pid)
            except Exception:
                pass  # best-effort: shutdown must never raise
        _ACTIVE_BRIDGE_PROC = None
        _clean_pid()
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ export interface MemoryCore {
userMessage?: string;
}): Promise<EpisodeId>;
closeEpisode(episodeId: EpisodeId): Promise<void>;
deleteEpisode?(episodeId: EpisodeId): Promise<{ deleted: boolean }>;
deleteEpisodes?(ids: readonly EpisodeId[]): Promise<{ deleted: number }>;

// ── pipeline (per turn) ──
/** Called *before* the agent acts. Returns the context to inject. */
Expand Down
26 changes: 16 additions & 10 deletions apps/memos-local-plugin/bridge.cts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
// eslint-disable-next-line @typescript-eslint/no-require-imports
const path = require("node:path") as typeof import("node:path");
const pkgVersion: string = (require(path.resolve(__dirname, "package.json")) as { version: string }).version;

interface BridgeArgs {
daemon: boolean;
Expand Down Expand Up @@ -53,7 +54,7 @@ async function main(): Promise<void> {

const { core, config, home } = await bootstrapMemoryCoreFull({
agent: args.agent,
pkgVersion: "2.0.0-alpha.1",
pkgVersion,
});
await core.init();

Expand Down Expand Up @@ -108,15 +109,20 @@ async function main(): Promise<void> {
process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM"));

// Keep the process alive until stdin ends.
await stdio.done;
try {
if (viewer) await viewer.close();
} catch {
/* best-effort */
// In daemon mode, run indefinitely (stdin is /dev/null — EOF is normal).
// In stdio mode, run until the calling process closes stdin.
if (args.daemon) {
await new Promise(() => {}); // never resolve → process lives forever
} else {
await stdio.done;
try {
if (viewer) await viewer.close();
} catch {
/* best-effort */
}
await core.shutdown();
process.exit(0);
}
await core.shutdown();
process.exit(0);
}

async function tryHubRegister(opts: {
Expand All @@ -127,7 +133,7 @@ async function tryHubRegister(opts: {
const body = JSON.stringify({
agent: opts.selfAgent,
port: opts.selfPort,
version: "2.0.0-alpha.1",
version: pkgVersion,
});
for (let i = 0; i < 6; i++) {
try {
Expand Down
44 changes: 42 additions & 2 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,19 @@ export function createMemoryCore(
// to `closed` + sets `closeReason: "abandoned"` without touching
// trace_ids_json.
try {
const orphans = handle.repos.episodes.list({ status: "open", limit: 500 });
const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 });
// Only treat an open episode as an orphan if its session has been
// explicitly closed (meta.closedAt is set) or no longer exists.
// Otherwise the session might reconnect — leave it alone.
const orphans = openEpisodes.filter((ep) => {
Comment on lines +335 to +339
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new orphan-episode filter changes boot-time behavior in a subtle but reliability-critical way (preserving open episodes when the owning session wasn’t explicitly closed). There are existing MemoryCore tests (e.g. tests/unit/pipeline/memory-core.test.ts); please add coverage for this init-path so regressions don’t reintroduce accidental orphan-closing on restart.

Copilot uses AI. Check for mistakes.
const session = handle.repos.sessions.getById(ep.sessionId);
if (!session) return true; // session row gone — orphan
if (session.meta?.closedAt != null) return true; // explicitly closed — orphan
return false; // session exists and not closed — skip, might reconnect
});
if (orphans.length > 0) {
log.info("init.orphan_episodes.close", { count: orphans.length });
const skipped = openEpisodes.length - orphans.length;
log.info("init.orphan_episodes.close", { count: orphans.length, skipped });
const endedAt = Date.now();
for (const ep of orphans) {
try {
Expand Down Expand Up @@ -632,6 +642,34 @@ export function createMemoryCore(
handle.sessionManager.finalizeEpisode(episodeId);
}

function assertEpisodeDeletable(episodeId: EpisodeId): void {
  // Refuse to delete an episode that is still open; callers must close
  // (or abandon) it first.
  const snapshot = handle.sessionManager.getEpisode(episodeId);
  if (snapshot?.status !== "open") return;
  throw new MemosError(
    "conflict",
    `cannot delete open episode: ${episodeId}`,
  );
}

async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> {
  // Single-episode delete; `deleted` is false when no such row existed.
  ensureLive();
  assertEpisodeDeletable(episodeId);
  const deleted = handle.repos.episodes.deleteById(episodeId);
  return { deleted };
}
Comment on lines +655 to +659
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New delete APIs/endpoints (deleteEpisode + bulk delete) are user-facing data-management operations; adding at least a small unit test around the delete behavior (and cascade expectations) would align with the existing MemoryCore test coverage and guard against regressions like the earlier no-op deletion bug.

Copilot uses AI. Check for mistakes.

async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> {
ensureLive();
Comment thread
Starfie1d1272 marked this conversation as resolved.
for (const id of ids) {
assertEpisodeDeletable(id);
}
let deleted = 0;
for (const id of ids) {
if (handle.repos.episodes.deleteById(id)) deleted++;
}
return { deleted };
}

// ─── Pipeline (per turn) ──
async function onTurnStart(
turn: Parameters<MemoryCore["onTurnStart"]>[0],
Expand Down Expand Up @@ -1947,6 +1985,8 @@ export function createMemoryCore(
closeSession,
openEpisode,
closeEpisode,
deleteEpisode,
deleteEpisodes,
onTurnStart,
onTurnEnd,
submitFeedback,
Expand Down
9 changes: 9 additions & 0 deletions apps/memos-local-plugin/core/session/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager {
for (const ep of epm.listForSession(id)) {
if (ep.status === "open") epm.abandon(ep.id, `session_closed:${reason}`);
}
// Stamp the session row with closedAt so future bridge init()
// can distinguish "explicitly closed" from "might reconnect".
try {
const row = deps.sessionsRepo.getById(id);
if (row) {
const ts = now();
deps.sessionsRepo.touchLastSeen(id, ts, { closedAt: ts });
}
} catch { /* best-effort */ }
live.delete(id);
log.info("session.closed", { sessionId: id, reason });
bus.emit({ kind: "session.closed", sessionId: id, reason });
Expand Down
7 changes: 7 additions & 0 deletions apps/memos-local-plugin/core/storage/repos/episodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ export function makeEpisodesRepo(db: StorageDb) {
).run({ id });
},

deleteById(id: EpisodeId): boolean {
  // Hard-delete the episode row; true when something was actually removed.
  const stmt = db.prepare<{ id: string }>(`DELETE FROM episodes WHERE id=@id`);
  return stmt.run({ id }).changes > 0;
},

setRTask(id: EpisodeId, rTask: number): void {
db.prepare<{ id: string; r: number }>(
`UPDATE episodes SET r_task=@r WHERE id=@id`,
Expand Down
23 changes: 14 additions & 9 deletions apps/memos-local-plugin/server/routes/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,20 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void {
writeError(ctx, 400, "invalid_argument", "episodeId is required");
return;
}
await deps.core.closeEpisode(id as EpisodeId);
return { ok: true };
const result = await deps.core.deleteEpisode(id as EpisodeId);
return result;
});

routes.set("POST /api/v1/episodes/delete", async (ctx) => {
  // Bulk episode delete: body is { ids: string[] }; non-string and
  // empty entries are silently dropped before validation.
  const body = parseJson<{ ids?: unknown }>(ctx);
  const rawIds = Array.isArray(body.ids) ? body.ids : [];
  const ids = rawIds.filter(
    (v): v is string => typeof v === "string" && v.length > 0,
  );
  if (ids.length === 0) {
    writeError(ctx, 400, "invalid_argument", "ids[] is required");
    return;
  }
  return await deps.core.deleteEpisodes(ids as EpisodeId[]);
});

routes.set("GET /api/v1/episodes", async (ctx) => {
Expand All @@ -61,10 +73,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void {
const rawOffset = numberOrUndefined(ctx.url.searchParams.get("offset"));
const limit = rawLimit && rawLimit > 0 ? rawLimit : 50;
const offset = rawOffset && rawOffset >= 0 ? rawOffset : 0;
// Return the rich row shape — the viewer's task list needs
// session id / status / turn count / preview. The old `ids`-only
// variant is still available under the `episode.list` JSON-RPC
// method and via `?shape=ids`.
if (ctx.url.searchParams.get("shape") === "ids") {
const episodeIds = await deps.core.listEpisodes({ sessionId, limit, offset });
return {
Expand All @@ -83,9 +91,6 @@ export function registerSessionRoutes(routes: Routes, deps: ServerDeps): void {
};
});

// Backward-compat: legacy `/api/v1/episodes/timeline?episodeId=…`
// still works; the preferred path `/api/v1/episodes/:id/timeline`
// is registered in `trace.ts`.
routes.set("GET /api/v1/episodes/timeline", async (ctx) => {
const episodeId = ctx.url.searchParams.get("episodeId");
if (!episodeId) {
Expand Down