feat(kafka): support Avro-encoded message key via schema registry #1201

Open
burnerlee wants to merge 6 commits into timeplus-io:develop from burnerlee:feat/kafka-avro-message-key

@burnerlee burnerlee commented Jun 21, 2026

Contributor

PR checklist:

  • Did you run ClangFormat? — new code follows surrounding style; all edited files are in Proton-specific paths
  • Did you separate headers to a different section in existing community code base? — new includes added in alphabetical order within existing include blocks
  • Did you surround proton: starts/ends for new code in existing community code base? — not applicable, all changes are in src/Storages/ExternalStream/Kafka/ which is Proton-specific code

Please write user-readable short description of the changes:

Adds a new message_key_schema_name setting to Kafka external streams. When set, Proton decodes the Kafka message key using the Confluent Avro wire format (5-byte wire prefix: magic byte + 4-byte schema ID) and emits the decoded value as a JSON string into _tp_message_key.
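The 5-byte prefix described above can be split off before any Avro decoding happens. The following is a minimal sketch of that step, not the code from this PR: `ConfluentFrame` and `parseConfluentFrame` are hypothetical names introduced for illustration, and the only assumption about the wire format is the documented layout (one `0x00` magic byte followed by a big-endian 4-byte schema ID).

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

// Hypothetical helper type (not from the PR): a Confluent-framed key split
// into its schema ID and the Avro binary body that follows the prefix.
struct ConfluentFrame
{
    std::uint32_t schema_id;       // decoded from bytes 1..4, big-endian
    const std::uint8_t * payload;  // first byte after the 5-byte prefix
    std::size_t payload_len;       // number of bytes after the prefix
};

// Returns std::nullopt when the buffer is too short or the magic byte is
// not 0x00; otherwise returns the schema ID and a view of the Avro body.
inline std::optional<ConfluentFrame> parseConfluentFrame(const std::uint8_t * data, std::size_t len)
{
    constexpr std::size_t prefix_len = 5;
    constexpr std::uint8_t magic = 0x00;

    if (data == nullptr || len < prefix_len || data[0] != magic)
        return std::nullopt;

    // Assemble the schema ID byte by byte so the result does not depend
    // on the host's endianness.
    const std::uint32_t id = (static_cast<std::uint32_t>(data[1]) << 24)
        | (static_cast<std::uint32_t>(data[2]) << 16)
        | (static_cast<std::uint32_t>(data[3]) << 8)
        | static_cast<std::uint32_t>(data[4]);

    return ConfluentFrame{id, data + prefix_len, len - prefix_len};
}
```

For a key starting with the bytes `00 00 00 00 2A`, this sketch yields schema ID 42, and the remaining bytes are what would be handed to the Avro decoder.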

Motivation: Producers that use Confluent Schema Registry commonly Avro-encode message keys, not just values. Reading _tp_message_key on such topics previously returned raw bytes.

Behaviour:

  • message_key_schema_name names an Avro schema subject in the schema registry
  • Requires kafka_schema_registry_url (validated at CREATE EXTERNAL STREAM time)
  • _tp_message_key column must be String or Nullable(String) in Avro key mode (validated at CREATE time)
  • Schema lookup is cached via the existing KafkaSchemaRegistryForAvro mechanism
  • On read: Avro binary key is decoded into a GenericDatum, then re-encoded as Avro JSON (not plain JSON — union types use Avro union encoding, bytes fields are base64). The result is stored as a string in _tp_message_key
  • On write: throws NOT_IMPLEMENTED if message_key_schema_name is set — Avro-encoding keys on write is not yet supported. Plain string keys via _tp_message_key without message_key_schema_name continue to work
  • Empty key (key_len == 0) returns NULL for Nullable(String) columns, empty string for String columns
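The rules in the list above can be summarised as three small checks. This is a hedged sketch under stated assumptions, not the PR's implementation: `KeySettings`, `validateOnCreate`, `valueForEmptyKey`, and `checkWritable` are hypothetical names, standard exceptions stand in for Proton's own error codes, and column types are compared as plain strings for simplicity.

```cpp
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the two external stream settings involved.
struct KeySettings
{
    std::string kafka_schema_registry_url;
    std::string message_key_schema_name;

    // Avro key mode is active whenever a key schema subject is named.
    bool avroKeyMode() const { return !message_key_schema_name.empty(); }
};

// CREATE-time checks: Avro key mode needs a registry URL and a
// String or Nullable(String) _tp_message_key column.
inline void validateOnCreate(const KeySettings & settings, const std::string & key_column_type)
{
    if (!settings.avroKeyMode())
        return;
    if (settings.kafka_schema_registry_url.empty())
        throw std::invalid_argument("message_key_schema_name requires kafka_schema_registry_url");
    if (key_column_type != "String" && key_column_type != "Nullable(String)")
        throw std::invalid_argument("_tp_message_key must be String or Nullable(String) in Avro key mode");
}

// Read path: value stored when key_len == 0. std::nullopt models NULL.
inline std::optional<std::string> valueForEmptyKey(bool column_is_nullable)
{
    if (column_is_nullable)
        return std::nullopt;
    return std::string{};
}

// Write path: Avro-encoding keys is not supported, so reject it up front.
// std::logic_error is a placeholder for the NOT_IMPLEMENTED error.
inline void checkWritable(const KeySettings & settings)
{
    if (settings.avroKeyMode())
        throw std::logic_error("NOT_IMPLEMENTED: writing Avro-encoded message keys is not supported");
}
```

Keeping the write-path check separate from the CREATE-time validation means a stream with `message_key_schema_name` set can still be created and read from; only inserts are rejected.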

Example:

CREATE EXTERNAL STREAM orders (
    _tp_message_key String,
    payload         String
) SETTINGS
    type                       = 'kafka',
    brokers                    = 'localhost:9092',
    topic                      = 'orders',
    data_format                = 'JSONEachRow',
    kafka_schema_registry_url  = 'http://localhost:8081',
    message_key_schema_name    = 'orders-key';

SELECT _tp_message_key FROM orders;
-- Returns Avro JSON string, e.g. {"user_id": 42, "tenant": "acme"}

Closes #915

Added a new setting `message_key_schema_name` to Kafka external streams.
When set, the Kafka message key is decoded using the Confluent Avro wire
format (5-byte prefix: 0x00 magic + 4-byte schema ID) against the named
subject in the configured schema registry, and the result is emitted as
a JSON string into the `_tp_message_key` virtual column.

Changes:
- ExternalStreamSettings.h: add `message_key_schema_name` String setting
- Kafka.cpp: validate setting requires `kafka_schema_registry_url`; restrict
  `_tp_message_key` column to String/FixedString when Avro key mode is active;
  build and store a `KafkaSchemaRegistryForAvro` for the key subject in the
  constructor; pass registry and subject name to each KafkaSource
- Kafka.h: add `avro_key_schema_registry` member; include KafkaSchemaRegistryForAvro
- KafkaSource.h: add `avro_key_schema_registry` and `avro_key_schema_subject`
  members; add constructor params; declare `decodeAvroKey()` private method
- KafkaSource.cpp: implement `decodeAvroKey()` using AvroDeserializer against a
  single String column header; branch both the typed (String/FixedString switch
  case) and untyped (RESERVED_MESSAGE_KEY fallback) virtual column paths to call
  `decodeAvroKey()` when the registry is present, raw bytes otherwise
chenziliang requested a review from yuzifeng1984 on June 21, 2026, 17:10
Comment thread src/Storages/ExternalStream/Kafka/KafkaSource.cpp Outdated
Comment thread src/Storages/ExternalStream/Kafka/KafkaSource.h Outdated
Comment thread src/Storages/ExternalStream/Kafka/KafkaSource.cpp Outdated
@yuzifeng1984

Collaborator

We need to implement the KafkaSink side so that it encodes the message key, or throw an exception on write if that is not currently supported.

Comment thread src/Storages/ExternalStream/Kafka/Kafka.cpp Outdated
burnerlee and others added 4 commits June 25, 2026 09:56
- Return empty string (not decode) when key_len == 0 and column is non-nullable
- Remove unused avro_key_schema_subject param/member — schema ID comes from key bytes
- Use angle-bracket includes for Formats headers in Kafka.cpp

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Replace broken AvroDeserializer approach (which mapped Avro fields by
column name and always returned empty string) with GenericDatum round-trip:
binary Avro -> GenericDatum -> Avro JSON string.

Also fix include style in KafkaSource.h (angle brackets).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Writing _tp_message_key as Confluent Avro binary is not yet implemented;
only decoding on read is supported. Throw a clear error on write with
guidance to use a plain string key instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@burnerlee

Contributor Author

@yuzifeng1984 How can I test my changes locally? Can you share your testing setup?

Development

Successfully merging this pull request may close these issues.

Kafka External Stream to support read the Avro-encoded message key