diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index d1ada01c3773..269734e5a539 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -37,10 +37,12 @@ ring = { version = "0.17", default-features = false, features = ["wasm32_unknown ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } [dependencies] +arrow-arith = { workspace = true, optional = true } arrow-array = { workspace = true, optional = true } arrow-buffer = { workspace = true, optional = true } arrow-csv = { workspace = true, optional = true } arrow-data = { workspace = true, optional = true } +arrow-ord = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } arrow-select = { workspace = true, optional = true } arrow-ipc = { workspace = true, optional = true } @@ -49,7 +51,7 @@ parquet-variant = { workspace = true, optional = true } parquet-variant-json = { workspace = true, optional = true } parquet-variant-compute = { workspace = true, optional = true } -object_store = { version = "0.13.1", default-features = false, optional = true } +object_store = { version = "0.13.1", default-features = false, optional = true, features = ["tokio"] } bytes = { version = "1.1", default-features = false, features = ["std"] } thrift = { version = "0.17", default-features = false } @@ -104,7 +106,7 @@ default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64", # Enable lz4 lz4 = ["lz4_flex"] # Enable arrow reader/writer APIs -arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"] +arrow = ["base64", "arrow-arith", "arrow-array", "arrow-buffer", "arrow-data", "arrow-ord", "arrow-schema", "arrow-select", "arrow-ipc"] # Enable support for arrow canonical extension types arrow_canonical_extension_types = ["arrow-schema?/canonical_extension_types"] # Enable CLI tools diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 979988eebc05..048d64272bbe 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -35,6 +35,32 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr use crate::arrow::ArrowSchemaConverter; use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::arrow::hll::HyperLogLog; +use crate::basic::Type as PhysicalType; + +/// File-level KV metadata key that opts in to HyperLogLog-based `distinct_count` +/// estimation for `Int64` columns. When `WriterProperties::key_value_metadata()` +/// contains an entry with this key whose value is `"true"`, the arrow writer +/// feeds every `Int64` array routed to a column chunk into a HyperLogLog sketch +/// (m=256, p=8) and stores the estimate in the chunk's `distinct_count` +/// statistic. The estimate has a standard error of roughly 6.5%; consumers that +/// require an exact count must not rely on it. +/// +/// Has no effect on non-`Int64` columns. +const ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY: &str = + "iceberg.estimate-int64-distinct-count"; + +fn estimate_int64_distinct_count_enabled(props: &WriterProperties) -> bool { + props + .key_value_metadata() + .map(|kv| { + kv.iter().any(|entry| { + entry.key == ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY + && entry.value.as_deref() == Some("true") + }) + }) + .unwrap_or(false) +} use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::page_encryption::PageEncryptor; use crate::column::writer::encoder::ColumnValueEncoder; @@ -863,14 +889,17 @@ impl std::fmt::Debug for ArrowColumnWriter { enum ArrowColumnWriterImpl { ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>), - Column(ColumnWriter<'static>), + Column(ColumnWriter<'static>, Option), } impl ArrowColumnWriter { /// Write an [`ArrowLeafColumn`] pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> { match &mut self.writer { - ArrowColumnWriterImpl::Column(c) => { + ArrowColumnWriterImpl::Column(c, hll) => { + if let Some(hll) = hll.as_mut() { + hll.insert_array(col.0.array().clone())?; + } let leaf = col.0.array(); match leaf.as_any_dictionary_opt() { Some(dictionary) => { @@ -892,7 +921,14 @@ impl ArrowColumnWriter { pub fn close(self) -> Result { let close = match self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.close()?, - ArrowColumnWriterImpl::Column(c) => c.close()?, + ArrowColumnWriterImpl::Column(mut c, hll) => { + if let Some(hll) = hll { + if let ColumnWriter::Int64ColumnWriter(int64) = &mut c { + int64.set_column_distinct_count(Some(hll.count() as u64)); + } + } + c.close()? + } }; let chunk = Arc::try_unwrap(self.chunk).ok().unwrap(); let data = chunk.into_inner().unwrap(); @@ -912,7 +948,7 @@ impl ArrowColumnWriter { pub fn memory_size(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(), - ArrowColumnWriterImpl::Column(c) => c.memory_size(), + ArrowColumnWriterImpl::Column(c, _) => c.memory_size(), } } @@ -926,7 +962,7 @@ impl ArrowColumnWriter { pub fn get_estimated_total_bytes(&self) -> usize { match &self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _, - ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _, + ArrowColumnWriterImpl::Column(c, _) => c.get_estimated_total_bytes() as _, } } } @@ -1134,9 +1170,12 @@ impl ArrowColumnWriterFactory { let page_writer = self.create_page_writer(desc, out.len())?; let chunk = page_writer.buffer.clone(); let writer = get_column_writer(desc.clone(), props.clone(), page_writer); + let hll = (estimate_int64_distinct_count_enabled(props) + && desc.physical_type() == PhysicalType::INT64) + .then(HyperLogLog::new); Ok(ArrowColumnWriter { chunk, - writer: ArrowColumnWriterImpl::Column(writer), + writer: ArrowColumnWriterImpl::Column(writer, hll), }) }; @@ -4955,4 +4994,77 @@ mod tests { let total_rows: i64 = sizes.iter().sum(); assert_eq!(total_rows, 100, "Total rows should be preserved"); } + + fn distinct_count_from_buffer(buffer: Vec, column: usize) -> Option { + let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap(); + reader + .metadata() + .row_group(0) + .column(column) + .statistics() + .and_then(|s| s.distinct_count_opt()) + } + + fn write_to_buffer(batch: &RecordBatch, props: WriterProperties) -> Vec { + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + buffer + } + + fn estimate_int64_distinct_count_props() -> WriterProperties { + WriterProperties::builder() + .set_key_value_metadata(Some(vec![KeyValue::new( + ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY.to_owned(), + "true".to_owned(), + )])) + .build() + } + + #[test] + fn estimate_int64_distinct_count_int64() { + let n_distinct: i64 = 10_000; + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let values: Int64Array = (0..n_distinct).collect(); + let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap(); + + let buffer = write_to_buffer(&batch, estimate_int64_distinct_count_props()); + + let count = distinct_count_from_buffer(buffer, 0).expect("distinct_count populated"); + let n = n_distinct as u64; + let lower = n - n / 10; + let upper = n + n / 10; + assert!( + count >= lower && count <= upper, + "HLL estimate {count} not within ±10% of {n}", + ); + } + + #[test] + fn estimate_int64_distinct_count_disabled_by_default() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let values: Int64Array = (0..1_000_i64).collect(); + let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap(); + + let buffer = write_to_buffer(&batch, WriterProperties::builder().build()); + assert_eq!(distinct_count_from_buffer(buffer, 0), None); + } + + #[test] + fn estimate_int64_distinct_count_only_int64() { + let schema = Arc::new(Schema::new(vec![ + Field::new("i32", DataType::Int32, false), + Field::new("i64", DataType::Int64, false), + ])); + let i32_values = Int32Array::from((0..500).collect::>()); + let i64_values: Int64Array = (0..500_i64).collect(); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(i32_values), Arc::new(i64_values)]).unwrap(); + + let buffer = write_to_buffer(&batch, estimate_int64_distinct_count_props()); + + assert_eq!(distinct_count_from_buffer(buffer.clone(), 0), None); + assert!(distinct_count_from_buffer(buffer, 1).is_some()); + } } diff --git a/parquet/src/arrow/hll.rs b/parquet/src/arrow/hll.rs new file mode 100644 index 000000000000..44326c543cb9 --- /dev/null +++ b/parquet/src/arrow/hll.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Approximate distinct-value counting via the [HyperLogLog] algorithm. +//! +//! [HyperLogLog]: https://en.wikipedia.org/wiki/HyperLogLog + +use arrow_array::{ + ArrayRef, PrimitiveArray, UInt8Array, + cast::as_primitive_array, + types::{Int64Type, UInt8Type, UInt64Type}, +}; +use twox_hash::XxHash64; + +use crate::errors::ParquetError; + +const HLL_HASH_SEED: u64 = 0; + +/// HyperLogLog sketch with `m = 256` registers (precision `p = 8`). +/// +/// Each `Int64` value inserted is hashed with xxHash64; the top 8 bits select +/// the register and the remaining 56 bits contribute their *rank* — the +/// 1-indexed position of the leftmost 1-bit, i.e. `leading_zeros + 1`. Each +/// register tracks the maximum rank observed for its bucket, and +/// [`count`](Self::count) derives a cardinality estimate from those values. +pub struct HyperLogLog { + registers: [u8; 256], +} + +impl Default for HyperLogLog { + fn default() -> Self { + Self::new() + } +} + +impl HyperLogLog { + /// Create an empty sketch with all registers set to zero. + pub fn new() -> Self { + Self { + registers: [0; 256], + } + } + + /// Insert every non-null value of `array` (must be `Int64`) into the sketch. + /// + /// Each value is hashed once; for every register `i` the new maximum + /// rank among hashes routed to bucket `i` is merged into `registers[i]`. + /// Returns an error if `array` is not an `Int64Array` or if the underlying + /// Arrow compute kernels fail. + pub fn insert_array(&mut self, array: ArrayRef) -> Result<(), ParquetError> { + let array: &PrimitiveArray = as_primitive_array(&array); + let hashes: PrimitiveArray = + array.unary(|v| XxHash64::oneshot(HLL_HASH_SEED, &v.to_le_bytes())); + let indices: UInt8Array = hashes.unary(|h| (h >> 56) as u8); + let ranks: UInt8Array = hashes.unary(|h| (h << 8).leading_zeros() as u8 + 1); + + for i in 0..=255u8 { + let scalar = UInt8Array::new_scalar(i); + let mask = arrow_ord::cmp::eq(&indices, &scalar)?; + let filtered = arrow_select::filter::filter(&ranks, &mask)?; + let filtered: &PrimitiveArray = as_primitive_array(&filtered); + if let Some(max_rank) = arrow_arith::aggregate::max(filtered) { + let register = &mut self.registers[i as usize]; + if max_rank > *register { + *register = max_rank; + } + } + } + Ok(()) + } + + /// Estimate the number of distinct values inserted so far. + /// + /// Uses the standard HyperLogLog estimator + /// `E = α_m · m² / Σ 2^(-M[j])` (harmonic-mean form) with the + /// bias-correction constant `α_m` for `m = 256`. When the raw estimate is + /// small (`≤ 2.5 m`) and at least one register is still zero, falls back to + /// linear counting `m · ln(m / V)`, which is more accurate at low + /// cardinalities. See Flajolet et al., *HyperLogLog: the analysis of a + /// near-optimal cardinality estimation algorithm* (2007). + pub fn count(&self) -> f64 { + const M: usize = 256; + const M_F: f64 = M as f64; + const ALPHA: f64 = 0.7213 / (1.0 + 1.079 / M_F); + + let z: f64 = self.registers.iter().map(|&r| (-(r as f64)).exp2()).sum(); + let raw = ALPHA * M_F * M_F / z; + + let zeros = self.registers.iter().filter(|&&r| r == 0).count(); + if raw <= 2.5 * M_F && zeros > 0 { + M_F * (M_F / zeros as f64).ln() + } else { + raw + } + } +} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 52152988166f..7c0b542d66d9 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -185,6 +185,8 @@ pub mod arrow_writer; mod buffer; mod decoder; +pub mod hll; + #[cfg(feature = "async")] pub mod async_reader; #[cfg(feature = "async")] diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index c014397f132e..f2bc8527a792 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -424,6 +424,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } + /// Override the `distinct_count` value that will be written into this + /// column chunk's statistics. + /// + /// Normally [`Self::write_batch_with_statistics`] only honors a + /// `distinct_count` argument on the very first batch written to a chunk, + /// and clears it on subsequent writes. This setter writes the given value + /// directly into the chunk's accumulated metrics so callers that compute + /// the count externally (for example via a HyperLogLog sketch fed by every + /// batch) can install the final value at any point before + /// [`Self::close`]. + pub fn set_column_distinct_count(&mut self, count: Option) { + self.column_metrics.column_distinct_count = count; + } + #[allow(clippy::too_many_arguments)] pub(crate) fn write_batch_internal( &mut self,