diff --git a/src/Storages/ExternalStream/ExternalStreamSettings.h b/src/Storages/ExternalStream/ExternalStreamSettings.h index aa096364dd..5ff7b09abb 100644 --- a/src/Storages/ExternalStream/ExternalStreamSettings.h +++ b/src/Storages/ExternalStream/ExternalStreamSettings.h @@ -35,7 +35,9 @@ class ASTStorage; M(Bool, use_environment_credentials, false, "Use credentials from environment, where it's applicable", 0) \ M(Bool, log_stats, false, "If set to true, print statistics to the logs. Note that, the statistics could contain quite a lot of data. The frequency of the statistics logs is control by the statistics.interval.ms property.", 0) \ M(Milliseconds, consumer_stall_timeout_ms, 60 * 1000, "Define the amount of time when a consumer is not making any progress, then consider the consumer stalled, and then a new consumer will be created. Adjust the value based on how busy a topic is. Use small values for a busy topic to avoid big latency. Use big values for less busy topics to avoid disruption. Set to 0 to disable the behavior.", 0) \ - M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0) + M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0) \ + M(String, message_key_schema_name, "", "The schema name for the message key.", 0) + #define LOG_FILE_EXTERNAL_STREAM_SETTINGS(M, ALIAS) \ M(String, log_files, "", "A comma-separated list of log files", 0) \ diff --git a/src/Storages/ExternalStream/Kafka/Kafka.cpp b/src/Storages/ExternalStream/Kafka/Kafka.cpp index cfe33f8706..d312bdd77f 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.cpp +++ b/src/Storages/ExternalStream/Kafka/Kafka.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include @@ -200,33 +202,21 @@ DB::Kafka::Conf createConfFromSettings(const KafkaExternalStreamSettings & setti return conf; } -void validateMessageKeyColumnType(const DataTypePtr & type) -{ - static std::vector supported_types{ - TypeIndex::Bool, - TypeIndex::UInt8, - TypeIndex::UInt16, - TypeIndex::UInt32, - TypeIndex::UInt64, - TypeIndex::Int8, - TypeIndex::Int16, - TypeIndex::Int32, - TypeIndex::Int64, - TypeIndex::Float32, - TypeIndex::Float64, - TypeIndex::String, - TypeIndex::FixedString, - }; +const std::vector raw_message_key_types{ + TypeIndex::Bool, TypeIndex::UInt8, TypeIndex::UInt16, TypeIndex::UInt32, TypeIndex::UInt64, + TypeIndex::Int8, TypeIndex::Int16, TypeIndex::Int32, TypeIndex::Int64, + TypeIndex::Float32, TypeIndex::Float64, TypeIndex::String, TypeIndex::FixedString, +}; + +const std::vector avro_message_key_types{TypeIndex::String, TypeIndex::FixedString}; +void validateMessageKeyColumnType(const DataTypePtr & type, const std::vector & allowed) +{ if (type->isNullable()) - { - validateMessageKeyColumnType(static_cast(*type).getNestedType()); - } - else - { - if (std::ranges::none_of(supported_types, [type](auto supported_type) { return supported_type == type->getTypeId(); })) - throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName()); - } + validateMessageKeyColumnType( + static_cast(*type).getNestedType(), allowed); + else if (std::ranges::none_of(allowed, [&](auto t) { return t == type->getTypeId(); })) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName()); } void validateMessageHeadersColumnType(const DataTypePtr & type) @@ -278,6 +268,12 @@ void Kafka::verifySettings(const ExternalStreamSettingsPtr & new_settings, bool } } + if (!settings->message_key_schema_name.value.empty()){ + if (!hasSchemaRegistryUrl()){ + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "`message_key_schema_name` is only supported when `kafka_schema_registry_url` is set"); + } + } + const auto & columns = getInMemoryMetadataPtr()->getColumns(); const bool has_event_time = columns.has(ProtonConsts::RESERVED_EVENT_TIME); const bool has_message_key = columns.has(ProtonConsts::RESERVED_MESSAGE_KEY); @@ -351,13 +347,22 @@ Kafka::Kafka( if (has_message_key) { - validateMessageKeyColumnType(columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type); + validateMessageKeyColumnType( + columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type, + settings->message_key_schema_name.value.empty() ? raw_message_key_types : avro_message_key_types); if (hasCustomShardingExpr()) throw Exception( ErrorCodes::INVALID_SETTING_VALUE, "`sharding_expr` cannot be set when the `{}` column is defined", ProtonConsts::RESERVED_MESSAGE_KEY); + + if (!settings->message_key_schema_name.value.empty()) + { + FormatSettings key_format_settings = getFormatSettings(context); + key_format_settings.kafka_schema_registry.subject_name = settings->message_key_schema_name.value; + avro_key_schema_registry = KafkaSchemaRegistryForAvro::getOrCreate(key_format_settings); + } } if (has_message_headers) @@ -592,6 +597,7 @@ Pipe Kafka::read( high_watermark, max_block_size, settings->consumer_stall_timeout_ms.totalMilliseconds(), + avro_key_schema_registry, external_stream_counter, context, logger)); @@ -618,6 +624,17 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr if (hasSchemaRegistryUrl() && data_format == "ProtobufSingle") throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write Protobuf data with schema registry is not supported"); + /// Encoding _tp_message_key as Avro binary (Confluent wire format) on write is not yet implemented. + /// Currently only decoding Avro-encoded keys on read is supported. When this is implemented, + /// the sink will need to: fetch the schema from the registry, serialize the key JSON string + /// into a GenericDatum, binary-encode it, and prepend the Confluent wire header (magic byte + schema ID). + if (avro_key_schema_registry) + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Writing Avro-encoded message keys via schema registry is not yet supported. " + "`message_key_schema_name` is currently read-only. " + "To write a plain-text message key, omit `message_key_schema_name` and insert a string into `_tp_message_key` directly."); + auto producer = client->getProducer(topicName()); auto sink = std::make_shared( diff --git a/src/Storages/ExternalStream/Kafka/Kafka.h b/src/Storages/ExternalStream/Kafka/Kafka.h index 2442794531..b03f1510bc 100644 --- a/src/Storages/ExternalStream/Kafka/Kafka.h +++ b/src/Storages/ExternalStream/Kafka/Kafka.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -108,6 +109,8 @@ class Kafka final : public StorageExternalStreamImpl DB::Kafka::ConnectionPtr client; UInt64 poll_timeout_ms = 0; + + std::shared_ptr avro_key_schema_registry; }; } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index 103dd80f78..4b38f28868 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -4,10 +4,14 @@ #include #include #include +#include +#include #include +#include #include #include #include +#include #include #include #include @@ -15,6 +19,11 @@ #include #include +#include +#include +#include + +#include #include namespace DB @@ -144,6 +153,7 @@ KafkaSource::KafkaSource( std::optional high_watermark_, size_t max_block_size_, UInt64 consumer_stall_timeout_ms, + std::shared_ptr avro_key_schema_registry_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_) @@ -153,6 +163,7 @@ KafkaSource::KafkaSource( , virtual_col_value_functions(header.columns(), nullptr) , virtual_col_types(header.columns(), nullptr) , ignore_format_errors(format_settings.ignore_parsing_errors) + , avro_key_schema_registry(std::move(avro_key_schema_registry_)) , offset(offset_) , high_watermark(high_watermark_.value_or(std::numeric_limits::max())) , consumer(std::move(consumer_)) @@ -259,7 +270,7 @@ void KafkaSource::readAndProcess() DB::Kafka::WatermarkOffsets skipped_messages; auto callback = [this, &skipped_messages](const void * rkmessage, size_t /*total_count*/, void * /*data*/) { - auto * message = static_cast(rkmessage); + const auto * message = static_cast(rkmessage); /// We have seen this happened, and do not know how. So, just in case. if (message->offset < offset) { @@ -436,6 +447,33 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) } } +Field KafkaSource::decodeAvroKey(const rd_kafka_message_t * kmessage) const +{ + /// Decode the Avro-encoded Kafka message key and return it as a JSON string. + /// The key bytes follow the Confluent wire format: 1-byte magic (0x00) + 4-byte schema ID + Avro binary payload. + /// We deserialize the binary payload into a GenericDatum using the schema fetched from the registry, + /// then re-encode the datum as JSON so callers receive a human-readable string representation of the key record. + ReadBufferFromMemory key_buf(static_cast(kmessage->key), kmessage->key_len); + UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(key_buf); + auto schema = avro_key_schema_registry->getSchema(schema_id); + + auto avro_in = std::make_unique(key_buf); + auto bin_decoder = avro::validatingDecoder(schema, avro::binaryDecoder()); + bin_decoder->init(*avro_in); + + avro::GenericDatum datum(schema); + avro::decode(*bin_decoder, datum); + + WriteBufferFromOwnString json_buf; + Avro::OutputStreamWriteBufferAdapter avro_out(json_buf); + auto json_encoder = avro::jsonEncoder(schema); + json_encoder->init(avro_out); + avro::encode(*json_encoder, datum); + json_encoder->flush(); + + return Field{json_buf.str()}; +} + void KafkaSource::getPhysicalHeader() { auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); @@ -641,11 +679,23 @@ void KafkaSource::getPhysicalHeader() [[fallthrough]]; case TypeIndex::FixedString: { - virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field { - if (inside_nullable && kmessage->key_len == 0) - return Null{}; - return {static_cast(kmessage->key), kmessage->key_len}; - }; + if (avro_key_schema_registry) + { + virtual_col_value_functions[pos] = [this, inside_nullable](const rd_kafka_message_t * kmessage) -> Field + { + if (kmessage->key_len == 0) + return inside_nullable ? Field{Null{}} : Field{String{}}; + return decodeAvroKey(kmessage); + }; + } + else + { + virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field { + if (inside_nullable && kmessage->key_len == 0) + return Null{}; + return {static_cast(kmessage->key), kmessage->key_len}; + }; + } break; } default: @@ -745,8 +795,20 @@ void KafkaSource::getPhysicalHeader() } else if (column.name == ProtonConsts::RESERVED_MESSAGE_KEY) { - virtual_col_value_functions[pos] - = [](const rd_kafka_message_t * kmessage) -> String { return {static_cast(kmessage->key), kmessage->key_len}; }; + if (avro_key_schema_registry) + { + virtual_col_value_functions[pos] = [this](const rd_kafka_message_t * kmessage) -> Field + { + if (kmessage->key_len == 0) + return String{}; + return decodeAvroKey(kmessage); + }; + } + else + { + virtual_col_value_functions[pos] + = [](const rd_kafka_message_t * kmessage) -> String { return {static_cast(kmessage->key), kmessage->key_len}; }; + } virtual_col_types[pos] = column.type; } else @@ -896,7 +958,7 @@ Strings KafkaSource::doFetchData(const Streaming::SequenceRange & sn_range) auto callback = [&count, &results](const void * rkmessage, size_t /*total_count*/, void * /*data*/) { if (count > 0) { - auto * message = static_cast(rkmessage); + const auto * message = static_cast(rkmessage); results.emplace_back(static_cast(message->payload), message->len); --count; } diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index fd586065fe..15114e71c7 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -10,6 +11,7 @@ #include #include #include +#include struct rd_kafka_message_s; @@ -37,6 +39,7 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource std::optional high_watermark_, size_t max_block_size_, UInt64 consumer_stall_timeout_ms, + std::shared_ptr avro_key_schema_registry_, ExternalStreamCounterPtr external_stream_counter_, ContextPtr query_context_, LoggerPtr logger_); @@ -88,6 +91,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource void getPhysicalHeader() override; + Field decodeAvroKey(const rd_kafka_message_t * kmessage) const; + Strings doFetchData(const Streaming::SequenceRange &) override; const String topic; @@ -98,6 +103,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource bool ignore_format_errors = false; bool request_virtual_columns = false; + std::shared_ptr avro_key_schema_registry; + std::vector> result_chunks_with_sns; std::vector>::iterator iter; MutableColumns current_batch;