Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 142 additions & 8 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
Expand All @@ -33,9 +33,9 @@ use uuid::Uuid;
use super::snapshot::SnapshotReference;
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
TableProperties, parse_metadata_file_compression,
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, PrimitiveType, Schema,
SchemaId, SchemaRef, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile,
StructType, TableProperties, parse_metadata_file_compression,
};
use crate::catalog::MetadataLocation;
use crate::compression::CompressionCodec;
Expand All @@ -60,6 +60,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

/// Lookup table from a primitive type to the earliest table format version
/// that supports it. Primitive types without an entry carry no minimum
/// version requirement in this table.
static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock<HashMap<PrimitiveType, FormatVersion>> =
    LazyLock::new(|| {
        let mut min_versions = HashMap::new();
        min_versions.insert(PrimitiveType::TimestampNs, FormatVersion::V3);
        min_versions.insert(PrimitiveType::TimestamptzNs, FormatVersion::V3);
        min_versions
    });

#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// Fields for the version 2 of the table metadata.
Expand Down Expand Up @@ -1565,6 +1573,25 @@ impl Display for FormatVersion {
}
}

/// Returns the minimum table format version required by any type in a schema.
///
/// Returns [`None`] when the schema contains no type with a specific minimum
/// table format version requirement.
pub fn min_format_version_for_schema(schema: &Schema) -> Option<FormatVersion> {
schema
.field_id_to_fields()
.values()
.filter_map(|field| field.field_type.as_primitive_type())
.filter_map(min_format_version_for_primitive_type)
.max()
}

/// Looks up `primitive` in [`PRIMITIVE_TYPE_MIN_FORMAT_VERSION`].
///
/// Returns the registered minimum format version, or [`None`] when the type
/// has no entry in the table.
pub(crate) fn min_format_version_for_primitive_type(
    primitive: &PrimitiveType,
) -> Option<FormatVersion> {
    let min_version = PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive)?;
    Some(*min_version)
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Encodes changes to the previous metadata files for the table
Expand Down Expand Up @@ -1616,10 +1643,10 @@ mod tests {
use crate::io::FileIO;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
Summary, TableProperties, Transform, Type, UnboundPartitionField,
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder,
Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema,
Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField,
};
use crate::{ErrorKind, TableCreation};

Expand Down Expand Up @@ -3541,6 +3568,28 @@ mod tests {
)
}

/// Test helper: builds a schema holding one required field `ts` (id 1) of the
/// given primitive type.
fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema {
    let ts_field = NestedField::required(1, "ts", Type::Primitive(field_type));

    Schema::builder()
        .with_fields(vec![ts_field.into()])
        .build()
        .unwrap()
}

/// Test helper: assembles a `TableCreation` for `schema` at a fixed location
/// and name, with no properties, targeting `format_version`.
fn table_creation_with_format_version(
    schema: Schema,
    format_version: FormatVersion,
) -> TableCreation {
    let location = "s3://db/table".to_string();
    let name = "table".to_string();
    let properties = HashMap::new();

    TableCreation::builder()
        .location(location)
        .name(name)
        .properties(properties)
        .schema(schema)
        .format_version(format_version)
        .build()
}

#[test]
fn test_table_metadata_builder_from_table_creation() {
let table_creation = TableCreation::builder()
Expand Down Expand Up @@ -3591,6 +3640,91 @@ mod tests {
);
}

#[test]
fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() {
    // Every pre-v3 format version paired with every nanosecond timestamp type
    // must be refused when building metadata from a table creation.
    for format_version in [FormatVersion::V1, FormatVersion::V2] {
        for primitive_type in [PrimitiveType::TimestampNs, PrimitiveType::TimestamptzNs] {
            let schema = schema_with_primitive_field(primitive_type);
            let table_creation = table_creation_with_format_version(schema, format_version);

            let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();
            assert_eq!(err.kind(), ErrorKind::DataInvalid);

            let message = err.message();
            assert!(
                message.contains("Invalid type for ts:"),
                "expected error message to name the invalid column, got {}",
                message
            );
            assert!(
                message.contains("is not supported until v3"),
                "expected error message to explain v3 requirement, got {}",
                message
            );
        }
    }
}

#[test]
fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() {
    // A v3-only type nested inside a list element must be rejected on a v2
    // table, and the error must name the nested column.
    let element =
        NestedField::list_element(2, Type::Primitive(PrimitiveType::TimestampNs), false);
    let list_field = NestedField::required(
        1,
        "ts_values",
        Type::List(ListType::new(element.into())),
    );
    let schema = Schema::builder()
        .with_fields(vec![list_field.into()])
        .build()
        .unwrap();

    let table_creation = table_creation_with_format_version(schema, FormatVersion::V2);
    let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let message = err.message();
    assert!(
        message
            .contains("Invalid type for ts_values.element: timestamp_ns is not supported until v3"),
        "expected error message to explain nested v3 requirement with column name, got {}",
        message
    );
}

