From 0003ade3468aa73b50f3c0ccfe732f17d3d4913a Mon Sep 17 00:00:00 2001 From: "haozhe.yang" Date: Thu, 4 Jun 2026 09:14:20 +0800 Subject: [PATCH 1/2] refactor(acp-adapter): boundary-inject AcpKaos and fix FS bridge semantics - Replace per-prompt runWithKaos with boundary-injection at session creation\n- Fix readBytes to bypass ACP text RPC for binary-safe reads via inner\n- Fix readLines to preserve \n terminators matching LocalKaos\n- Fix writeText append mode to only swallow not-found errors\n- Filter subagent events so child agents don't settle the parent prompt\n- Update SDK createSession/resumeSession to accept optional id and kaos --- .changeset/acp-fs-and-subagent-fixes.md | 7 + packages/acp-adapter/src/kaos-acp.ts | 79 ++-- packages/acp-adapter/src/server.ts | 63 +++- packages/acp-adapter/src/session.ts | 101 +++-- packages/acp-adapter/test/e2e-fs.test.ts | 150 ++++---- packages/acp-adapter/test/kaos-acp.test.ts | 96 +++-- .../acp-adapter/test/kaos-activation.test.ts | 348 +++++++----------- packages/acp-adapter/test/session-new.test.ts | 33 +- .../acp-adapter/test/session-prompt.test.ts | 70 ++++ packages/agent-core/src/rpc/core-impl.ts | 20 +- packages/node-sdk/src/kimi-harness.ts | 13 +- packages/node-sdk/src/rpc.ts | 17 + packages/node-sdk/src/sdk-rpc-client.ts | 30 +- packages/node-sdk/src/types.ts | 5 +- 14 files changed, 606 insertions(+), 426 deletions(-) create mode 100644 .changeset/acp-fs-and-subagent-fixes.md diff --git a/.changeset/acp-fs-and-subagent-fixes.md b/.changeset/acp-fs-and-subagent-fixes.md new file mode 100644 index 000000000..165964c59 --- /dev/null +++ b/.changeset/acp-fs-and-subagent-fixes.md @@ -0,0 +1,7 @@ +--- +"@moonshot-ai/acp-adapter": patch +"@moonshot-ai/agent-core": patch +"@moonshot-ai/node-sdk": patch +--- + +Fix ACP-bridged file reads for binary content and line terminators, prevent silent overwrites on append-mode permission errors, and stop subagent events from interfering with parent prompt resolution. diff --git a/packages/acp-adapter/src/kaos-acp.ts b/packages/acp-adapter/src/kaos-acp.ts index d8ca1ae9f..db7a64dee 100644 --- a/packages/acp-adapter/src/kaos-acp.ts +++ b/packages/acp-adapter/src/kaos-acp.ts @@ -19,6 +19,7 @@ import { Buffer } from 'node:buffer'; import type { AgentSideConnection } from '@agentclientprotocol/sdk'; +import { RequestError } from '@agentclientprotocol/sdk'; import { KaosError, type Environment, @@ -130,33 +131,22 @@ export class AcpKaos implements Kaos { } /** - * Read up to `n` bytes from the file. Implemented as - * `readText → utf8 encode → slice` because ACP only exposes string - * content. Callers that store non-text data through this path - * (uncommon) will get re-encoded bytes — acceptable per `Kaos.readBytes` - * which already permits encoding-dependent return values. + * Binary reads bypass the ACP text RPC by design: `fs/readTextFile` + * returns a decoded string and would corrupt or reject non-UTF-8 + * payloads (images, video, archives — anything `ReadMediaFile` may + * touch). The ACP bridge only owns the *text* surface; raw bytes + * stay on the local filesystem via `inner`. */ - async readBytes(path: string, n?: number): Promise { - const text = await this.readText(path); - const buf = Buffer.from(text, 'utf8'); - return n !== undefined ? buf.subarray(0, n) : buf; + readBytes(path: string, n?: number): Promise { + return this.inner.readBytes(path, n); } /** - * Yield lines from the file. Emulates Python `splitlines(keepends=False)`: - * splits on `\n`, drops the trailing empty token if the file ended with - * a newline, and yields nothing for an empty file. Matches - * {@link LocalKaos.readLines}'s observable output for the trailing-newline - * case (the local version yields `'line\n'` chunks; here we yield without - * the `\n` — see below). - * - * Note on divergence from `LocalKaos.readLines`: the local impl yields - * `'a\n'`, `'b\n'`, `'c'` while this impl yields `'a'`, `'b'`, `'c'`. - * The interface JSDoc says only "Yield lines from the file at `path` - * one by one" without pinning trailing-newline semantics, so both - * shapes satisfy it. Tools that depend on the trailing-newline (rare) - * should adapt. The Python reference's ACP backend does not implement - * `readLines` separately either. + * Yield lines from the file, each terminated by its `\n` (the final + * line has no terminator if the file did not end with `\n`). Matches + * {@link LocalKaos.readLines} so tools that depend on line terminators + * (e.g. {@link ReadTool}, which renders CRLF endings) behave identically + * whether the underlying Kaos is local or ACP-bridged. */ async *readLines( path: string, @@ -167,7 +157,7 @@ export class AcpKaos implements Kaos { let start = 0; for (let i = 0; i < text.length; i++) { if (text.charCodeAt(i) === 0x0a /* \n */) { - yield text.slice(start, i); + yield text.slice(start, i + 1); start = i + 1; } } @@ -181,9 +171,11 @@ export class AcpKaos implements Kaos { * always UTF-8 string content. `mode: 'a'` (append) emulates with a * read-then-write fallback: ACP has no native append, and the * intended audience (unsaved-buffer scratchpads) rarely needs it. - * If the prior read fails (e.g. file missing), the write proceeds - * as if the existing content were empty — matching Python `open('a')` - * which also creates new files. + * If the prior read fails because the file does not exist, the write + * proceeds as if the existing content were empty — matching Python + * `open('a')` which creates new files. Any other read failure + * (permission, transport, internal) propagates so we never silently + * destroy existing content. * * Returns `data.length` (chars) to match {@link LocalKaos.writeText}'s * contract. @@ -197,8 +189,8 @@ export class AcpKaos implements Kaos { let existing = ''; try { existing = await this.readText(path); - } catch { - // ENOENT-style failure → treat as empty (mirrors Python open('a')). + } catch (err) { + if (!isNotFoundError(err)) throw err; existing = ''; } await this.acpWrite(path, existing + data); @@ -254,3 +246,32 @@ function wrapKaosError(prefix: string, cause: unknown): KaosError { (err as Error & { cause?: unknown }).cause = cause; return err; } + +/** + * Return true iff `err` looks like a "file does not exist" failure on + * the read side of an ACP append-mode write. We recognize: + * - `RequestError` with code `-32002` (the ACP SDK's `resourceNotFound`). + * - The `KaosError` wrapper around such a `RequestError` (which `readText` + * above produces) — unwrap via `.cause`. + * - A loose "not found" / "no such file" string match as a last resort, + * for clients that synthesize errors without using the SDK helpers. + * + * Permission denials, transport errors, and anything else propagate so + * append never silently overwrites an existing file. + */ +function isNotFoundError(err: unknown): boolean { + const visited = new Set(); + let cur: unknown = err; + while (cur !== undefined && cur !== null && !visited.has(cur)) { + visited.add(cur); + if (cur instanceof RequestError && cur.code === -32002) return true; + if (cur instanceof Error) { + const msg = cur.message.toLowerCase(); + if (msg.includes('not found') || msg.includes('no such file')) return true; + cur = (cur as Error & { cause?: unknown }).cause; + continue; + } + break; + } + return false; +} diff --git a/packages/acp-adapter/src/server.ts b/packages/acp-adapter/src/server.ts index 709a2eec1..c3218c9a5 100644 --- a/packages/acp-adapter/src/server.ts +++ b/packages/acp-adapter/src/server.ts @@ -7,6 +7,7 @@ */ import { Readable, Writable } from 'node:stream'; +import { randomUUID } from 'node:crypto'; import { AgentSideConnection, @@ -44,9 +45,11 @@ import { } from '@agentclientprotocol/sdk'; import type { KimiHarness, Session, SessionSummary } from '@moonshot-ai/kimi-code-sdk'; import { log } from '@moonshot-ai/kimi-code-sdk'; +import { LocalKaos, type Kaos } from '@moonshot-ai/kaos'; import { TERMINAL_AUTH_METHOD, buildTerminalAuthMethod } from './auth-methods'; import { redirectConsoleToStderr } from './log-guard'; +import { AcpKaos } from './kaos-acp'; import { AcpSession, type TelemetryTrackFn } from './session'; import { buildSessionConfigOptions } from './config-options'; import { availableCommandsUpdateNotification } from './events-map'; @@ -85,6 +88,13 @@ export class AcpServer implements Agent { private readonly agentInfo: Implementation | undefined; private readonly terminalAuthEnv: Readonly> | undefined; private readonly terminalAuthLegacyCommand: string | undefined; + /** + * Lazily-built inner {@link Kaos} (a {@link LocalKaos}) used as the + * delegate target for every {@link AcpKaos} this server hands out. + * One per server (not per session) so we don't re-probe the + * environment for every `session/new` call. + */ + private innerKaos: Kaos | undefined = undefined; constructor( private readonly harness: KimiHarness, @@ -184,13 +194,6 @@ export class AcpServer implements Agent { // if the SDK ever switches from spread-passthrough to explicit field // copy, this line breaks and we revisit the boundary. const mcpServers = acpMcpServersToConfigs(params.mcpServers); - const session = await this.harness.createSession({ - workDir: params.cwd, - // @ts-expect-error — `mcpServers` is a kernel-side extension - // (agent-core `CreateSessionPayload`) the SDK transparently - // forwards via spread. See block comment above. - mcpServers, - }); if (!this.conn) { // Defensive: every code path that constructs `AcpServer` (the // runners below, and any test that intends to drive `newSession`) @@ -199,6 +202,23 @@ export class AcpServer implements Agent { // connection mid-stream. throw RequestError.internalError(undefined, 'AcpServer is missing its AgentSideConnection'); } + // Pre-mint the session id so the optional `AcpKaos` (built when the + // client advertised `fs.readTextFile` / `fs.writeTextFile`) carries + // the correct reverse-RPC channel for the same session the kernel + // is about to construct. Boundary injection — the kaos is captured + // by the kernel `SessionImpl` ctor and every tool downstream sees + // the same reference, no AsyncLocalStorage needed. + const sessionId = `session_${randomUUID()}`; + const acpKaos = await this.maybeBuildAcpKaos(sessionId); + const session = await this.harness.createSession({ + id: sessionId, + workDir: params.cwd, + ...(acpKaos !== undefined ? { kaos: acpKaos } : {}), + // @ts-expect-error — `mcpServers` is a kernel-side extension + // (agent-core `CreateSessionPayload`) the SDK transparently + // forwards via spread. See block comment above. + mcpServers, + }); const currentModelId = await this.resolveCurrentModelId(); const currentThinkingEnabled = await this.resolveCurrentThinkingEnabled(); const acpSession = new AcpSession( @@ -363,10 +383,12 @@ export class AcpServer implements Agent { // `resumeSession` spreads `input` so unknown fields ride to the // kernel. const mcpServers = acpMcpServersToConfigs(params.mcpServers); + const acpKaos = await this.maybeBuildAcpKaos(params.sessionId); let session: Session; try { session = await this.harness.resumeSession({ id: params.sessionId, + ...(acpKaos !== undefined ? { kaos: acpKaos } : {}), // @ts-expect-error — see block comment above; mcpServers is a // kernel-only field that the SDK forwards via spread. mcpServers, @@ -427,6 +449,33 @@ export class AcpServer implements Agent { return { session, acpSession, configOptions }; } + /** + * Build an {@link AcpKaos} for a given session id if (and only if) + * the client advertised any FS reverse-RPC capability. Returns + * `undefined` otherwise — the caller then omits the `kaos` field + * from `harness.createSession`/`resumeSession`, leaving the kernel + * to fall back to its process-wide {@link LocalKaos}. + * + * The inner {@link LocalKaos} is built lazily on the first capable + * session and cached on `this.innerKaos`; subsequent sessions reuse + * it. The resulting {@link AcpKaos} is captured by the kernel + * `SessionImpl` ctor and every tool downstream sees the same + * reference — no AsyncLocalStorage involved. + */ + private async maybeBuildAcpKaos(sessionId: string): Promise { + const fs = this.clientCapabilities?.fs; + if (!fs?.readTextFile && !fs?.writeTextFile) { + return undefined; + } + if (!this.conn) { + return undefined; + } + if (!this.innerKaos) { + this.innerKaos = await LocalKaos.create(); + } + return new AcpKaos(this.conn, sessionId, this.innerKaos); + } + /** * Re-check whether the on-disk token is usable; does NOT trigger an * actual OAuth flow. The stdio JSON-RPC channel has no TTY to render diff --git a/packages/acp-adapter/src/session.ts b/packages/acp-adapter/src/session.ts index a281ab103..0c95c02a1 100644 --- a/packages/acp-adapter/src/session.ts +++ b/packages/acp-adapter/src/session.ts @@ -7,7 +7,6 @@ import { type PromptResponse, type SessionModeId, } from '@agentclientprotocol/sdk'; -import { LocalKaos, runWithKaos, type Kaos } from '@moonshot-ai/kaos'; import { ErrorCodes, log, @@ -29,7 +28,6 @@ import { } from './approval'; import { buildSessionConfigOptions } from './config-options'; import { acpBlocksToPromptParts } from './convert'; -import { AcpKaos } from './kaos-acp'; import { acpToolCallId, assistantDeltaToSessionUpdate, @@ -85,15 +83,6 @@ export class AcpSession { */ private currentTurnId: number | undefined = undefined; - /** - * Lazily-built inner {@link Kaos} (a {@link LocalKaos}) that the - * per-prompt {@link AcpKaos} wraps. Cached on the session so we don't - * re-probe the environment on every prompt. Built lazily because the - * majority of sessions (those whose client does not advertise the FS - * capability) never need it. - */ - private innerKaos: Kaos | undefined = undefined; - /** * The adapter-side authoritative current BASE model id (no * `,thinking` suffix) for the `configOptions` model picker (PLAN D11). @@ -450,7 +439,7 @@ export class AcpSession { * batch — completion ordering is what tells the caller (`loadSession`) * that the response is safe to return. */ - async replayHistory(agentId: string = 'main'): Promise { + async replayHistory(agentId: string = MAIN_AGENT_ID): Promise { const sessionId = this.id; const conn = this.conn; const resumeState = this.session.getResumeState?.(); @@ -659,52 +648,13 @@ export class AcpSession { const sessionId = this.id; const conn = this.conn; - // Decide whether to bridge file I/O through ACP for this prompt. - // We honor the client's advertised capability surface only — - // unsupported clients silently fall back to `LocalKaos`, which keeps - // the rest of the codebase unaware of Phase 6. - // - // `runWithKaos` (NOT `setCurrentKaos`) is the correct primitive: - // `enterWith` persists for the rest of the current async context - // with no proper restore semantics, so concurrent prompts on the - // same process would step on each other. `run(...)` scopes the - // binding to the callback's async subtree. - const acpKaos = await this.maybeBuildAcpKaos(); - if (!acpKaos) { - return this.runPromptBody(parts, sessionId, conn); - } - return runWithKaos(acpKaos, () => this.runPromptBody(parts, sessionId, conn)); + return this.runPromptBody(parts, sessionId, conn); } /** - * Build an {@link AcpKaos} for this prompt if (and only if) the - * client advertised any FS reverse-RPC capability. Returns - * `undefined` otherwise — the caller then runs the prompt body in - * whatever Kaos was active before (typically the process-wide - * `LocalKaos` set up at startup). - * - * The inner {@link LocalKaos} is built lazily on the first capable - * prompt and cached on the instance (`this.innerKaos`); subsequent - * prompts reuse it. - */ - private async maybeBuildAcpKaos(): Promise { - const fs = this.clientCapabilities?.fs; - if (!fs?.readTextFile && !fs?.writeTextFile) { - return undefined; - } - if (!this.innerKaos) { - this.innerKaos = await LocalKaos.create(); - } - return new AcpKaos(this.conn, this.id, this.innerKaos); - } - - /** - * The pre-Phase-6 body of {@link prompt}, extracted verbatim so that - * the new `runWithKaos` wrapper can apply uniformly to capable - * clients while non-capable clients hit the same code path with no - * wrapping. Splitting it out (rather than inlining the if/else twice) - * keeps the event-listener invariants — single `onEvent` subscription, - * `settled` flag semantics, `currentTurnId` reset — in one place. + * Body of {@link prompt}, extracted so the event-listener invariants + * — single `onEvent` subscription, `settled` flag semantics, + * `currentTurnId` reset — live in one place. */ private runPromptBody( parts: ReturnType, @@ -713,6 +663,8 @@ export class AcpSession { ): Promise { return new Promise((resolve, reject) => { let settled = false; + const isFromMainAgent = (event: { agentId?: string }): boolean => + event.agentId === undefined || event.agentId === MAIN_AGENT_ID; // Per-tool-call streaming args accumulator. Lives in the Promise // executor closure so each `prompt()` invocation gets its own // map and no state leaks across concurrent or sequential turns. @@ -746,9 +698,37 @@ export class AcpSession { // tool card the client already rendered. This branch is purely // additive: it runs before the existing dispatch and never // returns, so the if-chain below behaves exactly as in Phase 4. - if ('turnId' in event && typeof event.turnId === 'number') { + // Subagent turn events carry their own `turnId`; filtering on + // `agentId` keeps `currentTurnId` aligned with the parent turn + // that the approval prompt actually belongs to. + if ( + 'turnId' in event && + typeof event.turnId === 'number' && + isFromMainAgent(event) + ) { this.currentTurnId = event.turnId; } + if (event.type === 'error') { + if (settled) return; + if (!isFromMainAgent(event)) return; + if (event.code !== ErrorCodes.TURN_AGENT_BUSY) return; + settled = true; + argsByToolCall.clear(); + startedToolCalls.clear(); + this.currentTurnId = undefined; + unsub(); + log.warn('acp: prompt rejected because another turn is active', { + sessionId, + details: event.details, + }); + reject( + RequestError.invalidRequest( + { code: event.code, details: event.details }, + event.message, + ), + ); + return; + } if (event.type === 'assistant.delta') { // `sessionUpdate` is itself async (it serializes onto the // ndjson stream). The text deltas form a strictly ordered @@ -903,6 +883,7 @@ export class AcpSession { } if (event.type === 'turn.ended') { if (settled) return; + if (!isFromMainAgent(event)) return; settled = true; if (event.reason === 'failed') { // Failures bubble up via the SDK `error` payload. Phase 11.1 @@ -1206,6 +1187,14 @@ function authRequiredFromUnknown(err: unknown): RequestError | undefined { const THINKING_ON_LEVEL = 'high'; const THINKING_OFF_LEVEL = 'off'; +/** + * Identifier the agent-core session emits for the main (user-facing) + * agent. Subagents are issued generated ids by `Session.spawnAgent`; + * filtering on this constant keeps `turn.ended` / `error` events from a + * child agent from settling the parent's `session/prompt` promise. + */ +const MAIN_AGENT_ID = 'main'; + /** * Parse a tool call's `arguments` field (kosong wire format: a JSON * string or `null`) into the structured object expected by the live diff --git a/packages/acp-adapter/test/e2e-fs.test.ts b/packages/acp-adapter/test/e2e-fs.test.ts index f17eae7db..838735596 100644 --- a/packages/acp-adapter/test/e2e-fs.test.ts +++ b/packages/acp-adapter/test/e2e-fs.test.ts @@ -1,7 +1,7 @@ /** * End-to-end test for the FS reverse-RPC bridge. * - * Wire shape under test (the integration that Phases 6.1 + 6.2 unlock): + * Wire shape under test: * * ┌────────┐ fs/readTextFile (RPC) ┌────────┐ * │ client │ ───────────────────────► │ agent │ @@ -11,15 +11,14 @@ * │ kaos │ * └────────┘ * - * The test drives a real `ClientSideConnection`+`AgentSideConnection` - * pair over an in-memory ndjson stream, advertising - * `clientCapabilities.fs.readTextFile = true` so the agent activates - * `AcpKaos`. A mock harness's `Session.prompt` calls - * `getCurrentKaos().readText('/path/x.ts')` inside its body — exactly - * what a real Read tool would do — and emits the returned content as - * an `assistant.delta`. We assert that the client's `readTextFile` - * handler was invoked with the expected path AND that the assistant - * chunk carrying the unsaved-buffer content reached the client. + * Boundary-injection model: when the client advertises + * `clientCapabilities.fs.readTextFile`, `AcpServer.newSession` builds + * an {@link AcpKaos} and threads it into `harness.createSession({ kaos })`. + * In the real stack the kernel `SessionImpl` ctor captures that kaos + * and every tool (Read / Write / Edit / Grep / Glob / Bash) sees the + * same reference. The harness stub here mimics that capture by + * forwarding the supplied kaos into the fake Session's `prompt` body — + * exactly what a real Read tool would consult. */ import { @@ -36,7 +35,7 @@ import { type WriteTextFileRequest, type WriteTextFileResponse, } from '@agentclientprotocol/sdk'; -import { getCurrentKaos } from '@moonshot-ai/kaos'; +import type { Kaos } from '@moonshot-ai/kaos'; import type { Event, KimiHarness, Session } from '@moonshot-ai/kimi-code-sdk'; import { describe, expect, it } from 'vitest'; @@ -54,14 +53,6 @@ function makeInMemoryStreamPair(): { return { agentStream, clientStream }; } -/** - * A `Client` that: - * - Records every `readTextFile` request the agent sends. - * - Returns `unsavedContent` for those requests (the "unsaved buffer" - * payload). - * - Captures every `sessionUpdate` so the test can verify the - * assistant chunks carrying the content reached the client. - */ class UnsavedBufferClient implements Client { readonly readRequests: ReadTextFileRequest[] = []; readonly updates: SessionNotification[] = []; @@ -83,23 +74,24 @@ class UnsavedBufferClient implements Client { } /** - * Build a fake `Session` whose `prompt(parts)` performs a tool-shaped - * action: it pulls `getCurrentKaos()` (the `AcpKaos` the agent wired - * for this prompt), reads `targetPath`, emits the contents as an - * assistant delta, and then ends the turn. This stands in for the - * Read tool inside the SDK loop without dragging the full SDK harness - * into the test. + * Build a fake `Session` whose `prompt` calls `kaos.readText(targetPath)` + * — what a real Read tool would do — and emits the contents as an + * assistant delta. The kaos is supplied at construction time (mirroring + * the kernel `SessionImpl` ctor's capture-on-construction behavior). */ -function makeReadingSession(sessionId: string, targetPath: string): Session { +function makeReadingSession( + sessionId: string, + targetPath: string, + kaos: Kaos | undefined, +): Session { const listeners = new Set<(event: Event) => void>(); return { id: sessionId, prompt: async (_input: unknown) => { - // This call is the FS reverse-RPC trigger — it's what a real - // file-read tool would invoke. The `AcpKaos` activated by - // `AcpSession.prompt` makes this hit the client's - // `readTextFile` handler over the wire. - const content = await getCurrentKaos().readText(targetPath); + if (kaos === undefined) { + throw new Error('kaos missing — boundary injection failed'); + } + const content = await kaos.readText(targetPath); for (const fn of listeners) { fn({ @@ -134,12 +126,16 @@ const textBlock = (text: string): ContentBlock => ({ type: 'text', text }); describe('end-to-end FS reverse-RPC', () => { it('routes a tool-time readText through the client when fs.readTextFile is advertised', async () => { - const sessionId = 'sess-fs-e2e'; const targetPath = '/Users/test/x.ts'; - const session = makeReadingSession(sessionId, targetPath); + let createdSession: Session | undefined; + let capturedSessionId: string | undefined; const harness = { auth: { status: async () => AUTHED_STATUS }, - createSession: async () => session, + createSession: async (options: { id?: string; workDir: string; kaos?: Kaos }) => { + capturedSessionId = options.id ?? 'fallback'; + createdSession = makeReadingSession(capturedSessionId, targetPath, options.kaos); + return createdSession; + }, } as unknown as KimiHarness; const { agentStream, clientStream } = makeInMemoryStreamPair(); @@ -157,30 +153,27 @@ describe('end-to-end FS reverse-RPC', () => { }, }); - await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + const newSession = await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); const response = await client.prompt({ - sessionId, + sessionId: newSession.sessionId, prompt: [textBlock('read the unsaved file please')], }); expect(response.stopReason).toBe('end_turn'); - // ── Assertion 1: the client saw exactly one fs/readTextFile - // request with the expected path and matching sessionId. + // The client saw exactly one fs/readTextFile request with the + // expected path and matching sessionId. expect(bufferClient.readRequests).toHaveLength(1); expect(bufferClient.readRequests[0]).toMatchObject({ - sessionId, + sessionId: capturedSessionId, path: targetPath, }); // Give the agent a tick to flush the queued sessionUpdate write - // through the ndjson stream (assistant chunks are fire-and-forget - // — see `session.ts` comments). + // through the ndjson stream. await new Promise((resolve) => setTimeout(resolve, 20)); - // ── Assertion 2: the assistant chunk carrying the unsaved-buffer - // content reached the client, proving end-to-end plumbing. const chunkUpdate = bufferClient.updates.find( (u) => u.update.sessionUpdate === 'agent_message_chunk', ); @@ -192,48 +185,37 @@ describe('end-to-end FS reverse-RPC', () => { }); it('does NOT route through the client when no FS capability is advertised', async () => { - // Sanity counterpart: the same wiring without the FS capability - // must fall back to local FS (which would attempt to actually read - // /Users/test/x.ts and fail). We avoid the filesystem touch by - // probing the session-side outcome differently: the prompt body - // now reads a path that DOES exist transiently — we just verify - // that the client never saw a readTextFile request. - const sessionId = 'sess-no-fs-e2e'; + let observedKaos: Kaos | undefined; + let capturedSessionId: string | undefined; - const session: Session = { - id: sessionId, - // `prompt` here does NOT call getCurrentKaos — that path would - // throw / hit local FS, which we don't want in this test. We - // simply end the turn immediately. The point is: with no FS - // capability, the agent must NOT have built an AcpKaos and the - // client must NOT see any readTextFile RPC even if it had been - // called. - prompt: async () => { - // Emit turn.ended directly through the listener that - // session.onEvent registered. - for (const fn of listeners) { - fn({ - type: 'turn.ended', - sessionId, - agentId: 'main', - turnId: 1, - reason: 'completed', - } as Event); - } - }, - cancel: async () => undefined, - onEvent: (fn: (event: Event) => void) => { - listeners.add(fn); - return () => { - listeners.delete(fn); - }; - }, - } as unknown as Session; const listeners = new Set<(event: Event) => void>(); - const harness = { auth: { status: async () => AUTHED_STATUS }, - createSession: async () => session, + createSession: async (options: { id?: string; workDir: string; kaos?: Kaos }) => { + observedKaos = options.kaos; + capturedSessionId = options.id ?? 'fallback'; + return { + id: capturedSessionId, + prompt: async () => { + for (const fn of listeners) { + fn({ + type: 'turn.ended', + sessionId: capturedSessionId, + agentId: 'main', + turnId: 1, + reason: 'completed', + } as Event); + } + }, + cancel: async () => undefined, + onEvent: (fn: (event: Event) => void) => { + listeners.add(fn); + return () => { + listeners.delete(fn); + }; + }, + } as unknown as Session; + }, } as unknown as KimiHarness; const { agentStream, clientStream } = makeInMemoryStreamPair(); @@ -244,20 +226,20 @@ describe('end-to-end FS reverse-RPC', () => { await client.initialize({ protocolVersion: 1, clientCapabilities: { - // Both flags absent — agent must not activate AcpKaos. fs: { readTextFile: false, writeTextFile: false }, terminal: false, }, }); - await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + const newSession = await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); const response = await client.prompt({ - sessionId, + sessionId: newSession.sessionId, prompt: [textBlock('hi')], }); expect(response.stopReason).toBe('end_turn'); expect(bufferClient.readRequests).toEqual([]); + expect(observedKaos).toBeUndefined(); }); }); diff --git a/packages/acp-adapter/test/kaos-acp.test.ts b/packages/acp-adapter/test/kaos-acp.test.ts index ea3de3e3c..eef219922 100644 --- a/packages/acp-adapter/test/kaos-acp.test.ts +++ b/packages/acp-adapter/test/kaos-acp.test.ts @@ -14,6 +14,7 @@ import type { WriteTextFileRequest, WriteTextFileResponse, } from '@agentclientprotocol/sdk'; +import { RequestError } from '@agentclientprotocol/sdk'; import { KaosError, type Environment, type Kaos, type KaosProcess, type StatResult } from '@moonshot-ai/kaos'; import { describe, expect, it } from 'vitest'; @@ -77,6 +78,7 @@ interface MockInnerKaos extends Kaos { execWithEnvCalls: Array<{ args: string[]; env?: Record }>; readTextCalls: string[]; writeTextCalls: Array<{ path: string; data: string }>; + readBytesCalls: Array<{ path: string; n?: number }>; }; } @@ -96,6 +98,7 @@ function makeMockInner(): MockInnerKaos { execWithEnvCalls: [] as Array<{ args: string[]; env?: Record }>, readTextCalls: [] as string[], writeTextCalls: [] as Array<{ path: string; data: string }>, + readBytesCalls: [] as Array<{ path: string; n?: number }>, }; const inner: MockInnerKaos = { @@ -166,7 +169,11 @@ function makeMockInner(): MockInnerKaos { spy.execWithEnvCalls.push({ args, env }); return {} as KaosProcess; }, - readBytes: async () => Buffer.alloc(0), + readBytes: async (path: string, n?: number) => { + spy.readBytesCalls.push({ path, n }); + const buf = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]); + return n !== undefined ? buf.subarray(0, n) : buf; + }, readText: async (path: string) => { // Used to verify that AcpKaos.readText does NOT fall back to inner. spy.readTextCalls.push(path); @@ -225,25 +232,32 @@ describe('AcpKaos', () => { }); describe('readBytes', () => { - it('returns the first N utf8 bytes of the file content', async () => { + it('delegates to inner.readBytes (binary reads bypass ACP text RPC)', async () => { const conn = makeMockConn({ - readHandler: async () => ({ content: 'abcdef' }), + readHandler: async () => { + throw new Error('ACP readTextFile must NOT be called for binary reads'); + }, }); - const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); + const inner = makeMockInner(); + const kaos = new AcpKaos(conn.asConn(), 's1', inner); - const buf = await kaos.readBytes('/a.ts', 3); + const buf = await kaos.readBytes('/img.png', 4); expect(buf).toBeInstanceOf(Buffer); - expect(buf.toString('utf8')).toBe('abc'); + // The inner stub returns the first 4 bytes of a PNG signature. + expect(Array.from(buf)).toEqual([0x89, 0x50, 0x4e, 0x47]); + expect(inner.__spy.readBytesCalls).toEqual([{ path: '/img.png', n: 4 }]); + // Crucially: nothing went over the ACP wire. + expect(conn.readCalls).toEqual([]); }); - it('returns the full buffer when n is omitted', async () => { - const conn = makeMockConn({ - readHandler: async () => ({ content: 'abcdef' }), - }); - const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); + it('forwards omitted n to inner unchanged', async () => { + const conn = makeMockConn({}); + const inner = makeMockInner(); + const kaos = new AcpKaos(conn.asConn(), 's1', inner); - const buf = await kaos.readBytes('/a.ts'); - expect(buf.toString('utf8')).toBe('abcdef'); + const buf = await kaos.readBytes('/img.png'); + expect(buf.byteLength).toBe(8); + expect(inner.__spy.readBytesCalls).toEqual([{ path: '/img.png', n: undefined }]); }); }); @@ -254,23 +268,31 @@ describe('AcpKaos', () => { return out; } - it('yields each line of "a\\nb\\nc"', async () => { + it('yields each line of "a\\nb\\nc" with terminators preserved', async () => { const conn = makeMockConn({ readHandler: async () => ({ content: 'a\nb\nc' }) }); const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); - expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a', 'b', 'c']); + expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a\n', 'b\n', 'c']); }); it('drops the trailing empty token when the file ends with a newline', async () => { - // "a\nb\n" → ['a', 'b'] (NOT ['a', 'b', '']) + // "a\nb\n" → ['a\n', 'b\n'] (NOT ['a\n', 'b\n', '']) const conn = makeMockConn({ readHandler: async () => ({ content: 'a\nb\n' }) }); const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); - expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a', 'b']); + expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a\n', 'b\n']); }); - it('yields the final line without a trailing newline', async () => { + it('yields the final line without a trailing newline when missing', async () => { const conn = makeMockConn({ readHandler: async () => ({ content: 'a\nb' }) }); const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); - expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a', 'b']); + expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a\n', 'b']); + }); + + it('preserves CRLF carriage returns inside the line terminator', async () => { + // ReadTool depends on this — stripping \n would expose bare \r and + // render visible carriage returns. + const conn = makeMockConn({ readHandler: async () => ({ content: 'a\r\nb\r\n' }) }); + const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); + expect(await collect(kaos.readLines('/a.ts'))).toEqual(['a\r\n', 'b\r\n']); }); it('yields nothing for an empty file', async () => { @@ -304,10 +326,25 @@ describe('AcpKaos', () => { ]); }); - it('append mode treats a missing file (read error) as empty existing content', async () => { + it('append mode treats a resourceNotFound read error as empty existing content', async () => { + const conn = makeMockConn({ + readHandler: async () => { + throw RequestError.resourceNotFound('/missing.ts'); + }, + }); + const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); + const n = await kaos.writeText('/missing.ts', 'fresh', { mode: 'a' }); + expect(n).toBe(5); + expect(conn.writeCalls).toEqual([ + { sessionId: 's1', path: '/missing.ts', content: 'fresh' }, + ]); + }); + + it('append mode also treats loose "not found" message as missing file', async () => { + // Some clients return plain JSON-RPC errors without the SDK helpers. const conn = makeMockConn({ readHandler: async () => { - throw new Error('ENOENT'); + throw new Error('file not found'); }, }); const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); @@ -318,6 +355,23 @@ describe('AcpKaos', () => { ]); }); + it('append mode rethrows non-not-found read errors and does NOT issue a write', async () => { + // Critical regression guard: a permission / transport / internal + // error must NOT be silently treated as "file is empty" — that + // would silently destroy the existing file content. + const conn = makeMockConn({ + readHandler: async () => { + throw RequestError.internalError(undefined, 'transient transport blip'); + }, + }); + const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); + await expect(kaos.writeText('/a.ts', 'new', { mode: 'a' })).rejects.toBeInstanceOf( + KaosError, + ); + // No write happened — the file was preserved on the client side. + expect(conn.writeCalls).toEqual([]); + }); + it('wraps writeTextFile RPC errors in KaosError with cause set', async () => { const rpcErr = new Error('write rpc died'); const conn = makeMockConn({ diff --git a/packages/acp-adapter/test/kaos-activation.test.ts b/packages/acp-adapter/test/kaos-activation.test.ts index 90ffd2781..6317c1bd7 100644 --- a/packages/acp-adapter/test/kaos-activation.test.ts +++ b/packages/acp-adapter/test/kaos-activation.test.ts @@ -1,252 +1,186 @@ /** - * Tests that {@link AcpSession.prompt} activates an {@link AcpKaos} - * (visible to tools via {@link getCurrentKaos}) when, and only when, - * the client advertises `fs.readTextFile` or `fs.writeTextFile`. + * Tests that {@link AcpServer.newSession} / `setupSessionFromExisting` + * passes an {@link AcpKaos} to {@link KimiHarness.createSession} / + * `resumeSession` when, and only when, the client advertises + * `fs.readTextFile` or `fs.writeTextFile`. * - * Uses scripted `Session` stubs whose `prompt(parts)` synchronously - * calls `getCurrentKaos()` *inside* its body — this is the moment a - * real tool would resolve its Kaos handle, so it's the right place - * to assert the binding propagated through `runWithKaos`. + * Boundary-injection model: the kaos is captured by the kernel + * `SessionImpl` ctor at session-creation time so every tool downstream + * sees the same reference — no AsyncLocalStorage, no per-prompt + * wrapping. The right surface to assert is therefore the + * `harness.createSession({ kaos })` boundary, not in-flight tool calls. */ -import type { AgentSideConnection, ClientCapabilities } from '@agentclientprotocol/sdk'; -import { getCurrentKaos, LocalKaos, runWithKaos } from '@moonshot-ai/kaos'; -import type { Event, Session } from '@moonshot-ai/kimi-code-sdk'; +import { + AgentSideConnection, + ClientSideConnection, + ndJsonStream, + type Client, + type ReadTextFileRequest, + type ReadTextFileResponse, + type RequestPermissionRequest, + type RequestPermissionResponse, + type SessionNotification, + type WriteTextFileRequest, + type WriteTextFileResponse, +} from '@agentclientprotocol/sdk'; +import type { Kaos } from '@moonshot-ai/kaos'; +import type { KimiHarness, Session } from '@moonshot-ai/kimi-code-sdk'; import { describe, expect, it } from 'vitest'; import { AcpKaos } from '../src/kaos-acp'; -import { AcpSession } from '../src/session'; +import { AcpServer } from '../src/server'; +import { AUTHED_STATUS } from './_helpers/harness-stubs'; + +class StubClient implements Client { + async requestPermission(_p: RequestPermissionRequest): Promise { + throw new Error('StubClient.requestPermission should not be called in kaos-activation test'); + } + async sessionUpdate(_n: SessionNotification): Promise { + // no-op — the server may push available_commands_update etc. + } + async writeTextFile(_p: WriteTextFileRequest): Promise { + return {}; + } + async readTextFile(_p: ReadTextFileRequest): Promise { + return { content: 'STUB' }; + } +} -/** - * Build a minimal {@link AgentSideConnection} stub whose - * `readTextFile` / `writeTextFile` return canned content. The stub - * also captures the calls so tests can verify the bridging path. - */ -function makeFakeConn(opts: { readContent?: string } = {}): { - conn: AgentSideConnection; - readCalls: Array<{ sessionId: string; path: string }>; - writeCalls: Array<{ sessionId: string; path: string; content: string }>; +function makeInMemoryStreamPair(): { + agentStream: ReturnType; + clientStream: ReturnType; } { - const readCalls: Array<{ sessionId: string; path: string }> = []; - const writeCalls: Array<{ sessionId: string; path: string; content: string }> = []; - const conn = { - readTextFile: async (req: { sessionId: string; path: string }) => { - readCalls.push({ sessionId: req.sessionId, path: req.path }); - return { content: opts.readContent ?? 'STUB' }; - }, - writeTextFile: async (req: { sessionId: string; path: string; content: string }) => { - writeCalls.push({ sessionId: req.sessionId, path: req.path, content: req.content }); - return {}; - }, - sessionUpdate: async () => undefined, - requestPermission: async () => { - throw new Error('requestPermission should not be called'); - }, - } as unknown as AgentSideConnection; - return { conn, readCalls, writeCalls }; + const clientToAgent = new TransformStream(); + const agentToClient = new TransformStream(); + const agentStream = ndJsonStream(agentToClient.writable, clientToAgent.readable); + const clientStream = ndJsonStream(clientToAgent.writable, agentToClient.readable); + return { agentStream, clientStream }; } -/** - * Build a `Session` whose `prompt(parts)` calls the supplied probe - * synchronously and then fires `turn.ended` so the outer - * `AcpSession.prompt` resolves promptly. - */ -function makeProbingSession( - sessionId: string, - probe: () => void | Promise, -): Session { - const listeners = new Set<(event: Event) => void>(); +interface CapturedCreate { + options: { id?: string; workDir: string; kaos?: Kaos }; +} + +function makeHarness(captured: CapturedCreate[]): KimiHarness { + const fakeSession = (id: string): Session => + ({ + id, + prompt: async () => undefined, + cancel: async () => undefined, + onEvent: () => () => undefined, + }) as unknown as Session; return { - id: sessionId, - prompt: async (_input: unknown) => { - // Run the probe inside the (potentially) runWithKaos-bound async - // subtree — this is exactly where a real tool would observe its - // current Kaos. - await probe(); - for (const fn of listeners) { - fn({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 1, reason: 'completed' } as Event); - } - }, - cancel: async () => undefined, - onEvent: (fn: (event: Event) => void) => { - listeners.add(fn); - return () => { - listeners.delete(fn); - }; + auth: { status: async () => AUTHED_STATUS }, + createSession: async (options: { id?: string; workDir: string; kaos?: Kaos }) => { + captured.push({ options }); + return fakeSession(options.id ?? 'fallback'); }, - } as unknown as Session; + getConfig: async () => ({ providers: {}, models: {} }), + } as unknown as KimiHarness; } -describe('AcpSession FS-capability activation', () => { - it('binds an AcpKaos as the current Kaos when the client advertises fs.readTextFile', async () => { - const { conn, readCalls } = makeFakeConn({ readContent: 'UNSAVED' }); +describe('AcpServer FS-capability activation (boundary injection)', () => { + it('passes an AcpKaos to createSession when the client advertises fs.readTextFile', async () => { + const captured: CapturedCreate[] = []; + const harness = makeHarness(captured); + const { agentStream, clientStream } = makeInMemoryStreamPair(); - let observedKaosName: string | undefined; - let observedRead: string | undefined; - const session = makeProbingSession('s-fs', async () => { - const current = getCurrentKaos(); - observedKaosName = current.name; - observedRead = await current.readText('/abs/path.ts'); - }); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection((_a) => new StubClient(), clientStream); - // Activation needs a baseline Kaos in the *outer* async context - // (otherwise getCurrentKaos throws when the AsyncLocalStorage store - // is undefined). Real `kimi acp` wires this at startup; tests must - // emulate. Note we deliberately use `runWithKaos` rather than - // `setCurrentKaos` so the outer binding stays test-local. - const outer = await LocalKaos.create(); - const result = await runWithKaos(outer, async () => { - const caps: ClientCapabilities = { fs: { readTextFile: true } }; - const acpSession = new AcpSession(conn, session, caps); - return acpSession.prompt([]); + await client.initialize({ + protocolVersion: 1, + clientCapabilities: { fs: { readTextFile: true, writeTextFile: false } }, }); + await client.newSession({ cwd: '/tmp/work', mcpServers: [] }); - expect(result.stopReason).toBe('end_turn'); - // Name probe confirms `runWithKaos` rebound to AcpKaos within the - // scripted prompt body. - expect(observedKaosName).toBe('acp(local)'); - // Read probe confirms the bridge actually routes through the conn. - expect(observedRead).toBe('UNSAVED'); - expect(readCalls).toEqual([{ sessionId: 's-fs', path: '/abs/path.ts' }]); + expect(captured).toHaveLength(1); + expect(captured[0]?.options.kaos).toBeInstanceOf(AcpKaos); + expect(captured[0]?.options.kaos?.name).toBe('acp(local)'); }); - it('binds an AcpKaos when only fs.writeTextFile is advertised', async () => { - const { conn, writeCalls } = makeFakeConn(); - let probedName: string | undefined; - const session = makeProbingSession('s-write', async () => { - const current = getCurrentKaos(); - probedName = current.name; - await current.writeText('/abs/out.ts', 'data'); - }); - - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - const caps: ClientCapabilities = { fs: { writeTextFile: true } }; - const acpSession = new AcpSession(conn, session, caps); - await acpSession.prompt([]); - }); - - expect(probedName).toBe('acp(local)'); - expect(writeCalls).toEqual([{ sessionId: 's-write', path: '/abs/out.ts', content: 'data' }]); - }); + it('passes an AcpKaos when only fs.writeTextFile is advertised', async () => { + const captured: CapturedCreate[] = []; + const harness = makeHarness(captured); + const { agentStream, clientStream } = makeInMemoryStreamPair(); - it('does NOT wrap when the client advertises no FS capability — current Kaos stays the outer one', async () => { - const { conn, readCalls } = makeFakeConn(); - let observedKaosName: string | undefined; - const session = makeProbingSession('s-nofs', () => { - observedKaosName = getCurrentKaos().name; - }); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection((_a) => new StubClient(), clientStream); - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - // No clientCapabilities — equivalent to "client didn't advertise FS" - const acpSession = new AcpSession(conn, session); - await acpSession.prompt([]); + await client.initialize({ + protocolVersion: 1, + clientCapabilities: { fs: { readTextFile: false, writeTextFile: true } }, }); + await client.newSession({ cwd: '/tmp/work', mcpServers: [] }); - // Pass-through: tool sees the outer LocalKaos, not an AcpKaos. - expect(observedKaosName).toBe('local'); - expect(readCalls).toEqual([]); + expect(captured).toHaveLength(1); + expect(captured[0]?.options.kaos).toBeInstanceOf(AcpKaos); }); - it('does NOT wrap when the FS capability flags are present-but-false', async () => { - const { conn } = makeFakeConn(); - let observedKaosName: string | undefined; - const session = makeProbingSession('s-falseflags', () => { - observedKaosName = getCurrentKaos().name; - }); + it('omits kaos when the client advertises no FS capability', async () => { + const captured: CapturedCreate[] = []; + const harness = makeHarness(captured); + const { agentStream, clientStream } = makeInMemoryStreamPair(); - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - const caps: ClientCapabilities = { fs: { readTextFile: false, writeTextFile: false } }; - const acpSession = new AcpSession(conn, session, caps); - await acpSession.prompt([]); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection((_a) => new StubClient(), clientStream); + + await client.initialize({ + protocolVersion: 1, + clientCapabilities: { fs: { readTextFile: false, writeTextFile: false } }, }); + await client.newSession({ cwd: '/tmp/work', mcpServers: [] }); - expect(observedKaosName).toBe('local'); + expect(captured).toHaveLength(1); + expect(captured[0]?.options.kaos).toBeUndefined(); }); - it('isolates concurrent prompts on different AcpSessions — each sees its own AcpKaos', async () => { - const { conn: connA } = makeFakeConn({ readContent: 'A-CONTENT' }); - const { conn: connB } = makeFakeConn({ readContent: 'B-CONTENT' }); - - let observedA: string | undefined; - let observedB: string | undefined; - - // Use a manual gate so both prompts overlap — A's probe blocks until - // B has had a chance to enter its scope. This forces the - // AsyncLocalStorage isolation invariant to be exercised: if - // `enterWith` were used naively, B's binding would leak into A. - let resolveA: () => void = () => undefined; - const aGate = new Promise((r) => { - resolveA = r; - }); + it('omits kaos when the FS capability flags are both false', async () => { + const captured: CapturedCreate[] = []; + const harness = makeHarness(captured); + const { agentStream, clientStream } = makeInMemoryStreamPair(); - const sessionA = makeProbingSession('s-A', async () => { - // Resolve A's probe AFTER we observe — see comment above. - await new Promise((r) => setTimeout(r, 5)); - observedA = await getCurrentKaos().readText('/file'); - resolveA(); - }); - const sessionB = makeProbingSession('s-B', async () => { - observedB = await getCurrentKaos().readText('/file'); - await aGate; - }); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection((_a) => new StubClient(), clientStream); - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - const acpA = new AcpSession(connA, sessionA, { fs: { readTextFile: true } }); - const acpB = new AcpSession(connB, sessionB, { fs: { readTextFile: true } }); - await Promise.all([acpA.prompt([]), acpB.prompt([])]); + await client.initialize({ + protocolVersion: 1, + clientCapabilities: { fs: { readTextFile: false, writeTextFile: false } }, }); + await client.newSession({ cwd: '/tmp/work', mcpServers: [] }); - expect(observedA).toBe('A-CONTENT'); - expect(observedB).toBe('B-CONTENT'); + expect(captured).toHaveLength(1); + expect(captured[0]?.options.kaos).toBeUndefined(); }); - it('reuses the inner LocalKaos across multiple prompts on the same AcpSession', async () => { - const { conn } = makeFakeConn({ readContent: 'X' }); - let firstInner: import('@moonshot-ai/kaos').Kaos | undefined; - let secondInner: import('@moonshot-ai/kaos').Kaos | undefined; - const session = makeProbingSession('s-reuse', () => { - // Tunnel: AcpKaos wraps an inner Kaos and exposes it indirectly - // via getcwd() (which delegates). For this assertion it's enough - // that the AcpKaos's `name` stays stable AND we observe the - // session through two distinct prompts without errors. - const k = getCurrentKaos(); - if (!firstInner) firstInner = k; - else secondInner = k; - }); - - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - const acpSession = new AcpSession(conn, session, { fs: { readTextFile: true } }); - await acpSession.prompt([]); - await acpSession.prompt([]); - }); + it('threads the per-session id into the AcpKaos so reverse-RPC calls route to the right session', async () => { + const captured: CapturedCreate[] = []; + const harness = makeHarness(captured); + const { agentStream, clientStream } = makeInMemoryStreamPair(); - expect(firstInner).toBeDefined(); - expect(secondInner).toBeDefined(); - // Each prompt produces its own AcpKaos wrapper, but the inner - // LocalKaos is reused — we can't directly read the private field, - // but the visible name should stay stable. - expect(firstInner?.name).toBe('acp(local)'); - expect(secondInner?.name).toBe('acp(local)'); - }); + let observedSessionId: string | undefined; + class CapturingClient extends StubClient { + override async readTextFile(p: ReadTextFileRequest): Promise { + observedSessionId = p.sessionId; + return { content: 'STUB' }; + } + } - it('returns an AcpKaos instance from maybeBuildAcpKaos when capable (verified via name + instanceof through prompt path)', async () => { - const { conn } = makeFakeConn(); - let observed: unknown; - const session = makeProbingSession('s-instance', () => { - observed = getCurrentKaos(); - }); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection((_a) => new CapturingClient(), clientStream); - const outer = await LocalKaos.create(); - await runWithKaos(outer, async () => { - const acpSession = new AcpSession(conn, session, { fs: { readTextFile: true } }); - await acpSession.prompt([]); + await client.initialize({ + protocolVersion: 1, + clientCapabilities: { fs: { readTextFile: true } }, }); + const response = await client.newSession({ cwd: '/tmp/work', mcpServers: [] }); - expect(observed).toBeInstanceOf(AcpKaos); + const kaos = captured[0]?.options.kaos; + expect(kaos).toBeInstanceOf(AcpKaos); + // Drive a reverse-RPC read through the AcpKaos and verify the + // sessionId on the wire matches the one returned by newSession. + await kaos!.readText('/abs/file.ts'); + expect(observedSessionId).toBe(response.sessionId); }); }); diff --git a/packages/acp-adapter/test/session-new.test.ts b/packages/acp-adapter/test/session-new.test.ts index dc358755f..ece351999 100644 --- a/packages/acp-adapter/test/session-new.test.ts +++ b/packages/acp-adapter/test/session-new.test.ts @@ -46,7 +46,7 @@ function makeInMemoryStreamPair(): { } interface CapturedCall { - options: { workDir: string }; + options: { id?: string; workDir: string; mcpServers?: Record }; } function makeHarness(sessionId: string, captured: CapturedCall[]): { @@ -61,9 +61,9 @@ function makeHarness(sessionId: string, captured: CapturedCall[]): { } as unknown as Session; const harness = { auth: { status: async () => AUTHED_STATUS }, - createSession: async (options: { workDir: string }) => { + createSession: async (options: { id?: string; workDir: string }) => { captured.push({ options }); - return fakeSession; + return Object.assign({}, fakeSession, { id: options.id ?? sessionId }) as Session; }, // Phase 14: server.newSession reads these to assemble configOptions. getConfig: async () => ({ @@ -98,26 +98,26 @@ describe('AcpServer session/new', () => { const response = await client.newSession(request); - expect(response.sessionId).toBe('sess-42'); + expect(typeof response.sessionId).toBe('string'); + expect(response.sessionId.length).toBeGreaterThan(0); expect(captured).toHaveLength(1); - expect(captured[0]?.options).toEqual({ workDir: '/tmp/work', mcpServers: {} }); + expect(captured[0]?.options.workDir).toBe('/tmp/work'); + expect(captured[0]?.options.id).toBe(response.sessionId); + expect(captured[0]?.options.mcpServers).toEqual({}); // The wrapper is stashed in the map under the same id we returned to // the client (so Phase 3.3/3.4 can look it up by sessionId). - expect(server?.getSession('sess-42')?.id).toBe('sess-42'); + expect(server?.getSession(response.sessionId)?.id).toBe(response.sessionId); }); it('returns a distinct sessionId per call (one createSession per request)', async () => { const captured: CapturedCall[] = []; - let counter = 0; const harness = { auth: { status: async () => AUTHED_STATUS }, - createSession: async (options: { workDir: string }) => { - counter += 1; - const id = `sess-${counter}`; + createSession: async (options: { id?: string; workDir: string }) => { captured.push({ options }); return { - id, + id: options.id ?? 'fallback', prompt: async () => undefined, cancel: async () => undefined, onEvent: () => () => undefined, @@ -134,11 +134,14 @@ describe('AcpServer session/new', () => { const first = await client.newSession({ cwd: '/tmp/a', mcpServers: [] }); const second = await client.newSession({ cwd: '/tmp/b', mcpServers: [] }); - expect(first.sessionId).toBe('sess-1'); - expect(second.sessionId).toBe('sess-2'); + expect(typeof first.sessionId).toBe('string'); + expect(typeof second.sessionId).toBe('string'); + expect(first.sessionId).not.toBe(second.sessionId); expect(captured).toHaveLength(2); - expect(captured[0]?.options).toEqual({ workDir: '/tmp/a', mcpServers: {} }); - expect(captured[1]?.options).toEqual({ workDir: '/tmp/b', mcpServers: {} }); + expect(captured[0]?.options.workDir).toBe('/tmp/a'); + expect(captured[0]?.options.id).toBe(first.sessionId); + expect(captured[1]?.options.workDir).toBe('/tmp/b'); + expect(captured[1]?.options.id).toBe(second.sessionId); }); it('advertises configOptions (PLAN D11 + Phase 15 thinking toggle) — model + thinking + mode under the unified SessionConfigOption surface', async () => { diff --git a/packages/acp-adapter/test/session-prompt.test.ts b/packages/acp-adapter/test/session-prompt.test.ts index 0ceb5f10e..a33ae22a9 100644 --- a/packages/acp-adapter/test/session-prompt.test.ts +++ b/packages/acp-adapter/test/session-prompt.test.ts @@ -226,4 +226,74 @@ describe('AcpServer session/prompt', () => { ).rejects.toBeDefined(); expect(unsubCount).toBe(1); }); + + it('rejects prompt when the SDK emits a turn.agent_busy error event', async () => { + const sessionId = 'sess-busy'; + const { session, unsubscribeCount } = makeScriptedSession(sessionId, [ + { + type: 'error', + sessionId, + agentId: 'main', + code: 'turn.agent_busy', + message: 'Cannot launch a new turn while another turn (ID 0) is active', + details: { turnId: 0 }, + retryable: true, + } as unknown as Event, + ]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection(() => new CollectingClient(), clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + + await expect( + client.prompt({ sessionId, prompt: [textBlock('hi')] }), + ).rejects.toMatchObject({ code: -32600 }); + expect(unsubscribeCount()).toBe(1); + }); + + it('ignores a subagent turn.ended and resolves on the main agent turn.ended', async () => { + const sessionId = 'sess-subagent'; + const { session, unsubscribeCount } = makeScriptedSession(sessionId, [ + { type: 'assistant.delta', sessionId, agentId: 'main', turnId: 1, delta: 'a' } as Event, + // A subagent finishes its own turn while the main turn is still + // running. Pre-fix this would resolve the parent prompt with + // `end_turn` and leak the listener; post-fix it must be ignored. + { + type: 'turn.ended', + sessionId, + agentId: 'sub-1', + turnId: 99, + reason: 'completed', + } as Event, + { type: 'assistant.delta', sessionId, agentId: 'main', turnId: 1, delta: 'b' } as Event, + { type: 'turn.ended', sessionId, agentId: 'main', turnId: 1, reason: 'completed' } as Event, + ]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const collecting = new CollectingClient(); + const client = new ClientSideConnection(() => collecting, clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + + const response = await client.prompt({ + sessionId, + prompt: [textBlock('hi')], + }); + + expect(response.stopReason).toBe('end_turn'); + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(collecting.promptUpdates).toHaveLength(2); + expect(unsubscribeCount()).toBe(1); + }); }); diff --git a/packages/agent-core/src/rpc/core-impl.ts b/packages/agent-core/src/rpc/core-impl.ts index f78a9b8ef..403308440 100644 --- a/packages/agent-core/src/rpc/core-impl.ts +++ b/packages/agent-core/src/rpc/core-impl.ts @@ -183,6 +183,13 @@ export class KimiCore implements PromisableMethods { } async createSession(input: CreateSessionPayload): Promise { + return this.createSessionWithOverrides(input, {}); + } + + async createSessionWithOverrides( + input: CreateSessionPayload, + overrides: { kaos?: Kaos }, + ): Promise { const options = input; const workDir = requiredWorkDir('createSession', options.workDir); const config = this.reloadProviderManager(); @@ -210,8 +217,9 @@ export class KimiCore implements PromisableMethods { // Session ctor attaches its own log sink. If anything in the setup-after- // ctor block throws, `session.close()` releases the sink (and mcp). const runtime = await this.resolveRuntime(config); + const parentKaos = overrides.kaos ?? (await this.getKaos()); const session = new Session({ - kaos: (await this.getKaos()).withCwd(workDir), + kaos: parentKaos.withCwd(workDir), toolServices: runtime, config, id, @@ -282,6 +290,13 @@ export class KimiCore implements PromisableMethods { } async resumeSession(input: ResumeSessionPayload): Promise { + return this.resumeSessionWithOverrides(input, {}); + } + + async resumeSessionWithOverrides( + input: ResumeSessionPayload, + overrides: { kaos?: Kaos }, + ): Promise { const summary = await this.sessionStore.get(input.sessionId); const active = this.sessions.get(summary.id); if (active !== undefined) { @@ -298,8 +313,9 @@ export class KimiCore implements PromisableMethods { const pluginSessionStarts = this.plugins.enabledSessionStarts(); const mcpConfig = this.mergePluginMcpConfig(withCallerMcp); const runtime = await this.resolveRuntime(config); + const parentKaos = overrides.kaos ?? (await this.getKaos()); const session = new Session({ - kaos: (await this.getKaos()).withCwd(summary.workDir), + kaos: parentKaos.withCwd(summary.workDir), toolServices: runtime, config, id: summary.id, diff --git a/packages/node-sdk/src/kimi-harness.ts b/packages/node-sdk/src/kimi-harness.ts index 86175843c..90c4b3335 100644 --- a/packages/node-sdk/src/kimi-harness.ts +++ b/packages/node-sdk/src/kimi-harness.ts @@ -84,8 +84,11 @@ export class KimiHarness { } async createSession(options: CreateSessionOptions): Promise { - const { planMode, ...coreOptions } = options; - const summary = await this.rpc.createSession(coreOptions); + const { planMode, kaos, ...coreOptions } = options; + const summary = + kaos === undefined + ? await this.rpc.createSession(coreOptions) + : await this.rpc.createSessionWithKaos(coreOptions, kaos); const session = new Session({ id: summary.id, workDir: summary.workDir, @@ -109,7 +112,11 @@ export class KimiHarness { const active = this.activeSessions.get(id); if (active !== undefined) return active; - const summary = await this.rpc.resumeSession({ ...input, id }); + const { kaos, ...resumeInput } = input; + const summary = + kaos === undefined + ? await this.rpc.resumeSession({ ...resumeInput, id }) + : await this.rpc.resumeSessionWithKaos({ ...resumeInput, id }, kaos); const session = new Session({ id: summary.id, workDir: summary.workDir, diff --git a/packages/node-sdk/src/rpc.ts b/packages/node-sdk/src/rpc.ts index c3a220ea8..815bf9f39 100644 --- a/packages/node-sdk/src/rpc.ts +++ b/packages/node-sdk/src/rpc.ts @@ -14,6 +14,7 @@ import { type ToolCallRequest, type ToolCallResponse, } from '@moonshot-ai/agent-core'; +import type { Kaos } from '@moonshot-ai/kaos'; import type { ApprovalHandler, QuestionHandler } from '#/events'; import type { @@ -106,11 +107,27 @@ export abstract class SDKRpcClientBase { return rpc.createSession(coreInput); } + async createSessionWithKaos( + input: CreateSessionOptions, + kaos: Kaos, + ): Promise { + void kaos; + return this.createSession(input); + } + async resumeSession(input: ResumeSessionInput): Promise { const rpc = await this.getRpc(); return rpc.resumeSession({ ...input, sessionId: input.id }); } + async resumeSessionWithKaos( + input: ResumeSessionInput, + kaos: Kaos, + ): Promise { + void kaos; + return this.resumeSession(input); + } + async reloadSession(input: SessionIdRpcInput): Promise { const rpc = await this.getRpc(); return rpc.reloadSession({ sessionId: input.sessionId }); diff --git a/packages/node-sdk/src/sdk-rpc-client.ts b/packages/node-sdk/src/sdk-rpc-client.ts index 010b29eca..792a79b24 100644 --- a/packages/node-sdk/src/sdk-rpc-client.ts +++ b/packages/node-sdk/src/sdk-rpc-client.ts @@ -13,12 +13,21 @@ import { type SDKAPI, type TelemetryClient, } from '@moonshot-ai/agent-core'; +import type { Kaos } from '@moonshot-ai/kaos'; import { assertKimiHostIdentity, createKimiDefaultHeaders } from '@moonshot-ai/kimi-code-oauth'; import { KimiAuthFacade } from '#/auth'; import { KimiHarness } from '#/kimi-harness'; import { ClientAPI, SDKRpcClientBase } from '#/rpc'; -import type { KimiHarnessOptions, KimiHostIdentity, OAuthRefreshOutcome } from '#/types'; +import type { + CreateSessionOptions, + KimiHarnessOptions, + KimiHostIdentity, + OAuthRefreshOutcome, + ResumeSessionInput, + ResumedSessionSummary, + SessionSummary, +} from '#/types'; export interface SDKRpcClientOptions { readonly homeDir?: string; @@ -89,6 +98,25 @@ export class SDKRpcClient extends SDKRpcClientBase { return this.ready; } + override async createSessionWithKaos( + input: CreateSessionOptions, + kaos: Kaos, + ): Promise { + const { planMode, ...coreInput } = input; + void planMode; + return this.core.createSessionWithOverrides(coreInput, { kaos }); + } + + override async resumeSessionWithKaos( + input: ResumeSessionInput, + kaos: Kaos, + ): Promise { + return this.core.resumeSessionWithOverrides( + { ...input, sessionId: input.id }, + { kaos }, + ); + } + private createKimiRequestHeaders(): Record | undefined { if (this.identity === undefined) return undefined; return createKimiDefaultHeaders({ diff --git a/packages/node-sdk/src/types.ts b/packages/node-sdk/src/types.ts index 1ef8a3277..4cc2b67a4 100644 --- a/packages/node-sdk/src/types.ts +++ b/packages/node-sdk/src/types.ts @@ -6,6 +6,7 @@ import type { TelemetryContextPatch, TelemetryProperties, } from '@moonshot-ai/agent-core'; +import type { Kaos } from '@moonshot-ai/kaos'; import type { KimiHostIdentity, OAuthRefreshOutcome } from '@moonshot-ai/kimi-code-oauth'; import type { ContentPart } from '@moonshot-ai/kosong'; @@ -92,6 +93,7 @@ export interface CreateSessionOptions { readonly permission?: PermissionMode | undefined; readonly planMode?: boolean; readonly metadata?: JsonObject | undefined; + readonly kaos?: Kaos | undefined; } export interface RenameSessionInput { @@ -101,6 +103,7 @@ export interface RenameSessionInput { export interface ResumeSessionInput { readonly id: string; + readonly kaos?: Kaos | undefined; } export interface ForkSessionInput { @@ -187,4 +190,4 @@ export interface SessionSummary { export type ResumedSessionState = Pick; -export interface ResumedSessionSummary extends SessionSummary, ResumedSessionState {} +export interface ResumedSessionSummary extends SessionSummary, ResumedSessionState { } From 43c0cd9da30131dca0755da2f14354bca6d3203c Mon Sep 17 00:00:00 2001 From: "haozhe.yang" Date: Fri, 5 Jun 2026 16:09:50 +0800 Subject: [PATCH 2/2] fix(acp): stabilize ACP session bootstrap and slash routing Add ACP slash-command snapshots and skill activation routing.\nKeep tool Kaos separate from persistence Kaos for ACP session bootstrap.\nTighten append-mode not-found handling and legacy approval compatibility.\nFilter subagent events from parent prompt resolution and update changesets. --- .changeset/acp-fs-and-subagent-fixes.md | 7 - .changeset/acp-sdk-and-cli-fixes.md | 8 + .changeset/preserve-compaction-thinking.md | 6 - apps/kimi-code/src/cli/sub/acp.ts | 46 ++- packages/acp-adapter/src/approval.ts | 8 + packages/acp-adapter/src/index.ts | 3 +- packages/acp-adapter/src/kaos-acp.ts | 29 +- packages/acp-adapter/src/server.ts | 160 ++++++++-- packages/acp-adapter/src/session.ts | 105 +++++- packages/acp-adapter/src/slash.ts | 50 +++ packages/acp-adapter/test/approval.test.ts | 15 + packages/acp-adapter/test/kaos-acp.test.ts | 17 +- .../acp-adapter/test/kaos-activation.test.ts | 12 +- .../acp-adapter/test/session-prompt.test.ts | 107 +++++++ .../acp-adapter/test/session-resume.test.ts | 4 + .../acp-adapter/test/session-slash.test.ts | 300 ++++++++++++++++++ packages/acp-adapter/test/slash.test.ts | 98 ++++++ packages/acp-adapter/test/tool-result.test.ts | 12 +- packages/agent-core/src/agent/index.ts | 13 +- packages/agent-core/src/rpc/core-impl.ts | 11 +- packages/agent-core/src/session/index.ts | 38 ++- .../agent-core/src/session/subagent-host.ts | 4 +- .../agent-core/src/tools/builtin/file/read.ts | 13 +- packages/agent-core/test/session/init.test.ts | 75 +++++ .../test/session/subagent-host.test.ts | 1 + packages/agent-core/test/tools/read.test.ts | 25 ++ packages/node-sdk/src/kimi-harness.ts | 20 +- packages/node-sdk/src/rpc.ts | 4 + packages/node-sdk/src/sdk-rpc-client.ts | 6 +- packages/node-sdk/src/types.ts | 2 + .../test/create-session-transport.test.ts | 71 ++++- 31 files changed, 1168 insertions(+), 102 deletions(-) delete mode 100644 .changeset/acp-fs-and-subagent-fixes.md create mode 100644 .changeset/acp-sdk-and-cli-fixes.md delete mode 100644 .changeset/preserve-compaction-thinking.md create mode 100644 packages/acp-adapter/src/slash.ts create mode 100644 packages/acp-adapter/test/session-slash.test.ts create mode 100644 packages/acp-adapter/test/slash.test.ts diff --git a/.changeset/acp-fs-and-subagent-fixes.md b/.changeset/acp-fs-and-subagent-fixes.md deleted file mode 100644 index 165964c59..000000000 --- a/.changeset/acp-fs-and-subagent-fixes.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -"@moonshot-ai/acp-adapter": patch -"@moonshot-ai/agent-core": patch -"@moonshot-ai/node-sdk": patch ---- - -Fix ACP-bridged file reads for binary content and line terminators, prevent silent overwrites on append-mode permission errors, and stop subagent events from interfering with parent prompt resolution. diff --git a/.changeset/acp-sdk-and-cli-fixes.md b/.changeset/acp-sdk-and-cli-fixes.md new file mode 100644 index 000000000..1b1c89908 --- /dev/null +++ b/.changeset/acp-sdk-and-cli-fixes.md @@ -0,0 +1,8 @@ +--- +"@moonshot-ai/acp-adapter": patch +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code-sdk": patch +"@moonshot-ai/kimi-code": patch +--- + +Fix ACP slash skill routing, bootstrap context reads, file and permission edge cases, subagent event handling, and stale-file edit messaging. diff --git a/.changeset/preserve-compaction-thinking.md b/.changeset/preserve-compaction-thinking.md deleted file mode 100644 index 978bb48ea..000000000 --- a/.changeset/preserve-compaction-thinking.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -"@moonshot-ai/agent-core": patch -"@moonshot-ai/kimi-code": patch ---- - -Preserve thinking effort when compacting long conversations. diff --git a/apps/kimi-code/src/cli/sub/acp.ts b/apps/kimi-code/src/cli/sub/acp.ts index 5fa786676..56e6ed2d1 100644 --- a/apps/kimi-code/src/cli/sub/acp.ts +++ b/apps/kimi-code/src/cli/sub/acp.ts @@ -21,11 +21,17 @@ import type { Command } from 'commander'; -import { runAcpServer } from '@moonshot-ai/acp-adapter'; -import { createKimiHarness } from '@moonshot-ai/kimi-code-sdk'; +import { + runAcpServer, + type AvailableCommand, + type SlashCommandsSnapshot, +} from '@moonshot-ai/acp-adapter'; +import { createKimiHarness, type Session, type SkillSummary } from '@moonshot-ai/kimi-code-sdk'; import { KIMI_CODE_HOME_ENV } from '#/constant/app'; import { createKimiCodeHostIdentity, getVersion } from '#/cli/version'; +import { BUILTIN_SLASH_COMMANDS } from '#/tui/commands/registry'; +import { buildSkillSlashCommands } from '#/tui/commands/skills'; import { runLoginFlow } from './login-flow'; @@ -66,9 +72,45 @@ export function registerAcpCommand(parent: Command): void { // client can spawn it with `args:['login']` for the top-level // `kimi login` subcommand — matches kimi-cli `acp/server.py:77-96`. const legacyCommand = process.argv[1]; + const builtinCommands: AvailableCommand[] = BUILTIN_SLASH_COMMANDS.map((cmd) => ({ + name: cmd.name, + description: cmd.description, + })); + // Skills are session-scoped (per-cwd config), so we defer the + // listSkills() call until the adapter hands us the just-created + // Session — mirrors opencode's per-directory snapshot. A + // listSkills() failure degrades to builtins-only so a broken + // skill source never blanks the palette. + const resolveSlashCommands = async ( + session: Session, + ): Promise => { + let skills: readonly SkillSummary[] = []; + try { + skills = await session.listSkills(); + } catch { + skills = []; + } + // `buildSkillSlashCommands` already returns both views — the + // palette entries (advertised via `available_commands_update`) + // and the `commandName → skillName` map the adapter uses to + // intercept `/skill:` inputs and route them to + // `Session.activateSkill`. Passing both through keeps the two + // surfaces in lockstep (palette ↔ interceptable set) without + // a second `listSkills()` round trip. + const built = buildSkillSlashCommands(skills); + const skillCommands = built.commands.map((cmd) => ({ + name: cmd.name, + description: cmd.description, + })); + return { + commands: [...builtinCommands, ...skillCommands], + skillCommandMap: built.commandMap, + }; + }; try { await runAcpServer(harness, { agentInfo: { name: 'Kimi Code CLI', version: getVersion() }, + slashCommands: resolveSlashCommands, ...(terminalAuthEnv ? { terminalAuthEnv } : {}), ...(legacyCommand !== undefined && legacyCommand.length > 0 ? { terminalAuthLegacyCommand: legacyCommand } diff --git a/packages/acp-adapter/src/approval.ts b/packages/acp-adapter/src/approval.ts index 8e183fdae..9d6b9c28b 100644 --- a/packages/acp-adapter/src/approval.ts +++ b/packages/acp-adapter/src/approval.ts @@ -157,8 +157,16 @@ export function permissionResponseToApprovalResponse( } switch (optionId) { case APPROVE_ONCE_OPTION_ID: + // Legacy Python kimi-cli (< v0.9.0) used 'approve' as the + // allow-once optionId. Keep accepting it so custom ACP clients + // built against the old SDK are not silently rejected. + case 'approve': return { decision: 'approved' }; case APPROVE_ALWAYS_OPTION_ID: + // Legacy Python kimi-cli (< v0.9.0) used 'approve_for_session' as + // the allow-always optionId. Same backward-compatibility rationale + // as the 'approve' branch above. + case 'approve_for_session': return { decision: 'approved', scope: 'session' }; case REJECT_OPTION_ID: return { decision: 'rejected' }; diff --git a/packages/acp-adapter/src/index.ts b/packages/acp-adapter/src/index.ts index 659d1f514..d621052a3 100644 --- a/packages/acp-adapter/src/index.ts +++ b/packages/acp-adapter/src/index.ts @@ -1,8 +1,9 @@ -export type { Implementation } from '@agentclientprotocol/sdk'; +export type { AvailableCommand, Implementation } from '@agentclientprotocol/sdk'; export { CURRENT_VERSION, MIN_PROTOCOL_VERSION, negotiateVersion } from './version'; export type { AcpVersionSpec } from './version'; export { TERMINAL_AUTH_METHOD, buildTerminalAuthMethod } from './auth-methods'; export { AcpServer, runAcpServer, runAcpServerWithStream } from './server'; +export type { SlashCommandsSnapshot } from './server'; export { AcpSession } from './session'; export { acpBlocksToPromptParts, diff --git a/packages/acp-adapter/src/kaos-acp.ts b/packages/acp-adapter/src/kaos-acp.ts index db7a64dee..14337d64d 100644 --- a/packages/acp-adapter/src/kaos-acp.ts +++ b/packages/acp-adapter/src/kaos-acp.ts @@ -141,6 +141,16 @@ export class AcpKaos implements Kaos { return this.inner.readBytes(path, n); } + /** + * Return a small UTF-8 header derived from the same ACP text source as + * `readText` / `readLines`, used only by text-read callers for sniffing. + * Keep `readBytes` local so binary callers such as ReadMediaFile stay safe. + */ + async readTextPreview(path: string, n: number): Promise { + const text = await this.readText(path); + return Buffer.from(text.slice(0, n), 'utf8'); + } + /** * Yield lines from the file, each terminated by its `\n` (the final * line has no terminator if the file did not end with `\n`). Matches @@ -248,16 +258,13 @@ function wrapKaosError(prefix: string, cause: unknown): KaosError { } /** - * Return true iff `err` looks like a "file does not exist" failure on - * the read side of an ACP append-mode write. We recognize: - * - `RequestError` with code `-32002` (the ACP SDK's `resourceNotFound`). - * - The `KaosError` wrapper around such a `RequestError` (which `readText` - * above produces) — unwrap via `.cause`. - * - A loose "not found" / "no such file" string match as a last resort, - * for clients that synthesize errors without using the SDK helpers. - * - * Permission denials, transport errors, and anything else propagate so - * append never silently overwrites an existing file. + * Return true iff `err` is a structured "file does not exist" failure on + * the read side of an ACP append-mode write. We only trust the ACP SDK's + * `RequestError.resourceNotFound` code (`-32002`), optionally wrapped in a + * `KaosError` by `readText` above. Message substring matching is intentionally + * avoided: wrapper messages include the path, so a path or non-ENOENT failure + * mentioning "not found" could otherwise be misclassified and cause append + * mode to overwrite existing content. */ function isNotFoundError(err: unknown): boolean { const visited = new Set(); @@ -266,8 +273,6 @@ function isNotFoundError(err: unknown): boolean { visited.add(cur); if (cur instanceof RequestError && cur.code === -32002) return true; if (cur instanceof Error) { - const msg = cur.message.toLowerCase(); - if (msg.includes('not found') || msg.includes('no such file')) return true; cur = (cur as Error & { cause?: unknown }).cause; continue; } diff --git a/packages/acp-adapter/src/server.ts b/packages/acp-adapter/src/server.ts index c3218c9a5..43b7b7c3b 100644 --- a/packages/acp-adapter/src/server.ts +++ b/packages/acp-adapter/src/server.ts @@ -17,6 +17,7 @@ import { type AgentCapabilities, type AuthenticateRequest, type AuthenticateResponse, + type AvailableCommand, type CancelNotification, type ClientCapabilities, type Implementation, @@ -58,6 +59,51 @@ import { listModelsFromHarness } from './model-catalog'; import { DEFAULT_MODE_ID } from './modes'; import { negotiateVersion, type AcpVersionSpec } from './version'; +/** + * Per-session snapshot returned by the {@link AcpServer} caller's + * `slashCommands` resolver. Carries both what gets advertised in the + * `available_commands_update` push and the `skillCommandMap` that + * {@link AcpSession.prompt} consults to intercept `/skill:` + * inputs and route them to {@link Session.activateSkill}. + * + * `skillCommandMap` is optional for backward compatibility: callers + * that pre-date slash-command routing (or that only advertise builtin + * commands) can omit it and get the previous "always passthrough" + * behavior. + */ +export interface SlashCommandsSnapshot { + readonly commands: ReadonlyArray; + readonly skillCommandMap?: ReadonlyMap; +} + +type SlashCommandsResolver = + | ReadonlyArray + | SlashCommandsSnapshot + | (( + session: Session, + ) => + | Promise | SlashCommandsSnapshot> + | ReadonlyArray + | SlashCommandsSnapshot); + +interface ResolvedSlashCommands { + readonly commands: ReadonlyArray; + readonly skillCommandMap: ReadonlyMap; +} + +function toResolvedSlashCommands( + input: ReadonlyArray | SlashCommandsSnapshot, +): ResolvedSlashCommands { + if (Array.isArray(input)) { + return { commands: input, skillCommandMap: new Map() }; + } + const snap = input as SlashCommandsSnapshot; + return { + commands: snap.commands, + skillCommandMap: snap.skillCommandMap ?? new Map(), + }; +} + /** * Inline auth gate — moved out of `KimiAuthFacade.hasUsableToken()` so * the SDK doesn't have to carry an ACP-specific convenience method. @@ -88,6 +134,9 @@ export class AcpServer implements Agent { private readonly agentInfo: Implementation | undefined; private readonly terminalAuthEnv: Readonly> | undefined; private readonly terminalAuthLegacyCommand: string | undefined; + private readonly resolveSlashCommands: ( + session: Session, + ) => Promise; /** * Lazily-built inner {@link Kaos} (a {@link LocalKaos}) used as the * delegate target for every {@link AcpKaos} this server hands out. @@ -117,11 +166,34 @@ export class AcpServer implements Agent { * directly. Defaults to undefined (the `_meta` fallback is omitted). */ terminalAuthLegacyCommand?: string; + /** + * Slash commands to advertise in the one-shot + * `available_commands_update` pushed immediately after each + * `session/new`, `session/load`, and `session/resume`. Accepts + * either a static array, or a resolver called once per session + * (with the just-created `Session`) so per-session sources like + * `session.listSkills()` can be merged in. When omitted, the + * adapter falls back to an empty list. + * + * Returning a {@link SlashCommandsSnapshot} (`{ commands, skillCommandMap }`) + * additionally lets {@link AcpSession.prompt} intercept + * `/skill: ...` inputs at the adapter boundary and route + * them to {@link Session.activateSkill} instead of forwarding the + * raw slash text — matching the TUI's slash-command behavior so + * skill activations don't fall back to model-driven Bash + * exploration of `~/.kimi-code/skills/`. + */ + slashCommands?: SlashCommandsResolver; }, ) { this.agentInfo = opts?.agentInfo; this.terminalAuthEnv = opts?.terminalAuthEnv; this.terminalAuthLegacyCommand = opts?.terminalAuthLegacyCommand; + const slash = opts?.slashCommands; + this.resolveSlashCommands = + typeof slash === 'function' + ? async (session) => toResolvedSlashCommands(await slash(session)) + : async () => toResolvedSlashCommands(slash ?? []); } /** Returns the {@link AcpVersionSpec} chosen during `initialize`, if any. */ @@ -210,10 +282,12 @@ export class AcpServer implements Agent { // the same reference, no AsyncLocalStorage needed. const sessionId = `session_${randomUUID()}`; const acpKaos = await this.maybeBuildAcpKaos(sessionId); + const persistenceKaos = acpKaos === undefined ? undefined : await this.ensureInnerKaos(); const session = await this.harness.createSession({ id: sessionId, workDir: params.cwd, - ...(acpKaos !== undefined ? { kaos: acpKaos } : {}), + kaos: acpKaos, + persistenceKaos, // @ts-expect-error — `mcpServers` is a kernel-side extension // (agent-core `CreateSessionPayload`) the SDK transparently // forwards via spread. See block comment above. @@ -236,7 +310,6 @@ export class AcpServer implements Agent { // property set is deliberately minimal: `mode` distinguishes // `newSession` from `loadSession`; no user content / PII. this.trackSessionStarted(session.id, 'new'); - await this.emitAvailableCommandsUpdate(session.id); // Phase 14 (PLAN D11) advertises both the model and mode pickers as // a unified `configOptions: SessionConfigOption[]` surface. The // dedicated Phase 12 `modes:` field is gone — see @@ -249,14 +322,16 @@ export class AcpServer implements Agent { // current model's catalog row advertises `thinkingSupported`; // Phase 16 reshaped that toggle from `boolean` to a 2-entry // `select` so Zed actually renders it. + const configOptions = await buildSessionConfigOptions( + this.harness, + currentModelId, + currentThinkingEnabled, + DEFAULT_MODE_ID, + ); + this.scheduleAvailableCommandsUpdate(session.id); return { sessionId: session.id, - configOptions: await buildSessionConfigOptions( - this.harness, - currentModelId, - currentThinkingEnabled, - DEFAULT_MODE_ID, - ), + configOptions, }; } @@ -298,7 +373,7 @@ export class AcpServer implements Agent { // its own UI bootstrap. This is the ONE difference vs. // `resumeSession`, which intentionally omits this step. await acpSession.replayHistory(); - await this.emitAvailableCommandsUpdate(session.id); + this.scheduleAvailableCommandsUpdate(session.id); return { configOptions }; } @@ -331,7 +406,7 @@ export class AcpServer implements Agent { // can observe which clients adopt the lighter-weight resume // surface vs the history-replaying load surface. No PII. this.trackSessionStarted(session.id, 'resume'); - await this.emitAvailableCommandsUpdate(session.id); + this.scheduleAvailableCommandsUpdate(session.id); return { configOptions }; } @@ -384,11 +459,13 @@ export class AcpServer implements Agent { // kernel. const mcpServers = acpMcpServersToConfigs(params.mcpServers); const acpKaos = await this.maybeBuildAcpKaos(params.sessionId); + const persistenceKaos = acpKaos === undefined ? undefined : await this.ensureInnerKaos(); let session: Session; try { session = await this.harness.resumeSession({ id: params.sessionId, - ...(acpKaos !== undefined ? { kaos: acpKaos } : {}), + kaos: acpKaos, + persistenceKaos, // @ts-expect-error — see block comment above; mcpServers is a // kernel-only field that the SDK forwards via spread. mcpServers, @@ -470,10 +547,15 @@ export class AcpServer implements Agent { if (!this.conn) { return undefined; } + const innerKaos = await this.ensureInnerKaos(); + return new AcpKaos(this.conn, sessionId, innerKaos); + } + + private async ensureInnerKaos(): Promise { if (!this.innerKaos) { this.innerKaos = await LocalKaos.create(); } - return new AcpKaos(this.conn, sessionId, this.innerKaos); + return this.innerKaos; } /** @@ -810,30 +892,35 @@ export class AcpServer implements Agent { }; } - /** - * Push the one-shot `available_commands_update` session_update that - * ACP clients use to populate a slash-command palette. Called once - * per session, immediately after the {@link AcpSession} is - * registered (so the wire id is stable when the notification - * arrives) but before the public RPC reply is returned (so a client - * that synchronously sets up listeners on session creation cannot - * miss the event). - * - * The kimi-code slash-command registry lives in - * `apps/kimi-code/src/tui/commands/registry.ts` — i.e. an app-level - * concern that the acp-adapter (a `packages/` library) has no - * access to. We emit an empty list so the client sees a - * deterministic update; a richer surface is deferred to a future - * step (PLAN D9 / ext_method). - * - * Errors are caught and logged, never thrown: pushing - * `session/update` is a streaming concern, not load-bearing for - * the `session/new` (or `session/load`) reply. - */ + private scheduleAvailableCommandsUpdate(sessionId: string): void { + setTimeout(() => { + void this.emitAvailableCommandsUpdate(sessionId); + }, 0); + } + private async emitAvailableCommandsUpdate(sessionId: string): Promise { if (!this.conn) return; + const acpSession = this.sessions.get(sessionId); + if (!acpSession) return; try { - await this.conn.sessionUpdate(availableCommandsUpdateNotification(sessionId)); + const { commands, skillCommandMap } = await this.resolveSlashCommands( + acpSession.session, + ); + // Seed the AcpSession's per-session skill map BEFORE the + // notification goes out. The resolver call already awaited the + // (async) `listSkills()` round trip, so the map is the same + // snapshot the client sees in its palette — no separate + // listSkills() invocation, no race window between "the client + // has skill X in its palette" and "the adapter knows to + // intercept /skill:X". Intentionally tolerant of older AcpSession + // builds that pre-date `setSkillCommandMap` (adapter-level unit + // tests construct stubbed sessions). + if (typeof acpSession.setSkillCommandMap === 'function') { + acpSession.setSkillCommandMap(skillCommandMap); + } + await this.conn.sessionUpdate( + availableCommandsUpdateNotification(sessionId, commands), + ); } catch (err) { log.warn('acp: failed to push available_commands_update', { sessionId, @@ -882,6 +969,7 @@ export async function runAcpServerWithStream( agentInfo?: Implementation; terminalAuthEnv?: Readonly>; terminalAuthLegacyCommand?: string; + slashCommands?: SlashCommandsResolver; }, ): Promise { const conn = new AgentSideConnection((c) => new AcpServer(harness, c, opts), stream); @@ -931,6 +1019,11 @@ export async function runAcpServer( * ctor for compatibility rationale. */ terminalAuthLegacyCommand?: string; + /** + * Slash commands to advertise to ACP clients so their slash-command + * palette is populated. See {@link AcpServer} ctor for details. + */ + slashCommands?: SlashCommandsResolver; /** * @internal Test seam — supply a fake `EventEmitter` (or a * subset that exposes `.once` / `.off`) to drive SIGINT / SIGTERM @@ -986,6 +1079,7 @@ export async function runAcpServer( agentInfo: opts?.agentInfo, terminalAuthEnv: opts?.terminalAuthEnv, terminalAuthLegacyCommand: opts?.terminalAuthLegacyCommand, + slashCommands: opts?.slashCommands, }); } finally { // Uninstall BEFORE the final cleanup so a second SIGINT (a user diff --git a/packages/acp-adapter/src/session.ts b/packages/acp-adapter/src/session.ts index 0c95c02a1..0396692df 100644 --- a/packages/acp-adapter/src/session.ts +++ b/packages/acp-adapter/src/session.ts @@ -45,6 +45,7 @@ import { } from './events-map'; import { acpModeToToggles, DEFAULT_MODE_ID, isAcpModeId, type AcpModeId } from './modes'; import { outcomeToQuestionAnswer, questionItemToPermissionOptions } from './question'; +import { detectSlashIntent } from './slash'; /** * Telemetry sink threaded into {@link AcpSession} so reverse-RPC bridges @@ -121,6 +122,20 @@ export class AcpSession { */ private currentModeIdInternal: AcpModeId = DEFAULT_MODE_ID; + /** + * Per-session `slash command name → skill name` map, seeded by + * {@link AcpServer.emitAvailableCommandsUpdate} from the same + * `listSkills()` snapshot that builds the client palette. Consulted + * by {@link prompt} to intercept `/skill: ...` inputs and + * route them to {@link Session.activateSkill} instead of forwarding + * the raw slash text to {@link Session.prompt} — which is what made + * Zed fall back to model-driven Bash exploration of + * `~/.kimi-code/skills/` and incurred permission prompts. Defaults + * to an empty map so adapter-level unit tests (which never call + * `setSkillCommandMap`) behave as a no-op passthrough. + */ + private skillCommandMap: ReadonlyMap = new Map(); + constructor( readonly conn: AgentSideConnection, readonly session: Session, @@ -238,6 +253,17 @@ export class AcpSession { await this.session.cancel(); } + /** + * Seed the per-session `slash command name → skill name` map used by + * {@link prompt} to intercept `/skill: ...` inputs. Called by + * {@link AcpServer.emitAvailableCommandsUpdate} from the same + * `listSkills()` snapshot that builds the client palette, so the map + * stays in lockstep with what the client advertises. + */ + setSkillCommandMap(map: ReadonlyMap): void { + this.skillCommandMap = map; + } + /** * Forward an ACP `session/set_model` (`unstable_setSessionModel`) * request to the underlying SDK session. @@ -648,18 +674,48 @@ export class AcpSession { const sessionId = this.id; const conn = this.conn; - return this.runPromptBody(parts, sessionId, conn); + // ACP clients (Zed, JetBrains) send `/skill: [args]` as a + // plain text `ContentBlock` in `session/prompt`. The TUI client + // parses the same form in-process and dispatches via + // `Session.activateSkill(...)`, which renders the skill template + // inline and never round-trips through model→tool→FS lookup. To + // keep the two surfaces semantically aligned we re-parse here and + // route to the same SDK entry point; everything else (including + // unknown slash commands like `/clear`, which are TUI-only) + // passes through to `Session.prompt` unchanged. + const intent = detectLeadingSlashIntent(blocks, this.skillCommandMap); + if (intent.kind === 'skill') { + this.emitTelemetry('acp_skill_activated', { skill_name: intent.skillName }); + const skillName = intent.skillName; + const skillArgs = intent.args; + return this.runTurnBody(sessionId, conn, () => + // `activateSkill` accepts `args?: string | undefined`; pass the + // empty string through verbatim — the SDK's + // `normalizeOptionalString` converts `''` to `undefined`, which + // is the canonical "no args" form for the skill renderer. + this.session.activateSkill(skillName, skillArgs.length > 0 ? skillArgs : undefined), + ); + } + + return this.runTurnBody(sessionId, conn, () => this.session.prompt(parts)); } /** * Body of {@link prompt}, extracted so the event-listener invariants * — single `onEvent` subscription, `settled` flag semantics, - * `currentTurnId` reset — live in one place. + * `currentTurnId` reset — live in one place and can be driven by + * either `Session.prompt(parts)` or `Session.activateSkill(name, args)`. + * Both entry points trigger the same downstream turn (skill + * activation internally calls `agent.turn.prompt(...)` after + * injecting the `` block — see + * `packages/agent-core/src/agent/skill/index.ts`), so the event + * subscription's `turn.started` / `turn.ended` semantics apply + * uniformly. */ - private runPromptBody( - parts: ReturnType, + private runTurnBody( sessionId: string, conn: AgentSideConnection, + kick: () => Promise, ): Promise { return new Promise((resolve, reject) => { let settled = false; @@ -691,7 +747,16 @@ export class AcpSession { // each turn produces a distinct wire-level tool call that needs // its own CREATE. const startedToolCalls = new Set(); + const initialActiveTurnId = this.currentTurnId; + let hasReceivedOwnTurnStarted = false; const unsub = this.session.onEvent((event) => { + if ( + event.type === 'turn.started' && + isFromMainAgent(event) && + (initialActiveTurnId === undefined || event.turnId !== initialActiveTurnId) + ) { + hasReceivedOwnTurnStarted = true; + } // Track the active turn so `handleApproval` (registered once at // construction, called via `setApprovalHandler`) can compose the // prefixed `${turnId}:${toolCallId}` wire id that matches the @@ -712,6 +777,7 @@ export class AcpSession { if (settled) return; if (!isFromMainAgent(event)) return; if (event.code !== ErrorCodes.TURN_AGENT_BUSY) return; + if (hasReceivedOwnTurnStarted) return; settled = true; argsByToolCall.clear(); startedToolCalls.clear(); @@ -730,6 +796,7 @@ export class AcpSession { return; } if (event.type === 'assistant.delta') { + if (!isFromMainAgent(event)) return; // `sessionUpdate` is itself async (it serializes onto the // ndjson stream). The text deltas form a strictly ordered // single-producer/single-consumer pipeline, so each await @@ -747,6 +814,7 @@ export class AcpSession { return; } if (event.type === 'thinking.delta') { + if (!isFromMainAgent(event)) return; conn .sessionUpdate(thinkingDeltaToSessionUpdate(sessionId, event)) .catch((err) => { @@ -758,6 +826,7 @@ export class AcpSession { return; } if (event.type === 'tool.call.started') { + if (!isFromMainAgent(event)) return; // Seed the accumulator with the **stringified initial args**. // The wire-level `tool_call_update` is REPLACE-content (not // append) so each subsequent delta emits the cumulative args @@ -816,6 +885,7 @@ export class AcpSession { return; } if (event.type === 'tool.call.delta') { + if (!isFromMainAgent(event)) return; // The agent-core emits these args-stream deltas BEFORE the // `tool.call.started` event (deltas come from the provider's // streaming phase; started is dispatched afterwards). If we @@ -858,6 +928,7 @@ export class AcpSession { return; } if (event.type === 'tool.progress') { + if (!isFromMainAgent(event)) return; const note = toolProgressToSessionUpdate(sessionId, event); if (note === null) return; conn.sessionUpdate(note).catch((err) => { @@ -870,6 +941,7 @@ export class AcpSession { return; } if (event.type === 'tool.result') { + if (!isFromMainAgent(event)) return; conn .sessionUpdate(toolResultToSessionUpdate(sessionId, event)) .catch((err) => { @@ -919,7 +991,7 @@ export class AcpSession { } }); - this.session.prompt(parts).catch((err) => { + kick().catch((err) => { if (settled) return; settled = true; unsub(); @@ -1111,6 +1183,29 @@ export class AcpSession { * The kimi-cli Python reference performs the same mapping at * `kimi-cli/src/kimi_cli/acp/session.py:218-247`; this is the TS port. */ +/** + * Inspect the leading `ContentBlock` of an ACP prompt for a + * `/skill:` form. Only the first block is examined — when Zed + * (or any other ACP client) sends a slash command, it always lives in + * the first text block; multi-part prompts that interleave images or + * resources before text are typed by humans and do not start with a + * slash. Non-text leading blocks short-circuit to passthrough. + * + * The parsing/resolution itself is delegated to `./slash` — + * deliberately duplicated from the TUI's + * `apps/kimi-code/src/tui/commands/parse.ts` and `resolve.ts` to + * avoid an app→package import inversion. See `./slash`'s top-of-file + * comment for the sync target. + */ +function detectLeadingSlashIntent( + blocks: readonly ContentBlock[], + skillCommandMap: ReadonlyMap, +): ReturnType { + const first = blocks[0]; + if (!first || first.type !== 'text') return { kind: 'passthrough' }; + return detectSlashIntent(first.text, skillCommandMap); +} + function mapPromptError(err: unknown, sessionId: string): RequestError { const authErr = authRequiredFromUnknown(err); if (authErr) { diff --git a/packages/acp-adapter/src/slash.ts b/packages/acp-adapter/src/slash.ts new file mode 100644 index 000000000..b0a5e7835 --- /dev/null +++ b/packages/acp-adapter/src/slash.ts @@ -0,0 +1,50 @@ +// Slash-command detection for ACP `session/prompt`. +// +// Copied from the TUI's `apps/kimi-code/src/tui/commands/parse.ts` and the +// skill-resolution slice of `apps/kimi-code/src/tui/commands/resolve.ts` +// (`resolveSkillCommand`). The TUI's resolver also handles builtin slash +// commands and `streaming/compacting` busy gates; ACP does not surface +// those concepts (Zed serializes `session/prompt` requests at the +// transport layer, and TUI builtins like `/clear` are TUI-process-only), +// so this module deliberately only handles the `skill:` form. +// +// Sync target: if the TUI parser's accepted grammar changes (e.g. the +// "no `/` inside name" rule), update the duplicate here too. + +export interface ParsedSlashInput { + readonly name: string; + readonly args: string; +} + +export type SlashIntent = + | { readonly kind: 'skill'; readonly skillName: string; readonly args: string } + | { readonly kind: 'passthrough' }; + +export function parseSlashInput(input: string): ParsedSlashInput | null { + if (!input.startsWith('/')) return null; + const trimmed = input.slice(1).trim(); + if (trimmed.length === 0) return null; + const spaceIdx = trimmed.indexOf(' '); + const name = spaceIdx === -1 ? trimmed : trimmed.slice(0, spaceIdx); + const args = spaceIdx === -1 ? '' : trimmed.slice(spaceIdx + 1).trim(); + if (name.includes('/')) return null; + return { name, args }; +} + +export function resolveSkillCommand( + skillCommandMap: ReadonlyMap, + commandName: string, +): string | undefined { + return skillCommandMap.get(commandName) ?? skillCommandMap.get(`skill:${commandName}`); +} + +export function detectSlashIntent( + text: string, + skillCommandMap: ReadonlyMap, +): SlashIntent { + const parsed = parseSlashInput(text); + if (parsed === null) return { kind: 'passthrough' }; + const skillName = resolveSkillCommand(skillCommandMap, parsed.name); + if (skillName === undefined) return { kind: 'passthrough' }; + return { kind: 'skill', skillName, args: parsed.args }; +} diff --git a/packages/acp-adapter/test/approval.test.ts b/packages/acp-adapter/test/approval.test.ts index 309466697..19c7bd6be 100644 --- a/packages/acp-adapter/test/approval.test.ts +++ b/packages/acp-adapter/test/approval.test.ts @@ -175,6 +175,21 @@ describe('permissionResponseToApprovalResponse', () => { expect(result).toEqual({ decision: 'rejected' }); }); + it('maps legacy "approve" → { decision: approved } (Python kimi-cli compat)', () => { + const result = permissionResponseToApprovalResponse(undefined, { + outcome: { outcome: 'selected', optionId: 'approve' }, + }); + expect(result).toEqual({ decision: 'approved' }); + expect(result.scope).toBeUndefined(); + }); + + it('maps legacy "approve_for_session" → { decision: approved, scope: session } (Python kimi-cli compat)', () => { + const result = permissionResponseToApprovalResponse(undefined, { + outcome: { outcome: 'selected', optionId: 'approve_for_session' }, + }); + expect(result).toEqual({ decision: 'approved', scope: 'session' }); + }); + it('defensively maps an unknown optionId to { decision: rejected }', () => { const result = permissionResponseToApprovalResponse(undefined, { outcome: { outcome: 'selected', optionId: 'unknown_option_id' }, diff --git a/packages/acp-adapter/test/kaos-acp.test.ts b/packages/acp-adapter/test/kaos-acp.test.ts index eef219922..869d77f6b 100644 --- a/packages/acp-adapter/test/kaos-acp.test.ts +++ b/packages/acp-adapter/test/kaos-acp.test.ts @@ -340,19 +340,20 @@ describe('AcpKaos', () => { ]); }); - it('append mode also treats loose "not found" message as missing file', async () => { - // Some clients return plain JSON-RPC errors without the SDK helpers. + it('append mode does not treat a loose "not found" message as missing file', async () => { + // ACP adapters should only trust structured not-found errors here; wrapper + // messages include the path, so path-only or permission failures can contain + // "not found" without meaning that the target is absent. const conn = makeMockConn({ readHandler: async () => { - throw new Error('file not found'); + throw new Error('permission denied for /tmp/not found/file.txt'); }, }); const kaos = new AcpKaos(conn.asConn(), 's1', makeMockInner()); - const n = await kaos.writeText('/missing.ts', 'fresh', { mode: 'a' }); - expect(n).toBe(5); - expect(conn.writeCalls).toEqual([ - { sessionId: 's1', path: '/missing.ts', content: 'fresh' }, - ]); + + await expect(kaos.writeText('/tmp/not found/file.txt', 'fresh', { mode: 'a' })) + .rejects.toBeInstanceOf(KaosError); + expect(conn.writeCalls).toEqual([]); }); it('append mode rethrows non-not-found read errors and does NOT issue a write', async () => { diff --git a/packages/acp-adapter/test/kaos-activation.test.ts b/packages/acp-adapter/test/kaos-activation.test.ts index 6317c1bd7..ad6b03c0d 100644 --- a/packages/acp-adapter/test/kaos-activation.test.ts +++ b/packages/acp-adapter/test/kaos-activation.test.ts @@ -59,7 +59,7 @@ function makeInMemoryStreamPair(): { } interface CapturedCreate { - options: { id?: string; workDir: string; kaos?: Kaos }; + options: { id?: string; workDir: string; kaos?: Kaos; persistenceKaos?: Kaos }; } function makeHarness(captured: CapturedCreate[]): KimiHarness { @@ -72,7 +72,7 @@ function makeHarness(captured: CapturedCreate[]): KimiHarness { }) as unknown as Session; return { auth: { status: async () => AUTHED_STATUS }, - createSession: async (options: { id?: string; workDir: string; kaos?: Kaos }) => { + createSession: async (options: { id?: string; workDir: string; kaos?: Kaos; persistenceKaos?: Kaos }) => { captured.push({ options }); return fakeSession(options.id ?? 'fallback'); }, @@ -98,6 +98,8 @@ describe('AcpServer FS-capability activation (boundary injection)', () => { expect(captured).toHaveLength(1); expect(captured[0]?.options.kaos).toBeInstanceOf(AcpKaos); expect(captured[0]?.options.kaos?.name).toBe('acp(local)'); + expect(captured[0]?.options.persistenceKaos).toBeDefined(); + expect(captured[0]?.options.persistenceKaos).not.toBe(captured[0]?.options.kaos); }); it('passes an AcpKaos when only fs.writeTextFile is advertised', async () => { @@ -116,9 +118,11 @@ describe('AcpServer FS-capability activation (boundary injection)', () => { expect(captured).toHaveLength(1); expect(captured[0]?.options.kaos).toBeInstanceOf(AcpKaos); + expect(captured[0]?.options.persistenceKaos).toBeDefined(); + expect(captured[0]?.options.persistenceKaos).not.toBe(captured[0]?.options.kaos); }); - it('omits kaos when the client advertises no FS capability', async () => { + it('passes persistenceKaos only when tool AcpKaos is active and omits both when no FS capability', async () => { const captured: CapturedCreate[] = []; const harness = makeHarness(captured); const { agentStream, clientStream } = makeInMemoryStreamPair(); @@ -134,6 +138,7 @@ describe('AcpServer FS-capability activation (boundary injection)', () => { expect(captured).toHaveLength(1); expect(captured[0]?.options.kaos).toBeUndefined(); + expect(captured[0]?.options.persistenceKaos).toBeUndefined(); }); it('omits kaos when the FS capability flags are both false', async () => { @@ -152,6 +157,7 @@ describe('AcpServer FS-capability activation (boundary injection)', () => { expect(captured).toHaveLength(1); expect(captured[0]?.options.kaos).toBeUndefined(); + expect(captured[0]?.options.persistenceKaos).toBeUndefined(); }); it('threads the per-session id into the AcpKaos so reverse-RPC calls route to the right session', async () => { diff --git a/packages/acp-adapter/test/session-prompt.test.ts b/packages/acp-adapter/test/session-prompt.test.ts index a33ae22a9..048fd57f0 100644 --- a/packages/acp-adapter/test/session-prompt.test.ts +++ b/packages/acp-adapter/test/session-prompt.test.ts @@ -257,10 +257,117 @@ describe('AcpServer session/prompt', () => { expect(unsubscribeCount()).toBe(1); }); + it('does not reject an already-started prompt when a later prompt gets busy', async () => { + const sessionId = 'sess-busy-active'; + const listeners = new Set<(event: Event) => void>(); + let unsubCount = 0; + let promptCall = 0; + let firstError: unknown; + let resolveFirstTurn: (() => void) | undefined; + const firstTurn = new Promise((resolve) => { + resolveFirstTurn = () => { + resolve(); + }; + }); + void firstTurn.then(() => { + for (const fn of listeners) { + fn({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 1, reason: 'completed' } as Event); + } + }); + const session = { + id: sessionId, + prompt: async (_input: unknown) => { + promptCall += 1; + await Promise.resolve(); + if (promptCall === 1) { + for (const fn of listeners) { + fn({ + type: 'turn.started', + sessionId, + agentId: 'main', + turnId: 1, + origin: { kind: 'user' }, + } as unknown as Event); + } + await firstTurn; + return; + } + for (const fn of listeners) { + fn({ + type: 'error', + sessionId, + agentId: 'main', + code: 'turn.agent_busy', + message: 'Cannot launch a new turn while another turn (ID 1) is active', + details: { turnId: 1 }, + retryable: true, + } as unknown as Event); + } + }, + cancel: async () => undefined, + onEvent: (fn: (event: Event) => void) => { + listeners.add(fn); + return () => { + unsubCount += 1; + listeners.delete(fn); + }; + }, + } as unknown as Session; + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const client = new ClientSideConnection(() => new CollectingClient(), clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + + const firstPrompt = client + .prompt({ sessionId, prompt: [textBlock('active')] }) + .then( + (response) => response, + (error) => { + firstError = error; + throw error; + }, + ); + await Promise.resolve(); + + await expect( + client.prompt({ sessionId, prompt: [textBlock('busy')] }), + ).rejects.toMatchObject({ code: -32600 }); + expect(firstError).toBeUndefined(); + + resolveFirstTurn?.(); + await expect(firstPrompt).resolves.toMatchObject({ stopReason: 'end_turn' }); + expect(unsubCount).toBe(2); + }); + it('ignores a subagent turn.ended and resolves on the main agent turn.ended', async () => { const sessionId = 'sess-subagent'; const { session, unsubscribeCount } = makeScriptedSession(sessionId, [ { type: 'assistant.delta', sessionId, agentId: 'main', turnId: 1, delta: 'a' } as Event, + { type: 'assistant.delta', sessionId, agentId: 'sub-1', turnId: 99, delta: 'leak' } as Event, + { type: 'thinking.delta', sessionId, agentId: 'sub-1', turnId: 99, delta: 'leak' } as Event, + { + type: 'tool.call.started', + sessionId, + agentId: 'sub-1', + turnId: 99, + toolCallId: 'sub-tool', + name: 'Shell', + args: { command: 'echo leak' }, + } as Event, + { + type: 'tool.result', + sessionId, + agentId: 'sub-1', + turnId: 99, + toolCallId: 'sub-tool', + output: 'leak', + } as Event, // A subagent finishes its own turn while the main turn is still // running. Pre-fix this would resolve the parent prompt with // `end_turn` and leak the listener; post-fix it must be ignored. diff --git a/packages/acp-adapter/test/session-resume.test.ts b/packages/acp-adapter/test/session-resume.test.ts index 3711308c7..34b92783a 100644 --- a/packages/acp-adapter/test/session-resume.test.ts +++ b/packages/acp-adapter/test/session-resume.test.ts @@ -221,6 +221,10 @@ describe('AcpServer.resumeSession', () => { const clientConn = new ClientSideConnection((_a) => client, clientStream); await clientConn.resumeSession({ sessionId, cwd: '/tmp/x', mcpServers: [] }); + // available_commands_update is emitted via setTimeout(0) AFTER the + // resumeSession reply so Zed sees the wire id first; wait one + // macrotask before asserting. + await new Promise((resolve) => setTimeout(resolve, 25)); // Exactly ONE notification: the available_commands_update. Compare // to session-load.test.ts which sees 1 update per history turn diff --git a/packages/acp-adapter/test/session-slash.test.ts b/packages/acp-adapter/test/session-slash.test.ts new file mode 100644 index 000000000..58fe78599 --- /dev/null +++ b/packages/acp-adapter/test/session-slash.test.ts @@ -0,0 +1,300 @@ +import { describe, expect, it } from 'vitest'; + +import { + AgentSideConnection, + ClientSideConnection, + ndJsonStream, + type Client, + type ContentBlock, + type ReadTextFileRequest, + type ReadTextFileResponse, + type RequestPermissionRequest, + type RequestPermissionResponse, + type SessionNotification, + type WriteTextFileRequest, + type WriteTextFileResponse, +} from '@agentclientprotocol/sdk'; +import type { Event, KimiHarness, Session } from '@moonshot-ai/kimi-code-sdk'; + +import { AcpServer } from '../src/server'; +import { AUTHED_STATUS } from './_helpers/harness-stubs'; + +class CollectingClient implements Client { + readonly updates: SessionNotification[] = []; + async requestPermission(_p: RequestPermissionRequest): Promise { + throw new Error('requestPermission should not be called'); + } + async sessionUpdate(n: SessionNotification): Promise { + this.updates.push(n); + } + async writeTextFile(_p: WriteTextFileRequest): Promise { + throw new Error('writeTextFile should not be called'); + } + async readTextFile(_p: ReadTextFileRequest): Promise { + throw new Error('readTextFile should not be called'); + } +} + +function makeInMemoryStreamPair(): { + agentStream: ReturnType; + clientStream: ReturnType; +} { + const c2a = new TransformStream(); + const a2c = new TransformStream(); + return { + agentStream: ndJsonStream(a2c.writable, c2a.readable), + clientStream: ndJsonStream(c2a.writable, a2c.readable), + }; +} + +/** + * Fake `Session` that records every call to `prompt` / `activateSkill` + * and emits a pre-recorded event sequence to any subscribed listener + * after a microtask (matches real RPC ordering: the kick returns + * before the first event lands). + * + * `listSkills` returns a single Prompt skill so the AcpServer's + * `available_commands_update` resolver also populates the per-session + * `skillCommandMap` that {@link AcpSession.prompt} consults. + */ +function makeFakeSession( + sessionId: string, + script: readonly Event[], +): { + session: Session; + calls: { + prompt: number; + activate: Array<{ name: string; args?: string | undefined }>; + }; +} { + const listeners = new Set<(event: Event) => void>(); + const calls = { + prompt: 0, + activate: [] as Array<{ name: string; args?: string | undefined }>, + }; + const emit = async (): Promise => { + await Promise.resolve(); + for (const ev of script) { + for (const fn of listeners) fn(ev); + } + }; + const session = { + id: sessionId, + prompt: async (_input: unknown) => { + calls.prompt += 1; + await emit(); + }, + activateSkill: async (name: string, args?: string | undefined) => { + calls.activate.push({ name, args }); + await emit(); + }, + cancel: async () => undefined, + onEvent: (fn: (event: Event) => void) => { + listeners.add(fn); + return () => { + listeners.delete(fn); + }; + }, + listSkills: async () => [ + { + name: 'foo', + description: 'foo skill', + path: '/tmp/foo.md', + source: 'user' as const, + type: 'prompt', + }, + ], + } as unknown as Session; + return { session, calls }; +} + +const textBlock = (text: string): ContentBlock => ({ type: 'text', text }); + +function endedTurn(sessionId: string): Event { + return { type: 'turn.ended', sessionId, agentId: 'main', turnId: 1, reason: 'completed' } as Event; +} + +/** + * Wait for the client to receive an `available_commands_update` push. + * The server schedules it via `setTimeout(0)` after `session/new` + * resolves, so we need a microtask boundary before sending a prompt + * that relies on the per-session `skillCommandMap` being seeded. + */ +async function waitForAvailableCommands( + collecting: CollectingClient, + timeoutMs = 200, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if ( + collecting.updates.some( + (n) => + (n.update as { sessionUpdate?: string }).sessionUpdate === + 'available_commands_update', + ) + ) { + return; + } + await new Promise((r) => setTimeout(r, 5)); + } + throw new Error('available_commands_update never arrived'); +} + +describe('AcpSession slash routing', () => { + it('routes `/skill:foo bar` to Session.activateSkill (not Session.prompt)', async () => { + const sessionId = 'sess-slash-A'; + const { session, calls } = makeFakeSession(sessionId, [endedTurn(sessionId)]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + // The CLI wires `slashCommands` to a resolver that returns both the + // palette and `skillCommandMap`; mirror that here so the per- + // session skill map is seeded before the prompt fires. + new AgentSideConnection( + (c) => + new AcpServer(harness, c, { + slashCommands: async (s) => { + const skills = await s.listSkills(); + const map = new Map(); + const commands = skills.map((sk) => { + const name = `skill:${sk.name}`; + map.set(name, sk.name); + return { name, description: sk.description }; + }); + return { commands, skillCommandMap: map }; + }, + }), + agentStream, + ); + const collecting = new CollectingClient(); + const client = new ClientSideConnection(() => collecting, clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + await waitForAvailableCommands(collecting); + + const response = await client.prompt({ + sessionId, + prompt: [textBlock('/skill:foo bar baz')], + }); + + expect(response.stopReason).toBe('end_turn'); + expect(calls.prompt).toBe(0); + expect(calls.activate).toEqual([{ name: 'foo', args: 'bar baz' }]); + }); + + it('passes empty-string args as undefined to activateSkill', async () => { + const sessionId = 'sess-slash-B'; + const { session, calls } = makeFakeSession(sessionId, [endedTurn(sessionId)]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection( + (c) => + new AcpServer(harness, c, { + slashCommands: async (s) => { + const skills = await s.listSkills(); + const map = new Map(); + const commands = skills.map((sk) => { + const name = `skill:${sk.name}`; + map.set(name, sk.name); + return { name, description: sk.description }; + }); + return { commands, skillCommandMap: map }; + }, + }), + agentStream, + ); + const collecting = new CollectingClient(); + const client = new ClientSideConnection(() => collecting, clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + await waitForAvailableCommands(collecting); + + await client.prompt({ sessionId, prompt: [textBlock('/skill:foo')] }); + + expect(calls.prompt).toBe(0); + expect(calls.activate).toEqual([{ name: 'foo', args: undefined }]); + }); + + it('passes through unknown slash (and non-slash) text to Session.prompt', async () => { + const sessionId = 'sess-slash-C'; + const { session, calls } = makeFakeSession(sessionId, [ + endedTurn(sessionId), + endedTurn(sessionId), + ]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection( + (c) => + new AcpServer(harness, c, { + slashCommands: async (s) => { + const skills = await s.listSkills(); + const map = new Map(); + const commands = skills.map((sk) => { + const name = `skill:${sk.name}`; + map.set(name, sk.name); + return { name, description: sk.description }; + }); + return { commands, skillCommandMap: map }; + }, + }), + agentStream, + ); + const collecting = new CollectingClient(); + const client = new ClientSideConnection(() => collecting, clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + await waitForAvailableCommands(collecting); + + // Unknown slash: `/clear` is a TUI builtin not present in the + // skillCommandMap, so it MUST fall through to Session.prompt + // verbatim — the model receives the literal text. + await client.prompt({ sessionId, prompt: [textBlock('/clear')] }); + // Plain text: trivially passes through. + await client.prompt({ sessionId, prompt: [textBlock('hello world')] }); + + expect(calls.prompt).toBe(2); + expect(calls.activate).toEqual([]); + }); + + it('does not intercept slash when no skillCommandMap has been seeded', async () => { + // No `slashCommands` option at all → the adapter's internal map + // stays empty, so even a syntactically valid `/skill:foo` form + // falls through to Session.prompt. Verifies the "tolerant by + // default" stance for adapter-level unit tests. + const sessionId = 'sess-slash-D'; + const { session, calls } = makeFakeSession(sessionId, [endedTurn(sessionId)]); + const harness = { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; + + const { agentStream, clientStream } = makeInMemoryStreamPair(); + new AgentSideConnection((c) => new AcpServer(harness, c), agentStream); + const collecting = new CollectingClient(); + const client = new ClientSideConnection(() => collecting, clientStream); + + await client.newSession({ cwd: '/tmp/x', mcpServers: [] }); + // Wait for the (empty) available_commands_update to settle so the + // map seeder has fired its no-op pass. + await waitForAvailableCommands(collecting); + + await client.prompt({ + sessionId, + prompt: [textBlock('/skill:foo bar')], + }); + + expect(calls.prompt).toBe(1); + expect(calls.activate).toEqual([]); + }); +}); diff --git a/packages/acp-adapter/test/slash.test.ts b/packages/acp-adapter/test/slash.test.ts new file mode 100644 index 000000000..f014139f3 --- /dev/null +++ b/packages/acp-adapter/test/slash.test.ts @@ -0,0 +1,98 @@ +import { describe, expect, it } from 'vitest'; + +import { + detectSlashIntent, + parseSlashInput, + resolveSkillCommand, +} from '../src/slash'; + +describe('slash', () => { + describe('parseSlashInput', () => { + it('returns null for non-slash input', () => { + expect(parseSlashInput('hello')).toBeNull(); + expect(parseSlashInput('')).toBeNull(); + }); + + it('returns null for "/" with no name', () => { + expect(parseSlashInput('/')).toBeNull(); + expect(parseSlashInput('/ ')).toBeNull(); + }); + + it('rejects names containing further slashes', () => { + expect(parseSlashInput('/a/b')).toBeNull(); + }); + + it('parses a bare command', () => { + expect(parseSlashInput('/clear')).toEqual({ name: 'clear', args: '' }); + }); + + it('parses command + args, trimming inner whitespace', () => { + expect(parseSlashInput('/skill:foo bar baz')).toEqual({ + name: 'skill:foo', + args: 'bar baz', + }); + expect(parseSlashInput('/skill:foo spaced ')).toEqual({ + name: 'skill:foo', + args: 'spaced', + }); + }); + }); + + describe('resolveSkillCommand', () => { + const map = new Map([ + ['skill:foo', 'foo'], + ['skill:bar', 'bar'], + ]); + + it('matches the full `skill:` form directly', () => { + expect(resolveSkillCommand(map, 'skill:foo')).toBe('foo'); + }); + + it('also matches the bare `` form (`skill:` prefix added)', () => { + expect(resolveSkillCommand(map, 'foo')).toBe('foo'); + }); + + it('returns undefined for unknown commands', () => { + expect(resolveSkillCommand(map, 'clear')).toBeUndefined(); + }); + }); + + describe('detectSlashIntent', () => { + const map = new Map([['skill:foo', 'foo']]); + + it('routes a known `/skill:` form to a `skill` intent', () => { + expect(detectSlashIntent('/skill:foo bar', map)).toEqual({ + kind: 'skill', + skillName: 'foo', + args: 'bar', + }); + }); + + it('routes a bare `/foo` form to `skill` when the map has it', () => { + expect(detectSlashIntent('/foo bar', map)).toEqual({ + kind: 'skill', + skillName: 'foo', + args: 'bar', + }); + }); + + it('falls back to passthrough for unknown slash commands', () => { + // TUI builtins like /clear are not ACP-executable; let them + // pass through as plain text rather than rejecting at the + // adapter boundary (the model can still display the literal). + expect(detectSlashIntent('/clear', map)).toEqual({ kind: 'passthrough' }); + }); + + it('falls back to passthrough for non-slash text', () => { + expect(detectSlashIntent('hello', map)).toEqual({ kind: 'passthrough' }); + }); + + it('returns empty-string args for a known skill with no arguments', () => { + expect(detectSlashIntent('/skill:foo', map)).toEqual({ + kind: 'skill', + skillName: 'foo', + args: '', + }); + }); + }); +}); diff --git a/packages/acp-adapter/test/tool-result.test.ts b/packages/acp-adapter/test/tool-result.test.ts index d8489af6b..a5d8fe144 100644 --- a/packages/acp-adapter/test/tool-result.test.ts +++ b/packages/acp-adapter/test/tool-result.test.ts @@ -210,7 +210,11 @@ describe('AcpServer tool.result → tool_call_update', () => { await client.prompt({ sessionId, prompt: [textBlock('go')] }); await flushNdjson(); - const last = collecting.updates.at(-1)?.update as { sessionUpdate: string; status: string }; + const toolUpdates = collecting.updates.filter( + (u) => (u.update as { sessionUpdate?: string }).sessionUpdate !== + 'available_commands_update', + ); + const last = toolUpdates.at(-1)?.update as { sessionUpdate: string; status: string }; expect(last.sessionUpdate).toBe('tool_call_update'); expect(last.status).toBe('failed'); }); @@ -253,7 +257,11 @@ describe('AcpServer tool.result → tool_call_update', () => { await client.prompt({ sessionId, prompt: [textBlock('go')] }); await flushNdjson(); - const last = collecting.updates.at(-1)?.update as { + const toolUpdates = collecting.updates.filter( + (u) => (u.update as { sessionUpdate?: string }).sessionUpdate !== + 'available_commands_update', + ); + const last = toolUpdates.at(-1)?.update as { sessionUpdate: string; status: string; content: unknown[]; diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index ff0819dd0..2f353fc0c 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -96,7 +96,12 @@ export interface AgentOptions { export class Agent { readonly type: AgentType; - readonly kaos: Kaos; + private _kaos: Kaos; + + get kaos(): Kaos { + return this._kaos; + } + readonly kimiConfig?: KimiConfig; readonly homedir?: string; readonly rpc?: Partial; @@ -134,7 +139,7 @@ export class Agent { constructor(options: AgentOptions) { this.type = options.type ?? 'main'; - this.kaos = options.kaos; + this._kaos = options.kaos; this.kimiConfig = options.config; this.homedir = options.homedir; this.rpc = options.rpc; @@ -185,6 +190,10 @@ export class Agent { this.replayBuilder = new ReplayBuilder(this); } + setKaos(kaos: Kaos) { + this._kaos = kaos; + } + get generate(): typeof generate { return async (provider, systemPrompt, tools, history, callbacks, options) => { if (options?.auth !== undefined) { diff --git a/packages/agent-core/src/rpc/core-impl.ts b/packages/agent-core/src/rpc/core-impl.ts index 403308440..89c6449e4 100644 --- a/packages/agent-core/src/rpc/core-impl.ts +++ b/packages/agent-core/src/rpc/core-impl.ts @@ -188,7 +188,7 @@ export class KimiCore implements PromisableMethods { async createSessionWithOverrides( input: CreateSessionPayload, - overrides: { kaos?: Kaos }, + overrides: { kaos?: Kaos; persistenceKaos?: Kaos }, ): Promise { const options = input; const workDir = requiredWorkDir('createSession', options.workDir); @@ -218,8 +218,10 @@ export class KimiCore implements PromisableMethods { // ctor block throws, `session.close()` releases the sink (and mcp). const runtime = await this.resolveRuntime(config); const parentKaos = overrides.kaos ?? (await this.getKaos()); + const persistenceKaos = overrides.persistenceKaos ?? parentKaos; const session = new Session({ kaos: parentKaos.withCwd(workDir), + persistenceKaos, toolServices: runtime, config, id, @@ -295,11 +297,14 @@ export class KimiCore implements PromisableMethods { async resumeSessionWithOverrides( input: ResumeSessionPayload, - overrides: { kaos?: Kaos }, + overrides: { kaos?: Kaos; persistenceKaos?: Kaos }, ): Promise { const summary = await this.sessionStore.get(input.sessionId); const active = this.sessions.get(summary.id); if (active !== undefined) { + if (overrides.kaos !== undefined) { + active.setToolKaos(overrides.kaos.withCwd(summary.workDir)); + } return resumeSessionResult(summary, active); } @@ -314,8 +319,10 @@ export class KimiCore implements PromisableMethods { const mcpConfig = this.mergePluginMcpConfig(withCallerMcp); const runtime = await this.resolveRuntime(config); const parentKaos = overrides.kaos ?? (await this.getKaos()); + const persistenceKaos = overrides.persistenceKaos ?? parentKaos; const session = new Session({ kaos: parentKaos.withCwd(summary.workDir), + persistenceKaos, toolServices: runtime, config, id: summary.id, diff --git a/packages/agent-core/src/session/index.ts b/packages/agent-core/src/session/index.ts index 6974d3f05..e44e2899b 100644 --- a/packages/agent-core/src/session/index.ts +++ b/packages/agent-core/src/session/index.ts @@ -44,6 +44,7 @@ import { FlagResolver, type ExperimentalFlagResolver } from '../flags'; export interface SessionOptions { readonly kaos: Kaos; + readonly persistenceKaos?: Kaos; readonly config?: KimiConfig; readonly id?: string | undefined; readonly homedir: string; @@ -115,6 +116,8 @@ export class Session { readonly hookEngine: HookEngine; readonly goals: SessionGoalStore; readonly experimentalFlags: ExperimentalFlagResolver; + private toolKaos: Kaos; + private persistenceKaos: Kaos; private agentIdCounter = 0; private readonly skillsReady: Promise; metadata: SessionMeta = { @@ -166,6 +169,8 @@ export class Session { }, telemetry: this.telemetry, }); + this.toolKaos = options.kaos; + this.persistenceKaos = options.persistenceKaos ?? options.kaos; this.skills = new SkillRegistry({ sessionId: options.id }); this.mcp = new McpConnectionManager({ oauthService: new McpOAuthService({ kimiHomeDir: options.kimiHomeDir }), @@ -186,6 +191,25 @@ export class Session { }); } + + setToolKaos(kaos: Kaos) { + this.toolKaos = kaos; + for (const agent of this.readyAgents()) { + agent.setKaos(kaos.withCwd(agent.config.cwd)); + } + } + + /** + * Kaos used by session-internal bootstrap (AGENTS.md context, cwd listing) + * and metadata persistence. Always backed by the persistence sink (typically + * the local filesystem) so a transient ACP-side failure on system files like + * `AGENTS.md` never blocks `bootstrapAgentProfile` — tool calls still route + * through `agent.kaos` and continue to honor the ACP bridge. + */ + systemContextKaos(cwd: string): Kaos { + return this.persistenceKaos.withCwd(cwd); + } + async createMain() { const { agent } = await this.createAgent({ type: 'main' }, { profile: DEFAULT_AGENT_PROFILES['agent'], @@ -317,7 +341,9 @@ export class Session { agent: Agent, profile: ResolvedAgentProfile, ): Promise { - const context = await prepareSystemPromptContext(agent.kaos); + const context = await prepareSystemPromptContext( + this.systemContextKaos(agent.kaos.getcwd()), + ); agent.useProfile(profile, context); } @@ -365,15 +391,15 @@ export class Session { writeMetadata() { const text = JSON.stringify(this.metadata, null, 2); const write = async () => { - await this.options.kaos.mkdir(this.options.homedir, { parents: true, existOk: true }); - await this.options.kaos.writeText(this.metadataPath, text); + await this.persistenceKaos.mkdir(this.options.homedir, { parents: true, existOk: true }); + await this.persistenceKaos.writeText(this.metadataPath, text); }; this.writeMetadataPromise = this.writeMetadataPromise.then(write, write); return this.writeMetadataPromise; } async readMetadata() { - const text = await this.options.kaos.readText(this.metadataPath); + const text = await this.persistenceKaos.readText(this.metadataPath); this.metadata = JSON.parse(text); return this.metadata; } @@ -471,11 +497,11 @@ export class Session { parentAgentId: string | null = null, ): Agent { const parentAgent = parentAgentId !== null ? this.getReadyAgent(parentAgentId) : undefined; - const cwd = parentAgent?.config.cwd ?? this.options.kaos.getcwd(); + const cwd = parentAgent?.config.cwd ?? this.toolKaos.getcwd(); return new Agent({ ...config, type, - kaos: this.options.kaos.withCwd(cwd), + kaos: this.toolKaos.withCwd(cwd), toolServices: this.options.toolServices, config: this.options.config, homedir, diff --git a/packages/agent-core/src/session/subagent-host.ts b/packages/agent-core/src/session/subagent-host.ts index 17430d3dd..8bb23d0aa 100644 --- a/packages/agent-core/src/session/subagent-host.ts +++ b/packages/agent-core/src/session/subagent-host.ts @@ -317,7 +317,9 @@ export class SessionSubagentHost { thinkingLevel: parent.config.thinkingLevel, }); - const context = await prepareSystemPromptContext(child.kaos); + const context = await prepareSystemPromptContext( + this.session.systemContextKaos(child.kaos.getcwd()), + ); child.useProfile(profile, context); child.tools.inheritUserTools(parent.tools); } diff --git a/packages/agent-core/src/tools/builtin/file/read.ts b/packages/agent-core/src/tools/builtin/file/read.ts index 5c7110071..e9aa6472f 100644 --- a/packages/agent-core/src/tools/builtin/file/read.ts +++ b/packages/agent-core/src/tools/builtin/file/read.ts @@ -79,6 +79,17 @@ interface FinishReadResultInput { readonly requestedLines: number; } +type TextPreviewKaos = Kaos & { + readTextPreview?: (path: string, n: number) => Promise; +}; + +async function readTextHeader(kaos: TextPreviewKaos, path: string, n: number): Promise { + if (kaos.readTextPreview !== undefined) { + return kaos.readTextPreview(path, n); + } + return kaos.readBytes(path, n); +} + function truncateLine(line: string, maxLength: number): string { if (line.length <= maxLength) return line; const marker = '...'; @@ -211,7 +222,7 @@ export class ReadTool implements BuiltinTool { return { isError: true, output: `"${args.path}" is not a file.` }; } - const header = await this.kaos.readBytes(safePath, MEDIA_SNIFF_BYTES); + const header = await readTextHeader(this.kaos, safePath, MEDIA_SNIFF_BYTES); const fileType = detectFileType(safePath, header); if (fileType.kind === 'image' || fileType.kind === 'video') { return { diff --git a/packages/agent-core/test/session/init.test.ts b/packages/agent-core/test/session/init.test.ts index a7b516d7e..b534aac2a 100644 --- a/packages/agent-core/test/session/init.test.ts +++ b/packages/agent-core/test/session/init.test.ts @@ -4,6 +4,7 @@ import { join } from 'pathe'; import { testKaos } from '../fixtures/test-kaos'; import type { ProviderConfig, ToolCall } from '@moonshot-ai/kosong'; +import type { Kaos } from '@moonshot-ai/kaos'; import { afterEach, describe, expect, it, vi } from 'vitest'; import type { Agent, AgentOptions } from '../../src/agent'; @@ -117,6 +118,53 @@ describe('Session.init', () => { expect(contextText).not.toContain('Task requirements:'); }); + it('loads AGENTS.md via the persistence kaos when the tool kaos rejects readText (Zed ACP "Internal error" regression)', async () => { + const workDir = await makeTempDir(); + const sessionDir = await makeTempDir(); + await mkdir(join(workDir, '.git')); + await writeFile(join(workDir, 'AGENTS.md'), 'project instructions from disk', 'utf-8'); + + // Simulate Zed's `fs/readTextFile` returning a generic -32603 Internal + // error: every `readText` through the tool kaos rejects. The persistence + // kaos is a real LocalKaos that can reach AGENTS.md on disk. + const toolKaos = wrapReadTextWithError( + testKaos.withCwd(workDir), + new Error('acp: readTextFile failed: Internal error'), + ); + + const capturedContext: { agentsMd: string | undefined } = { agentsMd: undefined }; + const events: Array> = []; + const session = new Session({ + id: 'test-bootstrap-acp-fallback', + kaos: toolKaos, + persistenceKaos: testKaos.withCwd(workDir), + homedir: sessionDir, + rpc: createSessionRpc(events), + skills: { explicitDirs: [join(workDir, 'missing-skills')] }, + providerManager: testProviderManager(), + }); + try { + const { agent } = await session.createAgent( + { type: 'main' }, + { + profile: { + name: 'capture', + systemPrompt: (ctx) => { + capturedContext.agentsMd = ctx.agentsMd; + return ''; + }, + tools: [], + }, + }, + ); + + expect(agent.config.systemPrompt).toBe(''); + expect(capturedContext.agentsMd).toContain('project instructions from disk'); + } finally { + await session.close(); + } + }); + it('tracks connected and failed MCP server totals after initial load', async () => { const workDir = await makeTempDir(); const sessionDir = await makeTempDir(); @@ -529,3 +577,30 @@ function createSessionRpc(events: Array>): SDKSessionRPC })), } as SDKSessionRPC; } + +/** + * Wrap a {@link Kaos} so every `readText` (and `readLines`, which reads via + * `readText` in the ACP bridge) rejects with `cause`. Used to simulate the + * Zed ACP `fs/readTextFile` "Internal error" path that broke session bootstrap + * before AGENTS.md loading was rerouted onto the persistence kaos. + */ +function wrapReadTextWithError(inner: Kaos, cause: Error): Kaos { + return new Proxy(inner, { + get(target, prop, receiver) { + if (prop === 'readText') { + return async () => { + throw cause; + }; + } + if (prop === 'readLines') { + return async function* () { + throw cause; + }; + } + if (prop === 'withCwd') { + return (cwd: string) => wrapReadTextWithError(target.withCwd(cwd), cause); + } + return Reflect.get(target, prop, receiver); + }, + }); +} diff --git a/packages/agent-core/test/session/subagent-host.test.ts b/packages/agent-core/test/session/subagent-host.test.ts index 7308598e7..312e1629f 100644 --- a/packages/agent-core/test/session/subagent-host.test.ts +++ b/packages/agent-core/test/session/subagent-host.test.ts @@ -1135,6 +1135,7 @@ function fakeSession( custom: {}, }, writeMetadata: vi.fn(async () => {}), + systemContextKaos: vi.fn((cwd: string) => parent.kaos.withCwd(cwd)), getReadyAgent: vi.fn((id: string) => agents.get(id)), ensureAgentResumed: vi.fn(async (id: string) => { const agent = agents.get(id); diff --git a/packages/agent-core/test/tools/read.test.ts b/packages/agent-core/test/tools/read.test.ts index 035b4ddee..4ab97c1bb 100644 --- a/packages/agent-core/test/tools/read.test.ts +++ b/packages/agent-core/test/tools/read.test.ts @@ -543,6 +543,31 @@ describe('ReadTool', () => { expect(output).toContain(`Max ${String(MAX_BYTES)} bytes reached.`); }); + it('uses text preview for sniffing before falling back to readBytes', async () => { + const content = 'hello from acp buffer\nsecond line\n'; + const readBytes = vi.fn().mockImplementation(async () => Buffer.from([0x89, 0x50, 0x4e, 0x47])); + const readTextPreview = vi.fn(async (_path: string, n: number) => Buffer.from(content.slice(0, n), 'utf8')); + const tool = new ReadTool( + createFakeKaos({ + stat: vi.fn().mockResolvedValue(REGULAR_FILE_STAT), + readBytes, + readTextPreview, + readLines: vi.fn().mockImplementation(readLinesFromContent(content)), + readText: vi.fn().mockRejectedValue(new Error('full readText should not be called')), + } as unknown as Partial), + PERMISSIVE_WORKSPACE, + ); + + const result = await executeTool(tool, context({ path: '/tmp/acp.txt' })); + const output = toolContentString(result); + + expect(result.isError).toBeFalsy(); + expect(output).toContain('1 hello from acp buffer'); + expect(output).toContain('2 second line'); + expect(readTextPreview).toHaveBeenCalledWith('/tmp/acp.txt', MEDIA_SNIFF_BYTES); + expect(readBytes).not.toHaveBeenCalled(); + }); + it('reads through bounded byte preflight and streams line iteration without full readText', async () => { const content = Array.from({ length: MAX_LINES + 5 }, (_, i) => `line ${String(i + 1)}`).join( '\n', diff --git a/packages/node-sdk/src/kimi-harness.ts b/packages/node-sdk/src/kimi-harness.ts index 90c4b3335..5f7e7f4ae 100644 --- a/packages/node-sdk/src/kimi-harness.ts +++ b/packages/node-sdk/src/kimi-harness.ts @@ -1,3 +1,4 @@ +import type { Kaos } from '@moonshot-ai/kaos'; import { ErrorCodes, KimiError, @@ -84,11 +85,11 @@ export class KimiHarness { } async createSession(options: CreateSessionOptions): Promise { - const { planMode, kaos, ...coreOptions } = options; + const { planMode, kaos, persistenceKaos, ...coreOptions } = options; const summary = - kaos === undefined + kaos === undefined && persistenceKaos === undefined ? await this.rpc.createSession(coreOptions) - : await this.rpc.createSessionWithKaos(coreOptions, kaos); + : await this.rpc.createSessionWithKaos(coreOptions, kaos ?? persistenceKaos as Kaos, persistenceKaos); const session = new Session({ id: summary.id, workDir: summary.workDir, @@ -110,13 +111,18 @@ export class KimiHarness { async resumeSession(input: ResumeSessionInput): Promise { const id = normalizeSessionId(input.id); const active = this.activeSessions.get(id); - if (active !== undefined) return active; + const { kaos, persistenceKaos, ...resumeInput } = input; + if (active !== undefined) { + if (kaos !== undefined || persistenceKaos !== undefined) { + await this.rpc.resumeSessionWithKaos({ ...resumeInput, id }, kaos ?? persistenceKaos as Kaos, persistenceKaos); + } + return active; + } - const { kaos, ...resumeInput } = input; const summary = - kaos === undefined + kaos === undefined && persistenceKaos === undefined ? await this.rpc.resumeSession({ ...resumeInput, id }) - : await this.rpc.resumeSessionWithKaos({ ...resumeInput, id }, kaos); + : await this.rpc.resumeSessionWithKaos({ ...resumeInput, id }, kaos ?? persistenceKaos as Kaos, persistenceKaos); const session = new Session({ id: summary.id, workDir: summary.workDir, diff --git a/packages/node-sdk/src/rpc.ts b/packages/node-sdk/src/rpc.ts index 815bf9f39..32ab3d3e5 100644 --- a/packages/node-sdk/src/rpc.ts +++ b/packages/node-sdk/src/rpc.ts @@ -110,8 +110,10 @@ export abstract class SDKRpcClientBase { async createSessionWithKaos( input: CreateSessionOptions, kaos: Kaos, + persistenceKaos?: Kaos, ): Promise { void kaos; + void persistenceKaos; return this.createSession(input); } @@ -123,8 +125,10 @@ export abstract class SDKRpcClientBase { async resumeSessionWithKaos( input: ResumeSessionInput, kaos: Kaos, + persistenceKaos?: Kaos, ): Promise { void kaos; + void persistenceKaos; return this.resumeSession(input); } diff --git a/packages/node-sdk/src/sdk-rpc-client.ts b/packages/node-sdk/src/sdk-rpc-client.ts index 792a79b24..2ffdb7986 100644 --- a/packages/node-sdk/src/sdk-rpc-client.ts +++ b/packages/node-sdk/src/sdk-rpc-client.ts @@ -101,19 +101,21 @@ export class SDKRpcClient extends SDKRpcClientBase { override async createSessionWithKaos( input: CreateSessionOptions, kaos: Kaos, + persistenceKaos?: Kaos, ): Promise { const { planMode, ...coreInput } = input; void planMode; - return this.core.createSessionWithOverrides(coreInput, { kaos }); + return this.core.createSessionWithOverrides(coreInput, { kaos, persistenceKaos }); } override async resumeSessionWithKaos( input: ResumeSessionInput, kaos: Kaos, + persistenceKaos?: Kaos, ): Promise { return this.core.resumeSessionWithOverrides( { ...input, sessionId: input.id }, - { kaos }, + { kaos, persistenceKaos }, ); } diff --git a/packages/node-sdk/src/types.ts b/packages/node-sdk/src/types.ts index 4cc2b67a4..b60572f7f 100644 --- a/packages/node-sdk/src/types.ts +++ b/packages/node-sdk/src/types.ts @@ -94,6 +94,7 @@ export interface CreateSessionOptions { readonly planMode?: boolean; readonly metadata?: JsonObject | undefined; readonly kaos?: Kaos | undefined; + readonly persistenceKaos?: Kaos | undefined; } export interface RenameSessionInput { @@ -104,6 +105,7 @@ export interface RenameSessionInput { export interface ResumeSessionInput { readonly id: string; readonly kaos?: Kaos | undefined; + readonly persistenceKaos?: Kaos | undefined; } export interface ForkSessionInput { diff --git a/packages/node-sdk/test/create-session-transport.test.ts b/packages/node-sdk/test/create-session-transport.test.ts index eb35de8e8..2dc550dc9 100644 --- a/packages/node-sdk/test/create-session-transport.test.ts +++ b/packages/node-sdk/test/create-session-transport.test.ts @@ -3,8 +3,11 @@ import { mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { createKimiHarness } from '#/index'; -import type { KimiError, KimiHarness } from '#/index'; +import type { Kaos } from '@moonshot-ai/kaos'; +import { createKimiHarness, KimiHarness } from '#/index'; +import type { KimiError } from '#/index'; +import type { ResumeSessionInput, ResumedSessionSummary } from '#/types'; +import { SDKRpcClientBase } from '#/rpc'; import { afterEach, describe, expect, it } from 'vitest'; import { waitForAgentWireEvent } from './session-runtime-helpers'; @@ -43,6 +46,44 @@ max_context_size = 1000 ); } +class StubRpc extends SDKRpcClientBase { + resumeCalls: Array<{ input: ResumeSessionInput; kaos: Kaos; persistenceKaos?: Kaos }> = []; + + protected async getRpc(): Promise { + throw new Error('not used'); + } + + override async createSession(input: { id?: string; workDir: string }) { + return { + id: input.id ?? 'ses_stub', + workDir: input.workDir, + sessionDir: '/tmp/session', + createdAt: 1, + updatedAt: 1, + }; + } + + override async resumeSessionWithKaos(input: ResumeSessionInput, kaos: Kaos, persistenceKaos?: Kaos): Promise { + this.resumeCalls.push({ input, kaos, persistenceKaos }); + return { + id: input.id, + workDir: '/tmp/work', + sessionDir: '/tmp/session', + createdAt: 1, + updatedAt: 1, + sessionMetadata: { + createdAt: '', + updatedAt: '', + title: '', + isCustomTitle: false, + agents: {}, + custom: {}, + }, + agents: {}, + }; + } +} + describe('KimiHarness.createSession transport link', () => { it('emits session_started with client attribution when a session is opened', async () => { const homeDir = await makeTempDir(); @@ -486,6 +527,32 @@ effort = "medium" await harness.close(); } }); + + it('rebinds an active session when resumeSession receives a new Kaos', async () => { + const records: TelemetryRecord[] = []; + const rpc = new StubRpc(); + const harness = new KimiHarness(rpc, { + homeDir: '/tmp/home', + configPath: '/tmp/config.toml', + auth: { status: async () => ({ providers: [] }) } as never, + telemetry: recordingTelemetry(records), + ensureConfigFile: async () => undefined, + onClose: () => undefined, + }); + + const session = await harness.createSession({ id: 'ses_active', workDir: '/tmp/work' }); + const kaos = {} as Kaos; + + const resumed = await harness.resumeSession({ id: session.id, kaos }); + + expect(resumed).toBe(session); + expect(rpc.resumeCalls).toHaveLength(1); + expect(rpc.resumeCalls[0]).toMatchObject({ + input: { id: 'ses_active' }, + kaos, + persistenceKaos: undefined, + }); + }); }); function coreSessionIds(harness: KimiHarness): readonly string[] {