Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ config_namespace! {
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) If true, reverse scans produce exact descending order
/// by reversing rows within each row group. This allows the Sort
/// operator to be removed entirely and fetch/limit to be pushed
/// down to the scan. If false (default), reverse scans only reverse
/// row group order (inexact), keeping TopK above for final sorting.
pub enable_exact_reverse_scan: bool, default = false

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
enable_exact_reverse_scan: _, // reads not used for writer props
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -464,6 +465,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
enable_exact_reverse_scan: defaults.enable_exact_reverse_scan,
}
}

Expand Down Expand Up @@ -578,6 +580,8 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
enable_exact_reverse_scan: global_options_defaults
.enable_exact_reverse_scan,
},
column_specific_options,
key_value_metadata,
Expand Down
122 changes: 119 additions & 3 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ use std::sync::Arc;

use crate::physical_optimizer::test_utils::{
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
sort_expr, sort_expr_named, test_scan_with_ordering,
parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec,
projection_exec_with_alias, repartition_exec, schema, simple_projection_exec,
sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering,
};

#[test]
Expand Down Expand Up @@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
"
);
}

// ============================================================================
// EXACT REVERSE SCAN TESTS
// ============================================================================
// These tests verify behavior when exact_reverse is enabled on ParquetSource.
// With exact reverse, the Sort operator is removed entirely and fetch is pushed
// down to the scan.

#[test]
fn test_exact_reverse_removes_sort() {
// With exact_reverse=true, Sort should be removed entirely.
//
// Setup: a parquet scan built by `parquet_exec_with_sort_exact_reverse`
// that declares ordering on column `a`, placed under a SortExec that asks
// for the reverse of that ordering and has no fetch.
let schema = schema();
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source =
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);

// The requested sort is the reverse of the ordering declared on the source
// (rendered as `a@0 DESC NULLS LAST` in the input plan below).
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, source);

// Expected: the SortExec no longer appears in the output plan and the
// DataSourceExec is displayed with `scan_direction=Reversed`.
// NOTE(review): the expected output still prints `output_ordering=[a@0 ASC]`
// next to `scan_direction=Reversed` — confirm this display is intended.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
"
);
}

#[test]
fn test_exact_reverse_with_fetch_pushes_limit() {
// With exact_reverse=true, Sort with fetch should be removed and fetch
// pushed down to the scan.
//
// Setup: a parquet scan built by `parquet_exec_with_sort_exact_reverse`
// that declares ordering on column `a`, placed under a SortExec with
// fetch=10 (rendered as `TopK(fetch=10)` in the input plan below).
let schema = schema();
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source =
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);

// Request the reverse of the declared source ordering, keeping only 10 rows.
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);

// Expected: no SortExec in the output; the DataSourceExec carries
// `limit=10` and `scan_direction=Reversed`.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
"
);
}

#[test]
fn test_exact_reverse_through_projection_with_fetch() {
// Exact reverse with fetch pushes through projection.
//
// Setup: SortExec(fetch=5) -> ProjectionExec -> parquet scan, where the
// scan is built by `parquet_exec_with_sort_exact_reverse` and declares
// ordering on column `a`.
let schema = schema();
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source =
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);

// Project source columns 0 and 1 (rendered as `a@0 as a, b@1 as b` below).
let projection = simple_projection_exec(source, vec![0, 1]);

// Request the reverse of the declared source ordering, keeping only 5 rows.
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection);

// Expected: the SortExec is gone, the ProjectionExec stays on top, and the
// DataSourceExec beneath it carries `limit=5` and `scan_direction=Reversed`.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
"
);
}

#[test]
fn test_exact_reverse_without_fetch_no_limit() {
// Exact reverse without fetch: Sort removed, no limit on scan.
//
// NOTE(review): the plan built here and the expected snapshot are identical
// to `test_exact_reverse_removes_sort`; consider merging the two tests or
// making this one exercise a distinct case.
let schema = schema();
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source =
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);

// Request the reverse of the declared source ordering.
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, source); // no fetch

// Expected: no SortExec in the output, and the DataSourceExec line has
// `scan_direction=Reversed` but no `limit=` entry.
insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
"
);
}
17 changes: 17 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort(
DataSourceExec::from_data_source(config)
}

/// Build a `DataSourceExec` over one parquet file whose source has exact
/// reverse scanning switched on.
///
/// The scan reads a single `PartitionedFile::new("x", 100)` from the
/// `test:///` object store and advertises `output_ordering` as the ordering
/// of its output. The `ParquetSource` is created from `schema` with
/// `with_exact_reverse(true)`.
pub(crate) fn parquet_exec_with_sort_exact_reverse(
    schema: SchemaRef,
    output_ordering: Vec<LexOrdering>,
) -> Arc<DataSourceExec> {
    let exact_reverse_source = ParquetSource::new(schema).with_exact_reverse(true);
    let store_url = ObjectStoreUrl::parse("test:///").unwrap();
    let sorted_file = PartitionedFile::new("x".to_string(), 100);

    let scan_config = FileScanConfigBuilder::new(store_url, Arc::new(exact_reverse_source))
        .with_file(sorted_file)
        .with_output_ordering(output_ordering)
        .build();

    DataSourceExec::from_data_source(scan_config)
}

fn int64_stats() -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Absent,
Expand Down
Loading
Loading