Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions datafusion/datasource-parquet/src/decoder_projection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Decoder-projection construction for the parquet scan.
//!
//! [`DecoderProjection`] owns the two halves of "project a decoded parquet
//! batch onto the scan's output schema":
//!
//! * the [`ProjectionMask`] installed on every parquet decoder run, and
//! * the per-batch transform ([`DecoderProjection::map`]) that applies the
//! projector and, when needed, rebuilds the batch with the user's
//! `output_schema` to recover metadata / nullability the file schema does
//! not carry.
//!
//! The opener constructs one [`DecoderProjection`] per file via
//! [`DecoderProjection::build`] and hands it to the push-decoder stream,
Comment thread
adriangb marked this conversation as resolved.
Outdated
//! which calls [`map`](DecoderProjection::map) on every decoded batch.

use std::sync::Arc;

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::SchemaRef;

use datafusion_common::Result;
use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
use datafusion_physical_expr::utils::reassign_expr_columns;

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

use crate::row_filter::build_projection_read_plan;

/// Per-file decoder projection: the [`ProjectionMask`] installed on every
/// parquet decoder run, plus the per-batch transform that maps the decoder's
/// output onto the scan's `output_schema`.
///
/// Built once per file by the opener via [`Self::try_new`]; the
/// push-decoder stream installs [`Self::projection_mask`] on each decoder
/// and calls [`Self::map`] on every decoded batch.
pub(crate) struct DecoderProjection {
projection_mask: ProjectionMask,
projector: Projector,
output_schema: SchemaRef,
/// `true` when the projector's output schema differs from `output_schema`
/// in metadata / nullability and [`map`](Self::map) must rebuild the batch
/// with `output_schema`.
replace_schema: bool,
}

impl DecoderProjection {
/// Build the decoder projection for a file.
///
/// `projection` references columns in `physical_file_schema` (i.e. already
/// adapted by the per-file expr adapter); `parquet_schema` is the
/// corresponding parquet [`SchemaDescriptor`]. `output_schema` is what
/// consumers of the scan stream expect.
pub(crate) fn try_new(
projection: &ProjectionExprs,
physical_file_schema: &SchemaRef,
parquet_schema: &SchemaDescriptor,
output_schema: &SchemaRef,
) -> Result<Self> {
let read_plan = build_projection_read_plan(
projection.expr_iter(),
physical_file_schema,
parquet_schema,
);

let stream_schema = read_plan.projected_schema;

// Rebase the projection onto the decoder's stream schema (column
// indices change because the decoder yields only the masked columns).
let rebased_projection = projection
.clone()
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
let projector = rebased_projection.make_projector(&stream_schema)?;

// Compare against the projector's *output* schema rather than the
// stream schema, so future widening of the mask (e.g. for post-scan
// filter columns) does not flip this flag.
let replace_schema = projector.output_schema() != output_schema;

Ok(Self {
projection_mask: read_plan.projection_mask,
projector,
output_schema: Arc::clone(output_schema),
replace_schema,
})
}

/// The projection mask to install on every parquet decoder in the scan.
pub(crate) fn projection_mask(&self) -> &ProjectionMask {
&self.projection_mask
}

/// Map a decoded batch onto the scan's output schema.
///
/// Applies the [`Projector`] and, when the projector's output schema
/// differs from `output_schema` in metadata or nullability, rebuilds the
/// batch with `output_schema` (some writers emit OPTIONAL fields even when
/// the data has no nulls; some logical schemas carry field-level metadata
/// the file schema does not).
pub(crate) fn map(&self, batch: &RecordBatch) -> Result<RecordBatch> {
let projected = self.projector.project_batch(batch)?;
if !self.replace_schema {
return Ok(projected);
}
let (_stream_schema, arrays, num_rows) = projected.into_parts();
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
Ok(RecordBatch::try_new_with_options(
Arc::clone(&self.output_schema),
arrays,
&options,
)?)
}
}
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

