diff --git a/.changeset/agents-runtime-sigint-abort.md b/.changeset/agents-runtime-sigint-abort.md new file mode 100644 index 0000000000..f0051257ce --- /dev/null +++ b/.changeset/agents-runtime-sigint-abort.md @@ -0,0 +1,5 @@ +--- +"@electric-ax/agents-runtime": patch +--- + +Settle interrupted agent runs promptly when the model stream ignores abort completion, while preserving aborted run context ordering. diff --git a/packages/agents-runtime/src/pi-adapter.ts b/packages/agents-runtime/src/pi-adapter.ts index b554e4b224..b72375a74e 100644 --- a/packages/agents-runtime/src/pi-adapter.ts +++ b/packages/agents-runtime/src/pi-adapter.ts @@ -393,9 +393,16 @@ export function createPiAgentAdapter( return new Promise((resolve, reject) => { let settled = false let unsubscribe = (): void => {} + let abortFallback: ReturnType | null = null + const clearAbortFallback = (): void => { + if (!abortFallback) return + clearTimeout(abortFallback) + abortFallback = null + } const finish = (finishReason: `stop` | `aborted` | `error`): void => { if (settled) return settled = true + clearAbortFallback() running = false abortSignal?.removeEventListener(`abort`, abortRun) unsubscribe() @@ -405,14 +412,21 @@ export function createPiAgentAdapter( if (settled) return abortedRun = true agent.abort() - finish(`aborted`) - resolve() + + // Let pi-agent-core settle synchronous abort events first. If the + // provider/tool ignores AbortSignal and emits nothing, close the + // run on the next macrotask so callers are not left waiting. + abortFallback ??= setTimeout(() => { + finish(`aborted`) + resolve() + }, 0) } unsubscribe = processAgentEvents( () => { if (settled) return settled = true running = false + clearAbortFallback() abortSignal?.removeEventListener(`abort`, abortRun) unsubscribe() resolve() @@ -433,6 +447,7 @@ export function createPiAgentAdapter( Promise.resolve(runPromise).catch((err: Error) => { if (settled) return + if (abortedRun) return finish(`error`) reject(err) }) diff --git a/packages/agents-runtime/src/timeline-context.ts b/packages/agents-runtime/src/timeline-context.ts index 95cd7c0e6d..cb59a082ab 100644 --- a/packages/agents-runtime/src/timeline-context.ts +++ b/packages/agents-runtime/src/timeline-context.ts @@ -253,10 +253,65 @@ function materializeRunItem(run: IncludesRun): TimelineItem { return { kind: `run`, at: orderToOffset(run.order), + ...(run.finish_reason ? { finishReason: run.finish_reason } : {}), items: items.map(({ order: _order, ...item }) => item), } } +function isAbortSignalEntry(entry: { + kind: string + item: unknown +}): entry is { kind: `signal`; item: IncludesSignal } { + if (entry.kind !== `signal`) return false + const signal = entry.item as IncludesSignal + return signal.signal === `SIGINT` && signal.outcome === `aborted` +} + +function isAbortedRunEntry(entry: { + kind: string + item: unknown +}): entry is { kind: `run`; item: IncludesRun } { + return ( + entry.kind === `run` && + (entry.item as IncludesRun).finish_reason === `aborted` + ) +} + +function reorderInterruptedRuns( + sortedItems: Array +): Array { + const items = [...sortedItems] + + for (let signalIndex = 0; signalIndex < items.length; signalIndex++) { + const signalEntry = items[signalIndex] + if (!signalEntry || !isAbortSignalEntry(signalEntry)) continue + + // Runtime output is buffered through its producer, while SIGINT is appended + // directly by the server. Under interruption the signal can therefore get a + // lower stream offset than the aborted run it interrupted. For model-facing + // context, present the interrupted assistant run before the interrupt marker + // so the next user message is interpreted against a coherent transcript. + let runIndex = -1 + for (let index = signalIndex + 1; index < items.length; index++) { + const candidate = items[index] + if (!candidate) continue + if (isAbortSignalEntry(candidate)) break + if (isAbortedRunEntry(candidate)) { + runIndex = index + break + } + } + + if (runIndex === -1) continue + + const [run] = items.splice(runIndex, 1) + items.splice(signalIndex, 0, run!) + signalIndex++ + } + + return items +} + export function materializeTimeline( data: EntityTimelineData ): Array { @@ -275,40 +330,42 @@ export function materializeTimeline( order: TimelineOrder item: IncludesContextRemoved } - > = [ - ...data.inbox - .filter((item) => (item.status ?? `processed`) === `processed`) - .map((item) => ({ - kind: `inbox` as const, + > = reorderInterruptedRuns( + [ + ...data.inbox + .filter((item) => (item.status ?? `processed`) === `processed`) + .map((item) => ({ + kind: `inbox` as const, + order: item.order, + item, + })), + ...data.wakes.map((item) => ({ + kind: `wake` as const, order: item.order, item, })), - ...data.wakes.map((item) => ({ - kind: `wake` as const, - order: item.order, - item, - })), - ...(data.signals ?? []).map((item) => ({ - kind: `signal` as const, - order: item.order, - item, - })), - ...data.runs.map((item) => ({ - kind: `run` as const, - order: item.order, - item, - })), - ...data.contextInserted.map((item) => ({ - kind: `context_inserted` as const, - order: item.order, - item, - })), - ...data.contextRemoved.map((item) => ({ - kind: `context_removed` as const, - order: item.order, - item, - })), - ].sort((left, right) => compareTimelineOrders(left.order, right.order)) + ...(data.signals ?? []).map((item) => ({ + kind: `signal` as const, + order: item.order, + item, + })), + ...data.runs.map((item) => ({ + kind: `run` as const, + order: item.order, + item, + })), + ...data.contextInserted.map((item) => ({ + kind: `context_inserted` as const, + order: item.order, + item, + })), + ...data.contextRemoved.map((item) => ({ + kind: `context_removed` as const, + order: item.order, + item, + })), + ].sort((left, right) => compareTimelineOrders(left.order, right.order)) + ) const supersededIds = new Set() const contextSuperseded = new Set() diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index f147189199..90e0cd202c 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -325,6 +325,7 @@ export type TimelineItem = | { kind: `run` at: number + finishReason?: string items: Array< | { kind: `text`; text: string; status: `streaming` | `completed` } | { diff --git a/packages/agents-runtime/test/timeline-context.test.ts b/packages/agents-runtime/test/timeline-context.test.ts index b3f88dc883..3400b95e16 100644 --- a/packages/agents-runtime/test/timeline-context.test.ts +++ b/packages/agents-runtime/test/timeline-context.test.ts @@ -294,6 +294,67 @@ describe(`timeline context`, () => { ]) }) + it(`places an interrupted run before the SIGINT that raced ahead of it`, () => { + expect( + buildTimelineMessages({ + inbox: [ + { + key: `msg-1`, + order: order(1), + from: `user`, + payload: `start`, + timestamp: `2026-03-28T00:00:00.000Z`, + }, + { + key: `msg-2`, + order: order(4), + from: `user`, + payload: `continue`, + timestamp: `2026-03-28T00:00:03.000Z`, + }, + ], + signals: [ + { + key: `sig-1`, + order: order(2), + signal: `SIGINT`, + status: `handled`, + timestamp: `2026-03-28T00:00:02.000Z`, + outcome: `aborted`, + }, + ], + runs: [ + { + key: `run-1`, + order: order(3), + status: `completed`, + finish_reason: `aborted`, + texts: [ + { + key: `text-1`, + run_id: `run-1`, + order: order(5), + status: `completed`, + text: `partial response`, + }, + ], + toolCalls: [], + steps: [], + errors: [], + }, + ], + }) + ).toEqual([ + { role: `user`, content: `start` }, + { role: `assistant`, content: `partial response` }, + { + role: `user`, + content: `The active handler invocation was interrupted.`, + }, + { role: `user`, content: `continue` }, + ]) + }) + it(`timelineToMessages reads the shared entity timeline shape from the db`, () => { const db = { collections: {