-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[core] Support partition predicate pushdown for PartitionsTable #7628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
1116648
6ab7f9c
e5185ab
a430947
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,27 +36,36 @@ | |
|
|
||
| /** | ||
| * Helper for applying partition predicate pushdown in system tables (BucketsTable, FilesTable, | ||
| * FileKeyRangesTable). | ||
| * FileKeyRangesTable, PartitionsTable). | ||
| */ | ||
| public class PartitionPredicateHelper { | ||
|
|
||
| public static boolean applyPartitionFilter( | ||
| SnapshotReader snapshotReader, | ||
| @Nullable LeafPredicate partitionPredicate, | ||
| List<String> partitionKeys, | ||
| RowType partitionType) { | ||
| if (partitionPredicate == null) { | ||
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Build a partition-typed predicate from a string-based leaf predicate on the "partition" | ||
| * column. | ||
| * | ||
| * @return the predicate on partition fields, or {@code null} if the partition spec is invalid | ||
| * (indicating no results should be returned) | ||
| */ | ||
| @Nullable | ||
| public static Predicate buildPartitionPredicate( | ||
| LeafPredicate partitionPredicate, List<String> partitionKeys, RowType partitionType) { | ||
| if (partitionPredicate.function() instanceof Equal) { | ||
| LinkedHashMap<String, String> partSpec = | ||
| parsePartitionSpec( | ||
| partitionPredicate.literals().get(0).toString(), partitionKeys); | ||
| if (partSpec == null) { | ||
| return false; | ||
| return null; | ||
| } | ||
| snapshotReader.withPartitionFilter(partSpec); | ||
| PredicateBuilder partBuilder = new PredicateBuilder(partitionType); | ||
| List<Predicate> predicates = new ArrayList<>(); | ||
| for (int i = 0; i < partitionKeys.size(); i++) { | ||
| Object value = | ||
| TypeUtils.castFromString( | ||
| partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); | ||
| predicates.add(partBuilder.equal(i, value)); | ||
| } | ||
| return PredicateBuilder.and(predicates); | ||
| } else if (partitionPredicate.function() instanceof In) { | ||
| List<Predicate> orPredicates = new ArrayList<>(); | ||
| PredicateBuilder partBuilder = new PredicateBuilder(partitionType); | ||
|
|
@@ -75,51 +84,88 @@ public static boolean applyPartitionFilter( | |
| } | ||
| orPredicates.add(PredicateBuilder.and(andPredicates)); | ||
| } | ||
| if (!orPredicates.isEmpty()) { | ||
| snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates)); | ||
| } | ||
| return orPredicates.isEmpty() ? null : PredicateBuilder.or(orPredicates); | ||
| } else if (partitionPredicate.function() instanceof LeafBinaryFunction) { | ||
| LinkedHashMap<String, String> partSpec = | ||
| parsePartitionSpec( | ||
| partitionPredicate.literals().get(0).toString(), partitionKeys); | ||
| if (partSpec != null) { | ||
| PredicateBuilder partBuilder = new PredicateBuilder(partitionType); | ||
| List<Predicate> predicates = new ArrayList<>(); | ||
| for (int i = 0; i < partitionKeys.size(); i++) { | ||
| Object value = | ||
| TypeUtils.castFromString( | ||
| partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); | ||
| predicates.add( | ||
| new LeafPredicate( | ||
| partitionPredicate.function(), | ||
| partitionType.getTypeAt(i), | ||
| i, | ||
| partitionKeys.get(i), | ||
| Collections.singletonList(value))); | ||
| } | ||
| snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates)); | ||
| if (partSpec == null) { | ||
| return null; | ||
| } | ||
| PredicateBuilder partBuilder = new PredicateBuilder(partitionType); | ||
| List<Predicate> predicates = new ArrayList<>(); | ||
| for (int i = 0; i < partitionKeys.size(); i++) { | ||
| Object value = | ||
| TypeUtils.castFromString( | ||
| partSpec.get(partitionKeys.get(i)), partitionType.getTypeAt(i)); | ||
| predicates.add( | ||
| new LeafPredicate( | ||
| partitionPredicate.function(), | ||
| partitionType.getTypeAt(i), | ||
| i, | ||
| partitionKeys.get(i), | ||
| Collections.singletonList(value))); | ||
| } | ||
| return PredicateBuilder.and(predicates); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| public static boolean applyPartitionFilter( | ||
| SnapshotReader snapshotReader, | ||
| @Nullable LeafPredicate partitionPredicate, | ||
| List<String> partitionKeys, | ||
| RowType partitionType) { | ||
| if (partitionPredicate == null) { | ||
| return true; | ||
| } | ||
|
|
||
| Predicate predicate = | ||
| buildPartitionPredicate(partitionPredicate, partitionKeys, partitionType); | ||
| if (predicate == null) { | ||
| return false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. false? No record will be returned? {@code null} if the predicate cannot be pushed down (unsupported predicate type or invalid partition spec.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. The issue is that
Simply changing Fixed by using Added |
||
| } | ||
| snapshotReader.withPartitionFilter(predicate); | ||
| return true; | ||
| } | ||
|
|
||
| @Nullable | ||
| public static LinkedHashMap<String, String> parsePartitionSpec( | ||
| String partitionStr, List<String> partitionKeys) { | ||
| // Handle {value1, value2} format (BucketsTable, FilesTable, FileKeyRangesTable) | ||
| if (partitionStr.startsWith("{")) { | ||
| partitionStr = partitionStr.substring(1); | ||
| if (partitionStr.endsWith("}")) { | ||
| partitionStr = partitionStr.substring(0, partitionStr.length() - 1); | ||
| } | ||
| String[] partFields = partitionStr.split(", "); | ||
| if (partitionKeys.size() != partFields.length) { | ||
| return null; | ||
| } | ||
| LinkedHashMap<String, String> partSpec = new LinkedHashMap<>(); | ||
| for (int i = 0; i < partitionKeys.size(); i++) { | ||
| partSpec.put(partitionKeys.get(i), partFields[i]); | ||
| } | ||
| return partSpec; | ||
| } | ||
| if (partitionStr.endsWith("}")) { | ||
| partitionStr = partitionStr.substring(0, partitionStr.length() - 1); | ||
| } | ||
| String[] partFields = partitionStr.split(", "); | ||
|
|
||
| // Handle key=value/key=value format (PartitionsTable) | ||
| String[] partFields = partitionStr.split("/"); | ||
| if (partitionKeys.size() != partFields.length) { | ||
| return null; | ||
| } | ||
| LinkedHashMap<String, String> partSpec = new LinkedHashMap<>(); | ||
| for (int i = 0; i < partitionKeys.size(); i++) { | ||
| partSpec.put(partitionKeys.get(i), partFields[i]); | ||
| for (String field : partFields) { | ||
| int eqIndex = field.indexOf('='); | ||
| if (eqIndex < 0) { | ||
| return null; | ||
| } | ||
| partSpec.put(field.substring(0, eqIndex), field.substring(eqIndex + 1)); | ||
| } | ||
| for (String key : partitionKeys) { | ||
| if (!partSpec.containsKey(key)) { | ||
| return null; | ||
| } | ||
| } | ||
| return partSpec; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should consider default partitions here. If the value is the default partition name, this should become
isNull(...)rather thanequal(...).