Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions qdp/qdp-core/src/gpu/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use crate::gpu::pool_metrics::PoolMetrics;

/// Handle that automatically returns a buffer to the pool on drop.
#[cfg(target_os = "linux")]
pub struct PinnedBufferHandle {
buffer: Option<PinnedHostBuffer>,
pool: Arc<PinnedBufferPool>,
pub struct PinnedBufferHandle<T: Copy = f64> {
buffer: Option<PinnedHostBuffer<T>>,
pool: Arc<PinnedBufferPool<T>>,
}

#[cfg(target_os = "linux")]
impl std::ops::Deref for PinnedBufferHandle {
type Target = PinnedHostBuffer;
impl<T: Copy> std::ops::Deref for PinnedBufferHandle<T> {
type Target = PinnedHostBuffer<T>;

fn deref(&self) -> &Self::Target {
self.buffer
Expand All @@ -45,7 +45,7 @@ impl std::ops::Deref for PinnedBufferHandle {
}

#[cfg(target_os = "linux")]
impl std::ops::DerefMut for PinnedBufferHandle {
impl<T: Copy> std::ops::DerefMut for PinnedBufferHandle<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buffer
.as_mut()
Expand All @@ -54,7 +54,7 @@ impl std::ops::DerefMut for PinnedBufferHandle {
}

#[cfg(target_os = "linux")]
impl Drop for PinnedBufferHandle {
impl<T: Copy> Drop for PinnedBufferHandle<T> {
fn drop(&mut self) {
if let Some(buf) = self.buffer.take() {
let mut free = self.pool.lock_free();
Expand All @@ -66,16 +66,16 @@ impl Drop for PinnedBufferHandle {

/// Pool of pinned host buffers sized for a fixed batch shape.
#[cfg(target_os = "linux")]
pub struct PinnedBufferPool {
free: Mutex<Vec<PinnedHostBuffer>>,
pub struct PinnedBufferPool<T: Copy = f64> {
free: Mutex<Vec<PinnedHostBuffer<T>>>,
available_cv: Condvar,
capacity: usize,
elements_per_buffer: usize,
}

#[cfg(target_os = "linux")]
impl PinnedBufferPool {
/// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` f64 values.
impl<T: Copy> PinnedBufferPool<T> {
/// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` values of `T`.
pub fn new(pool_size: usize, elements_per_buffer: usize) -> Result<Arc<Self>> {
if pool_size == 0 {
return Err(MahoutError::InvalidInput(
Expand All @@ -90,7 +90,7 @@ impl PinnedBufferPool {

let mut buffers = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
buffers.push(PinnedHostBuffer::<T>::new(elements_per_buffer)?);
}

Ok(Arc::new(Self {
Expand All @@ -101,15 +101,15 @@ impl PinnedBufferPool {
}))
}

fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer>> {
fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer<T>>> {
// Ignore poisoning to keep the pool usable after a panic elsewhere.
self.free
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}

/// Acquire a pinned buffer, blocking until one is available.
pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle<T> {
self.acquire_with_metrics(None)
}

Expand All @@ -123,7 +123,7 @@ impl PinnedBufferPool {
pub fn acquire_with_metrics(
self: &Arc<Self>,
metrics: Option<&PoolMetrics>,
) -> PinnedBufferHandle {
) -> PinnedBufferHandle<T> {
let mut free = self.lock_free();

// Record available count while holding the lock to avoid TOCTOU race condition
Expand Down Expand Up @@ -161,7 +161,7 @@ impl PinnedBufferPool {
///
/// Returns `None` if the pool is currently empty; callers can choose to spin/wait
/// or fall back to synchronous paths.
pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle<T>> {
let mut free = self.lock_free();
free.pop().map(|buffer| PinnedBufferHandle {
buffer: Some(buffer),
Expand Down
215 changes: 139 additions & 76 deletions qdp/qdp-core/src/gpu/encodings/angle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,26 @@ impl QuantumEncoder for AngleEncoder {
) -> Result<GpuStateVector> {
crate::profile_scope!("AngleEncoder::encode_batch");

if num_samples == 0 {
return Err(MahoutError::InvalidInput(
"Number of samples cannot be zero".into(),
));
}
if sample_size == 0 {
return Err(MahoutError::InvalidInput(
"Sample size cannot be zero".into(),
));
}
if sample_size != num_qubits {
return Err(MahoutError::InvalidInput(format!(
"Angle encoding expects sample_size={} (one angle per qubit), got {}",
num_qubits, sample_size
)));
}

if batch_data.len() != num_samples * sample_size {
let expected_len = num_samples
.checked_mul(sample_size)
.ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
if batch_data.len() != expected_len {
return Err(MahoutError::InvalidInput(format!(
"Batch data length {} doesn't match num_samples {} * sample_size {}",
batch_data.len(),
Expand Down Expand Up @@ -235,6 +247,18 @@ impl QuantumEncoder for AngleEncoder {
validate_qubit_count(num_qubits)?;
let state_len = 1 << num_qubits;
let angles_d = input_d as *const f64;
{
crate::profile_scope!("GPU::AngleFiniteCheck");
unsafe {
crate::gpu::validation::assert_all_finite_f64(
device,
angles_d,
input_len,
stream,
"Angle encoding",
)?;
}
}
let state_vector = {
crate::profile_scope!("GPU::Alloc");
GpuStateVector::new(device, num_qubits, Precision::Float64)?
Expand Down Expand Up @@ -280,6 +304,11 @@ impl QuantumEncoder for AngleEncoder {
num_qubits: usize,
stream: *mut c_void,
) -> Result<GpuStateVector> {
if num_samples == 0 {
return Err(MahoutError::InvalidInput(
"Number of samples cannot be zero".into(),
));
}
if sample_size == 0 {
return Err(MahoutError::InvalidInput(
"Sample size cannot be zero".into(),
Expand All @@ -294,47 +323,19 @@ impl QuantumEncoder for AngleEncoder {
validate_qubit_count(num_qubits)?;
let state_len = 1 << num_qubits;
let input_batch_d = input_batch_d as *const f64;
let angle_validation_buffer = {
let total_angles = num_samples
.checked_mul(sample_size)
.ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
{
crate::profile_scope!("GPU::AngleFiniteCheckBatch");
use cudarc::driver::DevicePtrMut;
let mut buffer = device.alloc_zeros::<f64>(num_samples).map_err(|e| {
MahoutError::MemoryAllocation(format!(
"Failed to allocate angle validation buffer: {:?}",
e
))
})?;
let ret = unsafe {
qdp_kernels::launch_l2_norm_batch(
unsafe {
crate::gpu::validation::assert_all_finite_f64(
device,
input_batch_d,
num_samples,
sample_size,
*buffer.device_ptr_mut() as *mut f64,
total_angles,
stream,
)
};
if ret != 0 {
return Err(MahoutError::KernelLaunch(format!(
"Angle validation norm kernel failed with CUDA error code: {} ({})",
ret,
cuda_error_to_string(ret)
)));
}
buffer
};
{
crate::profile_scope!("GPU::AngleFiniteValidationHostCopy");
let host_norms = device
.dtoh_sync_copy(&angle_validation_buffer)
.map_err(|e| {
MahoutError::Cuda(format!(
"Failed to copy angle validation norms to host: {:?}",
e
))
})?;
if host_norms.iter().any(|v| !v.is_finite()) {
return Err(MahoutError::InvalidInput(
"Angle encoding batch contains non-finite values (NaN or Inf)".to_string(),
));
"Angle encoding",
)?;
}
}
let batch_state_vector = {
Expand Down Expand Up @@ -426,6 +427,22 @@ impl QuantumEncoder for AngleEncoder {
}

let state_len = 1 << num_qubits;

// For large batches, stream through dual-stream pipeline to overlap
// H2D copy with kernel compute. Threshold matches the f64 path (1MB
// worth of elements), scaled for f32's smaller element size.
const ASYNC_THRESHOLD_ELEMENTS: usize = 1024 * 1024 / std::mem::size_of::<f32>(); // 1MB
if batch_data.len() >= ASYNC_THRESHOLD_ELEMENTS {
return Self::encode_batch_async_pipeline_f32(
device,
batch_data,
num_samples,
sample_size,
num_qubits,
state_len,
);
}

let batch_state_vector = {
crate::profile_scope!("GPU::AllocBatchF32");
GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
Expand Down Expand Up @@ -510,46 +527,16 @@ impl QuantumEncoder for AngleEncoder {
let total_angles = num_samples
.checked_mul(sample_size)
.ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
let angle_validation_buffer = {
{
crate::profile_scope!("GPU::AngleFiniteCheckBatchF32");
use cudarc::driver::DevicePtrMut;
let mut buffer = device.alloc_zeros::<i32>(1).map_err(|e| {
MahoutError::MemoryAllocation(format!(
"Failed to allocate angle validation buffer: {:?}",
e
))
})?;
let ret = unsafe {
qdp_kernels::launch_check_finite_batch_f32(
unsafe {
crate::gpu::validation::assert_all_finite_f32(
device,
input_batch_d,
total_angles,
*buffer.device_ptr_mut() as *mut i32,
stream,
)
};
if ret != 0 {
return Err(MahoutError::KernelLaunch(format!(
"Angle finite validation kernel (f32) failed with CUDA error code: {} ({})",
ret,
cuda_error_to_string(ret)
)));
}
buffer
};
{
crate::profile_scope!("GPU::AngleFiniteValidationHostCopyF32");
let host_flags = device
.dtoh_sync_copy(&angle_validation_buffer)
.map_err(|e| {
MahoutError::Cuda(format!(
"Failed to copy angle validation flags to host: {:?}",
e
))
})?;
if host_flags.first().copied().unwrap_or_default() != 0 {
return Err(MahoutError::InvalidInput(
"Angle encoding batch contains non-finite values (NaN or Inf)".to_string(),
));
"Angle encoding",
)?;
}
}
let batch_state_vector = {
Expand Down Expand Up @@ -649,6 +636,18 @@ impl AngleEncoder {

validate_qubit_count(num_qubits)?;
let state_len = 1 << num_qubits;
{
crate::profile_scope!("GPU::AngleFiniteCheckF32");
unsafe {
crate::gpu::validation::assert_all_finite_f32(
device,
input_d,
input_len,
stream,
"Angle encoding",
)?;
}
}
let state_vector = {
crate::profile_scope!("GPU::Alloc");
GpuStateVector::new(device, num_qubits, Precision::Float32)?
Expand Down Expand Up @@ -780,4 +779,68 @@ impl AngleEncoder {

Ok(batch_state_vector)
}

/// Encode a large f32 angle batch by streaming it through the dual-stream
/// pipeline, intended to overlap host-to-device copies with kernel compute
/// (see the threshold check in the caller that routes batches >= 1MB here).
///
/// The pipeline hands this function's closure aligned chunks of `batch_data`;
/// each chunk is encoded into the corresponding slice of one pre-allocated
/// batch state vector.
///
/// Assumes (enforced by the calling `encode_batch` path — TODO confirm):
/// `batch_data.len() == num_samples * sample_size` and
/// `state_len == 1 << num_qubits`.
///
/// # Errors
/// - `InvalidInput` if the batch state vector is not a float32 buffer, a
///   pipeline chunk is not aligned to `sample_size`, or the output offset
///   overflows `usize`.
/// - `KernelLaunch` if the CUDA kernel returns a nonzero status code.
#[cfg(target_os = "linux")]
fn encode_batch_async_pipeline_f32(
    device: &Arc<CudaDevice>,
    batch_data: &[f32],
    num_samples: usize,
    sample_size: usize,
    num_qubits: usize,
    state_len: usize,
) -> Result<GpuStateVector> {
    // Allocate the full output (num_samples x state_len, f32) up front so
    // chunks launched on different streams can write disjoint regions of it.
    let batch_state_vector = {
        crate::profile_scope!("GPU::AllocBatchF32");
        GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
    };

    // `ptr_f32()` returns None if the vector was allocated with a different
    // precision; that would be an internal inconsistency, surfaced as an error.
    let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
        MahoutError::InvalidInput(
            "Batch state vector precision mismatch (expected float32 buffer)".to_string(),
        )
    })?;

    // The pipeline invokes this closure once per chunk with a device pointer
    // to the uploaded chunk data, plus its offset/length in `batch_data`
    // (element counts, not bytes — inferred from the `% sample_size` checks).
    crate::gpu::pipeline::run_dual_stream_pipeline_aligned_f32(
        device,
        batch_data,
        sample_size,
        |stream, input_ptr, chunk_offset, chunk_len| {
            // Defensive check: the "aligned" pipeline should already split on
            // sample boundaries; a misaligned chunk would corrupt the output.
            if chunk_len % sample_size != 0 || chunk_offset % sample_size != 0 {
                return Err(MahoutError::InvalidInput(
                    "Angle batch chunk is not aligned to sample size".to_string(),
                ));
            }

            let chunk_samples = chunk_len / sample_size;
            let sample_offset = chunk_offset / sample_size;
            // Each sample occupies `state_len` output elements; guard the
            // multiplication since both factors are caller-controlled.
            let offset_elements = sample_offset.checked_mul(state_len).ok_or_else(|| {
                MahoutError::InvalidInput("Angle batch output offset overflow".to_string())
            })?;

            // SAFETY relies on `offset_elements` staying within the buffer
            // allocated for `num_samples * state_len` elements — TODO confirm
            // the pipeline never yields chunks past `batch_data.len()`.
            let state_ptr_offset = unsafe { state_ptr.add(offset_elements) as *mut c_void };
            let ret = unsafe {
                qdp_kernels::launch_angle_encode_batch_f32(
                    input_ptr,
                    state_ptr_offset,
                    chunk_samples,
                    state_len,
                    num_qubits as u32,
                    stream.stream as *mut c_void,
                )
            };

            // Nonzero return is a CUDA error code; translate it for the message.
            if ret != 0 {
                return Err(MahoutError::KernelLaunch(format!(
                    "Batch angle encoding kernel (f32) failed: {} ({})",
                    ret,
                    cuda_error_to_string(ret)
                )));
            }

            Ok(())
        },
    )?;

    Ok(batch_state_vector)
}
}
Loading
Loading