diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index e627e6dd4e89e..ea6d078366625 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1120,6 +1120,7 @@ message FileScanExecConf { optional uint64 batch_size = 12; optional ProjectionExprs projection_exprs = 13; + optional bool partitioned_by_file_group = 14; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 26e8424023ecc..8e6997757f110 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6848,6 +6848,9 @@ impl serde::Serialize for FileScanExecConf { if self.projection_exprs.is_some() { len += 1; } + if self.partitioned_by_file_group.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -6884,6 +6887,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.projection_exprs.as_ref() { struct_ser.serialize_field("projectionExprs", v)?; } + if let Some(v) = self.partitioned_by_file_group.as_ref() { + struct_ser.serialize_field("partitionedByFileGroup", v)?; + } struct_ser.end() } } @@ -6911,6 +6917,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize", "projection_exprs", "projectionExprs", + "partitioned_by_file_group", + "partitionedByFileGroup", ]; #[allow(clippy::enum_variant_names)] @@ -6926,6 +6934,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { Constraints, BatchSize, ProjectionExprs, + PartitionedByFileGroup, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6958,6 +6967,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "constraints" => Ok(GeneratedField::Constraints), "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), + "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6988,6 +6998,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut constraints__ = None; let mut batch_size__ = None; let mut projection_exprs__ = None; + let mut partitioned_by_file_group__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7061,6 +7072,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } projection_exprs__ = map_.next_value()?; } + GeneratedField::PartitionedByFileGroup => { + if partitioned_by_file_group__.is_some() { + return Err(serde::de::Error::duplicate_field("partitionedByFileGroup")); + } + partitioned_by_file_group__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7075,6 +7092,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { constraints: constraints__, batch_size: batch_size__, projection_exprs: projection_exprs__, + partitioned_by_file_group: partitioned_by_file_group__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index 0b43e2e7d6e4a..d8187e65a501e 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1677,6 +1677,8 @@ pub struct FileScanExecConf { pub batch_size: ::core::option::Option, #[prost(message, optional, tag = "13")] pub projection_exprs: ::core::option::Option, + #[prost(bool, optional, tag = "14")] + pub partitioned_by_file_group: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f5fd214ef683f..5b4b95d9c6591 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -711,6 +711,7 @@ pub fn parse_protobuf_file_scan_config( .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) .with_batch_size(proto.batch_size.map(|s| s as usize)) + .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) .build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5181c9740130a..84de2cecbf17c 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -757,6 +757,7 @@ pub fn serialize_file_scan_config( constraints: Some(conf.constraints.clone().into()), batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, + partitioned_by_file_group: Some(conf.partitioned_by_file_group), }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 58b79d641b55d..f28d3a1f5b4de 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4067,5 +4067,54 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { // rewrite can reconstruct the remapped form on the other side. assert_dynamic_filters_equal(deser_custom_df, deser_filter_df); assert_dynamic_filter_update_is_visible(deser_custom_df, deser_filter_df)?; + + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { + use datafusion::datasource::physical_plan::FileScanConfig; + + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_partitioned_by_file_group(true) + .build(); + + assert!(scan_config.partitioned_by_file_group); + + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&exec_plan), + &codec, + &proto_converter, + )?; + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &proto_converter, + )?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + assert!(file_scan_config.partitioned_by_file_group); + Ok(()) }