diff --git a/apps/mesh/migrations/098-message-parts.ts b/apps/mesh/migrations/098-message-parts.ts new file mode 100644 index 0000000000..aef2a4edad --- /dev/null +++ b/apps/mesh/migrations/098-message-parts.ts @@ -0,0 +1,63 @@ +/** + * Message Parts Migration (expand phase) + * + * Moves `thread_messages.parts` (a JSON array blob, frequently 50 MB+ on busy + * threads because tool-result parts inline large payloads) into a normalized + * `message_parts` table — one row per part. + * + * This is the first, additive step of an expand/contract migration: + * 098 (this) create message_parts — additive, no lock on thread_messages + * + deploy dual-write saveMessages → both columns + * + backfill `deco backfill-message-parts` — copy existing parts, batched/resumable + * + deploy cut reads over to message_parts + * + deploy stop writing thread_messages.parts + * later ALTER TABLE thread_messages DROP COLUMN parts (then pg_repack to reclaim disk) + * + * `content` holds each part as a JSON-text fragment (NOT jsonb): real `parts` + * blobs contain `\0` (e.g. binary tool outputs like ZIP headers), which + * Postgres `jsonb`/`json` reject (22P05) but `text` stores fine — same as the + * `thread_messages.parts` column does today. `type` is denormalized from + * `part.type` for filtering. Reconstruct a message's array by concatenating the + * fragments (the app then JSON.parses it, exactly like the old `parts` blob): + * SELECT coalesce('[' || string_agg(content, ',' ORDER BY idx) || ']', '[]') + * FROM message_parts WHERE message_id = $1 + * + * `created_at`/`updated_at` are per-part (text, like `thread_messages`): a part + * mutates in place across streaming saves (a tool-call goes input-streaming → + * input-available → output-available at the same idx), so the dual-write + * preserves `created_at` on conflict and bumps `updated_at` only when `content` + * actually changes. Backfilled rows inherit the message's `created_at`, since + * no true per-part time exists for them. + */ + +import { type Kysely, sql } from "kysely"; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable("message_parts") + // CASCADE DELETE: parts vanish with their message (which itself cascades + // from threads), so thread deletion still fully cleans up. + .addColumn("message_id", "text", (col) => + col.notNull().references("thread_messages.id").onDelete("cascade"), + ) + .addColumn("idx", "integer", (col) => col.notNull()) + .addColumn("type", "text", (col) => col.notNull()) + // text, not jsonb: parts can contain binary tool outputs, which + // jsonb rejects. Stores the JSON-text fragment verbatim, like `parts`. + .addColumn("content", "text", (col) => col.notNull()) + .addColumn("created_at", "text", (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn("updated_at", "text", (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addPrimaryKeyConstraint("message_parts_pkey", ["message_id", "idx"]) + .execute(); + + // The PK (message_id, idx) already serves the canonical read + // (WHERE message_id = $1 ORDER BY idx), so no extra index is needed. +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable("message_parts").execute(); +} diff --git a/apps/mesh/migrations/index.ts b/apps/mesh/migrations/index.ts index d68d544b28..c7b1c22583 100644 --- a/apps/mesh/migrations/index.ts +++ b/apps/mesh/migrations/index.ts @@ -96,6 +96,7 @@ import * as migration094orgfileconfigs from "./094-org-file-configs.ts"; import * as migration095removeautomationtoolcallkind from "./095-remove-automation-tool-call-kind.ts"; import * as migration096orgfileconfigspublicurlbase from "./096-org-file-configs-public-url-base.ts"; import * as migration097droplocaldockersandboxstate from "./097-drop-local-docker-sandbox-state.ts"; +import * as migration098messageparts from "./098-message-parts.ts"; /** * Core migrations for the Mesh application. @@ -214,6 +215,7 @@ const migrations: Record = { migration096orgfileconfigspublicurlbase, "097-drop-local-docker-sandbox-state": migration097droplocaldockersandboxstate, + "098-message-parts": migration098messageparts, }; export default migrations; diff --git a/apps/mesh/src/cli.ts b/apps/mesh/src/cli.ts index f8610791f1..9d17ee888c 100644 --- a/apps/mesh/src/cli.ts +++ b/apps/mesh/src/cli.ts @@ -70,6 +70,8 @@ const { values, positionals } = parseArgs({ batch: { type: "string" }, limit: { type: "string" }, org: { type: "string" }, + "after-id": { type: "string" }, + reconcile: { type: "boolean", default: false }, }, allowPositionals: true, }); @@ -87,6 +89,7 @@ Usage: deco auth Manage CLI authentication deco link [studio-url] [options] Start the desktop-side link daemon deco backfill-assets Hoist legacy inline media out of threads + connections + deco backfill-message-parts Copy thread_messages.parts into message_parts (--reconcile to verify/repair) deco completion [shell] Install shell completions Server Options: @@ -227,6 +230,23 @@ if (command === "backfill-assets") { process.exit(code); } +// ── Backfill: copy thread_messages.parts into message_parts ───────────── +if (command === "backfill-message-parts") { + const { backfillMessagePartsCommand } = await import( + "./cli/commands/backfill-message-parts" + ); + const code = await backfillMessagePartsCommand({ + dryRun: values["dry-run"] === true, + // --batch is the id-page size; blobs are fetched one message at a time, so + // memory is bounded regardless and 200 ids/page is fine for both modes. + batch: values.batch ? Number(values.batch) : 200, + limit: values.limit ? Number(values.limit) : undefined, + afterId: values["after-id"] as string | undefined, + reconcile: values.reconcile === true, + }); + process.exit(code); +} + // ── Auth / Link helpers ──────────────────────────────────────────────── function resolveDataDir(): string { return ( diff --git a/apps/mesh/src/cli/commands/backfill-message-parts.ts b/apps/mesh/src/cli/commands/backfill-message-parts.ts new file mode 100644 index 0000000000..91b13e5159 --- /dev/null +++ b/apps/mesh/src/cli/commands/backfill-message-parts.ts @@ -0,0 +1,302 @@ +/** + * Backfill / reconcile `thread_messages.parts` → `message_parts` + * + * Part of the expand/contract migration started in migration 098. Dual-write + * (in `ThreadStorage.saveMessages`) mirrors *new* messages into `message_parts` + * live; this command handles the *existing* rows, in two modes. + * + * # copy existing rows (run once after deploying dual-write) + * bun run deco backfill-message-parts [--dry-run] [--batch 200] [--limit N] \ + * [--after-id ] + * + * # verify the mirror matches and repair drift (run before cutting reads over) + * bun run deco backfill-message-parts --reconcile [--dry-run] [--batch 200] \ + * [--limit N] [--after-id ] + * + * Scale (100k+ rows, many 50 MB+): we keyset-paginate over PRIMARY KEYS ONLY + * (`--batch` ids per page — tiny, no blob transfer), fetch the grouped mirror + * count for the page in one query, then process each message individually: + * COPY skips any message that already has mirror rows (so resume / re-runs + * never re-transfer a 50 MB blob), and only then fetches that one message's + * `parts`. This bounds client memory to a single message instead of + * batch × 50 MB. + * + * Each message's write is wrapped in its OWN transaction (delete+insert for + * reconcile; chunked insert for copy) — never a run-wide transaction, which + * would hold locks across all 100k rows and explode WAL. Inserts are chunked + * under Postgres' 65535-parameter limit. + * + * `content` is stored as a JSON-text fragment (text column — parts can contain + * ` `, which jsonb rejects). Idempotent: copy uses ON CONFLICT DO NOTHING and + * skips already-mirrored messages; reconcile repairs only count mismatches. The + * cursor is logged each page; pass `--after-id ` to resume. + */ + +import { closeDatabase, getDb } from "../../database"; + +export interface BackfillMessagePartsOptions { + dryRun: boolean; + batch: number; + limit?: number; + afterId?: string; + /** Verify the mirror matches `parts` and repair drift, instead of copying. */ + reconcile: boolean; +} + +type PartRow = { + message_id: string; + idx: number; + type: string; + content: string; + created_at: string; + updated_at: string; +}; + +// 4 columns per row; stay well under Postgres' 65535 bind-parameter ceiling. +const MAX_ROWS_PER_INSERT = 10_000; + +function humanBytes(n: number): string { + if (n < 1024) return `${n} B`; + const units = ["KB", "MB", "GB", "TB"]; + let v = n / 1024; + let i = 0; + while (v >= 1024 && i < units.length - 1) { + v /= 1024; + i++; + } + return `${v.toFixed(1)} ${units[i]}`; +} + +function fmtDuration(ms: number): string { + if (!Number.isFinite(ms) || ms <= 0) return "0s"; + const s = Math.round(ms / 1000); + const h = Math.floor(s / 3600); + const m = Math.floor((s % 3600) / 60); + const sec = s % 60; + return [h ? `${h}h` : "", m ? `${m}m` : "", `${sec}s`] + .filter(Boolean) + .join(""); +} + +/** + * Build the normalized rows for a message's parts blob (NUL-safe via JS). + * `ts` stamps both timestamps: backfilled parts have no true per-part time, so + * they inherit the message's `created_at`. + */ +function partRowsFor( + messageId: string, + rawParts: unknown, + ts: string, +): PartRow[] { + const parsed = + typeof rawParts === "string" + ? (JSON.parse(rawParts) as unknown[]) + : (rawParts as unknown[]); + if (!Array.isArray(parsed)) return []; + return parsed.map((part, idx) => ({ + message_id: messageId, + idx, + type: String((part as { type?: unknown })?.type ?? "unknown"), + content: JSON.stringify(part), + created_at: ts, + updated_at: ts, + })); +} + +function* chunk(arr: T[], size: number): Generator { + for (let i = 0; i < arr.length; i += size) yield arr.slice(i, i + size); +} + +export async function backfillMessagePartsCommand( + opts: BackfillMessagePartsOptions, +): Promise { + const { dryRun, batch, reconcile } = opts; + const database = getDb(); + const { db } = database; + const mode = reconcile ? "reconcile" : "copy"; + const verb = dryRun + ? reconcile + ? "would-repair" + : "would-copy" + : reconcile + ? "repaired" + : "copied"; + + // Exact count is cheap: count(*) reads tuple visibility, not the TOASTed blob. + const totalRow = await db + .selectFrom("thread_messages") + .select((eb) => eb.fn.countAll().as("c")) + .executeTakeFirst(); + const total = Number(totalRow?.c ?? 0); + + console.log( + `[backfill-message-parts] starting (${mode})${dryRun ? " dry-run" : ""} ` + + `total=${total} batch=${batch}${opts.limit ? ` limit=${opts.limit}` : ""}` + + `${opts.afterId ? ` resume-after=${opts.afterId}` : ""}`, + ); + + const startedAt = Date.now(); + let scanned = 0; + let changed = 0; // copied or repaired messages + let skipped = 0; // already-mirrored (copy) / already-matching (reconcile) + let partsWritten = 0; + let bytesRead = 0; + let errors = 0; + let maxBytes = 0; + let maxBytesId = ""; + let cursor = opts.afterId ?? ""; + + const logProgress = (tag: string): void => { + const elapsed = Date.now() - startedAt; + const rate = elapsed > 0 ? scanned / (elapsed / 1000) : 0; + const pct = total > 0 ? ((scanned / total) * 100).toFixed(1) : "?"; + const eta = rate > 0 ? ((total - scanned) / rate) * 1000 : 0; + console.log( + `[backfill-message-parts] ${tag} ${scanned}/${total} (${pct}%) ` + + `${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` + + `read=${humanBytes(bytesRead)} errors=${errors} ` + + `rate=${rate.toFixed(1)}/s elapsed=${fmtDuration(elapsed)} ` + + `eta=${fmtDuration(eta)} cursor=${cursor}`, + ); + }; + + /** Copy: chunked insert, never clobbering live rows. Own transaction. */ + const copyMessage = async (rows: PartRow[]): Promise => { + await db.transaction().execute(async (trx) => { + for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) { + await trx + .insertInto("message_parts") + .values(part) + .onConflict((oc) => oc.columns(["message_id", "idx"]).doNothing()) + .execute(); + } + }); + }; + + /** Reconcile: atomically replace a message's mirror. Own transaction. */ + const replaceMessage = async ( + messageId: string, + rows: PartRow[], + ): Promise => { + await db.transaction().execute(async (trx) => { + await trx + .deleteFrom("message_parts") + .where("message_id", "=", messageId) + .execute(); + for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) { + await trx.insertInto("message_parts").values(part).execute(); + } + }); + }; + + try { + for (;;) { + // `!= null` (not truthiness): `--limit 0` must mean "scan nothing", not + // fall through to an unbounded run. + const remaining = opts.limit != null ? opts.limit - scanned : Infinity; + if (remaining <= 0) break; + + // Page over PKs only — tiny, no blob transfer. + const idRows = await db + .selectFrom("thread_messages") + .select("id") + .where("id", ">", cursor) + .orderBy("id", "asc") + .limit(Math.min(batch, remaining)) + .execute(); + if (idRows.length === 0) break; + const ids = idRows.map((r) => r.id); + + // One grouped query gives the current mirror state for the whole page. + const mirror = new Map(); + const counts = await db + .selectFrom("message_parts") + .select(["message_id", (eb) => eb.fn.countAll().as("cnt")]) + .where("message_id", "in", ids) + .groupBy("message_id") + .execute(); + for (const c of counts) mirror.set(c.message_id, Number(c.cnt)); + + for (const id of ids) { + cursor = id; + scanned++; + try { + const have = mirror.get(id) ?? 0; + + // Copy: a message with any mirror rows is already done (single + // atomic insert → 0 or all). Skip without touching its blob. + if (!reconcile && have > 0) { + skipped++; + continue; + } + + // Fetch just this one message's parts (bounds memory to one row). + // `created_at` stamps the backfilled parts (no true per-part time). + const row = await db + .selectFrom("thread_messages") + .select(["parts", "created_at"]) + .where("id", "=", id) + .executeTakeFirst(); + if (!row) { + skipped++; + continue; + } + const messageTs = + typeof row.created_at === "string" + ? row.created_at + : row.created_at.toISOString(); + + const rawParts: unknown = row.parts; + const partsText = + typeof rawParts === "string" ? rawParts : JSON.stringify(rawParts); + const b = Buffer.byteLength(partsText, "utf8"); + bytesRead += b; + if (b > maxBytes) { + maxBytes = b; + maxBytesId = id; + console.log( + `[backfill-message-parts] new largest message ${id}: ${humanBytes(b)}`, + ); + } + + const partRows = partRowsFor(id, rawParts, messageTs); + + if (reconcile) { + if (have === partRows.length) { + skipped++; + continue; + } + if (!dryRun) await replaceMessage(id, partRows); + } else { + if (partRows.length === 0) { + skipped++; + continue; + } + if (!dryRun) await copyMessage(partRows); + } + changed++; + partsWritten += partRows.length; + } catch (err) { + errors++; + console.error( + `[backfill-message-parts] message ${id} failed, skipping:`, + err, + ); + } + } + + logProgress("progress"); + } + } finally { + await closeDatabase(database).catch(() => {}); + } + + logProgress("done"); + console.log( + `[backfill-message-parts] summary (${mode}) — scanned=${scanned} ` + + `${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` + + `read=${humanBytes(bytesRead)} errors=${errors} ` + + `largest=${maxBytesId ? `${maxBytesId} (${humanBytes(maxBytes)})` : "n/a"}`, + ); + return errors > 0 ? 1 : 0; +} diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index a5f160b863..eaf349e140 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -5,7 +5,7 @@ * Threads are organization-scoped, messages are thread-scoped. */ -import { type Kysely } from "kysely"; +import { type Kysely, sql } from "kysely"; import { generatePrefixedId } from "@/shared/utils/generate-id"; import { DEFAULT_THREAD_TITLE } from "@/api/routes/decopilot/constants"; import type { ThreadStoragePort } from "./ports"; @@ -21,6 +21,10 @@ function toIsoString(v: Date | string): string { return typeof v === "string" ? v : v.toISOString(); } +// `message_parts` rows have 4 columns; cap rows/INSERT well under Postgres' +// 65535 bind-parameter ceiling so a large saveMessages batch can't overflow it. +const MAX_PART_ROWS_PER_INSERT = 10_000; + // ============================================================================ // Org-Scoped Thread Storage (repository pattern) // ============================================================================ @@ -568,6 +572,60 @@ export class SqlThreadStorage implements ThreadStoragePort { ) .execute(); + // Dual-write (migration 098): mirror parts into the normalized + // `message_parts` table. Kept to two statements total (NOT two per + // message) so the transaction's lock window doesn't grow with batch size: + // one multi-row upsert, then one combined delete of any trailing rows + // (handles a message whose part count shrank; idx >= 0 clears one that + // dropped to zero parts). Idempotent — the backfill uses ON CONFLICT DO + // NOTHING, so live writes here always win for active messages. + const partRows = unique.flatMap((message) => + message.parts.map((part, idx) => ({ + message_id: message.id, + idx, + type: String((part as { type?: unknown }).type ?? "unknown"), + content: JSON.stringify(part), + created_at: now, + updated_at: now, + })), + ); + // Chunk under the bind-parameter limit; usually a single iteration. + for (let i = 0; i < partRows.length; i += MAX_PART_ROWS_PER_INSERT) { + await trx + .insertInto("message_parts") + .values(partRows.slice(i, i + MAX_PART_ROWS_PER_INSERT)) + .onConflict((oc) => + oc + .columns(["message_id", "idx"]) + .doUpdateSet((eb) => ({ + type: eb.ref("excluded.type"), + content: eb.ref("excluded.content"), + // created_at is intentionally NOT updated → first-write time + // survives in-place edits across streaming saves. + updated_at: eb.ref("excluded.updated_at"), + })) + // Skip no-op rewrites: an unchanged part isn't touched, so no dead + // tuple/WAL and `updated_at` only moves on a real content change. + .where( + sql`message_parts.content is distinct from excluded.content`, + ), + ) + .execute(); + } + await trx + .deleteFrom("message_parts") + .where((eb) => + eb.or( + unique.map((message) => + eb.and([ + eb("message_id", "=", message.id), + eb("idx", ">=", message.parts.length), + ]), + ), + ), + ) + .execute(); + await trx .updateTable("threads") .set({ updated_at: now }) diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index 6fd3de2b9c..d271235a30 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -1023,6 +1023,24 @@ export interface ThreadMessageTable { created_at: ColumnType; updated_at: ColumnType; } +/** + * Normalized message parts (migration 098). One row per element of a message's + * `parts` array. `content` is the part's JSON-text fragment stored verbatim in + * a `text` column (NOT jsonb — parts can contain ` `, e.g. binary tool + * outputs, which jsonb rejects); the app JSON.parses it on read. `type` is + * denormalized from `part.type` for filtering. Reconstruct the array via + * `'[' || string_agg(content, ',' ORDER BY idx) || ']'`. `created_at` is set + * once (preserved on conflict); `updated_at` bumps only when `content` changes. + */ +export interface MessagePartsTable { + message_id: string; + idx: number; + type: string; + content: string; + created_at: ColumnType; + updated_at: ColumnType; +} + export interface ThreadMessage extends ChatMessage { thread_id: string; created_at: string; @@ -1316,6 +1334,7 @@ export interface Database { threads: ThreadTable; thread_messages: ThreadMessageTable; + message_parts: MessagePartsTable; async_research_jobs: AsyncResearchJobTable; // Member tags tables