Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions apps/mesh/migrations/098-message-parts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Message Parts Migration (expand phase)
*
* Moves `thread_messages.parts` (a JSON array blob, frequently 50 MB+ on busy
* threads because tool-result parts inline large payloads) into a normalized
* `message_parts` table — one row per part.
*
* This is the first, additive step of an expand/contract migration:
* 098 (this) create message_parts — additive, no lock on thread_messages
* + deploy dual-write saveMessages → both columns
* + backfill `deco backfill-message-parts` — copy existing parts, batched/resumable
* + deploy cut reads over to message_parts
* + deploy stop writing thread_messages.parts
* later ALTER TABLE thread_messages DROP COLUMN parts (then pg_repack to reclaim disk)
*
* `content` holds each part as a JSON-text fragment (NOT jsonb): real `parts`
* blobs contain `\0` (e.g. binary tool outputs like ZIP headers), which
* Postgres `jsonb`/`json` reject (22P05) but `text` stores fine — same as the
* `thread_messages.parts` column does today. `type` is denormalized from
* `part.type` for filtering. Reconstruct a message's array by concatenating the
* fragments (the app then JSON.parses it, exactly like the old `parts` blob):
* SELECT coalesce('[' || string_agg(content, ',' ORDER BY idx) || ']', '[]')
* FROM message_parts WHERE message_id = $1
*/

import type { Kysely } from "kysely";

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("message_parts")
// CASCADE DELETE: parts vanish with their message (which itself cascades
// from threads), so thread deletion still fully cleans up.
.addColumn("message_id", "text", (col) =>
col.notNull().references("thread_messages.id").onDelete("cascade"),
)
.addColumn("idx", "integer", (col) => col.notNull())
.addColumn("type", "text", (col) => col.notNull())
// text, not jsonb: parts can contain binary tool outputs, which
// jsonb rejects. Stores the JSON-text fragment verbatim, like `parts`.
.addColumn("content", "text", (col) => col.notNull())
.addPrimaryKeyConstraint("message_parts_pkey", ["message_id", "idx"])
.execute();

// The PK (message_id, idx) already serves the canonical read
// (WHERE message_id = $1 ORDER BY idx), so no extra index is needed.
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable("message_parts").execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ import * as migration094orgfileconfigs from "./094-org-file-configs.ts";
import * as migration095removeautomationtoolcallkind from "./095-remove-automation-tool-call-kind.ts";
import * as migration096orgfileconfigspublicurlbase from "./096-org-file-configs-public-url-base.ts";
import * as migration097droplocaldockersandboxstate from "./097-drop-local-docker-sandbox-state.ts";
import * as migration098messageparts from "./098-message-parts.ts";

/**
* Core migrations for the Mesh application.
Expand Down Expand Up @@ -214,6 +215,7 @@ const migrations: Record<string, Migration> = {
migration096orgfileconfigspublicurlbase,
"097-drop-local-docker-sandbox-state":
migration097droplocaldockersandboxstate,
"098-message-parts": migration098messageparts,
};

export default migrations;
20 changes: 20 additions & 0 deletions apps/mesh/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const { values, positionals } = parseArgs({
batch: { type: "string" },
limit: { type: "string" },
org: { type: "string" },
"after-id": { type: "string" },
reconcile: { type: "boolean", default: false },
},
allowPositionals: true,
});
Expand All @@ -87,6 +89,7 @@ Usage:
deco auth <login|whoami|logout> Manage CLI authentication
deco link [studio-url] [options] Start the desktop-side link daemon
deco backfill-assets Hoist legacy inline media out of threads + connections
deco backfill-message-parts Copy thread_messages.parts into message_parts (--reconcile to verify/repair)
deco completion [shell] Install shell completions

Server Options:
Expand Down Expand Up @@ -227,6 +230,23 @@ if (command === "backfill-assets") {
process.exit(code);
}

// ── Backfill: copy thread_messages.parts into message_parts ─────────────
if (command === "backfill-message-parts") {
const { backfillMessagePartsCommand } = await import(
"./cli/commands/backfill-message-parts"
);
const code = await backfillMessagePartsCommand({
dryRun: values["dry-run"] === true,
// --batch is the id-page size; blobs are fetched one message at a time, so
// memory is bounded regardless and 200 ids/page is fine for both modes.
batch: values.batch ? Number(values.batch) : 200,
limit: values.limit ? Number(values.limit) : undefined,
afterId: values["after-id"] as string | undefined,
reconcile: values.reconcile === true,
});
process.exit(code);
}

// ── Auth / Link helpers ────────────────────────────────────────────────
function resolveDataDir(): string {
return (
Expand Down
285 changes: 285 additions & 0 deletions apps/mesh/src/cli/commands/backfill-message-parts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/**
* Backfill / reconcile `thread_messages.parts` → `message_parts`
*
* Part of the expand/contract migration started in migration 098. Dual-write
* (in `ThreadStorage.saveMessages`) mirrors *new* messages into `message_parts`
* live; this command handles the *existing* rows, in two modes.
*
* # copy existing rows (run once after deploying dual-write)
* bun run deco backfill-message-parts [--dry-run] [--batch 200] [--limit N] \
* [--after-id <id>]
*
* # verify the mirror matches and repair drift (run before cutting reads over)
* bun run deco backfill-message-parts --reconcile [--dry-run] [--batch 200] \
* [--limit N] [--after-id <id>]
*
* Scale (100k+ rows, many 50 MB+): we keyset-paginate over PRIMARY KEYS ONLY
* (`--batch` ids per page — tiny, no blob transfer), fetch the grouped mirror
* count for the page in one query, then process each message individually:
* COPY skips any message that already has mirror rows (so resume / re-runs
* never re-transfer a 50 MB blob), and only then fetches that one message's
* `parts`. This bounds client memory to a single message instead of
* batch × 50 MB.
*
* Each message's write is wrapped in its OWN transaction (delete+insert for
* reconcile; chunked insert for copy) — never a run-wide transaction, which
* would hold locks across all 100k rows and explode WAL. Inserts are chunked
* under Postgres' 65535-parameter limit.
*
* `content` is stored as a JSON-text fragment (text column — parts can contain
* ` `, which jsonb rejects). Idempotent: copy uses ON CONFLICT DO NOTHING and
* skips already-mirrored messages; reconcile repairs only count mismatches. The
* cursor is logged each page; pass `--after-id <id>` to resume.
*/

