Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Storages/ExternalStream/ExternalStreamSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class ASTStorage;
M(Bool, use_environment_credentials, false, "Use credentials from environment, where it's applicable", 0) \
M(Bool, log_stats, false, "If set to true, print statistics to the logs. Note that, the statistics could contain quite a lot of data. The frequency of the statistics logs is control by the statistics.interval.ms property.", 0) \
M(Milliseconds, consumer_stall_timeout_ms, 60 * 1000, "Define the amount of time when a consumer is not making any progress, then consider the consumer stalled, and then a new consumer will be created. Adjust the value based on how busy a topic is. Use small values for a busy topic to avoid big latency. Use big values for less busy topics to avoid disruption. Set to 0 to disable the behavior.", 0) \
M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0)
M(Milliseconds, connection_timeout_ms, 10 * 1000, "Timeout in milliseconds for establishing a connection to a broker.", 0) \
M(String, message_key_schema_name, "", "The schema name for the message key.", 0)


#define LOG_FILE_EXTERNAL_STREAM_SETTINGS(M, ALIAS) \
M(String, log_files, "", "A comma-separated list of log files", 0) \
Expand Down
59 changes: 33 additions & 26 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <Storages/parseShards.h>
#include <Common/ProtonCommon.h>
#include <Common/logger_useful.h>
#include "Formats/FormatSettings.h"
Comment thread
burnerlee marked this conversation as resolved.
Outdated
#include "Formats/KafkaSchemaRegistryForAvro.h"

#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/predicate.hpp>
Expand Down Expand Up @@ -197,33 +199,21 @@ DB::Kafka::Conf createConfFromSettings(const KafkaExternalStreamSettings & setti
return conf;
}

void validateMessageKeyColumnType(const DataTypePtr & type)
{
static std::vector<TypeIndex> supported_types{
TypeIndex::Bool,
TypeIndex::UInt8,
TypeIndex::UInt16,
TypeIndex::UInt32,
TypeIndex::UInt64,
TypeIndex::Int8,
TypeIndex::Int16,
TypeIndex::Int32,
TypeIndex::Int64,
TypeIndex::Float32,
TypeIndex::Float64,
TypeIndex::String,
TypeIndex::FixedString,
};
const std::vector<TypeIndex> raw_message_key_types{
TypeIndex::Bool, TypeIndex::UInt8, TypeIndex::UInt16, TypeIndex::UInt32, TypeIndex::UInt64,
TypeIndex::Int8, TypeIndex::Int16, TypeIndex::Int32, TypeIndex::Int64,
TypeIndex::Float32, TypeIndex::Float64, TypeIndex::String, TypeIndex::FixedString,
};

const std::vector<TypeIndex> avro_message_key_types{TypeIndex::String, TypeIndex::FixedString};

void validateMessageKeyColumnType(const DataTypePtr & type, const std::vector<TypeIndex> & allowed)
{
if (type->isNullable())
{
validateMessageKeyColumnType(static_cast<const DataTypeNullable &>(*type).getNestedType());
}
else
{
if (std::ranges::none_of(supported_types, [type](auto supported_type) { return supported_type == type->getTypeId(); }))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName());
}
validateMessageKeyColumnType(
static_cast<const DataTypeNullable &>(*type).getNestedType(), allowed);
else if (std::ranges::none_of(allowed, [&](auto t) { return t == type->getTypeId(); }))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "`_tp_message_key` column does not support type {}", type->getName());
}

void validateMessageHeadersColumnType(const DataTypePtr & type)
Expand Down Expand Up @@ -275,6 +265,12 @@ void Kafka::verifySettings(const ExternalStreamSettingsPtr & new_settings, bool
}
}

if (!settings->message_key_schema_name.value.empty()){
if (!hasSchemaRegistryUrl()){
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "`message_key_schema_name` is only supported when `kafka_schema_registry_url` is set");
}
}

const auto & columns = getInMemoryMetadataPtr()->getColumns();
const bool has_event_time = columns.has(ProtonConsts::RESERVED_EVENT_TIME);
const bool has_message_key = columns.has(ProtonConsts::RESERVED_MESSAGE_KEY);
Expand Down Expand Up @@ -348,13 +344,22 @@ Kafka::Kafka(

if (has_message_key)
{
validateMessageKeyColumnType(columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type);
validateMessageKeyColumnType(
columns.getColumn({GetColumnsOptions::Kind::All}, ProtonConsts::RESERVED_MESSAGE_KEY).type,
settings->message_key_schema_name.value.empty() ? raw_message_key_types : avro_message_key_types);

if (hasCustomShardingExpr())
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"`sharding_expr` cannot be set when the `{}` column is defined",
ProtonConsts::RESERVED_MESSAGE_KEY);

if (!settings->message_key_schema_name.value.empty())
{
FormatSettings key_format_settings = getFormatSettings(context);
key_format_settings.kafka_schema_registry.subject_name = settings->message_key_schema_name.value;
avro_key_schema_registry = KafkaSchemaRegistryForAvro::getOrCreate(key_format_settings);
}
}

