Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ably-common
7 changes: 4 additions & 3 deletions src/core/transport/pipe-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export const pipeStream = async <TEvent, TMessage>(
new Promise<void>(() => {});

let reason: StreamResult['reason'] = 'complete';
let caughtError: Error | undefined;

try {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- intentional infinite loop broken by return/break
Expand Down Expand Up @@ -77,8 +78,8 @@ export const pipeStream = async <TEvent, TMessage>(
}
} catch (error) {
reason = 'error';
const errorText = error instanceof Error ? error.message : String(error);
logger?.error('pipeStream(); stream error', { error: errorText });
caughtError = error instanceof Error ? error : new Error(String(error));
logger?.error('pipeStream(); stream error', { error: caughtError.message });
try {
await encoder.close();
} catch {
Expand All @@ -91,5 +92,5 @@ export const pipeStream = async <TEvent, TMessage>(
reader.releaseLock();
}

return { reason };
return { reason, error: caughtError };
};
116 changes: 74 additions & 42 deletions src/core/transport/server-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,13 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
error instanceof Ably.ErrorInfo ? error : undefined,
);
logger?.error('Turn.start(); failed to publish turn-start', { turnId });
turnOnError?.(errInfo);
throw errInfo;
}

logger?.debug('Turn.start(); turn started', { turnId });
},

