Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions packages/agents-runtime/src/pi-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,16 @@ export function createPiAgentAdapter(
return new Promise<void>((resolve, reject) => {
let settled = false
let unsubscribe = (): void => {}
let abortFallback: ReturnType<typeof setTimeout> | null = null
const clearAbortFallback = (): void => {
if (!abortFallback) return
clearTimeout(abortFallback)
abortFallback = null
}
const finish = (finishReason: `stop` | `aborted` | `error`): void => {
if (settled) return
settled = true
clearAbortFallback()
running = false
abortSignal?.removeEventListener(`abort`, abortRun)
unsubscribe()
Expand All @@ -405,14 +412,22 @@ export function createPiAgentAdapter(
if (settled) return
abortedRun = true
agent.abort()
finish(`aborted`)
resolve()

// Let pi-agent-core settle its own abort lifecycle. It normally
// emits an aborted message/agent_end sequence, which we want to
// persist so the next context contains a coherent interrupted run.
// Keep a short fallback in case a provider/tool ignores AbortSignal.
abortFallback ??= setTimeout(() => {
finish(`aborted`)
resolve()
}, 5000)
}
unsubscribe = processAgentEvents(
() => {
if (settled) return
settled = true
running = false
clearAbortFallback()
abortSignal?.removeEventListener(`abort`, abortRun)
unsubscribe()
resolve()
Expand All @@ -433,6 +448,7 @@ export function createPiAgentAdapter(

Promise.resolve(runPromise).catch((err: Error) => {
if (settled) return
if (abortedRun) return
finish(`error`)
reject(err)
})
Expand Down
119 changes: 88 additions & 31 deletions packages/agents-runtime/src/timeline-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,65 @@ function materializeRunItem(run: IncludesRun): TimelineItem {
return {
kind: `run`,
at: orderToOffset(run.order),
...(run.finish_reason ? { finishReason: run.finish_reason } : {}),
items: items.map(({ order: _order, ...item }) => item),
}
}

function isAbortSignalEntry(entry: {
kind: string
item: unknown
}): entry is { kind: `signal`; item: IncludesSignal } {
if (entry.kind !== `signal`) return false
const signal = entry.item as IncludesSignal
return signal.signal === `SIGINT` && signal.outcome === `aborted`
}

function isAbortedRunEntry(entry: {
kind: string
item: unknown
}): entry is { kind: `run`; item: IncludesRun } {
return (
entry.kind === `run` &&
(entry.item as IncludesRun).finish_reason === `aborted`
)
}

function reorderInterruptedRuns<T extends { kind: string; item: unknown }>(
sortedItems: Array<T>
): Array<T> {
const items = [...sortedItems]

for (let signalIndex = 0; signalIndex < items.length; signalIndex++) {
const signalEntry = items[signalIndex]
if (!signalEntry || !isAbortSignalEntry(signalEntry)) continue

// Runtime output is buffered through its producer, while SIGINT is appended
// directly by the server. Under interruption the signal can therefore get a
// lower stream offset than the aborted run it interrupted. For model-facing
// context, present the interrupted assistant run before the interrupt marker
// so the next user message is interpreted against a coherent transcript.
let runIndex = -1
for (let index = signalIndex + 1; index < items.length; index++) {
const candidate = items[index]
if (!candidate) continue
if (isAbortSignalEntry(candidate)) break
if (isAbortedRunEntry(candidate)) {
runIndex = index
break
}
}

if (runIndex === -1) continue

const [run] = items.splice(runIndex, 1)
items.splice(signalIndex, 0, run!)
signalIndex++
}

return items
}

export function materializeTimeline(
data: EntityTimelineData
): Array<TimelineItem> {
Expand All @@ -275,40 +330,42 @@ export function materializeTimeline(
order: TimelineOrder
item: IncludesContextRemoved
}
> = [
...data.inbox
.filter((item) => (item.status ?? `processed`) === `processed`)
.map((item) => ({
kind: `inbox` as const,
> = reorderInterruptedRuns(
[
...data.inbox
.filter((item) => (item.status ?? `processed`) === `processed`)
.map((item) => ({
kind: `inbox` as const,
order: item.order,
item,
})),
...data.wakes.map((item) => ({
kind: `wake` as const,
order: item.order,
item,
})),
...data.wakes.map((item) => ({
kind: `wake` as const,
order: item.order,
item,
})),
...(data.signals ?? []).map((item) => ({
kind: `signal` as const,
order: item.order,
item,
})),
...data.runs.map((item) => ({
kind: `run` as const,
order: item.order,
item,
})),
...data.contextInserted.map((item) => ({
kind: `context_inserted` as const,
order: item.order,
item,
})),
...data.contextRemoved.map((item) => ({
kind: `context_removed` as const,
order: item.order,
item,
})),
].sort((left, right) => compareTimelineOrders(left.order, right.order))
...(data.signals ?? []).map((item) => ({
kind: `signal` as const,
order: item.order,
item,
})),
...data.runs.map((item) => ({
kind: `run` as const,
order: item.order,
item,
})),
...data.contextInserted.map((item) => ({
kind: `context_inserted` as const,
order: item.order,
item,
})),
...data.contextRemoved.map((item) => ({
kind: `context_removed` as const,
order: item.order,
item,
})),
].sort((left, right) => compareTimelineOrders(left.order, right.order))
)

const supersededIds = new Set<string>()
const contextSuperseded = new Set<string>()
Expand Down
1 change: 1 addition & 0 deletions packages/agents-runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ export type TimelineItem =
| {
kind: `run`
at: number
finishReason?: string
items: Array<
| { kind: `text`; text: string; status: `streaming` | `completed` }
| {
Expand Down
61 changes: 61 additions & 0 deletions packages/agents-runtime/test/timeline-context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,67 @@ describe(`timeline context`, () => {
])
})

it(`places an interrupted run before the SIGINT that raced ahead of it`, () => {
expect(
buildTimelineMessages({
inbox: [
{
key: `msg-1`,
order: order(1),
from: `user`,
payload: `start`,
timestamp: `2026-03-28T00:00:00.000Z`,
},
{
key: `msg-2`,
order: order(4),
from: `user`,
payload: `continue`,
timestamp: `2026-03-28T00:00:03.000Z`,
},
],
signals: [
{
key: `sig-1`,
order: order(2),
signal: `SIGINT`,
status: `handled`,
timestamp: `2026-03-28T00:00:02.000Z`,
outcome: `aborted`,
},
],
runs: [
{
key: `run-1`,
order: order(3),
status: `completed`,
finish_reason: `aborted`,
texts: [
{
key: `text-1`,
run_id: `run-1`,
order: order(5),
status: `completed`,
text: `partial response`,
},
],
toolCalls: [],
steps: [],
errors: [],
},
],
})
).toEqual([
{ role: `user`, content: `start` },
{ role: `assistant`, content: `partial response` },
{
role: `user`,
content: `<agent_signal signal="SIGINT" status="handled" timestamp="2026-03-28T00:00:02.000Z" outcome="aborted">The active handler invocation was interrupted.</agent_signal>`,
},
{ role: `user`, content: `continue` },
])
})

it(`timelineToMessages reads the shared entity timeline shape from the db`, () => {
const db = {
collections: {
Expand Down
Loading