#[test]
fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() {
    // Both nanosecond timestamp flavours are accepted on a v3 table.
    let ts_ns = NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs));
    let tstz_ns =
        NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs));
    let schema = Schema::builder()
        .with_fields(vec![ts_ns.into(), tstz_ns.into()])
        .build()
        .unwrap();

    let table_creation = table_creation_with_format_version(schema, FormatVersion::V3);
    let builder = TableMetadataBuilder::from_table_creation(table_creation).unwrap();
    let table_metadata = builder.build().unwrap().metadata;

    assert_eq!(table_metadata.format_version, FormatVersion::V3);
}

#[tokio::test]
async fn test_table_metadata_read_write() {
// Create a temporary directory for our test
Expand Down
47 changes: 45 additions & 2 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use uuid::Uuid;
Expand All @@ -28,7 +28,10 @@ use super::{
UnboundPartitionSpec,
};
use crate::error::{Error, ErrorKind, Result};
use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
use crate::spec::{
EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE,
min_format_version_for_primitive_type,
};
use crate::{TableCreation, TableUpdate};

pub(crate) const FIRST_FIELD_ID: i32 = 1;
Expand Down Expand Up @@ -196,6 +199,41 @@ impl TableMetadataBuilder {
)
}

fn validate_schema_compatible_with_format_version(
format_version: FormatVersion,
schema: &Schema,
) -> Result<()> {
let problems = schema
.field_id_to_fields()
.values()
.filter_map(|field| {
let field_type = field.field_type.as_ref();
let primitive = field_type.as_primitive_type()?;
let min_format_version = min_format_version_for_primitive_type(primitive)?;
(format_version < min_format_version).then(|| {
let column_name = schema
.name_by_field_id(field.id)
.unwrap_or(field.name.as_str());
(
field.id,
format!(
"Invalid type for {column_name}: {field_type} is not supported until {min_format_version}"
),
)
})
})
.collect::<BTreeMap<_, _>>();

if !problems.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
problems.into_values().collect::<Vec<_>>().join("; "),
));
}

Ok(())
}

/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
if self.metadata.table_uuid != uuid {
Expand Down Expand Up @@ -638,6 +676,11 @@ impl TableMetadataBuilder {
/// Important: Use this method with caution. The builder does not check
/// if the added schema is compatible with the current schema.
pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
Self::validate_schema_compatible_with_format_version(
self.metadata.format_version,
&schema,
)?;

// Validate that new schema fields don't conflict with existing partition field names
self.validate_schema_field_names(&schema)?;

Expand Down
51 changes: 46 additions & 5 deletions crates/integrations/datafusion/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use futures::StreamExt;
use futures::future::try_join_all;
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
use iceberg::inspect::MetadataTableType;
use iceberg::spec::min_format_version_for_schema;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};

use crate::table::IcebergTableProvider;
Expand Down Expand Up @@ -162,12 +163,20 @@ impl SchemaProvider for IcebergSchemaProvider {
let df_schema = table.schema();
let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
.map_err(to_datafusion_error)?;
let format_version = min_format_version_for_schema(&iceberg_schema);

// Create the table in the Iceberg catalog
let table_creation = TableCreation::builder()
.name(name.clone())
.schema(iceberg_schema)
.build();
let table_creation = match format_version {
Some(format_version) => TableCreation::builder()
.name(name.clone())
.format_version(format_version)
.schema(iceberg_schema)
.build(),
None => TableCreation::builder()
.name(name.clone())
.schema(iceberg_schema)
.build(),
};

let catalog = self.catalog.clone();
let namespace = self.namespace.clone();
Expand Down Expand Up @@ -288,10 +297,11 @@ mod tests {
use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::FormatVersion;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
use tempfile::TempDir;

Expand Down Expand Up @@ -375,6 +385,37 @@ mod tests {
assert!(schema_provider.table_exist("empty_table"));
}

#[tokio::test]
async fn test_register_timestamp_ns_table_uses_v3() {
    // Registering a table with a nanosecond timestamp column should create
    // the Iceberg table at format version 3.
    let (schema_provider, _temp_dir) = create_test_schema_provider().await;
    let table_name = "timestamp_ns_table".to_string();

    let ts_field = Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    );
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ts_field]));
    let partitions = vec![vec![RecordBatch::new_empty(arrow_schema.clone())]];
    let mem_table = MemTable::try_new(arrow_schema, partitions).unwrap();

    let result = schema_provider.register_table(table_name.clone(), Arc::new(mem_table));
    assert!(result.is_ok(), "Expected success, got: {result:?}");

    let table_ident = TableIdent::new(schema_provider.namespace.clone(), table_name);
    let table = schema_provider
        .catalog
        .load_table(&table_ident)
        .await
        .unwrap();

    assert_eq!(FormatVersion::V3, table.metadata().format_version());
}

#[tokio::test]
async fn test_register_duplicate_table_fails() {
let (schema_provider, _temp_dir) = create_test_schema_provider().await;
Expand Down
Loading