Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ ring = { version = "0.17", default-features = false, features = ["wasm32_unknown
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }

[dependencies]
arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-csv = { workspace = true, optional = true }
arrow-data = { workspace = true, optional = true }
arrow-ord = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
Expand All @@ -49,7 +51,7 @@ parquet-variant = { workspace = true, optional = true }
parquet-variant-json = { workspace = true, optional = true }
parquet-variant-compute = { workspace = true, optional = true }

object_store = { version = "0.13.1", default-features = false, optional = true }
object_store = { version = "0.13.1", default-features = false, optional = true, features = ["tokio"] }

bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
Expand Down Expand Up @@ -104,7 +106,7 @@ default = ["arrow", "snap", "brotli", "flate2-zlib-rs", "lz4", "zstd", "base64",
# Enable lz4
lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
arrow = ["base64", "arrow-arith", "arrow-array", "arrow-buffer", "arrow-data", "arrow-ord", "arrow-schema", "arrow-select", "arrow-ipc"]
# Enable support for arrow canonical extension types
arrow_canonical_extension_types = ["arrow-schema?/canonical_extension_types"]
# Enable CLI tools
Expand Down
124 changes: 118 additions & 6 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,32 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::hll::HyperLogLog;
use crate::basic::Type as PhysicalType;

/// File-level KV metadata key that opts in to HyperLogLog-based `distinct_count`
/// estimation for `Int64` columns. When `WriterProperties::key_value_metadata()`
/// contains an entry with this key whose value is `"true"`, the arrow writer
/// feeds every `Int64` array routed to a column chunk into a HyperLogLog sketch
/// (m=256, p=8) and stores the estimate in the chunk's `distinct_count`
/// statistic. The estimate has a standard error of roughly 6.5%; consumers that
/// require an exact count must not rely on it.
///
/// Has no effect on non-`Int64` columns.
const ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY: &str =
"iceberg.estimate-int64-distinct-count";

fn estimate_int64_distinct_count_enabled(props: &WriterProperties) -> bool {
props
.key_value_metadata()
.map(|kv| {
kv.iter().any(|entry| {
entry.key == ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY
&& entry.value.as_deref() == Some("true")
})
})
.unwrap_or(false)
}
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
Expand Down Expand Up @@ -863,14 +889,17 @@ impl std::fmt::Debug for ArrowColumnWriter {

enum ArrowColumnWriterImpl {
ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
Column(ColumnWriter<'static>),
Column(ColumnWriter<'static>, Option<HyperLogLog>),
}

impl ArrowColumnWriter {
/// Write an [`ArrowLeafColumn`]
pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
match &mut self.writer {
ArrowColumnWriterImpl::Column(c) => {
ArrowColumnWriterImpl::Column(c, hll) => {
if let Some(hll) = hll.as_mut() {
hll.insert_array(col.0.array().clone())?;
}
let leaf = col.0.array();
match leaf.as_any_dictionary_opt() {
Some(dictionary) => {
Expand All @@ -892,7 +921,14 @@ impl ArrowColumnWriter {
pub fn close(self) -> Result<ArrowColumnChunk> {
let close = match self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
ArrowColumnWriterImpl::Column(c) => c.close()?,
ArrowColumnWriterImpl::Column(mut c, hll) => {
if let Some(hll) = hll {
if let ColumnWriter::Int64ColumnWriter(int64) = &mut c {
int64.set_column_distinct_count(Some(hll.count() as u64));
}
}
c.close()?
}
};
let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
let data = chunk.into_inner().unwrap();
Expand All @@ -912,7 +948,7 @@ impl ArrowColumnWriter {
pub fn memory_size(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
ArrowColumnWriterImpl::Column(c) => c.memory_size(),
ArrowColumnWriterImpl::Column(c, _) => c.memory_size(),
}
}

Expand All @@ -926,7 +962,7 @@ impl ArrowColumnWriter {
pub fn get_estimated_total_bytes(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
ArrowColumnWriterImpl::Column(c, _) => c.get_estimated_total_bytes() as _,
}
}
}
Expand Down Expand Up @@ -1134,9 +1170,12 @@ impl ArrowColumnWriterFactory {
let page_writer = self.create_page_writer(desc, out.len())?;
let chunk = page_writer.buffer.clone();
let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
let hll = (estimate_int64_distinct_count_enabled(props)
&& desc.physical_type() == PhysicalType::INT64)
.then(HyperLogLog::new);
Ok(ArrowColumnWriter {
chunk,
writer: ArrowColumnWriterImpl::Column(writer),
writer: ArrowColumnWriterImpl::Column(writer, hll),
})
};

Expand Down Expand Up @@ -4955,4 +4994,77 @@ mod tests {
let total_rows: i64 = sizes.iter().sum();
assert_eq!(total_rows, 100, "Total rows should be preserved");
}

fn distinct_count_from_buffer(buffer: Vec<u8>, column: usize) -> Option<u64> {
let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
reader
.metadata()
.row_group(0)
.column(column)
.statistics()
.and_then(|s| s.distinct_count_opt())
}

fn write_to_buffer(batch: &RecordBatch, props: WriterProperties) -> Vec<u8> {
let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
writer.write(batch).unwrap();
writer.close().unwrap();
buffer
}

fn estimate_int64_distinct_count_props() -> WriterProperties {
WriterProperties::builder()
.set_key_value_metadata(Some(vec![KeyValue::new(
ICEBERG_ESTIMATE_INT64_DISTINCT_COUNT_META_KEY.to_owned(),
"true".to_owned(),
)]))
.build()
}

#[test]
fn estimate_int64_distinct_count_int64() {
let n_distinct: i64 = 10_000;
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
let values: Int64Array = (0..n_distinct).collect();
let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

let buffer = write_to_buffer(&batch, estimate_int64_distinct_count_props());

let count = distinct_count_from_buffer(buffer, 0).expect("distinct_count populated");
let n = n_distinct as u64;
let lower = n - n / 10;
let upper = n + n / 10;
assert!(
count >= lower && count <= upper,
"HLL estimate {count} not within ±10% of {n}",
);
}

#[test]
fn estimate_int64_distinct_count_disabled_by_default() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
let values: Int64Array = (0..1_000_i64).collect();
let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

let buffer = write_to_buffer(&batch, WriterProperties::builder().build());
assert_eq!(distinct_count_from_buffer(buffer, 0), None);
}

#[test]
fn estimate_int64_distinct_count_only_int64() {
let schema = Arc::new(Schema::new(vec![
Field::new("i32", DataType::Int32, false),
Field::new("i64", DataType::Int64, false),
]));
let i32_values = Int32Array::from((0..500).collect::<Vec<_>>());
let i64_values: Int64Array = (0..500_i64).collect();
let batch =
RecordBatch::try_new(schema, vec![Arc::new(i32_values), Arc::new(i64_values)]).unwrap();

let buffer = write_to_buffer(&batch, estimate_int64_distinct_count_props());

assert_eq!(distinct_count_from_buffer(buffer.clone(), 0), None);
assert!(distinct_count_from_buffer(buffer, 1).is_some());
}
}
110 changes: 110 additions & 0 deletions parquet/src/arrow/hll.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Approximate distinct-value counting via the [HyperLogLog] algorithm.
//!
//! [HyperLogLog]: https://en.wikipedia.org/wiki/HyperLogLog

use arrow_array::{
ArrayRef, PrimitiveArray, UInt8Array,
cast::as_primitive_array,
types::{Int64Type, UInt8Type, UInt64Type},
};
use twox_hash::XxHash64;

use crate::errors::ParquetError;

const HLL_HASH_SEED: u64 = 0;

/// HyperLogLog sketch with `m = 256` registers (precision `p = 8`).
///
/// Each `Int64` value inserted is hashed with xxHash64; the top 8 bits select
/// the register and the remaining 56 bits contribute their *rank* — the
/// 1-indexed position of the leftmost 1-bit, i.e. `leading_zeros + 1`. Each
/// register tracks the maximum rank observed for its bucket, and
/// [`count`](Self::count) derives a cardinality estimate from those values.
pub struct HyperLogLog {
registers: [u8; 256],
}

impl Default for HyperLogLog {
fn default() -> Self {
Self::new()
}
}

impl HyperLogLog {
/// Create an empty sketch with all registers set to zero.
pub fn new() -> Self {
Self {
registers: [0; 256],
}
}

/// Insert every non-null value of `array` (must be `Int64`) into the sketch.
///
/// Each value is hashed once; for every register `i` the new maximum
/// rank among hashes routed to bucket `i` is merged into `registers[i]`.
/// Returns an error if `array` is not an `Int64Array` or if the underlying
/// Arrow compute kernels fail.
pub fn insert_array(&mut self, array: ArrayRef) -> Result<(), ParquetError> {
let array: &PrimitiveArray<Int64Type> = as_primitive_array(&array);
let hashes: PrimitiveArray<UInt64Type> =
array.unary(|v| XxHash64::oneshot(HLL_HASH_SEED, &v.to_le_bytes()));
let indices: UInt8Array = hashes.unary(|h| (h >> 56) as u8);
let ranks: UInt8Array = hashes.unary(|h| (h << 8).leading_zeros() as u8 + 1);

for i in 0..=255u8 {
let scalar = UInt8Array::new_scalar(i);
let mask = arrow_ord::cmp::eq(&indices, &scalar)?;
let filtered = arrow_select::filter::filter(&ranks, &mask)?;
let filtered: &PrimitiveArray<UInt8Type> = as_primitive_array(&filtered);
if let Some(max_rank) = arrow_arith::aggregate::max(filtered) {
let register = &mut self.registers[i as usize];
if max_rank > *register {
*register = max_rank;
}
}
}
Ok(())
}

/// Estimate the number of distinct values inserted so far.
///
/// Uses the standard HyperLogLog estimator
/// `E = α_m · m² / Σ 2^(-M[j])` (harmonic-mean form) with the
/// bias-correction constant `α_m` for `m = 256`. When the raw estimate is
/// small (`≤ 2.5 m`) and at least one register is still zero, falls back to
/// linear counting `m · ln(m / V)`, which is more accurate at low
/// cardinalities. See Flajolet et al., *HyperLogLog: the analysis of a
/// near-optimal cardinality estimation algorithm* (2007).
pub fn count(&self) -> f64 {
const M: usize = 256;
const M_F: f64 = M as f64;
const ALPHA: f64 = 0.7213 / (1.0 + 1.079 / M_F);

let z: f64 = self.registers.iter().map(|&r| (-(r as f64)).exp2()).sum();
let raw = ALPHA * M_F * M_F / z;

let zeros = self.registers.iter().filter(|&&r| r == 0).count();
if raw <= 2.5 * M_F && zeros > 0 {
M_F * (M_F / zeros as f64).ln()
} else {
raw
}
}
}
2 changes: 2 additions & 0 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ pub mod arrow_writer;
mod buffer;
mod decoder;

pub mod hll;

#[cfg(feature = "async")]
pub mod async_reader;
#[cfg(feature = "async")]
Expand Down
14 changes: 14 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}

/// Override the `distinct_count` value that will be written into this
/// column chunk's statistics.
///
/// Normally [`Self::write_batch_with_statistics`] only honors a
/// `distinct_count` argument on the very first batch written to a chunk,
/// and clears it on subsequent writes. This setter writes the given value
/// directly into the chunk's accumulated metrics so callers that compute
/// the count externally (for example via a HyperLogLog sketch fed by every
/// batch) can install the final value at any point before
/// [`Self::close`].
pub fn set_column_distinct_count(&mut self, count: Option<u64>) {
self.column_metrics.column_distinct_count = count;
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn write_batch_internal(
&mut self,
Expand Down
Loading