Use a proc macro for being able to derive schemas for our action structs #129
nicklan wants to merge 19 commits into delta-io:main
Conversation
ryan-johnson-databricks left a comment
Thanks for taking a stab at this! Having something to iterate on is so much better than guessing what it might be like to implement it.
Once we get this sorted out, we should also be able to derive the various visitors such as MetadataVisitor?
```rust
fn get_data_type(path_segment: &PathSegment) -> Option<TokenStream> {
```
Some related issues here...
The name of the field is wrong. Unfortunately the struct name isn't exactly what we want (i.e. the struct is named `Metadata` but the schema name is `metaData`; also `deletionVector` vs. `DeletionVectorDescriptor`). We can likely use an attribute or a transformation rule to fix this.
I think the problem is that we try to return a StructField instead of DataType, which also makes life harder for everyone.
For example, the schema of any type doesn't have a "name" -- the type only becomes associated with a field name if/when it becomes a field of some struct. In Delta spark, for example, the Metadata action's name is metaData because that's the name given to it by the SingleAction type that unions all other action types, and the latter provides the schema we use to parse commit .json files.
Using StructField instead of DataType also leads to this get_data_type function. Once we define a schema as DataType instead, we can just impl GetSchema for all the basic types we want to support. See playground example.
The key to handling nullability is to recognize that it's not a property of the data type that might be null -- it's a property of the owning complex type. Thus, we would not impl<T: GetSchema> GetSchema for Option<T>. Instead, e.g. impl<T: GetSchema> GetSchema for Vec<T> covers non-nullable array elements, and impl<T: GetSchema> GetSchema for Vec<Option<T>> covers nullable array elements.
If we do all that, this macro's job gets simpler: Figure out the name, type, and nullability of each field (with name literally being the struct field name, type being the "base" type of that field, and nullability decided by whether the base type is wrapped in Option or not). The base types are handled recursively by appeal to the GetSchema trait, and any nested object that fails to implement the trait will trigger a compilation failure because the type bound is not satisfied.
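A minimal self-contained sketch of that idea, using toy `DataType`/`GetSchema` stand-ins (the comment proposes generic `Vec<T>` / `Vec<Option<T>>` impls; concrete element types are used here for brevity):

```rust
// Toy stand-ins for the kernel's schema types, to illustrate the idea that
// nullability belongs to the owning container rather than to the element type.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Long,
    Array { element: Box<DataType>, contains_null: bool },
}

trait GetSchema {
    fn get_schema() -> DataType;
}

impl GetSchema for i64 {
    fn get_schema() -> DataType {
        DataType::Long
    }
}

// There is deliberately no `impl GetSchema for Option<i64>`: the two container
// impls below distinguish non-nullable vs. nullable array elements instead.
impl GetSchema for Vec<i64> {
    fn get_schema() -> DataType {
        DataType::Array { element: Box::new(i64::get_schema()), contains_null: false }
    }
}

impl GetSchema for Vec<Option<i64>> {
    fn get_schema() -> DataType {
        DataType::Array { element: Box::new(i64::get_schema()), contains_null: true }
    }
}

fn main() {
    // The element type alone never says "nullable"; the container does.
    assert_eq!(
        <Vec<Option<i64>> as GetSchema>::get_schema(),
        DataType::Array { element: Box::new(DataType::Long), contains_null: true }
    );
    assert_eq!(
        <Vec<i64> as GetSchema>::get_schema(),
        DataType::Array { element: Box::new(DataType::Long), contains_null: false }
    );
}
```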
Yeah, that makes sense. Perhaps once #109 merges, we can look at changing the way we represent schemas and then make this macro less complex.
```rust
if let Some(fin) = type_path.path.segments.iter().last() {
    get_data_type(fin)
} else {
    panic!("Path for generic type must have last")
```
Right now all error handling is via `panic`. Is that a problem for macros that are "running" at compile-time? Intuitively, it should just result in a compilation error?
Yeah, it's not really an issue, but doing something more complex will give nicer errors. Not something we need to do up front, hence all the panics here.
```diff
  let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;

- // TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7]
+ // TODO(nick) tags are skipped in getters[6]
```
the stats field appears to have been mistakenly copied over into the schema from Add. Remove does not actually have a stats field, so this was all incorrect below (but we just hadn't tested it properly before)
Next PR: derive macro for basic visitors? That would eliminate the possibility that they get out of sync.
For fields that are themselves structs, we could use that struct's own visitor (after verifying that the first non-nullable sub-field is non-null, in case the field was also nullable).
The one annoyance is all those e.g. "remove.xxx" error message helpers -- we'd have to either derive remove from the struct name, or else tell the macro the field name to use.
```diff
  /// Map containing metadata about this logical file.
- pub tags: HashMap<String, Option<String>>,
+ pub tags: Option<HashMap<String, Option<String>>>,
```
tags are optional in the spec, so this is a bug fix
```rust
let tokens: Vec<TokenTree> = list.tokens.clone().into_iter().collect();
match tokens[..] {
```
Out of curiosity -- do we actually need to clone tokens? Or can we just slice and match it directly?
```diff
- let tokens: Vec<TokenTree> = list.tokens.clone().into_iter().collect();
- match tokens[..] {
+ match list.tokens[..] {
```
(might not need the ref matching any more after that?)
[TokenStream](https://doc.rust-lang.org/proc_macro/struct.TokenStream.html) doesn't support being used as a slice; it's really only an iterator, which is why we collect it and then use it as a slice.
I think I could probably re-write this to avoid the clone, but it would look more like the previous code where we'd have to match one token at a time, and was much uglier.
```diff
- let schema = StructType::new(vec![crate::actions::schemas::METADATA_FIELD.clone()]);
  let mut visitor = MetadataVisitor::default();
- data.extract(Arc::new(schema), &mut visitor)?;
+ data.extract(Metadata::get_schema(), &mut visitor)?;
```
I'm not convinced this get_schema method is helpful. Every read we perform is ultimately some projection of fields from actions::get_log_schema(), and the latter already names all the fields (metaData in this case).
Instead of "creating" a metadata schema, and needing to worry about "magically" getting the right field name, can we just filter get_log_schema()? Spark's StructType has specific methods for extracting one or several fields -- you pass 1+ fields to be projected, and they are returned in schema order. The rust analogue would be something like:
```diff
- data.extract(Metadata::get_schema(), &mut visitor)?;
+ data.extract(get_log_schema().project_one("metaData"), &mut visitor)?;
```
or, for the add+remove case in scan/mod.rs below,

```rust
let action_schema = Arc::new(StructType::new(vec![
    Option::<Add>::get_field("add"),
    Option::<Remove>::get_field("remove"),
]));
```

becomes

```rust
let action_schema = Arc::new(get_log_schema().project(&["add", "remove"]));
```

The one bummer is, I don't see any way to actually get away from the field names. Even if we use e.g. Metadata::get_schema() to "hide" the name, the selection logic still needs to know it.
Alternatively, we could observe that the above is equivalent to:
```diff
- data.extract(Metadata::get_schema(), &mut visitor)?;
+ data.extract(Arc::new(StructType::new(vec![Metadata::get_field("metaData")])), &mut visitor)?;
```
... which is annoying because of arc/struct/vec wrappings. But we can fix that once for everyone by defining something like:

```rust
trait GetSchema: GetField {
    fn get_schema(name: impl Into<String>) -> SchemaRef {
        Arc::new(StructType::new(vec![Self::get_field(name)]))
    }
}
```

... which produces here:
```diff
- data.extract(Metadata::get_schema(), &mut visitor)?;
+ data.extract(Metadata::get_schema("metaData"), &mut visitor)?;
```
I actually quite like the project option. It has a few advantages:

- It lets only `LOG_SCHEMA` have to be in a `lazy_static`, so we can get rid of the somewhat tricky `OnceLock` construct currently used to make the generated schema static
- We no longer need the annotations to allow field rename, so we can remove the most complex macro parsing code
Sounds good to me!
We still need to solve the problem that the magic constant column name passed to project might not be correct, and trigger a runtime error. But that problem existed before, and at least now we can define a constant (in LogSchema perhaps?) for each top-level column name if we want?
```diff
  let mut visitor = ProtocolVisitor::default();
- let schema = StructType::new(vec![crate::actions::schemas::PROTOCOL_FIELD.clone()]);
- data.extract(Arc::new(schema), &mut visitor)?;
+ data.extract(Protocol::get_schema(), &mut visitor)?;
```
How does this work? The protocol column of the EngineData should be nullable, since most rows will contain some other type. But this is non-nullable?
It works because even if we didn't read any protocol objects we still read using a schema with a protocol column in it. That means that at the top level when you do column_by_name("protocol") in the arrow you do get a StructArray with children that match the schema, it's just that those columns are all null.
This would give `Error::MissingData("Found required field protocol, but it's null")` if called on an EngineData that had been read with the incorrect schema. That feels maybe correct, but we could also have it just not error out but return `None` in that case.
It does get a bit tricky to model though. What are the semantics of a struct that is nullable, with fields that are not? I guess it's reasonable to say that if the struct is null, everything can be null. I'd have to modify the existing extract code though, I don't think it would handle that case properly (i.e. it would complain that your schema says a Protocol must have a minReaderVersion even if the data had no protocol column at all and we marked protocol as nullable)
What are the semantics of a struct that is nullable, with fields that are not?
IIRC, parquet handles this case very badly in practice (corrupt file). Spark compensates by trying to force all children of a nullable field to themselves be nullable. See StructType.asNullable, for example, though the latter only goes one layer deep instead of being fully transitive.
Given that spark, parquet, and arrow all seem to treat null-struct vs struct-of-null as ~equivalent, maybe we should just formalize the idea in kernel? An exploded field is nullable if it or any parent is nullable, and null if it or any parent is null?
An exploded field is nullable if it or any parent is nullable, and null if it or any parent is null?
yes, I think this is the most logical way to represent this. I can update our extraction code to do this, and we'll need to be careful to document it for connectors so their extraction code can do the same.
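The agreed rule can be sketched as a small walk over a toy schema tree (hypothetical `FieldNode` type, not the kernel's actual extraction code): each exploded leaf is nullable iff it or any ancestor is nullable.

```rust
// Toy schema node: a field with its own nullability and optional children.
struct FieldNode {
    name: String,
    nullable: bool,
    children: Vec<FieldNode>,
}

// Explode the tree into (dotted path, effective nullability) pairs, where a
// field's effective nullability ORs in every ancestor's nullability.
fn explode(node: &FieldNode, parent_nullable: bool, prefix: &str, out: &mut Vec<(String, bool)>) {
    let nullable = parent_nullable || node.nullable;
    let path = if prefix.is_empty() {
        node.name.clone()
    } else {
        format!("{prefix}.{}", node.name)
    };
    if node.children.is_empty() {
        out.push((path, nullable));
    } else {
        for child in &node.children {
            explode(child, nullable, &path, out);
        }
    }
}

fn main() {
    // protocol is nullable at the top level, but minReaderVersion is not:
    let protocol = FieldNode {
        name: "protocol".into(),
        nullable: true,
        children: vec![FieldNode {
            name: "minReaderVersion".into(),
            nullable: false,
            children: vec![],
        }],
    };
    let mut out = Vec::new();
    explode(&protocol, false, "", &mut out);
    // The non-nullable child inherits nullability from its nullable parent.
    assert_eq!(out, vec![("protocol.minReaderVersion".to_string(), true)]);
}
```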
Co-authored-by: Ryan Johnson <ryan.johnson@databricks.com>
```diff
- Self::extract_columns_from_array(out_col_array, schema, None)?;
  } else if array.is_none() || field.is_nullable() {
      if let DataType::Struct(inner_struct) = field.data_type() {
+         Self::extract_columns_from_array(out_col_array, inner_struct.as_ref(), None)?;
```
this was a bug before where we were passing the parent schema instead of the child one
```rust
}
} else {
    quote_spanned! {field.span()=>
        #type_ident::get_field(stringify!(#name))
```
Don't we need to emit the fully qualified type name, in case the user didn't use the (full) path to it?
(especially since, if I understand correctly, this is an unresolved token stream, so any qualifiers the user gave are probably needed for it to compile at all)
```rust
proc_macro::TokenStream::from(output)
}

// turn our struct name into the schema name, goes from snake_case to camelCase
```
I don't know where to put the doc comment, but somewhere we should be careful to explain that the actual field names are all mandated by Delta spec, and so the user of this macro is responsible to ensure that e.g. Metadata::schema_string is the snake-case-ified version of schemaString from Delta's Change Metadata action, in order to keep rust happy. This macro is written with the assumption that it merely undoes that (previously correctly performed) transformation.
The same explains why it's ok to use to_ascii_uppercase below -- all Delta field names are plain ASCII.
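A minimal sketch of that transformation (hypothetical helper name, not the macro's actual code), relying only on `to_ascii_uppercase` since all Delta field names are plain ASCII:

```rust
// Undo the Rust naming convention: schema_string -> schemaString.
// Assumes ASCII input, which holds for all Delta spec field names.
fn snake_to_camel(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut upper_next = false;
    for c in s.chars() {
        if c == '_' {
            // Drop the underscore and capitalize the next character instead.
            upper_next = true;
        } else if upper_next {
            out.push(c.to_ascii_uppercase());
            upper_next = false;
        } else {
            out.push(c);
        }
    }
    out
}

fn main() {
    assert_eq!(snake_to_camel("schema_string"), "schemaString");
    assert_eq!(snake_to_camel("min_reader_version"), "minReaderVersion");
    assert_eq!(snake_to_camel("tags"), "tags");
}
```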
```rust
(f64, DataType::DOUBLE),
(bool, DataType::BOOLEAN),
(HashMap<String, String>, MapType::new(DataType::STRING, DataType::STRING, false)),
(HashMap<String, Option<String>>, MapType::new(DataType::STRING, DataType::STRING, true)),
```
Do we have a ticket somewhere that tracks getting rid of this map-of-option-of-string thing?
(it keeps coming up in various PRs)
Delta was originally written in Scala (Java variant), where there's no such thing as a nullable map entry: looking up a non-existent key returns null as a sentinel value. That makes it hard for me to imagine a case where map-of-option-of-string could ever be anything but a semantic overhead.
I guess we need to double check whether the json-serialized form of a string-string map somehow allows null entries. But even if it does somehow happen, I'd rather filter out such entries at parsing time rather than propagate them through the rest of the system?
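The "filter out such entries at parsing time" option could look something like this (hypothetical helper, sketched with std types only): drop null-valued entries so the rest of the system only ever sees `HashMap<String, String>`.

```rust
use std::collections::HashMap;

// Discard entries whose value is None, so downstream code never has to deal
// with the map-of-option-of-string shape.
fn drop_null_entries(m: HashMap<String, Option<String>>) -> HashMap<String, String> {
    m.into_iter().filter_map(|(k, v)| v.map(|v| (k, v))).collect()
}

fn main() {
    let mut raw = HashMap::new();
    raw.insert("a".to_string(), Some("1".to_string()));
    raw.insert("b".to_string(), None); // a hypothetical null entry from JSON
    let clean = drop_null_entries(raw);
    assert_eq!(clean.len(), 1);
    assert_eq!(clean.get("a"), Some(&"1".to_string()));
}
```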
```rust
.iter()
.map(|name| {
    self.fields
        .get_index_of(name.as_ref())
```
This will have O(n*m) cost, where n is the schema arity and m is the projection arity. Perhaps we could borrow spark's approach, which creates a set from the names (presumably m is much smaller than n) and then does a single filter-map pass over the fields, returning only those fields whose name is present in the set of names. Also avoids the other two passes (sort and index).
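The spark-style approach can be sketched like this (toy `&str` fields standing in for `StructField`s): build a set from the requested names, then keep fields in a single pass, which also preserves schema order and costs O(n + m).

```rust
use std::collections::HashSet;

// Project a schema (a list of field names, in schema order) down to the
// requested names with one set build plus one filter pass.
fn project<'a>(schema_fields: &[&'a str], names: &[&str]) -> Vec<&'a str> {
    let wanted: HashSet<&str> = names.iter().copied().collect();
    schema_fields
        .iter()
        .copied()
        .filter(|f| wanted.contains(f))
        .collect()
}

fn main() {
    let log_schema = ["metaData", "protocol", "add", "remove"];
    // Result comes back in schema order regardless of the order requested.
    assert_eq!(project(&log_schema, &["remove", "add"]), vec!["add", "remove"]);
    assert_eq!(project(&log_schema, &["protocol"]), vec!["protocol"]);
}
```

Note the order-preservation shown above is exactly the "surprising pitfall" raised later in this review: callers get fields in schema order, not request order.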
```diff
-     crate::actions::schemas::REMOVE_FIELD.clone(),
- ]
+ let schema_to_use = if is_log_batch {
+     get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])?
```
Because project_as_schema preserves schema order, this will only work because add comes before remove in the log schema. Could be a surprising pitfall worth a comment?
```diff
-     crate::actions::schemas::PROTOCOL_FIELD.clone(),
- ]);
+ let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?;
  let data_batches = self.replay(engine_interface, Arc::new(schema), None)?;
```
similar to other comment -- now this is only correct if metadata comes before protocol in the log schema.
Work to be able to do:

This also then uses the generated schemas from our actions everywhere, and removes the old schema definitions. To use these we define a static `LOG_SCHEMA` which calls all the generated `get_field` methods to build the total log schema. We also add a `project` method to `StructType` so we can then pick out the columns we want when doing something like parsing only metadata.

How it works

We introduce a trait, `GetField`, and adding the derive will generate a `GetField` impl for the annotated struct.

Right now all error handling is via `panic`, but that's generally okay as the compiler just fails with the panic message.

If you want to inspect the generated code, you can use cargo expand. Install it, and then run something like:

```shell
$ cargo expand | grep -A50 "impl crate::actions::GetField for Protocol"
```

and replace `Protocol` with whatever struct you want to see the impl for.
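The trait body and the example generated impl did not survive extraction, so here is a hedged sketch of the shape they might take, using simplified stand-in types (`DataType`, `StructField`, and the `Protocol` impl below are illustrative, not the kernel's real definitions):

```rust
// Simplified stand-ins for the kernel's schema types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Integer,
    Struct(Vec<StructField>),
}

#[derive(Debug, Clone, PartialEq)]
struct StructField {
    name: String,
    data_type: DataType,
    nullable: bool,
}

// The trait the derive targets: given a field name, produce that field's schema.
trait GetField {
    fn get_field<N: Into<String>>(name: N) -> StructField;
}

impl GetField for i32 {
    fn get_field<N: Into<String>>(name: N) -> StructField {
        StructField { name: name.into(), data_type: DataType::Integer, nullable: false }
    }
}

// Wrapping a field's type in `Option` marks the *field* nullable.
impl<T: GetField> GetField for Option<T> {
    fn get_field<N: Into<String>>(name: N) -> StructField {
        let mut field = T::get_field(name);
        field.nullable = true;
        field
    }
}

#[allow(dead_code)]
struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
}

// Roughly what the derive might generate for Protocol (hand-written here):
// each struct field becomes a get_field call with its camelCase schema name.
impl GetField for Protocol {
    fn get_field<N: Into<String>>(name: N) -> StructField {
        StructField {
            name: name.into(),
            data_type: DataType::Struct(vec![
                i32::get_field("minReaderVersion"),
                i32::get_field("minWriterVersion"),
            ]),
            nullable: false,
        }
    }
}

fn main() {
    let field = Protocol::get_field("protocol");
    assert_eq!(field.name, "protocol");
    assert!(!field.nullable);
    // Option<T> at a field site only flips the nullability bit.
    assert!(Option::<i32>::get_field("x").nullable);
}
```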