[FLINK-39421] Metadata filter push-down for table sources #27913

Merged
twalthr merged 1 commit into apache:master from confluentinc:metadata-filter-pushdown on Apr 24, 2026

Conversation

@jnh5y (Contributor) commented Apr 10, 2026

Add dedicated metadata filter push-down path through SupportsReadingMetadata. Metadata predicates are classified separately from physical predicates and pushed via applyMetadataFilters() with a dedicated MetadataFilterResult type.

MetadataFilterPushDownSpec stores a metadata-only predicateRowType whose field names are metadata keys (not SQL aliases). Physical columns are not included, which avoids name collisions with metadata keys (e.g. offset INT, msg_offset INT METADATA FROM 'offset'). Predicate RexInputRefs are remapped to index into the metadata-only row during rule application, so the serialized spec is self-contained and does not need a column-to-key map.
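The index remapping described above can be sketched in plain Java (a minimal sketch with hypothetical names; the actual spec applies a Calcite RexShuttle to the predicate's RexInputRefs):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal sketch (hypothetical names, not Flink code) of remapping
 *  field indices from the full source row into a metadata-only row. */
public class MetadataIndexRemap {

    /** Maps each metadata column's index in the full source row to its
     *  position in the metadata-only predicate row (0..n-1, in order). */
    static Map<Integer, Integer> buildRemap(List<Integer> metadataIndicesInSourceRow) {
        Map<Integer, Integer> remap = new HashMap<>();
        for (int newIdx = 0; newIdx < metadataIndicesInSourceRow.size(); newIdx++) {
            remap.put(metadataIndicesInSourceRow.get(newIdx), newIdx);
        }
        return remap;
    }

    public static void main(String[] args) {
        // Row: f0 = offset INT (physical), f1 = msg_offset INT METADATA FROM 'offset'.
        // Only index 1 is a metadata column; in the metadata-only row it becomes
        // index 0, under the field name 'offset' (the metadata key, not the alias).
        System.out.println(buildRemap(List.of(1))); // {1=0}
    }
}
```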

Generated-by: Claude Code

What is the purpose of the change

Predicates on metadata columns (e.g., Kafka offset, timestamp, partition) cannot be pushed through the existing SupportsFilterPushDown path because FilterPushDownSpec's serialized RexInputRef indices break during compiled plan restoration when ProjectPushDownSpec narrows the row type. This PR adds a dedicated metadata filter push-down path that solves the serialization problem and enables metadata-aware source optimizations. See FLIP-574 for the full design.
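The restoration problem can be illustrated with a plain-Java sketch (hypothetical example, not Flink code): a serialized field index is only valid against the row type it was created for.

```java
import java.util.List;

/** Sketch of why a serialized field index breaks once projection
 *  push-down narrows the row type (hypothetical example, not Flink code). */
public class StaleIndexDemo {
    public static void main(String[] args) {
        // Full source row at plan-compile time; the predicate references index 2.
        List<String> fullRow = List.of("id", "name", "offset");
        int serializedRef = 2;
        System.out.println(fullRow.get(serializedRef)); // offset

        // Projection push-down drops "name", so "offset" moves to index 1,
        // but the compiled plan still carries index 2.
        List<String> narrowedRow = List.of("id", "offset");
        System.out.println(narrowedRow.get(1)); // offset
        System.out.println(serializedRef < narrowedRow.size()); // false: stale index
    }
}
```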

Brief change log

  • Extended SupportsReadingMetadata with supportsMetadataFilterPushDown() and applyMetadataFilters() default methods and a MetadataFilterResult type
  • Added MetadataFilterPushDownSpec that stores a metadata-only predicate row type (field names are metadata keys) and re-verifies supportsMetadataFilterPushDown() on plan restore
  • Extended PushFilterIntoTableSourceScanRule to classify predicates as physical, metadata, or mixed, with a two-path push-down flow
  • PushFilterIntoSourceScanRuleBase builds a metadata-only row type and RexShuttle-remaps RexInputRef indices so the stored predicates reference only metadata columns — avoids field-name collisions with physical columns of the same name
  • Extracted FilterPushDownSpec.resolvePredicates() as a package-private helper shared between the physical and metadata paths
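The physical/metadata/mixed classification above can be sketched as follows (plain Java with hypothetical names; the real rule walks each predicate's RexInputRefs):

```java
import java.util.Set;

/** Sketch (hypothetical names, not Flink code) of classifying a predicate
 *  by which kind of columns it references. */
public class PredicateClassifier {
    enum Kind { PHYSICAL, METADATA, MIXED }

    static Kind classify(Set<Integer> referencedIndices, Set<Integer> metadataIndices) {
        boolean anyMetadata = false;
        boolean anyPhysical = false;
        for (int idx : referencedIndices) {
            if (metadataIndices.contains(idx)) {
                anyMetadata = true;
            } else {
                anyPhysical = true;
            }
        }
        if (anyMetadata && anyPhysical) {
            return Kind.MIXED; // references both kinds of columns
        }
        return anyMetadata ? Kind.METADATA : Kind.PHYSICAL;
    }

    public static void main(String[] args) {
        Set<Integer> metadataCols = Set.of(2, 3); // e.g. offset, partition
        System.out.println(classify(Set.of(0), metadataCols));    // PHYSICAL
        System.out.println(classify(Set.of(2), metadataCols));    // METADATA
        System.out.println(classify(Set.of(0, 3), metadataCols)); // MIXED
    }
}
```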

Verifying this change

This change added tests and can be verified as follows:

  • Added MetadataFilterInReadingMetadataTest with 7 planner rule tests: basic push-down, opt-out when unsupported, aliased metadata keys, mixed physical+metadata separation, partial acceptance, interaction with projection push-down, and physical-vs-metadata field name collision
  • Added a MetadataFilterPushDownSpec entry to the existing parameterized testDynamicTableSinkSpecSerde list in DynamicTableSourceSpecSerdeTest (spec3) to verify serde round-trip and full source apply
  • Added an enable-metadata-filter-push-down option to TestValuesTableFactory (mirrors enable-watermark-push-down) to gate the new capability in tests
  • Existing physical filter push-down tests continue to pass unchanged

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes — new default methods on @PublicEvolving interface SupportsReadingMetadata
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Apr 10, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@airlock-confluentinc Bot force-pushed the metadata-filter-pushdown branch from 2b6f73d to f96af04 on April 14, 2026 01:57
Contributor

@twalthr left a comment

Thanks for the PR @jnh5y. Overall the PR is already in good shape; I just found one major issue that needs clarification around field name collisions.

String.format(
"%s does not support SupportsReadingMetadata.",
tableSource.getClass().getName()));
}
Contributor

we should also check whether the source still returns supportsMetadataFilterPushDown = true.

Option.apply(
context.getTypeFactory().buildRelNodeRowType(metadataKeyRowType)));

List<Expression> filters =
Contributor

To avoid code duplication, feel free to create a package-visible helper method in FilterPushDownSpec and reuse it here.

Contributor Author

Done. Added resolvePredicates.

return new Tuple2<>(result, newTableSourceTable);
}

/** Replaces SQL alias names with metadata key names in the RowType. */
Contributor

Is there a chance that fields can collide?
CREATE TABLE t (offset INT, msg_offset INT METADATA FROM 'offset')

Contributor

We should most likely only store metadata columns in this row, not combined with physical.

Contributor Author

Good catch. I've addressed this and added a test for it.

@airlock-confluentinc Bot force-pushed the metadata-filter-pushdown branch 2 times, most recently from a4306b8 to 772158d on April 21, 2026 14:10
@jnh5y
Contributor Author

jnh5y commented Apr 21, 2026

@twalthr thanks for the review. I used two separate force pushes to rebase on the latest master and then address comments in https://github.com/apache/flink/compare/a4306b81c155ef4793cacbfe843ea641e0b3b060..772158d6d27dfb0e09d402379ac6cab64d40d0fe. That diff should make reviewing the changes easier (if it helps).

Contributor

@twalthr left a comment

Thanks for the update @jnh5y. I found one last code improvement. Should be good in the next iteration.

Comment on lines +421 to +428
PlannerMocks plannerMocks = PlannerMocks.create();
SerdeContext serdeCtx =
configuredSerdeContext(
plannerMocks.getCatalogManager(), plannerMocks.getTableConfig());

String json = toJson(serdeCtx, original);
MetadataFilterPushDownSpec deserialized =
toObject(serdeCtx, json, MetadataFilterPushDownSpec.class);
Contributor

To avoid code duplication, add your test case to the existing testDynamicTableSinkSpecSerde list of serde items.

Contributor Author

Ok, doing that. In the process, I've extended the TestValues source to make that possible.

Contributor

@twalthr left a comment

LGTM, thanks @jnh5y.

@airlock-confluentinc Bot force-pushed the metadata-filter-pushdown branch 2 times, most recently from ec8b483 to d5e344e on April 23, 2026 21:39
Add a dedicated metadata filter push-down path through SupportsReadingMetadata.
Metadata predicates are classified separately from physical predicates and
pushed via applyMetadataFilters() with a dedicated MetadataFilterResult type.

MetadataFilterPushDownSpec stores a metadata-only predicateRowType whose
field names are metadata keys (not SQL aliases). Physical columns are not
included, which avoids name collisions with metadata keys (e.g.
`offset INT, msg_offset INT METADATA FROM 'offset'`). Predicate RexInputRefs
are remapped to index into the metadata-only row during rule application, so
the serialized spec is self-contained and does not need a column-to-key map.

A package-private FilterPushDownSpec.resolvePredicates() helper handles the
RexNode -> ResolvedExpression conversion shared between the physical and
metadata paths.

On plan restoration, applyMetadataFilters() re-verifies both that the source
is still a SupportsReadingMetadata AND that it still reports
supportsMetadataFilterPushDown() = true.

Tests assert on the exact ResolvedExpression list toString (matching the
convention in DeletePushDownUtilsTest), including a testPhysicalAndMetadataNameCollision
case that covers the physical-vs-metadata-key field name collision scenario.

Generated-by: Claude Code (claude-opus-4-6)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@airlock-confluentinc Bot force-pushed the metadata-filter-pushdown branch from d5e344e to eb1b6fc on April 24, 2026 01:06
@twalthr merged commit 92e67cf into apache:master on Apr 24, 2026
@twalthr deleted the metadata-filter-pushdown branch on April 24, 2026 13:47
