Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
733331a
fix(decopilot): cross-pod recovery actually fires on pod death
viktormarinho May 17, 2026
b27293e
fix(decopilot): keep purge on resume, guard with duplicate-detection …
viktormarinho May 18, 2026
3b61478
fix(heartbeat): re-arm death detection on NATS reconnect
viktormarinho May 18, 2026
0c01905
Merge remote-tracking branch 'origin/main' into viktormarinho/pod-dea…
viktormarinho May 19, 2026
f8bc4dd
test(multi-pod): clear DBOS workflow state between scenarios
viktormarinho May 19, 2026
f2ab135
refactor(heartbeat): swap NATS KV for Postgres advisory locks
viktormarinho May 19, 2026
fa4898a
test(pod-death): gate late watcher on chunk-10 to avoid kill-window race
viktormarinho May 19, 2026
5c800ed
fix(heartbeat): xact-scoped probe lock, ordered start, rename to stud…
viktormarinho May 20, 2026
05db218
fix(heartbeat): respect DB SSL config + hold xact lock through peer D…
viktormarinho May 20, 2026
38641d4
fix(claimOrphanedRun): real CAS on previous owner
viktormarinho May 20, 2026
0b8ec39
fix(recovery): scope startup orphan sweep to dead-owner threads
viktormarinho May 20, 2026
f4bfb1f
fix(heartbeat): route graceful shutdown through the same death signal
viktormarinho May 20, 2026
7d6af94
refactor: delete heartbeat + ownership, trust DBOS for recovery
viktormarinho May 20, 2026
9343540
fix(claimRunStart): drop pod-bound CAS so DBOS replay can claim
viktormarinho May 20, 2026
c6099d5
feat(settings): refuse to boot in production without POD_NAME
viktormarinho May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,20 +858,33 @@ export async function createApp(options: CreateAppOptions = {}) {
getJetStream: () => natsProvider!.getJetStream(),
});

// Attempt immediate init (may no-op if NATS not ready)
// Attempt immediate init (may no-op if NATS not ready). Surface
// any real error — silent swallow here historically hid bucket-
// creation races and turned pod-death recovery into "mostly works".
podHeartbeat
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
if (podHeartbeat!.isReady()) {
podHeartbeat!.start(POD_ID);
console.log(`[PodHeartbeat] Started (pod=${POD_ID})`);
}
})
.catch(() => {});
.catch((err: unknown) => {
console.error("[PodHeartbeat] Initial init failed:", err);
});

