From 3a4b94230b4b871ad78f7a9ab05800cf7a9470ba Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 21 May 2026 10:57:24 +0800 Subject: [PATCH 1/3] Optimize metric label cloning --- datafusion/datasource-parquet/src/metrics.rs | 76 +++++++++-------- .../src/metrics/builder.rs | 4 +- .../physical-expr-common/src/metrics/mod.rs | 82 +++++++++++++++++-- 3 files changed, 117 insertions(+), 45 deletions(-) diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 262dde024a527..e7aa284f44b26 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -16,8 +16,8 @@ // under the License. use datafusion_physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType, - PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, + Count, ExecutionPlanMetricsSet, Gauge, Label, MetricBuilder, MetricCategory, + MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, }; /// Stores metrics about the parquet execution for a particular parquet file. @@ -100,37 +100,42 @@ impl ParquetFileMetrics { filename: &str, metrics: &ExecutionPlanMetricsSet, ) -> Self { + // Share the filename label across all per-file metrics to avoid + // allocating the same filename string for each metric. 
+ let filename_label = Label::new("filename", filename.to_string()); + let builder = MetricBuilder::new(metrics).with_label(filename_label); + // ----------------------- // 'summary' level metrics // ----------------------- - let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_groups_pruned_bloom_filter = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_bloom_filter", partition); - let limit_pruned_row_groups = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let limit_pruned_row_groups = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("limit_pruned_row_groups", partition); - let row_groups_pruned_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_groups_pruned_statistics = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("row_groups_pruned_statistics", partition); - let page_index_pages_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_pages_pruned = builder + .clone() .with_type(MetricType::Summary) .pruning_metrics("page_index_pages_pruned", partition); - let bytes_scanned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let bytes_scanned = builder + .clone() .with_type(MetricType::Summary) .with_category(MetricCategory::Bytes) .counter("bytes_scanned", partition); - let metadata_load_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let metadata_load_time = builder + .clone() .with_type(MetricType::Summary) .subset_time("metadata_load_time", partition); @@ -138,8 +143,8 @@ impl ParquetFileMetrics { .with_type(MetricType::Summary) .pruning_metrics("files_ranges_pruned_statistics", partition); - let scan_efficiency_ratio = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + 
let scan_efficiency_ratio = builder + .clone() .with_type(MetricType::Summary) .ratio_metrics_with_strategy( "scan_efficiency_ratio", @@ -150,45 +155,44 @@ impl ParquetFileMetrics { // ----------------------- // 'dev' level metrics // ----------------------- - let predicate_evaluation_errors = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_evaluation_errors = builder + .clone() .with_category(MetricCategory::Rows) .counter("predicate_evaluation_errors", partition); - let pushdown_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let pushdown_rows_pruned = builder + .clone() .with_category(MetricCategory::Rows) .counter("pushdown_rows_pruned", partition); - let pushdown_rows_matched = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let pushdown_rows_matched = builder + .clone() .with_category(MetricCategory::Rows) .counter("pushdown_rows_matched", partition); - let row_pushdown_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let row_pushdown_eval_time = builder + .clone() .subset_time("row_pushdown_eval_time", partition); - let statistics_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let statistics_eval_time = builder + .clone() .subset_time("statistics_eval_time", partition); - let bloom_filter_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let bloom_filter_eval_time = builder + .clone() .subset_time("bloom_filter_eval_time", partition); - let page_index_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_eval_time = builder + .clone() .subset_time("page_index_eval_time", partition); - let page_index_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let page_index_rows_pruned = builder + .clone() 
.pruning_metrics("page_index_rows_pruned", partition); - let predicate_cache_inner_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_cache_inner_records = builder + .clone() .with_category(MetricCategory::Rows) .gauge("predicate_cache_inner_records", partition); - let predicate_cache_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) + let predicate_cache_records = builder .with_category(MetricCategory::Rows) .gauge("predicate_cache_records", partition); diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index e9c0b76af2582..6d9c572e3b6f0 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -31,7 +31,8 @@ use super::{ /// Structure for constructing metrics, counters, timers, etc. /// /// Note the use of `Cow<..>` is to avoid allocations in the common -/// case of constant strings +/// case of constant strings. Dynamically created label strings are shared when +/// [`Label`] values are cloned. 
/// /// ```rust /// use datafusion_physical_expr_common::metrics::*; @@ -47,6 +48,7 @@ use super::{ /// .with_new_label("filename", "my_awesome_file.parquet") /// .counter("num_bytes", partition); /// ``` +#[derive(Clone)] pub struct MetricBuilder<'a> { /// Location that the metric created by this builder will be added do metrics: &'a ExecutionPlanMetricsSet, diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index eecd8cfabd5eb..a76c7b9e049f4 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -30,6 +30,7 @@ use parking_lot::Mutex; use std::{ borrow::Cow, fmt::{self, Debug, Display}, + hash::{Hash, Hasher}, sync::Arc, vec::IntoIter, }; @@ -519,12 +520,14 @@ impl From for ExecutionPlanMetricsSet { /// telemetry], /// etc. /// -/// As the name and value are expected to mostly be constant strings, -/// use a [`Cow`] to avoid copying / allocations in this common case. +/// As the name and value are expected to often be constant strings, borrowed +/// static strings avoid allocations in that common case. Dynamic strings are +/// stored behind [`Arc`] so cloning labels does not copy the underlying +/// string data. 
#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Label { - name: Cow<'static, str>, - value: Cow<'static, str>, + name: LabelValue, + value: LabelValue, } impl Label { @@ -533,19 +536,19 @@ impl Label { name: impl Into>, value: impl Into>, ) -> Self { - let name = name.into(); - let value = value.into(); + let name = LabelValue::from(name.into()); + let value = LabelValue::from(value.into()); Self { name, value } } /// Returns the name of this label pub fn name(&self) -> &str { - self.name.as_ref() + self.name.as_str() } /// Returns the value of this label pub fn value(&self) -> &str { - self.value.as_ref() + self.value.as_str() } } @@ -555,6 +558,60 @@ impl Display for Label { } } +/// Internal representation for label names and values. +/// +/// `Static` preserves the existing allocation-free path for string literals. +/// `Shared` stores dynamic strings behind [`Arc`], so cloning a [`Label`] +/// only increments an atomic reference count and does not allocate or copy the +/// underlying string data. 
+#[derive(Clone, Eq)] +enum LabelValue { + Static(&'static str), + Shared(Arc<str>), +} + +impl LabelValue { + fn as_str(&self) -> &str { + match self { + Self::Static(value) => value, + Self::Shared(value) => value.as_ref(), + } + } +} + +impl From<Cow<'static, str>> for LabelValue { + fn from(value: Cow<'static, str>) -> Self { + match value { + Cow::Borrowed(value) => Self::Static(value), + Cow::Owned(value) => Self::Shared(Arc::from(value)), + } + } +} + +impl PartialEq for LabelValue { + fn eq(&self, other: &Self) -> bool { + self.as_str() == other.as_str() + } +} + +impl Hash for LabelValue { + fn hash<H: Hasher>(&self, state: &mut H) { + self.as_str().hash(state); + } +} + +impl Debug for LabelValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Debug::fmt(self.as_str(), f) + } +} + +impl Display for LabelValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self.as_str(), f) + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -609,6 +666,15 @@ mod tests { assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string()) } + #[test] + fn test_label_owned_and_borrowed_values_are_equal() { + let borrowed = Label::new("foo", "bar"); + let owned = Label::new("foo".to_string(), "bar".to_string()); + + assert_eq!(borrowed, owned); + assert_eq!(borrowed.to_string(), owned.to_string()); + } + #[test] fn test_output_rows() { let metrics = ExecutionPlanMetricsSet::new(); From 3c70c217b589e5236bb556d52b381b07953a8f1a Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 22 May 2026 13:19:21 +0800 Subject: [PATCH 2/3] Avoid intermediate string allocation for label values --- datafusion/datasource-parquet/src/metrics.rs | 4 +- .../src/metrics/builder.rs | 8 ++- .../physical-expr-common/src/metrics/mod.rs | 63 +++++++++++++------ 3 files changed, 53 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index e7aa284f44b26..4bf009afd6d63 100644 --- 
a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, Label, MetricBuilder, MetricCategory, MetricType, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, @@ -102,7 +104,7 @@ impl ParquetFileMetrics { ) -> Self { // Share the filename label across all per-file metrics to avoid // allocating the same filename string for each metric. - let filename_label = Label::new("filename", filename.to_string()); + let filename_label = Label::new("filename", Arc::<str>::from(filename)); let builder = MetricBuilder::new(metrics).with_label(filename_label); // ----------------------- diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index 6d9c572e3b6f0..de9d1e03d88df 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -25,7 +25,8 @@ use crate::metrics::{ }; use super::{ - Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp, + Count, ExecutionPlanMetricsSet, Gauge, Label, LabelValue, Metric, MetricValue, Time, + Timestamp, }; /// Structure for constructing metrics, counters, timers, etc. 
@@ -110,7 +111,10 @@ impl<'a> MetricBuilder<'a> { name: impl Into<Cow<'static, str>>, value: impl Into<Cow<'static, str>>, ) -> Self { - self.with_label(Label::new(name.into(), value.into())) + self.with_label(Label::new( + LabelValue::from(name.into()), + LabelValue::from(value.into()), + )) } /// Set the partition of the metric being constructed diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs b/datafusion/physical-expr-common/src/metrics/mod.rs index a76c7b9e049f4..17c43c60b6f2f 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -532,12 +532,9 @@ pub struct Label { impl Label { /// Create a new [`Label`] - pub fn new( - name: impl Into<Cow<'static, str>>, - value: impl Into<Cow<'static, str>>, - ) -> Self { - let name = LabelValue::from(name.into()); - let value = LabelValue::from(value.into()); + pub fn new(name: impl Into<LabelValue>, value: impl Into<LabelValue>) -> Self { + let name = name.into(); + let value = value.into(); Self { name, value } } @@ -558,32 +555,55 @@ impl Display for Label { } } -/// Internal representation for label names and values. +/// A label name or value. /// -/// `Static` preserves the existing allocation-free path for string literals. -/// `Shared` stores dynamic strings behind [`Arc`], so cloning a [`Label`] -/// only increments an atomic reference count and does not allocate or copy the -/// underlying string data. -#[derive(Clone, Eq)] -enum LabelValue { +/// String literals preserve the existing allocation-free path. Dynamic strings +/// can be stored behind [`Arc`], so cloning a [`Label`] only increments an +/// atomic reference count and does not allocate or copy the underlying string +/// data. +#[derive(Clone)] +pub struct LabelValue(LabelValueInner); + +/// Internal representation for label names and values. 
+#[derive(Clone)] +enum LabelValueInner { Static(&'static str), Shared(Arc<str>), } impl LabelValue { - fn as_str(&self) -> &str { - match self { - Self::Static(value) => value, - Self::Shared(value) => value.as_ref(), + /// Return this label value as a string slice. + pub fn as_str(&self) -> &str { + match &self.0 { + LabelValueInner::Static(value) => value, + LabelValueInner::Shared(value) => value.as_ref(), } } } +impl From<&'static str> for LabelValue { + fn from(value: &'static str) -> Self { + Self(LabelValueInner::Static(value)) + } +} + +impl From<String> for LabelValue { + fn from(value: String) -> Self { + Self(LabelValueInner::Shared(Arc::from(value))) + } +} + +impl From<Arc<str>> for LabelValue { + fn from(value: Arc<str>) -> Self { + Self(LabelValueInner::Shared(value)) + } +} + impl From<Cow<'static, str>> for LabelValue { fn from(value: Cow<'static, str>) -> Self { match value { - Cow::Borrowed(value) => Self::Static(value), - Cow::Owned(value) => Self::Shared(Arc::from(value)), + Cow::Borrowed(value) => value.into(), + Cow::Owned(value) => value.into(), } } } @@ -594,6 +614,8 @@ impl PartialEq for LabelValue { } } +impl Eq for LabelValue {} + impl Hash for LabelValue { fn hash<H: Hasher>(&self, state: &mut H) { self.as_str().hash(state); @@ -670,9 +692,12 @@ mod tests { fn test_label_owned_and_borrowed_values_are_equal() { let borrowed = Label::new("foo", "bar"); let owned = Label::new("foo".to_string(), "bar".to_string()); + let shared = Label::new("foo", Arc::<str>::from("bar")); assert_eq!(borrowed, owned); + assert_eq!(borrowed, shared); assert_eq!(borrowed.to_string(), owned.to_string()); + assert_eq!(borrowed.to_string(), shared.to_string()); } #[test] From fe340a85c461ca2be9cff58769ad760aef2cf4f3 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 22 May 2026 13:28:14 +0800 Subject: [PATCH 3/3] Document private label value storage --- datafusion/physical-expr-common/src/metrics/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-expr-common/src/metrics/mod.rs 
b/datafusion/physical-expr-common/src/metrics/mod.rs index 17c43c60b6f2f..0a03075b91094 100644 --- a/datafusion/physical-expr-common/src/metrics/mod.rs +++ b/datafusion/physical-expr-common/src/metrics/mod.rs @@ -565,6 +565,10 @@ impl Display for Label { pub struct LabelValue(LabelValueInner); /// Internal representation for label names and values. +/// +/// `LabelValue` is public because `Label::new` accepts it, but these storage +/// variants are implementation details. Keeping them private prevents external +/// code from constructing or matching on `Static` and `Shared` directly. #[derive(Clone)] enum LabelValueInner { Static(&'static str),