if (has_message_headers)
Expand Down Expand Up @@ -589,6 +594,8 @@ Pipe Kafka::read(
high_watermark,
max_block_size,
settings->consumer_stall_timeout_ms.totalMilliseconds(),
avro_key_schema_registry,
settings->message_key_schema_name.value,
external_stream_counter,
context,
logger));
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ExternalStream/Kafka/Kafka.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <Formats/KafkaSchemaRegistryForAvro.h>
#include <IO/Kafka/Connection.h>
#include <Storages/ExternalStream/ExternalStreamCounter.h>
#include <Storages/ExternalStream/ExternalStreamSettings.h>
Expand Down Expand Up @@ -108,6 +109,8 @@ class Kafka final : public StorageExternalStreamImpl
DB::Kafka::ConnectionPtr client;

UInt64 poll_timeout_ms = 0;

std::shared_ptr<KafkaSchemaRegistryForAvro> avro_key_schema_registry;
};

}
Expand Down
68 changes: 59 additions & 9 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
#include <Checkpoint/CheckpointCoordinator.h>
#include <Cluster/Common/Constants.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/Avro/InputStreamReadBufferAdapter.h>
#include <Formats/FormatFactory.h>
#include <Formats/KafkaSchemaRegistry.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/ExternalStream/Kafka/Kafka.h>
#include <base/ClockUtils.h>
#include <Common/Exception.h>
#include <Common/ProtonCommon.h>

#include <memory>
#include <utility>

