Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/acp/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { JsonRpcError } from './jsonrpc'

export type AcpErrorData = {
kind?: string
retryable?: boolean
sessionId?: string
toolCallId?: string
[key: string]: unknown
}

export class AcpError extends Error {
readonly code: number
readonly data?: AcpErrorData

constructor(code: number, message: string, data?: AcpErrorData) {
super(message)
this.name = 'AcpError'
this.code = code
this.data = data
}
}

export function toJsonRpcError(error: unknown): JsonRpcError {
if (error instanceof JsonRpcError) return error
if (error instanceof AcpError) {
return new JsonRpcError(error.code, error.message, error.data)
}

const message = error instanceof Error ? error.message : String(error)
return new JsonRpcError(-32603, message)
}
18 changes: 18 additions & 0 deletions src/acp/jsonrpc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { format } from 'node:util'
import { assertJsonPayloadBudget, JsonPayloadBudgetError } from './validation'

export type JsonRpcId = string | number | null

Expand Down Expand Up @@ -76,6 +77,7 @@ export class JsonRpcPeer {
resolve: (value: unknown) => void
reject: (err: unknown) => void
abort?: AbortSignal
abortHandler?: () => void
timeoutId?: NodeJS.Timeout
}
>()
Expand Down Expand Up @@ -145,6 +147,17 @@ export class JsonRpcPeer {
const params = 'params' in payload ? (payload as any).params : undefined
const id = hasId ? (payload.id as string | number) : null

try {
assertJsonPayloadBudget(payload, {
label: `JSON-RPC request ${method}`,
})
} catch (err) {
if (err instanceof JsonPayloadBudgetError) {
return makeErrorResponse(id, err.code, err.message, err.data)
}
throw err
}

const handler = this.handlers.get(method)
if (!handler) {
if (id === null) return null
Expand Down Expand Up @@ -177,6 +190,9 @@ export class JsonRpcPeer {
if (!pending) return
this.pending.delete(id)

if (pending.abortHandler && pending.abort) {
pending.abort.removeEventListener('abort', pending.abortHandler)
}
if (pending.timeoutId) clearTimeout(pending.timeoutId)

if (
Expand Down Expand Up @@ -218,6 +234,7 @@ export class JsonRpcPeer {
resolve: (value: unknown) => void
reject: (err: unknown) => void
abort?: AbortSignal
abortHandler?: () => void
timeoutId?: NodeJS.Timeout
} = { resolve, reject, abort: args.signal }

Expand All @@ -238,6 +255,7 @@ export class JsonRpcPeer {
onAbort()
return
}
entry.abortHandler = onAbort
args.signal.addEventListener('abort', onAbort, { once: true })
}

Expand Down
116 changes: 116 additions & 0 deletions src/acp/sessionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import type { WrappedClient } from '@services/mcpClient'

export const ACP_MAX_ACTIVE_SESSIONS = 100

export type ManagedAcpSession = {
sessionId: string
activeAbortController?: AbortController | null
sessionOwnedMcpClients?: WrappedClient[]
}

type SessionEntry<T extends ManagedAcpSession> = {
session: T
lastAccessedAt: number
}

export async function closeSessionOwnedMcpClients(
session: ManagedAcpSession,
): Promise<void> {
const clients = session.sessionOwnedMcpClients ?? []
for (const client of clients) {
if (client.type !== 'connected') continue
try {
await client.client.close()
} catch {}
}
session.sessionOwnedMcpClients = []
}

export class AcpSessionManager<T extends ManagedAcpSession> {
private readonly sessions = new Map<string, SessionEntry<T>>()

constructor(
private readonly options: {
maxSessions?: number
ttlMs?: number
now?: () => number
} = {},
) {}

get size(): number {
return this.sessions.size
}

get(sessionId: string): T | undefined {
const entry = this.sessions.get(sessionId)
if (!entry) return undefined
entry.lastAccessedAt = this.now()
return entry.session
}

values(): T[] {
return Array.from(this.sessions.values(), entry => entry.session)
}

async set(sessionId: string, session: T): Promise<void> {
const existing = this.sessions.get(sessionId)
if (existing && existing.session !== session) {
existing.session.activeAbortController?.abort()
await closeSessionOwnedMcpClients(existing.session)
}

this.sessions.set(sessionId, {
session,
lastAccessedAt: this.now(),
})
await this.evictIfNeeded()
}

async delete(sessionId: string): Promise<void> {
const existing = this.sessions.get(sessionId)
if (!existing) return
this.sessions.delete(sessionId)
existing.session.activeAbortController?.abort()
await closeSessionOwnedMcpClients(existing.session)
}

async cleanupExpired(): Promise<void> {
const ttlMs = this.options.ttlMs
if (!ttlMs || ttlMs <= 0) return

const now = this.now()
for (const [sessionId, entry] of this.sessions.entries()) {
if (now - entry.lastAccessedAt <= ttlMs) continue
await this.delete(sessionId)
}
}

clear(): void {
for (const entry of this.sessions.values()) {
entry.session.activeAbortController?.abort()
void closeSessionOwnedMcpClients(entry.session)
}
this.sessions.clear()
}

private now(): number {
return this.options.now?.() ?? Date.now()
}

private async evictIfNeeded(): Promise<void> {
const maxSessions = this.options.maxSessions ?? ACP_MAX_ACTIVE_SESSIONS
while (this.sessions.size > maxSessions) {
let oldestSessionId: string | null = null
let oldestAccess = Infinity

for (const [sessionId, entry] of this.sessions.entries()) {
if (entry.lastAccessedAt >= oldestAccess) continue
oldestAccess = entry.lastAccessedAt
oldestSessionId = sessionId
}

if (!oldestSessionId) return
await this.delete(oldestSessionId)
}
}
}
187 changes: 187 additions & 0 deletions src/acp/sessionStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import {
mkdir,
readdir,
readFile,
rename,
rm,
stat,
writeFile,
} from 'node:fs/promises'
import { join } from 'node:path'

import * as Protocol from './protocol'

import type { Message } from '@query'
import type { ToolUseContext } from '@tool'
import { getKodeBaseDir } from '@utils/config/env'
import { logError } from '@utils/log'
import { debug } from '@utils/log/debugLogger'
import type { ToolPermissionContext } from '@kode-types/toolPermissionContext'

export const ACP_SESSION_STORE_VERSION = 1
export const ACP_SESSION_TTL_MS = 24 * 60 * 60 * 1000

export type PersistedAcpSession = {
version: number
sessionId: string
cwd: string
mcpServers: Protocol.McpServer[]
messages: Message[]
toolPermissionContext: ToolPermissionContext
readFileTimestamps: Record<string, number>
responseState: ToolUseContext['responseState']
currentModeId: Protocol.SessionModeId
}

export type AcpSessionPersistSource = {
sessionId: string
cwd: string
mcpServers: Protocol.McpServer[]
messages: Message[]
toolPermissionContext: ToolPermissionContext
readFileTimestamps: Record<string, number>
responseState: ToolUseContext['responseState']
currentModeId: Protocol.SessionModeId
}

export function getProjectDirSlug(cwd: string): string {
return cwd.replace(/[^a-zA-Z0-9]/g, '-')
}

export function sanitizeSessionId(sessionId: string): string {
return sessionId.replace(/[^a-zA-Z0-9_-]/g, '_')
}

export function getAcpSessionDir(cwd: string): string {
return join(getKodeBaseDir(), getProjectDirSlug(cwd), 'acp-sessions')
}

export function getAcpSessionFilePath(cwd: string, sessionId: string): string {
return join(getAcpSessionDir(cwd), `${sanitizeSessionId(sessionId)}.json`)
}

export function buildPersistedAcpSession(
session: AcpSessionPersistSource,
): PersistedAcpSession {
return {
version: ACP_SESSION_STORE_VERSION,
sessionId: session.sessionId,
cwd: session.cwd,
mcpServers: session.mcpServers,
messages: session.messages,
toolPermissionContext: session.toolPermissionContext,
readFileTimestamps: session.readFileTimestamps,
responseState: session.responseState,
currentModeId: session.currentModeId,
}
}

export async function persistAcpSessionToDisk(
session: AcpSessionPersistSource,
): Promise<void> {
const startedAt = Date.now()
const dir = getAcpSessionDir(session.cwd)
const path = getAcpSessionFilePath(session.cwd, session.sessionId)
const tmpPath = `${path}.${process.pid}.${Date.now()}.tmp`

try {
await mkdir(dir, { recursive: true })
await writeFile(
tmpPath,
JSON.stringify(buildPersistedAcpSession(session), null, 2),
'utf8',
)
await rename(tmpPath, path)
debug.info('ACP_SESSION_PERSIST_DONE', {
sessionId: session.sessionId,
durationMs: Date.now() - startedAt,
})
} catch (error) {
try {
await rm(tmpPath, { force: true })
} catch {}
debug.warn('ACP_SESSION_PERSIST_FAILED', {
sessionId: session.sessionId,
durationMs: Date.now() - startedAt,
error: error instanceof Error ? error.message : String(error),
})
logError(error)
}
}

export async function loadAcpSessionFromDisk(
cwd: string,
sessionId: string,
): Promise<PersistedAcpSession | null> {
const startedAt = Date.now()

try {
const path = getAcpSessionFilePath(cwd, sessionId)
const raw = await readFile(path, 'utf8')
const parsed = JSON.parse(raw) as PersistedAcpSession
if (!parsed || typeof parsed !== 'object') return null
if (parsed.sessionId !== sessionId) return null
if (typeof parsed.cwd !== 'string' || parsed.cwd !== cwd) return null
if (!Array.isArray(parsed.messages)) return null

debug.info('ACP_SESSION_LOAD_DONE', {
sessionId,
durationMs: Date.now() - startedAt,
})
return parsed
} catch {
return null
}
}

async function listSessionDirs(cwd?: string): Promise<string[]> {
if (cwd) return [getAcpSessionDir(cwd)]

const base = getKodeBaseDir()
try {
const entries = await readdir(base, { withFileTypes: true })
return entries
.filter(entry => entry.isDirectory())
.map(entry => join(base, entry.name, 'acp-sessions'))
} catch {
return []
}
}

export async function cleanupExpiredAcpSessions(options?: {
cwd?: string
ttlMs?: number
nowMs?: number
}): Promise<void> {
const ttlMs = options?.ttlMs ?? ACP_SESSION_TTL_MS
const nowMs = options?.nowMs ?? Date.now()
const startedAt = Date.now()
let deleted = 0

for (const dir of await listSessionDirs(options?.cwd)) {
let entries: Array<{ isFile(): boolean; name: string }>
try {
entries = (await readdir(dir, {
withFileTypes: true,
})) as Array<{ isFile(): boolean; name: string }>
} catch {
continue
}

for (const entry of entries) {
if (!entry.isFile() || !entry.name.endsWith('.json')) continue
const filePath = join(dir, entry.name)
try {
const info = await stat(filePath)
if (nowMs - info.mtimeMs <= ttlMs) continue
await rm(filePath, { force: true })
deleted += 1
} catch {}
}
}

debug.info('ACP_SESSION_CLEANUP_DONE', {
deleted,
durationMs: Date.now() - startedAt,
})
}
Loading
Loading