diff --git a/datafusion/datasource-parquet/src/decoder_projection.rs b/datafusion/datasource-parquet/src/decoder_projection.rs
new file mode 100644
index 0000000000000..dcf52a37d4ff3
--- /dev/null
+++ b/datafusion/datasource-parquet/src/decoder_projection.rs
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder-projection construction for the parquet scan.
+//!
+//! [`DecoderProjection`] owns the two halves of "project a decoded parquet
+//! batch onto the scan's output schema":
+//!
+//! * the [`ProjectionMask`] installed on every parquet decoder run, and
+//! * the per-batch transform ([`DecoderProjection::map`]) that applies the
+//!   projector and, when needed, rebuilds the batch with the user's
+//!   `output_schema` to recover metadata / nullability the file schema does
+//!   not carry.
+//!
+//! The opener constructs one [`DecoderProjection`] per file via
+//! [`DecoderProjection::try_new`] and hands it to the push-decoder stream,
+//! which calls [`map`](DecoderProjection::map) on every decoded batch.
+
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::datatypes::SchemaRef;
+
+use datafusion_common::Result;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
+use datafusion_physical_expr::utils::reassign_expr_columns;
+
+use parquet::arrow::ProjectionMask;
+use parquet::schema::types::SchemaDescriptor;
+
+use crate::row_filter::build_projection_read_plan;
+
+/// Per-file decoder projection: the [`ProjectionMask`] installed on every
+/// parquet decoder run, plus the per-batch transform that maps the decoder's
+/// output onto the scan's `output_schema`.
+///
+/// Built once per file by the opener via [`Self::try_new`]; the
+/// push-decoder stream installs [`Self::projection_mask`] on each decoder
+/// and calls [`Self::map`] on every decoded batch.
+pub(crate) struct DecoderProjection {
+    projection_mask: ProjectionMask,
+    projector: Projector,
+    output_schema: SchemaRef,
+    /// `true` when the projector's output schema differs from `output_schema`
+    /// in metadata / nullability and [`map`](Self::map) must rebuild the batch
+    /// with `output_schema`.
+    replace_schema: bool,
+}
+
+impl DecoderProjection {
+    /// Build the decoder projection for a file.
+    ///
+    /// `projection` references columns in `physical_file_schema` (i.e. already
+    /// adapted by the per-file expr adapter); `parquet_schema` is the
+    /// corresponding parquet [`SchemaDescriptor`]. `output_schema` is what
+    /// consumers of the scan stream expect.
+    pub(crate) fn try_new(
+        projection: &ProjectionExprs,
+        physical_file_schema: &SchemaRef,
+        parquet_schema: &SchemaDescriptor,
+        output_schema: &SchemaRef,
+    ) -> Result<Self> {
+        let read_plan = build_projection_read_plan(
+            projection.expr_iter(),
+            physical_file_schema,
+            parquet_schema,
+        );
+
+        let stream_schema = read_plan.projected_schema;
+
+        // Rebase the projection onto the decoder's stream schema (column
+        // indices change because the decoder yields only the masked columns).
+        let rebased_projection = projection
+            .clone()
+            .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
+        let projector = rebased_projection.make_projector(&stream_schema)?;
+
+        // Compare against the projector's *output* schema rather than the
+        // stream schema, so future widening of the mask (e.g. for post-scan
+        // filter columns) does not flip this flag.
+        let replace_schema = projector.output_schema() != output_schema;
+
+        Ok(Self {
+            projection_mask: read_plan.projection_mask,
+            projector,
+            output_schema: Arc::clone(output_schema),
+            replace_schema,
+        })
+    }
+
+    /// The projection mask to install on every parquet decoder in the scan.
+    pub(crate) fn projection_mask(&self) -> &ProjectionMask {
+        &self.projection_mask
+    }
+
+    /// Map a decoded batch onto the scan's output schema.
+    ///
+    /// Applies the [`Projector`] and, when the projector's output schema
+    /// differs from `output_schema` in metadata or nullability, rebuilds the
+    /// batch with `output_schema` (some writers emit OPTIONAL fields even when
+    /// the data has no nulls; some logical schemas carry field-level metadata
+    /// the file schema does not).
+    pub(crate) fn map(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let projected = self.projector.project_batch(batch)?;
+        if !self.replace_schema {
+            return Ok(projected);
+        }
+        let (_stream_schema, arrays, num_rows) = projected.into_parts();
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+        Ok(RecordBatch::try_new_with_options(
+            Arc::clone(&self.output_schema),
+            arrays,
+            &options,
+        )?)
+    }
+}
diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 6da0cfc4c5371..cf1caf336fd56 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -26,6 +26,7 @@ pub mod access_plan; mod bloom_filter; +mod decoder_projection; pub mod file_format; pub mod metadata; mod metrics; diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 95e0516e8bc27..f138a26bf4701 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -24,9 +24,10 @@ use self::early_stop::EarlyStoppingStream; #[cfg(feature = "parquet_encryption")] use self::encryption::EncryptionContext; use crate::access_plan::PreparedAccessPlan; +use crate::decoder_projection::DecoderProjection; use crate::page_filter::PagePruningAccessPlanFilter; use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState}; -use crate::row_filter::{RowFilterGenerator, build_projection_read_plan}; +use crate::row_filter::RowFilterGenerator; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, @@ -36,7 +37,6 @@ use arrow::array::RecordBatch; use arrow::datatypes::DataType; use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::ProjectionExprs; -use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -1156,11 +1156,17 @@ impl RowGroupsPrunedParquetOpen { }; let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let read_plan = build_projection_read_plan( - prepared.projection.expr_iter(), + + // Build the decoder projection (mask + per-batch transform) in a + // single call. 
Encapsulating it behind `DecoderProjection` keeps the + // opener's orchestration body focused on filter / decoder / stream + // wiring. + let decoder_projection = DecoderProjection::try_new( + &prepared.projection, &prepared.physical_file_schema, reader_metadata.parquet_schema(), - ); + &prepared.output_schema, + )?; let (decoder, pending_decoders, remaining_limit) = { let pushdown_predicate = prepared @@ -1188,7 +1194,7 @@ impl RowGroupsPrunedParquetOpen { let remaining_limit = prepared.limit.filter(|_| run_count > 1); let decoder_config = DecoderBuilderConfig { - read_plan: &read_plan, + projection_mask: decoder_projection.projection_mask(), batch_size: prepared.batch_size, arrow_reader_metrics: &arrow_reader_metrics, force_filter_selections: prepared.force_filter_selections, @@ -1226,19 +1232,6 @@ impl RowGroupsPrunedParquetOpen { let predicate_cache_records = prepared.file_metrics.predicate_cache_records.clone(); - // Check if we need to replace the schema to handle things like differing nullability or metadata. - // See note below about file vs. output schema. - let stream_schema = read_plan.projected_schema; - let replace_schema = stream_schema != prepared.output_schema; - - // Rebase column indices to match the narrowed stream schema. - // The projection expressions have indices based on physical_file_schema, - // but the stream only contains the columns selected by the ProjectionMask. 
- let projection = prepared - .projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - let projector = projection.make_projector(&stream_schema)?; - let output_schema = Arc::clone(&prepared.output_schema); let files_ranges_pruned_statistics = prepared.file_metrics.files_ranges_pruned_statistics.clone(); let stream = PushDecoderStreamState { @@ -1246,9 +1239,7 @@ impl RowGroupsPrunedParquetOpen { pending_decoders, remaining_limit, reader: prepared.async_file_reader, - projector, - output_schema, - replace_schema, + decoder_projection, arrow_reader_metrics, predicate_cache_inner_records, predicate_cache_records, diff --git a/datafusion/datasource-parquet/src/push_decoder.rs b/datafusion/datasource-parquet/src/push_decoder.rs index 8b71be3e8de96..3156b9e35fe24 100644 --- a/datafusion/datasource-parquet/src/push_decoder.rs +++ b/datafusion/datasource-parquet/src/push_decoder.rs @@ -32,24 +32,22 @@ //! [`PushDecoderStreamState::into_stream`] for consumption. use std::collections::VecDeque; -use std::sync::Arc; -use arrow::array::{RecordBatch, RecordBatchOptions}; -use arrow::datatypes::Schema; +use arrow::array::RecordBatch; use futures::StreamExt; use futures::stream::BoxStream; use parquet::DecodeResult; +use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelectionPolicy}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use datafusion_common::{DataFusionError, Result}; -use datafusion_physical_expr::projection::Projector; use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge}; use crate::access_plan::PreparedAccessPlan; -use crate::row_filter::ParquetReadPlan; +use crate::decoder_projection::DecoderProjection; /// Shared options applied to every [`ParquetPushDecoderBuilder`] in a file scan. 
/// @@ -58,7 +56,9 @@ use crate::row_filter::ParquetReadPlan; /// requirements). All decoders in that scan share the same projection, batch /// size, metrics sink, and selection policy. pub(crate) struct DecoderBuilderConfig<'a> { - pub(crate) read_plan: &'a ParquetReadPlan, + /// Projection mask installed on every decoder in the scan. Sourced from + /// the file's [`DecoderProjection`]. + pub(crate) projection_mask: &'a ProjectionMask, pub(crate) batch_size: usize, pub(crate) arrow_reader_metrics: &'a ArrowReaderMetrics, pub(crate) force_filter_selections: bool, @@ -77,7 +77,7 @@ impl DecoderBuilderConfig<'_> { metadata: ArrowReaderMetadata, ) -> ParquetPushDecoderBuilder { let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata) - .with_projection(self.read_plan.projection_mask.clone()) + .with_projection(self.projection_mask.clone()) .with_batch_size(self.batch_size) .with_metrics(self.arrow_reader_metrics.clone()); if self.force_filter_selections { @@ -113,9 +113,9 @@ pub(crate) struct PushDecoderStreamState { /// here instead. pub(crate) remaining_limit: Option, pub(crate) reader: Box, - pub(crate) projector: Projector, - pub(crate) output_schema: Arc, - pub(crate) replace_schema: bool, + /// Per-file projection: the mask installed on every decoder and the + /// per-batch transform applied by [`Self::project_batch`]. + pub(crate) decoder_projection: DecoderProjection, pub(crate) arrow_reader_metrics: ArrowReaderMetrics, pub(crate) predicate_cache_inner_records: Gauge, pub(crate) predicate_cache_records: Gauge, @@ -216,24 +216,6 @@ impl PushDecoderStreamState { } fn project_batch(&self, batch: &RecordBatch) -> Result { - let mut batch = self.projector.project_batch(batch)?; - if self.replace_schema { - // Ensure the output batch has the expected schema. - // This handles things like schema level and field level metadata, which may not be present - // in the physical file schema. 
- // It is also possible for nullability to differ; some writers create files with - // OPTIONAL fields even when there are no nulls in the data. - // In these cases it may make sense for the logical schema to be `NOT NULL`. - // RecordBatch::try_new_with_options checks that if the schema is NOT NULL - // the array cannot contain nulls, amongst other checks. - let (_stream_schema, arrays, num_rows) = batch.into_parts(); - let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); - batch = RecordBatch::try_new_with_options( - Arc::clone(&self.output_schema), - arrays, - &options, - )?; - } - Ok(batch) + self.decoder_projection.map(batch) } }