diff --git a/tools/server/CMakeLists.txt b/tools/server/CMakeLists.txt
index 71cc0e7a8c2..1718840135d 100644
--- a/tools/server/CMakeLists.txt
+++ b/tools/server/CMakeLists.txt
@@ -1,5 +1,64 @@
 include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
 
+# ── Codec binary streaming (optional, opt-in at runtime) ─────────────────
+#
+# The Codec protocol (https://github.com/wdunn001/Codec) lets clients ask
+# for a binary token-ID stream instead of JSON SSE — ~14× wire reduction
+# uncompressed, ~45× with Content-Encoding: br. Fully backwards compatible:
+# clients that don't pass `stream_format` get the existing SSE path.
+#
+# Where libcodec comes from is controlled by LLAMA_CODEC:
+#   AUTO (default)  prefer a system-installed libcodec (vcpkg / OS package
+#                   manager), fall back to FetchContent from GitHub
+#   SYSTEM          fail if no system-installed libcodec is found
+#   FETCH           always FetchContent
+#   OFF             disable the binary streaming endpoint entirely
+set(LLAMA_CODEC AUTO CACHE STRING "Codec binary streaming support: AUTO|SYSTEM|FETCH|OFF")
+set_property(CACHE LLAMA_CODEC PROPERTY STRINGS AUTO SYSTEM FETCH OFF)
+
+set(LLAMA_CODEC_TARGET "")
+if (NOT LLAMA_CODEC STREQUAL "OFF")
+    if (LLAMA_CODEC STREQUAL "AUTO" OR LLAMA_CODEC STREQUAL "SYSTEM")
+        find_package(codec CONFIG QUIET)
+        if (codec_FOUND)
+            set(LLAMA_CODEC_TARGET codec::codec)
+            message(STATUS "Codec: using system-installed libcodec")
+        elseif (LLAMA_CODEC STREQUAL "SYSTEM")
+            message(FATAL_ERROR "LLAMA_CODEC=SYSTEM but find_package(codec) failed.\n"
+                                "Install via vcpkg (`vcpkg install codec`) or set LLAMA_CODEC=FETCH.")
+        endif()
+    endif()
+
+    if (NOT LLAMA_CODEC_TARGET AND (LLAMA_CODEC STREQUAL "AUTO" OR LLAMA_CODEC STREQUAL "FETCH"))
+        include(FetchContent)
+        FetchContent_Declare(codec
+            GIT_REPOSITORY https://github.com/wdunn001/Codec.git
+            GIT_TAG        main
+            SOURCE_SUBDIR  packages/c)
+        # libcodec respects PROJECT_IS_TOP_LEVEL — it won't drag tests/examples
+        # into the parent build, just the library targets.
+        set(CODEC_BUILD_TESTS    OFF CACHE BOOL "" FORCE)
+        set(CODEC_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE)
+        set(CODEC_INSTALL        OFF CACHE BOOL "" FORCE)
+        FetchContent_MakeAvailable(codec)
+        # libcodec creates `codec::codec` as the universal alias (it resolves
+        # to whichever of shared/static was built). Prefer the static
+        # target when explicitly available so we don't drag a DLL alongside
+        # llama-server, but fall back to `codec_static` (no namespace) if
+        # the alias didn't propagate through FetchContent's add_subdirectory.
+        if (TARGET codec::codec_static)
+            set(LLAMA_CODEC_TARGET codec::codec_static)
+        elseif (TARGET codec_static)
+            set(LLAMA_CODEC_TARGET codec_static)
+        elseif (TARGET codec::codec)
+            set(LLAMA_CODEC_TARGET codec::codec)
+        else()
+            message(FATAL_ERROR "Codec: FetchContent succeeded but no usable target found.")
+        endif()
+        message(STATUS "Codec: vendoring libcodec via FetchContent (target=${LLAMA_CODEC_TARGET})")
+    endif()
+endif()
+
 # server-context containing the core server logic, used by llama-server and CLI
 set(TARGET server-context)
 
@@ -27,6 +86,11 @@ target_include_directories(${TARGET} PRIVATE ../mtmd)
 target_include_directories(${TARGET} PRIVATE ${CMAKE_SOURCE_DIR})
 target_link_libraries(${TARGET} PUBLIC llama-common mtmd ${CMAKE_THREAD_LIBS_INIT})
 
+if (LLAMA_CODEC_TARGET)
+    target_compile_definitions(${TARGET} PRIVATE LLAMA_HAVE_CODEC=1)
+    target_link_libraries(${TARGET} PRIVATE ${LLAMA_CODEC_TARGET})
+endif()
+
 # llama-server executable
diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp
index 4b28033d9f3..7a8a1e993d8 100644
--- a/tools/server/server-context.cpp
+++ b/tools/server/server-context.cpp
@@ -6,6 +6,10 @@
 #include "server-task.h"
 #include "server-queue.h"
 
+#if defined(LLAMA_HAVE_CODEC)
+#include <codec/codec.h>
+#endif
+
 #include "build-info.h"
 #include "common.h"
 #include "llama.h"
@@ -3309,6 +3313,262 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
         dynamic_cast<server_task_result_cmpl_partial *>(first_result.get()) != nullptr
     );
 
+    // ── Codec binary streaming (opt-in) ─────────────────────────────────
+    // When the request body sets stream_format to "msgpack" or "protobuf",
+    // emit raw token IDs as Codec frames instead of JSON SSE. The wire
+    // shrinks ~14× (msgpack) or more with Content-Encoding negotiation,
+    // and downstream agents can feed the IDs straight back without a
+    // text round-trip. See https://github.com/wdunn001/Codec.
+#if defined(LLAMA_HAVE_CODEC)
+    const std::string stream_format = json_value(data, "stream_format", std::string("json"));
+    if (stream_format == "msgpack" || stream_format == "protobuf") {
+        const bool use_protobuf = stream_format == "protobuf";
+
+        // ── ToolWatcher (opt-in) ────────────────────────────────────────
+        // When `tool_watcher: true`, scan the outbound token stream for a
+        // (start_id, end_id) pair and surface the captured region as a
+        // structured tool_call on the frame whose ids resume immediately
+        // after the region. Mirrors sglang PR #24557 / the libcodec C99
+        // watcher (~ns per token, no detokenize on the hot path). Markers
+        // default to Qwen 2.5+'s `<tool_call>`/`</tool_call>` and can be
+        // overridden per request — the strings must each resolve to
+        // exactly one special token in the loaded model's vocab. Failure
+        // to resolve disables the watcher and logs a warning; binary
+        // streaming still works in plain passthrough mode.
+        struct codec_watcher_state {
+            codec_tool_watcher_t * w = nullptr;
+            const llama_vocab * vocab = nullptr;
+            uint32_t next_call_id = 1;
+            ~codec_watcher_state() {
+                if (w) codec_tool_watcher_free(w);
+            }
+        };
+        std::shared_ptr<codec_watcher_state> ws;
+
+        const bool want_watcher = json_value(data, "tool_watcher", false);
+        if (want_watcher) {
+            const std::string start_marker =
+                json_value(data, "tool_watcher_start", std::string("<tool_call>"));
+            const std::string end_marker =
+                json_value(data, "tool_watcher_end", std::string("</tool_call>"));
+
+            // parse_special=true so the tokenizer treats `<tool_call>` as
+            // the single special token registered in the GGUF vocab,
+            // rather than splitting into <, tool_call, > pieces.
+            std::vector<llama_token> start_ids = common_tokenize(
+                ctx_server.vocab, start_marker, /*add_special*/ false, /*parse_special*/ true);
+            std::vector<llama_token> end_ids = common_tokenize(
+                ctx_server.vocab, end_marker, /*add_special*/ false, /*parse_special*/ true);
+
+            if (start_ids.size() == 1 && end_ids.size() == 1) {
+                auto state = std::make_shared<codec_watcher_state>();
+                state->vocab = ctx_server.vocab;
+                if (codec_tool_watcher_new_with_ids(
+                        (uint32_t) start_ids[0], (uint32_t) end_ids[0], &state->w) == CODEC_OK) {
+                    ws = std::move(state);
+                } else {
+                    SRV_WRN("Codec ToolWatcher: failed to allocate watcher (markers '%s'/'%s'); "
+                            "falling back to plain Codec streaming.\n",
+                            start_marker.c_str(), end_marker.c_str());
+                }
+            } else {
+                SRV_WRN("Codec ToolWatcher: markers '%s'/'%s' do not resolve to single special "
+                        "tokens in this model's vocab (got %zu/%zu tokens). Falling back to "
+                        "plain Codec streaming.\n",
+                        start_marker.c_str(), end_marker.c_str(),
+                        start_ids.size(), end_ids.size());
+            }
+        }
+
+        auto encode_frame_bytes = [use_protobuf, ws](
+                const llama_tokens & tokens, bool done, const std::string & finish_reason) -> std::string {
+            std::vector<uint32_t> raw_ids;
+            raw_ids.reserve(tokens.size());
+            for (auto t : tokens) {
+                // Negative IDs would indicate a server bug; clamp to 0 defensively.
+                raw_ids.push_back(t < 0 ? 0u : (uint32_t) t);
+            }
+
+            // Frame buffers. When the watcher is active, `frame_ids` holds
+            // the passthrough subset; when not, it's the full chunk.
+            std::vector<uint32_t> frame_ids;
+            struct pending_call_t { std::string name, args, id; };
+            std::vector<pending_call_t> pending;
+
+            if (ws && ws->w) {
+                codec_watcher_event_t * events = nullptr;
+                size_t nev = 0;
+                if (codec_tool_watcher_feed(ws->w, raw_ids.data(), raw_ids.size(),
+                                            &events, &nev) == CODEC_OK) {
+                    for (size_t i = 0; i < nev; i++) {
+                        if (events[i].kind == CODEC_WATCH_PASSTHROUGH) {
+                            frame_ids.insert(frame_ids.end(),
+                                             events[i].ids,
+                                             events[i].ids + events[i].ids_len);
+                        } else { /* CODEC_WATCH_REGION_END */
+                            // Detokenize the body locally — llama.cpp owns
+                            // the vocab; no Codec map fetch needed. Tool-call
+                            // bodies are JSON, so render WITHOUT special
+                            // tokens (they were already consumed by the
+                            // watcher).
+                            std::string body;
+                            for (size_t j = 0; j < events[i].ids_len; j++) {
+                                body += common_token_to_piece(
+                                    ws->vocab,
+                                    (llama_token) events[i].ids[j],
+                                    /*special*/ false);
+                            }
+
+                            // Parse the name field if the standard
+                            // {"name":"...","arguments":{...}} shape matches.
+                            // Malformed JSON still produces an event — the
+                            // raw body rides along as arguments_json so the
+                            // orchestrator can return an error to the model.
+                            std::string name;
+                            try {
+                                auto parsed = json::parse(body);
+                                if (parsed.is_object() && parsed.contains("name")
+                                        && parsed["name"].is_string()) {
+                                    name = parsed["name"].get<std::string>();
+                                }
+                            } catch (const std::exception &) { /* keep raw body */ }
+
+                            char id_buf[20];
+                            snprintf(id_buf, sizeof(id_buf), "tc_%08x",
+                                     (unsigned) ws->next_call_id++);
+
+                            pending_call_t pc;
+                            pc.args = std::move(body);
+                            pc.name = std::move(name);
+                            pc.id   = id_buf;
+                            pending.push_back(std::move(pc));
+                        }
+                    }
+                } else {
+                    // Watcher feed failed (OOM); pass IDs through unmodified
+                    // so the stream stays usable.
+                    frame_ids = raw_ids;
+                }
+            } else {
+                frame_ids = raw_ids;
+            }
+
+            // Build the codec_tool_call_t array AFTER `pending` is fully
+            // populated — the c_str() pointers must stay stable through the
+            // codec_encode_* call below, which they do as long as
+            // `pending` is no longer mutated.
+            std::vector<codec_tool_call_t> tool_calls;
+            tool_calls.reserve(pending.size());
+            for (auto & pc : pending) {
+                codec_tool_call_t tc{};
+                tc.name           = pc.name.empty() ? nullptr : pc.name.c_str();
+                tc.arguments_json = pc.args.c_str();
+                tc.id             = pc.id.c_str();
+                tool_calls.push_back(tc);
+            }
+
+            codec_frame_t frame;
+            codec_frame_init(&frame);
+            frame.ids     = frame_ids.data();
+            frame.ids_len = frame_ids.size();
+            frame.done    = done;
+            std::string fr = finish_reason;
+            frame.finish_reason  = fr.empty() ? nullptr : fr.data();
+            frame.tool_calls     = tool_calls.empty() ? nullptr : tool_calls.data();
+            frame.tool_calls_len = tool_calls.size();
+
+            codec_buffer_t buf{nullptr, 0};
+            codec_status_t st = use_protobuf
+                ? codec_encode_protobuf(&frame, &buf)
+                : codec_encode_msgpack(&frame, &buf);
+            // Don't let codec_frame_destroy free the borrowed pointers.
+            frame.ids = nullptr; frame.ids_len = 0;
+            frame.finish_reason = nullptr;
+            frame.tool_calls = nullptr; frame.tool_calls_len = 0;
+            if (st != CODEC_OK) {
+                return std::string();
+            }
+            std::string out((const char *) buf.data, buf.len);
+            codec_buffer_free(&buf);
+            return out;
+        };
+
+        auto extract_finish_reason = [](server_task_result * r) -> std::string {
+            auto * fin = dynamic_cast<server_task_result_cmpl_final *>(r);
+            if (!fin) return "";
+            switch (fin->stop) {
+                case STOP_TYPE_EOS:   return "eos_token";
+                case STOP_TYPE_LIMIT: return "length";
+                case STOP_TYPE_WORD:  return "stop_sequence";
+                default:              return "";
+            }
+        };
+
+        // Capture by value in both inner lambdas so the closures stay
+        // valid after this function returns and `res->next` runs later
+        // from the streaming dispatcher.
+        auto frame_bytes_from_result = [encode_frame_bytes, extract_finish_reason](
+                server_task_result * r, bool & is_terminal) -> std::string {
+            if (auto * partial = dynamic_cast<server_task_result_cmpl_partial *>(r)) {
+                is_terminal = false;
+                return encode_frame_bytes(partial->tokens, /*done=*/false, "");
+            }
+            auto * fin = dynamic_cast<server_task_result_cmpl_final *>(r);
+            GGML_ASSERT(fin != nullptr);
+            is_terminal = true;
+            return encode_frame_bytes(fin->tokens, /*done=*/true, extract_finish_reason(fin));
+        };
+
+        bool first_is_terminal = false;
+        res->data   = frame_bytes_from_result(first_result.get(), first_is_terminal);
+        res->status = 200;
+        res->content_type = use_protobuf ? "application/x-protobuf" : "application/x-msgpack";
+        res->next = [res_this = res.get(), encode_frame_bytes,
+                     frame_bytes_from_result, &req](std::string & output) -> bool {
+            try {
+                if (req.should_stop()) {
+                    return false;
+                }
+                if (!res_this->data.empty()) {
+                    output = std::move(res_this->data);
+                    res_this->data.clear();
+                    return true;
+                }
+
+                server_response_reader & rd = res_this->rd;
+                if (!rd.has_next()) {
+                    // Codec has no [DONE] sentinel — done=true on the
+                    // final frame already terminated the stream.
+                    output.clear();
+                    return false;
+                }
+
+                auto result = rd.next(req.should_stop);
+                if (result == nullptr) {
+                    GGML_ASSERT(req.should_stop());
+                    return false;
+                }
+
+                if (result->is_error()) {
+                    // Emit a terminal error frame so binary clients can
+                    // distinguish a server error from a clean truncation.
+                    output = encode_frame_bytes({}, /*done=*/true, "error");
+                    return false;
+                }
+
+                bool is_terminal = false;
+                output = frame_bytes_from_result(result.get(), is_terminal);
+                return !is_terminal;
+            } catch (const std::exception &) {
+                output = encode_frame_bytes({}, /*done=*/true, "error");
+                return false;
+            }
+        };
+        return res;
+    }
+#endif // LLAMA_HAVE_CODEC
+
     // next responses are streamed
     // to be sent immediately
     json first_result_json = first_result->to_json();
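
Reviewer note: the frame-encode path can be exercised outside the server. Below is a minimal standalone sketch using only the libcodec calls this patch itself makes (codec_frame_init, codec_encode_msgpack, codec_buffer_free); the <codec/codec.h> header path and the exact field types are assumptions read off the patch above, not verified against libcodec's headers. Configuring the tree with -DLLAMA_CODEC=FETCH (the cache option this patch adds) is one way to get the library on hand.

```cpp
// Standalone encode-path sketch — mirrors what encode_frame_bytes does
// above, minus the watcher. Header path and field layout are assumed
// from this patch, not verified against libcodec.
#include <codec/codec.h>

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

int main() {
    std::vector<uint32_t> ids = {1, 2, 3};    // stand-in token IDs

    codec_frame_t frame;
    codec_frame_init(&frame);
    frame.ids     = ids.data();
    frame.ids_len = ids.size();
    frame.done    = true;
    std::string fr = "eos_token";
    frame.finish_reason = fr.data();          // borrowed, as in the server code

    codec_buffer_t buf{nullptr, 0};
    if (codec_encode_msgpack(&frame, &buf) != CODEC_OK) {
        std::fprintf(stderr, "encode failed\n");
        return 1;
    }
    std::printf("msgpack frame: %zu bytes for %zu ids\n", buf.len, ids.size());
    codec_buffer_free(&buf);                  // buffer is owned by libcodec
    return 0;
}
```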
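Likewise, the ToolWatcher can be poked at in isolation. The sketch below feeds a synthetic stream containing one marker-delimited region and prints the events that come back. The marker IDs 7/8 are arbitrary stand-ins, and the assumption that the watcher retains ownership of the returned events array (so the caller does not free it) is inferred from the server code above, which never frees it.

```cpp
// ToolWatcher sketch: one region delimited by start_id=7 / end_id=8.
// API usage is copied from the patch; event-array ownership is assumed
// to stay with the watcher.
#include <codec/codec.h>

#include <cstdint>
#include <cstdio>

int main() {
    codec_tool_watcher_t * w = nullptr;
    if (codec_tool_watcher_new_with_ids(7u, 8u, &w) != CODEC_OK) {
        return 1;
    }

    // Two passthrough tokens, then <start> 40 41 <end>, then a trailing token.
    const uint32_t stream[] = {10, 11, 7, 40, 41, 8, 12};
    const size_t   n        = sizeof(stream) / sizeof(stream[0]);

    codec_watcher_event_t * events = nullptr;
    size_t nev = 0;
    if (codec_tool_watcher_feed(w, stream, n, &events, &nev) == CODEC_OK) {
        for (size_t i = 0; i < nev; i++) {
            const char * kind = events[i].kind == CODEC_WATCH_PASSTHROUGH
                ? "passthrough" : "region";
            std::printf("%s: %zu ids\n", kind, events[i].ids_len);
        }
    }
    codec_tool_watcher_free(w);
    return 0;
}
```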