import { closeDatabase, getDb } from "../../database";

export interface BackfillMessagePartsOptions {
dryRun: boolean;
batch: number;
limit?: number;
afterId?: string;
/** Verify the mirror matches `parts` and repair drift, instead of copying. */
reconcile: boolean;
}

type PartRow = {
message_id: string;
idx: number;
type: string;
content: string;
};

// 4 columns per row; stay well under Postgres' 65535 bind-parameter ceiling.
const MAX_ROWS_PER_INSERT = 10_000;

function humanBytes(n: number): string {
if (n < 1024) return `${n} B`;
const units = ["KB", "MB", "GB", "TB"];
let v = n / 1024;
let i = 0;
while (v >= 1024 && i < units.length - 1) {
v /= 1024;
i++;
}
return `${v.toFixed(1)} ${units[i]}`;
}

function fmtDuration(ms: number): string {
if (!Number.isFinite(ms) || ms <= 0) return "0s";
const s = Math.round(ms / 1000);
const h = Math.floor(s / 3600);
const m = Math.floor((s % 3600) / 60);
const sec = s % 60;
return [h ? `${h}h` : "", m ? `${m}m` : "", `${sec}s`]
.filter(Boolean)
.join("");
}

/** Build the normalized rows for a message's parts blob (NUL-safe via JS). */
function partRowsFor(messageId: string, rawParts: unknown): PartRow[] {
const parsed =
typeof rawParts === "string"
? (JSON.parse(rawParts) as unknown[])
: (rawParts as unknown[]);
if (!Array.isArray(parsed)) return [];
return parsed.map((part, idx) => ({
message_id: messageId,
idx,
type: String((part as { type?: unknown })?.type ?? "unknown"),
content: JSON.stringify(part),
}));
}

function* chunk<T>(arr: T[], size: number): Generator<T[]> {
for (let i = 0; i < arr.length; i += size) yield arr.slice(i, i + size);
}

