diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
index ea016015cebd3..2c3afa15e74fa 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -339,17 +339,28 @@ fn filter_partitions(
     Ok(None)
 }
 
+/// Returns `Ok(None)` when the file is not inside a valid partition path
+/// (e.g. a stale file in the table root directory). Such files are skipped
+/// by the caller rather than included with empty partition values, which
+/// would cause downstream errors when the query references partition columns.
 fn try_into_partitioned_file(
     object_meta: ObjectMeta,
     partition_cols: &[(String, DataType)],
     table_path: &ListingTableUrl,
-) -> Result<PartitionedFile> {
+) -> Result<Option<PartitionedFile>> {
     let cols = partition_cols.iter().map(|(name, _)| name.as_str());
     let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);
 
+    let Some(parsed) = parsed else {
+        debug!(
+            "Skipping file outside partition structure: {}",
+            object_meta.location
+        );
+        return Ok(None);
+    };
+
     let partition_values = parsed
         .into_iter()
-        .flatten()
         .zip(partition_cols)
         .map(|(parsed, (_, datatype))| {
             ScalarValue::try_from_string(parsed.to_string(), datatype)
@@ -359,7 +370,7 @@ fn try_into_partitioned_file(
 
     let mut pf: PartitionedFile = object_meta.into();
     pf.partition_values = partition_values;
-    Ok(pf)
+    Ok(Some(pf))
 }
 
 /// Discover the partitions on the given path and prune out files
@@ -404,13 +415,15 @@ pub async fn pruned_partition_list<'a>(
     )?;
 
     Ok(objects
-        .map_ok(|object_meta| {
-            try_into_partitioned_file(object_meta, partition_cols, table_path)
+        .try_filter_map(|object_meta| {
+            futures::future::ready(try_into_partitioned_file(
+                object_meta,
+                partition_cols,
+                table_path,
+            ))
         })
         .try_filter_map(move |pf| {
-            futures::future::ready(
-                pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
-            )
+            futures::future::ready(filter_partitions(pf, filters, &df_schema))
         })
         .boxed())
 }
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index ba13ef392d912..f94ef8656becc 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -21,7 +21,6 @@ use std::fmt::{self, Debug, Display};
 
 use crate::{Result, ScalarValue};
-use crate::error::_plan_err;
 use arrow::datatypes::{DataType, Schema};
 
 /// Represents a value with a degree of certainty. `Precision` is used to
@@ -615,19 +614,27 @@ impl Statistics {
         num_rows = num_rows.add(&other.num_rows);
         total_byte_size = total_byte_size.add(&other.total_byte_size);
 
-        if column_statistics.len() != other.column_statistics.len() {
-            return _plan_err!(
-                "Cannot merge statistics with different number of columns: {} vs {}",
-                column_statistics.len(),
-                other.column_statistics.len()
-            );
-        }
-
-        for (item_col_stats, col_stats) in other
-            .column_statistics
-            .iter()
-            .zip(column_statistics.iter_mut())
-        {
+        // Tolerate schema evolution: when files have different column counts
+        // (e.g. newer files added columns), pad the shorter side with unknown
+        // statistics so the merge can proceed. Columns present in only some
+        // files will have Absent statistics after the merge.
+        let max_cols = column_statistics.len().max(other.column_statistics.len());
+        column_statistics.resize(max_cols, ColumnStatistics::new_unknown());
+
+        for (idx, col_stats) in column_statistics.iter_mut().enumerate() {
+            // When the other side has fewer columns (schema evolution),
+            // treat the missing column as unknown so the merge is symmetric
+            // regardless of iteration order.
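+            // `unknown` is declared before the match so the value created in
+            // the None arm outlives the match expression and can be borrowed here.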
+            let unknown;
+            let item_col_stats = match other.column_statistics.get(idx) {
+                Some(cs) => cs,
+                None => {
+                    unknown = ColumnStatistics::new_unknown();
+                    &unknown
+                }
+            };
             col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
             col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
             col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
@@ -1287,26 +1292,271 @@ mod tests {
     }
 
     #[test]
-    fn test_try_merge_mismatched_size() {
-        // Create a schema with one column
+    fn test_try_merge_mismatched_size_pads_shorter() {
         let schema = Arc::new(Schema::new(vec![Field::new(
             "col1",
             DataType::Int32,
             false,
         )]));
 
-        // No column statistics
-        let stats1 = Statistics::default();
+        // stats1 has 0 columns, stats2 has 1 column
+        let stats1 = Statistics::default().with_num_rows(Precision::Exact(5));
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(100))),
+            );
+
+        let items = vec![stats1, stats2];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
+
+        assert_eq!(merged.num_rows, Precision::Exact(15));
+        assert_eq!(merged.column_statistics.len(), 1);
+        // Padded unknown merged with Exact → Absent (unknown poisons the merge)
+        assert_eq!(merged.column_statistics[0].min_value, Precision::Absent);
+        assert_eq!(merged.column_statistics[0].max_value, Precision::Absent);
+    }
+
+    #[test]
+    fn test_try_merge_schema_evolution_new_columns() {
+        // Simulates schema evolution: old files have 2 columns, new files have 3
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, true), // new column
+        ]));
+
+        // Old file: 2 columns
+        let stats_old = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(50))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(10)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(500))),
+            );
+
+        // New file: 3 columns
+        let stats_new = Statistics::default()
+            .with_num_rows(Precision::Exact(200))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(5)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(80))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(20)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(300))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(0)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(999))),
+            );
+
+        let items = vec![stats_old, stats_new];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
+
+        assert_eq!(merged.num_rows, Precision::Exact(300));
+        assert_eq!(merged.column_statistics.len(), 3);
+
+        // Column a: merged from both files
+        assert_eq!(
+            merged.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::from(1))
+        );
+        assert_eq!(
+            merged.column_statistics[0].max_value,
+            Precision::Exact(ScalarValue::from(80))
+        );
+
+        // Column b: merged from both files
+        assert_eq!(
+            merged.column_statistics[1].min_value,
+            Precision::Exact(ScalarValue::from(10))
+        );
+        assert_eq!(
+            merged.column_statistics[1].max_value,
+            Precision::Exact(ScalarValue::from(500))
+        );
+
+        // Column c: only present in new file, old file was padded with unknown.
+        // unknown.min(Exact) = Absent, so the merged stats are Absent.
+        assert_eq!(merged.column_statistics[2].min_value, Precision::Absent);
+        assert_eq!(merged.column_statistics[2].max_value, Precision::Absent);
+    }
+
+    #[test]
+    fn test_try_merge_schema_evolution_reversed_order() {
+        // New file first, old file second — should produce same result
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let stats_new = Statistics::default()
+            .with_num_rows(Precision::Exact(50))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(10))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(100)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(200))),
+            );
+
+        let stats_old = Statistics::default()
+            .with_num_rows(Precision::Exact(30))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(5)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(20))),
+            );
+
+        let items = vec![stats_new, stats_old];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
 
-        let stats2 =
-            Statistics::default().add_column_statistics(ColumnStatistics::new_unknown());
+        assert_eq!(merged.num_rows, Precision::Exact(80));
+        assert_eq!(merged.column_statistics.len(), 2);
+
+        // Column a: merged
+        assert_eq!(
+            merged.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::from(1))
+        );
+        assert_eq!(
+            merged.column_statistics[0].max_value,
+            Precision::Exact(ScalarValue::from(20))
+        );
+
+        // Column b: only in stats_new, stats_old treated as unknown.
+        // Exact.min(Absent) = Absent — symmetric with forward order.
+        assert_eq!(merged.column_statistics[1].min_value, Precision::Absent);
+        assert_eq!(merged.column_statistics[1].max_value, Precision::Absent);
+    }
+
+    #[test]
+    fn test_try_merge_same_column_count_unchanged() {
+        // Ensure same-column-count merge still works identically
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(50))),
+            );
+
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(20))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(5)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(100))),
+            );
+
+        let items = vec![stats1, stats2];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
+
+        assert_eq!(merged.num_rows, Precision::Exact(30));
+        assert_eq!(merged.column_statistics.len(), 1);
+        assert_eq!(
+            merged.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::from(1))
+        );
+        assert_eq!(
+            merged.column_statistics[0].max_value,
+            Precision::Exact(ScalarValue::from(100))
+        );
+    }
+
+    #[test]
+    fn test_try_merge_iter_does_not_pad_to_schema() {
+        // Table schema has 3 columns but all files only have 2.
+        // Merged stats are padded only to the widest file, not to the schema.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, true), // new column, no files have it
+        ]));
+
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(10))),
+            );
+
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(20))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(5))),
+            )
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(20))),
+            );
 
         let items = vec![stats1, stats2];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
+
+        assert_eq!(merged.num_rows, Precision::Exact(30));
+        // Merged stats have max(file1_cols, file2_cols) = 2, not schema cols
+        assert_eq!(merged.column_statistics.len(), 2);
 
-        let e = Statistics::try_merge_iter(&items, &schema).unwrap_err();
-        assert_contains!(
-            e.to_string(),
-            "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1"
+        // Columns a, b: merged from both files
+        assert_eq!(
+            merged.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::from(1))
+        );
+        assert_eq!(
+            merged.column_statistics[1].min_value,
+            Precision::Exact(ScalarValue::from(10))
+        );
+    }
+
+    #[test]
+    fn test_try_merge_iter_does_not_pad_for_partition_columns() {
+        // Schema includes partition column; files don't have stats for it.
+        // try_merge_iter should NOT pad to schema — partition columns are
+        // handled by the caller (e.g. Atlas's plan.rs or DataFusion's statistics.rs).
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("data_col", DataType::Int32, false),
+            Field::new("partition_col", DataType::Utf8, false),
+        ]));
+
+        let stats = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::from(1)))
+                    .with_max_value(Precision::Exact(ScalarValue::from(999))),
+            );
+        // Only 1 column stat — partition column not included
+
+        let items = vec![stats];
+        let merged = Statistics::try_merge_iter(&items, &schema).unwrap();
+
+        // Should stay 1 — not padded to schema's 2 columns
+        assert_eq!(merged.column_statistics.len(), 1);
+        assert_eq!(
+            merged.column_statistics[0].min_value,
+            Precision::Exact(ScalarValue::from(1))
         );
     }
diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs
index b1a56e096c222..2dd20c98ea753 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -84,20 +84,44 @@ impl MinMaxStatistics {
             return plan_err!("Parquet file missing statistics");
         };
 
+        // Determine the number of file (non-partition) columns from the first
+        // entry. Partition columns always follow file columns in the table
+        // schema, so we use partition_values.len() to find the boundary.
+        // Note: we cannot use `s.column_statistics.len()` per-file because
+        // schema evolution means different files may have different column
+        // counts, which would shift the partition offset incorrectly.
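+        // Example: with table schema [a, b, part] (num_file_cols = 2), an old
+        // file with stats for only [a] would otherwise treat i = 1 (column b)
+        // as a partition column and read pv[0] for it.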
+        let num_file_cols = statistics_and_partition_values
+            .first()
+            .map(|(_, pv)| projected_schema.fields().len() - pv.len())
+            .unwrap_or(0);
+
         // Helper function to get min/max statistics for a given column of projected_schema
         let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
             Ok(statistics_and_partition_values
                 .iter()
                 .map(|(s, pv)| {
-                    if i < s.column_statistics.len() {
-                        s.column_statistics[i]
-                            .min_value
-                            .get_value()
-                            .cloned()
-                            .zip(s.column_statistics[i].max_value.get_value().cloned())
-                            .ok_or_else(|| plan_datafusion_err!("statistics not found"))
+                    if i < num_file_cols {
+                        if i < s.column_statistics.len() {
+                            s.column_statistics[i]
+                                .min_value
+                                .get_value()
+                                .cloned()
+                                .zip(
+                                    s.column_statistics[i].max_value.get_value().cloned(),
+                                )
+                                .ok_or_else(|| {
+                                    plan_datafusion_err!("statistics not found")
+                                })
+                        } else {
+                            // File doesn't have this column (schema evolution)
+                            // — treat as unknown, skip this file for min/max
+                            Err(plan_datafusion_err!("statistics not found"))
+                        }
                     } else {
-                        let partition_value = &pv[i - s.column_statistics.len()];
+                        let partition_value = &pv[i - num_file_cols];
                         Ok((partition_value.clone(), partition_value.clone()))
                     }
                 })
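
Note (outside the patch): a minimal sketch of the `Precision` semantics the
padding relies on, using only the `min` and `add` calls already exercised in
the diff above:

    use datafusion_common::stats::Precision;
    use datafusion_common::ScalarValue;

    // An Absent (unknown) bound poisons min/max, which is why a column padded
    // with ColumnStatistics::new_unknown() ends up with Absent min/max.
    let exact = Precision::Exact(ScalarValue::from(1));
    let absent: Precision<ScalarValue> = Precision::Absent;
    assert_eq!(exact.min(&absent), Precision::Absent);

    // Row counts add exactly when both sides are Exact (5 + 10 = 15), so the
    // merged num_rows in the tests above stays Exact.
    let rows = Precision::Exact(5_usize).add(&Precision::Exact(10_usize));
    assert_eq!(rows, Precision::Exact(15_usize));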