diff --git a/Cargo.toml b/Cargo.toml index d7ef7832c6..9ece4d6bc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "acceptance", + "derive-macros", "ffi", "kernel", "kernel/examples/*", diff --git a/derive-macros/Cargo.toml b/derive-macros/Cargo.toml new file mode 100644 index 0000000000..e103333b62 --- /dev/null +++ b/derive-macros/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "derive-macros" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +version.workspace = true + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +syn = { version = "2.0", features = ["extra-traits"] } +quote = "1.0" + + diff --git a/derive-macros/src/lib.rs b/derive-macros/src/lib.rs new file mode 100644 index 0000000000..dcb4a6b815 --- /dev/null +++ b/derive-macros/src/lib.rs @@ -0,0 +1,87 @@ +use proc_macro2::{Ident, TokenStream}; +use quote::{quote, quote_spanned}; +use syn::spanned::Spanned; +use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, PathArguments, Type}; + +#[proc_macro_derive(Schema, attributes(schema))] +pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let struct_ident = input.ident; + + let schema_fields = gen_schema_fields(&input.data); + let output = quote! { + impl crate::actions::schemas::GetField for #struct_ident { + fn get_field(name: impl Into) -> crate::schema::StructField { + use crate::actions::schemas::GetField; + crate::schema::StructField::new( + name, + crate::schema::StructType::new(vec![ + #schema_fields + ]), + // By default not nullable. 
To make something nullable wrap it in an Option + false, + ) + } + } + }; + proc_macro::TokenStream::from(output) +} + +// Turn a struct field's name into its schema name by converting snake_case to camelCase +fn get_schema_name(name: &Ident) -> Ident { + let snake_name = name.to_string(); + let mut next_caps = false; + let ret: String = snake_name + .chars() + .filter_map(|c| { + if c == '_' { + next_caps = true; + None + } else if next_caps { + next_caps = false; + // `to_ascii_uppercase` only changes ASCII letters; any other char after `_` is kept as-is + Some(c.to_ascii_uppercase()) + } else { + Some(c) + } + }) + .collect(); + Ident::new(&ret, name.span()) +} + +fn gen_schema_fields(data: &Data) -> TokenStream { + let fields = match data { + Data::Struct(DataStruct { + fields: Fields::Named(fields), + .. + }) => &fields.named, + _ => panic!("this derive macro only works on structs with named fields"), + }; + + let schema_fields = fields.iter().map(|field| { + let name = field.ident.as_ref().unwrap(); // we know these are named fields + let name = get_schema_name(name); + match field.ty { + Type::Path(ref type_path) => { + if let Some(fin) = type_path.path.segments.iter().last() { + let type_ident = &fin.ident; + if let PathArguments::AngleBracketed(angle_args) = &fin.arguments { + quote_spanned! {field.span()=> + #type_ident::#angle_args::get_field(stringify!(#name)) + } + } else { + quote_spanned! {field.span()=> + #type_ident::get_field(stringify!(#name)) + } + } + } else { + panic!("Couldn't get type"); + } + } + _ => { + panic!("Can't handle type: {:?}", field.ty); + } + } + }); + quote! 
{ #(#schema_fields),* } +} diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index f8e4fe4a3e..c63acc49db 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -29,6 +29,9 @@ url = "2" uuid = "1.3.0" z85 = "3.0.5" +# bring in our derive macros +derive-macros = { path = "../derive-macros" } + # used for developer-visibility visibility = "0.1.0" diff --git a/kernel/src/actions/deletion_vector.rs b/kernel/src/actions/deletion_vector.rs index 0238f8c053..a8d80376a9 100644 --- a/kernel/src/actions/deletion_vector.rs +++ b/kernel/src/actions/deletion_vector.rs @@ -4,12 +4,13 @@ use std::io::{Cursor, Read}; use std::sync::Arc; use bytes::Bytes; +use derive_macros::Schema; use roaring::RoaringTreemap; use url::Url; use crate::{DeltaResult, Error, FileSystemClient}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct DeletionVectorDescriptor { /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. pub storage_type: String, diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index a2c73d772f..a72f811e0c 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -3,14 +3,42 @@ pub(crate) mod deletion_vector; pub(crate) mod schemas; pub(crate) mod visitors; -use std::{collections::HashMap, sync::Arc}; +use derive_macros::Schema; +use lazy_static::lazy_static; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; +use self::deletion_vector::DeletionVectorDescriptor; +use crate::actions::schemas::GetField; use crate::{schema::StructType, DeltaResult, EngineData}; -use self::deletion_vector::DeletionVectorDescriptor; +use std::collections::HashMap; + +pub(crate) static ADD_NAME: &str = "add"; +pub(crate) static REMOVE_NAME: &str = "remove"; +pub(crate) static METADATA_NAME: &str = "metaData"; +pub(crate) static PROTOCOL_NAME: &str = "protocol"; + +lazy_static! 
{ + static ref LOG_SCHEMA: StructType = StructType::new( + vec![ + Option::::get_field(ADD_NAME), + Option::::get_field(REMOVE_NAME), + Option::::get_field(METADATA_NAME), + Option::::get_field(PROTOCOL_NAME), + // We don't support the following actions yet + //Option::get_field(CDC_NAME), + //Option::get_field(COMMIT_INFO_NAME), + //Option::get_field(DOMAIN_METADATA_NAME), + //Option::get_field(TRANSACTION_NAME), + ] + ); +} -#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) fn get_log_schema() -> &'static StructType { + &LOG_SCHEMA +} + +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table pub provider: String, @@ -27,7 +55,7 @@ impl Default for Format { } } -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)] pub struct Metadata { /// Unique identifier for this table pub id: String, @@ -49,9 +77,11 @@ pub struct Metadata { impl Metadata { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { - let schema = StructType::new(vec![crate::actions::schemas::METADATA_FIELD.clone()]); let mut visitor = MetadataVisitor::default(); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[METADATA_NAME])?, + &mut visitor, + )?; Ok(visitor.metadata) } @@ -60,7 +90,7 @@ impl Metadata { } } -#[derive(Default, Debug, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Schema)] pub struct Protocol { /// The minimum version of the Delta read protocol that a client must implement /// in order to correctly read this table @@ -79,13 +109,15 @@ pub struct Protocol { impl Protocol { pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = ProtocolVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::PROTOCOL_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract( + 
get_log_schema().project_as_schema(&[PROTOCOL_NAME])?, + &mut visitor, + )?; Ok(visitor.protocol) } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Add { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. The path is a URI as specified by @@ -113,7 +145,7 @@ pub struct Add { pub stats: Option, /// Map containing metadata about this logical file. - pub tags: HashMap>, + pub tags: Option>>, /// Information about deletion vector (DV) associated with this add action pub deletion_vector: Option, @@ -134,8 +166,10 @@ impl Add { /// Since we always want to parse multiple adds from data, we return a `Vec` pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult> { let mut visitor = AddVisitor::default(); - let schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); - data.extract(Arc::new(schema), &mut visitor)?; + data.extract( + get_log_schema().project_as_schema(&[ADD_NAME])?, + &mut visitor, + )?; Ok(visitor.adds) } @@ -144,7 +178,7 @@ impl Add { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Schema)] pub(crate) struct Remove { /// A relative path to a data file from the root of the table or an absolute path to a file /// that should be added to the table. The path is a URI as specified by @@ -153,13 +187,13 @@ pub(crate) struct Remove { /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt pub(crate) path: String, + /// The time the deletion occurred, as milliseconds since the epoch. + pub(crate) deletion_timestamp: Option, + /// When `false` the logical file must already be present in the table or the records /// in the added file must be contained in one or more remove actions in the same version. pub(crate) data_change: bool, - /// The time this logical file was created, as milliseconds since the epoch. 
- pub(crate) deletion_timestamp: Option, - /// When true the fields `partition_values`, `size`, and `tags` are present pub(crate) extended_file_metadata: Option, @@ -185,19 +219,111 @@ pub(crate) struct Remove { } impl Remove { - // _try_new_from_data for now, to avoid warning, probably will need at some point - // pub(crate) fn _try_new_from_data( - // data: &dyn EngineData, - // ) -> DeltaResult { - // let mut visitor = Visitor::new(visit_remove); - // let schema = StructType::new(vec![crate::actions::schemas::REMOVE_FIELD.clone()]); - // data.extract(Arc::new(schema), &mut visitor)?; - // visitor - // .extracted - // .unwrap_or_else(|| Err(Error::generic("Didn't get expected remove"))) - // } - pub(crate) fn dv_unique_id(&self) -> Option { self.deletion_vector.as_ref().map(|dv| dv.unique_id()) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::schema::{ArrayType, DataType, MapType, StructField}; + + #[test] + fn test_metadata_schema() { + let schema = get_log_schema() + .project_as_schema(&["metaData"]) + .expect("Couldn't get metaData field"); + + let expected = Arc::new(StructType::new(vec![StructField::new( + "metaData", + StructType::new(vec![ + StructField::new("id", DataType::STRING, false), + StructField::new("name", DataType::STRING, true), + StructField::new("description", DataType::STRING, true), + StructField::new( + "format", + StructType::new(vec![ + StructField::new("provider", DataType::STRING, false), + StructField::new( + "options", + MapType::new(DataType::STRING, DataType::STRING, false), + false, + ), + ]), + false, + ), + StructField::new("schemaString", DataType::STRING, false), + StructField::new( + "partitionColumns", + ArrayType::new(DataType::STRING, false), + false, + ), + StructField::new("createdTime", DataType::LONG, true), + StructField::new( + "configuration", + MapType::new(DataType::STRING, DataType::STRING, true), + false, + ), + ]), + true, + )])); + assert_eq!(schema, expected); + } + + fn 
tags_field() -> StructField { + StructField::new( + "tags", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn partition_values_field() -> StructField { + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + true, + ) + } + + fn deletion_vector_field() -> StructField { + StructField::new( + "deletionVector", + DataType::Struct(Box::new(StructType::new(vec![ + StructField::new("storageType", DataType::STRING, false), + StructField::new("pathOrInlineDv", DataType::STRING, false), + StructField::new("offset", DataType::INTEGER, true), + StructField::new("sizeInBytes", DataType::INTEGER, false), + StructField::new("cardinality", DataType::LONG, false), + ]))), + true, + ) + } + + #[test] + fn test_remove_schema() { + let schema = get_log_schema() + .project_as_schema(&["remove"]) + .expect("Couldn't get remove field"); + let expected = Arc::new(StructType::new(vec![StructField::new( + "remove", + StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new("deletionTimestamp", DataType::LONG, true), + StructField::new("dataChange", DataType::BOOLEAN, false), + StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), + partition_values_field(), + StructField::new("size", DataType::LONG, true), + tags_field(), + deletion_vector_field(), + StructField::new("baseRowId", DataType::LONG, true), + StructField::new("defaultRowCommitVersion", DataType::LONG, true), + ]), + true, + )])); + assert_eq!(schema, expected); + } +} diff --git a/kernel/src/actions/schemas.rs b/kernel/src/actions/schemas.rs index a1cbf890cf..894bbf8fe2 100644 --- a/kernel/src/actions/schemas.rs +++ b/kernel/src/actions/schemas.rs @@ -1,254 +1,44 @@ //! 
Schema definitions for action types -use lazy_static::lazy_static; +use std::collections::HashMap; -use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; +use crate::schema::{ArrayType, DataType, MapType, StructField}; -lazy_static! { - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata - pub(crate) static ref METADATA_FIELD: StructField = StructField::new( - "metaData", - StructType::new(vec![ - StructField::new("id", DataType::STRING, false), - StructField::new("name", DataType::STRING, true), - StructField::new("description", DataType::STRING, true), - StructField::new( - "format", - StructType::new(vec![ - StructField::new("provider", DataType::STRING, false), - StructField::new( - "options", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - false, - ), - StructField::new("schemaString", DataType::STRING, false), - StructField::new( - "partitionColumns", - ArrayType::new(DataType::STRING, false), - false, - ), - StructField::new("createdTime", DataType::LONG, true), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#protocol-evolution - pub(crate) static ref PROTOCOL_FIELD: StructField = StructField::new( - "protocol", - StructType::new(vec![ - StructField::new("minReaderVersion", DataType::INTEGER, false), - StructField::new("minWriterVersion", DataType::INTEGER, false), - StructField::new( - "readerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - StructField::new( - "writerFeatures", - ArrayType::new(DataType::STRING, false), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#commit-provenance-information - static ref COMMIT_INFO_FIELD: StructField = StructField::new( - "commitInfo", - StructType::new(vec![ - StructField::new("timestamp", DataType::LONG, 
false), - StructField::new("operation", DataType::STRING, false), - StructField::new("isolationLevel", DataType::STRING, true), - StructField::new("isBlindAppend", DataType::BOOLEAN, true), - StructField::new("txnId", DataType::STRING, true), - StructField::new("readVersion", DataType::LONG, true), - StructField::new( - "operationParameters", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - StructField::new( - "operationMetrics", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - true, - ), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref ADD_FIELD: StructField = StructField::new( - "add", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - StructField::new("defaultRowCommitVersion", DataType::LONG, true), - StructField::new("clusteringProvider", DataType::STRING, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file - pub(crate) static ref REMOVE_FIELD: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - StructField::new("extendedFileMetadata", DataType::BOOLEAN, true), - partition_values_field(), - StructField::new("size", DataType::LONG, true), - StructField::new("stats", DataType::STRING, true), - tags_field(), - deletion_vector_field(), - StructField::new("baseRowId", DataType::LONG, true), - 
StructField::new("defaultRowCommitVersion", DataType::LONG, true), - ]), - true, - ); - static ref REMOVE_FIELD_CHECKPOINT: StructField = StructField::new( - "remove", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("deletionTimestamp", DataType::LONG, true), - StructField::new("dataChange", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file - static ref CDC_FIELD: StructField = StructField::new( - "cdc", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - partition_values_field(), - StructField::new("size", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers - static ref TXN_FIELD: StructField = StructField::new( - "txn", - StructType::new(vec![ - StructField::new("appId", DataType::STRING, false), - StructField::new("version", DataType::LONG, false), - StructField::new("lastUpdated", DataType::LONG, true), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata - static ref DOMAIN_METADATA_FIELD: StructField = StructField::new( - "domainMetadata", - StructType::new(vec![ - StructField::new("domain", DataType::STRING, false), - StructField::new( - "configuration", - MapType::new( - DataType::STRING, - DataType::STRING, - true, - ), - false, - ), - StructField::new("removed", DataType::BOOLEAN, false), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata - static ref CHECKPOINT_METADATA_FIELD: StructField = StructField::new( - "checkpointMetadata", - StructType::new(vec![ - StructField::new("flavor", DataType::STRING, false), - tags_field(), - ]), - true, - ); - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information - static ref SIDECAR_FIELD: 
StructField = StructField::new( - "sidecar", - StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("sizeInBytes", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("type", DataType::STRING, false), - tags_field(), - ]), - true, - ); - - static ref LOG_SCHEMA: StructType = StructType::new( - vec![ - ADD_FIELD.clone(), - CDC_FIELD.clone(), - COMMIT_INFO_FIELD.clone(), - DOMAIN_METADATA_FIELD.clone(), - METADATA_FIELD.clone(), - PROTOCOL_FIELD.clone(), - REMOVE_FIELD.clone(), - TXN_FIELD.clone(), - ] - ); -} - -fn tags_field() -> StructField { - StructField::new( - "tags", - MapType::new(DataType::STRING, DataType::STRING, true), - true, - ) -} - -fn partition_values_field() -> StructField { - StructField::new( - "partitionValues", - MapType::new(DataType::STRING, DataType::STRING, true), - false, - ) +/// A trait for types that can produce a `StructField` with the provided name; nullability is chosen by the implementing type +pub(crate) trait GetField { + fn get_field(name: impl Into) -> StructField; } -fn deletion_vector_field() -> StructField { - StructField::new( - "deletionVector", - DataType::Struct(Box::new(StructType::new(vec![ - StructField::new("storageType", DataType::STRING, false), - StructField::new("pathOrInlineDv", DataType::STRING, false), - StructField::new("offset", DataType::INTEGER, true), - StructField::new("sizeInBytes", DataType::INTEGER, false), - StructField::new("cardinality", DataType::LONG, false), - ]))), - true, - ) +macro_rules! 
impl_get_field { + ( $(($rust_type: ty, $kernel_type: expr)), * ) => { + $( + impl GetField for $rust_type { + fn get_field(name: impl Into) -> StructField { + StructField::new(name, $kernel_type, false) + } + } + )* + }; } -#[cfg(test)] -pub(crate) fn log_schema() -> &'static StructType { - &LOG_SCHEMA +impl_get_field!( + (String, DataType::STRING), + (i64, DataType::LONG), + (i32, DataType::INTEGER), + (i16, DataType::SHORT), + (char, DataType::BYTE), + (f32, DataType::FLOAT), + (f64, DataType::DOUBLE), + (bool, DataType::BOOLEAN), + (HashMap, MapType::new(DataType::STRING, DataType::STRING, false)), + (HashMap>, MapType::new(DataType::STRING, DataType::STRING, true)), + (Vec, ArrayType::new(DataType::STRING, false)) +); + +impl GetField for Option { + fn get_field(name: impl Into) -> StructField { + let mut inner = T::get_field(name); + inner.nullable = true; + inner + } } diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs index 3fb0a1589f..1656e91639 100644 --- a/kernel/src/actions/visitors.rs +++ b/kernel/src/actions/visitors.rs @@ -154,7 +154,7 @@ impl AddVisitor { modification_time, data_change, stats, - tags: HashMap::new(), + tags: None, deletion_vector, base_row_id, default_row_commit_version, @@ -196,20 +196,20 @@ impl RemoveVisitor { let size: Option = getters[5].get_opt(row_index, "remove.size")?; - // TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7] + // TODO(nick) tags are skipped in getters[6] let deletion_vector = if let Some(storage_type) = - getters[8].get_opt(row_index, "remove.deletionVector.storageType")? + getters[7].get_opt(row_index, "remove.deletionVector.storageType")? 
{ // there is a storageType, so the whole DV must be there let path_or_inline_dv: String = - getters[9].get(row_index, "remove.deletionVector.pathOrInlineDv")?; + getters[8].get(row_index, "remove.deletionVector.pathOrInlineDv")?; let offset: Option = - getters[10].get_opt(row_index, "remove.deletionVector.offset")?; + getters[9].get_opt(row_index, "remove.deletionVector.offset")?; let size_in_bytes: i32 = - getters[11].get(row_index, "remove.deletionVector.sizeInBytes")?; + getters[10].get(row_index, "remove.deletionVector.sizeInBytes")?; let cardinality: i64 = - getters[12].get(row_index, "remove.deletionVector.cardinality")?; + getters[11].get(row_index, "remove.deletionVector.cardinality")?; Some(DeletionVectorDescriptor { storage_type, path_or_inline_dv, @@ -221,9 +221,9 @@ impl RemoveVisitor { None }; - let base_row_id: Option = getters[13].get_opt(row_index, "remove.baseRowId")?; + let base_row_id: Option = getters[12].get_opt(row_index, "remove.baseRowId")?; let default_row_commit_version: Option = - getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?; + getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?; Ok(Remove { path, @@ -262,8 +262,7 @@ mod tests { use super::*; use crate::{ - actions::schemas::log_schema, - schema::StructType, + actions::{get_log_schema, ADD_NAME}, simple_client::{data::SimpleData, json::SimpleJsonHandler, SimpleClient}, EngineData, EngineInterface, JsonHandler, }; @@ -286,7 +285,7 @@ mod tests { r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let parsed = handler 
.parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); @@ -352,15 +351,15 @@ mod tests { r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = json_handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); - let add_schema = StructType::new(vec![crate::actions::schemas::ADD_FIELD.clone()]); + let add_schema = get_log_schema() + .project_as_schema(&[ADD_NAME]) + .expect("Can't get add schema"); let mut add_visitor = AddVisitor::default(); - batch - .extract(Arc::new(add_schema), &mut add_visitor) - .unwrap(); + batch.extract(add_schema, &mut add_visitor).unwrap(); let add1 = Add { path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(), partition_values: HashMap::from([ @@ -371,7 +370,7 @@ mod tests { modification_time: 1670892998135, data_change: true, stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()), - tags: HashMap::new(), + tags: None, deletion_vector: None, base_row_id: None, default_row_commit_version: None, diff --git a/kernel/src/client/json.rs b/kernel/src/client/json.rs index 2a95a13df1..87cb21f2f7 100644 --- a/kernel/src/client/json.rs +++ b/kernel/src/client/json.rs @@ -234,7 +234,7 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use super::*; - use crate::{actions::schemas::log_schema, executor::tokio::TokioBackgroundExecutor}; + use crate::{actions::get_log_schema, executor::tokio::TokioBackgroundExecutor}; fn string_array_to_engine_data(string_array: StringArray) -> Box { let string_field 
= Arc::new(Field::new("a", DataType::Utf8, true)); @@ -255,7 +255,7 @@ mod tests { r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, ]); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let batch = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) @@ -282,7 +282,7 @@ mod tests { }]; let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); - let physical_schema = Arc::new(ArrowSchema::try_from(log_schema()).unwrap()); + let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema()).unwrap()); let data: Vec = handler .read_json_files(files, Arc::new(physical_schema.try_into().unwrap()), None) .unwrap() diff --git a/kernel/src/scan/file_stream.rs b/kernel/src/scan/file_stream.rs index 3721408194..a2cf11245d 100644 --- a/kernel/src/scan/file_stream.rs +++ b/kernel/src/scan/file_stream.rs @@ -1,14 +1,14 @@ use std::collections::HashSet; -use std::sync::Arc; use either::Either; use tracing::debug; use super::data_skipping::DataSkippingFilter; +use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME}; use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove}; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::Expression; -use crate::schema::{SchemaRef, StructType}; +use crate::schema::SchemaRef; use crate::{DataVisitor, DeltaResult, EngineData, EngineInterface}; struct LogReplayScanner { @@ -80,18 +80,15 @@ impl LogReplayScanner { None => actions, }; - let schema_to_use = 
StructType::new(if is_log_batch { - vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), - ] + let schema_to_use = if is_log_batch { + get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])? } else { // All checkpoint actions are already reconciled and Remove actions in checkpoint files // only serve as tombstones for vacuum jobs. So no need to load them here. - vec![crate::actions::schemas::ADD_FIELD.clone()] - }); + get_log_schema().project_as_schema(&[ADD_NAME])? + }; let mut visitor = AddRemoveVisitor::default(); - actions.extract(Arc::new(schema_to_use), &mut visitor)?; + actions.extract(schema_to_use, &mut visitor)?; for remove in visitor.removes.into_iter() { self.seen diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 6dc94d41d1..1806819c35 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use itertools::Itertools; use self::file_stream::log_replay_iter; -use crate::actions::Add; +use crate::actions::{get_log_schema, Add, ADD_NAME, REMOVE_NAME}; use crate::expressions::{Expression, Scalar}; use crate::schema::{DataType, SchemaRef, StructType}; use crate::snapshot::Snapshot; @@ -128,10 +128,7 @@ impl Scan { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult>> { - let action_schema = Arc::new(StructType::new(vec![ - crate::actions::schemas::ADD_FIELD.clone(), - crate::actions::schemas::REMOVE_FIELD.clone(), - ])); + let action_schema = get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?; let log_iter = self.snapshot.log_segment.replay( engine_interface, diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 7d3050af2f..94b29834b8 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Display}; use indexmap::IndexMap; +use itertools::Itertools; use serde::{Deserialize, Serialize}; +use crate::{DeltaResult, Error}; + pub type Schema 
= StructType; pub type SchemaRef = Arc; @@ -140,6 +143,38 @@ impl StructType { } } + /// Get a [`StructType`] containing the [`StructField`]s with the given names, ordered as in this + /// schema (not as in `names`). Returns an Err if a specified field doesn't exist + pub fn project(&self, names: &[impl AsRef]) -> DeltaResult { + let mut indexes: Vec = names + .iter() + .map(|name| { + self.fields + .get_index_of(name.as_ref()) + .ok_or_else(|| Error::missing_column(name.as_ref())) + }) + .try_collect()?; + indexes.sort(); // emit fields in schema order, not in the order of `names` + let fields: Vec = indexes + .iter() + .map(|index| { + self.fields + .get_index(*index) + .expect("get_index_of returned non-existant index") + .1 + .clone() + }) + .collect(); + Ok(Self::new(fields)) + } + + /// Same as [`StructType::project`], but wraps the projected type in an `Arc` to give a + /// [`SchemaRef`]. Returns an Err if a specified field doesn't exist + pub fn project_as_schema(&self, names: &[impl AsRef]) -> DeltaResult { + let struct_type = self.project(names)?; + Ok(Arc::new(struct_type)) + } + pub fn field(&self, name: impl AsRef) -> Option<&StructField> { self.fields.get(name.as_ref()) } diff --git a/kernel/src/simple_client/data.rs b/kernel/src/simple_client/data.rs index 941c30a480..5b09997087 100644 --- a/kernel/src/simple_client/data.rs +++ b/kernel/src/simple_client/data.rs @@ -214,13 +214,16 @@ impl SimpleData { .filter(|a| *a.data_type() != ArrowDataType::Null); // Note: if col is None we have either: // a) encountered a column that is all nulls or, - // b) recursed into a struct that was all null. - // So below if the field is allowed to be null, we push that, otherwise we error out. + // b) recursed into an optional struct that was null. 
In this case, array.is_none() is + // true and we don't need to check field nullability, because we assume all fields + // of a nullable struct can be null + // So below if the field is allowed to be null, OR array.is_none() we push that, + // otherwise we error out. if let Some(col) = col { Self::extract_column(out_col_array, field, col)?; - } else if field.is_nullable() { - if let DataType::Struct(_) = field.data_type() { - Self::extract_columns_from_array(out_col_array, schema, None)?; + } else if array.is_none() || field.is_nullable() { + if let DataType::Struct(inner_struct) = field.data_type() { + Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?; } else { debug!("Pushing a null field for {}", field.name); out_col_array.push(&()); @@ -314,10 +317,10 @@ mod tests { use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; - use crate::actions::Metadata; + use crate::actions::{Metadata, Protocol}; use crate::DeltaResult; use crate::{ - actions::schemas::log_schema, + actions::get_log_schema, simple_client::{data::SimpleData, SimpleClient}, EngineData, EngineInterface, }; @@ -338,7 +341,7 @@ mod tests { r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, ] .into(); - let output_schema = Arc::new(log_schema().clone()); + let output_schema = Arc::new(get_log_schema().clone()); let parsed = handler .parse_json(string_array_to_engine_data(json_strings), output_schema) .unwrap(); @@ -348,4 +351,21 @@ mod tests { assert_eq!(metadata.partition_columns, vec!("c1", "c2")); Ok(()) } + + #[test] + fn 
test_nullable_struct() -> DeltaResult<()> { + let client = SimpleClient::new(); + let handler = client.get_json_handler(); + let json_strings: StringArray = vec![ + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + ] + .into(); + let output_schema = get_log_schema().project_as_schema(&["metaData"])?; + let parsed = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap(); + let protocol = Protocol::try_new_from_data(parsed.as_ref())?; + assert!(protocol.is_none()); + Ok(()) + } } diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index f7df065a65..84c7ae77dd 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -9,9 +9,9 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use url::Url; -use crate::actions::{Metadata, Protocol}; +use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; use crate::path::LogPath; -use crate::schema::{Schema, SchemaRef, StructType}; +use crate::schema::{Schema, SchemaRef}; use crate::{DeltaResult, EngineInterface, Error, FileMeta, FileSystemClient, Version}; use crate::{EngineData, Expression}; @@ -66,11 +66,8 @@ impl LogSegment { &self, engine_interface: &dyn EngineInterface, ) -> DeltaResult> { - let schema = StructType::new(vec![ - crate::actions::schemas::METADATA_FIELD.clone(), - crate::actions::schemas::PROTOCOL_FIELD.clone(), - ]); - let data_batches = self.replay(engine_interface, Arc::new(schema), None)?; + let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?; + let data_batches = 
self.replay(engine_interface, schema, None)?; let mut metadata_opt: Option = None; let mut protocol_opt: Option = None; for batch in data_batches {