export async function backfillMessagePartsCommand(
opts: BackfillMessagePartsOptions,
): Promise<number> {
const { dryRun, batch, reconcile } = opts;
const database = getDb();
const { db } = database;
const mode = reconcile ? "reconcile" : "copy";
const verb = dryRun
? reconcile
? "would-repair"
: "would-copy"
: reconcile
? "repaired"
: "copied";

// Exact count is cheap: count(*) reads tuple visibility, not the TOASTed blob.
const totalRow = await db
.selectFrom("thread_messages")
.select((eb) => eb.fn.countAll().as("c"))
.executeTakeFirst();
const total = Number(totalRow?.c ?? 0);

console.log(
`[backfill-message-parts] starting (${mode})${dryRun ? " dry-run" : ""} ` +
`total=${total} batch=${batch}${opts.limit ? ` limit=${opts.limit}` : ""}` +
`${opts.afterId ? ` resume-after=${opts.afterId}` : ""}`,
);

const startedAt = Date.now();
let scanned = 0;
let changed = 0; // copied or repaired messages
let skipped = 0; // already-mirrored (copy) / already-matching (reconcile)
let partsWritten = 0;
let bytesRead = 0;
let errors = 0;
let maxBytes = 0;
let maxBytesId = "";
let cursor = opts.afterId ?? "";

const logProgress = (tag: string): void => {
const elapsed = Date.now() - startedAt;
const rate = elapsed > 0 ? scanned / (elapsed / 1000) : 0;
const pct = total > 0 ? ((scanned / total) * 100).toFixed(1) : "?";
const eta = rate > 0 ? ((total - scanned) / rate) * 1000 : 0;
console.log(
`[backfill-message-parts] ${tag} ${scanned}/${total} (${pct}%) ` +
`${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` +
`read=${humanBytes(bytesRead)} errors=${errors} ` +
`rate=${rate.toFixed(1)}/s elapsed=${fmtDuration(elapsed)} ` +
`eta=${fmtDuration(eta)} cursor=${cursor}`,
);
};

/** Copy: chunked insert, never clobbering live rows. Own transaction. */
const copyMessage = async (rows: PartRow[]): Promise<void> => {
await db.transaction().execute(async (trx) => {
for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) {
await trx
.insertInto("message_parts")
.values(part)
.onConflict((oc) => oc.columns(["message_id", "idx"]).doNothing())
.execute();
}
});
};

/** Reconcile: atomically replace a message's mirror. Own transaction. */
const replaceMessage = async (
messageId: string,
rows: PartRow[],
): Promise<void> => {
await db.transaction().execute(async (trx) => {
await trx
.deleteFrom("message_parts")
.where("message_id", "=", messageId)
.execute();
for (const part of chunk(rows, MAX_ROWS_PER_INSERT)) {
await trx.insertInto("message_parts").values(part).execute();
}
});
};

try {
for (;;) {
// `!= null` (not truthiness): `--limit 0` must mean "scan nothing", not
// fall through to an unbounded run.
const remaining = opts.limit != null ? opts.limit - scanned : Infinity;
if (remaining <= 0) break;

// Page over PKs only — tiny, no blob transfer.
const idRows = await db
.selectFrom("thread_messages")
.select("id")
.where("id", ">", cursor)
.orderBy("id", "asc")
.limit(Math.min(batch, remaining))
.execute();
if (idRows.length === 0) break;
const ids = idRows.map((r) => r.id);

// One grouped query gives the current mirror state for the whole page.
const mirror = new Map<string, number>();
const counts = await db
.selectFrom("message_parts")
.select(["message_id", (eb) => eb.fn.countAll().as("cnt")])
.where("message_id", "in", ids)
.groupBy("message_id")
.execute();
for (const c of counts) mirror.set(c.message_id, Number(c.cnt));

for (const id of ids) {
cursor = id;
scanned++;
try {
const have = mirror.get(id) ?? 0;

// Copy: a message with any mirror rows is already done (single
// atomic insert → 0 or all). Skip without touching its blob.
if (!reconcile && have > 0) {
skipped++;
continue;
}

// Fetch just this one message's parts (bounds memory to one row).
const row = await db
.selectFrom("thread_messages")
.select("parts")
.where("id", "=", id)
.executeTakeFirst();
if (!row) {
skipped++;
continue;
}

const rawParts: unknown = row.parts;
const partsText =
typeof rawParts === "string" ? rawParts : JSON.stringify(rawParts);
const b = Buffer.byteLength(partsText, "utf8");
bytesRead += b;
if (b > maxBytes) {
maxBytes = b;
maxBytesId = id;
console.log(
`[backfill-message-parts] new largest message ${id}: ${humanBytes(b)}`,
);
}

const partRows = partRowsFor(id, rawParts);

if (reconcile) {
if (have === partRows.length) {
skipped++;
continue;
}
if (!dryRun) await replaceMessage(id, partRows);
} else {
if (partRows.length === 0) {
skipped++;
continue;
}
if (!dryRun) await copyMessage(partRows);
}
changed++;
partsWritten += partRows.length;
} catch (err) {
errors++;
console.error(
`[backfill-message-parts] message ${id} failed, skipping:`,
err,
);
}
}

logProgress("progress");
}
} finally {
await closeDatabase(database).catch(() => {});
}

logProgress("done");
console.log(
`[backfill-message-parts] summary (${mode}) — scanned=${scanned} ` +
`${verb}=${changed} skipped=${skipped} parts=${partsWritten} ` +
`read=${humanBytes(bytesRead)} errors=${errors} ` +
`largest=${maxBytesId ? `${maxBytesId} (${humanBytes(maxBytes)})` : "n/a"}`,
);
return errors > 0 ? 1 : 0;
}
Loading
Loading