From 2abdc608ebb0f8a99aa4033e6e59231ef29a74c8 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 6 May 2026 14:27:45 -0600 Subject: [PATCH 1/5] Fix requestSnapshot publish ordering --- packages/typescript-client/src/client.ts | 29 ++- .../typescript-client/test/client.test.ts | 9 +- .../typescript-client/test/stream.test.ts | 180 ++++++++++++++++++ 3 files changed, 207 insertions(+), 11 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 58212292dd..2cdb09f82f 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -609,6 +609,7 @@ export class ShapeStream = Row> #tickPromiseResolver?: () => void #tickPromiseRejecter?: (reason?: unknown) => void #messageChain = Promise.resolve([]) // promise chain for incoming messages + #isPublishing = false #snapshotTracker = new SnapshotTracker() #pauseLock: PauseLock #currentFetchUrl?: URL // Current fetch URL for computing shape key @@ -1719,11 +1720,7 @@ export class ShapeStream = Row> } async #publish(messages: Message[]): Promise { - // We process messages asynchronously - // but SSE's `onmessage` handler is synchronous. - // We use a promise chain to ensure that the handlers - // execute sequentially in the order the messages were received. - this.#messageChain = this.#messageChain.then(() => + const deliver = () => Promise.all( Array.from(this.#subscribers.values()).map(async ([callback, __]) => { try { @@ -1735,7 +1732,25 @@ export class ShapeStream = Row> } }) ) - ) + + // We process messages asynchronously but SSE's `onmessage` handler is + // synchronous. Use a promise chain to ensure handlers execute sequentially + // in the order messages were received. If a subscriber reentrantly requests + // a snapshot, deliver that nested batch immediately instead of appending it + // behind the currently-running subscriber callback, which would deadlock + // when requestSnapshot awaits publication. + if (this.#isPublishing) { + return deliver() + } + + this.#messageChain = this.#messageChain.then(async () => { + this.#isPublishing = true + try { + return await deliver() + } finally { + this.#isPublishing = false + } + }) return this.#messageChain } @@ -1901,7 +1916,7 @@ export class ShapeStream = Row> metadata, new Set(data.map((message) => message.key)) ) - this.#onMessages(dataWithEndBoundary, false) + await this.#onMessages(dataWithEndBoundary, false) // On cold start the stream's offset is still at "now". Advance it // to the snapshot's position so no updates are missed in between. diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index f4babae414..94830fc02f 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -1694,10 +1694,11 @@ describe.for(fetchAndSse)( limit: 100, }) - // Wait until shape reflects the snapshot - await vi.waitFor(() => { - expect(shape.currentRows.length).toBe(data.length) - }) + // requestSnapshot must not resolve until subscriber callbacks for the + // injected snapshot batch have completed. Callers such as TanStack DB's + // on-demand loadSubset rely on this to make immediate reads after await + // consistent. + expect(shape.currentRows.length).toBe(data.length) // Compare keys in stream vs returned snapshot data const returnedKeys = new Set(data.map((m) => m.key)) diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index f249a513ab..b1bf11a842 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { ShapeStream, isChangeMessage, + isControlMessage, Message, Row, _resetHttpWarningForTesting, @@ -24,6 +25,185 @@ describe(`ShapeStream`, () => { afterEach(() => aborter.abort()) + it(`requestSnapshot waits for snapshot messages to be published to subscribers before resolving`, async () => { + const snapshotRow = { + key: `test-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_0`, + } + + const fetchMock = vi.fn(() => + Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `1`, + xmax: `2`, + xip_list: [], + database_lsn: `0`, + }, + data: [snapshotRow], + }), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + ) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + log: `changes_only`, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + let releaseSubscriber!: () => void + const subscriberFinished = new Promise((resolve) => { + releaseSubscriber = resolve + }) + let snapshotRequestResolved = false + let publishedMessages: Message[] = [] + + stream.subscribe(async (messages) => { + if (messages.some(isChangeMessage)) { + publishedMessages = messages + await subscriberFinished + } + }) + + const snapshotRequest = stream.requestSnapshot({ limit: 1 }).then(() => { + snapshotRequestResolved = true + }) + + await resolveInMacrotask(undefined) + expect(snapshotRequestResolved).toBe(false) + + releaseSubscriber() + await snapshotRequest + + expect(publishedMessages.some(isChangeMessage)).toBe(true) + expect( + publishedMessages.some( + (message) => + isControlMessage(message) && + message.headers.control === `snapshot-end` + ) + ).toBe(true) + expect( + publishedMessages.some( + (message) => + isControlMessage(message) && message.headers.control === `subset-end` + ) + ).toBe(true) + }) + + it(`requestSnapshot can be awaited reentrantly from a subscriber`, async () => { + const streamRow = { + key: `stream-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_0`, + } + const snapshotRow = { + key: `snapshot-1`, + value: { id: `2` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_1`, + } + + let requestCount = 0 + const fetchMock = vi.fn(() => { + requestCount++ + + if (requestCount === 1) { + return Promise.resolve( + new Response( + JSON.stringify([ + streamRow, + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + } + + return Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `1`, + xmax: `2`, + xip_list: [], + database_lsn: `0`, + }, + data: [snapshotRow], + }), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_1`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + }) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + log: `changes_only`, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + let requestedSnapshot = false + let reentrantSnapshotResolved = false + stream.subscribe(async (messages) => { + if (requestedSnapshot || !messages.some(isChangeMessage)) return + + requestedSnapshot = true + await stream.requestSnapshot({ limit: 1 }) + reentrantSnapshotResolved = true + }) + + await vi.waitFor(() => { + expect(reentrantSnapshotResolved).toBe(true) + }) + }) + it(`should attach specified headers to requests`, async () => { const eventTarget = new EventTarget() const requestArgs: Array = [] From e9edc2b36479905cca5a0706976fcfedba37d0b2 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 6 May 2026 14:28:14 -0600 Subject: [PATCH 2/5] Add changeset for requestSnapshot fix --- .changeset/fix-requestsnapshot-publish-ordering.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-requestsnapshot-publish-ordering.md diff --git a/.changeset/fix-requestsnapshot-publish-ordering.md b/.changeset/fix-requestsnapshot-publish-ordering.md new file mode 100644 index 0000000000..90e0082d17 --- /dev/null +++ b/.changeset/fix-requestsnapshot-publish-ordering.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/client': patch +--- + +Fix `requestSnapshot()` so it resolves only after the injected snapshot batch has been delivered to subscribers, including async and reentrant subscriber paths. From d26eb26b04c6717e158f564ecfaeb4e03b05efa4 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 6 May 2026 14:40:49 -0600 Subject: [PATCH 3/5] Document reentrant snapshot publish escape hatch --- packages/typescript-client/src/client.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 2cdb09f82f..541dfd202f 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -609,6 +609,11 @@ export class ShapeStream = Row> #tickPromiseResolver?: () => void #tickPromiseRejecter?: (reason?: unknown) => void #messageChain = Promise.resolve([]) // promise chain for incoming messages + // Tracks when subscriber callbacks are actively being delivered from + // #messageChain. requestSnapshot can inject a nested batch from inside a + // subscriber; in that reentrant case #publish uses this as an intentional + // escape hatch to deliver the nested snapshot batch immediately rather than + // queueing it behind the subscriber that is awaiting it. #isPublishing = false #snapshotTracker = new SnapshotTracker() #pauseLock: PauseLock From da25045951f4ac2b2b4e3c64ac1b0195d5caca47 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 20 May 2026 13:38:54 -0600 Subject: [PATCH 4/5] Narrow reentrant snapshot publish bypass --- packages/typescript-client/src/client.ts | 28 +- .../typescript-client/test/stream.test.ts | 262 ++++++++++++++++++ 2 files changed, 281 insertions(+), 9 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 541dfd202f..615bb53c06 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -1380,7 +1380,11 @@ export class ShapeStream = Row> return true } - async #onMessages(batch: Array>, isSseMessage = false) { + async #onMessages( + batch: Array>, + isSseMessage = false, + opts: { allowReentrantPublishBypass?: boolean } = {} + ) { if (!Array.isArray(batch)) { console.warn( `[Electric] #onMessages called with non-array argument (${typeof batch}). ` + @@ -1432,7 +1436,9 @@ export class ShapeStream = Row> return true // Always process control messages }) - await this.#publish(messagesToProcess) + await this.#publish(messagesToProcess, { + allowReentrantBypass: opts.allowReentrantPublishBypass, + }) } /** @@ -1724,7 +1730,10 @@ export class ShapeStream = Row> } } - async #publish(messages: Message[]): Promise { + async #publish( + messages: Message[], + opts: { allowReentrantBypass?: boolean } = {} + ): Promise { const deliver = () => Promise.all( Array.from(this.#subscribers.values()).map(async ([callback, __]) => { @@ -1740,11 +1749,10 @@ export class ShapeStream = Row> // We process messages asynchronously but SSE's `onmessage` handler is // synchronous. Use a promise chain to ensure handlers execute sequentially - // in the order messages were received. If a subscriber reentrantly requests - // a snapshot, deliver that nested batch immediately instead of appending it - // behind the currently-running subscriber callback, which would deadlock - // when requestSnapshot awaits publication. - if (this.#isPublishing) { + // in the order messages were received. Only requestSnapshot's injected + // snapshot batch is allowed to bypass the queue reentrantly; ordinary + // stream batches (including SSE batches) must remain serialized. + if (this.#isPublishing && opts.allowReentrantBypass) { return deliver() } @@ -1921,7 +1929,9 @@ export class ShapeStream = Row> metadata, new Set(data.map((message) => message.key)) ) - await this.#onMessages(dataWithEndBoundary, false) + await this.#onMessages(dataWithEndBoundary, false, { + allowReentrantPublishBypass: true, + }) // On cold start the stream's offset is still at "now". Advance it // to the snapshot's position so no updates are missed in between. diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index b1bf11a842..813cce4ff6 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -204,6 +204,268 @@ describe(`ShapeStream`, () => { }) }) + it(`does not deliver a later SSE batch before subscriber callbacks for the earlier batch finish`, async () => { + const batch1Row = { + key: `stream-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_1`, + } + const batch2Row = { + key: `stream-2`, + value: { id: `2` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_2`, + } + + const encode = (message: unknown) => + new TextEncoder().encode(`data: ${JSON.stringify(message)}\n\n`) + + let releaseSecondBatch!: () => void + const secondBatchGate = new Promise((resolve) => { + releaseSecondBatch = resolve + }) + + const fetchMock = vi.fn((input: string | URL | Request) => { + const url = input.toString() + + if (url.includes(`live_sse=true`)) { + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(encode(batch1Row)) + controller.enqueue( + encode({ headers: { control: `up-to-date` }, offset: `0_1` }) + ) + await secondBatchGate + controller.enqueue(encode(batch2Row)) + controller.enqueue( + encode({ headers: { control: `up-to-date` }, offset: `0_2` }) + ) + controller.close() + }, + }) + + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { + 'content-type': `text/event-stream`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-cursor': `cursor-1`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + }) + ) + } + + return Promise.resolve( + new Response( + JSON.stringify([ + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-cursor': `cursor-0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + }) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: aborter.signal, + fetchClient: fetchMock, + liveSse: true, + }) + + let releaseFirstBatch!: () => void + const firstBatchBlocked = new Promise((resolve) => { + releaseFirstBatch = resolve + }) + + const events: string[] = [] + let firstBatchSeen = false + + stream.subscribe(async (messages) => { + const change = messages.find(isChangeMessage) + if (!change) return + + if (change.key === `stream-1`) { + events.push(`batch1:start`) + firstBatchSeen = true + releaseSecondBatch() + await firstBatchBlocked + events.push(`batch1:end`) + return + } + + if (change.key === `stream-2`) { + events.push(`batch2:start`) + aborter.abort() + } + }) + + await vi.waitFor(() => { + expect(firstBatchSeen).toBe(true) + }) + + await resolveInMacrotask(undefined) + expect(events).toEqual([`batch1:start`]) + + releaseFirstBatch() + + await vi.waitFor(() => { + expect(events).toEqual([`batch1:start`, `batch1:end`, `batch2:start`]) + }) + }) + + it(`reentrant requestSnapshot may re-enter bystander subscribers before their earlier callback completes`, async () => { + const streamRow = { + key: `stream-1`, + value: { id: `1` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_0`, + } + const snapshotRow = { + key: `snapshot-1`, + value: { id: `2` }, + headers: { + operation: `insert`, + relation: [`public`, `test`], + }, + offset: `0_1`, + } + + let requestCount = 0 + const fetchMock = vi.fn(() => { + requestCount++ + + if (requestCount === 1) { + return Promise.resolve( + new Response( + JSON.stringify([ + streamRow, + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_0`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + } + + return Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `1`, + xmax: `2`, + xip_list: [], + database_lsn: `0`, + }, + data: [snapshotRow], + }), + { + status: 200, + headers: { + 'content-type': `application/json`, + 'electric-handle': `handle-1`, + 'electric-offset': `0_1`, + 'electric-schema': `{"id":{"type":"text"}}`, + }, + } + ) + ) + }) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + log: `changes_only`, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + let releaseBystander!: () => void + const blockBystander = new Promise((resolve) => { + releaseBystander = resolve + }) + + const bystanderEvents: string[] = [] + let initialBystanderEntered = false + let requestedSnapshot = false + + stream.subscribe(async (messages) => { + if (requestedSnapshot) return + if ( + !messages.some( + (message) => isChangeMessage(message) && message.key === `stream-1` + ) + ) { + return + } + + requestedSnapshot = true + await stream.requestSnapshot({ limit: 1 }) + }) + + stream.subscribe(async (messages) => { + const change = messages.find(isChangeMessage) + if (!change) return + + if (change.key === `stream-1`) { + bystanderEvents.push(`stream:start`) + initialBystanderEntered = true + await blockBystander + bystanderEvents.push(`stream:end`) + return + } + + if (change.key === `snapshot-1`) { + bystanderEvents.push(`snapshot:start`) + releaseBystander() + aborter.abort() + } + }) + + await vi.waitFor(() => { + expect(initialBystanderEntered).toBe(true) + }) + + await vi.waitFor(() => { + expect(bystanderEvents).toEqual([ + `stream:start`, + `snapshot:start`, + `stream:end`, + ]) + }) + }) + it(`should attach specified headers to requests`, async () => { const eventTarget = new EventTarget() const requestArgs: Array = [] From 351c3d1554464808cb4eb3fe85e7e497344d6b88 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 20 May 2026 14:06:33 -0600 Subject: [PATCH 5/5] Add requestSnapshot PR comment draft --- docs/requestsnapshot-pr-comment.md | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docs/requestsnapshot-pr-comment.md diff --git a/docs/requestsnapshot-pr-comment.md b/docs/requestsnapshot-pr-comment.md new file mode 100644 index 0000000000..f6f0f8f108 --- /dev/null +++ b/docs/requestsnapshot-pr-comment.md @@ -0,0 +1,31 @@ +## PR comment draft + +I narrowed the reentrant publish bypass so it only applies to snapshot injection from `requestSnapshot()`, rather than to any nested publish while `#isPublishing` is true. + +Concretely: + +- `#onMessages(...)` and `#publish(...)` now accept an internal opt-in flag for reentrant bypass +- ordinary stream batches, including SSE batches, still serialize through `#messageChain` +- only `requestSnapshot()` calls `#onMessages(..., { allowReentrantPublishBypass: true })` + +This preserves the original deadlock fix: + +- if a subscriber handling batch `M1` does `await requestSnapshot(...)`, the injected snapshot batch can still publish immediately instead of being queued behind the subscriber that is awaiting it + +But it avoids the broader regression: + +- later SSE batches no longer bypass earlier in-flight publishes just because `#isPublishing === true` + +I added tests for both sides of the behavior: + +1. **SSE ordering regression test** + - proves a later SSE batch is not delivered before subscriber callbacks for the earlier batch complete + +2. **Bystander subscriber behavior test** + - explicitly documents current behavior that a reentrant `requestSnapshot()` may re-enter bystander subscribers before their earlier callback completes + +So the resulting contract is: + +- **ordinary stream traffic remains serialized** +- **snapshot injection is the only allowed reentrant bypass** +- **bystander reentrancy during snapshot injection remains allowed and now has test coverage**