diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 607fd98350..0ca1c8ec5b 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,7 +22,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::hash::Hash; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use _serde::TableMetadataEnum; use chrono::{DateTime, Utc}; @@ -33,9 +33,9 @@ use uuid::Uuid; use super::snapshot::SnapshotReference; pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder}; use super::{ - DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef, - SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, - TableProperties, parse_metadata_file_compression, + DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, PrimitiveType, Schema, + SchemaId, SchemaRef, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, + StructType, TableProperties, parse_metadata_file_compression, }; use crate::catalog::MetadataLocation; use crate::compression::CompressionCodec; @@ -60,6 +60,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3; /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; +static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock> = + LazyLock::new(|| { + HashMap::from([ + (PrimitiveType::TimestampNs, FormatVersion::V3), + (PrimitiveType::TimestamptzNs, FormatVersion::V3), + ]) + }); + #[derive(Debug, PartialEq, Deserialize, Eq, Clone)] #[serde(try_from = "TableMetadataEnum")] /// Fields for the version 2 of the table metadata. @@ -1565,6 +1573,25 @@ impl Display for FormatVersion { } } +/// Returns the minimum table format version required by any type in a schema. +/// +/// Returns [`None`] when the schema contains no type with a specific minimum +/// table format version requirement. +pub fn min_format_version_for_schema(schema: &Schema) -> Option { + schema + .field_id_to_fields() + .values() + .filter_map(|field| field.field_type.as_primitive_type()) + .filter_map(min_format_version_for_primitive_type) + .max() +} + +pub(crate) fn min_format_version_for_primitive_type( + primitive: &PrimitiveType, +) -> Option { + PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive).copied() +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// Encodes changes to the previous metadata files for the table @@ -1616,10 +1643,10 @@ mod tests { use crate::io::FileIO; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, - PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, - Summary, TableProperties, Transform, Type, UnboundPartitionField, + BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder, + Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField, }; use crate::{ErrorKind, TableCreation}; @@ -3541,6 +3568,28 @@ mod tests { ) } + fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema { + Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts", Type::Primitive(field_type)).into(), + ]) + .build() + .unwrap() + } + + fn table_creation_with_format_version( + schema: Schema, + format_version: FormatVersion, + ) -> TableCreation { + TableCreation::builder() + .location("s3://db/table".to_string()) + .name("table".to_string()) + .properties(HashMap::new()) + .schema(schema) + .format_version(format_version) + .build() + } + #[test] fn test_table_metadata_builder_from_table_creation() { let table_creation = TableCreation::builder() @@ -3591,6 +3640,91 @@ mod tests { ); } + #[test] + fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() { + for (format_version, primitive_type) in [ + (FormatVersion::V1, PrimitiveType::TimestampNs), + (FormatVersion::V1, PrimitiveType::TimestamptzNs), + (FormatVersion::V2, PrimitiveType::TimestampNs), + (FormatVersion::V2, PrimitiveType::TimestamptzNs), + ] { + let table_creation = table_creation_with_format_version( + schema_with_primitive_field(primitive_type), + format_version, + ); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("Invalid type for ts:"), + "expected error message to name the invalid column, got {}", + err.message() + ); + assert!( + err.message().contains("is not supported until v3"), + "expected error message to explain v3 requirement, got {}", + err.message() + ); + } + } + + #[test] + fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required( + 1, + "ts_values", + Type::List(ListType::new( + NestedField::list_element( + 2, + Type::Primitive(PrimitiveType::TimestampNs), + false, + ) + .into(), + )), + ) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V2); + + let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains( + "Invalid type for ts_values.element: timestamp_ns is not supported until v3" + ), + "expected error message to explain nested v3 requirement with column name, got {}", + err.message() + ); + } + + #[test] + fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs)) + .into(), + NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs)) + .into(), + ]) + .build() + .unwrap(); + let table_creation = table_creation_with_format_version(schema, FormatVersion::V3); + + let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(table_metadata.format_version, FormatVersion::V3); + } + #[tokio::test] async fn test_table_metadata_read_write() { // Create a temporary directory for our test diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 5754b5fe06..5b0323b214 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use uuid::Uuid; @@ -28,7 +28,10 @@ use super::{ UnboundPartitionSpec, }; use crate::error::{Error, ErrorKind, Result}; -use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE}; +use crate::spec::{ + EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE, + min_format_version_for_primitive_type, +}; use crate::{TableCreation, TableUpdate}; pub(crate) const FIRST_FIELD_ID: i32 = 1; @@ -196,6 +199,41 @@ impl TableMetadataBuilder { ) } + fn validate_schema_compatible_with_format_version( + format_version: FormatVersion, + schema: &Schema, + ) -> Result<()> { + let problems = schema + .field_id_to_fields() + .values() + .filter_map(|field| { + let field_type = field.field_type.as_ref(); + let primitive = field_type.as_primitive_type()?; + let min_format_version = min_format_version_for_primitive_type(primitive)?; + (format_version < min_format_version).then(|| { + let column_name = schema + .name_by_field_id(field.id) + .unwrap_or(field.name.as_str()); + ( + field.id, + format!( + "Invalid type for {column_name}: {field_type} is not supported until {min_format_version}" + ), + ) + }) + }) + .collect::>(); + + if !problems.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + problems.into_values().collect::>().join("; "), + )); + } + + Ok(()) + } + /// Changes uuid of table metadata. pub fn assign_uuid(mut self, uuid: Uuid) -> Self { if self.metadata.table_uuid != uuid { @@ -638,6 +676,11 @@ impl TableMetadataBuilder { /// Important: Use this method with caution. The builder does not check /// if the added schema is compatible with the current schema. pub fn add_schema(mut self, schema: Schema) -> Result { + Self::validate_schema_compatible_with_format_version( + self.metadata.format_version, + &schema, + )?; + // Validate that new schema fields don't conflict with existing partition field names self.validate_schema_field_names(&schema)?; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..82ec722107 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,6 +29,7 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; +use iceberg::spec::min_format_version_for_schema; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; @@ -162,12 +163,20 @@ impl SchemaProvider for IcebergSchemaProvider { let df_schema = table.schema(); let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref()) .map_err(to_datafusion_error)?; + let format_version = min_format_version_for_schema(&iceberg_schema); // Create the table in the Iceberg catalog - let table_creation = TableCreation::builder() - .name(name.clone()) - .schema(iceberg_schema) - .build(); + let table_creation = match format_version { + Some(format_version) => TableCreation::builder() + .name(name.clone()) + .format_version(format_version) + .schema(iceberg_schema) + .build(), + None => TableCreation::builder() + .name(name.clone()) + .schema(iceberg_schema) + .build(), + }; let catalog = self.catalog.clone(); let namespace = self.namespace.clone(); @@ -288,10 +297,11 @@ mod tests { use std::sync::Arc; use datafusion::arrow::array::{Int32Array, StringArray}; - use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::MemTable; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::FormatVersion; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent}; use tempfile::TempDir; @@ -375,6 +385,37 @@ mod tests { assert!(schema_provider.table_exist("empty_table")); } + #[tokio::test] + async fn test_register_timestamp_ns_table_uses_v3() { + let (schema_provider, _temp_dir) = create_test_schema_provider().await; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )])); + + let empty_batch = RecordBatch::new_empty(arrow_schema.clone()); + let mem_table = MemTable::try_new(arrow_schema, vec![vec![empty_batch]]).unwrap(); + + let result = + schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table)); + + assert!(result.is_ok(), "Expected success, got: {result:?}"); + + let table_ident = TableIdent::new( + schema_provider.namespace.clone(), + "timestamp_ns_table".to_string(), + ); + let table = schema_provider + .catalog + .load_table(&table_ident) + .await + .unwrap(); + + assert_eq!(FormatVersion::V3, table.metadata().format_version()); + } + #[tokio::test] async fn test_register_duplicate_table_fails() { let (schema_provider, _temp_dir) = create_test_schema_provider().await;