diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 14dacbc7e9536..935de21fc503e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -732,6 +732,13 @@ config_namespace! { /// parquet reader setting. 0 means no caching. pub max_predicate_cache_size: Option, default = None + /// (reading) If true, reverse scans produce exact descending order + /// by reversing rows within each row group. This allows the Sort + /// operator to be removed entirely and fetch/limit to be pushed + /// down to the scan. If false (default), reverse scans only reverse + /// row group order (inexact), keeping TopK above for final sorting. + pub enable_exact_reverse_scan: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 196cb96f3832d..4d37e663a0453 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + enable_exact_reverse_scan: _, // reads not used for writer props } = self; let mut builder = WriterProperties::builder() @@ -464,6 +465,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + enable_exact_reverse_scan: defaults.enable_exact_reverse_scan, } } @@ -578,6 +580,8 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + enable_exact_reverse_scan: global_options_defaults + .enable_exact_reverse_scan, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index caef0fba052cb..1fc4a6d8a6b68 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -33,9 +33,9 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec, - parquet_exec_with_sort, projection_exec, projection_exec_with_alias, - repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch, - sort_expr, sort_expr_named, test_scan_with_ordering, + parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec, + projection_exec_with_alias, repartition_exec, schema, simple_projection_exec, + sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering, }; #[test] @@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() { " ); } + +// ============================================================================ +// EXACT REVERSE SCAN TESTS +// ============================================================================ +// These tests verify behavior when exact_reverse is enabled on ParquetSource. +// With exact reverse, the Sort operator is removed entirely and fetch is pushed +// down to the scan. + +#[test] +fn test_exact_reverse_removes_sort() { + // With exact_reverse=true, Sort should be removed entirely + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_with_fetch_pushes_limit() { + // With exact_reverse=true, Sort with fetch should be removed and fetch + // pushed down to the scan + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec_with_fetch(desc_ordering, Some(10), source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_through_projection_with_fetch() { + // Exact reverse with fetch pushes through projection + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let projection = simple_projection_exec(source, vec![0, 1]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - ProjectionExec: expr=[a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - ProjectionExec: expr=[a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_without_fetch_no_limit() { + // Exact reverse without fetch: Sort removed, no limit on scan + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, source); // no fetch + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 5b50181d7fd3e..71e32c16f558a 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort( DataSourceExec::from_data_source(config) } +/// Create a single parquet file that is sorted with exact_reverse enabled +pub(crate) fn parquet_exec_with_sort_exact_reverse( + schema: SchemaRef, + output_ordering: Vec, +) -> Arc { + let source = ParquetSource::new(schema).with_exact_reverse(true); + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(source), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + .with_output_ordering(output_ordering) + .build(); + + DataSourceExec::from_data_source(config) +} + fn int64_stats() -> ColumnStatistics { ColumnStatistics { null_count: Precision::Absent, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 962b78f394354..bc97cf5fcce5c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -29,7 +29,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -122,6 +122,8 @@ pub(super) struct ParquetOpener { /// discard partially-matched row groups because they may contain rows that /// sort before fully-matched groups. pub preserve_order: bool, + /// Whether to reverse rows within each batch (for Exact reverse scan) + pub reverse_rows: bool, } /// Represents a prepared access plan with optional row selection @@ -170,6 +172,54 @@ impl PreparedAccessPlan { Ok(self) } + /// Split the overall row_selection into per-RG selections. + /// Returns a map from RG index → RowSelection for that RG. + pub(crate) fn per_rg_selections( + &self, + rg_metadata: &[RowGroupMetaData], + ) -> HashMap { + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut result = HashMap::new(); + let Some(ref overall) = self.row_selection else { + return result; + }; + + let mut selectors = overall.iter().peekable(); + let mut current_remaining: usize = 0; + let mut current_skip: bool = false; + + for &rg_idx in &self.row_group_indexes { + let mut rows_left = rg_metadata[rg_idx].num_rows() as usize; + let mut rg_selectors = Vec::new(); + + while rows_left > 0 { + if current_remaining == 0 { + if let Some(sel) = selectors.next() { + current_remaining = sel.row_count; + current_skip = sel.skip; + } else { + break; + } + } + let consumed = rows_left.min(current_remaining); + if current_skip { + rg_selectors.push(RowSelector::skip(consumed)); + } else { + rg_selectors.push(RowSelector::select(consumed)); + } + rows_left -= consumed; + current_remaining -= consumed; + } + + if !rg_selectors.is_empty() { + result.insert(rg_idx, RowSelection::from(rg_selectors)); + } + } + + result + } + /// Apply this access plan to a ParquetRecordBatchStreamBuilder fn apply_to_builder( self, @@ -182,6 +232,9 @@ impl PreparedAccessPlan { } } +/// Compute per-row-group *selected* row counts for exact reverse buffering. +/// +/// `RowSelection` is a flat sequence of `RowSelector` values (alternating impl FileOpener for ParquetOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let file_range = partitioned_file.range.clone(); @@ -279,6 +332,10 @@ impl FileOpener for ParquetOpener { let max_predicate_cache_size = self.max_predicate_cache_size; let reverse_row_groups = self.reverse_row_groups; + let reverse_rows = self.reverse_rows; + let partition_index = self.partition_index; + let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory); + let metrics = self.metrics.clone(); Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context @@ -433,6 +490,13 @@ impl FileOpener for ParquetOpener { metadata_timer.stop(); + // Clone metadata before moving into builder — needed for per-RG + // reverse scan which creates independent builders per row group. + let reader_metadata_for_reverse = if reverse_rows { + Some(Arc::new(reader_metadata.clone())) + } else { + None + }; let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( async_file_reader, reader_metadata, @@ -442,6 +506,9 @@ impl FileOpener for ParquetOpener { let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + // Save the physical predicate for per-RG reverse scan filter pushdown + let pushdown_predicate = predicate.clone(); + // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( @@ -563,137 +630,245 @@ impl FileOpener for ParquetOpener { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } - // Apply the prepared plan to the builder - builder = prepared_plan.apply_to_builder(builder); - - if let Some(limit) = limit { - builder = builder.with_limit(limit) - } - - if let Some(max_predicate_cache_size) = max_predicate_cache_size { - builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); - } - - // metrics from the arrow reader itself - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - - let stream = builder - .with_projection(mask) - .with_batch_size(batch_size) - .with_metrics(arrow_reader_metrics.clone()) - .build()?; - - let files_ranges_pruned_statistics = - file_metrics.files_ranges_pruned_statistics.clone(); - let predicate_cache_inner_records = - file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - - let stream_schema = Arc::clone(stream.schema()); - // Check if we need to replace the schema to handle things like differing nullability or metadata. - // See note below about file vs. output schema. - let replace_schema = !stream_schema.eq(&output_schema); - - // Rebase column indices to match the narrowed stream schema. - // The projection expressions have indices based on physical_file_schema, - // but the stream only contains the columns selected by the ProjectionMask. - let projection = projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - - let projector = projection.make_projector(&stream_schema)?; - - let stream = stream.map_err(DataFusionError::from).map(move |b| { - b.and_then(|mut b| { - copy_arrow_reader_metrics( - &arrow_reader_metrics, - &predicate_cache_inner_records, - &predicate_cache_records, - ); - b = projector.project_batch(&b)?; - if replace_schema { - // Ensure the output batch has the expected schema. - // - // In DataFusion 51, SchemaAdapter::map_batch() handled - // schema mismatches by casting each column via - // arrow::compute::cast_with_options(). DF 52 removed - // SchemaAdapter, so we restore that behaviour here. - // - // This handles: - // - Schema/field level metadata differences - // - Nullability mismatches (OPTIONAL vs NOT NULL) - // - Type mismatches from schema evolution (e.g. Utf8 → Date32) - // - List/Struct inner field name/nullability differences - // (e.g. List(Field("conditions", Int32, false)) vs - // List(Field("element", Int32, true))) - let (stream_schema, arrays, num_rows) = b.into_parts(); - let adapted_arrays: Vec = arrays - .iter() - .enumerate() - .map(|(i, array)| { - let target_type = output_schema.field(i).data_type(); - if array.data_type() == target_type { - Ok(Arc::clone(array)) - } else { - // Try cast first (handles value-level conversions - // like Utf8 → Date32) - let casted = if arrow::compute::can_cast_types( - array.data_type(), - target_type, - ) { - arrow::compute::cast(array, target_type)? - } else { - Arc::clone(array) - }; - // If types still differ after cast (e.g. List inner - // field name/nullability), rebuild with target type - if casted.data_type() != target_type { - let data = casted - .to_data() - .into_builder() - .data_type(target_type.clone()) - .build() - .map_err(|e| { - DataFusionError::ArrowError(Box::new(e), Some(format!( - "Failed to adapt column '{}' from {} to {}", - stream_schema.field(i).name(), - array.data_type(), - target_type, - ))) - })?; - Ok(arrow::array::make_array(data)) - } else { - Ok(casted) + // When reverse_rows is enabled, read each row group independently + // and reverse rows within it. This avoids the rg_row_counts boundary + // detection issue when RowFilter reduces actual row counts. + // Memory: O(largest RG). Modeled after Atlas's ReverseParquetSource. + if reverse_rows { + let rg_indexes = prepared_plan.row_group_indexes.clone(); + let per_rg_sels = prepared_plan.per_rg_selections(rg_metadata); + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let reader_metadata = reader_metadata_for_reverse + .expect("reader_metadata_for_reverse set when reverse_rows=true"); + + let physical_file_schema_for_filter = Arc::clone(&physical_file_schema); + let file_metrics_for_rg = file_metrics.clone(); + + // rg_indexes is already in reversed order [3,2,1,0]. + // Use VecDeque to pop from front (highest RG first). + let rg_deque: VecDeque = rg_indexes.into_iter().collect(); + let stream = futures::stream::try_unfold( + (rg_deque, limit), + move |(mut rg_indexes, mut remaining_limit)| { + let reader_metadata = Arc::clone(&reader_metadata); + let mask = mask.clone(); + let output_schema = Arc::clone(&output_schema); + let projection = projection.clone(); + let partitioned_file = partitioned_file.clone(); + let parquet_file_reader_factory = + Arc::clone(&parquet_file_reader_factory); + let metrics = metrics.clone(); + let pushdown_predicate = pushdown_predicate.clone(); + let physical_file_schema = + Arc::clone(&physical_file_schema_for_filter); + let file_metrics = file_metrics_for_rg.clone(); + let per_rg_sels = per_rg_sels.clone(); + + async move { + // Pop from front — RGs are in reversed order [3,2,1,0] + let rg_idx = match rg_indexes.pop_front() { + Some(idx) if remaining_limit != Some(0) => idx, + _ => { + return Ok::<_, DataFusionError>(None); + } + }; + + // Create a fresh reader for this single RG + let reader: Box = + parquet_file_reader_factory.create_reader( + partition_index, + partitioned_file.clone(), + metadata_size_hint, + &metrics, + )?; + + let mut rg_builder = + ParquetRecordBatchStreamBuilder::new_with_metadata( + reader, + reader_metadata.as_ref().clone(), + ); + + // Apply predicate pushdown to per-RG builder + if let Some(pred) = pushdown_filters + .then_some(pushdown_predicate.as_ref()) + .flatten() + { + let row_filter = row_filter::build_row_filter( + pred, + &physical_file_schema, + rg_builder.metadata(), + reorder_predicates, + &file_metrics, + ); + match row_filter { + Ok(Some(filter)) => { + rg_builder = rg_builder.with_row_filter(filter); } + Ok(None) => {} + Err(_) => {} + }; + } + + if let Some(max_predicate_cache_size) = + max_predicate_cache_size + { + rg_builder = rg_builder.with_max_predicate_cache_size( + max_predicate_cache_size, + ); + } + + // Apply per-RG row selection (from page pruning) + if let Some(rg_sel) = per_rg_sels.get(&rg_idx) { + rg_builder = + rg_builder.with_row_selection(rg_sel.clone()); + } + + let rg_stream = rg_builder + .with_projection(mask.clone()) + .with_batch_size(batch_size) + .with_row_groups(vec![rg_idx]) + .build()?; + + let stream_schema = Arc::clone(rg_stream.schema()); + let replace_schema = !stream_schema.eq(&output_schema); + let projection = projection.try_map_exprs(|expr| { + reassign_expr_columns(expr, &stream_schema) + })?; + let projector = projection.make_projector(&stream_schema)?; + + // Read all batches for this RG, apply projection + let batches: Vec = rg_stream + .map_err(DataFusionError::from) + .map(|b| { + b.and_then(|b| { + let b = projector.project_batch(&b)?; + if replace_schema { + adapt_batch_schema(&b, &output_schema) + } else { + Ok(b) + } + }) + }) + .try_collect() + .await?; + + // Reverse each batch, then reverse batch order + let mut reversed = Vec::with_capacity(batches.len()); + for batch in batches.into_iter().rev() { + if batch.num_rows() <= 1 { + reversed.push(batch); + continue; } - }) - .collect::>>()?; - // Note: nullability handling is left to the caller - // (e.g. atlas's adapt_table_schema_for_parquet which - // forces file columns nullable without touching partition - // columns). We only handle type/field-name adaptation here. - let options = - RecordBatchOptions::new().with_row_count(Some(num_rows)); - RecordBatch::try_new_with_options( - Arc::clone(&output_schema), - adapted_arrays, - &options, - ) - .map_err(Into::into) - } else { - Ok(b) - } - }) - }); - - if let Some(file_pruner) = file_pruner { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, + let indices = arrow::array::UInt32Array::from_iter_values( + (0..batch.num_rows() as u32).rev(), + ); + reversed.push(arrow::compute::take_record_batch( + &batch, &indices, + )?); + } + + // Apply limit across RGs + if let Some(ref mut lim) = remaining_limit { + let mut limited = Vec::new(); + for batch in reversed { + if *lim == 0 { + break; + } + let take = batch.num_rows().min(*lim); + limited.push(batch.slice(0, take)); + *lim -= take; + } + reversed = limited; + } + + Ok(Some(( + futures::stream::iter( + reversed.into_iter().map(Ok::<_, DataFusionError>), + ), + (rg_indexes, remaining_limit), + ))) + } + }, ) - .boxed()) + .try_flatten() + .boxed(); + + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream.boxed()) + } } else { - Ok(stream.boxed()) + // Non-reverse path: one stream for all RGs + + // Apply the prepared plan to the builder + builder = prepared_plan.apply_to_builder(builder); + + if let Some(limit) = limit { + builder = builder.with_limit(limit) + } + + if let Some(max_predicate_cache_size) = max_predicate_cache_size { + builder = + builder.with_max_predicate_cache_size(max_predicate_cache_size); + } + + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + + let stream = builder + .with_projection(mask) + .with_batch_size(batch_size) + .with_metrics(arrow_reader_metrics.clone()) + .build()?; + + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let predicate_cache_inner_records = + file_metrics.predicate_cache_inner_records.clone(); + let predicate_cache_records = + file_metrics.predicate_cache_records.clone(); + + let stream_schema = Arc::clone(stream.schema()); + let replace_schema = !stream_schema.eq(&output_schema); + let projection = projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + let projector = projection.make_projector(&stream_schema)?; + + let stream = stream.map_err(DataFusionError::from).map(move |b| { + b.and_then(|mut b| { + copy_arrow_reader_metrics( + &arrow_reader_metrics, + &predicate_cache_inner_records, + &predicate_cache_records, + ); + b = projector.project_batch(&b)?; + if replace_schema { + adapt_batch_schema(&b, &output_schema) + } else { + Ok(b) + } + }) + }); + + let stream: futures::stream::BoxStream<'static, Result> = + stream.boxed(); + + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream.boxed()) + } } })) } @@ -701,6 +876,57 @@ impl FileOpener for ParquetOpener { /// Copies metrics from ArrowReaderMetrics (the metrics collected by the /// arrow-rs parquet reader) to the parquet file metrics for DataFusion +/// Adapt a RecordBatch to match the expected output schema. +/// Handles type mismatches from schema evolution, nullability differences, +/// and List/Struct inner field name differences. +fn adapt_batch_schema( + batch: &RecordBatch, + output_schema: &SchemaRef, +) -> Result { + let (stream_schema, arrays, num_rows) = batch.clone().into_parts(); + let adapted_arrays: Vec = arrays + .iter() + .enumerate() + .map(|(i, array)| { + let target_type = output_schema.field(i).data_type(); + if array.data_type() == target_type { + Ok(Arc::clone(array)) + } else { + let casted = + if arrow::compute::can_cast_types(array.data_type(), target_type) { + arrow::compute::cast(array, target_type)? + } else { + Arc::clone(array) + }; + if casted.data_type() != target_type { + let data = casted + .to_data() + .into_builder() + .data_type(target_type.clone()) + .build() + .map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!( + "Failed to adapt column '{}' from {} to {}", + stream_schema.field(i).name(), + array.data_type(), + target_type, + )), + ) + })?; + Ok(arrow::array::make_array(data)) + } else { + Ok(casted) + } + } + }) + .collect::>>()?; + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); + RecordBatch::try_new_with_options(Arc::clone(output_schema), adapted_arrays, &options) + .map_err(Into::into) +} + fn copy_arrow_reader_metrics( arrow_reader_metrics: &ArrowReaderMetrics, predicate_cache_inner_records: &Count, @@ -1035,6 +1261,7 @@ fn should_enable_page_index( #[cfg(test)] mod test { + use std::pin::Pin; use std::sync::Arc; use super::{ConstantColumns, constant_columns_from_stats}; @@ -1085,6 +1312,7 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + reverse_rows: bool, } impl ParquetOpenerBuilder { @@ -1111,6 +1339,7 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + reverse_rows: false, } } @@ -1168,6 +1397,12 @@ mod test { self } + /// Set reverse_rows flag (Exact reverse scan: per-RG buffer + row reversal). + fn with_reverse_rows(mut self, enable: bool) -> Self { + self.reverse_rows = enable; + self + } + /// Set preserve_order flag. When true, prune_by_limit is disabled. fn with_preserve_order(mut self, enable: bool) -> Self { self.preserve_order = enable; @@ -1231,6 +1466,7 @@ mod test { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, preserve_order: self.preserve_order, + reverse_rows: self.reverse_rows, } } } @@ -1319,7 +1555,7 @@ mod test { } async fn count_batches_and_rows( - mut stream: std::pin::Pin< + mut stream: Pin< Box< dyn Stream> + Send, @@ -1337,7 +1573,7 @@ mod test { /// Helper to collect all int32 values from the first column of batches async fn collect_int32_values( - mut stream: std::pin::Pin< + mut stream: Pin< Box< dyn Stream> + Send, @@ -2338,4 +2574,343 @@ mod test { ); } } + + // ============================================================================ + // Exact reverse scan tests + // ============================================================================ + // + // These cover the `reverse_rows=true` path (per-RG buffer + row reversal) that + // is layered on top of `reverse_row_groups`: + // + // reverse_row_groups only: Inexact — RGs reversed, rows within RG still ASC. + // reverse_row_groups + reverse_rows: Exact — globally DESC. + // + #[tokio::test] + async fn test_exact_reverse_scan_multi_rg_produces_global_desc() { + // Three RGs, each with an ascending run. With reverse_row_groups + + // reverse_rows, the output must be globally descending. + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(3) // one RG per batch + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + // Inexact (only RGs reversed; rows within RG still ASC). + let inexact = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .build(); + let stream = inexact.open(file.clone()).unwrap().await.unwrap(); + let inexact_values = collect_int32_values(stream).await; + assert_eq!( + inexact_values, + vec![7, 8, 9, 4, 5, 6, 1, 2, 3], + "Inexact: RGs reversed but rows within RG stay ASC" + ); + + // Exact (reverse_rows adds per-RG row reversal → globally DESC). + let exact = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .build(); + let stream = exact.open(file.clone()).unwrap().await.unwrap(); + let exact_values = collect_int32_values(stream).await; + assert_eq!( + exact_values, + vec![9, 8, 7, 6, 5, 4, 3, 2, 1], + "Exact: globally sorted DESC" + ); + } + + #[tokio::test] + async fn test_exact_reverse_scan_applies_limit_after_reversal() { + // With exact reverse + limit, the limit must come from the *end* of the + // logical forward order, not the first N rows pre-reversal. + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(3) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .with_limit(Some(4)) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![9, 8, 7, 6], + "Limit must be applied AFTER row reversal; \ + applying it at the parquet reader layer would produce [1,2,3,4] \ + reversed to [4,3,2,1] — wrong." + ); + } + + #[tokio::test] + async fn test_exact_reverse_scan_with_row_selection_across_rgs() { + // Regression test for copilot review comment #2: when `row_selection` + // (e.g. from page pruning / pushdown filters) causes the stream to emit + // fewer rows per RG than `num_rows()` suggests, `ReversedRowGroupStream` + // must still detect RG boundaries correctly. Before the fix, + // `rg_row_counts` was seeded from `RowGroupMetaData::num_rows()` and the + // boundary detector drifted, silently mixing batches from multiple RGs. + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + // Three RGs of 4 rows each. Each RG's rows are ASC (and so are the RGs + // relative to one another), so forward scan = [1..12] and any correct + // reverse scan over the selected rows must be DESC. + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)])) + .unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // Attach a ParquetAccessPlan with a per-RG RowSelection: + // RG0 : skip first 2, select last 2 → selects rows {3, 4} + // RG1 : select all → selects rows {5, 6, 7, 8} + // RG2 : select first 2, skip last 2 → selects rows {9, 10} + // + // Exact reverse over this selection must return [10, 9, 8, 7, 6, 5, 4, 3]. + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut access_plan = ParquetAccessPlan::new_all(3); + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![10, 9, 8, 7, 6, 5, 4, 3], + "Exact reverse must respect row_selection when computing RG boundaries" + ); + } + + #[tokio::test] + async fn test_exact_reverse_scan_with_row_selection_and_limit() { + // Exact reverse + row_selection + limit. Must produce the top-N in DESC + // order taken from the selected rows (not the unselected ones). + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // Select only rows {2, 3, 6, 7}. + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + let mut access_plan = ParquetAccessPlan::new_all(2); + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(1), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(1), + ]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .with_limit(Some(3)) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![7, 6, 3], + "top 3 of {{2, 3, 6, 7}} in DESC order" + ); + } + + /// Regression test: when RowSelection skips ALL rows in an RG (empty RG), + /// `rg_row_counts` has a 0 entry. Without the skip-empty-RG fix, + /// `rows_remaining_in_rg=0` causes the first batch from the next real RG + /// to immediately trigger `flush_buffer()`, attributing that batch to the + /// wrong (empty) RG and corrupting the output order. + #[tokio::test] + async fn test_exact_reverse_scan_with_empty_rg_from_row_selection() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + // Three RGs of 4 rows each. + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)])) + .unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // RG0: select all 4 rows → {1,2,3,4} + // RG1: skip ALL rows → empty (0 selected) + // RG2: select all 4 rows → {9,10,11,12} + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut access_plan = ParquetAccessPlan::new_all(3); + // RG1: skip all 4 rows + access_plan.scan_selection(1, RowSelection::from(vec![RowSelector::skip(4)])); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + + // Reversed: RG2 first [12,11,10,9], skip empty RG1, then RG0 [4,3,2,1] + assert_eq!( + values, + vec![12, 11, 10, 9, 4, 3, 2, 1], + "empty RG (all rows skipped) must be handled without corrupting order" + ); + } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 6eee12e5c609d..b9b824d6736ff 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -288,11 +288,19 @@ pub struct ParquetSource { pub(crate) projection: ProjectionExprs, #[cfg(feature = "parquet_encryption")] pub(crate) encryption_factory: Option>, - /// If true, read files in reverse order and reverse row groups within files. - /// But it's not guaranteed that rows within row groups are in reverse order, - /// so we still need to sort them after reading, so the reverse scan is inexact. - /// Used to optimize ORDER BY ... DESC on sorted data. + /// If true, read row groups in reverse order within each file. + /// Combined with `reverse_rows`, controls the sort pushdown behavior: + /// - `reverse_row_groups=true, reverse_rows=false`: Inexact (RGs reversed, rows within RG not) + /// - `reverse_row_groups=true, reverse_rows=true`: Exact (both RGs and rows reversed) reverse_row_groups: bool, + /// If true, reverse the row order within each batch after reading. + /// This gives exact descending order when combined with `reverse_row_groups`. + reverse_rows: bool, + /// If true, `try_reverse_output` returns `Exact` (reverse_row_groups + reverse_rows), + /// allowing the Sort operator to be removed entirely and fetch to be pushed down. + /// If false (default), returns `Inexact` (only reverse_row_groups), preserving + /// backward-compatible behavior where Sort is kept as TopK. + exact_reverse: bool, } impl ParquetSource { @@ -318,6 +326,8 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + reverse_rows: false, + exact_reverse: false, } } @@ -326,6 +336,7 @@ impl ParquetSource { mut self, table_parquet_options: TableParquetOptions, ) -> Self { + self.exact_reverse = table_parquet_options.global.enable_exact_reverse_scan; self.table_parquet_options = table_parquet_options; self } @@ -481,6 +492,40 @@ impl ParquetSource { pub fn reverse_row_groups(&self) -> bool { self.reverse_row_groups } + + /// Enable or disable exact reverse scanning. + /// + /// When `true`, `try_reverse_output` returns `Exact` (both row groups and + /// rows within each batch are reversed), allowing the Sort operator to be + /// removed entirely and fetch/limit to be pushed down to the scan. + /// + /// When `false` (default), `try_reverse_output` returns `Inexact` (only row + /// groups are reversed), preserving backward-compatible behavior where the + /// Sort operator is kept as TopK. + pub fn with_exact_reverse(mut self, exact_reverse: bool) -> Self { + self.exact_reverse = exact_reverse; + self + } + + /// Returns whether exact reverse scanning is enabled. + pub fn exact_reverse(&self) -> bool { + self.exact_reverse + } + + /// Returns whether rows within each batch are reversed. + pub fn reverse_rows(&self) -> bool { + self.reverse_rows + } + + /// Enable or disable row reversal within each batch. + /// + /// This is normally set internally by `try_reverse_output` when + /// `exact_reverse` is enabled, but must be set explicitly when + /// restoring a plan from proto serialization (RemoteExec path). + pub fn with_reverse_rows(mut self, reverse_rows: bool) -> Self { + self.reverse_rows = reverse_rows; + self + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -568,6 +613,7 @@ impl FileSource for ParquetSource { max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, preserve_order: !base_config.output_ordering.is_empty(), + reverse_rows: self.reverse_rows, }); Ok(opener) } @@ -621,8 +667,10 @@ impl FileSource for ParquetSource { write!(f, "{predicate_string}")?; - // Add reverse_scan info if enabled - if self.reverse_row_groups { + // Add reverse scan info if enabled + if self.reverse_row_groups && self.reverse_rows { + write!(f, ", scan_direction=Reversed")?; + } else if self.reverse_row_groups { write!(f, ", reverse_row_groups=true")?; } @@ -804,12 +852,32 @@ impl FileSource for ParquetSource { return Ok(SortOrderPushdownResult::Unsupported); } - // Return Inexact because we're only reversing row group order, - // not guaranteeing perfect row-level ordering - let new_source = self.clone().with_reverse_row_groups(true); - Ok(SortOrderPushdownResult::Inexact { - inner: Arc::new(new_source) as Arc, - }) + let new_source: Arc = if self.exact_reverse { + // Exact: reverse both row groups and rows within each batch, + // giving globally sorted output. This allows the Sort operator + // to be removed entirely and fetch to be pushed down to the scan. + // + // Note: when pushdown_filters is enabled, RowFilter may reduce + // actual rows below what rg_row_counts predicts. This causes + // ReversedRowGroupStream's RG boundary detection to delay + // (multiple RGs may buffer together), but correctness is preserved + // because all buffered batches are flushed and reversed when the + // stream ends. Memory cost becomes O(all data) instead of + // O(largest RG), which is acceptable for LIMIT queries. + let mut source = self.clone().with_reverse_row_groups(true); + source.reverse_rows = true; + Arc::new(source) + } else { + // Inexact: only reverse row groups. The Sort operator stays + // (as TopK) but benefits from early termination. + Arc::new(self.clone().with_reverse_row_groups(true)) + }; + + if self.exact_reverse { + Ok(SortOrderPushdownResult::Exact { inner: new_source }) + } else { + Ok(SortOrderPushdownResult::Inexact { inner: new_source }) + } // TODO Phase 2: Add support for other optimizations: // - File reordering based on min/max statistics @@ -917,4 +985,69 @@ mod tests { assert!(source.reverse_row_groups()); assert!(source.filter().is_some()); } + + #[test] + fn test_exact_reverse_returns_exact() { + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use datafusion_physical_plan::SortOrderPushdownResult; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + + let source = ParquetSource::new(schema.clone()).with_exact_reverse(true); + + // Build equivalence properties with ASC ordering + let sort_expr = PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)), + options: SortOptions::default(), // ASC NULLS LAST + }; + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_orderings(vec![vec![sort_expr.clone()]]); + + // Request DESC ordering (reverse of source) + let desc_expr = sort_expr.reverse(); + + let result = source + .try_reverse_output(&[desc_expr], &eq_properties) + .unwrap(); + + assert!( + matches!(result, SortOrderPushdownResult::Exact { .. }), + "with_exact_reverse(true) should return Exact" + ); + } + + #[test] + fn test_default_returns_inexact() { + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use datafusion_physical_plan::SortOrderPushdownResult; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + + // Default: exact_reverse is false + let source = ParquetSource::new(schema.clone()); + + let sort_expr = PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)), + options: SortOptions::default(), + }; + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_orderings(vec![vec![sort_expr.clone()]]); + + let desc_expr = sort_expr.reverse(); + + let result = source + .try_reverse_output(&[desc_expr], &eq_properties) + .unwrap(); + + assert!( + matches!(result, SortOrderPushdownResult::Inexact { .. }), + "default (exact_reverse=false) should return Inexact" + ); + } } diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 1fa15492d2a92..00fa95f570e70 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -95,7 +95,16 @@ impl PhysicalOptimizerRule for PushdownSort { // Each node type defines its own pushdown behavior via try_pushdown_sort() match sort_input.try_pushdown_sort(required_ordering)? { SortOrderPushdownResult::Exact { inner } => { - // Data source guarantees perfect ordering - remove the Sort operator + // Data source guarantees perfect ordering - remove the Sort. + // If Sort had a fetch (TopK), push it into the inner plan + // tree for file-level early termination. Traverse + // single-child nodes (Projection, Cooperative) to reach + // the leaf data source that supports with_fetch. + let inner = if let Some(fetch) = sort_exec.fetch() { + push_fetch_into_plan(inner, fetch) + } else { + inner + }; Ok(Transformed::yes(inner)) } SortOrderPushdownResult::Inexact { inner } => { @@ -127,3 +136,34 @@ impl PhysicalOptimizerRule for PushdownSort { true } } + +/// Push fetch (limit) into a plan tree for Exact sort pushdown. +/// +/// Traverses single-child nodes (ProjectionExec, CooperativeExec, etc.) +/// to find the deepest node that supports `with_fetch` (typically +/// FileScanExec or DataSourceExec) and sets fetch on it. +/// +/// Falls back to wrapping with GlobalLimitExec if no node supports it. +fn push_fetch_into_plan( + plan: Arc, + fetch: usize, +) -> Arc { + // Try with_fetch on the current node + if let Some(plan_with_fetch) = plan.with_fetch(Some(fetch)) { + return plan_with_fetch; + } + + // Single-child node: recurse into child, then rebuild parent + let children = plan.children(); + if children.len() == 1 { + let child = Arc::clone(children[0]); + let new_child = push_fetch_into_plan(child, fetch); + if let Ok(rebuilt) = Arc::clone(&plan).with_new_children(vec![new_child]) { + return rebuilt; + } + } + + // Fallback: wrap with GlobalLimitExec + use datafusion_physical_plan::limit::GlobalLimitExec; + Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch))) +} diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index df2865d71bca0..d64dccc13f7cf 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1013,6 +1013,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + enable_exact_reverse_scan: false, }) } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bd7dd3a6aff3c..3e22b170466fc 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1058,6 +1058,12 @@ message ParquetScanExecNode { PhysicalExprNode predicate = 3; datafusion_common.TableParquetOptions parquet_options = 4; + + // Runtime reverse-scan flags set by PushdownSort optimizer. + // Must be preserved across proto roundtrips so that remote executors + // produce correctly reversed output after SortExec removal. + bool reverse_row_groups = 5; + bool reverse_rows = 6; } message CsvScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e269606d163a3..ef68c4a3b09bf 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13739,6 +13739,12 @@ impl serde::Serialize for ParquetScanExecNode { if self.parquet_options.is_some() { len += 1; } + if self.reverse_row_groups { + len += 1; + } + if self.reverse_rows { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.ParquetScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -13749,6 +13755,12 @@ impl serde::Serialize for ParquetScanExecNode { if let Some(v) = self.parquet_options.as_ref() { struct_ser.serialize_field("parquetOptions", v)?; } + if self.reverse_row_groups { + struct_ser.serialize_field("reverseRowGroups", &self.reverse_row_groups)?; + } + if self.reverse_rows { + struct_ser.serialize_field("reverseRows", &self.reverse_rows)?; + } struct_ser.end() } } @@ -13764,6 +13776,10 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "predicate", "parquet_options", "parquetOptions", + "reverse_row_groups", + "reverseRowGroups", + "reverse_rows", + "reverseRows", ]; #[allow(clippy::enum_variant_names)] @@ -13771,6 +13787,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { BaseConf, Predicate, ParquetOptions, + ReverseRowGroups, + ReverseRows, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -13795,6 +13813,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "baseConf" | "base_conf" => Ok(GeneratedField::BaseConf), "predicate" => Ok(GeneratedField::Predicate), "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), + "reverseRowGroups" | "reverse_row_groups" => Ok(GeneratedField::ReverseRowGroups), + "reverseRows" | "reverse_rows" => Ok(GeneratedField::ReverseRows), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -13817,6 +13837,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { let mut base_conf__ = None; let mut predicate__ = None; let mut parquet_options__ = None; + let mut reverse_row_groups__ = None; + let mut reverse_rows__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::BaseConf => { @@ -13837,12 +13859,26 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { } parquet_options__ = map_.next_value()?; } + GeneratedField::ReverseRowGroups => { + if reverse_row_groups__.is_some() { + return Err(serde::de::Error::duplicate_field("reverseRowGroups")); + } + reverse_row_groups__ = Some(map_.next_value()?); + } + GeneratedField::ReverseRows => { + if reverse_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("reverseRows")); + } + reverse_rows__ = Some(map_.next_value()?); + } } } Ok(ParquetScanExecNode { base_conf: base_conf__, predicate: predicate__, parquet_options: parquet_options__, + reverse_row_groups: reverse_row_groups__.unwrap_or_default(), + reverse_rows: reverse_rows__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cf343e0258d0b..d178c41865d30 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1607,6 +1607,13 @@ pub struct ParquetScanExecNode { pub parquet_options: ::core::option::Option< super::datafusion_common::TableParquetOptions, >, + /// Runtime reverse-scan flags set by PushdownSort optimizer. + /// Must be preserved across proto roundtrips so that remote executors + /// produce correctly reversed output after SortExec removal. + #[prost(bool, tag = "5")] + pub reverse_row_groups: bool, + #[prost(bool, tag = "6")] + pub reverse_rows: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvScanExecNode { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..85811c433bbf8 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -525,6 +525,7 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + enable_exact_reverse_scan: false, } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4ff90b61eed9c..b0c0db0f13a66 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -724,6 +724,14 @@ impl protobuf::PhysicalPlanNode { if let Some(predicate) = predicate { source = source.with_predicate(predicate); } + + // Restore runtime reverse-scan flags from proto + if scan.reverse_row_groups { + source = source.with_reverse_row_groups(true); + } + if scan.reverse_rows { + source = source.with_reverse_rows(true); + } let base_config = parse_protobuf_file_scan_config( base_conf, ctx, @@ -2672,6 +2680,8 @@ impl protobuf::PhysicalPlanNode { )?), predicate, parquet_options: Some(conf.table_parquet_options().try_into()?), + reverse_row_groups: conf.reverse_row_groups(), + reverse_rows: conf.reverse_rows(), }, )), })); diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 18f72cb9f7798..08ac5f0288bb9 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -242,6 +242,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 +datafusion.execution.parquet.enable_exact_reverse_scan false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -377,6 +378,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes +datafusion.execution.parquet.enable_exact_reverse_scan false (reading) If true, reverse scans produce exact descending order by reversing rows within each row group. This allows the Sort operator to be removed entirely and fetch/limit to be pushed down to the scan. If false (default), reverse scans only reverse row group order (inexact), keeping TopK above for final sorting. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 99f26b66d458b..ed65db8aa0b3c 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1524,6 +1524,283 @@ SELECT * FROM multi_partition_parquet ORDER BY id ASC; statement ok SET datafusion.execution.target_partitions = 2; +# ============================================================================ +# Test 12: Exact Reverse Scan +# ============================================================================ +# When enable_exact_reverse_scan=true, the Sort operator is removed entirely +# and fetch is pushed down to the scan. + +# Enable exact reverse scan BEFORE creating table so ParquetSource picks it up +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +CREATE TABLE exact_rev_data(id INT, value INT, name VARCHAR) AS VALUES +(1, 100, 'a'), (2, 200, 'b'), (3, 300, 'c'), (4, 400, 'd'), (5, 500, 'e'), +(6, 600, 'f'), (7, 700, 'g'), (8, 800, 'h'), (9, 900, 'i'), (10, 1000, 'j'); + +query I +COPY (SELECT * FROM exact_rev_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/exact_rev_data.parquet'; +---- +10 + +statement ok +CREATE EXTERNAL TABLE exact_rev_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +# Test 12.1: Sort is removed, scan_direction=Reversed (exact), no SortExec +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC; +---- +logical_plan +01)Sort: exact_rev_parquet.id DESC NULLS FIRST +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.2: Results are correct in DESC order +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC; +---- +10 1000 j +9 900 i +8 800 h +7 700 g +6 600 f +5 500 e +4 400 d +3 300 c +2 200 b +1 100 a + +# Test 12.3: Sort removed, fetch pushed to scan (applied after row reversal in opener) +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.4: Results correct with LIMIT +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Test 12.5: OFFSET + LIMIT with exact reverse +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +logical_plan +01)Limit: skip=2, fetch=3 +02)--Sort: exact_rev_parquet.id DESC NULLS FIRST, fetch=5 +03)----TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan +01)GlobalLimitExec: skip=2, fetch=3 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=5, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.6: Results correct with OFFSET + LIMIT +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +8 800 h +7 700 g +6 600 f + +# Test 12.7: ASC order should NOT use reverse scan (same direction as source) +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_parquet.id ASC NULLS LAST, fetch=3 +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Test 12.8: Verify default (exact_reverse=false) keeps Sort as TopK +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = false; + +statement ok +CREATE EXTERNAL TABLE exact_rev_disabled_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +query TT +EXPLAIN SELECT * FROM exact_rev_disabled_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_disabled_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: exact_rev_disabled_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Test 12.9: Exact reverse works correctly with pushdown_filters enabled. +# RowFilter may reduce actual rows below rg_row_counts, but +# ReversedRowGroupStream handles this by flushing all remaining +# buffered batches when the stream ends. + +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE EXTERNAL TABLE exact_rev_pushdown_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +# Sort removed (Exact), scan_direction=Reversed, with predicate pushed down +query TT +EXPLAIN SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_pushdown_parquet.id DESC NULLS FIRST, fetch=3 +02)--Filter: exact_rev_pushdown_parquet.value > Int32(500) +03)----TableScan: exact_rev_pushdown_parquet projection=[id, value, name], partial_filters=[exact_rev_pushdown_parquet.value > Int32(500)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, predicate=value@1 > 500, scan_direction=Reversed, pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 500, required_guarantees=[] + +# Results correct in DESC order with filter +query IIT +SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Without LIMIT — all filtered rows in DESC order +query IIT +SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC; +---- +10 1000 j +9 900 i +8 800 h +7 700 g +6 600 f + +statement ok +DROP TABLE exact_rev_pushdown_parquet; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; + +# Test 12.10: Multi-RG exact reverse — create a file with small row groups +# to verify per-RG independent reverse works across RG boundaries. + +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +statement ok +CREATE TABLE exact_rev_multi_rg_data(id INT, value INT) AS VALUES +(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), +(6, 60), (7, 70), (8, 80), (9, 90), (10, 100); + +query I +COPY (SELECT * FROM exact_rev_multi_rg_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet'; +---- +10 + +statement ok +CREATE EXTERNAL TABLE exact_rev_multi_rg(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet' +WITH ORDER (id ASC); + +# Sort removed (Exact), correct DESC results across multiple row groups +query TT +EXPLAIN SELECT * FROM exact_rev_multi_rg ORDER BY id DESC; +---- +logical_plan +01)Sort: exact_rev_multi_rg.id DESC NULLS FIRST +02)--TableScan: exact_rev_multi_rg projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +query II +SELECT * FROM exact_rev_multi_rg ORDER BY id DESC; +---- +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 +4 40 +3 30 +2 20 +1 10 + +# LIMIT across RG boundaries (limit=4 spans RGs of size 3) +query II +SELECT * FROM exact_rev_multi_rg ORDER BY id DESC LIMIT 4; +---- +10 100 +9 90 +8 80 +7 70 + +# Test 12.11: Multi-RG + pushdown_filters — per-RG RowFilter applied correctly +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE EXTERNAL TABLE exact_rev_multi_rg_filtered(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet' +WITH ORDER (id ASC); + +query II +SELECT * FROM exact_rev_multi_rg_filtered WHERE value > 50 ORDER BY id DESC; +---- +10 100 +9 90 +8 80 +7 70 +6 60 + +query II +SELECT * FROM exact_rev_multi_rg_filtered WHERE value > 50 ORDER BY id DESC LIMIT 3; +---- +10 100 +9 90 +8 80 + +statement ok +DROP TABLE exact_rev_multi_rg_filtered; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +DROP TABLE exact_rev_multi_rg; + +statement ok +DROP TABLE exact_rev_multi_rg_data; + +# Cleanup exact reverse test tables +statement ok +DROP TABLE exact_rev_data; + +statement ok +DROP TABLE exact_rev_parquet; + +statement ok +DROP TABLE exact_rev_disabled_parquet; + # Cleanup statement ok DROP TABLE reversed_high; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ebd9ef728ae7b..22e0c765bbce3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -90,6 +90,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.enable_exact_reverse_scan | false | (reading) If true, reverse scans produce exact descending order by reversing rows within each row group. This allows the Sort operator to be removed entirely and fetch/limit to be pushed down to the scan. If false (default), reverse scans only reverse row group order (inexact), keeping TopK above for final sorting. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |