diff --git a/src/acp/errors.ts b/src/acp/errors.ts new file mode 100644 index 00000000..e4f2a484 --- /dev/null +++ b/src/acp/errors.ts @@ -0,0 +1,31 @@ +import { JsonRpcError } from './jsonrpc' + +export type AcpErrorData = { + kind?: string + retryable?: boolean + sessionId?: string + toolCallId?: string + [key: string]: unknown +} + +export class AcpError extends Error { + readonly code: number + readonly data?: AcpErrorData + + constructor(code: number, message: string, data?: AcpErrorData) { + super(message) + this.name = 'AcpError' + this.code = code + this.data = data + } +} + +export function toJsonRpcError(error: unknown): JsonRpcError { + if (error instanceof JsonRpcError) return error + if (error instanceof AcpError) { + return new JsonRpcError(error.code, error.message, error.data) + } + + const message = error instanceof Error ? error.message : String(error) + return new JsonRpcError(-32603, message) +} diff --git a/src/acp/jsonrpc.ts b/src/acp/jsonrpc.ts index f5a0d8e6..50ad626d 100644 --- a/src/acp/jsonrpc.ts +++ b/src/acp/jsonrpc.ts @@ -1,4 +1,5 @@ import { format } from 'node:util' +import { assertJsonPayloadBudget, JsonPayloadBudgetError } from './validation' export type JsonRpcId = string | number | null @@ -76,6 +77,7 @@ export class JsonRpcPeer { resolve: (value: unknown) => void reject: (err: unknown) => void abort?: AbortSignal + abortHandler?: () => void timeoutId?: NodeJS.Timeout } >() @@ -145,6 +147,17 @@ export class JsonRpcPeer { const params = 'params' in payload ? (payload as any).params : undefined const id = hasId ? (payload.id as string | number) : null + try { + assertJsonPayloadBudget(payload, { + label: `JSON-RPC request ${method}`, + }) + } catch (err) { + if (err instanceof JsonPayloadBudgetError) { + return makeErrorResponse(id, err.code, err.message, err.data) + } + throw err + } + const handler = this.handlers.get(method) if (!handler) { if (id === null) return null @@ -177,6 +190,9 @@ export class JsonRpcPeer { if (!pending) return this.pending.delete(id) + if (pending.abortHandler && pending.abort) { + pending.abort.removeEventListener('abort', pending.abortHandler) + } if (pending.timeoutId) clearTimeout(pending.timeoutId) if ( @@ -218,6 +234,7 @@ export class JsonRpcPeer { resolve: (value: unknown) => void reject: (err: unknown) => void abort?: AbortSignal + abortHandler?: () => void timeoutId?: NodeJS.Timeout } = { resolve, reject, abort: args.signal } @@ -238,6 +255,7 @@ export class JsonRpcPeer { onAbort() return } + entry.abortHandler = onAbort args.signal.addEventListener('abort', onAbort, { once: true }) } diff --git a/src/acp/sessionManager.ts b/src/acp/sessionManager.ts new file mode 100644 index 00000000..9fd3cdd6 --- /dev/null +++ b/src/acp/sessionManager.ts @@ -0,0 +1,116 @@ +import type { WrappedClient } from '@services/mcpClient' + +export const ACP_MAX_ACTIVE_SESSIONS = 100 + +export type ManagedAcpSession = { + sessionId: string + activeAbortController?: AbortController | null + sessionOwnedMcpClients?: WrappedClient[] +} + +type SessionEntry = { + session: T + lastAccessedAt: number +} + +export async function closeSessionOwnedMcpClients( + session: ManagedAcpSession, +): Promise { + const clients = session.sessionOwnedMcpClients ?? [] + for (const client of clients) { + if (client.type !== 'connected') continue + try { + await client.client.close() + } catch {} + } + session.sessionOwnedMcpClients = [] +} + +export class AcpSessionManager { + private readonly sessions = new Map>() + + constructor( + private readonly options: { + maxSessions?: number + ttlMs?: number + now?: () => number + } = {}, + ) {} + + get size(): number { + return this.sessions.size + } + + get(sessionId: string): T | undefined { + const entry = this.sessions.get(sessionId) + if (!entry) return undefined + entry.lastAccessedAt = this.now() + return entry.session + } + + values(): T[] { + return Array.from(this.sessions.values(), entry => entry.session) + } + + async set(sessionId: string, session: T): Promise { + const existing = this.sessions.get(sessionId) + if (existing && existing.session !== session) { + existing.session.activeAbortController?.abort() + await closeSessionOwnedMcpClients(existing.session) + } + + this.sessions.set(sessionId, { + session, + lastAccessedAt: this.now(), + }) + await this.evictIfNeeded() + } + + async delete(sessionId: string): Promise { + const existing = this.sessions.get(sessionId) + if (!existing) return + this.sessions.delete(sessionId) + existing.session.activeAbortController?.abort() + await closeSessionOwnedMcpClients(existing.session) + } + + async cleanupExpired(): Promise { + const ttlMs = this.options.ttlMs + if (!ttlMs || ttlMs <= 0) return + + const now = this.now() + for (const [sessionId, entry] of this.sessions.entries()) { + if (now - entry.lastAccessedAt <= ttlMs) continue + await this.delete(sessionId) + } + } + + clear(): void { + for (const entry of this.sessions.values()) { + entry.session.activeAbortController?.abort() + void closeSessionOwnedMcpClients(entry.session) + } + this.sessions.clear() + } + + private now(): number { + return this.options.now?.() ?? Date.now() + } + + private async evictIfNeeded(): Promise { + const maxSessions = this.options.maxSessions ?? ACP_MAX_ACTIVE_SESSIONS + while (this.sessions.size > maxSessions) { + let oldestSessionId: string | null = null + let oldestAccess = Infinity + + for (const [sessionId, entry] of this.sessions.entries()) { + if (entry.lastAccessedAt >= oldestAccess) continue + oldestAccess = entry.lastAccessedAt + oldestSessionId = sessionId + } + + if (!oldestSessionId) return + await this.delete(oldestSessionId) + } + } +} diff --git a/src/acp/sessionStore.ts b/src/acp/sessionStore.ts new file mode 100644 index 00000000..edb0ee91 --- /dev/null +++ b/src/acp/sessionStore.ts @@ -0,0 +1,187 @@ +import { + mkdir, + readdir, + readFile, + rename, + rm, + stat, + writeFile, +} from 'node:fs/promises' +import { join } from 'node:path' + +import * as Protocol from './protocol' + +import type { Message } from '@query' +import type { ToolUseContext } from '@tool' +import { getKodeBaseDir } from '@utils/config/env' +import { logError } from '@utils/log' +import { debug } from '@utils/log/debugLogger' +import type { ToolPermissionContext } from '@kode-types/toolPermissionContext' + +export const ACP_SESSION_STORE_VERSION = 1 +export const ACP_SESSION_TTL_MS = 24 * 60 * 60 * 1000 + +export type PersistedAcpSession = { + version: number + sessionId: string + cwd: string + mcpServers: Protocol.McpServer[] + messages: Message[] + toolPermissionContext: ToolPermissionContext + readFileTimestamps: Record + responseState: ToolUseContext['responseState'] + currentModeId: Protocol.SessionModeId +} + +export type AcpSessionPersistSource = { + sessionId: string + cwd: string + mcpServers: Protocol.McpServer[] + messages: Message[] + toolPermissionContext: ToolPermissionContext + readFileTimestamps: Record + responseState: ToolUseContext['responseState'] + currentModeId: Protocol.SessionModeId +} + +export function getProjectDirSlug(cwd: string): string { + return cwd.replace(/[^a-zA-Z0-9]/g, '-') +} + +export function sanitizeSessionId(sessionId: string): string { + return sessionId.replace(/[^a-zA-Z0-9_-]/g, '_') +} + +export function getAcpSessionDir(cwd: string): string { + return join(getKodeBaseDir(), getProjectDirSlug(cwd), 'acp-sessions') +} + +export function getAcpSessionFilePath(cwd: string, sessionId: string): string { + return join(getAcpSessionDir(cwd), `${sanitizeSessionId(sessionId)}.json`) +} + +export function buildPersistedAcpSession( + session: AcpSessionPersistSource, +): PersistedAcpSession { + return { + version: ACP_SESSION_STORE_VERSION, + sessionId: session.sessionId, + cwd: session.cwd, + mcpServers: session.mcpServers, + messages: session.messages, + toolPermissionContext: session.toolPermissionContext, + readFileTimestamps: session.readFileTimestamps, + responseState: session.responseState, + currentModeId: session.currentModeId, + } +} + +export async function persistAcpSessionToDisk( + session: AcpSessionPersistSource, +): Promise { + const startedAt = Date.now() + const dir = getAcpSessionDir(session.cwd) + const path = getAcpSessionFilePath(session.cwd, session.sessionId) + const tmpPath = `${path}.${process.pid}.${Date.now()}.tmp` + + try { + await mkdir(dir, { recursive: true }) + await writeFile( + tmpPath, + JSON.stringify(buildPersistedAcpSession(session), null, 2), + 'utf8', + ) + await rename(tmpPath, path) + debug.info('ACP_SESSION_PERSIST_DONE', { + sessionId: session.sessionId, + durationMs: Date.now() - startedAt, + }) + } catch (error) { + try { + await rm(tmpPath, { force: true }) + } catch {} + debug.warn('ACP_SESSION_PERSIST_FAILED', { + sessionId: session.sessionId, + durationMs: Date.now() - startedAt, + error: error instanceof Error ? error.message : String(error), + }) + logError(error) + } +} + +export async function loadAcpSessionFromDisk( + cwd: string, + sessionId: string, +): Promise { + const startedAt = Date.now() + + try { + const path = getAcpSessionFilePath(cwd, sessionId) + const raw = await readFile(path, 'utf8') + const parsed = JSON.parse(raw) as PersistedAcpSession + if (!parsed || typeof parsed !== 'object') return null + if (parsed.sessionId !== sessionId) return null + if (typeof parsed.cwd !== 'string' || parsed.cwd !== cwd) return null + if (!Array.isArray(parsed.messages)) return null + + debug.info('ACP_SESSION_LOAD_DONE', { + sessionId, + durationMs: Date.now() - startedAt, + }) + return parsed + } catch { + return null + } +} + +async function listSessionDirs(cwd?: string): Promise { + if (cwd) return [getAcpSessionDir(cwd)] + + const base = getKodeBaseDir() + try { + const entries = await readdir(base, { withFileTypes: true }) + return entries + .filter(entry => entry.isDirectory()) + .map(entry => join(base, entry.name, 'acp-sessions')) + } catch { + return [] + } +} + +export async function cleanupExpiredAcpSessions(options?: { + cwd?: string + ttlMs?: number + nowMs?: number +}): Promise { + const ttlMs = options?.ttlMs ?? ACP_SESSION_TTL_MS + const nowMs = options?.nowMs ?? Date.now() + const startedAt = Date.now() + let deleted = 0 + + for (const dir of await listSessionDirs(options?.cwd)) { + let entries: Array<{ isFile(): boolean; name: string }> + try { + entries = (await readdir(dir, { + withFileTypes: true, + })) as Array<{ isFile(): boolean; name: string }> + } catch { + continue + } + + for (const entry of entries) { + if (!entry.isFile() || !entry.name.endsWith('.json')) continue + const filePath = join(dir, entry.name) + try { + const info = await stat(filePath) + if (nowMs - info.mtimeMs <= ttlMs) continue + await rm(filePath, { force: true }) + deleted += 1 + } catch {} + } + } + + debug.info('ACP_SESSION_CLEANUP_DONE', { + deleted, + durationMs: Date.now() - startedAt, + }) +} diff --git a/tests/unit/acp-jsonrpc-validation.test.ts b/tests/unit/acp-jsonrpc-validation.test.ts new file mode 100644 index 00000000..96b7b3aa --- /dev/null +++ b/tests/unit/acp-jsonrpc-validation.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, test } from 'bun:test' +import { AcpError, toJsonRpcError } from '../../src/acp/errors' +import { JsonRpcPeer } from '../../src/acp/jsonrpc' +import { MAX_JSON_PAYLOAD_BYTES } from '../../src/acp/validation' + +function nestedObject(depth: number): Record { + let value: Record = {} + for (let i = 0; i < depth; i += 1) { + value = { child: value } + } + return value +} + +describe('ACP JSON-RPC validation', () => { + test('rejects oversized inbound params with structured error data', async () => { + const peer = new JsonRpcPeer() + const lines: string[] = [] + peer.setSend(line => lines.push(line)) + peer.registerMethod('session/prompt', () => ({})) + + await peer.handleIncoming({ + jsonrpc: '2.0', + id: 1, + method: 'session/prompt', + params: { text: 'x'.repeat(MAX_JSON_PAYLOAD_BYTES + 1) }, + }) + + const response = JSON.parse(lines[0]!) + expect(response.error.code).toBe(-32602) + expect(response.error.data.kind).toBe('payload_too_large') + expect(response.error.data.retryable).toBe(false) + }) + + test('rejects deeply nested inbound params with structured error data', async () => { + const peer = new JsonRpcPeer() + const lines: string[] = [] + peer.setSend(line => lines.push(line)) + peer.registerMethod('session/new', () => ({})) + + await peer.handleIncoming({ + jsonrpc: '2.0', + id: 2, + method: 'session/new', + params: nestedObject(11), + }) + + const response = JSON.parse(lines[0]!) + expect(response.error.code).toBe(-32602) + expect(response.error.data.kind).toBe('payload_too_deep') + }) + + test('removes abort listeners when outbound request resolves', async () => { + const peer = new JsonRpcPeer() + const lines: string[] = [] + peer.setSend(line => lines.push(line)) + + const controller = new AbortController() + const signal = controller.signal + const add = signal.addEventListener.bind(signal) + const remove = signal.removeEventListener.bind(signal) + let addCount = 0 + let removeCount = 0 + + signal.addEventListener = ((...args: Parameters) => { + addCount += 1 + return add(...args) + }) as typeof signal.addEventListener + signal.removeEventListener = ((...args: Parameters) => { + removeCount += 1 + return remove(...args) + }) as typeof signal.removeEventListener + + const pending = peer.sendRequest({ + method: 'client/test', + signal, + timeoutMs: 10_000, + }) + const outbound = JSON.parse(lines[0]!) + + await peer.handleIncoming({ + jsonrpc: '2.0', + id: outbound.id, + result: 'ok', + }) + + await expect(pending).resolves.toBe('ok') + expect(addCount).toBe(1) + expect(removeCount).toBe(1) + }) +}) + +describe('AcpError mapping', () => { + test('converts to JsonRpcError while preserving optional data', () => { + const mapped = toJsonRpcError( + new AcpError(-32602, 'Invalid ACP params', { + kind: 'invalid_params', + retryable: false, + sessionId: 'sess_1', + }), + ) + + expect(mapped.code).toBe(-32602) + expect(mapped.message).toBe('Invalid ACP params') + expect(mapped.data).toEqual({ + kind: 'invalid_params', + retryable: false, + sessionId: 'sess_1', + }) + }) +}) diff --git a/tests/unit/acp-session-store-manager.test.ts b/tests/unit/acp-session-store-manager.test.ts new file mode 100644 index 00000000..d7a8d9dd --- /dev/null +++ b/tests/unit/acp-session-store-manager.test.ts @@ -0,0 +1,152 @@ +import { afterEach, describe, expect, test } from 'bun:test' +import { existsSync, mkdtempSync, rmSync } from 'node:fs' +import { readdir, readFile, utimes, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { + cleanupExpiredAcpSessions, + getAcpSessionDir, + getAcpSessionFilePath, + loadAcpSessionFromDisk, + persistAcpSessionToDisk, +} from '../../src/acp/sessionStore' +import { AcpSessionManager } from '../../src/acp/sessionManager' +import { createDefaultToolPermissionContext } from '@kode-types/toolPermissionContext' + +describe('ACP session store', () => { + const originalConfigDir = process.env.KODE_CONFIG_DIR + let configDir = '' + let projectDir = '' + + afterEach(() => { + if (originalConfigDir === undefined) delete process.env.KODE_CONFIG_DIR + else process.env.KODE_CONFIG_DIR = originalConfigDir + if (configDir) rmSync(configDir, { recursive: true, force: true }) + if (projectDir) rmSync(projectDir, { recursive: true, force: true }) + configDir = '' + projectDir = '' + }) + + test('persists atomically and loads the backward-compatible JSON shape', async () => { + configDir = mkdtempSync(join(tmpdir(), 'kode-acp-store-config-')) + projectDir = mkdtempSync(join(tmpdir(), 'kode-acp-store-project-')) + process.env.KODE_CONFIG_DIR = configDir + + await persistAcpSessionToDisk({ + sessionId: 'sess_test', + cwd: projectDir, + mcpServers: [], + messages: [], + toolPermissionContext: createDefaultToolPermissionContext(), + readFileTimestamps: {}, + responseState: {}, + currentModeId: 'default', + }) + + const path = getAcpSessionFilePath(projectDir, 'sess_test') + expect(existsSync(path)).toBe(true) + const files = await readdir(getAcpSessionDir(projectDir)) + expect(files.filter(file => file.endsWith('.tmp'))).toEqual([]) + + const raw = JSON.parse(await readFile(path, 'utf8')) + expect(raw.sessionId).toBe('sess_test') + expect(raw.cwd).toBe(projectDir) + expect(raw.version).toBe(1) + + const loaded = await loadAcpSessionFromDisk(projectDir, 'sess_test') + expect(loaded?.sessionId).toBe('sess_test') + expect(loaded?.currentModeId).toBe('default') + }) + + test('cleanup removes expired session files by mtime only', async () => { + configDir = mkdtempSync(join(tmpdir(), 'kode-acp-clean-config-')) + projectDir = mkdtempSync(join(tmpdir(), 'kode-acp-clean-project-')) + process.env.KODE_CONFIG_DIR = configDir + + const oldPath = getAcpSessionFilePath(projectDir, 'old') + const freshPath = getAcpSessionFilePath(projectDir, 'fresh') + await persistAcpSessionToDisk({ + sessionId: 'old', + cwd: projectDir, + mcpServers: [], + messages: [], + toolPermissionContext: createDefaultToolPermissionContext(), + readFileTimestamps: {}, + responseState: {}, + currentModeId: 'default', + }) + await persistAcpSessionToDisk({ + sessionId: 'fresh', + cwd: projectDir, + mcpServers: [], + messages: [], + toolPermissionContext: createDefaultToolPermissionContext(), + readFileTimestamps: {}, + responseState: {}, + currentModeId: 'default', + }) + + const oldDate = new Date(Date.now() - 10_000) + await utimes(oldPath, oldDate, oldDate) + await writeFile(join(getAcpSessionDir(projectDir), 'note.txt'), 'keep') + + await cleanupExpiredAcpSessions({ + cwd: projectDir, + ttlMs: 1_000, + nowMs: Date.now(), + }) + + expect(existsSync(oldPath)).toBe(false) + expect(existsSync(freshPath)).toBe(true) + expect(existsSync(join(getAcpSessionDir(projectDir), 'note.txt'))).toBe( + true, + ) + }) +}) + +describe('ACP in-memory session manager', () => { + test('evicts oldest sessions and closes only session-owned MCP clients', async () => { + let now = 1 + const closed: string[] = [] + const aborted: string[] = [] + const manager = new AcpSessionManager({ + maxSessions: 1, + now: () => now, + }) + + const firstAbort = new AbortController() + firstAbort.signal.addEventListener('abort', () => aborted.push('first')) + + await manager.set('first', { + sessionId: 'first', + activeAbortController: firstAbort, + sessionOwnedMcpClients: [ + { + type: 'connected', + name: 'owned', + capabilities: null, + client: { close: async () => closed.push('owned') }, + }, + ], + }) + + now += 1 + await manager.set('second', { + sessionId: 'second', + activeAbortController: null, + sessionOwnedMcpClients: [ + { + type: 'connected', + name: 'second-owned', + capabilities: null, + client: { close: async () => closed.push('second-owned') }, + }, + ], + }) + + expect(manager.get('first')).toBeUndefined() + expect(manager.get('second')?.sessionId).toBe('second') + expect(aborted).toEqual(['first']) + expect(closed).toEqual(['owned']) + }) +})