Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
disallowed-methods = [
{ path = "tokio::task::spawn", reason = "To provide cancel-safety, use `SpawnedTask::spawn` instead (https://github.com/apache/datafusion/issues/6513)" },
{ path = "tokio::task::spawn_blocking", reason = "To provide cancel-safety, use `SpawnedTask::spawn_blocking` instead (https://github.com/apache/datafusion/issues/6513)" },
{ path = "std::vec::Vec::reserve", reason = "Use `Vec::try_reserve` so capacity overflow and allocation failures are returned as errors instead of panicking or aborting the process", replacement = "try_reserve" },
]

disallowed-types = [
Expand Down
9 changes: 7 additions & 2 deletions datafusion/functions-aggregate/src/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use arrow::datatypes::{

use datafusion_common::types::{NativeType, logical_float64};
use datafusion_common::{
DataFusionError, Result, ScalarValue, assert_eq_or_internal_err,
DataFusionError, Result, ScalarValue, assert_eq_or_internal_err, exec_datafusion_err,
internal_datafusion_err,
};
use datafusion_expr::function::StateFieldsArgs;
Expand Down Expand Up @@ -282,7 +282,12 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = values[0].as_primitive::<T>();
self.all_values.reserve(values.len() - values.null_count());
let additional = values.len() - values.null_count();
self.all_values.try_reserve(additional).map_err(|e| {
exec_datafusion_err!(
"failed to reserve {additional} values for median accumulator: {e}"
)
})?;
self.all_values.extend(values.iter().flatten());
Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions datafusion/functions-aggregate/src/percentile_cont.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator;

use crate::min_max::{max_udaf, min_udaf};
use datafusion_common::{
Result, ScalarValue, internal_datafusion_err, utils::take_function_args,
Result, ScalarValue, exec_datafusion_err, internal_datafusion_err,
utils::take_function_args,
};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Expand Down Expand Up @@ -420,7 +421,12 @@ where

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = values[0].as_primitive::<T>();
self.all_values.reserve(values.len() - values.null_count());
let additional = values.len() - values.null_count();
self.all_values.try_reserve(additional).map_err(|e| {
exec_datafusion_err!(
"failed to reserve {additional} values for percentile_cont accumulator: {e}"
)
})?;
self.all_values.extend(values.iter().flatten());
Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@ fn case_conversion_utf8view_ascii_inner<F: Fn(&u8) -> u8>(
block_size = block_size.saturating_mul(2);
}
let to_reserve = len.max(block_size as usize);
#[expect(
clippy::disallowed_methods,
reason = "StringView block_size bounds growth, so reserve cannot overflow capacity"
)]
in_progress.reserve(to_reserve);
}

Expand Down
8 changes: 8 additions & 0 deletions datafusion/functions/src/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,10 @@ impl StringViewArrayBuilder {
if self.in_progress.capacity() < required_cap {
self.flush_in_progress();
let to_reserve = (length as usize).max(self.next_block_size() as usize);
#[expect(
clippy::disallowed_methods,
reason = "StringView block_size bounds growth, so reserve cannot overflow capacity"
)]
self.in_progress.reserve(to_reserve);
}

Expand Down Expand Up @@ -730,6 +734,10 @@ impl StringViewArrayBuilder {
if self.in_progress.capacity() < required_cap {
self.flush_in_progress();
let to_reserve = (length as usize).max(self.next_block_size() as usize);
#[expect(
clippy::disallowed_methods,
reason = "StringView block_size bounds growth, so reserve cannot overflow capacity"
)]
self.in_progress.reserve(to_reserve);
}
}
Expand Down
13 changes: 11 additions & 2 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, internal_datafusion_err, not_impl_err};
use datafusion_common::{
Result, exec_datafusion_err, internal_datafusion_err, not_impl_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -489,7 +491,14 @@ impl DistinctDeduplicator {
/// We also detect duplicates by enforcing that group ids are increasing.
fn deduplicate(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
let size_before = self.group_values.len();
self.intern_output_buffer.reserve(batch.num_rows());
let additional = batch.num_rows();
self.intern_output_buffer
.try_reserve(additional)
.map_err(|e| {
exec_datafusion_err!(
"failed to reserve {additional} recursive query group ids: {e}"
)
})?;
self.group_values
.intern(batch.columns(), &mut self.intern_output_buffer)?;
let mask = new_groups_mask(&self.intern_output_buffer, size_before);
Expand Down
12 changes: 10 additions & 2 deletions datafusion/spark/src/function/math/hex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion_common::utils::take_function_args;
use datafusion_common::{
DataFusionError,
cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array},
exec_err,
exec_datafusion_err, exec_err,
};
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
Expand Down Expand Up @@ -178,7 +178,15 @@ where
if let Some(b) = v {
let bytes = b.as_ref();
buffer.clear();
buffer.reserve(bytes.len() * 2);
let additional = bytes
.len()
.checked_mul(2)
.ok_or_else(|| exec_datafusion_err!("hex output size overflow"))?;
buffer.try_reserve(additional).map_err(|e| {
exec_datafusion_err!(
"failed to reserve {additional} bytes for hex output: {e}"
)
})?;
for &byte in bytes {
buffer.extend_from_slice(&lookup[byte as usize]);
}
Expand Down
11 changes: 9 additions & 2 deletions datafusion/spark/src/function/math/unhex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use datafusion_common::cast::{
};
use datafusion_common::types::logical_string;
use datafusion_common::utils::take_function_args;
use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
use datafusion_common::{
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
};
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
TypeSignatureClass, Volatility,
Expand Down Expand Up @@ -125,7 +127,12 @@ where
for v in iter {
if let Some(s) = v {
buffer.clear();
buffer.reserve(s.as_ref().len().div_ceil(2));
let additional = s.as_ref().len().div_ceil(2);
buffer.try_reserve(additional).map_err(|e| {
exec_datafusion_err!(
"failed to reserve {additional} bytes for unhex output: {e}"
)
})?;
if unhex_common(s.as_ref().as_bytes(), &mut buffer) {
builder.append_value(&buffer);
} else {
Expand Down
Loading