diff --git a/ably-common b/ably-common index 1236fc2a..4e2ea694 160000 --- a/ably-common +++ b/ably-common @@ -1 +1 @@ -Subproject commit 1236fc2adadaf8d8e0b019d84f7b102646fa48ea +Subproject commit 4e2ea694535182f5966260cb147c1b98d97bbe22 diff --git a/src/core/transport/pipe-stream.ts b/src/core/transport/pipe-stream.ts index c3beb070..e503cc7b 100644 --- a/src/core/transport/pipe-stream.ts +++ b/src/core/transport/pipe-stream.ts @@ -48,6 +48,7 @@ export const pipeStream = async ( new Promise(() => {}); let reason: StreamResult['reason'] = 'complete'; + let caughtError: Error | undefined; try { // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- intentional infinite loop broken by return/break @@ -77,8 +78,8 @@ export const pipeStream = async ( } } catch (error) { reason = 'error'; - const errorText = error instanceof Error ? error.message : String(error); - logger?.error('pipeStream(); stream error', { error: errorText }); + caughtError = error instanceof Error ? error : new Error(String(error)); + logger?.error('pipeStream(); stream error', { error: caughtError.message }); try { await encoder.close(); } catch { @@ -91,5 +92,5 @@ export const pipeStream = async ( reader.releaseLock(); } - return { reason }; + return { reason, error: caughtError }; }; diff --git a/src/core/transport/server-transport.ts b/src/core/transport/server-transport.ts index cd682b98..8efb9056 100644 --- a/src/core/transport/server-transport.ts +++ b/src/core/transport/server-transport.ts @@ -328,14 +328,13 @@ class DefaultServerTransport implements ServerTransport[], opts?: AddMessageOptions): Promise => { logger?.trace('Turn.addMessages();', { turnId, count: nodes.length }); @@ -350,35 +349,47 @@ class DefaultServerTransport implements ServerTransport[]): Promise => { logger?.trace('Turn.addEvents();', { turnId, count: nodes.length }); @@ -393,31 +404,42 @@ class DefaultServerTransport implements ServerTransport, streamOpts?: StreamResponseOptions, @@ -455,11 +477,22 @@ class DefaultServerTransport implements ServerTransport => { logger?.trace('Turn.end();', { turnId, reason }); @@ -483,7 +516,6 @@ class DefaultServerTransport implements ServerTransport { onCancel?: (request: CancelRequest) => Promise; /** - * Called with non-fatal errors scoped to this turn. Examples: turn-start - * publish failure, encoder recovery failure, stream encoding errors. + * Called with non-fatal turn-scoped errors that have no other delivery + * path. Fires in two scenarios: + * - Stream failures in `streamResponse` — the underlying error is also + * returned on `StreamResult.error`, but this callback delivers it + * wrapped as an `Ably.ErrorInfo` (code `StreamError`) for standardized + * observability. + * - Failures in the `onCancel` handler. + * + * Publish failures in `start`, `addMessages`, `addEvents`, and `end` + * are not delivered here — those methods reject their returned promise + * with an `Ably.ErrorInfo`, and the caller should handle it at the await + * site. Turn errors never render the transport unusable, but the turn + * may be in an inconsistent state; the caller should typically `end` it + * with reason `'error'`. */ onError?: (error: Ably.ErrorInfo) => void; diff --git a/src/errors.ts b/src/errors.ts index a0278c91..c2c7dea9 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -33,7 +33,7 @@ export enum ErrorCode { CancelListenerError = 104002, /** - * A turn lifecycle event (turn-start or turn-end) failed to publish. + * A publish within a turn failed (lifecycle event, message, or event). */ TurnLifecycleError = 104003, @@ -59,6 +59,13 @@ export enum ErrorCode { * (not ATTACHED or ATTACHING). */ ChannelNotReady = 104007, + + /** + * An error occurred while piping a response stream to the channel — either + * the source event stream threw (e.g. LLM provider rate limit, model error, + * network failure) or an underlying publish failed mid-stream. + */ + StreamError = 104008, } /** diff --git a/test/core/transport/pipe-stream.test.ts b/test/core/transport/pipe-stream.test.ts index b16349b7..9806fade 100644 --- a/test/core/transport/pipe-stream.test.ts +++ b/test/core/transport/pipe-stream.test.ts @@ -258,5 +258,57 @@ describe('pipeStream', () => { const result = await pipeStream(stream, encoder, noSignal); expect(result.reason).toBe('error'); }); + + it('includes the caught error in StreamResult', async () => { + const originalError = new Error('model rate limit exceeded'); + const stream = errorStream([], originalError); + + const result = await pipeStream(stream, encoder, noSignal); + + expect(result.reason).toBe('error'); + expect(result.error).toBe(originalError); + }); + + it('wraps non-Error throws as Error in StreamResult', async () => { + // A stream that throws a non-Error value + const stream = new ReadableStream({ + start: (controller) => { + controller.error('string error'); + }, + }); + + const result = await pipeStream(stream, encoder, noSignal); + + expect(result.reason).toBe('error'); + expect(result.error).toBeInstanceOf(Error); + expect(result.error?.message).toBe('string error'); + }); + }); + + describe('StreamResult.error absence', () => { + it('is undefined when stream completes', async () => { + const stream = streamOf({ type: 'text', text: 'done' }); + + const result = await pipeStream(stream, encoder, noSignal); + + expect(result.reason).toBe('complete'); + expect(result.error).toBeUndefined(); + }); + + it('is undefined when stream is cancelled', async () => { + const controller = new AbortController(); + controller.abort(); + + const stream = new ReadableStream({ + start: () => { + /* paused */ + }, + }); + + const result = await pipeStream(stream, encoder, controller.signal); + + expect(result.reason).toBe('cancelled'); + expect(result.error).toBeUndefined(); + }); }); }); diff --git a/test/core/transport/server-transport.test.ts b/test/core/transport/server-transport.test.ts index 64825295..05b53001 100644 --- a/test/core/transport/server-transport.test.ts +++ b/test/core/transport/server-transport.test.ts @@ -528,7 +528,7 @@ describe('ServerTransport', () => { }); describe('error handling', () => { - it('start() calls onError and throws on publish failure', async () => { + it('start() throws on publish failure without invoking onError', async () => { const failChannel = createMockChannel(); vi.mocked(failChannel.publish).mockRejectedValue(new Error('publish failed')); const onError = vi.fn(); @@ -544,19 +544,92 @@ describe('ServerTransport', () => { }); await expect(turn.start()).rejects.toBeErrorInfoWithCode(ErrorCode.TurnLifecycleError); - expect(onError).toHaveBeenCalled(); + expect(onError).not.toHaveBeenCalled(); failTransport.close(); }); - it('end() calls onError and throws on publish failure', async () => { - const turn = transport.newTurn({ turnId: 'turn-1', onError: vi.fn() }); + it('end() throws on publish failure without invoking onError', async () => { + const onError = vi.fn(); + const turn = transport.newTurn({ turnId: 'turn-1', onError }); await turn.start(); // Make the next publish fail (for turn-end) vi.mocked(channel.publish).mockRejectedValueOnce(new Error('publish failed')); await expect(turn.end('complete')).rejects.toBeErrorInfoWithCode(ErrorCode.TurnLifecycleError); + expect(onError).not.toHaveBeenCalled(); + }); + + it('addMessages() throws on publish failure without invoking onError', async () => { + const onError = vi.fn(); + const failCodec = createMockCodec(); + const failEncoder = createMockEncoder(); + // eslint-disable-next-line @typescript-eslint/unbound-method -- vi mock + vi.mocked(failEncoder.writeMessages).mockRejectedValue(new Error('publish failed')); + // eslint-disable-next-line @typescript-eslint/unbound-method -- vi mock + vi.mocked(failCodec.createEncoder).mockReturnValue(failEncoder); + + const failTransport = createServerTransport({ channel, codec: failCodec }); + const turn = failTransport.newTurn({ turnId: 'turn-1', onError }); + await turn.start(); + + await expect(turn.addMessages([makeNode({ id: 'm1', content: 'hello' })])).rejects.toBeErrorInfoWithCode( + ErrorCode.TurnLifecycleError, + ); + expect(onError).not.toHaveBeenCalled(); + + failTransport.close(); + }); + + it('addEvents() throws on publish failure without invoking onError', async () => { + const onError = vi.fn(); + const failCodec = createMockCodec(); + const failEncoder = createMockEncoder(); + // eslint-disable-next-line @typescript-eslint/unbound-method -- vi mock + vi.mocked(failEncoder.writeEvent).mockRejectedValue(new Error('publish failed')); + // eslint-disable-next-line @typescript-eslint/unbound-method -- vi mock + vi.mocked(failCodec.createEncoder).mockReturnValue(failEncoder); + + const failTransport = createServerTransport({ channel, codec: failCodec }); + const turn = failTransport.newTurn({ turnId: 'turn-1', onError }); + await turn.start(); + + await expect( + turn.addEvents([{ kind: 'event', msgId: 'target-1', events: [{ type: 'ev' }] }]), + ).rejects.toBeErrorInfoWithCode(ErrorCode.TurnLifecycleError); + expect(onError).not.toHaveBeenCalled(); + + failTransport.close(); + }); + + it('streamResponse() calls onError when stream errors', async () => { + const onError = vi.fn(); + const turn = transport.newTurn({ turnId: 'turn-1', onError }); + await turn.start(); + + const stream = new ReadableStream({ + start: (controller) => { + controller.enqueue({ type: 'text', text: 'partial' }); + controller.error(new Error('model rate limit exceeded')); + }, + }); + + const result = await turn.streamResponse(stream); + expect(result.reason).toBe('error'); + expect(result.error).toBeInstanceOf(Error); + expect(onError).toHaveBeenCalledWith(expect.toBeErrorInfo({ code: ErrorCode.StreamError, statusCode: 500 })); + }); + + it('streamResponse() does not call onError when stream completes', async () => { + const onError = vi.fn(); + const turn = transport.newTurn({ turnId: 'turn-1', onError }); + await turn.start(); + + const result = await turn.streamResponse(streamOf({ type: 'text', text: 'done' })); + expect(result.reason).toBe('complete'); + expect(result.error).toBeUndefined(); + expect(onError).not.toHaveBeenCalled(); }); it('onCancel handler error calls onError and does not prevent other turns', async () => {