namespace DB
Expand Down Expand Up @@ -144,6 +149,8 @@ KafkaSource::KafkaSource(
std::optional<Int64> high_watermark_,
size_t max_block_size_,
UInt64 consumer_stall_timeout_ms,
std::shared_ptr<KafkaSchemaRegistryForAvro> avro_key_schema_registry_,
String avro_key_schema_subject_,
ExternalStreamCounterPtr external_stream_counter_,
ContextPtr query_context_,
LoggerPtr logger_)
Expand All @@ -153,6 +160,8 @@ KafkaSource::KafkaSource(
, virtual_col_value_functions(header.columns(), nullptr)
, virtual_col_types(header.columns(), nullptr)
, ignore_format_errors(format_settings.ignore_parsing_errors)
, avro_key_schema_registry(std::move(avro_key_schema_registry_))
, avro_key_schema_subject(std::move(avro_key_schema_subject_))
, offset(offset_)
, high_watermark(high_watermark_.value_or(std::numeric_limits<Int64>::max()))
, consumer(std::move(consumer_))
Expand Down Expand Up @@ -259,7 +268,7 @@ void KafkaSource::readAndProcess()
DB::Kafka::WatermarkOffsets skipped_messages;

auto callback = [this, &skipped_messages](const void * rkmessage, size_t /*total_count*/, void * /*data*/) {
auto * message = static_cast<const rd_kafka_message_t *>(rkmessage);
const auto * message = static_cast<const rd_kafka_message_t *>(rkmessage);
/// We have seen this happened, and do not know how. So, just in case.
if (message->offset < offset)
{
Expand Down Expand Up @@ -436,6 +445,23 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
}
}

Field KafkaSource::decodeAvroKey(const rd_kafka_message_t * kmessage) const
{
ReadBufferFromMemory key_buf(static_cast<const char *>(kmessage->key), kmessage->key_len);
UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(key_buf);
auto schema = avro_key_schema_registry->getSchema(schema_id);
auto avro_stream = std::make_unique<AvroInputStreamReadBufferAdapter>(key_buf);
auto avro_decoder = avro::binaryDecoder();
avro_decoder->init(*avro_stream);
Block key_header{ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_avro_key")};
Comment thread
burnerlee marked this conversation as resolved.
Outdated
FormatSettings fs;
AvroDeserializer deserializer(key_header, schema, /*allow_missing_fields=*/true, /*null_as_default=*/false, fs);
MutableColumns cols = key_header.cloneEmptyColumns();
RowReadExtension ext;
deserializer.deserializeRow(cols, *avro_decoder, ext);
return (*cols[0])[0];
}

void KafkaSource::getPhysicalHeader()
{
auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
Expand Down Expand Up @@ -641,11 +667,23 @@ void KafkaSource::getPhysicalHeader()
[[fallthrough]];
case TypeIndex::FixedString:
{
virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field {
if (inside_nullable && kmessage->key_len == 0)
return Null{};
return {static_cast<char *>(kmessage->key), kmessage->key_len};
};
if (avro_key_schema_registry)
{
virtual_col_value_functions[pos] = [this, inside_nullable](const rd_kafka_message_t * kmessage) -> Field
{
if (inside_nullable && kmessage->key_len == 0)
return Null{};
Comment thread
burnerlee marked this conversation as resolved.
Outdated
return decodeAvroKey(kmessage);
};
}
else
{
virtual_col_value_functions[pos] = [inside_nullable](const rd_kafka_message_t * kmessage) -> Field {
if (inside_nullable && kmessage->key_len == 0)
return Null{};
return {static_cast<char *>(kmessage->key), kmessage->key_len};
};
}
break;
}
default:
Expand Down Expand Up @@ -745,8 +783,20 @@ void KafkaSource::getPhysicalHeader()
}
else if (column.name == ProtonConsts::RESERVED_MESSAGE_KEY)
{
virtual_col_value_functions[pos]
= [](const rd_kafka_message_t * kmessage) -> String { return {static_cast<char *>(kmessage->key), kmessage->key_len}; };
if (avro_key_schema_registry)
{
virtual_col_value_functions[pos] = [this](const rd_kafka_message_t * kmessage) -> Field
{
if (kmessage->key_len == 0)
return String{};
return decodeAvroKey(kmessage);
};
}
else
{
virtual_col_value_functions[pos]
= [](const rd_kafka_message_t * kmessage) -> String { return {static_cast<char *>(kmessage->key), kmessage->key_len}; };
}
virtual_col_types[pos] = column.type;
}
else
Expand Down Expand Up @@ -896,7 +946,7 @@ Strings KafkaSource::doFetchData(const Streaming::SequenceRange & sn_range)
auto callback = [&count, &results](const void * rkmessage, size_t /*total_count*/, void * /*data*/) {
if (count > 0)
{
auto * message = static_cast<const rd_kafka_message_t *>(rkmessage);
const auto * message = static_cast<const rd_kafka_message_t *>(rkmessage);
results.emplace_back(static_cast<const char *>(message->payload), message->len);
--count;
}
Expand Down
9 changes: 9 additions & 0 deletions src/Storages/ExternalStream/Kafka/KafkaSource.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <memory>
#include <Checkpoint/CheckpointRequest.h>
#include <IO/Kafka/Client.h>
#include <IO/ReadBufferFromMemory.h>
Expand All @@ -10,6 +11,7 @@
#include <Storages/StorageSnapshot.h>
#include <Common/Stopwatch.h>
#include <Common/TimeBasedThrottler.h>
#include "Formats/KafkaSchemaRegistryForAvro.h"

struct rd_kafka_message_s;

Expand Down Expand Up @@ -37,6 +39,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource
std::optional<Int64> high_watermark_,
size_t max_block_size_,
UInt64 consumer_stall_timeout_ms,
std::shared_ptr<KafkaSchemaRegistryForAvro> avro_key_schema_registry_,
String avro_key_schema_subject_,
ExternalStreamCounterPtr external_stream_counter_,
ContextPtr query_context_,
LoggerPtr logger_);
Expand Down Expand Up @@ -88,6 +92,8 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource

void getPhysicalHeader() override;

Field decodeAvroKey(const rd_kafka_message_t * kmessage) const;

Strings doFetchData(const Streaming::SequenceRange &) override;

const String topic;
Expand All @@ -98,6 +104,9 @@ class KafkaSource final : public Streaming::ISource, public ExternalStreamSource
bool ignore_format_errors = false;
bool request_virtual_columns = false;

std::shared_ptr<KafkaSchemaRegistryForAvro> avro_key_schema_registry;
String avro_key_schema_subject;
Comment thread
burnerlee marked this conversation as resolved.
Outdated

std::vector<std::pair<Chunk, Streaming::SequenceRange>> result_chunks_with_sns;
std::vector<std::pair<Chunk, Streaming::SequenceRange>>::iterator iter;
MutableColumns current_batch;
Expand Down