Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,28 @@ fn filter_partitions(
Ok(None)
}

/// Returns `Ok(None)` when the file is not inside a valid partition path
/// (e.g. a stale file in the table root directory). Such files are skipped
/// by the caller rather than included with empty partition values, which
/// would cause downstream errors when the query references partition columns.
fn try_into_partitioned_file(
object_meta: ObjectMeta,
partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
) -> Result<PartitionedFile> {
) -> Result<Option<PartitionedFile>> {
let cols = partition_cols.iter().map(|(name, _)| name.as_str());
let parsed = parse_partitions_for_path(table_path, &object_meta.location, cols);

let Some(parsed) = parsed else {
debug!(
"Skipping file outside partition structure: {}",
object_meta.location
);
return Ok(None);
};

let partition_values = parsed
.into_iter()
.flatten()
.zip(partition_cols)
.map(|(parsed, (_, datatype))| {
ScalarValue::try_from_string(parsed.to_string(), datatype)
Expand All @@ -359,7 +370,7 @@ fn try_into_partitioned_file(
let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;

Ok(pf)
Ok(Some(pf))
}

/// Discover the partitions on the given path and prune out files
Expand Down Expand Up @@ -404,13 +415,15 @@ pub async fn pruned_partition_list<'a>(
)?;

Ok(objects
.map_ok(|object_meta| {
try_into_partitioned_file(object_meta, partition_cols, table_path)
.try_filter_map(|object_meta| {
futures::future::ready(try_into_partitioned_file(
object_meta,
partition_cols,
table_path,
))
})
.try_filter_map(move |pf| {
futures::future::ready(
pf.and_then(|pf| filter_partitions(pf, filters, &df_schema)),
)
futures::future::ready(filter_partitions(pf, filters, &df_schema))
})
.boxed())
}
Expand Down
298 changes: 274 additions & 24 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::fmt::{self, Debug, Display};

use crate::{Result, ScalarValue};

use crate::error::_plan_err;
use arrow::datatypes::{DataType, Schema};

/// Represents a value with a degree of certainty. `Precision` is used to
Expand Down Expand Up @@ -615,19 +614,25 @@ impl Statistics {
num_rows = num_rows.add(&other.num_rows);
total_byte_size = total_byte_size.add(&other.total_byte_size);

if column_statistics.len() != other.column_statistics.len() {
return _plan_err!(
"Cannot merge statistics with different number of columns: {} vs {}",
column_statistics.len(),
other.column_statistics.len()
);
}

for (item_col_stats, col_stats) in other
.column_statistics
.iter()
.zip(column_statistics.iter_mut())
{
// Tolerate schema evolution: when files have different column counts
// (e.g. newer files added columns), pad the shorter side with unknown
// statistics so the merge can proceed. Columns present in only some
// files will have Absent statistics after the merge.
let max_cols = column_statistics.len().max(other.column_statistics.len());
column_statistics.resize(max_cols, ColumnStatistics::new_unknown());

Comment on lines +617 to +623
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statistics::try_merge no longer errors when column_statistics lengths differ, but the function docs still say it returns an error for mismatched schemas. Please update the doc comment to reflect the new schema-evolution behavior (and what kinds of mismatches, if any, still produce an error).

Copilot uses AI. Check for mistakes.
for (idx, col_stats) in column_statistics.iter_mut().enumerate() {
// When the other side has fewer columns (schema evolution),
// treat the missing column as unknown so the merge is symmetric
// regardless of iteration order.
let unknown;
let item_col_stats = match other.column_statistics.get(idx) {
Some(cs) => cs,
None => {
unknown = ColumnStatistics::new_unknown();
&unknown
}
};
col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count);
col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
Expand Down Expand Up @@ -1287,26 +1292,271 @@ mod tests {
}

#[test]
fn test_try_merge_mismatched_size() {
// Create a schema with one column
fn test_try_merge_mismatched_size_pads_shorter() {
    // One-column schema for the merge.
    let schema =
        Arc::new(Schema::new(vec![Field::new("col1", DataType::Int32, false)]));

    // One input carries no column statistics at all; the other carries
    // exact min/max bounds for its single column.
    let without_cols = Statistics::default().with_num_rows(Precision::Exact(5));
    let with_col = Statistics::default()
        .with_num_rows(Precision::Exact(10))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(1)))
                .with_max_value(Precision::Exact(ScalarValue::from(100))),
        );

    let inputs = vec![without_cols, with_col];
    let merged = Statistics::try_merge_iter(&inputs, &schema).unwrap();

    // Row counts always add up.
    assert_eq!(merged.num_rows, Precision::Exact(15));
    // The shorter side is padded to one column…
    assert_eq!(merged.column_statistics.len(), 1);
    // …and merging the unknown pad with exact bounds yields Absent
    // (the unknown side poisons the merge).
    assert_eq!(merged.column_statistics[0].min_value, Precision::Absent);
    assert_eq!(merged.column_statistics[0].max_value, Precision::Absent);
}

#[test]
fn test_try_merge_schema_evolution_new_columns() {
    // Schema evolution scenario: older files wrote 2 columns, newer files
    // write 3 (column "c" was added later).
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true), // added by schema evolution
    ]));

    // Small helper: column statistics with exact [min, max] bounds.
    let col = |min: i32, max: i32| {
        ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::from(min)))
            .with_max_value(Precision::Exact(ScalarValue::from(max)))
    };

    // Statistics from an old file: only the first 2 columns.
    let stats_old = Statistics::default()
        .with_num_rows(Precision::Exact(100))
        .add_column_statistics(col(1, 50))
        .add_column_statistics(col(10, 500));

    // Statistics from a new file: all 3 columns.
    let stats_new = Statistics::default()
        .with_num_rows(Precision::Exact(200))
        .add_column_statistics(col(5, 80))
        .add_column_statistics(col(20, 300))
        .add_column_statistics(col(0, 999));

    let inputs = vec![stats_old, stats_new];
    let merged = Statistics::try_merge_iter(&inputs, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Exact(300));
    assert_eq!(merged.column_statistics.len(), 3);

    // Columns "a" and "b" are present on both sides, so exact bounds merge.
    assert_eq!(
        merged.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::from(1))
    );
    assert_eq!(
        merged.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::from(80))
    );
    assert_eq!(
        merged.column_statistics[1].min_value,
        Precision::Exact(ScalarValue::from(10))
    );
    assert_eq!(
        merged.column_statistics[1].max_value,
        Precision::Exact(ScalarValue::from(500))
    );

    // Column "c" exists only in the new file; the old file is padded with
    // unknown stats, and unknown.min(Exact) = Absent.
    assert_eq!(merged.column_statistics[2].min_value, Precision::Absent);
    assert_eq!(merged.column_statistics[2].max_value, Precision::Absent);
}

