-
Notifications
You must be signed in to change notification settings - Fork 1
Tolerate schema evolution in statistics merge (v2) #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: branch-52
Are you sure you want to change the base?
Changes from 5 commits
a8dad16
8ae74a3
6c0cb14
4a276bb
6101fa6
acf1224
a074c0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -21,7 +21,6 @@ use std::fmt::{self, Debug, Display}; | |||||
|
|
||||||
| use crate::{Result, ScalarValue}; | ||||||
|
|
||||||
| use crate::error::_plan_err; | ||||||
| use arrow::datatypes::{DataType, Schema}; | ||||||
|
|
||||||
| /// Represents a value with a degree of certainty. `Precision` is used to | ||||||
|
|
@@ -615,19 +614,25 @@ impl Statistics { | |||||
| num_rows = num_rows.add(&other.num_rows); | ||||||
| total_byte_size = total_byte_size.add(&other.total_byte_size); | ||||||
|
|
||||||
| if column_statistics.len() != other.column_statistics.len() { | ||||||
| return _plan_err!( | ||||||
| "Cannot merge statistics with different number of columns: {} vs {}", | ||||||
| column_statistics.len(), | ||||||
| other.column_statistics.len() | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| for (item_col_stats, col_stats) in other | ||||||
| .column_statistics | ||||||
| .iter() | ||||||
| .zip(column_statistics.iter_mut()) | ||||||
| { | ||||||
| // Tolerate schema evolution: when files have different column counts | ||||||
| // (e.g. newer files added columns), pad the shorter side with unknown | ||||||
| // statistics so the merge can proceed. Columns present in only some | ||||||
| // files will have Absent statistics after the merge. | ||||||
| let max_cols = column_statistics.len().max(other.column_statistics.len()); | ||||||
| column_statistics.resize(max_cols, ColumnStatistics::new_unknown()); | ||||||
|
|
||||||
| for (idx, col_stats) in column_statistics.iter_mut().enumerate() { | ||||||
| // When the other side has fewer columns (schema evolution), | ||||||
| // treat the missing column as unknown so the merge is symmetric | ||||||
| // regardless of iteration order. | ||||||
| let unknown; | ||||||
| let item_col_stats = match other.column_statistics.get(idx) { | ||||||
| Some(cs) => cs, | ||||||
| None => { | ||||||
| unknown = ColumnStatistics::new_unknown(); | ||||||
| &unknown | ||||||
| } | ||||||
| }; | ||||||
| col_stats.null_count = col_stats.null_count.add(&item_col_stats.null_count); | ||||||
| col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value); | ||||||
| col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value); | ||||||
|
|
@@ -1287,26 +1292,271 @@ mod tests { | |||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_mismatched_size() { | ||||||
| // Create a schema with one column | ||||||
| fn test_try_merge_mismatched_size_pads_shorter() { | ||||||
| let schema = Arc::new(Schema::new(vec![Field::new( | ||||||
| "col1", | ||||||
| DataType::Int32, | ||||||
| false, | ||||||
| )])); | ||||||
|
|
||||||
| // No column statistics | ||||||
| let stats1 = Statistics::default(); | ||||||
| // stats1 has 0 columns, stats2 has 1 column | ||||||
| let stats1 = Statistics::default().with_num_rows(Precision::Exact(5)); | ||||||
| let stats2 = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(10)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(1))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(100))), | ||||||
| ); | ||||||
|
|
||||||
| let items = vec![stats1, stats2]; | ||||||
| let merged = Statistics::try_merge_iter(&items, &schema).unwrap(); | ||||||
|
|
||||||
| assert_eq!(merged.num_rows, Precision::Exact(15)); | ||||||
| assert_eq!(merged.column_statistics.len(), 1); | ||||||
| // Padded unknown merged with Exact → Absent (unknown poisons the merge) | ||||||
| assert_eq!(merged.column_statistics[0].min_value, Precision::Absent); | ||||||
| assert_eq!(merged.column_statistics[0].max_value, Precision::Absent); | ||||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_schema_evolution_new_columns() { | ||||||
| // Simulates schema evolution: old files have 2 columns, new files have 3 | ||||||
| let schema = Arc::new(Schema::new(vec![ | ||||||
| Field::new("a", DataType::Int32, false), | ||||||
| Field::new("b", DataType::Int32, false), | ||||||
| Field::new("c", DataType::Int32, true), // new column | ||||||
| ])); | ||||||
|
|
||||||
| // Old file: 2 columns | ||||||
| let stats_old = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(100)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(1))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(50))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(10))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(500))), | ||||||
| ); | ||||||
|
|
||||||
| // New file: 3 columns | ||||||
| let stats_new = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(200)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(5))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(80))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(20))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(300))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(0))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(999))), | ||||||
| ); | ||||||
|
|
||||||
| let items = vec![stats_old, stats_new]; | ||||||
| let merged = Statistics::try_merge_iter(&items, &schema).unwrap(); | ||||||
|
|
||||||
| assert_eq!(merged.num_rows, Precision::Exact(300)); | ||||||
| assert_eq!(merged.column_statistics.len(), 3); | ||||||
|
|
||||||
| // Column a: merged from both files | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].min_value, | ||||||
| Precision::Exact(ScalarValue::from(1)) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].max_value, | ||||||
| Precision::Exact(ScalarValue::from(80)) | ||||||
| ); | ||||||
|
|
||||||
| // Column b: merged from both files | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[1].min_value, | ||||||
| Precision::Exact(ScalarValue::from(10)) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[1].max_value, | ||||||
| Precision::Exact(ScalarValue::from(500)) | ||||||
| ); | ||||||
|
|
||||||
| // Column c: only present in new file, old file was padded with unknown. | ||||||
| // unknown.min(Exact) = Absent, so the merged stats are Absent. | ||||||
| assert_eq!(merged.column_statistics[2].min_value, Precision::Absent); | ||||||
| assert_eq!(merged.column_statistics[2].max_value, Precision::Absent); | ||||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_schema_evolution_reversed_order() { | ||||||
| // New file first, old file second — should produce same result | ||||||
| let schema = Arc::new(Schema::new(vec![ | ||||||
| Field::new("a", DataType::Int32, false), | ||||||
| Field::new("b", DataType::Int32, true), | ||||||
| ])); | ||||||
|
|
||||||
| let stats_new = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(50)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(1))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(10))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(100))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(200))), | ||||||
| ); | ||||||
|
|
||||||
| let stats_old = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(30)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(5))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(20))), | ||||||
| ); | ||||||
|
|
||||||
| let items = vec![stats_new, stats_old]; | ||||||
| let merged = Statistics::try_merge_iter(&items, &schema).unwrap(); | ||||||
|
|
||||||
| let stats2 = | ||||||
| Statistics::default().add_column_statistics(ColumnStatistics::new_unknown()); | ||||||
| assert_eq!(merged.num_rows, Precision::Exact(80)); | ||||||
| assert_eq!(merged.column_statistics.len(), 2); | ||||||
|
|
||||||
| // Column a: merged | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].min_value, | ||||||
| Precision::Exact(ScalarValue::from(1)) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].max_value, | ||||||
| Precision::Exact(ScalarValue::from(20)) | ||||||
| ); | ||||||
|
|
||||||
| // Column b: only in stats_new, stats_old treated as unknown. | ||||||
| // Exact.min(Absent) = Absent — symmetric with forward order. | ||||||
| assert_eq!(merged.column_statistics[1].min_value, Precision::Absent); | ||||||
| assert_eq!(merged.column_statistics[1].max_value, Precision::Absent); | ||||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_same_column_count_unchanged() { | ||||||
| // Ensure same-column-count merge still works identically | ||||||
| let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); | ||||||
|
|
||||||
| let stats1 = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(10)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(1))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(50))), | ||||||
| ); | ||||||
|
|
||||||
| let stats2 = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(20)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(5))) | ||||||
| .with_max_value(Precision::Exact(ScalarValue::from(100))), | ||||||
| ); | ||||||
|
|
||||||
| let items = vec![stats1, stats2]; | ||||||
| let merged = Statistics::try_merge_iter(&items, &schema).unwrap(); | ||||||
|
|
||||||
| assert_eq!(merged.num_rows, Precision::Exact(30)); | ||||||
| assert_eq!(merged.column_statistics.len(), 1); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].min_value, | ||||||
| Precision::Exact(ScalarValue::from(1)) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].max_value, | ||||||
| Precision::Exact(ScalarValue::from(100)) | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_iter_pads_to_schema() { | ||||||
| // Table schema has 3 columns but all files only have 2. | ||||||
| // Merged stats should be padded to 3 columns. | ||||||
| let schema = Arc::new(Schema::new(vec![ | ||||||
| Field::new("a", DataType::Int32, false), | ||||||
| Field::new("b", DataType::Int32, false), | ||||||
| Field::new("c", DataType::Int32, true), // new column, no files have it | ||||||
| ])); | ||||||
|
Comment on lines
Comment on lines +1484 to +1491
|
||||||
|
|
||||||
| let stats1 = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(10)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(1))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(10))), | ||||||
| ); | ||||||
|
|
||||||
| let stats2 = Statistics::default() | ||||||
| .with_num_rows(Precision::Exact(20)) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(5))), | ||||||
| ) | ||||||
| .add_column_statistics( | ||||||
| ColumnStatistics::new_unknown() | ||||||
| .with_min_value(Precision::Exact(ScalarValue::from(20))), | ||||||
| ); | ||||||
|
|
||||||
| let items = vec![stats1, stats2]; | ||||||
| let merged = Statistics::try_merge_iter(&items, &schema).unwrap(); | ||||||
|
|
||||||
| assert_eq!(merged.num_rows, Precision::Exact(30)); | ||||||
| // Merged stats have max(file1_cols, file2_cols) = 2, not schema cols | ||||||
| assert_eq!(merged.column_statistics.len(), 2); | ||||||
|
|
||||||
| let e = Statistics::try_merge_iter(&items, &schema).unwrap_err(); | ||||||
| assert_contains!( | ||||||
| e.to_string(), | ||||||
| "Error during planning: Cannot merge statistics with different number of columns: 0 vs 1" | ||||||
| // Columns a, b: merged from both files | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[0].min_value, | ||||||
| Precision::Exact(ScalarValue::from(1)) | ||||||
| ); | ||||||
| assert_eq!( | ||||||
| merged.column_statistics[1].min_value, | ||||||
| Precision::Exact(ScalarValue::from(10)) | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| #[test] | ||||||
| fn test_try_merge_iter_does_not_pad_for_partition_columns() { | ||||||
| // Schema includes partition column; files don't have stats for it. | ||||||
| // try_merge_iter should NOT pad to schema — partition columns are | ||||||
| // handled by the caller (e.g. Atlas's plan.rs or DF's statistics.rs). | ||||||
|
Suggested change:
-        // handled by the caller (e.g. Atlas's plan.rs or DF's statistics.rs).
+        // expected to be handled by higher-level planning/statistics code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Statistics::try_merge` no longer errors when `column_statistics` lengths differ, but the function docs still say it returns an error for mismatched schemas. Please update the doc comment to reflect the new schema-evolution behavior (and what kinds of mismatches, if any, still produce an error).