Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ When constructing the summary, try to stick to this template:
auto: input.auto,
overflow: input.overflow,
})
MessageV2.invalidateCompactionBoundary(input.sessionID)
})

return Service.of({
Expand Down
43 changes: 37 additions & 6 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessag
import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { SyncEvent } from "../sync"
import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage/db"
import { Database, NotFoundError, and, desc, eq, gt, gte, inArray, lt, or } from "@/storage/db"
import { MessageTable, PartTable, SessionTable } from "./session.sql"
import { ProviderError } from "@/provider/error"
import { iife } from "@/util/iife"
Expand Down Expand Up @@ -550,6 +550,12 @@ export namespace MessageV2 {
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
)

const newerOrEqual = (row: Cursor) =>
or(
gt(MessageTable.time_created, row.time),
and(eq(MessageTable.time_created, row.time), gte(MessageTable.id, row.id)),
)

function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
const ids = rows.map((row) => row.id)
const partByMessage = new Map<string, MessageV2.Part[]>()
Expand Down Expand Up @@ -845,11 +851,12 @@ export namespace MessageV2 {
return Effect.runPromise(toModelMessagesEffect(input, model, options).pipe(Effect.provide(EffectLogger.layer)))
}

export function page(input: { sessionID: SessionID; limit: number; before?: string }) {
export function page(input: { sessionID: SessionID; limit: number; before?: string; since?: Cursor }) {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
let where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
if (input.since) where = and(where, newerOrEqual(input.since))
const rows = Database.use((db) =>
db
.select()
Expand Down Expand Up @@ -882,11 +889,11 @@ export namespace MessageV2 {
}
}

export function* stream(sessionID: SessionID) {
export function* stream(sessionID: SessionID, since?: Cursor) {
const size = 50
let before: string | undefined
while (true) {
const next = page({ sessionID, limit: size, before })
const next = page({ sessionID, limit: size, before, since })
if (next.items.length === 0) break
for (let i = next.items.length - 1; i >= 0; i--) {
yield next.items[i]
Expand Down Expand Up @@ -944,8 +951,32 @@ export namespace MessageV2 {
return result
}

// Cache compaction boundaries per session to avoid re-reading pre-compaction messages.
// Capped at MAX_BOUNDARY_CACHE entries; oldest entry evicted on insert when full.
const MAX_BOUNDARY_CACHE = 1000
const compactionBoundary = new Map<string, Cursor>()

function setCompactionBoundary(sessionID: string, cursor: Cursor) {
if (!compactionBoundary.has(sessionID) && compactionBoundary.size >= MAX_BOUNDARY_CACHE) {
const oldest = compactionBoundary.keys().next().value
if (oldest !== undefined) compactionBoundary.delete(oldest)
}
compactionBoundary.set(sessionID, cursor)
}

export function invalidateCompactionBoundary(sessionID?: string) {
if (sessionID) compactionBoundary.delete(sessionID)
else compactionBoundary.clear()
}

export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
return filterCompacted(stream(sessionID))
const cached = compactionBoundary.get(sessionID)
const msgs = filterCompacted(stream(sessionID, cached))
if (msgs.length > 0) {
const first = msgs[0]
setCompactionBoundary(sessionID, { id: first.info.id, time: first.info.time.created })
}
return msgs
})

export function fromError(
Expand Down
Loading