-
Notifications
You must be signed in to change notification settings - Fork 334
Fix requestSnapshot publish ordering #4282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2abdc60
e9edc2b
d26eb26
da25045
351c3d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| '@electric-sql/client': patch | ||
| --- | ||
|
|
||
| Fix `requestSnapshot()` so it resolves only after the injected snapshot batch has been delivered to subscribers, including async and reentrant subscriber paths. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| ## PR comment draft | ||
|
|
||
| I narrowed the reentrant publish bypass so it only applies to snapshot injection from `requestSnapshot()`, rather than to any nested publish while `#isPublishing` is true. | ||
|
|
||
| Concretely: | ||
|
|
||
| - `#onMessages(...)` and `#publish(...)` now accept an internal opt-in flag for reentrant bypass | ||
| - ordinary stream batches, including SSE batches, still serialize through `#messageChain` | ||
| - only `requestSnapshot()` calls `#onMessages(..., { allowReentrantPublishBypass: true })` | ||
|
|
||
| This preserves the original deadlock fix: | ||
|
|
||
| - if a subscriber handling batch `M1` does `await requestSnapshot(...)`, the injected snapshot batch can still publish immediately instead of being queued behind the subscriber that is awaiting it | ||
|
|
||
| But it avoids the broader regression: | ||
|
|
||
| - later SSE batches no longer bypass earlier in-flight publishes just because `#isPublishing === true` | ||
|
|
||
| I added tests for both sides of the behavior: | ||
|
|
||
| 1. **SSE ordering regression test** | ||
| - proves a later SSE batch is not delivered before subscriber callbacks for the earlier batch complete | ||
|
|
||
| 2. **Bystander subscriber behavior test** | ||
| - explicitly documents current behavior that a reentrant `requestSnapshot()` may re-enter bystander subscribers before their earlier callback completes | ||
|
|
||
| So the resulting contract is: | ||
|
|
||
| - **ordinary stream traffic remains serialized** | ||
| - **snapshot injection is the only allowed reentrant bypass** | ||
| - **bystander reentrancy during snapshot injection remains allowed and now has test coverage** |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -609,6 +609,12 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| #tickPromiseResolver?: () => void | ||
| #tickPromiseRejecter?: (reason?: unknown) => void | ||
| #messageChain = Promise.resolve<void[]>([]) // promise chain for incoming messages | ||
| // Tracks when subscriber callbacks are actively being delivered from | ||
| // #messageChain. requestSnapshot can inject a nested batch from inside a | ||
| // subscriber; in that reentrant case #publish uses this as an intentional | ||
| // escape hatch to deliver the nested snapshot batch immediately rather than | ||
| // queueing it behind the subscriber that is awaiting it. | ||
| #isPublishing = false | ||
| #snapshotTracker = new SnapshotTracker() | ||
| #pauseLock: PauseLock | ||
| #currentFetchUrl?: URL // Current fetch URL for computing shape key | ||
|
|
@@ -1374,7 +1380,11 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| return true | ||
| } | ||
|
|
||
| async #onMessages(batch: Array<Message<T>>, isSseMessage = false) { | ||
| async #onMessages( | ||
| batch: Array<Message<T>>, | ||
| isSseMessage = false, | ||
| opts: { allowReentrantPublishBypass?: boolean } = {} | ||
| ) { | ||
| if (!Array.isArray(batch)) { | ||
| console.warn( | ||
| `[Electric] #onMessages called with non-array argument (${typeof batch}). ` + | ||
|
|
@@ -1426,7 +1436,9 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| return true // Always process control messages | ||
| }) | ||
|
|
||
| await this.#publish(messagesToProcess) | ||
| await this.#publish(messagesToProcess, { | ||
| allowReentrantBypass: opts.allowReentrantPublishBypass, | ||
|
icehaunter marked this conversation as resolved.
|
||
| }) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1718,12 +1730,11 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| } | ||
| } | ||
|
|
||
| async #publish(messages: Message<T>[]): Promise<void[]> { | ||
| // We process messages asynchronously | ||
| // but SSE's `onmessage` handler is synchronous. | ||
| // We use a promise chain to ensure that the handlers | ||
| // execute sequentially in the order the messages were received. | ||
| this.#messageChain = this.#messageChain.then(() => | ||
| async #publish( | ||
| messages: Message<T>[], | ||
| opts: { allowReentrantBypass?: boolean } = {} | ||
| ): Promise<void[]> { | ||
| const deliver = () => | ||
| Promise.all( | ||
| Array.from(this.#subscribers.values()).map(async ([callback, __]) => { | ||
| try { | ||
|
|
@@ -1735,7 +1746,24 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| } | ||
| }) | ||
| ) | ||
| ) | ||
|
|
||
| // We process messages asynchronously but SSE's `onmessage` handler is | ||
| // synchronous. Use a promise chain to ensure handlers execute sequentially | ||
| // in the order messages were received. Only requestSnapshot's injected | ||
| // snapshot batch is allowed to bypass the queue reentrantly; ordinary | ||
| // stream batches (including SSE batches) must remain serialized. | ||
| if (this.#isPublishing && opts.allowReentrantBypass) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we even need this whole |
||
| return deliver() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we call
So the per-subscriber invariant "my callback won't be re-entered while a previous invocation is in flight" is broken for any bystander subscriber whose M1 callback is still awaiting something when the reentrant publish fires. The messageChain-level ordering is preserved (next queued batch still waits), but B can observe In practice this may be fine — if subscribers are effectively stateless across awaits, the window is invisible. But a subscriber that mutates |
||
| } | ||
|
|
||
| this.#messageChain = this.#messageChain.then(async () => { | ||
| this.#isPublishing = true | ||
| try { | ||
| return await deliver() | ||
| } finally { | ||
| this.#isPublishing = false | ||
| } | ||
| }) | ||
|
|
||
| return this.#messageChain | ||
| } | ||
|
|
@@ -1901,7 +1929,9 @@ export class ShapeStream<T extends Row<unknown> = Row> | |
| metadata, | ||
| new Set(data.map((message) => message.key)) | ||
| ) | ||
| this.#onMessages(dataWithEndBoundary, false) | ||
| await this.#onMessages(dataWithEndBoundary, false, { | ||
| allowReentrantPublishBypass: true, | ||
| }) | ||
|
|
||
| // On cold start the stream's offset is still at "now". Advance it | ||
| // to the snapshot's position so no updates are missed in between. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should try to avoid agent comment-creep - the flag's function is fairly clear with the comment left when used in code further down
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump on this