From 94be11d2751b5f2ae76b007523aefe31fbbc8835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 17:14:09 -0300 Subject: [PATCH 1/6] feat(migrations): add backfill command for message parts and update storage logic - Introduced a new migration (098) to handle the backfill of `thread_messages.parts` into the new `message_parts` table. - Added a CLI command `backfill-message-parts` to facilitate copying and reconciling existing message parts. - Updated `SqlThreadStorage` to dual-write message parts during message saves, ensuring consistency between `thread_messages` and `message_parts`. - Defined a new `MessagePartsTable` interface to represent the structure of the normalized message parts in the database. --- apps/mesh/migrations/098-message-parts.ts | Bin 0 -> 2467 bytes apps/mesh/migrations/index.ts | 2 + apps/mesh/src/cli.ts | 20 ++ .../cli/commands/backfill-message-parts.ts | 188 ++++++++++++++++++ apps/mesh/src/storage/threads.ts | 31 +++ apps/mesh/src/storage/types.ts | 16 ++ 6 files changed, 257 insertions(+) create mode 100644 apps/mesh/migrations/098-message-parts.ts create mode 100644 apps/mesh/src/cli/commands/backfill-message-parts.ts diff --git a/apps/mesh/migrations/098-message-parts.ts b/apps/mesh/migrations/098-message-parts.ts new file mode 100644 index 0000000000000000000000000000000000000000..3a59275c4661812214203cc9e97c8aed5b3a51c3 GIT binary patch literal 2467 zcmbVO+iu%N5Y4l`VjvU{>Bv+XAVBLT0UTRJTHgd)MbQR;Mup|KoTfd3Q!O-0pU(7bmZ>EJ@tjgGzDl|Y#5k(z_#h1g2YPOhI7bT z=7KrXSc(BE%&4Q_W>_5E>Yg{>Nz zS^1#Ms@=nw0zBV^ zx(;^_=hIV|@4h*l-VcHO(~}wOogDsjbllO0*MbJB)w#oppx~w)7Fwthr1G1d*E#Q& zSenDyY8n7Vv5qM~c;XqedOP4%AUcaxNe3Z0KAFP@Z_Ogz;i)y`EtK3cQ{NMgCJ1F9 ze?rp0Y?Z-68R42pbFg)&YEqn58-!nfn8BK$L+8^mT&`8I<~%d&!-^1xJXXpl@jIt_ zACP#MJbylW_97)Y1=1w7mlJC(pI@?Qc$Z$3$TJWOoG^bB<-B7JNK3K7O3`F=w5c#> zbx6F?83AC+n2D#&+A~Y*sG_DN(IPWAC88N0Tum*Co&zJov9*PUfhlOEhKwdIKDO2) z>*LNqR24UEBKAxMPpv5uC|;2S$xR||EL>Vxk9`C_?NoU-J)G{%shH5{3rt4uM)2tq zA;evti`6RWmOh5jcmyY>`$ViazkrfgDIXlYAJH%G-jA3h#J=xOPmcO&T8n|7-%d}b zU!Exmui&e%>GWhUP-SIZ00sy?!a?JadcKbE!Eh6Ahu;kbWG#G5_{J2ltW6QUx2_Vo z*y*BP)@EburFl(6n-TiT6nooB+!CVNV{2DG5SqXuwTt6Kok(IOpnq&!u@CDJ8e5TFDw4i_} ze`2t=-3^gPW1NZHgluS8Yr>(>D2#`?A|s|NBQM;GzID^BlEX0!nbh=6o6;$~dL2)b z$6B2k8;&Vgl4cH<=+G4Cli_`OQqSD!C5PQ7R4iuMV?G1 zuF7#MozAWvZlF`ef1HNVn>kgIuN`2s7lluz7j8BgZSlO?U_&Y2eW7E!U = { migration096orgfileconfigspublicurlbase, "097-drop-local-docker-sandbox-state": migration097droplocaldockersandboxstate, + "098-message-parts": migration098messageparts, }; export default migrations; diff --git a/apps/mesh/src/cli.ts b/apps/mesh/src/cli.ts index f8610791f1..2a13a2c863 100644 --- a/apps/mesh/src/cli.ts +++ b/apps/mesh/src/cli.ts @@ -70,6 +70,8 @@ const { values, positionals } = parseArgs({ batch: { type: "string" }, limit: { type: "string" }, org: { type: "string" }, + "after-id": { type: "string" }, + reconcile: { type: "boolean", default: false }, }, allowPositionals: true, }); @@ -87,6 +89,7 @@ Usage: deco auth Manage CLI authentication deco link [studio-url] [options] Start the desktop-side link daemon deco backfill-assets Hoist legacy inline media out of threads + connections + deco backfill-message-parts Copy thread_messages.parts into message_parts (--reconcile to verify/repair) deco completion [shell] Install shell completions Server Options: @@ -227,6 +230,23 @@ if (command === "backfill-assets") { process.exit(code); } +// ── Backfill: copy thread_messages.parts into message_parts ───────────── +if (command === "backfill-message-parts") { + const { backfillMessagePartsCommand } = await import( + "./cli/commands/backfill-message-parts" + ); + const reconcile = values.reconcile === true; + const code = await backfillMessagePartsCommand({ + dryRun: values["dry-run"] === true, + // reconcile pages are cheap (count check); copy pages can be huge rows. + batch: values.batch ? Number(values.batch) : reconcile ? 200 : 50, + limit: values.limit ? Number(values.limit) : undefined, + afterId: values["after-id"] as string | undefined, + reconcile, + }); + process.exit(code); +} + // ── Auth / Link helpers ──────────────────────────────────────────────── function resolveDataDir(): string { return ( diff --git a/apps/mesh/src/cli/commands/backfill-message-parts.ts b/apps/mesh/src/cli/commands/backfill-message-parts.ts new file mode 100644 index 0000000000..747baca859 --- /dev/null +++ b/apps/mesh/src/cli/commands/backfill-message-parts.ts @@ -0,0 +1,188 @@ +/** + * Backfill / reconcile `thread_messages.parts` → `message_parts` + * + * Part of the expand/contract migration started in migration 098. Dual-write + * (in `ThreadStorage.saveMessages`) mirrors *new* messages into `message_parts` + * live; this command handles the *existing* rows, in two modes. + * + * # copy existing rows (run once after deploying dual-write) + * bun run deco backfill-message-parts [--dry-run] [--batch 50] [--limit N] \ + * [--after-id ] + * + * # verify the mirror matches and repair drift (run right before cutting + * # reads over to message_parts) + * bun run deco backfill-message-parts --reconcile [--dry-run] [--batch 200] \ + * [--limit N] [--after-id ] + * + * Why reconcile exists: the copy pass uses ON CONFLICT DO NOTHING, so it never + * clobbers live dual-write data — but a live edit that *shrinks* a message + * during the copy window can race and leave a stale trailing part. Nothing + * reads `message_parts` until the read-cutover PR, so this pass is the gate: + * it re-derives the correct parts from `thread_messages.parts` (still the + * source of truth until we stop writing it) and repairs any message whose + * mirror count differs, by atomically replacing that message's parts. + * + * Scale: `thread_messages` is multi-GB with individual rows reaching 50 MB+. + * Both modes keyset-paginate on the primary key (bounded PK range scan per + * page, never a full scan) and operate one message per write statement so each + * INSERT stays bounded. Idempotent: re-running copy is a no-op on copied rows; + * re-running reconcile finds matching counts and repairs nothing. The cursor is + * logged each page; pass `--after-id ` to resume an interrupted run. + */ + +import { closeDatabase, getDb } from "../../database"; + +export interface BackfillMessagePartsOptions { + dryRun: boolean; + batch: number; + limit?: number; + afterId?: string; + /** Verify the mirror matches `parts` and repair drift, instead of copying. */ + reconcile: boolean; +} + +/** Build the normalized rows for a message's parts blob. */ +function partRowsFor( + messageId: string, + rawParts: unknown, +): Array<{ message_id: string; idx: number; type: string; content: string }> { + const parts = + typeof rawParts === "string" + ? (JSON.parse(rawParts) as unknown[]) + : (rawParts as unknown[]); + if (!Array.isArray(parts)) return []; + return parts.map((part, idx) => ({ + message_id: messageId, + idx, + type: String((part as { type?: unknown })?.type ?? "unknown"), + content: JSON.stringify(part), + })); +} + +export async function backfillMessagePartsCommand( + opts: BackfillMessagePartsOptions, +): Promise { + const { dryRun, batch, reconcile } = opts; + const database = getDb(); + const { db } = database; + const mode = reconcile ? "reconcile" : "copy"; + + console.log( + `[backfill-message-parts] starting (${mode})${dryRun ? " dry-run" : ""} ` + + `batch=${batch}${opts.limit ? ` limit=${opts.limit}` : ""}`, + ); + + let scanned = 0; + let changed = 0; // copied messages, or repaired messages + let parts = 0; // parts written + let errors = 0; + let cursor = opts.afterId ?? ""; + if (cursor) { + console.log(`[backfill-message-parts] resuming after id ${cursor}`); + } + const verb = dryRun + ? reconcile + ? "would-repair" + : "would-copy" + : reconcile + ? "repaired" + : "copied"; + + /** Atomically replace one message's mirror with `rows` (used by reconcile). */ + const replaceMessage = async ( + messageId: string, + rows: ReturnType, + ): Promise => { + await db.transaction().execute(async (trx) => { + await trx + .deleteFrom("message_parts") + .where("message_id", "=", messageId) + .execute(); + if (rows.length > 0) { + await trx.insertInto("message_parts").values(rows).execute(); + } + }); + }; + + try { + for (;;) { + const remaining = opts.limit ? opts.limit - scanned : Infinity; + if (remaining <= 0) break; + const rows = await db + .selectFrom("thread_messages") + .select(["id", "parts"]) + .where("id", ">", cursor) + .orderBy("id", "asc") + .limit(Math.min(batch, remaining)) + .execute(); + if (rows.length === 0) break; + + // Reconcile needs the current mirror counts for the page in one query. + const mirrorCounts = new Map(); + if (reconcile) { + const counts = await db + .selectFrom("message_parts") + .select(["message_id", (eb) => eb.fn.countAll().as("cnt")]) + .where( + "message_id", + "in", + rows.map((r) => r.id), + ) + .groupBy("message_id") + .execute(); + for (const c of counts) { + mirrorCounts.set(c.message_id, Number(c.cnt)); + } + } + + for (const row of rows) { + cursor = row.id; + scanned++; + try { + const partRows = partRowsFor(row.id, row.parts); + + if (reconcile) { + // Repair only when the mirror's part count diverges from the blob. + const have = mirrorCounts.get(row.id) ?? 0; + if (have === partRows.length) continue; + if (!dryRun) await replaceMessage(row.id, partRows); + changed++; + parts += partRows.length; + continue; + } + + // Copy mode: never clobber live dual-write rows. + if (partRows.length === 0) continue; + if (!dryRun) { + await db + .insertInto("message_parts") + .values(partRows) + .onConflict((oc) => oc.columns(["message_id", "idx"]).doNothing()) + .execute(); + } + changed++; + parts += partRows.length; + } catch (err) { + errors++; + console.error( + `[backfill-message-parts] message ${row.id} failed, skipping:`, + err, + ); + } + } + + console.log( + `[backfill-message-parts] scanned=${scanned} ${verb}=${changed} ` + + `parts=${parts} errors=${errors} cursor=${cursor}`, + ); + } + } finally { + await closeDatabase(database).catch(() => {}); + } + + console.log( + `[backfill-message-parts] done (${mode}) — scanned=${scanned} ` + + `${verb}=${changed} parts=${parts} errors=${errors}`, + ); + return errors > 0 ? 1 : 0; +} diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index a5f160b863..627e51ae05 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -568,6 +568,37 @@ export class SqlThreadStorage implements ThreadStoragePort { ) .execute(); + // Dual-write (migration 098): mirror each message's parts into the + // normalized `message_parts` table. Upsert by (message_id, idx) and + // delete any trailing rows so a message whose part count shrank doesn't + // keep stale tail rows. Idempotent — the backfill uses ON CONFLICT DO + // NOTHING, so live writes here always win for active messages. + for (const message of unique) { + const partRows = message.parts.map((part, idx) => ({ + message_id: message.id, + idx, + type: String((part as { type?: unknown }).type ?? "unknown"), + content: JSON.stringify(part), + })); + if (partRows.length > 0) { + await trx + .insertInto("message_parts") + .values(partRows) + .onConflict((oc) => + oc.columns(["message_id", "idx"]).doUpdateSet((eb) => ({ + type: eb.ref("excluded.type"), + content: eb.ref("excluded.content"), + })), + ) + .execute(); + } + await trx + .deleteFrom("message_parts") + .where("message_id", "=", message.id) + .where("idx", ">=", partRows.length) + .execute(); + } + await trx .updateTable("threads") .set({ updated_at: now }) diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index 6fd3de2b9c..7615494870 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -1023,6 +1023,21 @@ export interface ThreadMessageTable { created_at: ColumnType; updated_at: ColumnType; } +/** + * Normalized message parts (migration 098). One row per element of a message's + * `parts` array. `content` is the part's JSON-text fragment stored verbatim in + * a `text` column (NOT jsonb — parts can contain ` `, e.g. binary tool + * outputs, which jsonb rejects); the app JSON.parses it on read. `type` is + * denormalized from `part.type` for filtering. Reconstruct the array via + * `'[' || string_agg(content, ',' ORDER BY idx) || ']'`. + */ +export interface MessagePartsTable { + message_id: string; + idx: number; + type: string; + content: string; +} + export interface ThreadMessage extends ChatMessage { thread_id: string; created_at: string; @@ -1316,6 +1331,7 @@ export interface Database { threads: ThreadTable; thread_messages: ThreadMessageTable; + message_parts: MessagePartsTable; async_research_jobs: AsyncResearchJobTable; // Member tags tables From ffed030b83b524b6ff2162e8ee67260bb77939d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 17:22:19 -0300 Subject: [PATCH 2/6] chore(migrations): update binary for message parts migration --- apps/mesh/migrations/098-message-parts.ts | Bin 2467 -> 2465 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/apps/mesh/migrations/098-message-parts.ts b/apps/mesh/migrations/098-message-parts.ts index 3a59275c4661812214203cc9e97c8aed5b3a51c3..82df3ebad7cf2ba9c887aa9b34393d601e5a1648 100644 GIT binary patch delta 40 wcmZ21yij;U7Bf?f!RBmcV;140%)G>+N`;dA{2Yb+(vpJGlH$!<*xxe(020&=h5!Hn delta 42 ycmZ1|yjXZc7BeHm<{V~Y7BL0|jik)H#G*=tlKlJ}h5XWzg3=Pr&70WYGXek*iVgVy From 187cfc3207c8e0a87c68d0876e2da2ace1ca8877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 17:23:27 -0300 Subject: [PATCH 3/6] refactor(cli): simplify backfill-message-parts command options and improve documentation - Updated the default batch size for the backfill-message-parts command to 200. - Enhanced comments and documentation for clarity on command usage and performance considerations. - Introduced utility functions for better handling of message parts and logging progress during execution. - Improved memory management by ensuring that each message's write operation is wrapped in its own transaction. --- apps/mesh/src/cli.ts | 8 +- .../cli/commands/backfill-message-parts.ts | 271 ++++++++++++------ 2 files changed, 187 insertions(+), 92 deletions(-) diff --git a/apps/mesh/src/cli.ts b/apps/mesh/src/cli.ts index 2a13a2c863..9d17ee888c 100644 --- a/apps/mesh/src/cli.ts +++ b/apps/mesh/src/cli.ts @@ -235,14 +235,14 @@ if (command === "backfill-message-parts") { const { backfillMessagePartsCommand } = await import( "./cli/commands/backfill-message-parts" ); - const reconcile = values.reconcile === true; const code = await backfillMessagePartsCommand({ dryRun: values["dry-run"] === true, - // reconcile pages are cheap (count check); copy pages can be huge rows. - batch: values.batch ? Number(values.batch) : reconcile ? 200 : 50, + // --batch is the id-page size; blobs are fetched one message at a time, so + // memory is bounded regardless and 200 ids/page is fine for both modes. + batch: values.batch ? Number(values.batch) : 200, limit: values.limit ? Number(values.limit) : undefined, afterId: values["after-id"] as string | undefined, - reconcile, + reconcile: values.reconcile === true, }); process.exit(code); } diff --git a/apps/mesh/src/cli/commands/backfill-message-parts.ts b/apps/mesh/src/cli/commands/backfill-message-parts.ts index 747baca859..54f5f6624b 100644 --- a/apps/mesh/src/cli/commands/backfill-message-parts.ts +++ b/apps/mesh/src/cli/commands/backfill-message-parts.ts @@ -6,28 +6,30 @@ * live; this command handles the *existing* rows, in two modes. * * # copy existing rows (run once after deploying dual-write) - * bun run deco backfill-message-parts [--dry-run] [--batch 50] [--limit N] \ + * bun run deco backfill-message-parts [--dry-run] [--batch 200] [--limit N] \ * [--after-id ] * - * # verify the mirror matches and repair drift (run right before cutting - * # reads over to message_parts) + * # verify the mirror matches and repair drift (run before cutting reads over) * bun run deco backfill-message-parts --reconcile [--dry-run] [--batch 200] \ * [--limit N] [--after-id ] * - * Why reconcile exists: the copy pass uses ON CONFLICT DO NOTHING, so it never - * clobbers live dual-write data — but a live edit that *shrinks* a message - * during the copy window can race and leave a stale trailing part. Nothing - * reads `message_parts` until the read-cutover PR, so this pass is the gate: - * it re-derives the correct parts from `thread_messages.parts` (still the - * source of truth until we stop writing it) and repairs any message whose - * mirror count differs, by atomically replacing that message's parts. + * Scale (100k+ rows, many 50 MB+): we keyset-paginate over PRIMARY KEYS ONLY + * (`--batch` ids per page — tiny, no blob transfer), fetch the grouped mirror + * count for the page in one query, then process each message individually: + * COPY skips any message that already has mirror rows (so resume / re-runs + * never re-transfer a 50 MB blob), and only then fetches that one message's + * `parts`. This bounds client memory to a single message instead of + * batch × 50 MB. * - * Scale: `thread_messages` is multi-GB with individual rows reaching 50 MB+. - * Both modes keyset-paginate on the primary key (bounded PK range scan per - * page, never a full scan) and operate one message per write statement so each - * INSERT stays bounded. Idempotent: re-running copy is a no-op on copied rows; - * re-running reconcile finds matching counts and repairs nothing. The cursor is - * logged each page; pass `--after-id ` to resume an interrupted run. + * Each message's write is wrapped in its OWN transaction (delete+insert for + * reconcile; chunked insert for copy) — never a run-wide transaction, which + * would hold locks across all 100k rows and explode WAL. Inserts are chunked + * under Postgres' 65535-parameter limit. + * + * `content` is stored as a JSON-text fragment (text column — parts can contain + * ` `, which jsonb rejects). Idempotent: copy uses ON CONFLICT DO NOTHING and + * skips already-mirrored messages; reconcile repairs only count mismatches. The + * cursor is logged each page; pass `--after-id ` to resume. */ import { closeDatabase, getDb } from "../../database"; @@ -41,17 +43,47 @@ export interface BackfillMessagePartsOptions { reconcile: boolean; } -/** Build the normalized rows for a message's parts blob. */ -function partRowsFor( - messageId: string, - rawParts: unknown, -): Array<{ message_id: string; idx: number; type: string; content: string }> { - const parts = +type PartRow = { + message_id: string; + idx: number; + type: string; + content: string; +}; + +// 4 columns per row; stay well under Postgres' 65535 bind-parameter ceiling. +const MAX_ROWS_PER_INSERT = 10_000; + +function humanBytes(n: number): string { + if (n < 1024) return `${n} B`; + const units = ["KB", "MB", "GB", "TB"]; + let v = n / 1024; + let i = 0; + while (v >= 1024 && i < units.length - 1) { + v /= 1024; + i++; + } + return `${v.toFixed(1)} ${units[i]}`; +} + +function fmtDuration(ms: number): string { + if (!Number.isFinite(ms) || ms <= 0) return "0s"; + const s = Math.round(ms / 1000); + const h = Math.floor(s / 3600); + const m = Math.floor((s % 3600) / 60); + const sec = s % 60; + return [h ? `${h}h` : "", m ? `${m}m` : "", `${sec}s`] + .filter(Boolean) + .join(""); +} + +/** Build the normalized rows for a message's parts blob (NUL-safe via JS). */ +function partRowsFor(messageId: string, rawParts: unknown): PartRow[] { + const parsed = typeof rawParts === "string" ? (JSON.parse(rawParts) as unknown[]) : (rawParts as unknown[]); - if (!Array.isArray(parts)) return []; - return parts.map((part, idx) => ({ + if (!Array.isArray(parsed)) return []; + return parsed.map((part, idx) => ({ message_id: messageId, idx, type: String((part as { type?: unknown })?.type ?? "unknown"), @@ -59,6 +91,10 @@ function partRowsFor( })); } +function* chunk(arr: T[], size: number): Generator { + for (let i = 0; i < arr.length; i += size) yield arr.slice(i, i + size); +} + export async function backfillMessagePartsCommand( opts: BackfillMessagePartsOptions, ): Promise { @@ -66,40 +102,77 @@ export async function backfillMessagePartsCommand( const database = getDb(); const { db } = database; const mode = reconcile ? "reconcile" : "copy"; + const verb = dryRun + ? reconcile + ? "would-repair" + : "would-copy" + : reconcile + ? "repaired" + : "copied"; + + // Exact count is cheap: count(*) reads tuple visibility, not the TOASTed blob. + const totalRow = await db + .selectFrom("thread_messages") + .select((eb) => eb.fn.countAll().as("c")) + .executeTakeFirst(); + const total = Number(totalRow?.c ?? 0); console.log( `[backfill-message-parts] starting (${mode})${dryRun ? " dry-run" : ""} ` + - `batch=${batch}${opts.limit ? ` limit=${opts.limit}` : ""}`, + `total=${total} batch=${batch}${opts.limit ? ` limit=${opts.limit}` : ""}` + + `${opts.afterId ? ` resume-after=${opts.afterId}` : ""}`, ); + const startedAt = Date.now(); let scanned = 0; - let changed = 0; // copied messages, or repaired messages - let parts = 0; // parts written + let changed = 0; // copied or repaired messages + let skipped = 0; // already-mirrored (copy) / already-matching (reconcile) + let partsWritten = 0; + let bytesRead = 0; let errors = 0; + let maxBytes = 0; + let maxBytesId = ""; let cursor = opts.afterId ?? ""; - if (cursor) { - console.log(`[backfill-message-parts] resuming after id ${cursor}`); - } - const verb = dryRun - ? reconcile - ? "would-repair" - : "would-copy" - : reconcile - ? "repaired" - : "copied"; - /** Atomically replace one message's mirror with `rows` (used by reconcile). */ + const logProgress = (tag: string): void => { + const elapsed = Date.now() - startedAt; + const rate = elapsed > 0 ? scanned / (elapsed / 1000) : 0; + const pct = total > 0 ? ((scanned / total) * 100).toFixed(1) : "?"; + const eta = rate > 0 ? ((total - scanned) / rate) * 1000 : 0; + console.log( + `[backfill-message-parts] ${tag} ${scanned}/${total} (${pct}%) ` + + `${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` + + `read=${humanBytes(bytesRead)} errors=${errors} ` + + `rate=${rate.toFixed(1)}/s elapsed=${fmtDuration(elapsed)} ` + + `eta=${fmtDuration(eta)} cursor=${cursor}`, + ); + }; + + /** Copy: chunked insert, never clobbering live rows. Own transaction. */ + const copyMessage = async (rows: PartRow[]): Promise => { + await db.transaction().execute(async (trx) => { + for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) { + await trx + .insertInto("message_parts") + .values(part) + .onConflict((oc) => oc.columns(["message_id", "idx"]).doNothing()) + .execute(); + } + }); + }; + + /** Reconcile: atomically replace a message's mirror. Own transaction. */ const replaceMessage = async ( messageId: string, - rows: ReturnType, + rows: PartRow[], ): Promise => { await db.transaction().execute(async (trx) => { await trx .deleteFrom("message_parts") .where("message_id", "=", messageId) .execute(); - if (rows.length > 0) { - await trx.insertInto("message_parts").values(rows).execute(); + for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) { + await trx.insertInto("message_parts").values(part).execute(); } }); }; @@ -108,81 +181,103 @@ export async function backfillMessagePartsCommand( for (;;) { const remaining = opts.limit ? opts.limit - scanned : Infinity; if (remaining <= 0) break; - const rows = await db + + // Page over PKs only — tiny, no blob transfer. + const idRows = await db .selectFrom("thread_messages") - .select(["id", "parts"]) + .select("id") .where("id", ">", cursor) .orderBy("id", "asc") .limit(Math.min(batch, remaining)) .execute(); - if (rows.length === 0) break; - - // Reconcile needs the current mirror counts for the page in one query. - const mirrorCounts = new Map(); - if (reconcile) { - const counts = await db - .selectFrom("message_parts") - .select(["message_id", (eb) => eb.fn.countAll().as("cnt")]) - .where( - "message_id", - "in", - rows.map((r) => r.id), - ) - .groupBy("message_id") - .execute(); - for (const c of counts) { - mirrorCounts.set(c.message_id, Number(c.cnt)); - } - } + if (idRows.length === 0) break; + const ids = idRows.map((r) => r.id); + + // One grouped query gives the current mirror state for the whole page. + const mirror = new Map(); + const counts = await db + .selectFrom("message_parts") + .select(["message_id", (eb) => eb.fn.countAll().as("cnt")]) + .where("message_id", "in", ids) + .groupBy("message_id") + .execute(); + for (const c of counts) mirror.set(c.message_id, Number(c.cnt)); - for (const row of rows) { - cursor = row.id; + for (const id of ids) { + cursor = id; scanned++; try { - const partRows = partRowsFor(row.id, row.parts); + const have = mirror.get(id) ?? 0; - if (reconcile) { - // Repair only when the mirror's part count diverges from the blob. - const have = mirrorCounts.get(row.id) ?? 0; - if (have === partRows.length) continue; - if (!dryRun) await replaceMessage(row.id, partRows); - changed++; - parts += partRows.length; + // Copy: a message with any mirror rows is already done (single + // atomic insert → 0 or all). Skip without touching its blob. + if (!reconcile && have > 0) { + skipped++; + continue; + } + + // Fetch just this one message's parts (bounds memory to one row). + const row = await db + .selectFrom("thread_messages") + .select("parts") + .where("id", "=", id) + .executeTakeFirst(); + if (!row) { + skipped++; continue; } - // Copy mode: never clobber live dual-write rows. - if (partRows.length === 0) continue; - if (!dryRun) { - await db - .insertInto("message_parts") - .values(partRows) - .onConflict((oc) => oc.columns(["message_id", "idx"]).doNothing()) - .execute(); + const rawParts: unknown = row.parts; + const partsText = + typeof rawParts === "string" ? rawParts : JSON.stringify(rawParts); + const b = Buffer.byteLength(partsText, "utf8"); + bytesRead += b; + if (b > maxBytes) { + maxBytes = b; + maxBytesId = id; + console.log( + `[backfill-message-parts] new largest message ${id}: ${humanBytes(b)}`, + ); + } + + const partRows = partRowsFor(id, rawParts); + + if (reconcile) { + if (have === partRows.length) { + skipped++; + continue; + } + if (!dryRun) await replaceMessage(id, partRows); + } else { + if (partRows.length === 0) { + skipped++; + continue; + } + if (!dryRun) await copyMessage(partRows); } changed++; - parts += partRows.length; + partsWritten += partRows.length; } catch (err) { errors++; console.error( - `[backfill-message-parts] message ${row.id} failed, skipping:`, + `[backfill-message-parts] message ${id} failed, skipping:`, err, ); } } - console.log( - `[backfill-message-parts] scanned=${scanned} ${verb}=${changed} ` + - `parts=${parts} errors=${errors} cursor=${cursor}`, - ); + logProgress("progress"); } } finally { await closeDatabase(database).catch(() => {}); } + logProgress("done"); console.log( - `[backfill-message-parts] done (${mode}) — scanned=${scanned} ` + - `${verb}=${changed} parts=${parts} errors=${errors}`, + `[backfill-message-parts] summary (${mode}) — scanned=${scanned} ` + + `${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` + + `read=${humanBytes(bytesRead)} errors=${errors} ` + + `largest=${maxBytesId ? `${maxBytesId} (${humanBytes(maxBytes)})` : "n/a"}`, ); return errors > 0 ? 1 : 0; } From 6e6cc6aa9a9453328f4786bf46bc3ab517560a30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 17:26:59 -0300 Subject: [PATCH 4/6] refactor(storage): enhance message parts handling in dual-write migration - Updated the logic for inserting and deleting message parts in the `message_parts` table to improve efficiency and maintain consistency. - Simplified the upsert operation by using a single multi-row insert followed by a combined delete for trailing rows. - Enhanced comments for better clarity on the migration process and transaction handling. --- .../cli/commands/backfill-message-parts.ts | 4 +- apps/mesh/src/storage/threads.ts | 54 +++++++++++-------- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/apps/mesh/src/cli/commands/backfill-message-parts.ts b/apps/mesh/src/cli/commands/backfill-message-parts.ts index 54f5f6624b..fa1108037f 100644 --- a/apps/mesh/src/cli/commands/backfill-message-parts.ts +++ b/apps/mesh/src/cli/commands/backfill-message-parts.ts @@ -179,7 +179,9 @@ export async function backfillMessagePartsCommand( try { for (;;) { - const remaining = opts.limit ? opts.limit - scanned : Infinity; + // `!= null` (not truthiness): `--limit 0` must mean "scan nothing", not + // fall through to an unbounded run. + const remaining = opts.limit != null ? opts.limit - scanned : Infinity; if (remaining <= 0) break; // Page over PKs only — tiny, no blob transfer. diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index 627e51ae05..031dc631a6 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -568,36 +568,46 @@ export class SqlThreadStorage implements ThreadStoragePort { ) .execute(); - // Dual-write (migration 098): mirror each message's parts into the - // normalized `message_parts` table. Upsert by (message_id, idx) and - // delete any trailing rows so a message whose part count shrank doesn't - // keep stale tail rows. Idempotent — the backfill uses ON CONFLICT DO + // Dual-write (migration 098): mirror parts into the normalized + // `message_parts` table. Kept to two statements total (NOT two per + // message) so the transaction's lock window doesn't grow with batch size: + // one multi-row upsert, then one combined delete of any trailing rows + // (handles a message whose part count shrank; idx >= 0 clears one that + // dropped to zero parts). Idempotent — the backfill uses ON CONFLICT DO // NOTHING, so live writes here always win for active messages. - for (const message of unique) { - const partRows = message.parts.map((part, idx) => ({ + const partRows = unique.flatMap((message) => + message.parts.map((part, idx) => ({ message_id: message.id, idx, type: String((part as { type?: unknown }).type ?? "unknown"), content: JSON.stringify(part), - })); - if (partRows.length > 0) { - await trx - .insertInto("message_parts") - .values(partRows) - .onConflict((oc) => - oc.columns(["message_id", "idx"]).doUpdateSet((eb) => ({ - type: eb.ref("excluded.type"), - content: eb.ref("excluded.content"), - })), - ) - .execute(); - } + })), + ); + if (partRows.length > 0) { await trx - .deleteFrom("message_parts") - .where("message_id", "=", message.id) - .where("idx", ">=", partRows.length) + .insertInto("message_parts") + .values(partRows) + .onConflict((oc) => + oc.columns(["message_id", "idx"]).doUpdateSet((eb) => ({ + type: eb.ref("excluded.type"), + content: eb.ref("excluded.content"), + })), + ) .execute(); } + await trx + .deleteFrom("message_parts") + .where((eb) => + eb.or( + unique.map((message) => + eb.and([ + eb("message_id", "=", message.id), + eb("idx", ">=", message.parts.length), + ]), + ), + ), + ) + .execute(); await trx .updateTable("threads") From e71fd4c00889d2f37aceeb292d165e63189eed65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 17:35:05 -0300 Subject: [PATCH 5/6] refactor(storage): optimize message parts insertion to prevent overflow - Introduced a constant to limit the number of rows inserted into the `message_parts` table, ensuring compliance with Postgres' bind-parameter ceiling. - Updated the insertion logic in `SqlThreadStorage` to chunk message parts into manageable sizes during batch inserts, enhancing performance and stability. --- apps/mesh/src/storage/threads.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index 031dc631a6..53b3e2e98e 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -21,6 +21,10 @@ function toIsoString(v: Date | string): string { return typeof v === "string" ? v : v.toISOString(); } +// `message_parts` rows have 4 columns; cap rows/INSERT well under Postgres' +// 65535 bind-parameter ceiling so a large saveMessages batch can't overflow it. +const MAX_PART_ROWS_PER_INSERT = 10_000; + // ============================================================================ // Org-Scoped Thread Storage (repository pattern) // ============================================================================ @@ -583,10 +587,11 @@ export class SqlThreadStorage implements ThreadStoragePort { content: JSON.stringify(part), })), ); - if (partRows.length > 0) { + // Chunk under the bind-parameter limit; usually a single iteration. + for (let i = 0; i < partRows.length; i += MAX_PART_ROWS_PER_INSERT) { await trx .insertInto("message_parts") - .values(partRows) + .values(partRows.slice(i, i + MAX_PART_ROWS_PER_INSERT)) .onConflict((oc) => oc.columns(["message_id", "idx"]).doUpdateSet((eb) => ({ type: eb.ref("excluded.type"), From 72237f63f9c1f5fc57aea60b853f8fc6537879f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Fran=C3=A7a?= Date: Mon, 1 Jun 2026 19:42:21 -0300 Subject: [PATCH 6/6] feat(migrations): enhance message parts schema and backfill logic - Added `created_at` and `updated_at` columns to the `message_parts` table to track part-level timestamps. - Updated the backfill logic to inherit the message's `created_at` timestamp for backfilled parts. - Improved the insertion logic in `SqlThreadStorage` to maintain the integrity of timestamps during updates. - Enhanced comments for clarity on the handling of timestamps and dual-write behavior. --- apps/mesh/migrations/098-message-parts.ts | 15 ++++++++++- .../cli/commands/backfill-message-parts.ts | 25 ++++++++++++++++--- apps/mesh/src/storage/threads.ts | 22 ++++++++++++---- apps/mesh/src/storage/types.ts | 5 +++- 4 files changed, 56 insertions(+), 11 deletions(-) diff --git a/apps/mesh/migrations/098-message-parts.ts b/apps/mesh/migrations/098-message-parts.ts index 82df3ebad7..aef2a4edad 100644 --- a/apps/mesh/migrations/098-message-parts.ts +++ b/apps/mesh/migrations/098-message-parts.ts @@ -21,9 +21,16 @@ * fragments (the app then JSON.parses it, exactly like the old `parts` blob): * SELECT coalesce('[' || string_agg(content, ',' ORDER BY idx) || ']', '[]') * FROM message_parts WHERE message_id = $1 + * + * `created_at`/`updated_at` are per-part (text, like `thread_messages`): a part + * mutates in place across streaming saves (a tool-call goes input-streaming → + * input-available → output-available at the same idx), so the dual-write + * preserves `created_at` on conflict and bumps `updated_at` only when `content` + * actually changes. Backfilled rows inherit the message's `created_at`, since + * no true per-part time exists for them. */ -import type { Kysely } from "kysely"; +import { type Kysely, sql } from "kysely"; export async function up(db: Kysely): Promise { await db.schema @@ -38,6 +45,12 @@ export async function up(db: Kysely): Promise { // text, not jsonb: parts can contain binary tool outputs, which // jsonb rejects. Stores the JSON-text fragment verbatim, like `parts`. .addColumn("content", "text", (col) => col.notNull()) + .addColumn("created_at", "text", (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) + .addColumn("updated_at", "text", (col) => + col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`), + ) .addPrimaryKeyConstraint("message_parts_pkey", ["message_id", "idx"]) .execute(); diff --git a/apps/mesh/src/cli/commands/backfill-message-parts.ts b/apps/mesh/src/cli/commands/backfill-message-parts.ts index fa1108037f..91b13e5159 100644 --- a/apps/mesh/src/cli/commands/backfill-message-parts.ts +++ b/apps/mesh/src/cli/commands/backfill-message-parts.ts @@ -48,6 +48,8 @@ type PartRow = { idx: number; type: string; content: string; + created_at: string; + updated_at: string; }; // 4 columns per row; stay well under Postgres' 65535 bind-parameter ceiling. @@ -76,8 +78,16 @@ function fmtDuration(ms: number): string { .join(""); } -/** Build the normalized rows for a message's parts blob (NUL-safe via JS). */ -function partRowsFor(messageId: string, rawParts: unknown): PartRow[] { +/** + * Build the normalized rows for a message's parts blob (NUL-safe via JS). + * `ts` stamps both timestamps: backfilled parts have no true per-part time, so + * they inherit the message's `created_at`. + */ +function partRowsFor( + messageId: string, + rawParts: unknown, + ts: string, +): PartRow[] { const parsed = typeof rawParts === "string" ? (JSON.parse(rawParts) as unknown[]) @@ -88,6 +98,8 @@ function partRowsFor(messageId: string, rawParts: unknown): PartRow[] { idx, type: String((part as { type?: unknown })?.type ?? "unknown"), content: JSON.stringify(part), + created_at: ts, + updated_at: ts, })); } @@ -219,15 +231,20 @@ export async function backfillMessagePartsCommand( } // Fetch just this one message's parts (bounds memory to one row). + // `created_at` stamps the backfilled parts (no true per-part time). const row = await db .selectFrom("thread_messages") - .select("parts") + .select(["parts", "created_at"]) .where("id", "=", id) .executeTakeFirst(); if (!row) { skipped++; continue; } + const messageTs = + typeof row.created_at === "string" + ? row.created_at + : row.created_at.toISOString(); const rawParts: unknown = row.parts; const partsText = @@ -242,7 +259,7 @@ export async function backfillMessagePartsCommand( ); } - const partRows = partRowsFor(id, rawParts); + const partRows = partRowsFor(id, rawParts, messageTs); if (reconcile) { if (have === partRows.length) { diff --git a/apps/mesh/src/storage/threads.ts b/apps/mesh/src/storage/threads.ts index 53b3e2e98e..eaf349e140 100644 --- a/apps/mesh/src/storage/threads.ts +++ b/apps/mesh/src/storage/threads.ts @@ -5,7 +5,7 @@ * Threads are organization-scoped, messages are thread-scoped. */ -import { type Kysely } from "kysely"; +import { type Kysely, sql } from "kysely"; import { generatePrefixedId } from "@/shared/utils/generate-id"; import { DEFAULT_THREAD_TITLE } from "@/api/routes/decopilot/constants"; import type { ThreadStoragePort } from "./ports"; @@ -585,6 +585,8 @@ export class SqlThreadStorage implements ThreadStoragePort { idx, type: String((part as { type?: unknown }).type ?? "unknown"), content: JSON.stringify(part), + created_at: now, + updated_at: now, })), ); // Chunk under the bind-parameter limit; usually a single iteration. @@ -593,10 +595,20 @@ export class SqlThreadStorage implements ThreadStoragePort { .insertInto("message_parts") .values(partRows.slice(i, i + MAX_PART_ROWS_PER_INSERT)) .onConflict((oc) => - oc.columns(["message_id", "idx"]).doUpdateSet((eb) => ({ - type: eb.ref("excluded.type"), - content: eb.ref("excluded.content"), - })), + oc + .columns(["message_id", "idx"]) + .doUpdateSet((eb) => ({ + type: eb.ref("excluded.type"), + content: eb.ref("excluded.content"), + // created_at is intentionally NOT updated → first-write time + // survives in-place edits across streaming saves. + updated_at: eb.ref("excluded.updated_at"), + })) + // Skip no-op rewrites: an unchanged part isn't touched, so no dead + // tuple/WAL and `updated_at` only moves on a real content change. + .where( + sql`message_parts.content is distinct from excluded.content`, + ), ) .execute(); } diff --git a/apps/mesh/src/storage/types.ts b/apps/mesh/src/storage/types.ts index 7615494870..d271235a30 100644 --- a/apps/mesh/src/storage/types.ts +++ b/apps/mesh/src/storage/types.ts @@ -1029,13 +1029,16 @@ export interface ThreadMessageTable { * a `text` column (NOT jsonb — parts can contain ` `, e.g. binary tool * outputs, which jsonb rejects); the app JSON.parses it on read. `type` is * denormalized from `part.type` for filtering. Reconstruct the array via - * `'[' || string_agg(content, ',' ORDER BY idx) || ']'`. + * `'[' || string_agg(content, ',' ORDER BY idx) || ']'`. `created_at` is set + * once (preserved on conflict); `updated_at` bumps only when `content` changes. */ export interface MessagePartsTable { message_id: string; idx: number; type: string; content: string; + created_at: ColumnType; + updated_at: ColumnType; } export interface ThreadMessage extends ChatMessage {