Schema derive macro (reborn) #156
Co-authored-by: Ryan Johnson <ryan.johnson@databricks.com>
(at cost of returning a ref)
| /// allows the use of standard rust snake_case, and will convert to the correct delta schema | ||
| /// camelCase version). | ||
| #[proc_macro_derive(Schema)] | ||
| pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream { |
comment above added to address https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527171512
This will show up in the docs for the derive-macros crate as:

If you wanted you could also omit the crate from docs with #[doc(hidden)]
I sort of consider the crate docs like "implementer" docs. Like someone writing a connector would read them. I think understanding what this macro does is useful from that perspective, so likely it should be left in.
| let name = field.ident.as_ref().unwrap(); // we know these are named fields | ||
| let name = get_schema_name(name); | ||
| match field.ty { | ||
| Type::Path(ref type_path) => { |
rescuing https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527170864:
Don't we need to emit the fully qualified type name, in case the user didn't use the (full) path to it?
(especially since, if I understand correctly, this is an unresolved token stream, so any qualifiers the user gave are probably needed for it to compile at all)
I've changed it here to join the full type path that was specified at the derive site. This means that if they have `use`d it, it will just be the name of the type, and if they have written out `a::full::Path`, that will happen here too.
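For context, here is a minimal sketch of the kind of snake_case-to-camelCase conversion the derive macro performs on field names (`to_camel_case` is a hypothetical helper name for illustration, not necessarily what the crate uses):

```rust
// Hypothetical sketch: convert a snake_case Rust field name to the
// camelCase name used in the Delta schema. Not the crate's actual helper.
fn to_camel_case(snake: &str) -> String {
    let mut out = String::with_capacity(snake.len());
    let mut upper_next = false;
    for c in snake.chars() {
        if c == '_' {
            // Drop the underscore and capitalize the next character.
            upper_next = true;
        } else if upper_next {
            out.extend(c.to_uppercase());
            upper_next = false;
        } else {
            out.push(c);
        }
    }
    out
}

fn main() {
    assert_eq!(to_camel_case("partition_values"), "partitionValues");
    assert_eq!(to_camel_case("size"), "size");
}
```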
|
|
||
| /// Map containing metadata about this logical file. | ||
| pub tags: HashMap<String, Option<String>>, | ||
| pub tags: Option<HashMap<String, Option<String>>>, |
tags are optional in the spec, so this is a bug fix
Can we please just remove the Option<String> business? It complicates the schema handling and there's nothing in the Delta spec that allows for it -- add.tags is a string-string map.
(ah, it's being tracked in the other comment... but still I wonder why wait)
Indeed. Changed to <String, String> everywhere (seems to work fine)
| (f64, DataType::DOUBLE), | ||
| (bool, DataType::BOOLEAN), | ||
| (HashMap<String, String>, MapType::new(DataType::STRING, DataType::STRING, false)), | ||
| (HashMap<String, Option<String>>, MapType::new(DataType::STRING, DataType::STRING, true)), |
quoted comment:
Do we have a ticket somewhere that tracks getting rid of this map-of-option-of-string thing? (it keeps coming up in various PRs)
Delta was originally written in Scala (a Java variant), where there's no such thing as a nullable map entry: looking up a non-existent key returns null as a sentinel value. That makes it hard for me to imagine a case where map-of-option-of-string could ever be anything but semantic overhead.
I guess we need to double check whether the json-serialized form of a string-string map somehow allows null entries. But even if it does somehow happen, I'd rather filter out such entries at parsing time than propagate them through the rest of the system?
Yes, #128. I'll get to that in not too long I hope
| let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?; | ||
|
|
||
| // TODO(nick) stats are skipped in getters[6] and tags are skipped in getters[7] | ||
| // TODO(nick) tags are skipped in getters[6] |
Rescued comment: https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1513701837
Yes, will have a follow-up to derive visitors.
| crate::actions::schemas::METADATA_FIELD.clone(), | ||
| crate::actions::schemas::PROTOCOL_FIELD.clone(), | ||
| ])); | ||
| let schema = get_log_schema().project_as_schema(&[METADATA_NAME, PROTOCOL_NAME])?; |
rescuing https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527196393
similar to other comment -- now this is only correct if metadata comes before protocol in the log schema.
I don't think that's true, we use get_index_of on the set of names, so it'll just find the correct items no matter the order given here. I've switched from &[METADATA_NAME, PROTOCOL_NAME] in the old PR to &[PROTOCOL_NAME, METADATA_NAME] to prove it works :)
You're right that this one works, because the try_new_from_data methods do another projection of their own, to extract the column they care about. The other #129 (comment) is more serious because it directly uses the projected schema. We can pick up the discussion there, tho.
| .iter() | ||
| .map(|name| { | ||
| self.fields | ||
| .get_index_of(name.as_ref()) |
rescuing: https://github.com/delta-incubator/delta-kernel-rs/pull/129/files#r1527174598
this will have O(n*m) cost, where n is the schema arity and m is the projection arity. Perhaps we could borrow spark's approach, which creates a set from the names (presumably m is much smaller than n) and then does a single filter-map pass over the fields, returning only those fields whose name is present in the set of names. Also avoids the other two passes (sort and index).
I don't think that's quite correct. get_index_of is O(1), so this is O(m) in the number of requested fields, plus a sort.
Given that the set of fields is quite small, I think that should be as fast, if not faster, than first hashing the input fields and probing while iterating all the schema fields. Especially if m is much smaller than n.
`get_index_of` is O(1), so this is O(m) in the number of requested fields, plus a sort.
You're right. I keep forgetting that.
Going by the other comment in file_stream.rs below tho -- maybe it's not even desirable to preserve schema order in the first place. If so, then I think this code simplifies to something like:
let fields = names
.iter()
.map(|name| {
self.fields
.get(name.as_ref())
.ok_or_else(...)
})
.try_collect()?;
Ok(Self::new(fields))
(I think rust can infer the type of fields thanks to a combination of try_collect and Self::new requiring a Vec)
| crate::actions::schemas::REMOVE_FIELD.clone(), | ||
| ] | ||
| let schema_to_use = if is_log_batch { | ||
| get_log_schema().project_as_schema(&[ADD_NAME, REMOVE_NAME])? |
Rescuing #129 (comment)
- This schema is directly used by the `extract` below.
- The AddRemoveVisitor hard-wires the assumption that `add` is the first column and `remove` (if present) is the second column (by slicing the exploded list of getters).
- In fact, the `schema_to_use` only has `Add` before `Remove` because that's how get_log_schema happens to define it (because `project_as_schema` preserves original schema order).
- If we reversed the ordering in `get_log_schema` this code would break badly. Even if we wanted to rework the `AddRemoveVisitor` to hard-wire `Remove` first, it would still break for checkpoints that don't pass the `Remove` column in the first place.

Overall, I worry that project actually needs the ability to reorder columns, or we'll get bitten repeatedly by gotchas like this.
Yep. Project now honors the order passed to project
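For readers following along, here is a minimal std-only sketch of a projection that returns fields in the order they were requested rather than in schema order; `Field` and `project` are illustrative stand-ins, not the crate's actual `StructField`/`project` API:

```rust
// Illustrative order-preserving projection over a list of named fields.
// `Field` and `project` are stand-ins for the crate's real types.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
}

// Returns the requested fields in the order the caller asked for them,
// erroring if any requested name is missing from the schema.
fn project(fields: &[Field], names: &[&str]) -> Result<Vec<Field>, String> {
    names
        .iter()
        .map(|name| {
            fields
                .iter()
                .find(|f| f.name == *name)
                .cloned()
                .ok_or_else(|| format!("no such field: {name}"))
        })
        .collect()
}

fn main() {
    let schema: Vec<Field> = ["metaData", "protocol", "add"]
        .iter()
        .map(|n| Field { name: n.to_string() })
        .collect();
    // Requesting in reversed order yields fields in the requested order,
    // not the schema's original order.
    let projected = project(&schema, &["protocol", "metaData"]).unwrap();
    assert_eq!(projected[0].name, "protocol");
    assert_eq!(projected[1].name, "metaData");
    // A missing name is an error rather than being silently dropped.
    assert!(project(&schema, &["nope"]).is_err());
}
```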
| self.fields | ||
| .get_index(*index) | ||
| .expect("get_index_of returned non-existant index") | ||
| .1 |
aside (rust n00b question): What is .1? That's not legal syntax in most languages?
It gets the second element of the tuple returned by get_index, with .0 being the first.
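A tiny self-contained example of tuple field access, for reference:

```rust
// Rust tuples are indexed positionally with .0, .1, .2, ...
// IndexMap's get_index returns an Option<(&K, &V)>, so `.1` on the
// unwrapped pair picks out the value.
fn main() {
    let pair = ("add", 42);
    assert_eq!(pair.0, "add"); // first element
    assert_eq!(pair.1, 42);    // second element
}
```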
| /// Get a [`SchemaRef`] containing [`StructField`]s of the given names, preserving the original | ||
| /// order of fields. Returns an Err if a specified field doesn't exist | ||
| pub fn project_as_schema(&self, names: &[impl AsRef<str>]) -> DeltaResult<SchemaRef> { | ||
| let struct_type = self.project(names)?; |
This seems to be the only call site for project... should we consider calling this method project, and we either rename the other project_as_struct or just fold it into this method?
Force-pushed from 64955b7 to c69f7ce.
zachschuermann
left a comment
posted some comments from our discussion earlier, otherwise lgtm
| parquet_schema.index_of(field.name()) | ||
| .enumerate() | ||
| .filter_map(|(parquet_position, field)| { | ||
| requested_schema.index_of(field.name()).map(|index| { |
another comment from our discussion earlier: consider duplicate names? I'm actually not even sure if that's allowed in parquet/elsewhere?
Kernel might need to handle duplicate column names if an expression from the user projects them, but I don't think the parquet reader should have to? Parquet itself doesn't allow duplicates, and it would be wasteful to allow duplicates at parquet scan level.
We use an IndexMap to represent our schema. The field names are the keys, so there isn't a possibility of a duplicate here.
I've added #164 as potential follow-up work
scovich
left a comment
Belated review, but possibly worth cutting a new PR to incorporate the suggested simplifications?
| .filter_map(|(parquet_position, field)| { | ||
| requested_schema.index_of(field.name()).map(|index| { | ||
| found_count += 1; | ||
| mask_indicies[index] = parquet_position; |
How do we distinguish between reading parquet column 0 first vs. "no match"? e.g. if parquet schema is [a, b, c] and read schema is [a], wouldn't that produce mask_indices [0, 0, 0]?
Update: The docs for ProjectionMask::roots state that repeated or out of order indices will not impact the final mask, i.e. [0, 1, 2] will construct the same mask as [1, 0, 0, 2].
I think that means we can simplify things a lot by doing something like:
let (selected_indices, reorder_indices): (Vec<_>, Vec<_>) = parquet_schema
    .fields()
    .iter()
    .enumerate()
    .filter_map(|(parquet_index, field)| {
        requested_schema.index_of(field.name()).map(|index| (parquet_index, index))
    })
    .unzip();
require!(selected_indices.len() == requested_schema.fields().len(), Error::generic(...));
if let Some(mask) = generate_mask(builder.parquet_schema(), selected_indices) {
    ...
}
let mut rb = ...;
if !reorder_indices.windows(2).all(|is| is[0] < is[1]) {
    // requested columns are not in natural order
    let reordered_columns = reorder_indices
        .iter()
        .map(|index| rb.column(*index).clone()) // cheap Arc clone
        .collect();
    let schema = Arc::new(ArrowSchema::new(requested_schema));
    rb = RecordBatch::try_new(schema, reordered_columns)?;
}

Note: The generate_mask method only needs two args with the above code -- the physical schema and the indices of fields to select.
| if indicies.windows(2).all(|is| is[0] <= is[1]) { | ||
| if mask_indicies.windows(2).all(|is| is[0] <= is[1]) { | ||
| // indicies is already sorted, meaning we requested in the order that the columns were | ||
| // stored in the parquet |
I think this might be too strict a check? If parquet schema were [a, b, c, d] and read schema were [b, d] then the mask indexes would appear unordered because of the gaps? Shouldn't we instead be checking whether the requested ordering is ordered?
this does just check that i think? it just checks that for each pair of two elements the earlier one is <= to the later one. so for your example it would be checking that 2<=4, which is true.
this can be changed to < probably, since we won't have duplicate indices.
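To make the gap case from the discussion concrete, a small self-contained check (`is_sorted` is a hypothetical helper mirroring the `windows(2)` test above):

```rust
// Mirrors the mask-ordering check discussed above: windows(2) compares
// each adjacent pair, so gaps between indices don't matter, only order.
fn is_sorted(indices: &[usize]) -> bool {
    indices.windows(2).all(|is| is[0] <= is[1])
}

fn main() {
    // parquet schema [a, b, c, d], read schema [b, d] -> indices [1, 3]:
    // sorted despite the gap, so no reorder is needed.
    assert!(is_sorted(&[1, 3]));
    // reversed request -> reorder required.
    assert!(!is_sorted(&[3, 1]));
    // single column: windows(2) yields nothing, trivially sorted.
    assert!(is_sorted(&[2]));
}
```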
Ah, good point. Not sure what I was thinking.
| pub(crate) fn generate_mask( | ||
| requested_schema: &ArrowSchema, | ||
| requested_schema: &SchemaRef, | ||
| parquet_schema: &ArrowSchemaRef, |
What purpose does the parquet_schema arg serve, as different from the parquet_physical_schema? Seems like we could just check the latter's length at L58 below, and then wouldn't need the extra arg any more?
Reopening #129.
Description from there:

Work to be able to do:

This also then uses the generated schemas from our actions everywhere, and removes the old schema definitions. To use these we define a static `LOG_SCHEMA` which calls all the generated `get_struct_field` methods to build the total log schema. We also add a `project` method to `StructType` so we can then pick out the columns we want when doing something like parsing only metadata.

How it works

We introduce a trait, `ToDataType`. All primitive types as well as vectors and maps know how to produce a `DataType`. Then we can derive a struct implementation of it by annotating with this macro, which will generate an impl of the trait for the struct.

Finally, we add a `GetStructField` trait, which is auto-derived for anything that implements `ToDataType`. It simply allows you to pass a name and get a `StructField` back. A schema is just a `Vec` of `StructField`s, so with this implemented, it's trivial to construct the schema for any type annotated with `#[derive(Schema)]`.

Right now all error handling is via `panic`, but that's generally okay as the compiler just fails with the panic message.

If you want to inspect the generated code, you can use cargo expand. Install it, and then run something like:

$ cargo expand | grep -A50 "ToDataType for Protocol"

and replace `Protocol` with whatever struct you want to see the impl for.
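As an illustration of the idea described above, here is a self-contained toy version of what a `ToDataType` impl generated by `#[derive(Schema)]` might look like. The enum variants, field names, and signatures here are assumptions made for the sketch, not the kernel crate's real definitions:

```rust
// Toy stand-ins for the kernel's schema types; illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    String,
    Long,
    Struct(Vec<StructField>),
}

#[derive(Debug, Clone, PartialEq)]
struct StructField {
    name: String,
    data_type: DataType,
}

// Every type that knows its Delta DataType implements this trait.
trait ToDataType {
    fn to_data_type() -> DataType;
}

impl ToDataType for String {
    fn to_data_type() -> DataType { DataType::String }
}

impl ToDataType for i64 {
    fn to_data_type() -> DataType { DataType::Long }
}

// Roughly what #[derive(Schema)] might generate for a struct like:
//   struct Protocol { min_reader_version: i64 }
// Note the snake_case field becoming camelCase in the schema.
struct Protocol;

impl ToDataType for Protocol {
    fn to_data_type() -> DataType {
        DataType::Struct(vec![StructField {
            name: "minReaderVersion".to_string(),
            data_type: <i64 as ToDataType>::to_data_type(),
        }])
    }
}

fn main() {
    match Protocol::to_data_type() {
        DataType::Struct(fields) => {
            assert_eq!(fields[0].name, "minReaderVersion");
            assert_eq!(fields[0].data_type, DataType::Long);
        }
        other => panic!("expected struct, got {other:?}"),
    }
}
```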