
Add exact reverse scan support to PushdownSort with limit-after-reverse fix #52

Open
zhuqi-lucas wants to merge 10 commits into branch-52 from exact-reverse-scan

Conversation


@zhuqi-lucas zhuqi-lucas commented Apr 23, 2026

Reopened from #47 (accidentally merged during branch-52 rebase). cc @xudong963

Same content — exact reverse scan with per-RG independent reading, proto roundtrip fix, multi-RG tests.

When exact_reverse is enabled via with_exact_reverse(true) on ParquetSource:
- try_reverse_output returns Exact (Sort operator removed, fetch pushdown enabled)
- ReversedRowGroupStream buffers batches per row group, reverses batch order,
  and reverses rows within each batch. Memory: O(largest_row_group).
- Default (exact_reverse=false) returns Inexact (backward compatible)

Row reversal is done in ReversedRowGroupStream (per-RG buffer), NOT in the
per-batch map closure. This ensures correct ordering across batch boundaries
within a row group.
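
The per-RG reversal above can be sketched in plain Rust. This is a minimal model, not the actual ReversedRowGroupStream: batches are stand-in `Vec<i32>` values rather than Arrow `RecordBatch`es, and `reverse_row_group` is a hypothetical helper name.

```rust
// Minimal model of per-row-group exact reversal: buffer all batches of one
// row group, reverse the batch order, then reverse rows within each batch.
fn reverse_row_group(mut batches: Vec<Vec<i32>>) -> Vec<Vec<i32>> {
    // Reverse the order of batches within the row group...
    batches.reverse();
    // ...then reverse the rows within each batch, so the flattened
    // output is correct across batch boundaries.
    for batch in &mut batches {
        batch.reverse();
    }
    batches
}

fn main() {
    // One row group read as three batches of two rows each: [1..6].
    let batches = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    let flat: Vec<i32> = reverse_row_group(batches).into_iter().flatten().collect();
    // Globally descending within the row group, matching the
    // test_exact_reverse_scan_per_rg_buffer expectation.
    assert_eq!(flat, vec![6, 5, 4, 3, 2, 1]);
}
```

Reversing only within each batch (the per-batch map-closure approach) would instead yield [2,1,4,3,6,5], which is why the buffering happens per row group.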

Tests added:
- test_exact_reverse_scan_per_rg_buffer: multi-RG, small batch_size, verifies [6,5,4,3,2,1]
- test_inexact_reverse_scan_preserves_row_order: verifies [4,5,6,1,2,3]
- test_reversed_row_group_stream_standalone: unit test for ReversedRowGroupStream
- test_exact_reverse_returns_exact: option returns Exact
- test_default_returns_inexact: default returns Inexact

Based on the approach from apache#18817.
…lay for exact reverse scan

- Add `enable_exact_reverse_scan` to ParquetOptions (default false)
- Wire config through `with_table_parquet_options` to set `exact_reverse`
- Fix limit correctness: skip passing limit to parquet reader when
  reverse_rows=true; apply limit in ReversedRowGroupStream after reversal
- Display `scan_direction=Reversed` for exact, `reverse_row_groups=true` for inexact
- Add 4 snapshot tests for exact reverse (removes Sort, fetch pushdown, through projection)
- Add 8 SLT tests: EXPLAIN plans + result verification, LIMIT, OFFSET+LIMIT, ASC unchanged, toggle off
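
The limit-correctness rule in the second bullet can be illustrated with a small sketch, with rows modeled as plain `i32` values (`reversed_with_limit` is a hypothetical helper, not code from the PR):

```rust
// Sketch of the limit-after-reverse fix: reverse first, then truncate.
fn reversed_with_limit(mut rows: Vec<i32>, limit: Option<usize>) -> Vec<i32> {
    // Handing `limit` to the parquet reader would instead keep the first
    // N rows of the *forward* order, which is wrong for a reversed scan.
    rows.reverse();
    if let Some(n) = limit {
        rows.truncate(n);
    }
    rows
}

fn main() {
    // limit=4 over [1..9] must yield the top of the forward order...
    assert_eq!(reversed_with_limit((1..=9).collect(), Some(4)), vec![9, 8, 7, 6]);
    // ...whereas a naive pre-reverse limit would have produced [4, 3, 2, 1].
}
```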
Addresses Copilot review comment on PR #47: when `row_selection` is present
(e.g. from page pruning via pushdown_filters), the parquet stream emits only
the selected rows, so seeding `rg_row_counts` from `RowGroupMetaData::num_rows()`
caused ReversedRowGroupStream to mis-detect row-group boundaries and silently
mix batches from multiple row groups, producing wrong ordering.

Fix: new `compute_selected_rows_per_rg` helper walks the RowSelection in
lock-step with the row groups and computes the actual output row count per RG.
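
The lock-step walk can be modeled in std-only Rust. Assumptions: selectors are simplified to `(row_count, skip)` pairs standing in for parquet's `RowSelector`, and `selected_rows_per_rg` is a hypothetical stand-in for the PR's `compute_selected_rows_per_rg`.

```rust
// Walk the selection in lock-step with per-RG total row counts and
// accumulate the number of *selected* (emitted) rows per row group.
fn selected_rows_per_rg(
    rg_total_rows: &[usize],
    selectors: &[(usize, bool)], // (row_count, skip)
) -> Result<Vec<usize>, String> {
    let mut counts = vec![0usize; rg_total_rows.len()];
    let mut rg = 0usize;
    let mut remaining = *rg_total_rows.first().unwrap_or(&0);
    for &(len, skip) in selectors {
        let mut len = len;
        while len > 0 {
            // Advance past exhausted (or zero-row) row groups.
            while remaining == 0 {
                rg += 1;
                if rg >= rg_total_rows.len() {
                    return Err("selection is longer than the row groups".into());
                }
                remaining = rg_total_rows[rg];
            }
            let take = len.min(remaining);
            if !skip {
                counts[rg] += take; // only selected rows are emitted
            }
            len -= take;
            remaining -= take;
        }
    }
    Ok(counts)
}

fn main() {
    // Three RGs of 4 rows; the middle RG is fully skipped by the selection.
    let counts = selected_rows_per_rg(&[4, 4, 4], &[(4, false), (4, true), (4, false)]);
    assert_eq!(counts, Ok(vec![4, 0, 4]));
    // A selection spanning more rows than the row groups hold is an error.
    assert!(selected_rows_per_rg(&[4], &[(8, false)]).is_err());
}
```

Seeding from `RowGroupMetaData::num_rows()` instead would report [4, 4, 4] here, which is exactly the boundary mis-detection described above.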

Tests added:
- 4 unit tests for compute_selected_rows_per_rg (no skip, spanning skips,
  all-skipped, short selection error)
- test_exact_reverse_scan_multi_rg_produces_global_desc: verifies Inexact
  yields [7,8,9,4,5,6,1,2,3] while Exact yields [9..1] (globally DESC)
- test_exact_reverse_scan_applies_limit_after_reversal: verifies limit=4
  over [1..9] yields [9,8,7,6] (top of forward order, not first N pre-reverse)
- test_exact_reverse_scan_with_row_selection_across_rgs: regression test
  for the row_selection bug — 3 RGs with per-RG selections yield the
  expected [10,9,8,7,6,5,4,3]
- test_exact_reverse_scan_with_row_selection_and_limit: combined case
When RowSelection skips all rows in a row group, rg_row_counts has a 0
entry. Without this fix, rows_remaining_in_rg=0 causes the first batch
from the next real RG to immediately trigger flush_buffer(), attributing
that batch to the wrong (empty) RG and splitting the real RG's batches
across two flush cycles — corrupting output order.

Fix: skip RGs with 0 selected rows in both new() (for leading empty RGs)
and flush_buffer() (for middle/trailing empty RGs).

Tests:
- test_exact_reverse_scan_with_empty_rg_from_row_selection: 3 RGs where
  middle RG is fully skipped, verifies [12..9, 4..1] order
- test_compute_selected_rows_per_rg_with_fully_skipped_middle_rg: unit
  test confirming rg_row_counts=[4,0,4] for fully skipped middle RG
When pushdown_filters is enabled, RowFilter may reduce actual rows per
row group below what rg_row_counts predicts. ReversedRowGroupStream
handles this correctly: delayed RG boundary detection means multiple
RGs may buffer together, but all remaining batches are flushed and
reversed when the stream ends. Memory cost is O(all data) instead of
O(largest RG), acceptable for LIMIT queries.

Added SLT test verifying exact reverse with pushdown_filters=true
produces correct DESC results with and without LIMIT.
…eParquetSource)

Replace ReversedRowGroupStream's rg_row_counts boundary detection with
per-row-group independent reading. Each RG gets its own
ParquetRecordBatchStreamBuilder with RowFilter applied independently,
then batches are reversed per-RG. This fixes the correctness issue
where RowFilter reduces actual rows below rg_row_counts predictions.

Memory: O(largest RG), same as Atlas's ReverseParquetSource.

Added SLT test for exact reverse + pushdown_filters + predicate.
When PushdownSort removes SortExec and sets reverse_row_groups=true /
reverse_rows=true on ParquetSource, these runtime flags must survive
proto serialization. Without this, remote executors (RemoteExec) would
deserialize the plan without reverse scanning, producing wrong order.

Added reverse_row_groups and reverse_rows fields to ParquetScanExecNode
proto message, serialized on encode and restored on decode.
Fix: per-RG iteration was popping from Vec back, yielding forward
order instead of reversed. Changed to VecDeque::pop_front to
correctly iterate RGs in reversed order (highest RG first).
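
The bug and fix can be shown with std collections alone (the function names here are illustrative, not the PR's):

```rust
use std::collections::VecDeque;

// Buggy pattern: popping from the back of a Vec that holds RG indices in
// reversed (highest-first) order yields the *forward* order again.
fn drain_with_vec_pop(mut rgs: Vec<usize>) -> Vec<usize> {
    let mut out = Vec::new();
    while let Some(rg) = rgs.pop() {
        out.push(rg);
    }
    out
}

// Fixed pattern: VecDeque::pop_front consumes the indices front-to-back,
// preserving the reversed iteration order.
fn drain_with_pop_front(rgs: Vec<usize>) -> Vec<usize> {
    let mut q: VecDeque<usize> = rgs.into_iter().collect();
    let mut out = Vec::new();
    while let Some(rg) = q.pop_front() {
        out.push(rg);
    }
    out
}

fn main() {
    // Four row groups, pre-reversed so RG 3 should be read first.
    assert_eq!(drain_with_vec_pop(vec![3, 2, 1, 0]), vec![0, 1, 2, 3]); // bug
    assert_eq!(drain_with_pop_front(vec![3, 2, 1, 0]), vec![3, 2, 1, 0]); // fix
}
```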

Added SLT tests:
- 12.10: multi-RG exact reverse (max_row_group_size=3, 10 rows = 4 RGs)
  - Full DESC scan: correct order across all RG boundaries
  - LIMIT spanning RG boundaries
- 12.11: multi-RG + pushdown_filters + predicate + LIMIT
Copilot AI review requested due to automatic review settings April 23, 2026 08:11
@zhuqi-lucas zhuqi-lucas requested a review from xudong963 April 23, 2026 08:12

Copilot AI left a comment


Pull request overview

Adds an exact reverse scan mode for Parquet sort pushdown so ORDER BY ... DESC LIMIT N can eliminate SortExec entirely and push the limit into the scan while still producing globally sorted output (when enabled via config).

Changes:

  • Introduces datafusion.execution.parquet.enable_exact_reverse_scan (default false) and documents/exposes it.
  • Implements exact reverse scan execution by reversing rows within each row group and applying limit after reversal.
  • Adds proto roundtrip support for reverse-scan runtime flags plus optimizer/tests/SLT coverage for exact reverse behavior.

Reviewed changes

Copilot reviewed 14 out of 16 changed files in this pull request and generated 8 comments.

Summary per file:
docs/source/user-guide/configs.md Documents the new Parquet config option.
datafusion/sqllogictest/test_files/sort_pushdown.slt Adds SLT coverage for exact reverse scan plans/results (including LIMIT/OFFSET and filters).
datafusion/sqllogictest/test_files/information_schema.slt Updates SHOW ALL expected output with the new config.
datafusion/common/src/config.rs Adds enable_exact_reverse_scan to Parquet execution options.
datafusion/common/src/file_options/parquet_writer.rs Plumbs the new ParquetOptions field through writer option handling/tests.
datafusion/datasource-parquet/src/source.rs Adds exact-reverse toggles/flags and returns Exact pushdown when enabled; updates plan display.
datafusion/datasource-parquet/src/opener.rs Implements per-row-group reverse reading and applies limit after reversal; adds supporting tests and schema adaptation helper.
datafusion/physical-optimizer/src/pushdown_sort.rs Removes SortExec on Exact pushdown and pushes fetch/limit into the plan tree.
datafusion/core/tests/physical_optimizer/test_utils.rs Adds helper for building Parquet execs with exact reverse enabled.
datafusion/core/tests/physical_optimizer/pushdown_sort.rs Adds snapshot tests validating Sort removal + fetch pushdown for exact reverse.
datafusion/proto/proto/datafusion.proto Extends ParquetScanExecNode with runtime reverse-scan flags.
datafusion/proto/src/generated/prost.rs Generated Rust updates for the new proto fields.
datafusion/proto/src/generated/pbjson.rs Generated JSON serde updates for the new proto fields.
datafusion/proto/src/physical_plan/mod.rs Serializes/deserializes the new reverse-scan runtime flags to preserve behavior over proto.
datafusion/proto/src/logical_plan/file_formats.rs Sets default for the new ParquetOptions field when decoding from proto.
datafusion/proto-common/src/from_proto/mod.rs Sets default for the new ParquetOptions field when decoding from proto-common.


Comment on lines +860 to +866
// Note: when pushdown_filters is enabled, RowFilter may reduce
// actual rows below what rg_row_counts predicts. This causes
// ReversedRowGroupStream's RG boundary detection to delay
// (multiple RGs may buffer together), but correctness is preserved
// because all buffered batches are flushed and reversed when the
// stream ends. Memory cost becomes O(all data) instead of
// O(largest RG), which is acceptable for LIMIT queries.

Copilot AI Apr 23, 2026


The comment in the exact_reverse branch still references ReversedRowGroupStream RG-boundary detection drift and buffering “O(all data)” when pushdown_filters reduces row counts (lines 860-866). The implementation now reads each row group independently when reverse_rows is enabled, so this comment is misleading/outdated and should be updated to reflect the current behavior and memory characteristics.

Suggested change
// Note: when pushdown_filters is enabled, RowFilter may reduce
// actual rows below what rg_row_counts predicts. This causes
// ReversedRowGroupStream's RG boundary detection to delay
// (multiple RGs may buffer together), but correctness is preserved
// because all buffered batches are flushed and reversed when the
// stream ends. Memory cost becomes O(all data) instead of
// O(largest RG), which is acceptable for LIMIT queries.
// When `reverse_rows` is enabled, each row group is read and
// reversed independently. `pushdown_filters` may still reduce the
// number of rows produced from a row group, but it does not cause
// cross-row-group buffering or delayed RG-boundary detection in
// this path. Memory usage therefore remains bounded by the row
// group being processed rather than growing to all buffered data.

@@ -525,6 +525,7 @@ mod parquet {
max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt {
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize,
}),

Copilot AI Apr 23, 2026


enable_exact_reverse_scan is hardcoded to false when decoding ParquetOptionsProto (line 528). This makes ParquetOptions proto roundtrips lossy: a plan/file-format encoded with exact reverse enabled cannot be restored with the same setting. If this option is intended to be serializable like the other ParquetOptions fields, it should be added to the protobuf schema and mapped here; otherwise, please add an explicit comment explaining why it is intentionally not preserved.

Suggested change
}),
}),
// `ParquetOptionsProto` does not currently encode
// `enable_exact_reverse_scan`, so decoding cannot restore a
// previously configured value here. Preserve existing behavior by
// falling back to the backward-compatible default.

max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
}).unwrap_or(None),
enable_exact_reverse_scan: false,

Copilot AI Apr 23, 2026


enable_exact_reverse_scan is always set to false in proto deserialization (line 1016), which makes ParquetOptions protobuf roundtrips unable to preserve this configuration. If the option should survive serialization like other ParquetOptions fields, add it to the protobuf and decode it here; otherwise document why it is intentionally dropped.

Suggested change
enable_exact_reverse_scan: false,
enable_exact_reverse_scan: value.enable_exact_reverse_scan,

Comment on lines +151 to +154
// Try with_fetch on the current node
if let Some(plan_with_fetch) = plan.with_fetch(Some(fetch)) {
return plan_with_fetch;
}

Copilot AI Apr 23, 2026


push_fetch_into_plan applies with_fetch to the first node that supports it (lines 151-154). Several single-child wrapper nodes (e.g. CoalesceBatchesExec) implement with_fetch but do not push the limit down to their child, which prevents file-level early termination. To actually push the fetch to the scan, recurse into the single child first and only apply with_fetch on the current node if the child path can’t accept it (or explicitly target DataSourceExec/FileScanConfig nodes that support limit pushdown).

Comment on lines +235 to +237
/// Compute per-row-group *selected* row counts for exact reverse buffering.
///
/// `RowSelection` is a flat sequence of `RowSelector` values (alternating

Copilot AI Apr 23, 2026


The doc comment starting here (“Compute per-row-group selected row counts…”) is incomplete and is currently attached to the impl FileOpener for ParquetOpener block rather than a function/type. This looks like a leftover from an earlier implementation and should either be removed or completed and moved onto the actual helper it describes.

Suggested change
/// Compute per-row-group *selected* row counts for exact reverse buffering.
///
/// `RowSelection` is a flat sequence of `RowSelector` values (alternating

Comment on lines 877 to +883
/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
/// Adapt a RecordBatch to match the expected output schema.
/// Handles type mismatches from schema evolution, nullability differences,
/// and List/Struct inner field name differences.
fn adapt_batch_schema(
batch: &RecordBatch,

Copilot AI Apr 23, 2026


The comment “Copies metrics from ArrowReaderMetrics …” (line 877) now documents adapt_batch_schema, and copy_arrow_reader_metrics no longer has a doc comment. Please move this metrics comment down to copy_arrow_reader_metrics and add (or keep) a separate doc comment describing adapt_batch_schema.

rg_builder = rg_builder.with_row_filter(filter);
}
Ok(None) => {}
Err(_) => {}

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the reverse_rows per-RG path, row_filter::build_row_filter errors are silently swallowed (Err(_) => {}), while the non-reverse path logs a debug message with the predicate/error. Please keep the behavior consistent by logging (at least at debug) so filter pushdown failures remain diagnosable.

Suggested change
Err(_) => {}
Err(e) => {
log::debug!(
"Ignoring error building row filter for predicate {pred}: {e}"
);
}

Comment on lines +727 to +732
let rg_stream = rg_builder
.with_projection(mask.clone())
.with_batch_size(batch_size)
.with_row_groups(vec![rg_idx])
.build()?;


Copilot AI Apr 23, 2026


The reverse_rows per-RG reader path builds rg_stream without configuring ArrowReaderMetrics / calling copy_arrow_reader_metrics, which means parquet predicate-cache/reader metrics won’t be reported for exact reverse scans. If metrics parity matters, consider wiring ArrowReaderMetrics::enabled() into rg_builder.with_metrics(...) and copying the counters like the non-reverse path.


@xudong963 xudong963 left a comment


I re-checked the merged code & the related fix commits.

The ordering-properties issue still looks unresolved: the exact path removes SortExec, but the rebuilt FileScanConfig still keeps the original output_ordering instead of reversing it. The current snapshots still show output_ordering=[a@0 ASC] together with scan_direction=Reversed, which suggests the plan metadata still does not match the actual output ordering.

@zhuqi-lucas

I re-checked the merged code & the related fix commits.

The ordering-properties issue still looks unresolved: the exact path removes SortExec, but the rebuilt FileScanConfig still keeps the original output_ordering instead of reversing it. The current snapshots still show output_ordering=[a@0 ASC] together with scan_direction=Reversed, which suggests the plan metadata still does not match the actual output ordering.

@xudong963 output_ordering describes the physical sort order of data in the file, which is ASC regardless of scan direction. scan_direction=Reversed is the runtime flag that tells the executor to reverse the read. These
are intentionally separate concerns — output_ordering is file metadata, scan_direction is execution behavior.

The optimizer already handles this correctly: when PushdownSort gets Exact, it knows the output satisfies the requested DESC ordering and removes SortExec. No subsequent optimizer re-checks output_ordering() against the actual data direction. SLT tests verify correct DESC results.

Could you point to a specific code path where this causes incorrect behavior? I'd like to understand the concern better.

@xudong963

Could you point to a specific code path where this causes incorrect behavior? I'd like to understand the concern better

I constructed some tests for reference:

statement ok
SET datafusion.optimizer.enable_sort_pushdown = true;

statement ok
SET datafusion.execution.parquet.enable_exact_reverse_scan = true;

statement ok
SET datafusion.execution.target_partitions = 4;

statement ok
CREATE TABLE exact_rev_multi_part1(id INT, value INT, name VARCHAR) AS VALUES
(1, 100, 'a'), (2, 200, 'b'), (3, 300, 'c');

statement ok
CREATE TABLE exact_rev_multi_part2(id INT, value INT, name VARCHAR) AS VALUES
(4, 400, 'd'), (5, 500, 'e'), (6, 600, 'f');

statement ok
CREATE TABLE exact_rev_multi_part3(id INT, value INT, name VARCHAR) AS VALUES
(7, 700, 'g'), (8, 800, 'h'), (9, 900, 'i');

query I
COPY (SELECT * FROM exact_rev_multi_part1 ORDER BY id ASC)
TO 'test_files/scratch/sort_pushdown/exact_multi/part1.parquet';
----
3

query I
COPY (SELECT * FROM exact_rev_multi_part2 ORDER BY id ASC)
TO 'test_files/scratch/sort_pushdown/exact_multi/part2.parquet';
----
3

query I
COPY (SELECT * FROM exact_rev_multi_part3 ORDER BY id ASC)
TO 'test_files/scratch/sort_pushdown/exact_multi/part3.parquet';
----
3

statement ok
CREATE EXTERNAL TABLE exact_rev_multi_part(id INT, value INT, name VARCHAR)
STORED AS PARQUET
LOCATION 'test_files/scratch/sort_pushdown/exact_multi/'
WITH ORDER (id ASC);

query error SanityCheckPlan[\\s\\S]*does not satisfy order requirements: \\[id@0 DESC\\]
EXPLAIN SELECT * FROM exact_rev_multi_part ORDER BY id DESC LIMIT 3;

@zhuqi-lucas

Could you point to a specific code path where this causes incorrect behavior? I'd like to understand the concern better

I constructed some tests for reference […]
Thank you @xudong963, I will investigate this further.