// Re-init when NATS connects
// Re-init when NATS connects (or reconnects). Idempotent — start()
// is a no-op if the heartbeat is already running.
natsProvider.onReady(() => {
podHeartbeat!
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
if (podHeartbeat!.isReady()) {
podHeartbeat!.start(POD_ID);
console.log(
`[PodHeartbeat] Started after NATS ready (pod=${POD_ID})`,
);
}
})
.catch((err: unknown) => {
console.error("[PodHeartbeat] Deferred init failed:", err);
Expand Down Expand Up @@ -1287,13 +1300,14 @@ export async function createApp(options: CreateAppOptions = {}) {
thread.organization_id,
);

// Pod-death recovery: a different pod's run was claimed by us. Drain
// synchronously to know when the run completes server-side. We
// deliberately don't pass a streamBuffer here — this background
// recovery is the safety net for threads no DBOS replay or attached
// client picks up; clients reconnecting via /attach see the run via
// the workflow's own JetStream pump on the pod that DBOS replayed it
// onto.
// Pod-death recovery: a different pod's run was claimed by us.
// Drain synchronously to know when the run completes server-side
// AND pump chunks into JetStream via `streamBuffer` so any /attach
// tails on survivor pods continue to receive data — that's the
// entire user-visible win of taking over a dead pod's work. The
// previous omission of `streamBuffer` here meant the resumed run
// ran but its chunks went nowhere visible, defeating the point of
// the recovery path.
await dispatchRunAndWait(
{
messages: [],
Expand All @@ -1309,7 +1323,7 @@ export async function createApp(options: CreateAppOptions = {}) {
isResume: true,
},
resumeCtx,
{ runRegistry, cancelBroadcast },
{ runRegistry, cancelBroadcast, streamBuffer },
);
};

Expand Down
13 changes: 11 additions & 2 deletions apps/mesh/src/api/routes/decopilot/dispatch-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,17 @@ async function prepareRun(
}
}

// Purge stale buffered chunks from any previous run on this thread
streamBuffer?.purge(mem.thread.id);
// Purge stale buffered chunks from any previous run on this thread.
// Skip on resume: this dispatch is a continuation of a still-active
// run (typically pod-death recovery via runRegistry.handlePodDeath),
// and any /attach clients tailing the per-thread subject would lose
// their place — including the prefix chunks pumped by the previous
// owner pod before it died. The about-to-start pump will publish
// fresh chunks alongside whatever's already buffered, and the
// consumer's deduplication of `done` sentinels handles the overlap.
if (!input.isResume) {
streamBuffer?.purge(mem.thread.id);
}

// Split system messages from user message
const systemMessages = input.messages.filter((m) => m.role === "system");
Expand Down
10 changes: 9 additions & 1 deletion apps/mesh/src/api/routes/decopilot/run-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,15 @@ export class RunRegistry {
cancelBroadcast?: { broadcast(taskId: string): void },
): Promise<void> {
const orphans = await this.deps.storage.listOrphanedRunsByPod(deadPodId);
if (orphans.length === 0) return;
if (orphans.length === 0) {
console.log(
`[RunRegistry] handlePodDeath(${deadPodId}): no orphans to recover`,
);
return;
}
console.log(
`[RunRegistry] handlePodDeath(${deadPodId}): recovering ${orphans.length} orphan(s) on pod ${this.podId}`,
);

// Cancel running threads on the dead pod (in case it's alive but partitioned)
for (const thread of orphans) {
Expand Down
98 changes: 87 additions & 11 deletions apps/mesh/src/nats/pod-heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@
* Per-Pod Heartbeat via NATS KV
*
* A single KV key per pod, refreshed on a timer, with bucket-level TTL.
* When a pod dies (hard kill) or shuts down (graceful), its key expires/deletes
* and watchers on other pods are notified immediately.
* Survivor pods learn of dead peers via two complementary mechanisms:
*
* O(1) writes per pod regardless of thread count.
* - **Watch** (`kv.watch()`) — fires synchronously on explicit
* `kv.delete()` from a graceful shutdown. Near-instant for clean
* stops but doesn't fire on TTL-based expiry (NATS' background
* cleanup removes expired KV entries server-side without emitting
* any consumer-visible operation).
*
* - **Poll** (`kv.keys()` on a timer) — closes the gap for hard
* kills (SIGKILL, OOM, network partition) where the dying pod
* never gets to send the DEL. Diffs successive scans; anything
* in the previous snapshot but not the current one is treated as
* dead.
*
* O(1) writes per pod regardless of thread count; O(pods) per poll
* tick.
*/

import type { JetStreamClient, NatsConnection, KV } from "nats";
Expand All @@ -14,9 +26,12 @@ import { StorageType } from "nats";
const BUCKET_NAME = "POD_HEARTBEATS";
const BUCKET_TTL_MS = 45_000; // Key expires 45s after last refresh
const REFRESH_INTERVAL_MS = 10_000; // Refresh every 10s
const POLL_INTERVAL_MS = 10_000; // Scan for vanished keys every 10s

export interface PodHeartbeat {
init(): Promise<void>;
/** True once `init()` has successfully created/opened the KV bucket. */
isReady(): boolean;
start(podId: string): void;
/** Watch for pod deaths. Callback receives the dead podId. */
onPodDeath(callback: (deadPodId: string) => void): void;
Expand All @@ -32,9 +47,12 @@ export class NatsPodHeartbeat implements PodHeartbeat {
private kv: KV | null = null;
private podId: string | null = null;
private refreshTimer: ReturnType<typeof setInterval> | null = null;
private pollTimer: ReturnType<typeof setInterval> | null = null;
private knownPods: Set<string> = new Set();
private watchAbortController: AbortController | null = null;
private initPromise: Promise<void> | null = null;
private pendingDeathCallback: ((deadPodId: string) => void) | null = null;
private deathCallback: ((deadPodId: string) => void) | null = null;

constructor(private readonly deps: NatsPodHeartbeatDeps) {}

Expand All @@ -48,6 +66,11 @@ export class NatsPodHeartbeat implements PodHeartbeat {
clearInterval(this.refreshTimer);
this.refreshTimer = null;
}
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = null;
}
this.knownPods = new Set();
this.kv = null;
this.initPromise = null;

Expand All @@ -68,6 +91,10 @@ export class NatsPodHeartbeat implements PodHeartbeat {
return this.initPromise;
}

isReady(): boolean {
return this.kv !== null;
}

start(podId: string): void {
if (!this.kv) return; // Not initialized — skip heartbeat
if (this.refreshTimer) return; // Already running — prevent double start
Expand All @@ -88,17 +115,65 @@ export class NatsPodHeartbeat implements PodHeartbeat {
// Activate deferred death watcher if registered before init
if (this.pendingDeathCallback) {
this.startDeathWatcher(this.pendingDeathCallback);
this.startDeathPoller();
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
this.pendingDeathCallback = null;
}
}

onPodDeath(callback: (deadPodId: string) => void): void {
this.deathCallback = callback;
if (!this.kv) {
// Store callback — will activate when start() runs after init()
this.pendingDeathCallback = callback;
return;
}
this.startDeathWatcher(callback);
this.startDeathPoller();
}

/**
* Periodic scan that catches deaths the watcher misses. NATS KV
* TTL-based key expiry happens server-side without emitting any
* notification on the consumer — so a SIGKILL'd pod's key just
* vanishes from the bucket silently. We list keys on a timer,
* diff against the previous snapshot, and treat absences as
* deaths.
*/
private startDeathPoller(): void {
if (this.pollTimer) return;
if (!this.kv) return;
const kv = this.kv;

const tick = async () => {
try {
const live = new Set<string>();
const iter = await kv.keys();
for await (const key of iter) {
live.add(key);
}
// First tick: just record the baseline. Don't fire deaths for
// pods we never knew about — the watcher's initial sync
// covered those.
if (this.knownPods.size === 0) {
this.knownPods = live;
return;
}
for (const prev of this.knownPods) {
if (!live.has(prev) && prev !== this.podId && this.deathCallback) {
console.log(`[PodHeartbeat:poll] detected vanished key: ${prev}`);
this.deathCallback(prev);
}
}
// Update snapshot — additions (new pods joining) become
// tracked, vanished pods drop out.
this.knownPods = live;
} catch (err) {
console.error("[PodHeartbeat:poll] scan failed:", err);
}
};
// Fire one immediate tick to seed the baseline, then on interval.
tick();
this.pollTimer = setInterval(tick, POLL_INTERVAL_MS);
}

private startDeathWatcher(callback: (deadPodId: string) => void): void {
Expand All @@ -112,17 +187,14 @@ export class NatsPodHeartbeat implements PodHeartbeat {
const startWatcher = async () => {
while (!signal.aborted) {
try {
const watcher = await kv.watch({
// Watch all keys
initializedFn: () => {
// Initial values loaded, now watching for changes
},
});
const watcher = await kv.watch();

for await (const entry of watcher) {
if (signal.aborted) break;

// DEL = explicit delete, PURGE = TTL expiry
// DEL fires on explicit graceful kv.delete(); PURGE on
// explicit kv.purge(). TTL-based expiry doesn't fire
// either — the poller covers that case.
if (entry.operation === "DEL" || entry.operation === "PURGE") {
const deadPodId = entry.key;
// Don't notify about own pod death
Expand Down Expand Up @@ -150,11 +222,15 @@ export class NatsPodHeartbeat implements PodHeartbeat {
}

async stop(): Promise<void> {
// 1. Stop refresh timer
// 1. Stop refresh + poll timers
if (this.refreshTimer) {
clearInterval(this.refreshTimer);
this.refreshTimer = null;
}
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = null;
}

// 2. Delete own key (triggers watcher on other pods immediately)
if (this.kv && this.podId) {
Expand Down
7 changes: 7 additions & 0 deletions tests/multi-pod/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ services:
postgres:
condition: service_healthy

# mesh-1 is the build target. mesh-2 and mesh-3 reference the same
# `image:` tag so all three pods share one studio image and one build
# — Compose 1.x would otherwise build the same context three times,
# producing identical-but-separately-tagged images and confusing
# `docker compose build mesh-1` (the others would still point at
# their stale per-service images).
mesh-1: &mesh
image: multi-pod-studio:latest
build:
context: ../../
dockerfile: tests/resilience/Dockerfile.studio
Expand Down
20 changes: 10 additions & 10 deletions tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function openAttachWatcher(
}

describe("pod-death + DBOS replay", () => {
test.skip("killing the owning pod mid-stream still delivers the final chunk via a survivor", async () => {
test("killing the owning pod mid-stream still delivers the final chunk via a survivor", async () => {
const session = await bootstrapSession(PODS.MESH_1);
await wireMockProvider(PODS.MESH_1, session);
const { virtualMcpId } = await createTestAgent(PODS.MESH_1, session);
Expand Down Expand Up @@ -217,14 +217,11 @@ describe("pod-death + DBOS replay", () => {
await pollUntil(
async () => survivors.some((w) => w.joined.includes("chunk-20")),
{
// Generous window: DBOS workflow replay fires fast but its
// first claim attempt fails (the dead pod still owns the
// `run_owner_pod` row, claimRunStart rejects). Actual
// recovery comes via the heartbeat watcher's
// claimOrphanedRun + dispatchRunAndWait(isResume:true)
// backstop, which depends on the pod-death-detection
// interval.
timeoutMs: 120_000,
// Window has to cover: heartbeat KV TTL (45s) + NATS purge
// sweep + handlePodDeath claim + dispatchRunAndWait resume +
// ~2.5s of mock chunks ≈ 55s worst case. 90s leaves comfortable
// margin for cold-boot variance without dragging green runs.
timeoutMs: 90_000,
intervalMs: 500,
label: "final-chunk-after-replay",
},
Expand All @@ -235,5 +232,8 @@ describe("pod-death + DBOS replay", () => {
for (const w of watchers) w.abort.abort();
await Promise.allSettled(watchers.map((w) => w.done));
}
}, 120_000);
// Test budget exceeds pollUntil by a safety margin so the finally
// above always runs (cleanly aborts SSE consumers) instead of
// racing the bun-test-level timeout.
}, 150_000);
});
Loading