Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
733331a
fix(decopilot): cross-pod recovery actually fires on pod death
viktormarinho May 17, 2026
b27293e
fix(decopilot): keep purge on resume, guard with duplicate-detection …
viktormarinho May 18, 2026
3b61478
fix(heartbeat): re-arm death detection on NATS reconnect
viktormarinho May 18, 2026
0c01905
Merge remote-tracking branch 'origin/main' into viktormarinho/pod-dea…
viktormarinho May 19, 2026
f8bc4dd
test(multi-pod): clear DBOS workflow state between scenarios
viktormarinho May 19, 2026
f2ab135
refactor(heartbeat): swap NATS KV for Postgres advisory locks
viktormarinho May 19, 2026
fa4898a
test(pod-death): gate late watcher on chunk-10 to avoid kill-window race
viktormarinho May 19, 2026
5c800ed
fix(heartbeat): xact-scoped probe lock, ordered start, rename to stud…
viktormarinho May 20, 2026
05db218
fix(heartbeat): respect DB SSL config + hold xact lock through peer D…
viktormarinho May 20, 2026
38641d4
fix(claimOrphanedRun): real CAS on previous owner
viktormarinho May 20, 2026
0b8ec39
fix(recovery): scope startup orphan sweep to dead-owner threads
viktormarinho May 20, 2026
f4bfb1f
fix(heartbeat): route graceful shutdown through the same death signal
viktormarinho May 20, 2026
7d6af94
refactor: delete heartbeat + ownership, trust DBOS for recovery
viktormarinho May 20, 2026
9343540
fix(claimRunStart): drop pod-bound CAS so DBOS replay can claim
viktormarinho May 20, 2026
c6099d5
feat(settings): refuse to boot in production without POD_NAME
viktormarinho May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions apps/mesh/migrations/078-studio-pods.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* `studio_pods` — registry of currently-live pod ids.
*
* Each mesh pod inserts its row on boot (`INSERT ... ON CONFLICT DO
* NOTHING`) and deletes on graceful stop. The Postgres-advisory-lock
* heartbeat at `apps/mesh/src/core/pod-heartbeat.ts` (PgPodHeartbeat)
* uses this table to enumerate peer pods to probe — surveying every
* row and trying `pg_try_advisory_xact_lock(hashtext(pod_id))` to
* detect the ones whose owner is gone.
*
* Single-column on purpose: peer-death recovery cares about the set
* of pod ids and nothing else. Any timestamps would just rot since
* the advisory lock IS the liveness signal.
*/
import type { Kysely } from "kysely";

/**
 * Forward migration: creates the `studio_pods` table.
 *
 * The table has exactly one column, `pod_id` (`text`), which is also the
 * primary key.
 *
 * @param db - Kysely handle the schema change is executed against.
 * @returns Resolves once the `CREATE TABLE` statement has been executed.
 */
export async function up(db: Kysely<unknown>): Promise<void> {
  const createStudioPods = db.schema
    .createTable("studio_pods")
    .addColumn("pod_id", "text", (podId) => podId.primaryKey());

  await createStudioPods.execute();
}

/**
 * Reverse migration: drops the `studio_pods` table created by `up`.
 *
 * @param db - Kysely handle the schema change is executed against.
 * @returns Resolves once the `DROP TABLE` statement has been executed.
 */