#[test]
fn test_try_merge_schema_evolution_reversed_order() {
    // Same schema-evolution scenario but with the wider (new) file first;
    // the merge must produce the same result regardless of input order.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));

    let stats_new = Statistics::default()
        .with_num_rows(Precision::Exact(50))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(1)))
                .with_max_value(Precision::Exact(ScalarValue::from(10))),
        )
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(100)))
                .with_max_value(Precision::Exact(ScalarValue::from(200))),
        );

    let stats_old = Statistics::default()
        .with_num_rows(Precision::Exact(30))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(5)))
                .with_max_value(Precision::Exact(ScalarValue::from(20))),
        );

    let items = vec![stats_new, stats_old];
    let merged = Statistics::try_merge_iter(&items, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Exact(80));
    assert_eq!(merged.column_statistics.len(), 2);

    // Column a: present on both sides, exact bounds merge.
    assert_eq!(
        merged.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::from(1))
    );
    assert_eq!(
        merged.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::from(20))
    );

    // Column b: only in stats_new, stats_old treated as unknown.
    // Exact.min(Absent) = Absent — symmetric with forward order.
    assert_eq!(merged.column_statistics[1].min_value, Precision::Absent);
    assert_eq!(merged.column_statistics[1].max_value, Precision::Absent);
}

#[test]
fn test_try_merge_same_column_count_unchanged() {
    // Guard: merging two inputs with identical column counts must keep
    // working exactly as it did before.
    let schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Helper: column statistics with exact [min, max] bounds.
    let exact_col = |min: i32, max: i32| {
        ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::from(min)))
            .with_max_value(Precision::Exact(ScalarValue::from(max)))
    };

    let lhs = Statistics::default()
        .with_num_rows(Precision::Exact(10))
        .add_column_statistics(exact_col(1, 50));
    let rhs = Statistics::default()
        .with_num_rows(Precision::Exact(20))
        .add_column_statistics(exact_col(5, 100));

    let inputs = vec![lhs, rhs];
    let merged = Statistics::try_merge_iter(&inputs, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Exact(30));
    assert_eq!(merged.column_statistics.len(), 1);
    assert_eq!(
        merged.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::from(1))
    );
    assert_eq!(
        merged.column_statistics[0].max_value,
        Precision::Exact(ScalarValue::from(100))
    );
}

#[test]
fn test_try_merge_iter_does_not_pad_to_schema() {
    // Table schema has 3 columns but all files only have 2. The merged
    // statistics keep the files' column count (2) — try_merge_iter does
    // NOT pad up to the table schema; any schema-level padding is expected
    // to be handled by higher-level planning/statistics code.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, true), // new column, no files have it
    ]));

    let stats1 = Statistics::default()
        .with_num_rows(Precision::Exact(10))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(1))),
        )
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(10))),
        );

    let stats2 = Statistics::default()
        .with_num_rows(Precision::Exact(20))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(5))),
        )
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(20))),
        );

    let items = vec![stats1, stats2];
    let merged = Statistics::try_merge_iter(&items, &schema).unwrap();

    assert_eq!(merged.num_rows, Precision::Exact(30));
    // Merged stats have max(file1_cols, file2_cols) = 2, not the schema's 3.
    assert_eq!(merged.column_statistics.len(), 2);

    // Columns a, b: merged from both files.
    assert_eq!(
        merged.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::from(1))
    );
    assert_eq!(
        merged.column_statistics[1].min_value,
        Precision::Exact(ScalarValue::from(10))
    );
}

#[test]
fn test_try_merge_iter_does_not_pad_for_partition_columns() {
    // Schema includes a partition column; files don't have stats for it.
    // try_merge_iter should NOT pad to the schema — partition-column
    // statistics are expected to be handled by higher-level
    // planning/statistics code.
    let schema = Arc::new(Schema::new(vec![
        Field::new("data_col", DataType::Int32, false),
        Field::new("partition_col", DataType::Utf8, false),
    ]));

    // Only 1 column statistic — the partition column is not included.
    let stats = Statistics::default()
        .with_num_rows(Precision::Exact(100))
        .add_column_statistics(
            ColumnStatistics::new_unknown()
                .with_min_value(Precision::Exact(ScalarValue::from(1)))
                .with_max_value(Precision::Exact(ScalarValue::from(999))),
        );

    let items = vec![stats];
    let merged = Statistics::try_merge_iter(&items, &schema).unwrap();

    // Should stay 1 — not padded to the schema's 2 columns.
    assert_eq!(merged.column_statistics.len(), 1);
    assert_eq!(
        merged.column_statistics[0].min_value,
        Precision::Exact(ScalarValue::from(1))
    );
}

Expand Down
Loading
Loading