diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index c1e0357748..61a90d23e0 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -92,7 +92,6 @@ import { NatsStreamBuffer } from "./routes/decopilot/nats-stream-buffer"; import { RunRegistry } from "./routes/decopilot/run-registry"; import type { RunReactorDeps } from "./routes/decopilot/run-reactor"; import { SqlThreadStorage } from "../storage/threads"; -import type { Thread } from "../storage/types"; import { registerMonitoringRetentionWorkflow } from "../monitoring/dbos-retention-workflow"; import { cleanupOldMonitoringFiles } from "../monitoring/ndjson-retention"; import { getLogsDir, getTracesDir, getMetricsDir } from "../monitoring/schema"; @@ -114,12 +113,6 @@ import { } from "../dispatch-queue"; import { DBOS } from "@dbos-inc/dbos-sdk"; import { dispatchRunAndWait } from "./routes/decopilot/dispatch-run"; -import { - PersistedRunConfigSchema, - toModelsConfig, -} from "./routes/decopilot/run-config"; -import { getPodId } from "../core/pod-identity"; -import { NatsPodHeartbeat } from "../nats/pod-heartbeat"; import { createAutomationsStorage } from "../storage/automations"; import { KyselyKVStorage } from "../storage/kv"; import { KyselyTriggerCallbackTokenStorage } from "../storage/trigger-callback-tokens"; @@ -748,7 +741,7 @@ export async function createApp(options: CreateAppOptions = {}) { })(); }, createTailStream: async () => null, - purge: () => {}, + purge: async () => {}, teardown: () => {}, }; } else { @@ -813,8 +806,7 @@ export async function createApp(options: CreateAppOptions = {}) { sseHub, }; - const POD_ID = getPodId(); - const runRegistry = new RunRegistry(cancelReactorDeps, POD_ID); + const runRegistry = new RunRegistry(cancelReactorDeps); cancelBroadcast .start((taskId) => { @@ -839,38 +831,11 @@ export async function createApp(options: CreateAppOptions = {}) { ); }); - // Per-pod heartbeat via NATS KV (only when NATS is available) - let podHeartbeat: NatsPodHeartbeat | null = null; - if (natsProvider) { - podHeartbeat = new NatsPodHeartbeat({ - getConnection: () => natsProvider!.getConnection(), - getJetStream: () => natsProvider!.getJetStream(), - }); - - // Attempt immediate init (may no-op if NATS not ready) - podHeartbeat - .init() - .then(() => { - podHeartbeat!.start(POD_ID); - }) - .catch(() => {}); - - // Re-init when NATS connects - natsProvider.onReady(() => { - podHeartbeat! - .init() - .then(() => { - podHeartbeat!.start(POD_ID); - }) - .catch((err: unknown) => { - console.error("[PodHeartbeat] Deferred init failed:", err); - }); - }); - } - currentDecopilotCleanup = async () => { - // Delete KV key first → watcher fires on other pods → immediate handoff - await podHeartbeat?.stop(); + // Abort in-flight runs so streamText loops stop cleanly. DBOS's + // launch-time recovery picks up the workflows on the next start of + // this same pod (executorID = POD_NAME, stable on K8s StatefulSet), + // so we don't need any in-process death detection here. await runRegistry.stopAll(); runRegistry.dispose(); cancelBroadcast.stop().catch(() => {}); @@ -1219,108 +1184,20 @@ export async function createApp(options: CreateAppOptions = {}) { ).setAutomationEventDispatcher(automationEventDispatcher); } - // ============================================================================ - // Crash Recovery — resume orphaned automation runs after rolling deploy - // ============================================================================ - - /** Shared resume function for both startup recovery and pod-death watcher. */ - const resumeOrphanedThread = async (thread: Thread) => { - const parsed = PersistedRunConfigSchema.safeParse(thread.run_config); - if (!parsed.success) { - console.warn( - `[recovery] Invalid run_config for ${thread.id}, force-failing`, - ); - await threadStorage.forceFailIfInProgress( - thread.id, - thread.organization_id, - ); - return; - } - const config = parsed.data; - - // Build context for the original user - const resumeCtx = await automationContextFactory( - thread.organization_id, - thread.created_by, - ); - if (!resumeCtx) { - console.warn( - `[recovery] Cannot build context for ${thread.id}, force-failing`, - ); - await threadStorage.forceFailIfInProgress( - thread.id, - thread.organization_id, - ); - return; - } - - // Audit trail: record that this run was auto-resumed - const now = new Date().toISOString(); - await threadStorage.saveMessages( - [ - { - id: crypto.randomUUID(), - thread_id: thread.id, - role: "system", - parts: [ - { - type: "text", - text: "Run resumed automatically after infrastructure restart.", - }, - ], - metadata: undefined, - created_at: now, - updated_at: now, - }, - ], - thread.organization_id, - ); - - // Pod-death recovery: a different pod's run was claimed by us. Drain - // synchronously to know when the run completes server-side. We - // deliberately don't pass a streamBuffer here — this background - // recovery is the safety net for threads no DBOS replay or attached - // client picks up; clients reconnecting via /stream see the run via - // the workflow's own JetStream pump on the pod that DBOS replayed it - // onto. - await dispatchRunAndWait( - { - messages: [], - models: toModelsConfig(config.models), - agent: config.agent, - temperature: config.temperature, - toolApprovalLevel: config.toolApprovalLevel, - mode: config.mode, - organizationId: thread.organization_id, - userId: thread.created_by, - taskId: thread.id, - windowSize: config.windowSize, - isResume: true, - }, - resumeCtx, - { runRegistry, cancelBroadcast }, - ); - }; - - // Wire pod death watcher → orphan recovery - if (podHeartbeat) { - podHeartbeat.onPodDeath((deadPodId) => { - runRegistry - .handlePodDeath(deadPodId, resumeOrphanedThread, cancelBroadcast) - .catch((err) => { - console.error( - `[Decopilot] Pod death recovery failed for ${deadPodId}:`, - err, - ); - }); - }); - } - - setTimeout(() => { - runRegistry.recoverOrphanedRuns(resumeOrphanedThread).catch((err) => { - console.error("[recovery] Orphan recovery failed:", err); - }); - }, 10_000); // 10s grace for rolling deploys + // Crash recovery is delegated to DBOS. A pod that dies mid-stream + // leaves its threadGateWorkflow row in `dbos.workflow_status` with + // status PENDING and executor_id = POD_NAME. When K8s restarts the + // pod with the same name (StatefulSet), DBOS launch-time recovery + // re-runs the `dispatchRunAndWait` step from scratch — streamText + // produces chunks again into the same per-thread JetStream subject, + // and SSE tails on any pod see the resumed stream. No cross-pod + // detection or claim CAS lives here on purpose. + // + // Pre-DBOS: a custom Postgres-advisory-lock heartbeat detected dead + // peers in ~5s and CAS-stole their runs. Deleted in favor of trusting + // DBOS recovery — fewer races, single source of truth, and the + // bounded-by-pod-restart-time recovery window is acceptable for chat + // UX. See thread-gate-workflow.ts for the durable workflow surface. // NDJSON monitoring retention cleanup runs as a DBOS scheduled workflow // (see `initDbos` below). Kick off a single eager sweep at boot so a fresh diff --git a/apps/mesh/src/api/routes/decopilot/dispatch-run.ts b/apps/mesh/src/api/routes/decopilot/dispatch-run.ts index 75c3812b64..5695b6a75a 100644 --- a/apps/mesh/src/api/routes/decopilot/dispatch-run.ts +++ b/apps/mesh/src/api/routes/decopilot/dispatch-run.ts @@ -558,8 +558,24 @@ async function prepareRun( } } - // Purge stale buffered chunks from any previous run on this thread - streamBuffer?.purge(mem.thread.id); + // Purge stale buffered chunks from any previous run on this thread. + // Always purges — including on resume — because the resumed run + // re-invokes the LLM from scratch and produces chunk-1..chunk-N + // again. Without this, any /stream opened after recovery starts + // would see the assistant's reply twice (deliverPolicy:"all" + // replays the dead-pod prefix and then the resumed run's full + // body). Regression guard: + // tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts. + // + // ⚠️ Known UX gap, not addressed here: a /stream that was already + // tailing when the owner pod died will still receive the dead + // pod's prefix from its local consumer buffer (the purge is + // server-side and doesn't reach into already-delivered messages) + // AND the resumed run's full body afterwards — so it sees the + // reply rendered twice. A proper fix would publish a "reset" + // sentinel to the subject before the resume pump starts so all + // consumers flush their UI buffer; left as a follow-up. + await streamBuffer?.purge(mem.thread.id); // Split system messages from user message const systemMessages = input.messages.filter((m) => m.role === "system"); diff --git a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts index 36d3e51b50..038391d581 100644 --- a/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts +++ b/apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts @@ -249,11 +249,15 @@ export class NatsStreamBuffer implements StreamBuffer { }); } - purge(taskId: string): void { + async purge(taskId: string): Promise { if (!this.jsm) return; - this.jsm.streams - .purge(STREAM_NAME, { filter: streamSubject(taskId) }) - .catch(() => {}); + try { + await this.jsm.streams.purge(STREAM_NAME, { + filter: streamSubject(taskId), + }); + } catch { + // Best-effort cleanup; never propagate to the caller. + } } teardown(): void { diff --git a/apps/mesh/src/api/routes/decopilot/run-reactor.test.ts b/apps/mesh/src/api/routes/decopilot/run-reactor.test.ts index d42ef950b0..1e2871b154 100644 --- a/apps/mesh/src/api/routes/decopilot/run-reactor.test.ts +++ b/apps/mesh/src/api/routes/decopilot/run-reactor.test.ts @@ -20,11 +20,7 @@ function makeDeps(): RunReactorDeps { listMessages: mock(() => Promise.resolve({ messages: [], total: 0 })), listByTriggerIds: mock(() => Promise.resolve({ threads: [], total: 0 })), forceFailIfInProgress: mock(() => Promise.resolve(true)), - claimOrphanedRun: mock(() => Promise.resolve(false)), claimRunStart: mock(() => Promise.resolve(true)), - listOrphanedRuns: mock(() => Promise.resolve([])), - listOrphanedRunsByPod: mock(() => Promise.resolve([])), - orphanRunsByPod: mock(() => Promise.resolve([])), addInflightAsyncJob: mock(() => Promise.resolve()), findInflightAsyncJob: mock(() => Promise.resolve(null)), removeInflightAsyncJob: mock(() => Promise.resolve()), diff --git a/apps/mesh/src/api/routes/decopilot/run-reactor.ts b/apps/mesh/src/api/routes/decopilot/run-reactor.ts index d9975d3d13..6ff1f9f9e1 100644 --- a/apps/mesh/src/api/routes/decopilot/run-reactor.ts +++ b/apps/mesh/src/api/routes/decopilot/run-reactor.ts @@ -62,7 +62,7 @@ async function handleTerminalStatus( run_config: null, run_started_at: null, }); - streamBuffer.purge(taskId); + void streamBuffer.purge(taskId); sseHub.emit( orgId, createDecopilotThreadStatusEvent(taskId, status, { @@ -168,7 +168,7 @@ async function react(event: RunEvent, deps: RunReactorDeps): Promise { run_started_at: null, }); } - streamBuffer.purge(event.taskId); + void streamBuffer.purge(event.taskId); const failedThread = await storage.get(event.taskId, event.orgId); sseHub.emit( event.orgId, diff --git a/apps/mesh/src/api/routes/decopilot/run-registry.test.ts b/apps/mesh/src/api/routes/decopilot/run-registry.test.ts index 2e19daf226..d072160248 100644 --- a/apps/mesh/src/api/routes/decopilot/run-registry.test.ts +++ b/apps/mesh/src/api/routes/decopilot/run-registry.test.ts @@ -19,11 +19,7 @@ function makeNoopDeps(): RunReactorDeps { listMessages: mock(() => Promise.resolve({ messages: [], total: 0 })), listByTriggerIds: mock(() => Promise.resolve({ threads: [], total: 0 })), forceFailIfInProgress: mock(() => Promise.resolve(false)), - claimOrphanedRun: mock(() => Promise.resolve(false)), claimRunStart: mock(() => Promise.resolve(true)), - listOrphanedRuns: mock(() => Promise.resolve([])), - listOrphanedRunsByPod: mock(() => Promise.resolve([])), - orphanRunsByPod: mock(() => Promise.resolve([])), addInflightAsyncJob: mock(() => Promise.resolve()), findInflightAsyncJob: mock(() => Promise.resolve(null)), removeInflightAsyncJob: mock(() => Promise.resolve()), @@ -45,10 +41,7 @@ function createRegistry( deps = makeNoopDeps(), clock?: () => Date, ): RunRegistry { - const podId = "test-pod"; - const r = clock - ? new RunRegistry(deps, podId, clock) - : new RunRegistry(deps, podId); + const r = clock ? new RunRegistry(deps, clock) : new RunRegistry(deps); createdRegistries.push(r); return r; } @@ -328,7 +321,7 @@ describe("RunRegistry", () => { // stopAll // ------------------------------------------------------------------------- describe("stopAll (orphan semantics)", () => { - it("orphans runs in DB, aborts running entries, and clears state", async () => { + it("aborts running entries and clears state without touching DB ownership", async () => { const deps = makeNoopDeps(); const registry = createRegistry(deps); @@ -348,8 +341,11 @@ describe("RunRegistry", () => { await registry.stopAll(); - // DB orphan is called first so runs are resumable if process dies - expect(deps.storage.orphanRunsByPod).toHaveBeenCalled(); + // run_owner_pod is deliberately left set so survivors that detect + // our death via the released advisory lock can find these runs + // via listOrphanedRunsByPod(thisPodId). Nulling here drops the + // recovery signal — see graceful-shutdown comment in stopAll(). + expect(deps.storage.update).not.toHaveBeenCalled(); // In-memory: abort controllers triggered and state cleared expect(signalT1.aborted).toBe(true); @@ -358,15 +354,6 @@ describe("RunRegistry", () => { expect(registry.isRunning("t2")).toBe(false); }); - it("calls orphanRunsByPod with correct podId", async () => { - const deps = makeNoopDeps(); - const registry = createRegistry(deps); - startThread(registry, "t1", "org1", "u1"); - - await registry.stopAll(); - expect(deps.storage.orphanRunsByPod).toHaveBeenCalledWith("test-pod"); - }); - it("aborts AbortControllers", async () => { const deps = makeNoopDeps(); const registry = createRegistry(deps); @@ -385,179 +372,7 @@ describe("RunRegistry", () => { await registry.stopAll(); expect(registry.isRunning("t1")).toBe(false); }); - - it("handles orphanRunsByPod failure gracefully", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.orphanRunsByPod as ReturnType - ).mockImplementation(() => Promise.reject(new Error("DB down"))); - const registry = createRegistry(deps); - startThread(registry, "t1", "org1", "u1"); - const signal = registry.getAbortSignal("t1")!; - - await registry.stopAll(); // should not throw - - // controllers still aborted, state still cleared - expect(signal.aborted).toBe(true); - expect(registry.isRunning("t1")).toBe(false); - }); }); - - // ------------------------------------------------------------------------- - // recoverOrphanedRuns - // ------------------------------------------------------------------------- - describe("recoverOrphanedRuns", () => { - it("auto-resumes orphaned runs", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRuns as ReturnType - ).mockImplementation(() => - Promise.resolve([ - { - id: "t1", - organization_id: "org1", - trigger_id: "trig1", - run_config: {}, - title: "t", - description: null, - status: "in_progress", - created_at: "", - updated_at: "", - created_by: "u1", - updated_by: undefined, - hidden: false, - context_start_message_id: null, - run_owner_pod: null, - run_started_at: null, - }, - ]), - ); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.recoverOrphanedRuns(resumeFn); - expect(resumeFn).toHaveBeenCalled(); - }); - - it("auto-resumes interactive runs (trigger_id null) too", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRuns as ReturnType - ).mockImplementation(() => - Promise.resolve([ - { - id: "t1", - organization_id: "org1", - trigger_id: null, - run_config: {}, - title: "t", - description: null, - status: "in_progress", - created_at: "", - updated_at: "", - created_by: "u1", - updated_by: undefined, - hidden: false, - context_start_message_id: null, - run_owner_pod: null, - run_started_at: null, - }, - ]), - ); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.recoverOrphanedRuns(resumeFn); - expect(resumeFn).toHaveBeenCalled(); - }); - - it("skips when CAS claim fails", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRuns as ReturnType - ).mockImplementation(() => - Promise.resolve([ - { - id: "t1", - organization_id: "org1", - trigger_id: "trig1", - run_config: {}, - title: "t", - description: null, - status: "in_progress", - created_at: "", - updated_at: "", - created_by: "u1", - updated_by: undefined, - hidden: false, - context_start_message_id: null, - run_owner_pod: null, - run_started_at: null, - }, - ]), - ); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(false)); - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.recoverOrphanedRuns(resumeFn); - expect(resumeFn).not.toHaveBeenCalled(); - }); - - it("force-fails on resumeFn error", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRuns as ReturnType - ).mockImplementation(() => - Promise.resolve([ - { - id: "t1", - organization_id: "org1", - trigger_id: "trig1", - run_config: {}, - title: "t", - description: null, - status: "in_progress", - created_at: "", - updated_at: "", - created_by: "u1", - updated_by: undefined, - hidden: false, - context_start_message_id: null, - run_owner_pod: null, - run_started_at: null, - }, - ]), - ); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.reject(new Error("boom"))); - await registry.recoverOrphanedRuns(resumeFn); - expect(deps.storage.forceFailIfInProgress).toHaveBeenCalledWith( - "t1", - "org1", - ); - }); - - it("handles empty orphan list", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRuns as ReturnType - ).mockImplementation(() => Promise.resolve([])); - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.recoverOrphanedRuns(resumeFn); - expect(resumeFn).not.toHaveBeenCalled(); - }); - }); - // ------------------------------------------------------------------------- // reapStaleRuns // ------------------------------------------------------------------------- @@ -624,113 +439,4 @@ describe("RunRegistry", () => { expect(registry.isRunning("fresh")).toBe(true); }); }); - - // ------------------------------------------------------------------------- - // handlePodDeath - // ------------------------------------------------------------------------- - describe("handlePodDeath", () => { - const makeThread = (id: string, orgId = "org1") => ({ - id, - organization_id: orgId, - trigger_id: null, - run_config: {}, - title: "t", - description: null, - status: "in_progress" as const, - created_at: "", - updated_at: "", - created_by: "u1", - updated_by: undefined, - hidden: false, - context_start_message_id: null, - run_owner_pod: "dead-pod", - run_started_at: null, - }); - - it("claims and resumes all orphans from dead pod", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRunsByPod as ReturnType - ).mockImplementation(() => - Promise.resolve([makeThread("t1"), makeThread("t2"), makeThread("t3")]), - ); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.handlePodDeath("dead-pod", resumeFn); - - expect(deps.storage.listOrphanedRunsByPod).toHaveBeenCalledWith( - "dead-pod", - ); - expect(resumeFn).toHaveBeenCalledTimes(3); - }); - - it("skips orphans when CAS claim fails", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRunsByPod as ReturnType - ).mockImplementation(() => Promise.resolve([makeThread("t1")])); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(false)); - - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.handlePodDeath("dead-pod", resumeFn); - - expect(resumeFn).not.toHaveBeenCalled(); - }); - - it("force-fails on resumeFn error", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRunsByPod as ReturnType - ).mockImplementation(() => Promise.resolve([makeThread("t1")])); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.reject(new Error("boom"))); - await registry.handlePodDeath("dead-pod", resumeFn); - - expect(deps.storage.forceFailIfInProgress).toHaveBeenCalledWith( - "t1", - "org1", - ); - }); - - it("broadcasts cancel for orphans", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRunsByPod as ReturnType - ).mockImplementation(() => Promise.resolve([makeThread("t1")])); - ( - deps.storage.claimOrphanedRun as ReturnType - ).mockImplementation(() => Promise.resolve(true)); - - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - const cancelBroadcast = { broadcast: mock(() => {}) }; - await registry.handlePodDeath("dead-pod", resumeFn, cancelBroadcast); - - expect(cancelBroadcast.broadcast).toHaveBeenCalledWith("t1"); - }); - - it("no-ops when dead pod has no orphans", async () => { - const deps = makeNoopDeps(); - ( - deps.storage.listOrphanedRunsByPod as ReturnType - ).mockImplementation(() => Promise.resolve([])); - - const registry = createRegistry(deps); - const resumeFn = mock(() => Promise.resolve()); - await registry.handlePodDeath("dead-pod", resumeFn); - - expect(resumeFn).not.toHaveBeenCalled(); - }); - }); }); diff --git a/apps/mesh/src/api/routes/decopilot/run-registry.ts b/apps/mesh/src/api/routes/decopilot/run-registry.ts index 0f69e9cc1c..cf1efff999 100644 --- a/apps/mesh/src/api/routes/decopilot/run-registry.ts +++ b/apps/mesh/src/api/routes/decopilot/run-registry.ts @@ -18,7 +18,6 @@ import { decide } from "./run-decider"; import { project } from "./run-projector"; import type { RunReactorDeps } from "./run-reactor"; import { reactAll } from "./run-reactor"; -import type { Thread } from "@/storage/types"; import { meter } from "@/observability"; export type { RunReactorDeps }; @@ -47,7 +46,6 @@ export class RunRegistry { constructor( private readonly deps: RunReactorDeps, - private readonly podId: string, private readonly clock: () => Date = () => new Date(), ) { this.reaperTimer = setInterval( @@ -140,110 +138,21 @@ export class RunRegistry { } /** - * Graceful shutdown: orphan all runs in the DB first (so they are resumable - * if the process dies), then abort in-memory controllers and clear state. - * The DB write MUST happen before states.clear() — if the process dies - * between clear() and the DB write, threads would be permanently stuck. + * Graceful shutdown: abort in-memory controllers and clear state. + * Recovery of the aborted runs is handled by DBOS — the workflow + * row stays PENDING in `dbos.workflow_status`, and when this pod + * (same executor_id = POD_NAME) relaunches, DBOS re-runs the + * dispatch step from scratch. */ async stopAll(): Promise { - // 1. DB: orphan all runs owned by this pod FIRST - // (if process dies after this, runs are resumable) - try { - await this.deps.storage.orphanRunsByPod(this.podId); - } catch (err) { - console.error("[RunRegistry] Failed to orphan runs in DB:", err); - } - // 2. In-memory: abort all controllers (stops streamText loops) for (const [, state] of this.states) { if (state.status.tag === "running") { state.status.abortController.abort(); } } - // 3. In-memory: clear map this.states.clear(); } - /** - * Recover all orphaned runs on startup. Server crashes shouldn't - * punish users — every in-progress run gets resumed automatically. - * Concurrency is capped at 5 concurrent resumes. - */ - async recoverOrphanedRuns( - resumeFn: (thread: Thread) => Promise, - ): Promise { - const orphans = await this.deps.storage.listOrphanedRuns(this.podId); - if (orphans.length === 0) return; - - // Concurrency cap: max 5 concurrent resumes - const CONCURRENCY = 5; - for (let i = 0; i < orphans.length; i += CONCURRENCY) { - const batch = orphans.slice(i, i + CONCURRENCY); - await Promise.allSettled( - batch.map(async (thread) => { - const claimed = await this.deps.storage.claimOrphanedRun( - thread.id, - thread.organization_id, - this.podId, - ); - if (!claimed) return; // Another pod got it - - try { - await resumeFn(thread); - } catch (err) { - console.error(`[RunRegistry] Failed to resume ${thread.id}:`, err); - await this.deps.storage - .forceFailIfInProgress(thread.id, thread.organization_id) - .catch(() => {}); - } - }), - ); - } - } - - /** - * Handle a dead pod notification from the heartbeat watcher. Finds all - * in-progress threads owned by the dead pod, broadcasts cancel (in case - * it's partitioned, not dead), then CAS-claims and resumes each orphan. - */ - async handlePodDeath( - deadPodId: string, - resumeFn: (thread: Thread) => Promise, - cancelBroadcast?: { broadcast(taskId: string): void }, - ): Promise { - const orphans = await this.deps.storage.listOrphanedRunsByPod(deadPodId); - if (orphans.length === 0) return; - - // Cancel running threads on the dead pod (in case it's alive but partitioned) - for (const thread of orphans) { - cancelBroadcast?.broadcast(thread.id); - } - - const CONCURRENCY = 5; - for (let i = 0; i < orphans.length; i += CONCURRENCY) { - const batch = orphans.slice(i, i + CONCURRENCY); - await Promise.allSettled( - batch.map(async (thread) => { - if (this.isRunning(thread.id)) return; - const claimed = await this.deps.storage.claimOrphanedRun( - thread.id, - thread.organization_id, - this.podId, - ); - if (!claimed) return; - - try { - await resumeFn(thread); - } catch (err) { - console.error(`[RunRegistry] Failed to resume ${thread.id}:`, err); - await this.deps.storage - .forceFailIfInProgress(thread.id, thread.organization_id) - .catch(() => {}); - } - }), - ); - } - } - /** Stop the reaper timer. Call once during server shutdown. */ dispose(): void { if (this.reaperTimer) { diff --git a/apps/mesh/src/api/routes/decopilot/stream-buffer.ts b/apps/mesh/src/api/routes/decopilot/stream-buffer.ts index e8792d86cd..1f07381170 100644 --- a/apps/mesh/src/api/routes/decopilot/stream-buffer.ts +++ b/apps/mesh/src/api/routes/decopilot/stream-buffer.ts @@ -64,7 +64,15 @@ export interface StreamBuffer { ): Promise; /** Purge buffered data for a thread (best-effort, fire-and-forget). */ - purge(taskId: string): void; + /** + * Remove all buffered chunks for `taskId`. Returns a Promise so + * callers in the dispatch hot path can `await` purge completion + * before publishing fresh chunks — without that ordering, a /stream + * tail that subscribes during the gap can replay the stale prefix. + * Caller errors are swallowed: purge is best-effort cleanup, not + * load-bearing for any single message's delivery. + */ + purge(taskId: string): Promise; /** Release resources (clear references, called on shutdown). */ teardown(): void; diff --git a/apps/mesh/src/database/index.ts b/apps/mesh/src/database/index.ts index 3d877f6c54..e152c019e3 100644 --- a/apps/mesh/src/database/index.ts +++ b/apps/mesh/src/database/index.ts @@ -68,7 +68,7 @@ const defaultPoolOptions = { allowExitOnIdle: true, }; -function getSsl(): boolean { +export function getSsl(): boolean { try { return getSettings().databasePgSsl; } catch { diff --git a/apps/mesh/src/index.ts b/apps/mesh/src/index.ts index 708976c956..befe4abb1d 100644 --- a/apps/mesh/src/index.ts +++ b/apps/mesh/src/index.ts @@ -46,6 +46,15 @@ DBOS.setConfig({ // N workers all call DBOS.launch(); the admin server would otherwise fight // over port 3001. Re-enable per-process once we need workflow admin HTTP. runAdminServer: false, + // Pin this pod's workflows to its stable identity. DBOS only recovers + // PENDING workflows whose executor_id matches the booting process, + // so for distributed self-hosting we need each replica to come back + // with the same id it had before — which K8s StatefulSet guarantees + // via metadata.name (wired through POD_NAME → settings.podName). + // Outside K8s, settings.podName falls back to a random UUID, which + // disables cross-process recovery (each restart looks like a new + // executor). That's fine for single-process / dev. + executorID: settings.podName, }); const { createApp } = await import("./api/app"); diff --git a/apps/mesh/src/nats/pod-heartbeat.ts b/apps/mesh/src/nats/pod-heartbeat.ts deleted file mode 100644 index 32ba21f8cb..0000000000 --- a/apps/mesh/src/nats/pod-heartbeat.ts +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Per-Pod Heartbeat via NATS KV - * - * A single KV key per pod, refreshed on a timer, with bucket-level TTL. - * When a pod dies (hard kill) or shuts down (graceful), its key expires/deletes - * and watchers on other pods are notified immediately. - * - * O(1) writes per pod regardless of thread count. - */ - -import type { JetStreamClient, NatsConnection, KV } from "nats"; -import { StorageType } from "nats"; - -const BUCKET_NAME = "POD_HEARTBEATS"; -const BUCKET_TTL_MS = 45_000; // Key expires 45s after last refresh -const REFRESH_INTERVAL_MS = 10_000; // Refresh every 10s - -export interface PodHeartbeat { - init(): Promise; - start(podId: string): void; - /** Watch for pod deaths. Callback receives the dead podId. */ - onPodDeath(callback: (deadPodId: string) => void): void; - stop(): Promise; -} - -export interface NatsPodHeartbeatDeps { - getConnection: () => NatsConnection | null; - getJetStream: () => JetStreamClient | null; -} - -export class NatsPodHeartbeat implements PodHeartbeat { - private kv: KV | null = null; - private podId: string | null = null; - private refreshTimer: ReturnType | null = null; - private watchAbortController: AbortController | null = null; - private initPromise: Promise | null = null; - private pendingDeathCallback: ((deadPodId: string) => void) | null = null; - - constructor(private readonly deps: NatsPodHeartbeatDeps) {} - - async init(): Promise { - // Stop old watcher and clear stale state so we re-create from scratch - if (this.watchAbortController) { - this.watchAbortController.abort(); - this.watchAbortController = null; - } - if (this.refreshTimer) { - clearInterval(this.refreshTimer); - this.refreshTimer = null; - } - this.kv = null; - this.initPromise = null; - - const js = this.deps.getJetStream(); - if (!js) return; // NATS not ready — heartbeat disabled until re-init - this.initPromise = js.views - .kv(BUCKET_NAME, { - ttl: BUCKET_TTL_MS, - storage: StorageType.Memory, - }) - .then((kv) => { - this.kv = kv; - }) - .catch((err) => { - this.initPromise = null; - throw err; - }); - return this.initPromise; - } - - start(podId: string): void { - if (!this.kv) return; // Not initialized — skip heartbeat - if (this.refreshTimer) return; // Already running — prevent double start - this.podId = podId; - - // Immediate first heartbeat - this.kv.put(podId, new TextEncoder().encode(new Date().toISOString())); - - // Refresh on interval - this.refreshTimer = setInterval(() => { - this.kv - ?.put(podId, new TextEncoder().encode(new Date().toISOString())) - .catch((err) => { - console.error("[PodHeartbeat] Refresh failed:", err); - }); - }, REFRESH_INTERVAL_MS); - - // Activate deferred death watcher if registered before init - if (this.pendingDeathCallback) { - this.startDeathWatcher(this.pendingDeathCallback); - this.pendingDeathCallback = null; - } - } - - onPodDeath(callback: (deadPodId: string) => void): void { - if (!this.kv) { - // Store callback — will activate when start() runs after init() - this.pendingDeathCallback = callback; - return; - } - this.startDeathWatcher(callback); - } - - private startDeathWatcher(callback: (deadPodId: string) => void): void { - if (!this.kv) return; - - this.watchAbortController = new AbortController(); - const kv = this.kv; - const ownPodId = this.podId; - const signal = this.watchAbortController.signal; - - const startWatcher = async () => { - while (!signal.aborted) { - try { - const watcher = await kv.watch({ - // Watch all keys - initializedFn: () => { - // Initial values loaded, now watching for changes - }, - }); - - for await (const entry of watcher) { - if (signal.aborted) break; - - // DEL = explicit delete, PURGE = TTL expiry - if (entry.operation === "DEL" || entry.operation === "PURGE") { - const deadPodId = entry.key; - // Don't notify about own pod death - if (deadPodId !== ownPodId) { - callback(deadPodId); - } - } - } - } catch (err) { - if (signal.aborted) break; - console.error( - "[PodHeartbeat] Watcher error, reconnecting in 1s:", - err, - ); - await new Promise((r) => setTimeout(r, 1000)); - } - } - }; - - startWatcher().catch((err) => { - if (!signal.aborted) { - console.error("[PodHeartbeat] Watcher loop failed:", err); - } - }); - } - - async stop(): Promise { - // 1. Stop refresh timer - if (this.refreshTimer) { - clearInterval(this.refreshTimer); - this.refreshTimer = null; - } - - // 2. Delete own key (triggers watcher on other pods immediately) - if (this.kv && this.podId) { - try { - await this.kv.delete(this.podId); - } catch { - // Best effort — pod is shutting down anyway - } - } - - // 3. Stop watcher - if (this.watchAbortController) { - this.watchAbortController.abort(); - this.watchAbortController = null; - } - - this.kv = null; - this.podId = null; - this.initPromise = null; - this.pendingDeathCallback = null; - } -} diff --git a/apps/mesh/src/settings/resolve-config.test.ts b/apps/mesh/src/settings/resolve-config.test.ts new file mode 100644 index 0000000000..e7312ad23d --- /dev/null +++ b/apps/mesh/src/settings/resolve-config.test.ts @@ -0,0 +1,51 @@ +import { describe, it, expect } from "bun:test"; +import { resolveConfig } from "./resolve-config"; +import type { CliFlags } from "./types"; + +const baseFlags: CliFlags = { + port: "3000", + home: "/tmp/test-home", + localMode: false, + skipMigrations: false, +}; + +describe("resolveConfig — pod identity", () => { + it("uses POD_NAME verbatim when set", () => { + const { settings } = resolveConfig(baseFlags, { + NODE_ENV: "production", + POD_NAME: "mesh-7", + }); + expect(settings.podName).toBe("mesh-7"); + }); + + it("falls back to a random UUID in development", () => { + const { settings } = resolveConfig(baseFlags, { NODE_ENV: "development" }); + expect(settings.podName).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + }); + + it("falls back to a random UUID when NODE_ENV is unset", () => { + const { settings } = resolveConfig(baseFlags, {}); + expect(settings.podName).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + }); + + it("refuses to boot in production when POD_NAME is missing", () => { + // The error message is part of the operator-facing contract: any + // change here should be intentional (it's how the misconfigured + // pod tells its operator what to fix). Keeping the assertion on a + // distinctive substring rather than the full text so wording can + // evolve without breaking the test. + expect(() => resolveConfig(baseFlags, { NODE_ENV: "production" })).toThrow( + /POD_NAME must be set in production/, + ); + }); + + it("refuses to boot in production when POD_NAME is set to empty string", () => { + expect(() => + resolveConfig(baseFlags, { NODE_ENV: "production", POD_NAME: "" }), + ).toThrow(/POD_NAME must be set in production/); + }); +}); diff --git a/apps/mesh/src/settings/resolve-config.ts b/apps/mesh/src/settings/resolve-config.ts index 0d95d0b622..646cfd902f 100644 --- a/apps/mesh/src/settings/resolve-config.ts +++ b/apps/mesh/src/settings/resolve-config.ts @@ -101,7 +101,40 @@ export function resolveConfig( // Runtime flags isCli: true, noTui: flags.noTui === true, - podName: envVars.POD_NAME ?? crypto.randomUUID(), + // Pod identity is the DBOS executorID, which is how durable + // workflow recovery scopes "whose work to resume on launch." A + // value that changes per restart (random UUID) silently strands + // any in-flight workflow at the moment the pod dies — DBOS only + // recovers rows whose executor_id matches the booting process. + // + // In production, refuse to boot without an explicit POD_NAME so + // misconfigured deployments crash visibly rather than losing + // workflows. Operators should wire it from the K8s downward API + // (metadata.name for StatefulSet) or set a stable constant for + // single-pod self-hosted (e.g. POD_NAME=studio). + // + // Outside production, fall back to a random UUID — single-process + // dev/test still gets DBOS recovery across same-process restarts + // via the application_version path, and the random id stops + // accidental cross-environment recovery if multiple dev instances + // share a database. + podName: (() => { + if (envVars.POD_NAME && envVars.POD_NAME.length > 0) { + return envVars.POD_NAME; + } + if (nodeEnv === "production") { + throw new Error( + "POD_NAME must be set in production. DBOS recovery is " + + "scoped to executor_id (the pod name), and a missing or " + + "auto-generated value strands in-flight workflows on every " + + "restart. Wire POD_NAME from the K8s downward API " + + "(metadata.name on a StatefulSet) for multi-pod deploys, " + + "or set a stable constant like POD_NAME=studio for a " + + "single-pod self-hosted instance.", + ); + } + return crypto.randomUUID(); + })(), // External service credentials decoSupabaseUrl: envVars.DECO_SUPABASE_URL, diff --git a/apps/mesh/src/storage/ports.ts b/apps/mesh/src/storage/ports.ts index ee140f2fb4..57647110be 100644 --- a/apps/mesh/src/storage/ports.ts +++ b/apps/mesh/src/storage/ports.ts @@ -65,19 +65,6 @@ export interface ThreadStoragePort { triggerIds: string[], options?: { limit?: number; offset?: number }, ): Promise<{ threads: Thread[]; total: number }>; - /** Atomically claim an orphaned run. Returns true if this pod won the CAS. */ - claimOrphanedRun( - taskId: string, - organizationId: string, - podId: string, - ): Promise; - - /** List all in_progress threads not owned by the given pod (null or stale owner). */ - listOrphanedRuns(currentPodId: string): Promise; - - /** List all in_progress threads owned by a specific (dead) pod. */ - listOrphanedRunsByPod(deadPodId: string): Promise; - /** * Atomically claim a run start via CAS. Returns true if this pod won. * Allows: new runs (not in_progress), orphans (null pod), or same-pod restarts. @@ -89,9 +76,6 @@ export interface ThreadStoragePort { podId: string | null, ): Promise; - /** Release ownership for all runs owned by this pod (graceful shutdown). */ - orphanRunsByPod(podId: string): Promise; - /** Append an entry to threads.inflight_async_jobs. Atomic via jsonb concat. */ addInflightAsyncJob( taskId: string, diff --git a/apps/mesh/src/storage/threads.test.ts b/apps/mesh/src/storage/threads.test.ts index 2d8c96978d..b85462ef11 100644 --- a/apps/mesh/src/storage/threads.test.ts +++ b/apps/mesh/src/storage/threads.test.ts @@ -175,173 +175,6 @@ describe("SqlThreadStorage", () => { // ========================================================================== describe("durable run operations", () => { - describe("claimOrphanedRun", () => { - it("claims when run_owner_pod is NULL", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - await storage.claimOrphanedRun(thread.id, "org_1", "pod-1"); - const loaded = await storage.get(thread.id, "org_1"); - expect(loaded?.run_owner_pod).toBe("pod-1"); - }); - - it("claims from a different (stale) pod", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - // Simulate a crashed pod owning this thread - await storage.update(thread.id, "org_1", { - run_owner_pod: "dead-pod", - }); - // New pod should be able to claim it (verify via side effect — - // PGlite doesn't report numUpdatedRows correctly) - await storage.claimOrphanedRun(thread.id, "org_1", "new-pod"); - const loaded = await storage.get(thread.id, "org_1"); - expect(loaded?.run_owner_pod).toBe("new-pod"); - }); - - it("does not claim when status is not in_progress", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "completed", - }); - await storage.claimOrphanedRun(thread.id, "org_1", "pod-1"); - const loaded = await storage.get(thread.id, "org_1"); - // Should not have been claimed - expect(loaded?.run_owner_pod).toBeNull(); - }); - - it("claims when run is recorded against the SAME pod (registry-lost recovery)", async () => { - // Repro of the bug that pinned threads in_progress on single-pod - // self-hosted deploys: the run record still names the current pod, - // but the in-memory registry no longer has it (e.g. K8s rolling - // restart, transient reactor write failure). The orphan-resume path - // must still be able to re-claim. Caller is responsible for - // verifying the registry doesn't hold the run before invoking. - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - await storage.update(thread.id, "org_1", { - run_owner_pod: "pod-1", - }); - - await storage.claimOrphanedRun(thread.id, "org_1", "pod-1"); - - const loaded = await storage.get(thread.id, "org_1"); - expect(loaded?.run_owner_pod).toBe("pod-1"); - expect(loaded?.status).toBe("in_progress"); - }); - }); - - describe("listOrphanedRuns", () => { - it("returns threads with NULL owner", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - // Set run_config so it qualifies as an orphan - await storage.update(thread.id, "org_1", { - run_config: { agent: { id: "a" } }, - }); - const orphans = await storage.listOrphanedRuns("current-pod"); - const found = orphans.find((t) => t.id === thread.id); - expect(found).toBeDefined(); - }); - - it("does NOT return threads without run_config", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - // No run_config set - const orphans = await storage.listOrphanedRuns("current-pod"); - const found = orphans.find((t) => t.id === thread.id); - expect(found).toBeUndefined(); - }); - - it("does NOT return threads with non-in_progress status", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "completed", - }); - await storage.update(thread.id, "org_1", { - run_config: { agent: { id: "a" } }, - }); - const orphans = await storage.listOrphanedRuns("current-pod"); - const found = orphans.find((t) => t.id === thread.id); - expect(found).toBeUndefined(); - }); - - it("returns threads owned by the same pod (StatefulSet restart)", async () => { - // K8s StatefulSet pod names are stable across restarts; the new - // process must still discover orphans left by the previous - // incarnation that ran under the same name. Recovery happens at - // boot when the registry is empty, so listing same-pod entries is - // safe. - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - await storage.update(thread.id, "org_1", { - run_config: { agent: { id: "a" } }, - run_owner_pod: "current-pod", - }); - const orphans = await storage.listOrphanedRuns("current-pod"); - const found = orphans.find((t) => t.id === thread.id); - expect(found).toBeDefined(); - }); - }); - - describe("orphanRunsByPod", () => { - it("clears ownership for all runs owned by pod", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - await storage.update(thread.id, "org_1", { - run_owner_pod: "pod-orphan-test", - run_config: { agent: { id: "a" } }, - }); - - await storage.orphanRunsByPod("pod-orphan-test"); - - const loaded = await storage.get(thread.id, "org_1"); - expect(loaded?.run_owner_pod).toBeNull(); - // status should remain in_progress - expect(loaded?.status).toBe("in_progress"); - // run_config should be preserved - expect(loaded?.run_config).not.toBeNull(); - }); - - it("does not affect runs owned by other pods", async () => { - const thread = await storage.create({ - organization_id: "org_1", - created_by: "user_1", - status: "in_progress", - }); - await storage.update(thread.id, "org_1", { - run_owner_pod: "pod-other", - }); - - await storage.orphanRunsByPod("pod-orphan-different"); - - const loaded = await storage.get(thread.id, "org_1"); - expect(loaded?.run_owner_pod).toBe("pod-other"); - }); - }); - describe("update() with new columns", () => { it("persists run_owner_pod", async () => { const thread = await storage.create({ diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index e11f00b886..9b6686fd62 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -629,78 +629,11 @@ export class SqlThreadStorage implements ThreadStoragePort { // Cross-Org System Operations (not exposed via OrgScopedThreadStorage) // ========================================================================== - async claimOrphanedRun( - taskId: string, - organizationId: string, - podId: string, - ): Promise { - // Claim any in-progress run, regardless of which pod is recorded as the - // current owner. Callers MUST verify that this pod's RunRegistry has no - // live entry for `taskId` before invoking this — that's the orphan - // precondition. Same-pod claims are intentionally allowed because: - // - // 1. K8s rolling restarts re-use the StatefulSet pod name, so a - // previous incarnation may have left `run_owner_pod = currentPodId` - // while the new process has an empty registry. - // 2. If the run was projected out of memory (e.g. a transient DB - // failure in the reactor between in-memory projection and the - // `run_owner_pod = NULL` write on terminal status), the same pod is - // the only authority that can recover it without restarting. - // - // Excluding same-pod claims here used to lock those threads in - // `in_progress` indefinitely on single-pod self-hosted deploys. - const result = await this.db - .updateTable("threads") - .set({ run_owner_pod: podId, updated_at: new Date().toISOString() }) - .where("id", "=", taskId) - .where("organization_id", "=", organizationId) - .where("status", "=", "in_progress") - .executeTakeFirst(); - return (result?.numUpdatedRows ?? 0n) > 0n; - } - - async listOrphanedRuns(_currentPodId: string): Promise { - // Lists every in_progress thread with a persisted run_config, regardless - // of which pod is recorded as the current owner. Intended for the - // startup recovery sweep, which runs against a registry that is empty - // by construction — anything in the DB is recoverable from this - // process's perspective. - // - // Filtering out same-pod owners (the previous behavior) was a bug for - // K8s StatefulSet rolling restarts, where the new pod re-uses the - // previous incarnation's POD_NAME. Those runs were silently skipped - // until something else (a user revisit hitting /stream) recovered them. - // - // The `_currentPodId` parameter is retained for interface compatibility. - const rows = await this.db - .selectFrom("threads") - .selectAll() - .where("status", "=", "in_progress") - .where("run_config", "is not", null) - .orderBy("run_started_at", "asc") - .limit(100) - .execute(); - return rows.map((row) => this.threadFromDbRow(row)); - } - - async listOrphanedRunsByPod(deadPodId: string): Promise { - const rows = await this.db - .selectFrom("threads") - .selectAll() - .where("status", "=", "in_progress") - .where("run_config", "is not", null) - .where("run_owner_pod", "=", deadPodId) - .orderBy("run_started_at", "asc") - .limit(100) - .execute(); - return rows.map((row) => this.threadFromDbRow(row)); - } - async claimRunStart( taskId: string, organizationId: string, data: Partial, - podId: string | null, + _podId: string | null, ): Promise { const now = new Date().toISOString(); @@ -716,38 +649,27 @@ export class SqlThreadStorage implements ThreadStoragePort { if (data.run_started_at !== undefined) updateData.run_started_at = data.run_started_at; - // CAS: only claim if not already running on a different pod + // Always update — the thread-gate DBOS queue (concurrency=1 per + // threadId) already serialises dispatches per thread, so two pods + // can never legitimately race here. The prior pod-bound CAS + // ("only claim if run_owner_pod is null / same / not yet + // in_progress") rejected the legitimate replay case where DBOS + // recovers a workflow on a different executor than the original + // — that stalled recovery on restart. The status guard against + // restarting a terminal run is unnecessary too: if status had + // gone to completed/failed, this codepath wouldn't be re-entered + // (the workflow row in DBOS is the source of truth for whether + // a run should be in flight). const result = await this.db .updateTable("threads") .set(updateData) .where("id", "=", taskId) .where("organization_id", "=", organizationId) - .where(({ eb, or }) => - or([ - // Not currently in_progress → fresh start - eb("status", "!=", "in_progress"), - // Orphan → null pod - eb("run_owner_pod", "is", null), - // Same pod restart - ...(podId ? [eb("run_owner_pod", "=", podId)] : []), - ]), - ) .executeTakeFirst(); return (result?.numUpdatedRows ?? 0n) > 0n; } - async orphanRunsByPod(podId: string): Promise { - const rows = await this.db - .updateTable("threads") - .set({ run_owner_pod: null, updated_at: new Date().toISOString() }) - .where("run_owner_pod", "=", podId) - .where("status", "=", "in_progress") - .returning("id") - .execute(); - return rows.map((r) => r.id); - } - async addInflightAsyncJob( taskId: string, organizationId: string, diff --git a/tests/multi-pod/docker-compose.yml b/tests/multi-pod/docker-compose.yml index 6643001497..a4ea3a2557 100644 --- a/tests/multi-pod/docker-compose.yml +++ b/tests/multi-pod/docker-compose.yml @@ -48,6 +48,11 @@ services: # migration step (we override the entrypoint with a direct start command) to # avoid races on parallel boot. migrate: + # Same image as the mesh-N services so a single `docker compose + # build mesh-1` updates the migration set everywhere. Without + # this, migrate kept its own separately-tagged image and would + # silently use stale migration code while mesh-N had the new one. + image: multi-pod-studio:latest build: context: ../../ dockerfile: tests/resilience/Dockerfile.studio @@ -59,7 +64,14 @@ services: postgres: condition: service_healthy + # mesh-1 is the build target. mesh-2 and mesh-3 reference the same + # `image:` tag so all three pods share one studio image and one build + # — Compose 1.x would otherwise build the same context three times, + # producing identical-but-separately-tagged images and confusing + # `docker compose build mesh-1` (the others would still point at + # their stale per-service images). mesh-1: &mesh + image: multi-pod-studio:latest build: context: ../../ dockerfile: tests/resilience/Dockerfile.studio diff --git a/tests/multi-pod/lib/hooks.ts b/tests/multi-pod/lib/hooks.ts index d86185e5ee..6e273760d5 100644 --- a/tests/multi-pod/lib/hooks.ts +++ b/tests/multi-pod/lib/hooks.ts @@ -1,20 +1,69 @@ /** * Standard test hooks for multi-pod scenarios. * - * Call `registerTestHooks()` at the top of each scenario file. It (1) - * restarts any mesh pods that were left stopped by a previous scenario - * (the pod-death scenarios SIGKILL pods and don't currently restore - * them themselves) and (2) waits for every pod to report /health/live - * before any test body runs. + * Call `registerTestHooks()` at the top of each scenario file. It: + * 1. Clears DBOS workflow state from prior scenarios so a pod that's + * about to restart (next step) doesn't re-execute someone else's + * half-finished workflow on boot. + * 2. Restarts any mesh pods that were left stopped by a previous + * scenario (the pod-death scenarios SIGKILL pods and don't restore + * them themselves). + * 3. Waits for every pod to report /health/live before any test body + * runs. + * + * Order matters: 1 must precede 2 — if we clean DBOS state after the + * dead pod is back up, it has a window to start recovery against the + * stale rows before we delete them, and the next scenario races a + * ghost workflow. */ import { beforeAll } from "bun:test"; import { waitReady } from "./cluster"; +import { dbQuery } from "./db"; import { start } from "./pod"; import { ALL_PODS } from "./pods"; const COMPOSE_FILE = new URL("../docker-compose.yml", import.meta.url).pathname; +/** + * DELETE every DBOS workflow row from prior scenarios. + * + * Background: when pod-death scenarios SIGKILL a pod, DBOS's + * `workflow_status` table is left with the dead pod's in-progress + * workflows. When that pod (or any pod sharing its executor_id) boots + * later, DBOS's launch-time recovery re-executes those workflows — which + * publishes chunks to a thread's JetStream subject long after the + * originating test scenario has ended. Subsequent scenarios then see + * "phantom" runs they didn't initiate. + * + * The FK constraints DBOS sets up (`ON DELETE CASCADE` to + * operation_outputs, workflow_inputs, notifications, workflow_events) + * mean a single DELETE on workflow_status takes the rest with it. + * workflow_queue isn't FK'd back to workflow_status, so it gets its + * own DELETE. + */ +async function clearDbosState(): Promise { + await dbQuery("DELETE FROM dbos.workflow_status"); + await dbQuery("DELETE FROM dbos.workflow_queue"); +} + +/** + * Mark every leftover `in_progress` thread as `failed` and clear its + * `run_owner_pod`. After a pod-death scenario, the killed pod's threads + * stay in_progress with run_owner_pod=; when that pod + * restarts at the next scenario's `restoreStoppedPods`, its startup + * recovery sees the row as its own previous incarnation and dispatches + * a resume — long after the original test ended. That resumed run then + * races concurrent dispatches the new scenario kicks off and chunks + * appear on the wrong /stream subscribers. Resetting the table here + * keeps each scenario's recovery scope to its own in-flight work. + */ +async function clearStaleThreads(): Promise { + await dbQuery( + "UPDATE threads SET status = 'failed', run_owner_pod = NULL WHERE status = 'in_progress'", + ); +} + async function restoreStoppedPods(): Promise { const proc = Bun.spawn( [ @@ -67,6 +116,8 @@ async function restoreStoppedPods(): Promise { export function registerTestHooks(): void { beforeAll(async () => { + await clearDbosState(); + await clearStaleThreads(); await restoreStoppedPods(); await waitReady(); }, 180_000); diff --git a/tests/multi-pod/mock-ai/server.ts b/tests/multi-pod/mock-ai/server.ts index f81f9d7034..abde0f3a1a 100644 --- a/tests/multi-pod/mock-ai/server.ts +++ b/tests/multi-pod/mock-ai/server.ts @@ -108,6 +108,12 @@ async function streamCompletion(req: Request): Promise { const reqBody = (await req.json().catch(() => ({}))) as CompletionsBody; const { chunks: numChunks, delayMs } = parseHints(reqBody); const id = `chatcmpl-${crypto.randomUUID()}`; + // Stamp every chunk with the wall-clock time at which THIS request + // (this call to the mock) began. Lets the pod-death scenario tell + // dead-pod chunks (call started before kill) apart from resumed-pump + // chunks (call started after kill) without resorting to chunk-count + // timing heuristics. + const callStartedAt = Date.now(); const stream = new ReadableStream({ async start(controller) { @@ -118,7 +124,11 @@ async function streamCompletion(req: Request): Promise { for (let i = 0; i < numChunks; i++) { if (delayMs > 0) await Bun.sleep(delayMs); controller.enqueue( - frame(buildChunk(id, { content: `chunk-${i + 1} ` })), + frame( + buildChunk(id, { + content: `t${callStartedAt} chunk-${i + 1} `, + }), + ), ); } diff --git a/tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts b/tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts index d9ce883d32..91b8dbaa9a 100644 --- a/tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts +++ b/tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts @@ -1,75 +1,69 @@ /** - * Pod-death + DBOS workflow replay. + * Pod-death recovery via DBOS workflow replay. * - * ⚠️ Currently `.skip`ped — see "Architectural finding" below. + * Validates the simplification: heartbeat + cross-pod claim CAS were + * deleted in favor of trusting DBOS launch-time recovery. The contract + * being verified here is exactly DBOS's documented one — "When an + * application with an executor ID restarts, it only recovers pending + * workflows assigned to that executor ID" — under our actual streaming + * pipeline (threadGateWorkflow → dispatchRunAndWaitStep → streamText + * → JetStream pump). * - * Intended scenario: when the pod owning a run dies mid-stream, recovery - * should resume the run on a survivor pod so /stream tails continue to - * receive chunks without the client reconnecting. + * ── What the test exercises ────────────────────────────────────────── * - * Test shape: + * 1. POST a SLOW message (20 chunks × 500ms ≈ 10s). + * 2. Open /stream on all three pods. + * 3. Wait for chunk-2 — proof the run is flowing. + * 4. Read `threads.run_owner_pod` and SIGKILL that pod. + * 5. Restart the killed pod with `docker compose start ` — + * mimics K8s StatefulSet bringing the pod back with the same + * identity (POD_NAME → DBOS executor_id). + * 6. DBOS launches on the restarted pod, sees its own PENDING + * threadGateWorkflow in `dbos.workflow_status`, and re-runs the + * `dispatchRunAndWaitStep` from scratch — streamText fires again + * against the mock-ai, chunks land on the same per-thread + * JetStream subject, and SSE tails on any pod see them. + * 7. Wait for a chunk whose mock-ai call-start timestamp is *after* + * the kill — proves the chunks came from the replayed step, not + * from the dead pod's leftover JetStream prefix. + * 8. Wait for chunk-20 to confirm the replay completes end-to-end. + * 9. Open a late /stream AFTER the resumed pump is flowing and + * assert chunks 1..20 appear EXACTLY ONCE (`chunk-1 ` count == 1). + * If `prepareRun`'s JetStream purge ever regresses, this watcher + * would see the dead pod's prefix AND the resumed run, doubling + * the count. * - * 1. Send a SLOW message (20 chunks × 500ms ≈ 10s window for kill). - * 2. Open /stream on all three pods so at least two survive the kill - * no matter which pod owns the run. - * 3. Wait for "chunk-2" on any watcher (proof the run started). - * 4. Read the actual owner from `threads.run_owner_pod` and SIGKILL - * that pod (POD_NAME on each compose service matches the service - * name, so the value maps directly to a `pod.kill()` target). - * 5. Assert one surviving /stream receives "chunk-20". + * ── What enables this ──────────────────────────────────────────────── * - * ── Architectural finding (2026-05-17, revised) ────────────────────── + * - **DBOS executorID = POD_NAME** (`apps/mesh/src/index.ts`): + * pins this pod's workflows to a stable id that K8s StatefulSet + * preserves across restarts. Without this, every restart would be + * a "new executor" and orphans would sit forever. * - * Running this test against the framework cluster surfaces three - * separate, compounding gaps in the current cross-pod recovery story: + * - **Single `dispatchRunAndWaitStep` per thread-gate workflow** + * (`apps/mesh/src/dispatch-queue/thread-gate-workflow.ts`): the + * entire streamText loop is one DBOS step, so replay re-runs the + * whole agent turn (correct as long as steps are idempotent, which + * they are for our LLM calls). * - * 1. **No cross-pod DBOS recovery.** All pods boot DBOS with - * `executor_id = "local"` (default; env `DBOS__VMID` is unset). - * DBOS's `recoverPendingWorkflows` is filtered by executor_id, and - * it only fires on `DBOS.launch()` — i.e., when a pod restarts. - * The OSS SDK has no built-in scan for workflows whose executor is - * dead. Surviving peers never replay a dead pod's workflows. + * - **Unconditional awaited `streamBuffer.purge` in prepareRun** + * (`apps/mesh/src/api/routes/decopilot/dispatch-run.ts`): replay + * would otherwise pump on top of the dead pod's leftover prefix in + * JetStream, and any /stream opened post-recovery would see the + * response duplicated. The chunk-1 count assertion below is the + * regression guard. * - * 2. **`prepareRun` purges JetStream unconditionally** at - * dispatch-run.ts:519, including on resume. Even if the heartbeat - * watcher (NatsPodHeartbeat + runRegistry.handlePodDeath) does - * kick in and trigger `dispatchRunAndWait(isResume: true)` on a - * survivor pod, the first thing that runs nukes any chunks the - * survivor watchers were mid-consumption of. - * - * 3. **NatsPodHeartbeat bucket creation is fragile**. On cold boot - * `KV_POD_HEARTBEATS` sometimes isn't in NATS' jsz output even - * though every other JS resource is — bucket init silently - * swallows errors at app.ts:867. Re-runs eventually create it, - * but it's flaky enough that you can't rely on the heartbeat - * firing in the first 45s window. - * - * Net: permanent pod death is currently only recovered when the dead - * pod *itself* restarts (DBOS recovery on the same POD_NAME succeeds - * because `claimRunStart`'s strict CAS matches). Cross-pod recovery - * needs: - * - * - per-pod `DBOS__VMID` so DBOS knows which executor owns what - * - a death-detection mechanism (the existing NATS heartbeat, or - * Postgres advisory locks — sub-second detection, no NATS bucket - * reliability issue) that triggers recovery on a survivor - * - skip `streamBuffer.purge()` when `isResume` is true - * - * The recovery itself can stay in `runRegistry.handlePodDeath` - * (storage-level claim + `dispatchRunAndWait(isResume: true)`) — going - * through DBOS's internal recoverPendingWorkflows API is possible but - * not exposed publicly, so it'd require importing internals. - * - * Un-skip this test once the three gaps above are addressed. The test - * body is left intact so the fix can be verified by removing the - * `.skip` and re-running. + * - **Stable mock-ai per-call timestamp** (`tests/multi-pod/mock-ai/ + * server.ts`): every chunk emits `t` for the wall-clock moment + * the mock-ai call began. A chunk with `t > killTime` provably + * came from a call started after the kill — i.e. the replay. */ import { describe, expect, test } from "bun:test"; import { postJson, sse } from "../lib/client"; import { getThreadRunOwnerPod } from "../lib/db"; import { registerTestHooks } from "../lib/hooks"; -import { kill } from "../lib/pod"; +import { kill, start } from "../lib/pod"; import type { PodInfo, PodName } from "../lib/pods"; import { ALL_PODS, PODS } from "../lib/pods"; import { pollUntil } from "../lib/poll-until"; @@ -119,10 +113,7 @@ function openAttachWatcher( } catch (err) { // Aborts and connection drops (when the owning pod dies) are // expected in this scenario — swallow them. Anything else - // re-throws so the test sees it. Bun's fetch surfaces the - // condition as either an `AbortError`, a system error with a - // `code` property (ECONNRESET/ECONNREFUSED), or a generic "socket - // connection was closed" message; cover all three. + // re-throws so the test sees it. const code = (err as { code?: string } | null)?.code ?? ""; const msg = err instanceof Error ? err.message : String(err); const expectedCodes = ["ECONNRESET", "ECONNREFUSED", "ABORT_ERR"]; @@ -144,7 +135,7 @@ function openAttachWatcher( } describe("pod-death + DBOS replay", () => { - test.skip("killing the owning pod mid-stream still delivers the final chunk via a survivor", async () => { + test("killing the owning pod mid-stream still delivers the final chunk via DBOS replay on restart", async () => { const session = await bootstrapSession(PODS.MESH_1); await wireMockProvider(PODS.MESH_1, session); const { virtualMcpId } = await createTestAgent(PODS.MESH_1, session); @@ -173,15 +164,10 @@ describe("pod-death + DBOS replay", () => { ); expect(postRes.status).toBe(202); - // Open watchers on all three pods. Whichever pod we end up killing, - // the other two stay alive and at least one of them sees the run - // through to completion. const watchers = ALL_PODS.map((pod) => openAttachWatcher(pod, session.orgSlug, threadId, session.apiKey), ); - // Wait until at least one watcher sees a chunk — proves the run - // is actually flowing before we go knock a pod over. await pollUntil( async () => watchers.some((w) => w.joined.includes("chunk-2")), { @@ -191,49 +177,78 @@ describe("pod-death + DBOS replay", () => { }, ); - // Identify the dispatch owner from the DB (POD_NAME equals the - // compose service name, so the value here is a valid PodName). const ownerRaw = await pollUntil( async () => (await getThreadRunOwnerPod(threadId)) !== null, - { - timeoutMs: 5_000, - intervalMs: 200, - label: "owner-pod-claimed", - }, + { timeoutMs: 5_000, intervalMs: 200, label: "owner-pod-claimed" }, ) .then(() => getThreadRunOwnerPod(threadId)) .then((v) => v as PodName); expect(["mesh-1", "mesh-2", "mesh-3"]).toContain(ownerRaw); - console.log(` → run owned by ${ownerRaw}; SIGKILLing it`); + console.log(` → run owned by ${ownerRaw}; SIGKILLing then restarting it`); + const killTime = Date.now(); await kill(ownerRaw); - // Survivor watchers (everything except the killed pod) must - // eventually see the final chunk. We don't care which one — any - // single survivor receiving "chunk-20" proves the pump resumed - // on a replay-target pod and JetStream caught it back up. - const survivors = watchers.filter((w) => w.pod.service !== ownerRaw); + // Bring the same pod back up (StatefulSet-style identity). DBOS + // launch-time recovery on this pod scans its workflow_status rows + // for executor_id = POD_NAME with status PENDING and replays them. + await start(ownerRaw); + + const otherWatchers = watchers.filter((w) => w.pod.service !== ownerRaw); + let lateWatcher: Watcher | null = null; try { + // Deterministic gate: a chunk with `t > killTime` came from a + // mock-ai call started AFTER the kill, which can only be the + // DBOS-replayed dispatchRunAndWaitStep on the restarted pod. + // Window covers: pod restart (~30s for clean reboot of mesh in + // docker) + DBOS launch recovery + replay step + first chunks. await pollUntil( - async () => survivors.some((w) => w.joined.includes("chunk-20")), + async () => + otherWatchers.some((w) => { + for (const m of w.joined.matchAll(/t(\d+) chunk-/g)) { + if (Number(m[1]) > killTime) return true; + } + return false; + }), { - // Generous window: DBOS workflow replay fires fast but its - // first claim attempt fails (the dead pod still owns the - // `run_owner_pod` row, claimRunStart rejects). Actual - // recovery comes via the heartbeat watcher's - // claimOrphanedRun + dispatchRunAndWait(isResume:true) - // backstop, which depends on the pod-death-detection - // interval. timeoutMs: 120_000, intervalMs: 500, + label: "replayed-pump-flowing", + }, + ); + + // Open a late /attach AFTER the replayed pump is flowing. + // With the unconditional purge in prepareRun, this watcher sees + // ONE copy of the response. Without it, a late /stream sees the + // dead pod's prefix AND the replay → chunk-1 count == 2. + lateWatcher = openAttachWatcher( + otherWatchers[0]!.pod, + session.orgSlug, + threadId, + session.apiKey, + ); + + await pollUntil( + async () => otherWatchers.some((w) => w.joined.includes("chunk-20")), + { + timeoutMs: 90_000, + intervalMs: 500, label: "final-chunk-after-replay", }, ); + await pollUntil(async () => lateWatcher!.joined.includes("chunk-20"), { + timeoutMs: 20_000, + intervalMs: 200, + label: "late-watcher-saw-final-chunk", + }); + + const chunk1Count = (lateWatcher.joined.match(/chunk-1 /g) ?? []).length; + expect(chunk1Count).toBe(1); } finally { - // Always close out the SSE consumers so the test process exits - // cleanly even on assertion failure. for (const w of watchers) w.abort.abort(); + lateWatcher?.abort.abort(); await Promise.allSettled(watchers.map((w) => w.done)); + if (lateWatcher) await lateWatcher.done.catch(() => {}); } - }, 120_000); + }, 300_000); });