diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
index 60b6932b61769..0d1c65f0f2f96 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
@@ -25,11 +25,13 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
default boolean supportsMetadataProjection() {
return true;
}
+
+ /**
+ * Whether this source supports filtering on metadata columns.
+ *
+ *
+ * <p>When this method returns {@code true}, the planner may call {@link
+ * #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
+ * names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
+ * override this method will not receive metadata filter predicates.
+ *
+ *
+ * <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
+ * predicates. A source can implement both to accept filters on physical and metadata columns.
+ */
+ default boolean supportsMetadataFilterPushDown() {
+ return false;
+ }
+
+ /**
+ * Provides a list of metadata filters in conjunctive form. A source can pick filters and return
+ * the accepted and remaining filters. Same contract as {@link
+ * SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
+ *
+ *
+ * <p>The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
+ * not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
+ * FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
+ * msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
+ * method.
+ */
+ default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
+ }
+
+ /**
+ * Result of a metadata filter push down. Communicates the source's response to the planner
+ * during optimization.
+ */
+ @PublicEvolving
+ final class MetadataFilterResult {
+ private final List<ResolvedExpression> acceptedFilters;
+ private final List<ResolvedExpression> remainingFilters;
+
+ private MetadataFilterResult(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ this.acceptedFilters = acceptedFilters;
+ this.remainingFilters = remainingFilters;
+ }
+
+ /**
+ * Constructs a metadata filter push-down result.
+ *
+ * @param acceptedFilters filters consumed by the source (best effort)
+ * @param remainingFilters filters that a subsequent operation must still apply at runtime
+ */
+ public static MetadataFilterResult of(
+ List<ResolvedExpression> acceptedFilters,
+ List<ResolvedExpression> remainingFilters) {
+ return new MetadataFilterResult(acceptedFilters, remainingFilters);
+ }
+
+ public List<ResolvedExpression> getAcceptedFilters() {
+ return acceptedFilters;
+ }
+
+ public List<ResolvedExpression> getRemainingFilters() {
+ return remainingFilters;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index d30fb1a4181c0..7a4e1481e04d3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -99,50 +99,9 @@ public static SupportsFilterPushDown.Result apply(
DynamicTableSource tableSource,
SourceAbilityContext context) {
if (tableSource instanceof SupportsFilterPushDown) {
- RexNodeToExpressionConverter converter =
- new RexNodeToExpressionConverter(
- new RexBuilder(context.getTypeFactory()),
- context.getSourceRowType().getFieldNames().toArray(new String[0]),
- context.getFunctionCatalog(),
- context.getCatalogManager(),
- Option.apply(
- context.getTypeFactory()
- .buildRelNodeRowType(context.getSourceRowType())));
- List<Expression> filters =
- predicates.stream()
- .map(
- p -> {
- scala.Option<ResolvedExpression> expr = p.accept(converter);
- if (expr.isDefined()) {
- return expr.get();
- } else {
- throw new TableException(
- String.format(
- "%s can not be converted to Expression, please make sure %s can accept %s.",
- p.toString(),
- tableSource.getClass().getSimpleName(),
- p.toString()));
- }
- })
- .collect(Collectors.toList());
- ExpressionResolver resolver =
- ExpressionResolver.resolverFor(
- context.getTableConfig(),
- context.getClassLoader(),
- name -> Optional.empty(),
- context.getFunctionCatalog()
- .asLookup(
- str -> {
- throw new TableException(
- "We should not need to lookup any expressions at this point");
- }),
- context.getCatalogManager().getDataTypeFactory(),
- (sqlExpression, inputRowType, outputType) -> {
- throw new TableException(
- "SQL expression parsing is not supported at this location.");
- })
- .build();
- return ((SupportsFilterPushDown) tableSource).applyFilters(resolver.resolve(filters));
+ List<ResolvedExpression> resolved =
+ resolvePredicates(predicates, context.getSourceRowType(), tableSource, context);
+ return ((SupportsFilterPushDown) tableSource).applyFilters(resolved);
} else {
throw new TableException(
String.format(
@@ -151,6 +110,59 @@ public static SupportsFilterPushDown.Result apply(
}
}
+ /**
+ * Converts {@link RexNode} predicates to {@link ResolvedExpression}s using the given row type.
+ * Shared between physical and metadata filter push-down paths.
+ */
+ static List<ResolvedExpression> resolvePredicates(
+ List<RexNode> predicates,
+ RowType rowType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ new RexBuilder(context.getTypeFactory()),
+ rowType.getFieldNames().toArray(new String[0]),
+ context.getFunctionCatalog(),
+ context.getCatalogManager(),
+ Option.apply(context.getTypeFactory().buildRelNodeRowType(rowType)));
+ List<Expression> filters =
+ predicates.stream()
+ .map(
+ p -> {
+ scala.Option<ResolvedExpression> expr = p.accept(converter);
+ if (expr.isDefined()) {
+ return expr.get();
+ } else {
+ throw new TableException(
+ String.format(
+ "%s can not be converted to Expression, please make sure %s can accept %s.",
+ p.toString(),
+ tableSource.getClass().getSimpleName(),
+ p.toString()));
+ }
+ })
+ .collect(Collectors.toList());
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ context.getTableConfig(),
+ context.getClassLoader(),
+ name -> Optional.empty(),
+ context.getFunctionCatalog()
+ .asLookup(
+ str -> {
+ throw new TableException(
+ "We should not need to lookup any expressions at this point");
+ }),
+ context.getCatalogManager().getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not supported at this location.");
+ })
+ .build();
+ return resolver.resolve(filters);
+ }
+
@Override
public boolean needAdjustFieldReferenceAfterProjection() {
return true;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
new file mode 100644
index 0000000000000..090ab45fe27ac
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/MetadataFilterPushDownSpec.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.source;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializes metadata filter predicates and replays them during compiled plan restoration.
+ *
+ * <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
+ * (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
+ * column-to-key mapping needs to be persisted.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("MetadataFilterPushDown")
+public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
+
+ public static final String FIELD_NAME_PREDICATES = "predicates";
+ public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";
+
+ @JsonProperty(FIELD_NAME_PREDICATES)
+ private final List<RexNode> predicates;
+
+ /**
+ * Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
+ * context's row type during restore.
+ */
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
+ private final RowType predicateRowType;
+
+ @JsonCreator
+ public MetadataFilterPushDownSpec(
+ @JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
+ @JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
+ this.predicates = new ArrayList<>(checkNotNull(predicates));
+ this.predicateRowType = checkNotNull(predicateRowType);
+ }
+
+ public List<RexNode> getPredicates() {
+ return predicates;
+ }
+
+ @Override
+ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
+ // Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec.
+ MetadataFilterResult result =
+ applyMetadataFilters(predicates, predicateRowType, tableSource, context);
+ if (result.getAcceptedFilters().size() != predicates.size()) {
+ throw new TableException("All metadata predicates should be accepted here.");
+ }
+ }
+
+ /**
+ * Converts RexNode predicates to ResolvedExpressions using the given row type and calls
+ * applyMetadataFilters on the source. The row type must already use metadata key names.
+ */
+ public static MetadataFilterResult applyMetadataFilters(
+ List<RexNode> predicates,
+ RowType metadataKeyRowType,
+ DynamicTableSource tableSource,
+ SourceAbilityContext context) {
+ if (!(tableSource instanceof SupportsReadingMetadata)) {
+ throw new TableException(
+ String.format(
+ "%s does not support SupportsReadingMetadata.",
+ tableSource.getClass().getName()));
+ }
+ SupportsReadingMetadata readingMetadata = (SupportsReadingMetadata) tableSource;
+ if (!readingMetadata.supportsMetadataFilterPushDown()) {
+ throw new TableException(
+ String.format(
+ "%s no longer supports metadata filter push-down.",
+ tableSource.getClass().getName()));
+ }
+ List<ResolvedExpression> resolved =
+ FilterPushDownSpec.resolvePredicates(
+ predicates, metadataKeyRowType, tableSource, context);
+ return readingMetadata.applyMetadataFilters(resolved);
+ }
+
+ @Override
+ public boolean needAdjustFieldReferenceAfterProjection() {
+ return true;
+ }
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final List<String> expressionStrs = new ArrayList<>();
+ for (RexNode rexNode : predicates) {
+ expressionStrs.add(
+ FlinkRexUtil.getExpressionString(
+ rexNode,
+ JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
+ }
+
+ return String.format(
+ "metadataFilter=[%s]",
+ expressionStrs.stream()
+ .reduce((l, r) -> String.format("and(%s, %s)", l, r))
+ .orElse(""));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
+ return Objects.equals(predicates, that.predicates)
+ && Objects.equals(predicateRowType, that.predicateRowType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), predicates, predicateRowType);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
index e51328d5e9f25..20e594304a86b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
@@ -38,6 +38,7 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
+ @JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 76e5a9b4a105f..eb5521ef78222 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -17,23 +17,36 @@
package org.apache.flink.table.planner.plan.rules.logical;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -60,15 +73,8 @@ protected RexNode createRemainingCondition(
}
/**
- * Resolves filters using the underlying sources {@link SupportsFilterPushDown} and creates a
- * new {@link TableSourceTable} with the supplied predicates.
- *
- * @param convertiblePredicates Predicates to resolve
- * @param oldTableSourceTable TableSourceTable to copy
- * @param scan Underlying table scan to push to
- * @param relBuilder Builder to push the scan to
- * @return A tuple, constituting of the resolved filters and the newly created {@link
- * TableSourceTable}
+ * Resolves filters via {@link SupportsFilterPushDown} and creates a new {@link
+ * TableSourceTable}.
*/
protected Tuple2<SupportsFilterPushDown.Result, TableSourceTable>
resolveFiltersAndCreateTableSourceTable(
@@ -102,17 +108,168 @@ protected RexNode createRemainingCondition(
return new Tuple2<>(result, newTableSourceTable);
}
- /**
- * Determines wether we can pushdown the filter into the source. we can not push filter twice,
- * make sure FilterPushDownSpec has not been assigned as a capability.
- *
- * @param tableSourceTable Table scan to attempt to push into
- * @return Whether we can push or not
- */
+ /** Whether filter push-down is possible and not already assigned. */
protected boolean canPushdownFilter(TableSourceTable tableSourceTable) {
return tableSourceTable != null
&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown
&& Arrays.stream(tableSourceTable.abilitySpecs())
.noneMatch(spec -> spec instanceof FilterPushDownSpec);
}
+
+ /** Whether metadata filter push-down is possible and not already assigned. */
+ protected boolean canPushdownMetadataFilter(TableSourceTable tableSourceTable) {
+ if (tableSourceTable == null) {
+ return false;
+ }
+ DynamicTableSource source = tableSourceTable.tableSource();
+ if (!(source instanceof SupportsReadingMetadata)) {
+ return false;
+ }
+ if (!((SupportsReadingMetadata) source).supportsMetadataFilterPushDown()) {
+ return false;
+ }
+ return Arrays.stream(tableSourceTable.abilitySpecs())
+ .noneMatch(spec -> spec instanceof MetadataFilterPushDownSpec);
+ }
+
+ /**
+ * True if predicate references metadata columns exclusively (no physical columns).
+ *
+ * <p>A predicate like {@code OR(physical_pred, metadata_pred)} returns false because it
+ * references both physical and metadata columns. Mixed predicates remain as runtime filters.
+ */
+ protected boolean referencesOnlyMetadataColumns(RexNode predicate, int physicalColumnCount) {
+ boolean[] saw = new boolean[2]; // [0] = sawPhysical, [1] = sawMetadata
+ predicate.accept(
+ new RexVisitorImpl<Void>(true) {
+ @Override
+ public Void visitInputRef(RexInputRef inputRef) {
+ if (inputRef.getIndex() >= physicalColumnCount) {
+ saw[1] = true;
+ } else {
+ saw[0] = true;
+ }
+ return null;
+ }
+ });
+ return saw[1] && !saw[0];
+ }
+
+ /** Number of physical columns in the scan's schema. */
+ protected int getPhysicalColumnCount(TableSourceTable tableSourceTable) {
+ ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
+ return (int) schema.getColumns().stream().filter(Column::isPhysical).count();
+ }
+
+ /** Maps SQL column names to metadata keys for metadata columns. */
+ protected Map<String, String> buildColumnToMetadataKeyMap(TableSourceTable tableSourceTable) {
+ ResolvedSchema schema = tableSourceTable.contextResolvedTable().getResolvedSchema();
+ Map<String, String> mapping = new HashMap<>();
+ for (Column col : schema.getColumns()) {
+ if (col instanceof Column.MetadataColumn) {
+ Column.MetadataColumn metaCol = (Column.MetadataColumn) col;
+ String sqlName = metaCol.getName();
+ String metadataKey = metaCol.getMetadataKey().orElse(sqlName);
+ mapping.put(sqlName, metadataKey);
+ }
+ }
+ return mapping;
+ }
+
+ /** Resolves metadata filters and creates a new {@link TableSourceTable}. */
+ protected Tuple2<MetadataFilterResult, TableSourceTable>
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ RexNode[] metadataPredicates,
+ TableSourceTable oldTableSourceTable,
+ TableScan scan,
+ RelBuilder relBuilder) {
+ DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
+ SourceAbilityContext abilityContext = SourceAbilityContext.from(scan);
+
+ // Build a metadata-only row type (field names are metadata keys, not SQL aliases) and
+ // an old->new index mapping. Storing only metadata columns avoids name collisions with
+ // physical columns (e.g. `offset INT, msg_offset INT METADATA FROM 'offset'`).
+ MetadataRowInfo metadataRowInfo =
+ buildMetadataRowInfo(oldTableSourceTable, abilityContext.getSourceRowType());
+ RexNode[] remappedPredicates =
+ remapPredicates(metadataPredicates, metadataRowInfo.oldIndexToNewIndex);
+
+ MetadataFilterResult result =
+ MetadataFilterPushDownSpec.applyMetadataFilters(
+ Arrays.asList(remappedPredicates),
+ metadataRowInfo.metadataRowType,
+ newTableSource,
+ abilityContext);
+
+ int acceptedCount = result.getAcceptedFilters().size();
+ List<RexNode> acceptedRemappedPredicates = new ArrayList<>();
+ for (int i = 0; i < acceptedCount; i++) {
+ acceptedRemappedPredicates.add(remappedPredicates[i]);
+ }
+ MetadataFilterPushDownSpec metadataSpec =
+ new MetadataFilterPushDownSpec(
+ acceptedRemappedPredicates, metadataRowInfo.metadataRowType);
+
+ TableSourceTable newTableSourceTable =
+ oldTableSourceTable.copy(
+ newTableSource,
+ oldTableSourceTable.getStatistic(),
+ new SourceAbilitySpec[] {metadataSpec});
+
+ return new Tuple2<>(result, newTableSourceTable);
+ }
+
+ /**
+ * Builds a {@link RowType} containing only metadata columns (named by metadata key) together
+ * with an old-index-to-new-index mapping from the scan's source row type. Non-metadata
+ * positions are absent from the mapping.
+ */
+ private MetadataRowInfo buildMetadataRowInfo(
+ TableSourceTable tableSourceTable, RowType sourceRowType) {
+ Map columnToMetadataKey = buildColumnToMetadataKeyMap(tableSourceTable);
+ List<RowField> metadataFields = new ArrayList<>();
+ Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+ for (int i = 0; i < sourceRowType.getFieldCount(); i++) {
+ String sqlName = sourceRowType.getFieldNames().get(i);
+ String metadataKey = columnToMetadataKey.get(sqlName);
+ if (metadataKey != null) {
+ oldToNewIndex.put(i, metadataFields.size());
+ metadataFields.add(new RowField(metadataKey, sourceRowType.getTypeAt(i)));
+ }
+ }
+ return new MetadataRowInfo(new RowType(false, metadataFields), oldToNewIndex);
+ }
+
+ /**
+ * Rewrites {@link RexInputRef}s in each predicate using the supplied old->new index map. Throws
+ * if a predicate references an index not in the map (would indicate a non-metadata reference
+ * slipped past {@link #referencesOnlyMetadataColumns}).
+ */
+ private RexNode[] remapPredicates(
+ RexNode[] predicates, Map<Integer, Integer> oldIndexToNewIndex) {
+ RexShuttle shuttle =
+ new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ Integer newIdx = oldIndexToNewIndex.get(inputRef.getIndex());
+ if (newIdx == null) {
+ throw new IllegalStateException(
+ "Metadata predicate references non-metadata column index "
+ + inputRef.getIndex());
+ }
+ return new RexInputRef(newIdx, inputRef.getType());
+ }
+ };
+ return Arrays.stream(predicates).map(p -> p.accept(shuttle)).toArray(RexNode[]::new);
+ }
+
+ private static final class MetadataRowInfo {
+ final RowType metadataRowType;
+ final Map<Integer, Integer> oldIndexToNewIndex;
+
+ MetadataRowInfo(RowType metadataRowType, Map<Integer, Integer> oldIndexToNewIndex) {
+ this.metadataRowType = metadataRowType;
+ this.oldIndexToNewIndex = oldIndexToNewIndex;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index eaa6999cdd7fe..57d80b1b2f94c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -19,7 +19,8 @@
package org.apache.flink.table.planner.plan.rules.logical;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
@@ -29,11 +30,15 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
+import java.util.ArrayList;
+import java.util.List;
+
import scala.Tuple2;
/**
- * Planner rule that tries to push a filter into a {@link LogicalTableScan}, which table is a {@link
- * TableSourceTable}. And the table source in the table is a {@link SupportsFilterPushDown}.
+ * Pushes filters from a {@link Filter} into a {@link LogicalTableScan}. Physical filters use {@link
+ * SupportsFilterPushDown}; metadata filters use {@link
+ * SupportsReadingMetadata#applyMetadataFilters}.
*/
public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase {
public static final PushFilterIntoTableSourceScanRule INSTANCE =
@@ -59,7 +64,7 @@ public boolean matches(RelOptRuleCall call) {
LogicalTableScan scan = call.rel(1);
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
- return canPushdownFilter(tableSourceTable);
+ return canPushdownFilter(tableSourceTable) || canPushdownMetadataFilter(tableSourceTable);
}
@Override
@@ -74,7 +79,7 @@ private void pushFilterIntoScan(
RelOptRuleCall call,
Filter filter,
LogicalTableScan scan,
- FlinkPreparingTableBase relOptTable) {
+ TableSourceTable tableSourceTable) {
RelBuilder relBuilder = call.builder();
Tuple2<RexNode[], RexNode[]> extractedPredicates =
@@ -87,28 +92,77 @@ private void pushFilterIntoScan(
RexNode[] convertiblePredicates = extractedPredicates._1;
RexNode[] unconvertedPredicates = extractedPredicates._2;
if (convertiblePredicates.length == 0) {
- // no condition can be translated to expression
return;
}
- Tuple2<SupportsFilterPushDown.Result, TableSourceTable> scanAfterPushdownWithResult =
- resolveFiltersAndCreateTableSourceTable(
- convertiblePredicates,
- relOptTable.unwrap(TableSourceTable.class),
- scan,
- relBuilder);
+ boolean supportsPhysicalFilter = canPushdownFilter(tableSourceTable);
+ boolean supportsMetadataFilter = canPushdownMetadataFilter(tableSourceTable);
+ int physicalColumnCount = getPhysicalColumnCount(tableSourceTable);
+
+ // Classify predicates: only separate metadata predicates when the source
+ // actually supports metadata filter push-down. Otherwise, all predicates
+ // go through the physical path to preserve the FilterPushDownSpec guard
+ // that prevents rule re-firing and maintains scan reuse invariants.
+ List<RexNode> physicalPredicates = new ArrayList<>();
+ List<RexNode> metadataPredicates = new ArrayList<>();
+ for (RexNode predicate : convertiblePredicates) {
+ if (supportsMetadataFilter
+ && referencesOnlyMetadataColumns(predicate, physicalColumnCount)) {
+ metadataPredicates.add(predicate);
+ } else {
+ physicalPredicates.add(predicate);
+ }
+ }
+
+ List<RexNode> allRemainingRexNodes = new ArrayList<>();
+ TableSourceTable currentTable = tableSourceTable;
+
+ if (!physicalPredicates.isEmpty() && supportsPhysicalFilter) {
+ Tuple2<SupportsFilterPushDown.Result, TableSourceTable> physicalResult =
+ resolveFiltersAndCreateTableSourceTable(
+ physicalPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = physicalResult._2;
+ List<RexNode> physicalRemaining =
+ convertExpressionToRexNode(physicalResult._1.getRemainingFilters(), relBuilder);
+ allRemainingRexNodes.addAll(physicalRemaining);
+ } else {
+ allRemainingRexNodes.addAll(physicalPredicates);
+ }
- SupportsFilterPushDown.Result result = scanAfterPushdownWithResult._1;
- TableSourceTable tableSourceTable = scanAfterPushdownWithResult._2;
+ if (!metadataPredicates.isEmpty()) {
+ Tuple2<MetadataFilterResult, TableSourceTable> metadataResult =
+ resolveMetadataFiltersAndCreateTableSourceTable(
+ metadataPredicates.toArray(new RexNode[0]),
+ currentTable,
+ scan,
+ relBuilder);
+ currentTable = metadataResult._2;
+ // Remaining (rejected) metadata predicates stay as a LogicalFilter above
+ // the scan so they are still evaluated at runtime. We use the original
+ // RexNodes (suffix) because the remaining ResolvedExpressions use metadata
+ // key names, not SQL aliases needed by the Filter's row type. The
+ // validation in resolveMetadataFiltersAndCreateTableSourceTable ensures
+ // the partition invariant (accepted prefix + remaining suffix = input).
+ int acceptedCount = metadataResult._1.getAcceptedFilters().size();
+ for (int i = acceptedCount; i < metadataPredicates.size(); i++) {
+ allRemainingRexNodes.add(metadataPredicates.get(i));
+ }
+ }
+
+ for (RexNode unconverted : unconvertedPredicates) {
+ allRemainingRexNodes.add(unconverted);
+ }
LogicalTableScan newScan =
- LogicalTableScan.create(scan.getCluster(), tableSourceTable, scan.getHints());
- if (result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0) {
+ LogicalTableScan.create(scan.getCluster(), currentTable, scan.getHints());
+
+ if (allRemainingRexNodes.isEmpty()) {
call.transformTo(newScan);
} else {
- RexNode remainingCondition =
- createRemainingCondition(
- relBuilder, result.getRemainingFilters(), unconvertedPredicates);
+ RexNode remainingCondition = relBuilder.and(allRemainingRexNodes);
RexNode simplifiedRemainingCondition =
FlinkRexUtil.simplify(
relBuilder.getRexBuilder(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 1e6b540d4b17e..d011687d82efd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -440,6 +440,9 @@ private static RowKind parseRowKind(String rowKindShortString) {
private static final ConfigOption<Boolean> ENABLE_WATERMARK_PUSH_DOWN =
ConfigOptions.key("enable-watermark-push-down").booleanType().defaultValue(false);
+ private static final ConfigOption<Boolean> ENABLE_METADATA_FILTER_PUSH_DOWN =
+ ConfigOptions.key("enable-metadata-filter-push-down").booleanType().defaultValue(false);
+
private static final ConfigOption<Boolean> ENABLE_CUSTOM_SHUFFLE =
ConfigOptions.key("enable-custom-shuffle").booleanType().defaultValue(false);
@@ -574,6 +577,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boolean enableAggregatePushDown = helper.getOptions().get(ENABLE_AGGREGATE_PUSH_DOWN);
boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
boolean enableWatermarkPushDown = helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
+ boolean enableMetadataFilterPushDown =
+ helper.getOptions().get(ENABLE_METADATA_FILTER_PUSH_DOWN);
boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
int numElementToSkip = helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
boolean internalData = helper.getOptions().get(INTERNAL_DATA);
@@ -692,45 +697,51 @@ public DynamicTableSource createDynamicTableSource(Context context) {
if (disableLookup) {
if (enableWatermarkPushDown) {
- return new TestValuesScanTableSourceWithWatermarkPushDown(
- producedDataType,
- changelogMode,
- terminating,
- runtimeSource,
- failingSource,
- partition2Rows,
- context.getObjectIdentifier().getObjectName(),
- nestedProjectionSupported,
- null,
- Collections.emptyList(),
- filterableFieldsSet,
- dynamicFilteringFieldsSet,
- numElementToSkip,
- Long.MAX_VALUE,
- partitions,
- readableMetadata,
- null,
- enableAggregatePushDown);
+ TestValuesScanTableSourceWithWatermarkPushDown source =
+ new TestValuesScanTableSourceWithWatermarkPushDown(
+ producedDataType,
+ changelogMode,
+ terminating,
+ runtimeSource,
+ failingSource,
+ partition2Rows,
+ context.getObjectIdentifier().getObjectName(),
+ nestedProjectionSupported,
+ null,
+ Collections.emptyList(),
+ filterableFieldsSet,
+ dynamicFilteringFieldsSet,
+ numElementToSkip,
+ Long.MAX_VALUE,
+ partitions,
+ readableMetadata,
+ null,
+ enableAggregatePushDown);
+ source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return source;
} else {
- return new TestValuesScanTableSource(
- producedDataType,
- changelogMode,
- boundedness,
- terminating,
- runtimeSource,
- failingSource,
- partition2Rows,
- nestedProjectionSupported,
- null,
- Collections.emptyList(),
- filterableFieldsSet,
- dynamicFilteringFieldsSet,
- numElementToSkip,
- Long.MAX_VALUE,
- partitions,
- readableMetadata,
- null,
- enableAggregatePushDown);
+ TestValuesScanTableSource source =
+ new TestValuesScanTableSource(
+ producedDataType,
+ changelogMode,
+ boundedness,
+ terminating,
+ runtimeSource,
+ failingSource,
+ partition2Rows,
+ nestedProjectionSupported,
+ null,
+ Collections.emptyList(),
+ filterableFieldsSet,
+ dynamicFilteringFieldsSet,
+ numElementToSkip,
+ Long.MAX_VALUE,
+ partitions,
+ readableMetadata,
+ null,
+ enableAggregatePushDown);
+ source.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return source;
}
} else {
Collection consumedData =
@@ -904,6 +915,7 @@ public Set> optionalOptions() {
SINK_CHANGELOG_MODE_ENFORCED,
WRITABLE_METADATA,
ENABLE_WATERMARK_PUSH_DOWN,
+ ENABLE_METADATA_FILTER_PUSH_DOWN,
SINK_DROP_LATE_EVENT,
SINK_BUCKET_COUNT_REQUIRED,
SINK_SUPPORTS_DELETE_BY_KEY,
@@ -1088,6 +1100,7 @@ private static class TestValuesScanTableSourceWithoutProjectionPushDown
protected final Map readableMetadata;
protected @Nullable int[] projectedMetadataFields;
protected final boolean enableAggregatePushDown;
+ protected boolean enableMetadataFilterPushDown;
private @Nullable int[] groupingSet;
private List aggregateExpressions;
@@ -1619,6 +1632,22 @@ public void applyReadableMetadata(
remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
}
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return enableMetadataFilterPushDown;
+ }
+
+ @Override
+ public SupportsReadingMetadata.MetadataFilterResult applyMetadataFilters(
+                List<ResolvedExpression> metadataFilters) {
+ return SupportsReadingMetadata.MetadataFilterResult.of(
+ metadataFilters, Collections.emptyList());
+ }
+
+ void setEnableMetadataFilterPushDown(boolean enable) {
+ this.enableMetadataFilterPushDown = enable;
+ }
+
@Override
public List listAcceptedFilterFields() {
return new ArrayList<>(dynamicFilteringFields);
@@ -1678,25 +1707,28 @@ private TestValuesScanTableSource(
@Override
public DynamicTableSource copy() {
- return new TestValuesScanTableSource(
- producedDataType,
- changelogMode,
- boundedness,
- terminating,
- runtimeSource,
- failingSource,
- data,
- nestedProjectionSupported,
- projectedPhysicalFields,
- filterPredicates,
- filterableFields,
- dynamicFilteringFields,
- numElementToSkip,
- limit,
- allPartitions,
- readableMetadata,
- projectedMetadataFields,
- enableAggregatePushDown);
+ TestValuesScanTableSource copy =
+ new TestValuesScanTableSource(
+ producedDataType,
+ changelogMode,
+ boundedness,
+ terminating,
+ runtimeSource,
+ failingSource,
+ data,
+ nestedProjectionSupported,
+ projectedPhysicalFields,
+ filterPredicates,
+ filterableFields,
+ dynamicFilteringFields,
+ numElementToSkip,
+ limit,
+ allPartitions,
+ readableMetadata,
+ projectedMetadataFields,
+ enableAggregatePushDown);
+ copy.setEnableMetadataFilterPushDown(enableMetadataFilterPushDown);
+ return copy;
}
@Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 6fd5942f19e43..478c91eedcda3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -42,6 +42,7 @@
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.MetadataFilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec;
@@ -64,6 +65,7 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.TimestampString;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.params.ParameterizedTest;
@@ -274,7 +276,71 @@ public static Stream testDynamicTableSinkSpecSerde() {
put("p", "B");
}
}))));
- return Stream.of(spec1, spec2);
+        Map<String, String> options3 = new HashMap<>();
+ options3.put("connector", TestValuesTableFactory.IDENTIFIER);
+ options3.put("disable-lookup", "true");
+ options3.put("enable-metadata-filter-push-down", "true");
+ options3.put("bounded", "false");
+ options3.put("readable-metadata", "timestamp:TIMESTAMP(3), offset:BIGINT");
+
+ final ResolvedSchema resolvedSchema3 =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("id", DataTypes.INT()),
+ Column.metadata(
+ "rowtime", DataTypes.TIMESTAMP(3), "timestamp", false),
+ Column.metadata("offset", DataTypes.BIGINT(), null, false)),
+ Collections.emptyList(),
+ null,
+ Collections.emptyList(),
+ null);
+
+ final CatalogTable catalogTable3 =
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema3).build())
+ .options(options3)
+ .build();
+
+ // predicateRowType uses metadata key names (already translated from SQL aliases).
+ RowType predicateRowType3 =
+ RowType.of(
+ new LogicalType[] {new TimestampType(3), new BigIntType()},
+ new String[] {"timestamp", "offset"});
+
+ DynamicTableSourceSpec spec3 =
+ new DynamicTableSourceSpec(
+ ContextResolvedTable.temporary(
+ ObjectIdentifier.of(
+ TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+ TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
+ "MyTableMetadata"),
+ new ResolvedCatalogTable(catalogTable3, resolvedSchema3)),
+ Collections.singletonList(
+ new MetadataFilterPushDownSpec(
+ Arrays.asList(
+ // timestamp > '2024-01-01'
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN,
+ rexBuilder.makeInputRef(
+ factory.createSqlType(
+ SqlTypeName.TIMESTAMP, 3),
+ 0),
+ rexBuilder.makeTimestampLiteral(
+ new TimestampString(
+ "2024-01-01 00:00:00"),
+ 3)),
+ // offset >= 10
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+ rexBuilder.makeInputRef(
+ factory.createSqlType(
+ SqlTypeName.BIGINT),
+ 1),
+ rexBuilder.makeExactLiteral(
+ new BigDecimal(10)))),
+ predicateRowType3)));
+
+ return Stream.of(spec1, spec2, spec3);
}
@ParameterizedTest
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
new file mode 100644
index 0000000000000..ffd6aa0ea89f7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import org.apache.flink.table.planner.factories.TableFactoryHarness;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for metadata filter push-down through {@link SupportsReadingMetadata}. */
+class MetadataFilterInReadingMetadataTest extends TableTestBase {
+
+ @RegisterExtension
+ private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+ private BatchTableTestUtil util;
+
+ @BeforeEach
+ void setup() {
+ util = batchTestUtil(TableConfig.getDefault());
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE());
+ CalciteConfig calciteConfig =
+ TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+ calciteConfig
+ .getBatchProgram()
+ .get()
+ .addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder()
+ .setHepRulesExecutionType(
+ HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(
+ RuleSets.ofList(
+ PushFilterIntoTableSourceScanRule.INSTANCE,
+ CoreRules.FILTER_PROJECT_TRANSPOSE))
+ .build());
+ }
+
+ @Test
+ void testMetadataFilterPushDown() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(true, receivedFilters))
+ .build();
+ util.tableEnv().createTable("T1", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testMetadataFilterNotPushedWhenNotSupported() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(false, receivedFilters))
+ .build();
+ util.tableEnv().createTable("T2", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+ // No metadata filters should have been pushed
+ assertThat(receivedFilters.get()).isEmpty();
+ }
+
+ @Test
+ void testAliasedMetadataColumnFilter() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(RenamedMetadataFilterSource.SCHEMA)
+ .source(new RenamedMetadataFilterSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T3", descriptor);
+
+ // 'event_ts' is the SQL alias for metadata key 'timestamp'
+ util.verifyRelPlan("SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00'");
+
+ // The source should receive the filter with metadata key 'timestamp', not 'event_ts'.
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(timestamp, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testMixedPhysicalAndMetadataFilters() {
+        SharedReference<List<ResolvedExpression>> metadataFilters =
+                sharedObjects.add(new ArrayList<>());
+        SharedReference<List<ResolvedExpression>> physicalFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MixedFilterSource.SCHEMA)
+ .source(new MixedFilterSource(metadataFilters, physicalFilters))
+ .build();
+ util.tableEnv().createTable("T4", descriptor);
+
+ util.verifyRelPlan(
+ "SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+ // Verify routing: id > 10 → physical path, event_time > ... → metadata path.
+ assertThat(physicalFilters.get().toString()).isEqualTo("[greaterThan(id, 10)]");
+ assertThat(metadataFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ @Test
+ void testPartialMetadataFilterAcceptance() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(PartialMetadataFilterSource.SCHEMA)
+ .source(new PartialMetadataFilterSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T6", descriptor);
+
+ // Two metadata filters: the source accepts only the first one
+ util.verifyRelPlan(
+ "SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'"
+ + " AND priority > 5");
+
+ // Source receives both filters; the XML reference verifies only the first is accepted
+ // (the second remains as a LogicalFilter above the scan).
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00), greaterThan(priority, 5)]");
+ }
+
+ @Test
+ void testPhysicalAndMetadataNameCollision() {
+ // Physical column 'offset' shares a name with the metadata key 'offset'
+ // (aliased in SQL as 'msg_offset'). The predicate on the metadata column
+ // must be pushed down using the metadata key, not confused with the
+ // physical column of the same name.
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(CollidingNameSource.SCHEMA)
+ .source(new CollidingNameSource(receivedFilters))
+ .build();
+ util.tableEnv().createTable("T7", descriptor);
+
+ util.verifyRelPlan("SELECT id FROM T7 WHERE msg_offset > 5");
+
+ // Must reference the metadata key 'offset', NOT the SQL alias 'msg_offset'.
+ assertThat(receivedFilters.get().toString()).isEqualTo("[greaterThan(offset, 5)]");
+ }
+
+ @Test
+ void testMetadataFilterWithProjection() {
+        SharedReference<List<ResolvedExpression>> receivedFilters =
+                sharedObjects.add(new ArrayList<>());
+ TableDescriptor descriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(MetadataFilterSource.SCHEMA)
+ .source(new MetadataFilterSource(true, receivedFilters))
+ .build();
+ util.tableEnv().createTable("T5", descriptor);
+
+ util.verifyRelPlan(
+ "SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00'");
+
+ // Projection push-down must not perturb the metadata filter.
+ assertThat(receivedFilters.get().toString())
+ .isEqualTo("[greaterThan(event_time, 2024-01-01T00:00)]");
+ }
+
+ // -----------------------------------------------------------------------------------------
+ // Test sources
+ // -----------------------------------------------------------------------------------------
+
+ /** Supports metadata filter push-down. */
+ private static class MetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("name", STRING())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .build();
+
+ private final boolean supportsMetadataFilter;
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        MetadataFilterSource(
+                boolean supportsMetadataFilter,
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+ this.supportsMetadataFilter = supportsMetadataFilter;
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return supportsMetadataFilter;
+ }
+
+ @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+ }
+ }
+
+ /** Tests key translation when SQL alias differs from metadata key. */
+ private static class RenamedMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("event_ts", TIMESTAMP(3), "timestamp")
+ .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        RenamedMetadataFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("timestamp", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+ }
+ }
+
+ /** Accepts only the first metadata filter; rejected filters remain in plan. */
+ private static class PartialMetadataFilterSource extends TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .columnByMetadata("priority", INT())
+ .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        PartialMetadataFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time", TIMESTAMP(3));
+ metadata.put("priority", INT());
+ return metadata;
+ }
+
+ @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+            receivedMetadataFilters.get().addAll(metadataFilters);
+            // Accept only the first filter
+            List<ResolvedExpression> accepted =
+                    metadataFilters.isEmpty()
+                            ? Collections.emptyList()
+                            : Collections.singletonList(metadataFilters.get(0));
+            List<ResolvedExpression> remaining =
+                    metadataFilters.size() > 1
+                            ? metadataFilters.subList(1, metadataFilters.size())
+                            : Collections.emptyList();
+ return MetadataFilterResult.of(accepted, remaining);
+ }
+ }
+
+ /** Tests mixed physical and metadata filter push-down. */
+ private static class MixedFilterSource extends TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata, SupportsFilterPushDown {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("name", STRING())
+ .columnByMetadata("event_time", TIMESTAMP(3))
+ .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+        private final SharedReference<List<ResolvedExpression>> receivedPhysicalFilters;
+
+        MixedFilterSource(
+                SharedReference<List<ResolvedExpression>> receivedMetadataFilters,
+                SharedReference<List<ResolvedExpression>> receivedPhysicalFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ this.receivedPhysicalFilters = receivedPhysicalFilters;
+ }
+
+ @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("event_time", org.apache.flink.table.api.DataTypes.TIMESTAMP(3));
+ return metadata;
+ }
+
+ @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+ }
+
+ @Override
+        public Result applyFilters(List<ResolvedExpression> filters) {
+ receivedPhysicalFilters.get().addAll(filters);
+ return Result.of(filters, Collections.emptyList());
+ }
+ }
+
+ /**
+ * Physical column {@code offset} shares a name with the metadata key {@code offset} (SQL alias
+ * {@code msg_offset}). Exercises the physical-vs-metadata name collision case.
+ */
+ private static class CollidingNameSource extends TableFactoryHarness.ScanSourceBase
+ implements SupportsReadingMetadata {
+
+ public static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", INT())
+ .column("offset", INT())
+ .columnByMetadata("msg_offset", INT(), "offset")
+ .build();
+
+        private final SharedReference<List<ResolvedExpression>> receivedMetadataFilters;
+
+        CollidingNameSource(SharedReference<List<ResolvedExpression>> receivedMetadataFilters) {
+ this.receivedMetadataFilters = receivedMetadataFilters;
+ }
+
+ @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metadata = new HashMap<>();
+ metadata.put("offset", INT());
+ return metadata;
+ }
+
+ @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {}
+
+ @Override
+ public boolean supportsMetadataFilterPushDown() {
+ return true;
+ }
+
+ @Override
+        public MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
+ receivedMetadataFilters.get().addAll(metadataFilters);
+ return MetadataFilterResult.of(metadataFilters, Collections.emptyList());
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
new file mode 100644
index 0000000000000..cef00b21d6948
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/MetadataFilterInReadingMetadataTest.xml
@@ -0,0 +1,161 @@
+
+
+
+
+
+      <![CDATA[SELECT id FROM T3 WHERE event_ts > TIMESTAMP '2024-01-01 00:00:00']]>
+
+
+ ($1, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], event_ts=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, metadata=[timestamp]]])
+]]>
+
+
+ (timestamp, 2024-01-01 00:00:00)]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id FROM T2 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+
+
+ ($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
+]]>
+
+
+ ($2, 2024-01-01 00:00:00)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, metadata=[event_time]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id FROM T1 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+
+
+ ($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1, metadata=[event_time]]])
+]]>
+
+
+ (event_time, 2024-01-01 00:00:00)]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id, name FROM T5 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+
+
+ ($2, 2024-01-01 00:00:00)])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, metadata=[event_time]]])
+]]>
+
+
+ (event_time, 2024-01-01 00:00:00)]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id FROM T4 WHERE id > 10 AND event_time > TIMESTAMP '2024-01-01 00:00:00']]>
+
+
+ ($0, 10), >($2, 2024-01-01 00:00:00))])
+ +- LogicalProject(id=[$0], name=[$1], event_time=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, metadata=[event_time]]])
+]]>
+
+
+ (id, 10)], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id FROM T6 WHERE event_time > TIMESTAMP '2024-01-01 00:00:00' AND priority > 5]]>
+
+
+ ($1, 2024-01-01 00:00:00), >($2, 5))])
+ +- LogicalProject(id=[$0], event_time=[$2], priority=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time]]])
+]]>
+
+
+ ($1, 5)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T6, metadata=[priority, event_time], metadataFilter=[>(event_time, 2024-01-01 00:00:00)]]])
+]]>
+
+
+
+
+      <![CDATA[SELECT id FROM T7 WHERE msg_offset > 5]]>
+
+
+ ($2, 5)])
+ +- LogicalProject(id=[$0], offset=[$1], msg_offset=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T7, metadata=[offset]]])
+]]>
+
+
+ (offset, 5)]]])
+]]>
+
+
+