Skip to content
Open
117 changes: 117 additions & 0 deletions datafusion/core/tests/parquet/dynamic_row_group_pruning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! End-to-end test for **runtime row-group pruning** driven by a TopK
//! `SortExec`'s `DynamicFilterPhysicalExpr`.
//!
//! A 5-row-group parquet file is constructed with disjoint statistics on
//! the sort column (`v`): row group `i` contains values
//! `[i*100, (i+1)*100)`. The query `ORDER BY v DESC LIMIT 5` fills the
//! TopK heap from the row group with the largest values; the threshold
//! then proves the remaining row groups cannot contribute. The runtime
//! `RowGroupPruner` in the parquet scan must observe the tightened
//! threshold and increment `row_groups_pruned_dynamic_filter`.
//!
//! We assert a property (`pruned >= 1`) rather than an exact count
//! because batch-arrival timing affects how soon the TopK heap fills,
//! and we don't want this test to become flaky.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

use crate::parquet::Unit::RowGroup;
use crate::parquet::{ContextWithParquet, Scenario};

/// Produce five single-column `RecordBatch`es, one per intended row group.
///
/// Batch `i` holds the 100 consecutive `v` values `[i*100, (i+1)*100)`, so
/// the value ranges of any two batches never overlap. When written with
/// `max_row_group_row_count = 100` each batch lands in its own row group.
///
/// Panics if a batch cannot be built against `schema`.
fn build_five_disjoint_batches(schema: &Arc<Schema>) -> Vec<RecordBatch> {
    let mut batches = Vec::with_capacity(5);
    for group in 0..5i64 {
        // Half-open value range `[start, end)` owned by this batch.
        let start = group * 100;
        let end = start + 100;
        let values: Vec<i64> = (start..end).collect();
        let column: ArrayRef = Arc::new(Int64Array::from(values));
        let batch = RecordBatch::try_new(Arc::clone(schema), vec![column]).unwrap();
        batches.push(batch);
    }
    batches
}

/// Runtime row-group pruning must fire for a descending TopK query.
///
/// The file has five row groups with disjoint per-RG statistics on `v`.
/// For `ORDER BY v DESC LIMIT 5` the first RG read fills the heap, and the
/// tightened threshold proves every other RG unreachable, so the
/// `row_groups_pruned_dynamic_filter` metric is expected to be at least 1.
#[tokio::test]
async fn dynamic_rg_pruning_metric_fires_for_topk_descending_limit() {
    let fields = vec![Field::new("v", DataType::Int64, false)];
    let schema = Arc::new(Schema::new(fields));

    // `with_custom_data` uses the schema and batches passed here and
    // ignores `Scenario`. `Unit::RowGroup(100)` enables `pushdown_filters`,
    // which the TopK dynamic filter needs in order to reach the parquet
    // scan.
    let mut ctx = ContextWithParquet::with_custom_data(
        Scenario::Int,
        RowGroup(100),
        Arc::clone(&schema),
        build_five_disjoint_batches(&schema),
    )
    .await;

    let sql = "SELECT v FROM t ORDER BY v DESC LIMIT 5";
    let output = ctx.query(sql).await;

    assert_eq!(output.result_rows, 5, "query must return LIMIT rows");

    // The metric must exist at all; a missing metric is a test failure,
    // not a silent zero.
    let Some(pruned) = output.row_groups_pruned_dynamic_filter() else {
        panic!("`row_groups_pruned_dynamic_filter` metric must be registered");
    };

    // Only a lower bound is asserted: the exact count depends on how soon
    // the TopK heap fills.
    assert!(
        pruned >= 1,
        "dynamic RG pruner must skip at least one row group; \
         pruned={pruned}\n{}",
        output.description(),
    );
}

