From 0cabb909fddabaeb5c224df0fce3f5fe3fa5e803 Mon Sep 17 00:00:00 2001 From: Aviral Jain Date: Sat, 20 Jun 2026 15:22:19 +0530 Subject: [PATCH 1/4] feat(kafka): support Avro-encoded message key via schema registry Added a new setting `message_key_schema_name` to Kafka external streams. When set, the Kafka message key is decoded using the Confluent Avro wire format (5-byte prefix: 0x00 magic + 4-byte schema ID) against the named subject in the configured schema registry, and the result is emitted as a JSON string into the `_tp_message_key` virtual column. Changes: - ExternalStreamSettings.h: add `message_key_schema_name` String setting - Kafka.cpp: validate setting requires `kafka_schema_registry_url`; restrict `_tp_message_key` column to String/FixedString when Avro key mode is active; build and store a `KafkaSchemaRegistryForAvro` for the key subject in the constructor; pass registry and subject name to each KafkaSource - Kafka.h: add `avro_key_schema_registry` member; include KafkaSchemaRegistryForAvro - KafkaSource.h: add `avro_key_schema_registry` and `avro_key_schema_subject` members; add constructor params; declare `decodeAvroKey()` private method - KafkaSource.cpp: implement `decodeAvroKey()` using AvroDeserializer against a single String column header; branch both the typed (String/FixedString switch case) and untyped (RESERVED_MESSAGE_KEY fallback) virtual column paths to call `decodeAvroKey()` when the registry is present, raw bytes otherwise --- .../ExternalStream/ExternalStreamSettings.h | 4 +- src/Storages/ExternalStream/Kafka/Kafka.cpp | 59 +++++++++------- src/Storages/ExternalStream/Kafka/Kafka.h | 3 + .../ExternalStream/Kafka/KafkaSource.cpp | 68 ++++++++++++++++--- .../ExternalStream/Kafka/KafkaSource.h | 9 +++ 5 files changed, 107 insertions(+), 36 deletions(-) diff --git a/src/Storages/ExternalStream/ExternalStreamSettings.h b/src/Storages/ExternalStream/ExternalStreamSettings.h index 205f164c78f..28a73519be9 100644 --- a/src/Storages/ExternalStream/ExternalStreamSettings.h +++ b/src/Storages/ExternalStream/ExternalStreamSettings.h @@ -35,7 +35,9 @@ class ASTStorage; M(Bool, use_environment_credentials, false, "Use credentials from environment, where it's applicable", 0) \ M(Bool, log_stats, false, "If set to true, print statistics to the logs. Note that, the statistics could contain quite a lot of data. The frequency of the statistics logs is control by the statistics.interval.ms property.", 0) \ M(Milliseconds, consumer_stall_timeout_ms, 60 * 1000, "Define the amount of time when a consumer is not making any progress, then consider the consumer stalled, and then a new consumer will be created. Adjust the value based on how busy a topic is. Use small values for a busy topic to avoid big latency. Use big values for less busy topics to avoid disruption. Set to 0 to disable the behavior.", 0) \ - M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0) + M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0) \ + M(String, message_key_schema_name, "", "The schema name for the message key.", 0) + #define LOG_FILE_EXTERNAL_STREAM_SETTINGS(M, ALIAS) \ M(String, log_files, "", "A comma-separated list of log files", 0) \ diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index 851a2b29b41..ede8e7a5715 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -21,6 +21,8 @@ #include #include #include +#include "Formats/FormatSettings.h" +#include "Formats/KafkaSchemaRegistryForAvro.h" #include #include @@ -197,33 +199,21 @@ DB::Kafka::Conf createConfFromSettings(const KafkaExternalStreamSettings & setti return conf; } -void validateMessageKeyColumnType(const DataTypePtr & type) -{ - static std::vector supported_types{ - TypeIndex::Bool, - TypeIndex::UInt8, - TypeIndex::UInt16, - TypeIndex::UInt32, - TypeIndex::UInt64, - TypeIndex::Int8, - TypeIndex::Int16, - TypeIndex::Int32, - TypeIndex::Int64, - TypeIndex::Float32, - TypeIndex::Float64, - TypeIndex::String, - TypeIndex::FixedString, - }; +const std::vector raw_message_key_types{ + TypeIndex::Bool, TypeIndex::UInt8, TypeIndex::UInt16, TypeIndex::UInt32, TypeIndex::UInt64, + TypeIndex::Int8, TypeIndex::Int16, TypeIndex::Int32, TypeIndex::Int64, + TypeIndex::Float32, TypeIndex::Float64, TypeIndex::String, TypeIndex::FixedString, +}; +const std::vector avro_message_key_types{TypeIndex::String, TypeIndex::FixedString}; + +void validateMessageKeyColumnType(const DataTypePtr & type, const std::vector & allowed) +{ if (type->isNullable()) - { - validateMessageKeyColumnType(static_cast(*type).getNestedType()); - } - else - { - if (std::ranges::none_of(supported_types, [type](auto supported_type) { return supported_type == type->getTypeId(); })) - throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName()); - } + validateMessageKeyColumnType( + static_cast(*type).getNestedType(), allowed); + else if (std::ranges::none_of(allowed, [&](auto t) { return t == type->getTypeId(); })) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName()); } void validateMessageHeadersColumnType(const DataTypePtr & type) @@ -271,6 +261,12 @@ void Kafka::validateSettings(bool attach) } } + if (!settings->message_key_schema_name.value.empty()){ + if (!hasSchemaRegistryUrl()){ + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "`message_key_schema_name` is only supported when `kafka_schema_registry_url` is set"); + } + } + const auto & columns = getInMemoryMetadataPtr()->getColumns(); const bool has_event_time = columns.has(ProtonConsts::RESERVED_EVENT_TIME); const bool has_message_key = columns.has(ProtonConsts::RESERVED_MESSAGE_KEY); @@ -344,13 +340,22 @@ Kafka::Kafka( if (columns.has(ProtonConsts::RESERVED_MESSAGE_KEY)) { - validateMessageKeyColumnType(columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type); + validateMessageKeyColumnType( + columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type, + settings->message_key_schema_name.value.empty() ? raw_message_key_types : avro_message_key_types); if (hasCustomShardingExpr()) throw Exception( ErrorCodes::INVALID_SETTING_VALUE, "`sharding_expr` cannot be set when the `{}` column is defined", ProtonConsts::RESERVED_MESSAGE_KEY); + + if (!settings->message_key_schema_name.value.empty()) + { + FormatSettings key_format_settings = getFormatSettings(context); + key_format_settings.kafka_schema_registry.subject_name = settings->message_key_schema_name.value; + avro_key_schema_registry = KafkaSchemaRegistryForAvro::getOrCreate(key_format_settings); + } } if (columns.has(ProtonConsts::RESERVED_MESSAGE_HEADERS)) @@ -585,6 +590,8 @@ Pipe Kafka::read( high_watermark, max_block_size, settings->consumer_stall_timeout_ms.totalMilliseconds(), + avro_key_schema_registry, + settings->message_key_schema_name.value, external_stream_counter, context, logger)); diff --git a/src/Storages/ExternalStream/Kafka/Kafka.h b/src/Storages/ExternalStream/Kafka/Kafka.h index 9168b1ff61a..e70f3d6134e 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.h +++ b/src/Storages/ExternalStream/Kafka/Kafka.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -106,6 +107,8 @@ class Kafka final : public StorageExternalStreamImpl DB::Kafka::ConnectionPtr client; UInt64 poll_timeout_ms = 0; + + std::shared_ptr avro_key_schema_registry; }; } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index 103dd80f781..dfa58796a67 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -4,17 +4,22 @@ #include #include #include +#include +#include #include +#include #include #include #include #include #include +#include #include #include #include #include +#include #include namespace DB @@ -144,6 +149,8 @@ KafkaSource::KafkaSource( std::optional high_watermark_, size_t max_block_size_, UInt64 consumer_stall_timeout_ms, + std::shared_ptr avro_key_schema_registry_, + String avro_key_schema_subject_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_) @@ -153,6 +160,8 @@ KafkaSource::KafkaSource( , virtual_col_value_functions(header.columns(), nullptr) , virtual_col_types(header.columns(), nullptr) , ignore_format_errors(format_settings.ignore_parsing_errors) + , avro_key_schema_registry(std::move(avro_key_schema_registry_)) + , avro_key_schema_subject(std::move(avro_key_schema_subject_)) , offset(offset_) , high_watermark(high_watermark_.value_or(std::numeric_limits::max())) , consumer(std::move(consumer_)) @@ -259,7 +268,7 @@ void KafkaSource::readAndProcess() DB::Kafka::WatermarkOffsets skipped_messages; auto callback = [this, &skipped_messages](const void * rkmessage, size_t /*total_count*/, void * /*data*/) { - auto * message = static_cast(rkmessage); + const auto * message = static_cast(rkmessage); /// We have seen this happened, and do not know how. So, just in case. if (message->offset < offset) { @@ -436,6 +445,23 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) } } +Field KafkaSource::decodeAvroKey(const rd_kafka_message_t * kmessage) const +{ + ReadBufferFromMemory key_buf(static_cast(kmessage->key), kmessage->key_len); + UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(key_buf); + auto schema = avro_key_schema_registry->getSchema(schema_id); + auto avro_stream = std::make_unique(key_buf); + auto avro_decoder = avro::binaryDecoder(); + avro_decoder->init(*avro_stream); + Block key_header{ColumnWithTypeAndName(std::make_shared(), "_avro_key")}; + FormatSettings fs; + AvroDeserializer deserializer(key_header, schema, /*allow_missing_fields=*/true, /*null_as_default=*/false, fs); + MutableColumns cols = key_header.cloneEmptyColumns(); + RowReadExtension ext; + deserializer.deserializeRow(cols, *avro_decoder, ext); + return (*cols[0])[0]; +} + void KafkaSource::getPhysicalHeader() { auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); @@ -641,11 +667,23 @@ void KafkaSource::getPhysicalHeader() [[fallthrough]]; case TypeIndex::FixedString: { - virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field { - if (inside_nullable && kmessage->key_len == 0) - return Null{}; - return {static_cast(kmessage->key), kmessage->key_len}; - }; + if (avro_key_schema_registry) + { + virtual_col_value_functions[pos] = [this, inside_nullable](const rd_kafka_message_t * kmessage) -> Field + { + if (inside_nullable && kmessage->key_len == 0) + return Null{}; + return decodeAvroKey(kmessage); + }; + } + else + { + virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field { + if (inside_nullable && kmessage->key_len == 0) + return Null{}; + return {static_cast(kmessage->key), kmessage->key_len}; + }; + } break; } default: @@ -745,8 +783,20 @@ void KafkaSource::getPhysicalHeader() } else if (column.name == ProtonConsts::RESERVED_MESSAGE_KEY) { - virtual_col_value_functions[pos] - = [](const rd_kafka_message_t * kmessage) -> String { return {static_cast(kmessage->key), kmessage->key_len}; }; + if (avro_key_schema_registry) + { + virtual_col_value_functions[pos] = [this](const rd_kafka_message_t * kmessage) -> Field + { + if (kmessage->key_len == 0) + return String{}; + return decodeAvroKey(kmessage); + }; + } + else + { + virtual_col_value_functions[pos] + = [](const rd_kafka_message_t * kmessage) -> String { return {static_cast(kmessage->key), kmessage->key_len}; }; + } virtual_col_types[pos] = column.type; } else @@ -896,7 +946,7 @@ Strings KafkaSource::doFetchData(const Streaming::SequenceRange & sn_range) auto callback = [&count, &results](const void * rkmessage, size_t /*total_count*/, void * /*data*/) { if (count > 0) { - auto * message = static_cast(rkmessage); + const auto * message = static_cast(rkmessage); results.emplace_back(static_cast(message->payload), message->len); --count; } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index fd586065feb..3a07192415c 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -10,6 +11,7 @@ #include #include #include +#include "Formats/KafkaSchemaRegistryForAvro.h" struct rd_kafka_message_s; @@ -37,6 +39,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource std::optional high_watermark_, size_t max_block_size_, UInt64 consumer_stall_timeout_ms, + std::shared_ptr avro_key_schema_registry_, + String avro_key_schema_subject_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_); @@ -88,6 +92,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource void getPhysicalHeader() override; + Field decodeAvroKey(const rd_kafka_message_t * kmessage) const; + Strings doFetchData(const Streaming::SequenceRange &) override; const String topic; @@ -98,6 +104,9 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource bool ignore_format_errors = false; bool request_virtual_columns = false; + std::shared_ptr avro_key_schema_registry; + String avro_key_schema_subject; + std::vector> result_chunks_with_sns; std::vector>::iterator iter; MutableColumns current_batch; From 2e6002e3d7eb6c93d41c251cac4a3dea47984288 Mon Sep 17 00:00:00 2001 From: Aviral Jain Date: Thu, 25 Jun 2026 08:18:42 +0530 Subject: [PATCH 2/4] fix(kafka): address review nits for Avro key decoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Return empty string (not decode) when key_len == 0 and column is non-nullable - Remove unused avro_key_schema_subject param/member — schema ID comes from key bytes - Use angle-bracket includes for Formats headers in Kafka.cpp Co-Authored-By: Claude Sonnet 4.6 --- src/Storages/ExternalStream/Kafka/Kafka.cpp | 5 ++--- src/Storages/ExternalStream/Kafka/KafkaSource.cpp | 6 ++---- src/Storages/ExternalStream/Kafka/KafkaSource.h | 2 -- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index 8c2da59e511..4665cd19b3c 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -22,8 +22,8 @@ #include #include #include -#include "Formats/FormatSettings.h" -#include "Formats/KafkaSchemaRegistryForAvro.h" +#include +#include #include #include @@ -598,7 +598,6 @@ Pipe Kafka::read( max_block_size, settings->consumer_stall_timeout_ms.totalMilliseconds(), avro_key_schema_registry, - settings->message_key_schema_name.value, external_stream_counter, context, logger)); diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index dfa58796a67..8e438f85367 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -150,7 +150,6 @@ KafkaSource::KafkaSource( size_t max_block_size_, UInt64 consumer_stall_timeout_ms, std::shared_ptr avro_key_schema_registry_, - String avro_key_schema_subject_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_) @@ -161,7 +160,6 @@ KafkaSource::KafkaSource( , virtual_col_types(header.columns(), nullptr) , ignore_format_errors(format_settings.ignore_parsing_errors) , avro_key_schema_registry(std::move(avro_key_schema_registry_)) - , avro_key_schema_subject(std::move(avro_key_schema_subject_)) , offset(offset_) , high_watermark(high_watermark_.value_or(std::numeric_limits::max())) , consumer(std::move(consumer_)) @@ -671,8 +669,8 @@ void KafkaSource::getPhysicalHeader() { virtual_col_value_functions[pos] = [this, inside_nullable](const rd_kafka_message_t * kmessage) -> Field { - if (inside_nullable && kmessage->key_len == 0) - return Null{}; + if (kmessage->key_len == 0) + return inside_nullable ? Field{Null{}} : Field{String{}}; return decodeAvroKey(kmessage); }; } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index 3a07192415c..04c845e64cf 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -40,7 +40,6 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource size_t max_block_size_, UInt64 consumer_stall_timeout_ms, std::shared_ptr avro_key_schema_registry_, - String avro_key_schema_subject_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_); @@ -105,7 +104,6 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource bool request_virtual_columns = false; std::shared_ptr avro_key_schema_registry; - String avro_key_schema_subject; std::vector> result_chunks_with_sns; std::vector>::iterator iter; From 265f6a33d14c8634897c3d09471c21e682f6d25d Mon Sep 17 00:00:00 2001 From: Aviral Jain Date: Thu, 25 Jun 2026 08:48:41 +0530 Subject: [PATCH 3/4] fix(kafka): decode Avro key to JSON string via GenericDatum Replace broken AvroDeserializer approach (which mapped Avro fields by column name and always returned empty string) with GenericDatum round-trip: binary Avro -> GenericDatum -> Avro JSON string. Also fix include style in KafkaSource.h (angle brackets). Co-Authored-By: Claude Sonnet 4.6 --- .../ExternalStream/Kafka/KafkaSource.cpp | 38 +++++++++++++------ .../ExternalStream/Kafka/KafkaSource.h | 2 +- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index 8e438f85367..4b38f28868b 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -4,21 +4,25 @@ #include #include #include -#include #include +#include #include #include #include #include #include +#include #include #include -#include #include #include #include #include +#include +#include +#include + #include #include @@ -445,19 +449,29 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) Field KafkaSource::decodeAvroKey(const rd_kafka_message_t * kmessage) const { + /// Decode the Avro-encoded Kafka message key and return it as a JSON string. + /// The key bytes follow the Confluent wire format: 1-byte magic (0x00) + 4-byte schema ID + Avro binary payload. + /// We deserialize the binary payload into a GenericDatum using the schema fetched from the registry, + /// then re-encode the datum as JSON so callers receive a human-readable string representation of the key record. ReadBufferFromMemory key_buf(static_cast(kmessage->key), kmessage->key_len); UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(key_buf); auto schema = avro_key_schema_registry->getSchema(schema_id); - auto avro_stream = std::make_unique(key_buf); - auto avro_decoder = avro::binaryDecoder(); - avro_decoder->init(*avro_stream); - Block key_header{ColumnWithTypeAndName(std::make_shared(), "_avro_key")}; - FormatSettings fs; - AvroDeserializer deserializer(key_header, schema, /*allow_missing_fields=*/true, /*null_as_default=*/false, fs); - MutableColumns cols = key_header.cloneEmptyColumns(); - RowReadExtension ext; - deserializer.deserializeRow(cols, *avro_decoder, ext); - return (*cols[0])[0]; + + auto avro_in = std::make_unique(key_buf); + auto bin_decoder = avro::validatingDecoder(schema, avro::binaryDecoder()); + bin_decoder->init(*avro_in); + + avro::GenericDatum datum(schema); + avro::decode(*bin_decoder, datum); + + WriteBufferFromOwnString json_buf; + Avro::OutputStreamWriteBufferAdapter avro_out(json_buf); + auto json_encoder = avro::jsonEncoder(schema); + json_encoder->init(avro_out); + avro::encode(*json_encoder, datum); + json_encoder->flush(); + + return Field{json_buf.str()}; } void KafkaSource::getPhysicalHeader() diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index 04c845e64cf..15114e71c7e 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -11,7 +11,7 @@ #include #include #include -#include "Formats/KafkaSchemaRegistryForAvro.h" +#include struct rd_kafka_message_s; From 9d76672425d0851a70e53e4e57b5f49e497e043f Mon Sep 17 00:00:00 2001 From: Aviral Jain Date: Thu, 25 Jun 2026 09:54:12 +0530 Subject: [PATCH 4/4] fix(kafka): throw NOT_IMPLEMENTED when writing Avro-encoded message key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Writing _tp_message_key as Confluent Avro binary is not yet implemented — only decoding on read is supported. Throw a clear error on write with guidance to use a plain string key instead. Co-Authored-By: Claude Sonnet 4.6 --- src/Storages/ExternalStream/Kafka/Kafka.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index 4665cd19b3c..d312bdd77fc 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -624,6 +624,17 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr if (hasSchemaRegistryUrl() && data_format == "ProtobufSingle") throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write Protobuf data with schema registry is not supported"); + /// Encoding _tp_message_key as Avro binary (Confluent wire format) on write is not yet implemented. + /// Currently only decoding Avro-encoded keys on read is supported. When this is implemented, + /// the sink will need to: fetch the schema from the registry, serialize the key JSON string + /// into a GenericDatum, binary-encode it, and prepend the Confluent wire header (magic byte + schema ID). + if (avro_key_schema_registry) + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Writing Avro-encoded message keys via schema registry is not yet supported. " + "`message_key_schema_name` is currently read-only. " + "To write a plain-text message key, omit `message_key_schema_name` and insert a string into `_tp_message_key` directly."); + auto producer = client->getProducer(topicName()); auto sink = std::make_shared(