Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions apps/mesh/migrations/098-message-parts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Message Parts Migration (expand phase)
*
* Moves `thread_messages.parts` (a JSON array blob, frequently 50 MB+ on busy
* threads because tool-result parts inline large payloads) into a normalized
* `message_parts` table — one row per part.
*
* This is the first, additive step of an expand/contract migration:
* 098 (this) create message_parts — additive, no lock on thread_messages
* + deploy dual-write saveMessages → both columns
* + backfill `deco backfill-message-parts` — copy existing parts, batched/resumable
* + deploy cut reads over to message_parts
* + deploy stop writing thread_messages.parts
* later ALTER TABLE thread_messages DROP COLUMN parts (then pg_repack to reclaim disk)
*
* `content` holds each part as a JSON-text fragment (NOT jsonb): real `parts`
* blobs contain `\0` (e.g. binary tool outputs like ZIP headers), which
* Postgres `jsonb`/`json` reject (22P05) but `text` stores fine — same as the
* `thread_messages.parts` column does today. `type` is denormalized from
* `part.type` for filtering. Reconstruct a message's array by concatenating the
* fragments (the app then JSON.parses it, exactly like the old `parts` blob):
* SELECT coalesce('[' || string_agg(content, ',' ORDER BY idx) || ']', '[]')
* FROM message_parts WHERE message_id = $1
*
* `created_at`/`updated_at` are per-part (text, like `thread_messages`): a part
* mutates in place across streaming saves (a tool-call goes input-streaming →
* input-available → output-available at the same idx), so the dual-write
* preserves `created_at` on conflict and bumps `updated_at` only when `content`
* actually changes. Backfilled rows inherit the message's `created_at`, since
* no true per-part time exists for them.
*/

import { type Kysely, sql } from "kysely";

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("message_parts")
// CASCADE DELETE: parts vanish with their message (which itself cascades
// from threads), so thread deletion still fully cleans up.
.addColumn("message_id", "text", (col) =>
col.notNull().references("thread_messages.id").onDelete("cascade"),
)
.addColumn("idx", "integer", (col) => col.notNull())
.addColumn("type", "text", (col) => col.notNull())
// text, not jsonb: parts can contain binary tool outputs, which
// jsonb rejects. Stores the JSON-text fragment verbatim, like `parts`.
.addColumn("content", "text", (col) => col.notNull())
.addColumn("created_at", "text", (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addColumn("updated_at", "text", (col) =>
col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`),
)
.addPrimaryKeyConstraint("message_parts_pkey", ["message_id", "idx"])
.execute();

// The PK (message_id, idx) already serves the canonical read
// (WHERE message_id = $1 ORDER BY idx), so no extra index is needed.
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable("message_parts").execute();
}
2 changes: 2 additions & 0 deletions apps/mesh/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ import * as migration094orgfileconfigs from "./094-org-file-configs.ts";
import * as migration095removeautomationtoolcallkind from "./095-remove-automation-tool-call-kind.ts";
import * as migration096orgfileconfigspublicurlbase from "./096-org-file-configs-public-url-base.ts";
import * as migration097droplocaldockersandboxstate from "./097-drop-local-docker-sandbox-state.ts";
import * as migration098messageparts from "./098-message-parts.ts";

/**
* Core migrations for the Mesh application.
Expand Down Expand Up @@ -214,6 +215,7 @@ const migrations: Record<string, Migration> = {
migration096orgfileconfigspublicurlbase,
"097-drop-local-docker-sandbox-state":
migration097droplocaldockersandboxstate,
"098-message-parts": migration098messageparts,
};

export default migrations;
20 changes: 20 additions & 0 deletions apps/mesh/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const { values, positionals } = parseArgs({
batch: { type: "string" },
limit: { type: "string" },
org: { type: "string" },
"after-id": { type: "string" },
reconcile: { type: "boolean", default: false },
},
allowPositionals: true,
});
Expand All @@ -87,6 +89,7 @@ Usage:
deco auth <login|whoami|logout> Manage CLI authentication
deco link [studio-url] [options] Start the desktop-side link daemon
deco backfill-assets Hoist legacy inline media out of threads + connections
deco backfill-message-parts Copy thread_messages.parts into message_parts (--reconcile to verify/repair)
deco completion [shell] Install shell completions

Server Options:
Expand Down Expand Up @@ -227,6 +230,23 @@ if (command === "backfill-assets") {
process.exit(code);
}

// ── Backfill: copy thread_messages.parts into message_parts ─────────────
if (command === "backfill-message-parts") {
const { backfillMessagePartsCommand } = await import(
"./cli/commands/backfill-message-parts"
);
const code = await backfillMessagePartsCommand({
dryRun: values["dry-run"] === true,
// --batch is the id-page size; blobs are fetched one message at a time, so
// memory is bounded regardless and 200 ids/page is fine for both modes.
batch: values.batch ? Number(values.batch) : 200,
limit: values.limit ? Number(values.limit) : undefined,
afterId: values["after-id"] as string | undefined,
reconcile: values.reconcile === true,
});
process.exit(code);
}

// ── Auth / Link helpers ────────────────────────────────────────────────
function resolveDataDir(): string {
return (
Expand Down
Loading
Loading