/// Without ORDER BY there is no TopK, so no `DynamicFilterPhysicalExpr`
/// reaches the scan. The runtime pruner must therefore stay quiet and the
/// `row_groups_pruned_dynamic_filter` metric should be 0.
#[tokio::test]
async fn dynamic_rg_pruning_metric_quiet_without_topk() {
    let fields = vec![Field::new("v", DataType::Int64, false)];
    let schema = Arc::new(Schema::new(fields));

    let mut ctx = ContextWithParquet::with_custom_data(
        Scenario::Int,
        RowGroup(100),
        Arc::clone(&schema),
        build_five_disjoint_batches(&schema),
    )
    .await;

    // Plain projection of `v`: no sort, no limit, no dynamic filter.
    let sql = "SELECT v FROM t";
    let output = ctx.query(sql).await;
    assert_eq!(output.result_rows, 500);

    // A metric that was never recorded is treated the same as zero.
    let pruned = output
        .row_groups_pruned_dynamic_filter()
        .unwrap_or_default();
    assert_eq!(
        pruned,
        0,
        "without TopK there is no dynamic filter, so the runtime pruner \
         must not fire; pruned={pruned}\n{}",
        output.description(),
    );
}
8 changes: 8 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use tempfile::NamedTempFile;

mod content_defined_chunking;
mod custom_reader;
mod dynamic_row_group_pruning;
#[cfg(feature = "parquet_encryption")]
mod encryption;
mod expr_adapter;
Expand Down Expand Up @@ -259,6 +260,13 @@ impl TestOutput {
.map(|pm| pm.total_pruned())
}

/// Row groups skipped at runtime by the dynamic row-group pruner (e.g.
/// when a TopK `SortExec` threshold is pushed down via
/// `DynamicFilterPhysicalExpr`).
///
/// Returns whatever `metric_value` yields for the
/// `row_groups_pruned_dynamic_filter` metric.
fn row_groups_pruned_dynamic_filter(&self) -> Option<usize> {
    let metric_name = "row_groups_pruned_dynamic_filter";
    self.metric_value(metric_name)
}

