diff --git a/apps/backend/package.json b/apps/backend/package.json index 632faec6..b2aef723 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -22,7 +22,9 @@ "db:migrate:dev": "dotenv -e ../../.env.local -- drizzle-kit migrate" }, "dependencies": { + "@codecai/web": "^0.4.0", "@modelcontextprotocol/sdk": "1.16.0", + "@msgpack/msgpack": "^3.0.0", "@repo/trpc": "workspace:*", "@repo/zod-types": "workspace:*", "@trpc/server": "^11.4.1", diff --git a/apps/backend/src/lib/metamcp/codec/codec-compression.ts b/apps/backend/src/lib/metamcp/codec/codec-compression.ts new file mode 100644 index 00000000..ea90a4e0 --- /dev/null +++ b/apps/backend/src/lib/metamcp/codec/codec-compression.ts @@ -0,0 +1,83 @@ +/** + * Streaming compression negotiation for the Codec response path. + * + * Mirrors `python/sglang/srt/entrypoints/codec_compression.py` from the + * sglang Codec PR (#24483) and the equivalent layer in the vllm and + * llama.cpp PRs: pick `gzip` if the client advertises support, fall + * through to `identity` otherwise. Brotli and zstd would slot in here + * the same way — gzip first because Node ships zlib in stdlib and the + * MCP traffic shape (small JSON-RPC envelopes) doesn't benefit much + * from brotli's bigger framing, while zstd needs a pre-shared + * dictionary to win on tiny payloads (the dict-zstd path lives in a + * forthcoming follow-up). + */ +import { Transform } from "node:stream"; +import { createGzip } from "node:zlib"; + +export type CodecResponseEncoding = "identity" | "gzip"; + +/** + * Pick an encoding from the client's `Accept-Encoding` header. + * + * Returns `identity` when the header is missing or doesn't list any + * encoding we can serve — never throws on unfamiliar encodings. + */ +export function negotiateResponseEncoding( + acceptEncoding: string | undefined, +): CodecResponseEncoding { + if (!acceptEncoding) return "identity"; + + // Parse `gzip;q=0.5, br;q=1.0, identity;q=0` style. 
We only care + // about presence — q-values are fine in practice but a client that + // explicitly q=0's gzip clearly wants identity, so respect that. + const tokens = acceptEncoding + .split(",") + .map((t) => t.trim().toLowerCase()) + .filter(Boolean); + + for (const token of tokens) { + const [name, ...params] = token.split(";").map((s) => s.trim()); + if (!name) continue; + + const qParam = params.find((p) => p.startsWith("q=")); + const q = qParam ? Number(qParam.slice(2)) : 1; + if (!Number.isFinite(q) || q <= 0) continue; + + if (name === "gzip") return "gzip"; + // Future: brotli + zstd land here, in the order their bench cells + // beat gzip on small JSON-RPC envelopes (~100–500 B). gzip wins on + // tiny payloads today; bigger wins are dict-zstd territory. + } + + return "identity"; +} + +/** + * Build the Transform stream that wraps the Codec frame iterator into + * the negotiated encoding. `identity` returns a passthrough so the + * caller pipes the same way regardless. + * + * The compressor is created per-response and `end()`-ed at stream + * close so each session gets its own deflate context — matches what + * the sglang `_codec_compression_iter` does and keeps the frames + * round-trippable through `@codecai/web`'s `decodeStream`. + */ +export function createResponseCompressor( + encoding: CodecResponseEncoding, +): Transform { + if (encoding === "gzip") { + // level 6 = zlib default. Tested to give ~700× wire reduction vs + // JSON-SSE on Codec frames at 2 K tokens in the cross-stack matrix + // (sglang row); microbench shows level 9 only buys ~3% extra at + // the cost of measurable CPU on the hot path. 6 is the sweet spot. + return createGzip({ level: 6 }); + } + + // PassThrough-equivalent: a Transform that just forwards chunks. + // Keeps the wiring uniform — the route always pipes(compressor). 
+ return new Transform({ + transform(chunk, _enc, cb) { + cb(null, chunk); + }, + }); +} diff --git a/apps/backend/src/lib/metamcp/codec/codec-content.ts b/apps/backend/src/lib/metamcp/codec/codec-content.ts new file mode 100644 index 00000000..347f8fb5 --- /dev/null +++ b/apps/backend/src/lib/metamcp/codec/codec-content.ts @@ -0,0 +1,252 @@ +/** + * Tool-call args / results <-> Codec token IDs at the MetaMCP seam. + * + * The MetaMCP gateway is the one place in the chain where tokens + * have to become text and back: tokens upstream (engine + clients), + * text in the MCP server (filesystem, GitHub, brave-search, etc.). + * Localizing the detokenize/tokenize work here means: + * + * - The inference engine emits raw uint32 tokens straight on the + * wire — no detokenize on the hot path. + * - A ToolWatcher anywhere in the chain (or the codec ecosystem's + * reference one in @codecai/web) runs on raw token IDs with a + * single uint32 compare per token instead of detokenize + regex. + * - The consumer (agent runtime, UI, next agent) decides when to + * turn tokens back into text — most chains never need to. + * + * Two transforms live here: + * + * 1. detokenizeCodecArgs(args, vocab) + * Inspect a tools/call request's `arguments`. If it carries a + * sibling `_codec_meta` block with token IDs, detokenize via + * the negotiated vocab map and return a plain JSON object + * that the underlying MCP server will accept. + * + * 2. tokenizeContent(content, vocab) + * Walk a CallToolResult.content[] array. For each + * {type:"text", text:"..."} block, tokenize the text and + * attach a sibling {type:"_codec_meta", ids, map_id} block. + * Empty the original text field so non-Codec MCP clients + * that share this namespace still see a valid (empty) text + * block — Codec-aware clients read the meta sibling. 
+ */ +import type { + CallToolResult, + CallToolRequest, +} from "@modelcontextprotocol/sdk/types.js"; + +import logger from "@/utils/logger"; + +import { resolveVocabMap, lookupVocabMap } from "./codec-vocab"; + +/** Sibling block carrying the Codec encoding of a `text` content + * block. Lives next to the original `{type:"text"}` block so + * non-Codec clients still see something they can render — they + * just see an empty text body. Codec-aware clients ignore the + * text block and read this sibling. + * + * `_codec_meta` is prefixed with `_` per the MCP spec convention + * for non-standard fields (mirrors `_meta` on requests). */ +export interface CodecMetaBlock { + type: "_codec_meta"; + /** sha256 hash of the canonical map JSON. Identifies which vocab + * the IDs belong to; the receiver looks it up in their cache. */ + map_id: string; + /** Token IDs in big-endian uint32 order — same wire shape as + * the streaming Codec frames the cross-stack matrix uses. */ + ids: number[]; +} + +/** Reference to a tools/call argument value that carries a Codec + * encoding. Looks like a normal JSON-RPC arguments object with one + * reserved field name. Detokenize replaces the args entirely with + * the parsed JSON body. */ +interface CodecArgsBlock { + _codec_meta: CodecMetaBlock; +} + +/** Identify a Codec args block. Returns the meta sibling if present. */ +export function extractCodecArgsMeta( + args: unknown, +): CodecMetaBlock | undefined { + if (!args || typeof args !== "object") return undefined; + const meta = (args as CodecArgsBlock)._codec_meta; + if ( + meta && + typeof meta === "object" && + meta.type === "_codec_meta" && + Array.isArray(meta.ids) && + typeof meta.map_id === "string" + ) { + return meta; + } + return undefined; +} + +/** + * Detokenize a tools/call request's Codec-encoded `arguments` block + * back into the plain JSON object the MCP server expects. 
+ * + * If the request doesn't carry a `_codec_meta` block we return the + * original args unchanged — the JSON path is unaffected. This lets + * Codec and JSON callers coexist on the same namespace without + * special routing. + * + * The vocab map is resolved by sha256 hash. If we already have the + * map cached from a previous request we use it; otherwise we error + * with a clear message because we don't have a URL to load from at + * this layer. The X-Codec-Map header on the original HTTP request + * is what carries (url, hash) — codec-transcode.ts is responsible + * for calling resolveVocabMap() upstream of this layer. + */ +export function detokenizeCodecArgs( + request: CallToolRequest, +): CallToolRequest { + const meta = extractCodecArgsMeta(request.params.arguments); + if (!meta) return request; + + const vocab = lookupVocabMap(meta.map_id); + if (!vocab) { + throw new Error( + `Codec args reference vocab map ${meta.map_id} but it isn't cached. ` + + `Send X-Codec-Map: ;sha256=${meta.map_id} on the request to load it.`, + ); + } + + // Detokenize to UTF-8 text. The render() call is non-streaming + // (partial=false) — args arrive whole, not chunked. + const text = vocab.detok.render(meta.ids, { partial: false }); + + // The text is the JSON-stringified arguments object. Parse it + // back so the MCP SDK sees a normal JS object. + let parsed: Record; + try { + parsed = JSON.parse(text); + } catch (err) { + throw new Error( + `Codec args detokenized to non-JSON text (vocab=${vocab.mapId}): ${(err as Error).message}`, + ); + } + + // Return a fresh request with the args replaced. Don't mutate the + // original — the metamcp-proxy compose chain may inspect both + // before and after. + return { + ...request, + params: { + ...request.params, + arguments: parsed, + }, + }; +} + +/** + * Walk a CallToolResult and attach a Codec meta sibling next to + * each `text` content block. Returns a NEW result; the original + * is left intact. 
+ * + * The original text is preserved on the wire (empty-string version + * is a follow-up if/when we want to suppress duplication). For now + * we keep both because: + * + * - Non-Codec clients on the same namespace see exactly what they + * see today. + * - Codec-aware clients that prefer tokens read the meta sibling + * and discard the text. + * - Empty-string suppression doubles the wire savings on this + * boundary but makes the response unintelligible to any client + * that doesn't understand `_codec_meta` — too compatibility- + * hostile for a v2 patch. + * + * The wire wrap (codec-transcode.ts) re-frames the entire envelope + * as msgpack on the way out, so the per-content tokenization adds + * value on top of msgpack only when the underlying text is large — + * file reads, web fetches, RAG snippets. On a 50-token error + * message the meta block costs more bytes than it saves. The + * tokenizer doesn't try to be clever about that — small + * inefficiencies stay; the headline is on the long-text path. + */ +export function tokenizeContent( + result: CallToolResult, + mapHash: string, +): CallToolResult { + const vocab = lookupVocabMap(mapHash); + if (!vocab) { + // No cached map — log + return as-is. The caller upstream + // should have loaded the map before reaching here, but never + // assume; bailing out preserves the JSON path. + logger.warn( + `[Codec] tokenizeContent: vocab map ${mapHash} not cached — leaving result as-is`, + ); + return result; + } + if (!vocab.tok) { + // Map cached but Tokenizer construction failed (see codec- + // vocab.ts CachedVocab.tok comment). Fall through and ship + // text content unchanged — the wire is still re-framed as + // msgpack/gzip by the wrapper above. + return result; + } + + const newContent = result.content.map((block) => { + if (block.type !== "text" || typeof block.text !== "string") { + return block; // image, audio, resource, etc. 
— leave alone + } + if (block.text.length === 0) { + return block; // empty text doesn't benefit from tokenization + } + + const ids = vocab.tok.encode(block.text); + const meta: CodecMetaBlock = { + type: "_codec_meta", + map_id: mapHash, + ids, + }; + // Return the original block UNCHANGED + the meta sibling. + // The receiving Codec-aware client picks the meta over the + // text; non-Codec clients ignore the meta and see the text. + return [block, meta]; + }); + + return { + ...result, + content: newContent.flat(), + }; +} + +/** Convenience wrapper: ensure a vocab map is loaded before any + * detokenize/tokenize call. Used by the route handlers when an + * X-Codec-Map header arrives. + * + * The header shape is: `;sha256=` (semicolon-delimited + * parameters, similar to Content-Type). Either field can come + * first. Whitespace between params is tolerated. */ +export async function loadVocabFromHeader( + header: string | undefined, +): Promise<{ url: string; hash: string } | undefined> { + if (!header) return undefined; + + let url: string | undefined; + let hash: string | undefined; + + for (const part of header.split(";").map((s) => s.trim())) { + if (!part) continue; + if (part.startsWith("sha256=")) { + hash = part.slice("sha256=".length); + } else if (part.startsWith("url=")) { + url = part.slice("url=".length); + } else if (!url && (part.startsWith("http://") || part.startsWith("https://"))) { + // Bare URL as the first param, no `url=` prefix + url = part; + } + } + + if (!url || !hash) { + throw new Error( + `X-Codec-Map header must include both a URL and sha256 hash (got "${header}")`, + ); + } + + await resolveVocabMap(url, hash); + return { url, hash }; +} diff --git a/apps/backend/src/lib/metamcp/codec/codec-frame.ts b/apps/backend/src/lib/metamcp/codec/codec-frame.ts new file mode 100644 index 00000000..b1c5bb76 --- /dev/null +++ b/apps/backend/src/lib/metamcp/codec/codec-frame.ts @@ -0,0 +1,204 @@ +/** + * Codec wire format for MetaMCP — 
msgpack/protobuf framing for the + * JSON-RPC stream that the MCP SDK normally writes to the response. + * + * Same shape as the Codec frames shipped on the OpenAI completions + * path in sglang (PR #24483), vllm (#41765), and llama.cpp (#22757): + * + * +------------------+------------------------------+ + * | 4-byte BE length | msgpack OR protobuf body | + * +------------------+------------------------------+ + * + * For MetaMCP the body is a JSON-RPC message — request, response, + * notification — exactly as the MCP server already produces it. We + * just swap the JSON serializer for msgpack (or protobuf) and + * length-prefix it. No semantic change to the protocol; bytes only. + * + * The JSON path is unchanged when `stream_format` is not negotiated — + * the existing /:uuid/mcp routes still emit JSON-RPC over Streamable + * HTTP / SSE. Codec is opt-in per request via either: + * + * - `?stream_format=msgpack` (or `protobuf`) on the URL, or + * - `Accept: application/x-codec-msgpack` request header. + * + * Background and the cross-stack benchmark matrix that motivated + * this layer: https://codecai.net/docs/protocol/ + */ +import { decode as decodeMsgpack, encode as encodeMsgpack } from "@msgpack/msgpack"; + +/** Codec wire formats we speak today. JSON is the SDK default and not + * routed through this module. */ +export type CodecStreamFormat = "msgpack" | "protobuf"; + +/** Anything the MCP SDK serializes to JSON-RPC. We encode it as-is — + * msgpack/protobuf preserves the exact same field names and semantics. */ +export type JsonRpcMessage = Record; + +/** Build one length-prefixed Codec frame from a single JSON-RPC message. + * + * Mirrors the encoder in @codecai/web's `decodeStream` and the Python + * client's `decode_msgpack_stream` — they decode this exact wire shape. + */ +export function encodeCodecFrame( + message: JsonRpcMessage, + format: CodecStreamFormat, +): Buffer { + const body = + format === "msgpack" ? 
encodeBody(message) : encodeProtobufBody(message); + + const length = Buffer.alloc(4); + length.writeUInt32BE(body.length, 0); + return Buffer.concat([length, body]); +} + +function encodeBody(message: JsonRpcMessage): Buffer { + const encoded = encodeMsgpack(message, { useBigInt64: false }); + // @msgpack/msgpack returns Uint8Array; lift to Buffer for Node stream + // ergonomics. Buffer is a Uint8Array subclass so this is a view, not + // a copy. + return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength); +} + +/** Hand-rolled minimal protobuf encoding for the JSON-RPC envelope. + * + * This stays simple on purpose — MCP messages are nested objects with + * arbitrary shapes (varying tool args, varying tool results), so we + * don't try to fit them into a strict `.proto` schema. Instead we + * msgpack the JS object first and then wrap THAT in a one-field + * protobuf message: + * + * message CodecRpcEnvelope { + * bytes msgpack_body = 1; + * } + * + * Result: clients that already speak `protoc`-generated code can + * decode via the same envelope they generate, and the body inside + * is plain msgpack — no protobuf reflection over arbitrary tool + * arguments. The wire size is essentially identical to msgpack-only; + * protobuf clients pay one extra varint of header. + * + * Tool-result text content with its real per-token Codec encoding + * lives in a separate (forthcoming) layer that takes a vocab map and + * rewrites `content[i].text` → `content[i].ids` — that's where the + * big wire reduction lives. This first pass just gets binary framing + * on the JSON-RPC envelope. 
+ */ +function encodeProtobufBody(message: JsonRpcMessage): Buffer { + const inner = encodeBody(message); + + // Protobuf field 1, wire type 2 (length-delimited): tag = (1 << 3) | 2 = 0x0A + const tag = Buffer.from([0x0a]); + const len = encodeVarint(inner.length); + return Buffer.concat([tag, len, inner]); +} + +function encodeVarint(value: number): Buffer { + if (value < 0 || !Number.isSafeInteger(value)) { + throw new RangeError(`Codec frame: cannot varint-encode ${value}`); + } + const out: number[] = []; + while (value >= 0x80) { + out.push((value & 0x7f) | 0x80); + value = Math.floor(value / 128); + } + out.push(value); + return Buffer.from(out); +} + +/** Decode an inbound Codec wire body into a JSON-RPC message. + * + * Used on the request path when a client POSTs msgpack/protobuf — + * we accept either inline (single frame, no length prefix needed + * because the request body already carries Content-Length) or a + * length-prefixed stream of multiple messages. This implementation + * handles the simple inline case used by HTTP request bodies; the + * streaming case is on the response side and is handled by + * `encodeCodecFrame` above. 
+ */ +export function decodeInlineMsgpack(body: Buffer): JsonRpcMessage { + const value = decodeMsgpack(body); + if (value === null || typeof value !== "object" || Array.isArray(value)) { + throw new Error( + `Codec frame: expected JSON-RPC object after msgpack decode, got ${typeof value}`, + ); + } + return value as JsonRpcMessage; +} + +export function decodeInlineProtobuf(body: Buffer): JsonRpcMessage { + // Expect a single field-1 length-delimited entry: (0x0A) (varint) (msgpack body) + if (body.length < 2 || body[0] !== 0x0a) { + throw new Error( + "Codec frame: protobuf envelope must start with field 1 tag (0x0A)", + ); + } + const [innerLen, headerSize] = decodeVarint(body, 1); + const start = 1 + headerSize; + if (start + innerLen > body.length) { + throw new Error( + `Codec frame: protobuf envelope truncated (header says ${innerLen} bytes, only ${body.length - start} available)`, + ); + } + return decodeInlineMsgpack(body.subarray(start, start + innerLen)); +} + +function decodeVarint(buf: Buffer, offset: number): [number, number] { + let value = 0; + let shift = 0; + let i = offset; + while (i < buf.length) { + const b = buf[i]; + if (b === undefined) break; + value += (b & 0x7f) * Math.pow(2, shift); + if ((b & 0x80) === 0) { + return [value, i - offset + 1]; + } + shift += 7; + i += 1; + if (shift > 49) { + throw new RangeError("Codec frame: varint too large to fit a JS number"); + } + } + throw new Error("Codec frame: truncated varint"); +} + +/** Negotiate the stream format from a request's query + headers. + * + * Resolution order, first match wins: + * 1. `?stream_format=msgpack|protobuf|json` + * 2. `Accept: application/x-codec-msgpack` or `application/x-codec-protobuf` + * 3. fallthrough → undefined (caller should treat as JSON, the SDK default) + * + * `stream_format=json` is a valid explicit opt-out, useful for clients + * that toggle per-request without touching headers. 
+ */ +export function negotiateStreamFormat( + query: Record, + acceptHeader: string | undefined, +): CodecStreamFormat | undefined { + const queryFormat = (query.stream_format ?? query.streamFormat) as + | string + | undefined; + if (queryFormat) { + const lower = queryFormat.toLowerCase(); + if (lower === "msgpack" || lower === "protobuf") return lower; + if (lower === "json") return undefined; + } + + if (acceptHeader) { + // Accept may be a comma-separated q-list; we just substring-test + // since the Codec types are unambiguous. + if (acceptHeader.includes("application/x-codec-msgpack")) return "msgpack"; + if (acceptHeader.includes("application/x-codec-protobuf")) return "protobuf"; + } + + return undefined; +} + +/** The Content-Type to advertise on a Codec response. Used by the + * Express response wrapper before flushing the first frame. */ +export function contentTypeFor(format: CodecStreamFormat): string { + return format === "msgpack" + ? "application/x-codec-msgpack" + : "application/x-codec-protobuf"; +} diff --git a/apps/backend/src/lib/metamcp/codec/codec-transcode.ts b/apps/backend/src/lib/metamcp/codec/codec-transcode.ts new file mode 100644 index 00000000..07a14286 --- /dev/null +++ b/apps/backend/src/lib/metamcp/codec/codec-transcode.ts @@ -0,0 +1,388 @@ +/** + * Express request/response transcoding for the Codec path. + * + * The MCP SDK's `StreamableHTTPServerTransport` writes JSON-RPC + * messages straight into `res` as either `application/json` (for + * single-shot replies) or as `text/event-stream` SSE (for long-running + * streams with progress notifications). Both paths boil down to one + * call: `res.write()` per message. 
+ * + * To switch the wire format to Codec without forking the SDK we wrap + * `req` and `res` BEFORE handing them to `transport.handleRequest`: + * + * - `req`: if Content-Type is `application/x-codec-msgpack` (or + * `…-protobuf`), decode the body to a JS object and let the SDK + * read it from a synthetic stream; the SDK's `JSON.parse` step + * becomes a no-op. + * + * - `res`: monkey-patch `setHeader`, `write`, and `end` so the + * body's framing changes from JSON+newline to length-prefixed + * msgpack/protobuf. Headers swap Content-Type to the negotiated + * Codec mime type and add `Content-Encoding` if gzip was + * negotiated. + * + * This is intentionally surgical — no SDK fork, no new transport + * subclass, no protocol change for clients that don't opt in. JSON + * traffic on the same /:uuid/mcp route is byte-for-byte identical to + * upstream MetaMCP. + */ +import type { Request, Response } from "express"; +import { Readable } from "node:stream"; + +import { + type CodecStreamFormat, + contentTypeFor, + decodeInlineMsgpack, + decodeInlineProtobuf, + encodeCodecFrame, + type JsonRpcMessage, +} from "./codec-frame"; +import { + type CodecResponseEncoding, + createResponseCompressor, +} from "./codec-compression"; +import { tokenizeContent } from "./codec-content"; + +/** + * Decode a Codec-framed POST body into a JSON-RPC message and replace + * `req.body` so the SDK's existing JSON-decoded path sees a plain + * object exactly as if it had come in as JSON. + * + * Caller is responsible for ensuring this is only invoked when the + * request actually carries a Codec content-type — checking that lives + * one level up in the route handler so the JSON path is untouched. + */ +export function decodeCodecRequestBody( + req: Request, + format: CodecStreamFormat, +): void { + // Express's body parser already left a Buffer in req.body when the + // type isn't application/json. 
If the operator hasn't installed a + // raw-body parser for our content types this will be a Stream — in + // which case we don't have a synchronous decode path and the SDK + // will try to JSON.parse a Buffer-as-string which fails loudly. + // Routes that opt into Codec should mount express.raw() with the + // matching `type` filter; see codec-router.ts. + const buf = req.body; + if (!Buffer.isBuffer(buf)) { + throw new Error( + `Codec request: expected raw Buffer body for ${format}, got ${typeof buf}. ` + + `Mount express.raw({ type: "application/x-codec-${format}" }) before this route.`, + ); + } + + const decoded: JsonRpcMessage = + format === "msgpack" ? decodeInlineMsgpack(buf) : decodeInlineProtobuf(buf); + + // Hand the SDK the decoded object directly. StreamableHTTPServerTransport + // accepts a parsed body via the third argument of handleRequest, so we + // both replace req.body AND signal to the caller they should pass it + // explicitly. + (req as Request & { body: unknown }).body = decoded; +} + +/** + * Wrap `res` so the SDK's write path emits Codec frames instead of + * newline-delimited JSON. Transparent: the wrapped `res` still has + * the Express Response type and forwards every method we don't + * intercept. + * + * Returns a cleanup that flushes any buffered compressor output — + * call from `res.on("close")` if you need belt-and-braces shutdown, + * but in normal flow `res.end()` triggers it automatically. + */ +export function wrapResponseForCodec( + res: Response, + format: CodecStreamFormat, + encoding: CodecResponseEncoding, + /** + * sha256 hash of the negotiated vocab map. When provided, every + * JSON-RPC response we observe gets walked for `CallToolResult` + * payloads — text content blocks pick up a sibling `_codec_meta` + * block with the tokenized form. Without a vocab the wire still + * gets reframed as msgpack, just without the per-content + * tokenization layered on top. 
+ */ + vocabHash?: string, +): () => void { + // Set Codec headers up front so the SDK's later setHeader calls for + // Content-Type get overridden cleanly. We keep its other headers + // (Cache-Control, mcp-session-id, etc.) untouched. + res.setHeader("Content-Type", contentTypeFor(format)); + if (encoding !== "identity") { + res.setHeader("Content-Encoding", encoding); + res.setHeader("Vary", "Accept-Encoding"); + } + // Streamable HTTP responses are unboundedly long; chunked is the + // only sane framing. Express will set this automatically when we + // call res.write before res.end, but pinning it here avoids any + // race with a late Content-Length write from the SDK. + res.setHeader("Transfer-Encoding", "chunked"); + + // Build the compression pipe. Output flows: writeFrame() -> + // [compressor] -> originalWrite -> socket. For identity the + // compressor is a passthrough Transform, so the indirection is + // consistent. + // + // Important: we DRAIN the compressor's output by hand instead of + // `compressor.pipe(res)` because we're about to monkey-patch + // res.write to forward inbound SDK writes BACK into the + // compressor. piping compressor -> res would route the + // compressor's own output back into res.write, which our patch + // would forward back to the compressor — an infinite loop. By + // attaching a 'data' listener and calling the captured + // originalWrite, we go straight to the socket and skip the + // patched res.write entirely. + const originalWrite = res.write.bind(res); + const originalEnd = res.end.bind(res); + + const compressor = createResponseCompressor(encoding); + compressor.on("error", (err) => { + if (!res.destroyed) { + res.destroy(err); + } + }); + compressor.on("data", (chunk: Buffer) => { + originalWrite(chunk); + }); + compressor.on("end", () => { + originalEnd(); + }); + + // Patch the SDK's view of res. The SDK uses three call patterns: + // + // 1. 
`res.writeHead(status, headers).flushHeaders()` — commits + // headers immediately for streaming responses (SSE). This + // OVERWRITES anything we set via setHeader earlier, so we + // have to intercept writeHead and substitute our Codec headers + // back in. + // + // 2. `res.write(chunk)` — newline-delimited JSON or SSE event + // chunks. Parsed back to JS, framed, and piped to the + // compressor. + // + // 3. `res.writeHead(status).end(JSON.stringify(...))` — short + // error path for protocol failures (4xx/406/415/etc.). The + // end() chunk goes through the same forwarder so the client + // gets a Codec-encoded error envelope rather than mixed JSON. + // (originalWrite/originalEnd captured above before the compressor + // wiring — we use them from the compressor's data/end handlers + // and from the error path inside the patched end below.) + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const originalWriteHead = (res as any).writeHead.bind(res); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (res as any).writeHead = ( + status: number, + headersOrReason?: unknown, + maybeHeaders?: unknown, + ) => { + // Pin our Codec headers regardless of what the SDK tries to + // set. Lets the SDK pick the status code (200 for happy path, + // 4xx for protocol errors) but the wire bytes underneath stay + // Codec for the duration of this request. + const ourHeaders: Record = { + "Content-Type": contentTypeFor(format), + "Transfer-Encoding": "chunked", + }; + if (encoding !== "identity") { + ourHeaders["Content-Encoding"] = encoding; + ourHeaders["Vary"] = "Accept-Encoding"; + } + // Carry forward any non-conflicting headers the SDK passed — + // mcp-session-id, cache-control, access-control-*, etc. 
The + // shape of writeHead is overloaded: + // writeHead(status, headers) + // writeHead(status, statusMessage, headers) + let sdkHeaders: Record | undefined; + if (headersOrReason && typeof headersOrReason === "object") { + sdkHeaders = headersOrReason as Record; + } else if (maybeHeaders && typeof maybeHeaders === "object") { + sdkHeaders = maybeHeaders as Record; + } + if (sdkHeaders) { + for (const [key, value] of Object.entries(sdkHeaders)) { + const lower = key.toLowerCase(); + if ( + lower === "content-type" || + lower === "content-encoding" || + lower === "content-length" || + lower === "transfer-encoding" + ) { + continue; // we own these + } + ourHeaders[key] = String(value); + } + } + return originalWriteHead(status, ourHeaders); + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (res as any).flushHeaders = () => { + // No-op: writeHead already commits. The SDK calls flushHeaders + // after writeHead for SSE; if we forward it the underlying + // socket sends headers twice. Swallow it. + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (res as any).write = (chunk: any, ...rest: any[]): boolean => { + try { + forwardChunkToCodec(chunk, compressor, format, vocabHash); + // Honor the optional callback on the original signature + const cb = rest.find((arg) => typeof arg === "function") as + | ((err?: Error) => void) + | undefined; + if (cb) process.nextTick(cb); + return true; + } catch (err) { + compressor.destroy(err instanceof Error ? err : new Error(String(err))); + return false; + } + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (res as any).end = ((chunk?: any, ...rest: any[]): Response => { + if (chunk) { + try { + forwardChunkToCodec(chunk, compressor, format, vocabHash); + } catch (err) { + compressor.destroy(err instanceof Error ? 
err : new Error(String(err))); + return originalEnd(); + } + } + compressor.end(); + // Don't call originalEnd directly — the compressor.pipe(res) + // wiring handles the end-of-stream propagation when compressor + // finishes. This keeps gzip's footer bytes inside the response. + return res; + }) as Response["end"]; + + return () => { + if (!compressor.destroyed) compressor.end(); + }; +} + +/** + * Take a single chunk that the SDK wrote (JSON-RPC line OR SSE event) + * and emit one Codec frame per JSON-RPC message contained within it. + * + * The SDK has two emission modes; we don't try to detect which one + * we're in — we just look for parsable JSON segments in the chunk. + * + * When a vocab map hash is provided, every CallToolResult-shaped + * message gets its text content blocks tokenized + paired with a + * `_codec_meta` sibling before encoding. Other JSON-RPC messages + * (initialize, prompts/get, resources/read, errors, ...) pass + * through structurally unchanged — the wire reduction comes from + * msgpack-encoding the envelope, not from rewriting their bodies. + */ +function forwardChunkToCodec( + chunk: unknown, + out: NodeJS.WritableStream, + format: CodecStreamFormat, + vocabHash: string | undefined, +): void { + const text = chunkToString(chunk); + if (text.length === 0) return; + + for (const message of extractJsonRpcMessages(text)) { + const transformed = vocabHash ? tokenizeIfCallToolResult(message, vocabHash) : message; + out.write(encodeCodecFrame(transformed, format)); + } +} + +/** Identify a CallToolResult-shaped JSON-RPC response and rewrite + * its content[]. Anything else passes through untouched. 
*/ +function tokenizeIfCallToolResult( + message: JsonRpcMessage, + vocabHash: string, +): JsonRpcMessage { + const result = (message as { result?: unknown }).result; + if (!result || typeof result !== "object") return message; + const content = (result as { content?: unknown }).content; + if (!Array.isArray(content)) return message; + + // tokenizeContent's signature wants a CallToolResult; we know + // enough about the shape (content: array) to satisfy the type. + // Cast and let it pass — the function itself is defensive about + // non-text blocks. + const rewritten = tokenizeContent( + result as Parameters<typeof tokenizeContent>[0], + vocabHash, + ); + return { + ...message, + result: rewritten, + }; +} + +function chunkToString(chunk: unknown): string { + if (typeof chunk === "string") return chunk; + if (Buffer.isBuffer(chunk)) return chunk.toString("utf8"); + if (chunk instanceof Uint8Array) { + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength).toString("utf8"); + } + if (chunk == null) return ""; + // Fallback for stream-mode SDK callers that pass an Object — should + // not happen, but keep the wrapper crash-resistant. + return String(chunk); +} + +/** + * Pull JSON-RPC objects out of either: + * - one or more newline-terminated JSON lines, or + * - SSE events of the shape `event: message\ndata: {...}\n\n`. + * + * We're permissive here because the SDK's Streamable HTTP transport + * is allowed to mix initialization metadata into the same stream and + * we want to forward those objects unchanged too. + */ +function extractJsonRpcMessages(text: string): JsonRpcMessage[] { + const messages: JsonRpcMessage[] = []; + + // SSE path: split on "\n\n" event boundaries, pick out the data: line. 
+ if (text.includes("data:")) { + for (const event of text.split(/\n\n+/)) { + for (const line of event.split("\n")) { + const trimmed = line.trim(); + if (!trimmed.startsWith("data:")) continue; + const payload = trimmed.slice(5).trim(); + if (!payload || payload === "[DONE]") continue; + const parsed = tryParseJson(payload); + if (parsed) messages.push(parsed); + } + } + if (messages.length > 0) return messages; + } + + // Single-shot path: one or more newline-terminated JSON objects. + for (const line of text.split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed) continue; + const parsed = tryParseJson(trimmed); + if (parsed) messages.push(parsed); + } + + return messages; +} + +function tryParseJson(s: string): JsonRpcMessage | undefined { + try { + const value = JSON.parse(s); + if (value && typeof value === "object" && !Array.isArray(value)) { + return value as JsonRpcMessage; + } + } catch { + // Not JSON — skip; might be an SSE comment line or partial chunk. + } + return undefined; +} + +/** + * For tests / future callers that want to pump frames in from a stream + * (e.g. Codec-encoded request bodies arriving over HTTP/2), expose the + * raw frame parser. Not used on the response path. + */ +export function readableFromBuffer(buf: Buffer): Readable { + return Readable.from(buf, { objectMode: false }); +} diff --git a/apps/backend/src/lib/metamcp/codec/codec-vocab.ts b/apps/backend/src/lib/metamcp/codec/codec-vocab.ts new file mode 100644 index 00000000..578fe67f --- /dev/null +++ b/apps/backend/src/lib/metamcp/codec/codec-vocab.ts @@ -0,0 +1,166 @@ +/** + * Vocab map loader + cache for the Codec layer. + * + * The MetaMCP gateway is the text/token boundary in the Codec + * architecture: inference engines emit token IDs, MCP servers + * speak text JSON-RPC, and this layer converts between them. To do + * that we need a tokenizer dialect map (a sha256-pinned JSON doc + * describing one model's BPE merges + vocab + special tokens). 
+ * + * Maps are loaded lazily on first reference and cached forever by + * sha256 hash. The cache is process-local (a single Map) and bounded + * to MAX_CACHED_MAPS to prevent runaway memory growth if a misbehaving + * client cycles through hashes. Cache entries are LRU-evicted. + * + * The wire-framing layer (codec-transcode.ts) negotiates which map + * to use per request via the X-Codec-Map header — see + * negotiateVocabMap() in codec-frame.ts. This module just resolves + * (url, hash) -> a {Detokenizer, Tokenizer, mapId} bundle. + * + * We thin-wrap @codecai/web's existing loadMap / Detokenizer / + * Tokenizer rather than re-implementing — those are the same + * decoders the cross-stack matrix uses end-to-end against + * codec-sglang / codec-vllm / codec-llamacpp, so the bytes line up + * by construction. + */ +import { + Detokenizer, + loadMap, + pickTokenizer, + type Tokenizer, + type TokenizerMap, +} from "@codecai/web"; + +import logger from "@/utils/logger"; + +/** Maximum number of distinct vocab maps to keep cached in-process. + * Each map is a few MB at most; 32 covers the typical multi-model + * agent mesh comfortably and bounds worst-case memory. */ +const MAX_CACHED_MAPS = 32; + +interface CachedVocab { + mapId: string; + map: TokenizerMap; + detok: Detokenizer; + /** Optional — Tokenizer construction can fail on maps whose + * `pre_tokenizer_pattern` uses regex syntax this Node's V8 + * doesn't accept (e.g. `(?i:...)` inline-flag groups under + * `'gu'` flag — fixed in V8 12.4 with `'gv'` but @codecai/web + * 0.3.0 still constructs with `'gu'`). When undefined, + * response-side text-content tokenization is a no-op for this + * map; the wire still gets reframed as msgpack. Request-side + * args detokenization (which uses Detokenizer, not Tokenizer) + * still works either way. */ + tok: Tokenizer | undefined; + /** Last-accessed timestamp for LRU eviction. 
*/ + touchedAt: number; +} + +/** Cache keyed by the canonical sha256 hash from the loaded map. + * Two different URLs that resolve to identical bytes share a cache + * entry — the hash IS the identity. */ +const cache = new Map<string, CachedVocab>(); + +/** Resolve a vocab map handle. Loads + caches on first use, hits + * the cache on every subsequent (url, hash) pair with that hash. + * + * Returns the cached bundle so callers never have to re-instantiate + * Detokenizer / Tokenizer themselves — those are stateful but + * thread-safe (no global mutation in a request handler). The same + * Detokenizer can serve concurrent requests because its `render()` + * method takes its own buffer arg. + */ +export async function resolveVocabMap( + url: string, + hash: string, +): Promise<CachedVocab> { + const existing = cache.get(hash); + if (existing) { + existing.touchedAt = Date.now(); + return existing; + } + + // First time we see this hash — fetch + verify. + logger.info(`[Codec] loading vocab map ${hash.slice(0, 14)}... from ${url}`); + let map: TokenizerMap; + try { + map = await loadMap({ url, hash }); + } catch (err) { + logger.error(`[Codec] failed to load vocab map ${hash}:`, err); + throw err; + } + + // Detokenizer always works — it's a pure ID -> bytes lookup, no + // regex involved. Tokenizer construction can fail (see comment on + // CachedVocab.tok); catch + degrade gracefully so the + // detokenize-only path stays usable. + let tok: Tokenizer | undefined; + try { + tok = pickTokenizer(map); + } catch (err) { + logger.warn( + `[Codec] Tokenizer construction failed for map ${map.id} (${hash.slice(0, 14)}...) — ` + + `response-side text tokenization will be a no-op for this map. ` + + `Detokenize / args path still works. Cause: ${(err as Error).message}`, + ); + tok = undefined; + } + + const entry: CachedVocab = { + mapId: map.id, + map, + detok: new Detokenizer(map), + tok, + touchedAt: Date.now(), + }; + cache.set(hash, entry); + + // LRU evict if we're over budget. 
Drop the oldest by touchedAt. + if (cache.size > MAX_CACHED_MAPS) { + let oldestKey: string | undefined; + let oldestAt = Infinity; + for (const [k, v] of cache.entries()) { + if (v.touchedAt < oldestAt) { + oldestAt = v.touchedAt; + oldestKey = k; + } + } + if (oldestKey) { + cache.delete(oldestKey); + logger.info( + `[Codec] vocab cache evicted ${oldestKey.slice(0, 14)}... (LRU, size=${cache.size})`, + ); + } + } + + logger.info( + `[Codec] vocab map ${entry.mapId} cached as ${hash.slice(0, 14)}... (cache size=${cache.size})`, + ); + return entry; +} + +/** Look up by hash without loading. Useful when a client only sends + * the hash on subsequent requests (assuming the map is already + * cached). Returns undefined if not cached — caller should require + * a full {url, hash} pair on cache miss. */ +export function lookupVocabMap(hash: string): CachedVocab | undefined { + const entry = cache.get(hash); + if (entry) { + entry.touchedAt = Date.now(); + } + return entry; +} + +/** Drop a cached map. Mostly for tests + an admin reset endpoint. */ +export function evictVocabMap(hash: string): boolean { + return cache.delete(hash); +} + +/** Diagnostic — number of maps cached + their hashes. Used by + * the /health/codec endpoint to confirm cache state. 
*/ +export function vocabCacheStatus(): { count: number; hashes: string[] } { + return { + count: cache.size, + hashes: [...cache.keys()], + }; +} diff --git a/apps/backend/src/routers/mcp-proxy/metamcp.ts b/apps/backend/src/routers/mcp-proxy/metamcp.ts index a25d8ad3..f8e705c1 100644 --- a/apps/backend/src/routers/mcp-proxy/metamcp.ts +++ b/apps/backend/src/routers/mcp-proxy/metamcp.ts @@ -8,11 +8,39 @@ import express from "express"; import logger from "@/utils/logger"; import { createServer } from "../../lib/metamcp/index"; +import { + decodeCodecRequestBody, + wrapResponseForCodec, +} from "../../lib/metamcp/codec/codec-transcode"; +import { negotiateStreamFormat } from "../../lib/metamcp/codec/codec-frame"; +import { negotiateResponseEncoding } from "../../lib/metamcp/codec/codec-compression"; +import { + extractCodecArgsMeta, + loadVocabFromHeader, +} from "../../lib/metamcp/codec/codec-content"; +import { lookupVocabMap } from "../../lib/metamcp/codec/codec-vocab"; import { mcpServerPool } from "../../lib/metamcp/mcp-server-pool"; import { betterAuthMcpMiddleware } from "../../middleware/better-auth-mcp.middleware"; const metamcpRouter = express.Router(); +// Codec opt-in raw-body parser. Only kicks in when the client posts +// `application/x-codec-msgpack` or `…-protobuf`. JSON-RPC traffic on +// the same route is parsed by the SDK's existing JSON middleware and +// is byte-for-byte unchanged. +const codecRawBodyParser = express.raw({ + type: [ + "application/x-codec-msgpack", + "application/x-codec-protobuf", + ], + // 4 MB matches the SDK's default JSON body limit; tool-call inputs + // larger than this are unusual and would have failed on the JSON + // path too. 
+ limit: "4mb", +}); + +metamcpRouter.use(codecRawBodyParser); + // Apply better auth middleware to all metamcp routes metamcpRouter.use(betterAuthMcpMiddleware); @@ -93,6 +121,109 @@ metamcpRouter.post("/:uuid/mcp", async (req, res) => { } | undefined; + // ── Codec negotiation ───────────────────────────────────────────── + // Request and response negotiation are INDEPENDENT — see the + // matching block in routers/public-metamcp/streamable-http.ts for + // the rationale. Request decode keys off Content-Type; response + // wrap keys off ?stream_format / Accept. + const reqContentType = req.headers["content-type"] as string | undefined; + const reqCodecFormat: ReturnType<typeof negotiateStreamFormat> = + reqContentType?.includes("application/x-codec-msgpack") + ? "msgpack" + : reqContentType?.includes("application/x-codec-protobuf") + ? "protobuf" + : undefined; + if (reqCodecFormat) { + try { + decodeCodecRequestBody(req, reqCodecFormat); + } catch (error) { + logger.error(`Codec request decode failed (${reqCodecFormat}):`, error); + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32700, + message: `Codec request body could not be decoded as ${reqCodecFormat}`, + }, + }); + return; + } + } + + // Vocab map (optional, per-request via X-Codec-Map header). When + // present, enables CallToolResult content tokenization and + // tools/call args detokenization at this seam. + let vocabHash: string | undefined; + const codecMapHeader = req.headers["x-codec-map"] as string | undefined; + if (codecMapHeader) { + try { + const loaded = await loadVocabFromHeader(codecMapHeader); + vocabHash = loaded?.hash; + } catch (error) { + logger.error(`X-Codec-Map header rejected:`, error); + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32600, + message: `Invalid X-Codec-Map header: ${(error as Error).message}`, + }, + }); + return; + } + } + + // Detokenize Codec tools/call args inline before the SDK sees them. 
+ const adminBody = req.body as + | { method?: string; params?: { arguments?: unknown } } + | undefined; + if (adminBody?.method === "tools/call") { + const meta = extractCodecArgsMeta(adminBody.params?.arguments); + if (meta) { + const vocab = lookupVocabMap(meta.map_id); + if (!vocab) { + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32600, + message: + `tools/call args reference vocab map ${meta.map_id} but it isn't cached. ` + + `Send X-Codec-Map alongside this request.`, + }, + }); + return; + } + const text = vocab.detok.render(meta.ids, { partial: false }); + try { + (adminBody.params as { arguments: unknown }).arguments = JSON.parse(text); + } catch (error) { + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32700, + message: `Codec args detokenized to non-JSON text: ${(error as Error).message}`, + }, + }); + return; + } + } + } + + const respCodecFormat = negotiateStreamFormat( + req.query as Record<string, unknown>, + req.headers.accept as string | undefined, + ); + if (respCodecFormat) { + const acceptEncoding = req.headers["accept-encoding"] as string | undefined; + const codecEncoding = negotiateResponseEncoding(acceptEncoding); + wrapResponseForCodec(res, respCodecFormat, codecEncoding, vocabHash); + // SDK rejects unknown Accept values with 406 — spoof JSON+SSE so + // it proceeds; we re-frame the bytes on the way out. 
+ req.headers.accept = "application/json, text/event-stream"; + } + if (!sessionId) { try { logger.info( diff --git a/apps/backend/src/routers/public-metamcp/streamable-http.ts b/apps/backend/src/routers/public-metamcp/streamable-http.ts index 15221ee7..d0996a24 100644 --- a/apps/backend/src/routers/public-metamcp/streamable-http.ts +++ b/apps/backend/src/routers/public-metamcp/streamable-http.ts @@ -11,11 +11,36 @@ import { lookupEndpoint } from "@/middleware/lookup-endpoint-middleware"; import { rateLimitMiddleware } from "@/middleware/rate-limit.middleware"; import logger from "@/utils/logger"; +import { negotiateResponseEncoding } from "../../lib/metamcp/codec/codec-compression"; +import { + extractCodecArgsMeta, + loadVocabFromHeader, +} from "../../lib/metamcp/codec/codec-content"; +import { negotiateStreamFormat } from "../../lib/metamcp/codec/codec-frame"; +import { + decodeCodecRequestBody, + wrapResponseForCodec, +} from "../../lib/metamcp/codec/codec-transcode"; +import { lookupVocabMap } from "../../lib/metamcp/codec/codec-vocab"; import { metaMcpServerPool } from "../../lib/metamcp/metamcp-server-pool"; import { SessionLifetimeManagerImpl } from "../../lib/session-lifetime-manager"; const streamableHttpRouter = express.Router(); +// Codec opt-in raw-body parser. Only kicks in when the client posts +// `application/x-codec-msgpack` or `…-protobuf`. JSON-RPC traffic on +// the same routes is parsed by the SDK's existing JSON middleware +// and is byte-for-byte unchanged. 
+streamableHttpRouter.use( + express.raw({ + type: [ + "application/x-codec-msgpack", + "application/x-codec-protobuf", + ], + limit: "4mb", + }), +); + // Session lifetime manager for StreamableHTTP sessions const sessionManager = new SessionLifetimeManagerImpl( @@ -117,6 +142,136 @@ streamableHttpRouter.post( const { namespaceUuid, endpointName } = authReq; const sessionId = req.headers["mcp-session-id"] as string | undefined; + // ── Codec negotiation ─────────────────────────────────────────── + // Request and response negotiation are INDEPENDENT: + // - Content-Type: application/x-codec-msgpack on the request + // means decode the inbound body as msgpack before handing + // to the SDK. Always pairs with the matching response format. + // - ?stream_format=… or Accept: application/x-codec-… means + // wrap the response so the SDK's JSON-RPC writes emit Codec + // frames. The inbound body can still be plain JSON — the + // client may want JSON-in / Codec-out for migration paths. + // Pinning these two together (as the first version did) breaks + // the JSON-in / Codec-out path: a JSON body fails msgpack-decode + // before the SDK even sees it. + const reqContentType = req.headers["content-type"] as string | undefined; + const reqCodecFormat: ReturnType<typeof negotiateStreamFormat> = + reqContentType?.includes("application/x-codec-msgpack") + ? "msgpack" + : reqContentType?.includes("application/x-codec-protobuf") + ? "protobuf" + : undefined; + if (reqCodecFormat) { + try { + decodeCodecRequestBody(req, reqCodecFormat); + } catch (error) { + logger.error( + `Codec request decode failed (${reqCodecFormat}):`, + error, + ); + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32700, + message: `Codec request body could not be decoded as ${reqCodecFormat}`, + }, + }); + return; + } + } + + // ── Vocab map negotiation ─────────────────────────────────────── + // Optional X-Codec-Map: <url>;sha256=<hash> header. 
If present, + // load + cache the map so detokenize/tokenize transforms can run + // synchronously downstream. Per-request: the same client can + // switch vocabs by changing the header. + let vocabHash: string | undefined; + const codecMapHeader = req.headers["x-codec-map"] as string | undefined; + if (codecMapHeader) { + try { + const loaded = await loadVocabFromHeader(codecMapHeader); + vocabHash = loaded?.hash; + } catch (error) { + logger.error(`X-Codec-Map header rejected:`, error); + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32600, + message: `Invalid X-Codec-Map header: ${(error as Error).message}`, + }, + }); + return; + } + } + + // ── Detokenize tools/call args carried as Codec ──────────────── + // If the request body is a tools/call whose `arguments` is a + // Codec-encoded block, replace it with the detokenized JSON + // object before the SDK ever sees it. The MCP server downstream + // gets the same JSON it would have gotten without Codec — the + // tokenizer/detokenizer pair is purely a wire-side optimization. + const body = req.body as + | { method?: string; params?: { arguments?: unknown } } + | undefined; + if (body?.method === "tools/call") { + const meta = extractCodecArgsMeta(body.params?.arguments); + if (meta) { + const vocab = lookupVocabMap(meta.map_id); + if (!vocab) { + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32600, + message: + `tools/call args reference vocab map ${meta.map_id} but it isn't cached. 
` + `Send X-Codec-Map: <url>;sha256=${meta.map_id} alongside this request.`, + }, + }); + return; + } + const text = vocab.detok.render(meta.ids, { partial: false }); + try { + (body.params as { arguments: unknown }).arguments = JSON.parse(text); + } catch (error) { + res.status(400).json({ + jsonrpc: "2.0", + id: null, + error: { + code: -32700, + message: `Codec args detokenized to non-JSON text: ${(error as Error).message}`, + }, + }); + return; + } + } + } + + const respCodecFormat = negotiateStreamFormat( + req.query as Record<string, unknown>, + req.headers.accept as string | undefined, + ); + if (respCodecFormat) { + const acceptEncoding = req.headers["accept-encoding"] as + | string + | undefined; + const codecEncoding = negotiateResponseEncoding(acceptEncoding); + // Pass the vocab hash so wrapResponseForCodec runs the + // CallToolResult content tokenizer on every response in this + // request's lifecycle. Without a vocab, the wire still gets + // reframed as msgpack but text content stays as-is. + wrapResponseForCodec(res, respCodecFormat, codecEncoding, vocabHash); + // The SDK's StreamableHTTPServerTransport runs its own Accept + // negotiation against `application/json` + `text/event-stream` + // and returns 406 for anything else. Spoof the header so the + // SDK accepts the request — the wrapResponseForCodec layer + // above will re-frame whatever bytes the SDK writes back into + // the Codec wire format on the way out. + req.headers.accept = "application/json, text/event-stream"; + } + // Log authentication information for debugging logger.info(`POST /mcp request for endpoint: ${endpointName}`); logger.info(`Authentication method: ${authReq.authMethod || "none"}`);