pub mod access_plan;
mod bloom_filter;
mod decoder_projection;
pub mod file_format;
pub mod metadata;
mod metrics;
Expand Down
35 changes: 13 additions & 22 deletions datafusion/datasource-parquet/src/opener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use self::early_stop::EarlyStoppingStream;
#[cfg(feature = "parquet_encryption")]
use self::encryption::EncryptionContext;
use crate::access_plan::PreparedAccessPlan;
use crate::decoder_projection::DecoderProjection;
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
use crate::row_filter::RowFilterGenerator;
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
use crate::{
Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
Expand All @@ -36,7 +37,6 @@ use arrow::array::RecordBatch;
use arrow::datatypes::DataType;
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::replace_columns_with_literals;
use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -1156,11 +1156,17 @@ impl RowGroupsPrunedParquetOpen {
};

let arrow_reader_metrics = ArrowReaderMetrics::enabled();
let read_plan = build_projection_read_plan(
prepared.projection.expr_iter(),

// Build the decoder projection (mask + per-batch transform) in a
// single call. Encapsulating it behind `DecoderProjection` keeps the
// opener's orchestration body focused on filter / decoder / stream
// wiring.
let decoder_projection = DecoderProjection::try_new(
&prepared.projection,
&prepared.physical_file_schema,
reader_metadata.parquet_schema(),
);
&prepared.output_schema,
)?;

let (decoder, pending_decoders, remaining_limit) = {
let pushdown_predicate = prepared
Expand Down Expand Up @@ -1188,7 +1194,7 @@ impl RowGroupsPrunedParquetOpen {
let remaining_limit = prepared.limit.filter(|_| run_count > 1);

let decoder_config = DecoderBuilderConfig {
read_plan: &read_plan,
projection_mask: decoder_projection.projection_mask(),
batch_size: prepared.batch_size,
arrow_reader_metrics: &arrow_reader_metrics,
force_filter_selections: prepared.force_filter_selections,
Expand Down Expand Up @@ -1226,29 +1232,14 @@ impl RowGroupsPrunedParquetOpen {
let predicate_cache_records =
prepared.file_metrics.predicate_cache_records.clone();

// Check if we need to replace the schema to handle things like differing nullability or metadata.
// See note below about file vs. output schema.
let stream_schema = read_plan.projected_schema;
let replace_schema = stream_schema != prepared.output_schema;

// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on physical_file_schema,
// but the stream only contains the columns selected by the ProjectionMask.
let projection = prepared
.projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
let projector = projection.make_projector(&stream_schema)?;
let output_schema = Arc::clone(&prepared.output_schema);
let files_ranges_pruned_statistics =
prepared.file_metrics.files_ranges_pruned_statistics.clone();
let stream = PushDecoderStreamState {
decoder,
pending_decoders,
remaining_limit,
reader: prepared.async_file_reader,
projector,
output_schema,
replace_schema,
decoder_projection,
arrow_reader_metrics,
predicate_cache_inner_records,
predicate_cache_records,
Expand Down
40 changes: 11 additions & 29 deletions datafusion/datasource-parquet/src/push_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,22 @@
//! [`PushDecoderStreamState::into_stream`] for consumption.

use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::Schema;
use arrow::array::RecordBatch;
use futures::StreamExt;
use futures::stream::BoxStream;
use parquet::DecodeResult;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelectionPolicy};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};

use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::projection::Projector;
use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge};

use crate::access_plan::PreparedAccessPlan;
use crate::row_filter::ParquetReadPlan;
use crate::decoder_projection::DecoderProjection;

/// Shared options applied to every [`ParquetPushDecoderBuilder`] in a file scan.
///
Expand All @@ -58,7 +56,9 @@ use crate::row_filter::ParquetReadPlan;
/// requirements). All decoders in that scan share the same projection, batch
/// size, metrics sink, and selection policy.
pub(crate) struct DecoderBuilderConfig<'a> {
pub(crate) read_plan: &'a ParquetReadPlan,
/// Projection mask installed on every decoder in the scan. Sourced from
/// the file's [`DecoderProjection`].
pub(crate) projection_mask: &'a ProjectionMask,
pub(crate) batch_size: usize,
pub(crate) arrow_reader_metrics: &'a ArrowReaderMetrics,
pub(crate) force_filter_selections: bool,
Expand All @@ -77,7 +77,7 @@ impl DecoderBuilderConfig<'_> {
metadata: ArrowReaderMetadata,
) -> ParquetPushDecoderBuilder {
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
.with_projection(self.read_plan.projection_mask.clone())
.with_projection(self.projection_mask.clone())
.with_batch_size(self.batch_size)
.with_metrics(self.arrow_reader_metrics.clone());
if self.force_filter_selections {
Expand Down Expand Up @@ -113,9 +113,9 @@ pub(crate) struct PushDecoderStreamState {
/// here instead.
pub(crate) remaining_limit: Option<usize>,
pub(crate) reader: Box<dyn AsyncFileReader>,
pub(crate) projector: Projector,
pub(crate) output_schema: Arc<Schema>,
pub(crate) replace_schema: bool,
/// Per-file projection: the mask installed on every decoder and the
/// per-batch transform applied by [`Self::project_batch`].
pub(crate) decoder_projection: DecoderProjection,
pub(crate) arrow_reader_metrics: ArrowReaderMetrics,
pub(crate) predicate_cache_inner_records: Gauge,
pub(crate) predicate_cache_records: Gauge,
Expand Down Expand Up @@ -216,24 +216,6 @@ impl PushDecoderStreamState {
}

fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
let mut batch = self.projector.project_batch(batch)?;
if self.replace_schema {
// Ensure the output batch has the expected schema.
// This handles things like schema level and field level metadata, which may not be present
// in the physical file schema.
// It is also possible for nullability to differ; some writers create files with
// OPTIONAL fields even when there are no nulls in the data.
// In these cases it may make sense for the logical schema to be `NOT NULL`.
// RecordBatch::try_new_with_options checks that if the schema is NOT NULL
// the array cannot contain nulls, amongst other checks.
let (_stream_schema, arrays, num_rows) = batch.into_parts();
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
batch = RecordBatch::try_new_with_options(
Arc::clone(&self.output_schema),
arrays,
&options,
)?;
}
Ok(batch)
self.decoder_projection.map(batch)
}
}
Loading