fn description(&self) -> String {
format!(
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
Expand Down
26 changes: 21 additions & 5 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,19 @@ impl ParquetAccessPlan {

/// Split this plan into consecutive row group runs that share the same row
/// filter requirement.
pub(crate) fn split_runs(self, needs_filter: bool) -> Vec<RowGroupRun> {
if !needs_filter || !self.has_fully_matched() {
///
/// When `force_per_row_group` is set, every scanned row group becomes its
/// own run regardless of the fully-matched classification. Callers should
/// turn this on when they intend to apply runtime-evaluated pruning
/// between decoders (e.g. dynamic-filter-driven RG pruning), since the
/// default grouping would coalesce away the inter-RG transition points
/// the pruner depends on.
pub(crate) fn split_runs(
self,
needs_filter: bool,
force_per_row_group: bool,
) -> Vec<RowGroupRun> {
if !force_per_row_group && (!needs_filter || !self.has_fully_matched()) {
return vec![RowGroupRun::new(needs_filter, self)];
}

Expand All @@ -424,9 +435,14 @@ impl ParquetAccessPlan {
}

let row_group_needs_filter = !fully_matched;
if let Some(run) = runs
.last_mut()
.filter(|run| run.needs_filter == row_group_needs_filter)
// Coalesce consecutive RGs into a run only when (a) they share
// the same filter requirement and (b) we're not forcing per-RG
// splitting for runtime pruning.
let can_coalesce = !force_per_row_group;
if can_coalesce
&& let Some(run) = runs
.last_mut()
.filter(|run| run.needs_filter == row_group_needs_filter)
{
run.access_plan.set(idx, access);
if fully_matched {
Expand Down
14 changes: 14 additions & 0 deletions datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub struct ParquetFileMetrics {
pub limit_pruned_row_groups: PruningMetrics,
/// Number of row groups pruned by statistics
pub row_groups_pruned_statistics: PruningMetrics,
/// Number of row groups pruned at runtime by a dynamic predicate
/// (e.g. the threshold expression a TopK `SortExec` pushes down).
///
/// Unlike [`Self::row_groups_pruned_statistics`], which is decided once
/// at access-plan time, this counter reflects row groups that survived
/// the initial pruning but were proved unreachable mid-scan after the
/// dynamic filter tightened.
pub row_groups_pruned_dynamic_filter: Count,
/// Total number of bytes scanned
pub bytes_scanned: Count,
/// Total rows filtered out by predicates pushed into parquet scan
Expand Down Expand Up @@ -198,6 +206,11 @@ impl ParquetFileMetrics {
.with_category(MetricCategory::Rows)
.gauge("predicate_cache_records", partition);

let row_groups_pruned_dynamic_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::Summary)
.counter("row_groups_pruned_dynamic_filter", partition);

Self {
files_ranges_pruned_statistics,
predicate_evaluation_errors,
Expand All @@ -217,6 +230,7 @@ impl ParquetFileMetrics {
scan_efficiency_ratio,
predicate_cache_inner_records,
predicate_cache_records,
row_groups_pruned_dynamic_filter,
}
}

Expand Down
89 changes: 76 additions & 13 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use self::encryption::EncryptionContext;
use crate::access_plan::PreparedAccessPlan;
use crate::decoder_projection::DecoderProjection;
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
use crate::push_decoder::{
DecoderBuilderConfig, PendingDecoderRun, PushDecoderStreamState, RowGroupPruner,
};
use crate::row_filter::RowFilterGenerator;
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
use crate::{
Expand All @@ -50,6 +52,7 @@ use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err};
use datafusion_datasource::{PartitionedFile, TableSchema};
use datafusion_physical_expr::expressions::DynamicFilterTracking;
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -1158,7 +1161,7 @@ impl RowGroupsPrunedParquetOpen {
&prepared.output_schema,
)?;

let (decoder, pending_decoders, remaining_limit) = {
let (decoder, pending_runs, remaining_limit) = {
let pushdown_predicate = prepared
.pushdown_filters
.then_some(prepared.predicate.as_ref())
Expand All @@ -1171,11 +1174,41 @@ impl RowGroupsPrunedParquetOpen {
&prepared.file_metrics,
);

// If the scan carries a *dynamic* predicate that may still
// tighten during the scan (e.g. the threshold expression a
// TopK `SortExec` pushes down), force one decoder per row
// group so the runtime row-group pruner has a transition
// point at every RG boundary. Without this, a TopK query
// whose initial predicate is `lit(true)` collapses to a
// single run covering every RG, hiding the inter-RG hook
// the pruner needs.
//
// Dynamic filters that are *already complete* at scan-startup
// (the typical hash-join case, where the build side
// completed before the probe scan started) gain nothing
// from per-RG splitting: the static `PruningPredicate` path
// at file-open has already used the final filter value, so
// every subsequent pruner check returns the same answer.
// Splitting would only add a per-RG decoder allocation cost. Gate
// the splitting on "predicate is dynamic AND not yet
// complete" so already-complete filters fall through to the
// existing single-run path.
let force_per_rg_runs = prepared.predicate.as_ref().is_some_and(|p| {
// `Watching` is the variant that holds at least one
// still-incomplete dynamic filter — exactly the
// "dynamic AND not yet complete" gate the comment above
// describes.
matches!(
DynamicFilterTracking::classify(p),
DynamicFilterTracking::Watching(_)
)
});
// Split into consecutive runs of row groups that share the same filter
// requirement. Fully matched row groups skip the RowFilter; others need it.
// Reverse the run order for reverse scans so the combined decoder stream
// preserves the requested global row group order.
let mut runs = access_plan.split_runs(row_filter_generator.has_row_filter());
let mut runs = access_plan
.split_runs(row_filter_generator.has_row_filter(), force_per_rg_runs);
if prepared.reverse_row_groups {
runs.reverse();
}
Expand All @@ -1191,9 +1224,13 @@ impl RowGroupsPrunedParquetOpen {
decoder_limit,
};

// Build a decoder per run.
let mut decoders = VecDeque::with_capacity(runs.len());
// Build a decoder per run, tagging each with the row group
// indices it will scan so downstream consumers (e.g. dynamic
// RG-level pruning) can decide to skip a run at runtime.
let mut decoders: VecDeque<PendingDecoderRun> =
VecDeque::with_capacity(runs.len());
for run in runs {
let row_group_indices = run.access_plan.row_group_indexes();
let prepared_access_plan = prepare_access_plan(run.access_plan)?;
let mut builder =
decoder_config.build(prepared_access_plan, reader_metadata.clone());
Expand All @@ -1208,13 +1245,16 @@ impl RowGroupsPrunedParquetOpen {
.with_max_predicate_cache_size(max_predicate_cache_size);
}
}
decoders.push_back(builder.build()?);
decoders.push_back(PendingDecoderRun {
decoder: builder.build()?,
row_group_indices,
});
}

let decoder = decoders
let first = decoders
.pop_front()
.expect("at least one decoder must be created");
(decoder, decoders, remaining_limit)
(first.decoder, decoders, remaining_limit)
};

let predicate_cache_inner_records =
Expand All @@ -1224,16 +1264,39 @@ impl RowGroupsPrunedParquetOpen {

let files_ranges_pruned_statistics =
prepared.file_metrics.files_ranges_pruned_statistics.clone();

// Build a dynamic row-group pruner only when both halves are useful:
// 1) the scan has a predicate (so there is something to evaluate),
// 2) there is at least one pending run that could be skipped.
// The pruner consults the predicate's `snapshot_generation` so the
// cost is one rebuild per dynamic-filter update, not per RG check.
let row_group_pruner = match (&prepared.predicate, pending_runs.is_empty()) {
(Some(predicate), false) => Some(RowGroupPruner::new(
Arc::clone(predicate),
Arc::clone(&prepared.physical_file_schema),
Arc::clone(reader_metadata.metadata()),
prepared.predicate_creation_errors.clone(),
prepared.file_metrics.predicate_evaluation_errors.clone(),
Comment thread
zhuqi-lucas marked this conversation as resolved.
)),
_ => None,
};
let row_groups_pruned_dynamic = prepared
.file_metrics
.row_groups_pruned_dynamic_filter
.clone();

let stream = PushDecoderStreamState {
decoder,
pending_decoders,
pending_runs,
remaining_limit,
reader: prepared.async_file_reader,
decoder_projection,
arrow_reader_metrics,
predicate_cache_inner_records,
predicate_cache_records,
baseline_metrics: prepared.baseline_metrics,
row_group_pruner,
row_groups_pruned_dynamic,
}
.into_stream();

Expand Down Expand Up @@ -2594,7 +2657,7 @@ mod test {
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);
let runs = plan.split_runs(true);
let runs = plan.split_runs(true, false);
assert_eq!(runs.len(), 1);
assert!(runs[0].needs_filter);
assert_eq!(runs[0].access_plan.row_group_indexes(), vec![0, 1, 2]);
Expand All @@ -2612,7 +2675,7 @@ mod test {
plan.mark_fully_matched(1);
plan.mark_fully_matched(2);

let runs = plan.split_runs(true);
let runs = plan.split_runs(true, false);
assert_eq!(runs.len(), 1);
assert!(!runs[0].needs_filter);
assert_eq!(runs[0].access_plan.row_group_indexes(), vec![0, 1, 2]);
Expand All @@ -2632,7 +2695,7 @@ mod test {
plan.mark_fully_matched(2);
plan.mark_fully_matched(4);

let runs = plan.split_runs(true);
let runs = plan.split_runs(true, false);
assert_eq!(runs.len(), 4);

assert!(runs[0].needs_filter);
Expand All @@ -2659,7 +2722,7 @@ mod test {
]);
plan.mark_fully_matched(2);

let runs = plan.split_runs(true);
let runs = plan.split_runs(true, false);
assert_eq!(runs.len(), 3);

assert!(runs[0].needs_filter);
Expand Down
Loading
Loading