64 changes: 64 additions & 0 deletions tools/server/CMakeLists.txt
@@ -1,5 +1,64 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})

# ── Codec binary streaming (optional, opt-in at runtime) ─────────────────
#
# The Codec protocol (https://github.com/wdunn001/Codec) lets clients ask
# for a binary token-ID stream instead of JSON SSE — ~14× wire reduction
# uncompressed, ~45× with Content-Encoding: br. Fully backwards compatible:
# clients that don't pass `stream_format` get the existing SSE path.
#
# We try two sources in order, both controlled by LLAMA_CODEC:
# AUTO (default) prefer system-installed libcodec (vcpkg / OS package
# manager), fall back to FetchContent from GitHub
# SYSTEM fail if no system-installed libcodec is found
# FETCH always FetchContent
# OFF disable the binary streaming endpoint entirely
set(LLAMA_CODEC AUTO CACHE STRING "Codec binary streaming support: AUTO|SYSTEM|FETCH|OFF")
set_property(CACHE LLAMA_CODEC PROPERTY STRINGS AUTO SYSTEM FETCH OFF)
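#
# Illustrative configure commands (not part of this file; the vcpkg
# toolchain path is a placeholder):
#   cmake -B build -DLLAMA_CODEC=FETCH        # always vendor via FetchContent
#   cmake -B build -DLLAMA_CODEC=SYSTEM \
#         -DCMAKE_TOOLCHAIN_FILE=<vcpkg-root>/scripts/buildsystems/vcpkg.cmake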

set(LLAMA_CODEC_TARGET "")
if (NOT LLAMA_CODEC STREQUAL "OFF")
if (LLAMA_CODEC STREQUAL "AUTO" OR LLAMA_CODEC STREQUAL "SYSTEM")
find_package(codec CONFIG QUIET)
if (codec_FOUND)
set(LLAMA_CODEC_TARGET codec::codec)
message(STATUS "Codec: using system-installed libcodec")
elseif (LLAMA_CODEC STREQUAL "SYSTEM")
message(FATAL_ERROR "LLAMA_CODEC=SYSTEM but find_package(codec) failed.\n"
"Install via vcpkg (`vcpkg install codec`) or set LLAMA_CODEC=FETCH.")
endif()
endif()

if (NOT LLAMA_CODEC_TARGET AND (LLAMA_CODEC STREQUAL "AUTO" OR LLAMA_CODEC STREQUAL "FETCH"))
include(FetchContent)
FetchContent_Declare(codec
GIT_REPOSITORY https://github.com/wdunn001/Codec.git
GIT_TAG main
SOURCE_SUBDIR packages/c)
# libcodec respects PROJECT_IS_TOP_LEVEL — it won't drag tests/examples
# into the parent build, just the library targets.
set(CODEC_BUILD_TESTS OFF CACHE BOOL "" FORCE)
set(CODEC_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE)
set(CODEC_INSTALL OFF CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(codec)
# libcodec creates `codec::codec` as the universal alias (resolves
# to whichever of shared/static was built). Prefer the static
# target when explicitly available so we don't drag a DLL alongside
# llama-server, but fall back to `codec_static` (no namespace) if
# the alias didn't propagate through FetchContent's add_subdirectory.
if (TARGET codec::codec_static)
set(LLAMA_CODEC_TARGET codec::codec_static)
elseif (TARGET codec_static)
set(LLAMA_CODEC_TARGET codec_static)
elseif (TARGET codec::codec)
set(LLAMA_CODEC_TARGET codec::codec)
else()
message(FATAL_ERROR "Codec: FetchContent succeeded but no usable target found.")
endif()
message(STATUS "Codec: vendoring libcodec via FetchContent (target=${LLAMA_CODEC_TARGET})")
endif()
endif()

# server-context containing the core server logic, used by llama-server and CLI

set(TARGET server-context)
@@ -27,6 +86,11 @@ target_include_directories(${TARGET} PRIVATE ../mtmd)
target_include_directories(${TARGET} PRIVATE ${CMAKE_SOURCE_DIR})
target_link_libraries(${TARGET} PUBLIC llama-common mtmd ${CMAKE_THREAD_LIBS_INIT})

if (LLAMA_CODEC_TARGET)
target_compile_definitions(${TARGET} PRIVATE LLAMA_HAVE_CODEC=1)
target_link_libraries(${TARGET} PRIVATE ${LLAMA_CODEC_TARGET})
endif()


# llama-server executable

260 changes: 260 additions & 0 deletions tools/server/server-context.cpp
@@ -6,6 +6,10 @@
#include "server-task.h"
#include "server-queue.h"

#if defined(LLAMA_HAVE_CODEC)
#include <codec/codec.h>
#endif

#include "build-info.h"
#include "common.h"
#include "llama.h"
@@ -3309,6 +3313,262 @@ std::unique_ptr<server_res_generator> server_routes::handle_completions_impl(
dynamic_cast<server_task_result_cmpl_final*> (first_result.get()) != nullptr
);

// ── Codec binary streaming (opt-in) ────────────────────────────────
// When the request body sets stream_format to "msgpack" or "protobuf",
// emit raw token IDs as Codec frames instead of JSON SSE. The wire
// shrinks ~14× (msgpack) or more with Content-Encoding negotiation,
// and downstream agents can feed the IDs straight back without a
// text round-trip. See https://github.com/wdunn001/Codec.
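    //
    // Illustrative request body (only `stream_format` is specific to this
    // path; the rest is the usual completions payload):
    //
    //   { "prompt": "Hello", "stream": true, "stream_format": "msgpack" }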
#if defined(LLAMA_HAVE_CODEC)
const std::string stream_format = json_value(data, "stream_format", std::string("json"));
if (stream_format == "msgpack" || stream_format == "protobuf") {
const bool use_protobuf = stream_format == "protobuf";

        // ── ToolWatcher (opt-in) ───────────────────────────────────────────
        // When `tool_watcher: true`, scan the outbound token stream for a
        // (start_id, end_id) pair and surface the captured region as a
        // structured tool_call, attached to the frame that carries the ids
        // immediately following the region. Mirrors sglang PR #24557 / the
        // libcodec C99 watcher (~ns per token, no detokenize on the hot
        // path). Markers default to Qwen 2.5+'s `<tool_call>`/`</tool_call>`
        // and can be overridden per request; each string must resolve to
        // exactly one special token in the loaded model's vocab. If a marker
        // fails to resolve, the watcher is disabled and a warning is logged;
        // binary streaming still works in plain passthrough mode.
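        //
        // Illustrative request body enabling the watcher with explicit
        // markers (keys match the json_value() reads below):
        //
        //   { "prompt": "...", "stream_format": "msgpack",
        //     "tool_watcher": true,
        //     "tool_watcher_start": "<tool_call>",
        //     "tool_watcher_end": "</tool_call>" }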
struct codec_watcher_state {
codec_tool_watcher_t * w = nullptr;
const llama_vocab * vocab = nullptr;
uint32_t next_call_id = 1;
~codec_watcher_state() {
if (w) codec_tool_watcher_free(w);
}
};
std::shared_ptr<codec_watcher_state> ws;

const bool want_watcher = json_value(data, "tool_watcher", false);
if (want_watcher) {
const std::string start_marker =
json_value(data, "tool_watcher_start", std::string("<tool_call>"));
const std::string end_marker =
json_value(data, "tool_watcher_end", std::string("</tool_call>"));

// parse_special=true so the tokenizer treats `<tool_call>` as
// the single special token registered in the GGUF vocab,
// rather than splitting into <, tool_call, > pieces.
std::vector<llama_token> start_ids = common_tokenize(
ctx_server.vocab, start_marker, /*add_special*/ false, /*parse_special*/ true);
std::vector<llama_token> end_ids = common_tokenize(
ctx_server.vocab, end_marker, /*add_special*/ false, /*parse_special*/ true);

if (start_ids.size() == 1 && end_ids.size() == 1) {
auto state = std::make_shared<codec_watcher_state>();
state->vocab = ctx_server.vocab;
if (codec_tool_watcher_new_with_ids(
(uint32_t) start_ids[0], (uint32_t) end_ids[0], &state->w) == CODEC_OK) {
ws = std::move(state);
} else {
SRV_WRN("Codec ToolWatcher: failed to allocate watcher (markers '%s'/'%s'); "
"falling back to plain Codec streaming.\n",
start_marker.c_str(), end_marker.c_str());
}
} else {
SRV_WRN("Codec ToolWatcher: markers '%s'/'%s' do not resolve to single special "
"tokens in this model's vocab (got %zu/%zu tokens). Falling back to "
"plain Codec streaming.\n",
start_marker.c_str(), end_marker.c_str(),
start_ids.size(), end_ids.size());
}
}

auto encode_frame_bytes = [use_protobuf, ws](
const llama_tokens & tokens, bool done, const std::string & finish_reason) -> std::string {
std::vector<uint32_t> raw_ids;
raw_ids.reserve(tokens.size());
for (auto t : tokens) {
// Negative IDs would indicate a server bug; clamp to 0 defensively.
raw_ids.push_back(t < 0 ? 0u : (uint32_t) t);
}

// Frame buffers. When the watcher is active, `frame_ids` holds
// the passthrough subset; when not, it's the full chunk.
std::vector<uint32_t> frame_ids;
struct pending_call_t { std::string name, args, id; };
std::vector<pending_call_t> pending;

if (ws && ws->w) {
codec_watcher_event_t * events = nullptr;
size_t nev = 0;
if (codec_tool_watcher_feed(ws->w, raw_ids.data(), raw_ids.size(),
&events, &nev) == CODEC_OK) {
for (size_t i = 0; i < nev; i++) {
if (events[i].kind == CODEC_WATCH_PASSTHROUGH) {
frame_ids.insert(frame_ids.end(),
events[i].ids,
events[i].ids + events[i].ids_len);
} else { /* CODEC_WATCH_REGION_END */
// Detokenize body locally — llama.cpp owns the
// vocab; no Codec map fetch needed. Tool-call
// bodies are JSON, so render WITHOUT special
// tokens (they were already consumed by the
// watcher).
std::string body;
for (size_t j = 0; j < events[i].ids_len; j++) {
body += common_token_to_piece(
ws->vocab,
(llama_token) events[i].ids[j],
/*special*/ false);
}

// Parse name field if the standard
// {"name":"...","arguments":{...}} shape matches.
// Malformed JSON still produces an event — the
// raw body rides along as arguments_json so the
// orchestrator can return an error to the model.
std::string name;
try {
auto parsed = json::parse(body);
if (parsed.is_object() && parsed.contains("name")
&& parsed["name"].is_string()) {
name = parsed["name"].get<std::string>();
}
} catch (const std::exception &) { /* keep raw body */ }

char id_buf[20];
snprintf(id_buf, sizeof(id_buf), "tc_%08x",
(unsigned) ws->next_call_id++);

pending_call_t pc;
pc.args = std::move(body);
pc.name = std::move(name);
pc.id = id_buf;
pending.push_back(std::move(pc));
}
}
} else {
// Watcher feed failed (OOM); pass IDs through unmodified
// so the stream stays usable.
frame_ids = raw_ids;
}
} else {
frame_ids = raw_ids;
}

// Build the codec_tool_call_t array AFTER `pending` is fully
// populated — c_str() pointers must stay stable through the
// codec_encode_* call below, which they do as long as
// `pending` is no longer mutated.
std::vector<codec_tool_call_t> tool_calls;
tool_calls.reserve(pending.size());
for (auto & pc : pending) {
codec_tool_call_t tc{};
tc.name = pc.name.empty() ? nullptr : pc.name.c_str();
tc.arguments_json = pc.args.c_str();
tc.id = pc.id.c_str();
tool_calls.push_back(tc);
}

codec_frame_t frame;
codec_frame_init(&frame);
frame.ids = frame_ids.data();
frame.ids_len = frame_ids.size();
frame.done = done;
std::string fr = finish_reason;
frame.finish_reason = fr.empty() ? nullptr : fr.data();
frame.tool_calls = tool_calls.empty() ? nullptr : tool_calls.data();
frame.tool_calls_len = tool_calls.size();

codec_buffer_t buf{nullptr, 0};
codec_status_t st = use_protobuf
? codec_encode_protobuf(&frame, &buf)
: codec_encode_msgpack(&frame, &buf);
// Don't let codec_frame_destroy free the borrowed pointers.
frame.ids = nullptr; frame.ids_len = 0;
frame.finish_reason = nullptr;
frame.tool_calls = nullptr; frame.tool_calls_len = 0;
if (st != CODEC_OK) {
return std::string();
}
std::string out((const char *) buf.data, buf.len);
codec_buffer_free(&buf);
return out;
};
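        //
        // Client-side sketch of consuming these frames. The decode entry
        // point below is hypothetical: this diff only exercises the encode
        // half of libcodec, so the real name and signature may differ.
        //
        //   codec_frame_t * f = nullptr;
        //   if (codec_decode_msgpack(bytes, len, &f) == CODEC_OK) {
        //       use(f->ids, f->ids_len);   // raw token IDs, ready to feed back
        //       if (f->done) { /* finish_reason / tool_calls are final here */ }
        //       codec_frame_destroy(f);
        //   }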

auto extract_finish_reason = [](server_task_result * r) -> std::string {
auto * fin = dynamic_cast<server_task_result_cmpl_final *>(r);
if (!fin) return "";
switch (fin->stop) {
case STOP_TYPE_EOS: return "eos_token";
case STOP_TYPE_LIMIT: return "length";
case STOP_TYPE_WORD: return "stop_sequence";
default: return "";
}
};

// Capture-by-value for both inner lambdas so the closures stay
// valid after this function returns and `res->next` runs later
// from the streaming dispatcher.
auto frame_bytes_from_result = [encode_frame_bytes, extract_finish_reason](
server_task_result * r, bool & is_terminal) -> std::string {
if (auto * partial = dynamic_cast<server_task_result_cmpl_partial *>(r)) {
is_terminal = false;
return encode_frame_bytes(partial->tokens, /*done=*/false, "");
}
auto * fin = dynamic_cast<server_task_result_cmpl_final *>(r);
GGML_ASSERT(fin != nullptr);
is_terminal = true;
return encode_frame_bytes(fin->tokens, /*done=*/true, extract_finish_reason(fin));
};

bool first_is_terminal = false;
res->data = frame_bytes_from_result(first_result.get(), first_is_terminal);
res->status = 200;
res->content_type = use_protobuf ? "application/x-protobuf" : "application/x-msgpack";
res->next = [res_this = res.get(), encode_frame_bytes,
frame_bytes_from_result, &req](std::string & output) -> bool {
try {
if (req.should_stop()) {
return false;
}
if (!res_this->data.empty()) {
output = std::move(res_this->data);
res_this->data.clear();
return true;
}

server_response_reader & rd = res_this->rd;
if (!rd.has_next()) {
// Codec has no [DONE] sentinel — done=true on the
// final frame already terminated the stream.
output.clear();
return false;
}

auto result = rd.next(req.should_stop);
if (result == nullptr) {
GGML_ASSERT(req.should_stop());
return false;
}

if (result->is_error()) {
// Emit a terminal error frame so binary clients can
// distinguish a server error from a clean truncation.
output = encode_frame_bytes({}, /*done=*/true, "error");
return false;
}

bool is_terminal = false;
output = frame_bytes_from_result(result.get(), is_terminal);
return !is_terminal;
} catch (const std::exception &) {
output = encode_frame_bytes({}, /*done=*/true, "error");
return false;
}
};
return res;
}
#endif // LLAMA_HAVE_CODEC

// next responses are streamed
// to be sent immediately
json first_result_json = first_result->to_json();