// Spec: AIT-ST5, AIT-ST5a, AIT-ST5b
// Spec: AIT-ST5, AIT-ST5a, AIT-ST5b, AIT-ST5c
Comment thread
zknill marked this conversation as resolved.
addMessages: async (nodes: MessageNode<TMessage>[], opts?: AddMessageOptions): Promise<AddMessagesResult> => {
logger?.trace('Turn.addMessages();', { turnId, count: nodes.length });

Expand All @@ -350,35 +349,47 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent

const msgIds: string[] = [];

for (const node of nodes) {
// Build transport headers from the node's typed fields, then merge
// any extra headers from the node (e.g. domain-specific headers).
const headers = mergeHeaders(
buildTransportHeaders({
role: 'user',
turnId,
msgId: node.msgId,
turnClientId: opts?.clientId,
parent: node.parentId ?? turnParent,
forkOf: node.forkOf ?? turnForkOf,
}),
node.headers,
);

const encoder = codec.createEncoder(channel, {
extras: { headers },
onMessage,
});
try {
for (const node of nodes) {
// Build transport headers from the node's typed fields, then merge
// any extra headers from the node (e.g. domain-specific headers).
const headers = mergeHeaders(
buildTransportHeaders({
role: 'user',
turnId,
msgId: node.msgId,
turnClientId: opts?.clientId,
parent: node.parentId ?? turnParent,
forkOf: node.forkOf ?? turnForkOf,
}),
node.headers,
);

const encoder = codec.createEncoder(channel, {
extras: { headers },
onMessage,
});

await encoder.writeMessages([node.message], opts?.clientId ? { clientId: opts.clientId } : undefined);
await encoder.writeMessages([node.message], opts?.clientId ? { clientId: opts.clientId } : undefined);

msgIds.push(node.msgId);
msgIds.push(node.msgId);
}
} catch (error) {
const errInfo = new Ably.ErrorInfo(
`unable to publish messages for turn ${turnId}; ${error instanceof Error ? error.message : String(error)}`,
ErrorCode.TurnLifecycleError,
500,
error instanceof Ably.ErrorInfo ? error : undefined,
);
logger?.error('Turn.addMessages(); publish failed', { turnId });
throw errInfo;
Comment on lines +378 to +385
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should expose this error twice; once in the onError callback and once thrown from here. 🤔

}

logger?.debug('Turn.addMessages(); messages published', { turnId, count: nodes.length });
return { msgIds };
},

// Spec: AIT-ST5c
addEvents: async (nodes: EventsNode<TEvent>[]): Promise<void> => {
logger?.trace('Turn.addEvents();', { turnId, count: nodes.length });

Expand All @@ -393,31 +404,42 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent

const turnOwnerClientId = turnManager.getClientId(turnId);

for (const node of nodes) {
const headers = buildTransportHeaders({
role: 'assistant',
turnId,
msgId: node.msgId,
turnClientId: turnOwnerClientId,
amend: node.msgId,
});
try {
for (const node of nodes) {
const headers = buildTransportHeaders({
role: 'assistant',
turnId,
msgId: node.msgId,
turnClientId: turnOwnerClientId,
amend: node.msgId,
});

const encoder = codec.createEncoder(channel, {
extras: { headers },
onMessage,
});
const encoder = codec.createEncoder(channel, {
extras: { headers },
onMessage,
});

for (const event of node.events) {
await encoder.writeEvent(event);
}
for (const event of node.events) {
await encoder.writeEvent(event);
}

await encoder.close();
await encoder.close();
}
} catch (error) {
const errInfo = new Ably.ErrorInfo(
`unable to publish events for turn ${turnId}; ${error instanceof Error ? error.message : String(error)}`,
ErrorCode.TurnLifecycleError,
500,
error instanceof Ably.ErrorInfo ? error : undefined,
);
logger?.error('Turn.addEvents(); publish failed', { turnId });
throw errInfo;
}

logger?.debug('Turn.addEvents(); events published', { turnId, count: nodes.length });
},

// Spec: AIT-ST6, AIT-ST6a, AIT-ST6b, AIT-ST6b1, AIT-ST6b2, AIT-ST6b3, AIT-ST6c
// Spec: AIT-ST6, AIT-ST6a, AIT-ST6b, AIT-ST6b1, AIT-ST6b2, AIT-ST6b3, AIT-ST6b4, AIT-ST6c
streamResponse: async (
stream: ReadableStream<TEvent>,
streamOpts?: StreamResponseOptions,
Expand Down Expand Up @@ -455,11 +477,22 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent

const result = await pipeStream(stream, encoder, signal, onAbort, logger);

if (result.error) {
const errInfo = new Ably.ErrorInfo(
`unable to stream response for turn ${turnId}; ${result.error.message}`,
ErrorCode.StreamError,
500,
result.error instanceof Ably.ErrorInfo ? result.error : undefined,
);
Comment on lines +481 to +486
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This re-wraps errors thrown from encoder.appendEvent in pipeStream (i.e. ably publishes) but the js doc describes the ErrorCode.StreamError as though it's only an error from the LLM provider.

  /**
   * The source event stream threw during piping (e.g. LLM provider rate
   * limit, model error, network failure).
   */

logger?.error('Turn.streamResponse(); stream error', { turnId });
turnOnError?.(errInfo);
}

logger?.debug('Turn.streamResponse(); stream finished', { turnId, reason: result.reason });
return result;
},

// Spec: AIT-ST7, AIT-ST7a
// Spec: AIT-ST7, AIT-ST7a, AIT-ST7b
end: async (reason: TurnEndReason): Promise<void> => {
logger?.trace('Turn.end();', { turnId, reason });

Expand All @@ -483,7 +516,6 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
error instanceof Ably.ErrorInfo ? error : undefined,
);
logger?.error('Turn.end(); failed to publish turn-end', { turnId });
turnOnError?.(errInfo);
throw errInfo;
} finally {
registeredTurns.delete(turnId);
Expand Down
24 changes: 22 additions & 2 deletions src/core/transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ export interface StreamResponseOptions {
export interface StreamResult {
/** Why the stream ended. */
reason: TurnEndReason;
/**
* The error that caused the stream to fail, present when `reason` is
* `'error'`. This is the original error (e.g. from the LLM provider)
* preserved so the caller can inspect provider-specific fields. The
* turn's `onError` callback also fires with a wrapped `Ably.ErrorInfo`
* (code `StreamError`) for standardized observability.
*/
error?: Error;
}

/** Options passed to newTurn for configuring the turn lifecycle. */
Expand Down Expand Up @@ -154,8 +162,20 @@ export interface NewTurnOptions<TEvent> {
onCancel?: (request: CancelRequest) => Promise<boolean>;

/**
* Called with non-fatal errors scoped to this turn. Examples: turn-start
* publish failure, encoder recovery failure, stream encoding errors.
* Called with non-fatal turn-scoped errors that have no other delivery
* path. Fires in two scenarios:
* - Stream failures in `streamResponse` — the underlying error is also
* returned on `StreamResult.error`, but this callback delivers it
* wrapped as an `Ably.ErrorInfo` (code `StreamError`) for standardized
* observability.
* - Failures in the `onCancel` handler.
*
* Publish failures in `start`, `addMessages`, `addEvents`, and `end`
* are not delivered here — those methods reject their returned promise
* with an `Ably.ErrorInfo`, and the caller should handle it at the await
* site. Turn errors never render the transport unusable, but the turn
* may be in an inconsistent state; the caller should typically `end` it
* with reason `'error'`.
*/
onError?: (error: Ably.ErrorInfo) => void;

Expand Down
9 changes: 8 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export enum ErrorCode {
CancelListenerError = 104002,

/**
* A turn lifecycle event (turn-start or turn-end) failed to publish.
* A publish within a turn failed (lifecycle event, message, or event).
*/
TurnLifecycleError = 104003,

Expand All @@ -59,6 +59,13 @@ export enum ErrorCode {
* (not ATTACHED or ATTACHING).
*/
ChannelNotReady = 104007,

/**
* An error occurred while piping a response stream to the channel — either
* the source event stream threw (e.g. LLM provider rate limit, model error,
* network failure) or an underlying publish failed mid-stream.
*/
StreamError = 104008,
}

/**
Expand Down
52 changes: 52 additions & 0 deletions test/core/transport/pipe-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,5 +258,57 @@ describe('pipeStream', () => {
const result = await pipeStream(stream, encoder, noSignal);
expect(result.reason).toBe('error');
});

it('includes the caught error in StreamResult', async () => {
const originalError = new Error('model rate limit exceeded');
const stream = errorStream([], originalError);

const result = await pipeStream(stream, encoder, noSignal);

expect(result.reason).toBe('error');
expect(result.error).toBe(originalError);
});

it('wraps non-Error throws as Error in StreamResult', async () => {
// A stream that throws a non-Error value
const stream = new ReadableStream<TestEvent>({
start: (controller) => {
controller.error('string error');
},
});

const result = await pipeStream(stream, encoder, noSignal);

expect(result.reason).toBe('error');
expect(result.error).toBeInstanceOf(Error);
expect(result.error?.message).toBe('string error');
});
});

describe('StreamResult.error absence', () => {
it('is undefined when stream completes', async () => {
const stream = streamOf({ type: 'text', text: 'done' });

const result = await pipeStream(stream, encoder, noSignal);

expect(result.reason).toBe('complete');
expect(result.error).toBeUndefined();
});

it('is undefined when stream is cancelled', async () => {
const controller = new AbortController();
controller.abort();

const stream = new ReadableStream<TestEvent>({
start: () => {
/* paused */
},
});

const result = await pipeStream(stream, encoder, controller.signal);

expect(result.reason).toBe('cancelled');
expect(result.error).toBeUndefined();
});
});
});
Loading
Loading