export async function down(db: Kysely<unknown>): Promise<void> {
  const dropStudioPods = db.schema.dropTable("studio_pods");

  await dropStudioPods.execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import * as migration074sandboxrunnerstatehandlenonunique from "./074-sandbox-ru
import * as migration075threadinflightasyncjobs from "./075-thread-inflight-async-jobs.ts";
import * as migration076automationsdropagentjson from "./076-automations-drop-agent-json.ts";
import * as migration077tieronlymodelselection from "./077-tier-only-model-selection.ts";
import * as migration078studiopods from "./078-studio-pods.ts";

/**
* Core migrations for the Mesh application.
Expand Down Expand Up @@ -167,6 +168,7 @@ const migrations: Record<string, Migration> = {
"075-thread-inflight-async-jobs": migration075threadinflightasyncjobs,
"076-automations-drop-agent-json": migration076automationsdropagentjson,
"077-tier-only-model-selection": migration077tieronlymodelselection,
"078-studio-pods": migration078studiopods,
};

export default migrations;
97 changes: 44 additions & 53 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
createMeshContextFactory,
} from "../core/context-factory";
import type { MeshContext } from "../core/mesh-context";
import { closeDatabase, getDb, type MeshDatabase } from "../database";
import { closeDatabase, getDb, getSsl, type MeshDatabase } from "../database";
import { asDockerRunner, getSharedRunnerIfInit } from "../sandbox/lifecycle";
import { createEventBus, type EventBus } from "../event-bus";
import {
Expand Down Expand Up @@ -119,7 +119,7 @@ import {
toModelsConfig,
} from "./routes/decopilot/run-config";
import { getPodId } from "../core/pod-identity";
import { NatsPodHeartbeat } from "../nats/pod-heartbeat";
import { PgPodHeartbeat, type PodHeartbeat } from "../core/pod-heartbeat";
import { createAutomationsStorage } from "../storage/automations";
import { KyselyKVStorage } from "../storage/kv";
import { KyselyTriggerCallbackTokenStorage } from "../storage/trigger-callback-tokens";
Expand Down Expand Up @@ -748,7 +748,7 @@ export async function createApp(options: CreateAppOptions = {}) {
})();
},
createTailStream: async () => null,
purge: () => {},
purge: async () => {},
teardown: () => {},
};
} else {
Expand Down Expand Up @@ -839,38 +839,30 @@ export async function createApp(options: CreateAppOptions = {}) {
);
});

// Per-pod heartbeat via NATS KV (only when NATS is available)
let podHeartbeat: NatsPodHeartbeat | null = null;
if (natsProvider) {
podHeartbeat = new NatsPodHeartbeat({
getConnection: () => natsProvider!.getConnection(),
getJetStream: () => natsProvider!.getJetStream(),
// Per-pod heartbeat via Postgres advisory locks. The dedicated
// long-lived connection's lifetime IS the liveness signal — when this
// pod's process dies, Postgres releases the lock instantly via TCP
// close, and survivor pods detect that on the next poll tick (~5s).
// No NATS dependency; recovery still fires even when NATS is degraded.
const podHeartbeat: PodHeartbeat = new PgPodHeartbeat({
connectionString: getSettings().databaseUrl,
ssl: getSsl(),
pool: database.pool,
});
podHeartbeat
.init()
.then(() => podHeartbeat.start(POD_ID))
.then(() => {
console.log(`[PodHeartbeat] Started (pod=${POD_ID})`);
})
.catch((err: unknown) => {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
console.error("[PodHeartbeat] Init failed:", err);
});

// Attempt immediate init (may no-op if NATS not ready)
podHeartbeat
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
})
.catch(() => {});

// Re-init when NATS connects
natsProvider.onReady(() => {
podHeartbeat!
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
})
.catch((err: unknown) => {
console.error("[PodHeartbeat] Deferred init failed:", err);
});
});
}

currentDecopilotCleanup = async () => {
// Delete KV key first → watcher fires on other pods → immediate handoff
await podHeartbeat?.stop();
// Release the liveness lock + remove from registry first → survivors
// see us gone on their next poll, no false-alive window.
await podHeartbeat.stop();
await runRegistry.stopAll();
runRegistry.dispose();
cancelBroadcast.stop().catch(() => {});
Expand Down Expand Up @@ -1276,13 +1268,14 @@ export async function createApp(options: CreateAppOptions = {}) {
thread.organization_id,
);

// Pod-death recovery: a different pod's run was claimed by us. Drain
// synchronously to know when the run completes server-side. We
// deliberately don't pass a streamBuffer here — this background
// recovery is the safety net for threads no DBOS replay or attached
// client picks up; clients reconnecting via /stream see the run via
// the workflow's own JetStream pump on the pod that DBOS replayed it
// onto.
// Pod-death recovery: a different pod's run was claimed by us.
// Drain synchronously to know when the run completes server-side
// AND pump chunks into JetStream via `streamBuffer` so any /stream
// tails on survivor pods continue to receive data — that's the
// entire user-visible win of taking over a dead pod's work. The
// previous omission of `streamBuffer` here meant the resumed run
// ran but its chunks went nowhere visible, defeating the point of
// the recovery path.
await dispatchRunAndWait(
{
messages: [],
Expand All @@ -1298,23 +1291,21 @@ export async function createApp(options: CreateAppOptions = {}) {
isResume: true,
},
resumeCtx,
{ runRegistry, cancelBroadcast },
{ runRegistry, cancelBroadcast, streamBuffer },
);
};

// Wire pod death watcher → orphan recovery
if (podHeartbeat) {
podHeartbeat.onPodDeath((deadPodId) => {
runRegistry
.handlePodDeath(deadPodId, resumeOrphanedThread, cancelBroadcast)
.catch((err) => {
console.error(
`[Decopilot] Pod death recovery failed for ${deadPodId}:`,
err,
);
});
});
}
// Wire pod death watcher → orphan recovery.
podHeartbeat.onPodDeath((deadPodId) => {
runRegistry
.handlePodDeath(deadPodId, resumeOrphanedThread, cancelBroadcast)
.catch((err) => {
console.error(
`[Decopilot] Pod death recovery failed for ${deadPodId}:`,
err,
);
});
});

setTimeout(() => {
runRegistry.recoverOrphanedRuns(resumeOrphanedThread).catch((err) => {
Expand Down
20 changes: 18 additions & 2 deletions apps/mesh/src/api/routes/decopilot/dispatch-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,24 @@ async function prepareRun(
}
}

// Purge stale buffered chunks from any previous run on this thread
streamBuffer?.purge(mem.thread.id);
// Purge stale buffered chunks from any previous run on this thread.
// Always purges — including on resume — because the resumed run
// re-invokes the LLM from scratch and produces chunk-1..chunk-N
// again. Without this, any /stream opened after recovery starts
// would see the assistant's reply twice (deliverPolicy:"all"
// replays the dead-pod prefix and then the resumed run's full
// body). Regression guard:
// tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts.
//
// ⚠️ Known UX gap, not addressed here: a /stream that was already
// tailing when the owner pod died will still receive the dead
// pod's prefix from its local consumer buffer (the purge is
// server-side and doesn't reach into already-delivered messages)
// AND the resumed run's full body afterwards — so it sees the
// reply rendered twice. A proper fix would publish a "reset"
// sentinel to the subject before the resume pump starts so all
// consumers flush their UI buffer; left as a follow-up.
await streamBuffer?.purge(mem.thread.id);

// Split system messages from user message
const systemMessages = input.messages.filter((m) => m.role === "system");
Expand Down
12 changes: 8 additions & 4 deletions apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,15 @@ export class NatsStreamBuffer implements StreamBuffer {
});
}

purge(taskId: string): void {
async purge(taskId: string): Promise<void> {
if (!this.jsm) return;
this.jsm.streams
.purge(STREAM_NAME, { filter: streamSubject(taskId) })
.catch(() => {});
try {
await this.jsm.streams.purge(STREAM_NAME, {
filter: streamSubject(taskId),
});
} catch {
// Best-effort cleanup; never propagate to the caller.
}
}

teardown(): void {
Expand Down
4 changes: 2 additions & 2 deletions apps/mesh/src/api/routes/decopilot/run-reactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async function handleTerminalStatus(
run_config: null,
run_started_at: null,
});
streamBuffer.purge(taskId);
void streamBuffer.purge(taskId);
sseHub.emit(
orgId,
createDecopilotThreadStatusEvent(taskId, status, {
Expand Down Expand Up @@ -168,7 +168,7 @@ async function react(event: RunEvent, deps: RunReactorDeps): Promise<void> {
run_started_at: null,
});
}
streamBuffer.purge(event.taskId);
void streamBuffer.purge(event.taskId);
const failedThread = await storage.get(event.taskId, event.orgId);
sseHub.emit(
event.orgId,
Expand Down
21 changes: 20 additions & 1 deletion apps/mesh/src/api/routes/decopilot/run-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,14 @@ export class RunRegistry {
const batch = orphans.slice(i, i + CONCURRENCY);
await Promise.allSettled(
batch.map(async (thread) => {
// CAS on the thread's current owner so concurrent claimers
// serialize. First to flip the column wins; the rest see the
// new owner and bail.
const claimed = await this.deps.storage.claimOrphanedRun(
thread.id,
thread.organization_id,
this.podId,
thread.run_owner_pod ?? null,
);
if (!claimed) return; // Another pod got it

Expand Down Expand Up @@ -211,7 +215,15 @@ export class RunRegistry {
cancelBroadcast?: { broadcast(taskId: string): void },
): Promise<void> {
const orphans = await this.deps.storage.listOrphanedRunsByPod(deadPodId);
if (orphans.length === 0) return;
if (orphans.length === 0) {
console.log(
`[RunRegistry] handlePodDeath(${deadPodId}): no orphans to recover`,
);
return;
}
console.log(
`[RunRegistry] handlePodDeath(${deadPodId}): recovering ${orphans.length} orphan(s) on pod ${this.podId}`,
);

// Cancel running threads on the dead pod (in case it's alive but partitioned)
for (const thread of orphans) {
Expand All @@ -224,10 +236,17 @@ export class RunRegistry {
await Promise.allSettled(
batch.map(async (thread) => {
if (this.isRunning(thread.id)) return;
// CAS on `deadPodId` so the first survivor wins. The race
// ISN'T hypothetical: with parallel heartbeat pollers on
// every survivor, two pods regularly detect the same dead
// peer within the same tick. Without this CAS both claim,
// both dispatch a resumed run, and the per-thread JetStream
// subject gets two copies of every chunk.
const claimed = await this.deps.storage.claimOrphanedRun(
thread.id,
thread.organization_id,
this.podId,
deadPodId,
);
if (!claimed) return;

Expand Down
10 changes: 9 additions & 1 deletion apps/mesh/src/api/routes/decopilot/stream-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ export interface StreamBuffer {
): Promise<ReadableStream | null>;

/** Purge buffered data for a thread (best-effort, fire-and-forget). */
purge(taskId: string): void;
/**
* Remove all buffered chunks for `taskId`. Returns a Promise so
* callers in the dispatch hot path can `await` purge completion
* before publishing fresh chunks — without that ordering, a /stream
* tail that subscribes during the gap can replay the stale prefix.
* Purge failures are swallowed by the implementation and never reach
* the caller: purge is best-effort cleanup, not load-bearing for any
* single message's delivery.
*/
purge(taskId: string): Promise<void>;

/** Release resources (clear references, called on shutdown). */
teardown(): void;
Expand Down
Loading
Loading