diff --git a/.github/workflows/python-testing.yml b/.github/workflows/python-testing.yml
index 0ca666bf0d..a453f169c9 100644
--- a/.github/workflows/python-testing.yml
+++ b/.github/workflows/python-testing.yml
@@ -22,17 +22,43 @@ on:
     paths:
       - "**.py"
       - "pyproject.toml"
+      - "qdp/qdp-core/**/*.rs"
+      - "qdp/qdp-kernels/**/*.rs"
+      - "qdp/qdp-python/**/*.rs"
+      - "qdp/**/Cargo.toml"
       - ".github/workflows/python-testing.yml"
   pull_request:
     branches: [main]
     paths:
       - "**.py"
      - "pyproject.toml"
+      - "qdp/qdp-core/**/*.rs"
+      - "qdp/qdp-kernels/**/*.rs"
+      - "qdp/qdp-python/**/*.rs"
+      - "qdp/**/Cargo.toml"
       - ".github/workflows/python-testing.yml"
   workflow_dispatch:

 jobs:
+  rust-check:
+    # Fast gate: type-check the workspace with the CUDA stubs forced
+    # (QDP_NO_CUDA=1) so duplicate stub definitions or cfg mismatches fail
+    # in ~30s instead of surfacing during the slower maturin build below.
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Install Rust toolchain
+        uses: dtolnay/rust-toolchain@stable
+
+      - name: Cargo check (no-CUDA stubs)
+        working-directory: qdp
+        env:
+          QDP_NO_CUDA: "1"
+        run: cargo check --workspace --tests
+
   test:
+    needs: rust-check
     runs-on: ubuntu-latest
     strategy:
       matrix:
diff --git a/qdp/DEVELOPMENT.md b/qdp/DEVELOPMENT.md
index fc927022d3..ab30788fc8 100644
--- a/qdp/DEVELOPMENT.md
+++ b/qdp/DEVELOPMENT.md
@@ -91,6 +91,23 @@ uv run pytest testing/qdp -v
 uv run pytest testing/qdp_python -v
 ```

+### Pre-push sanity: no-CUDA build
+
+CI builds on a runner without `nvcc`, which activates the `qdp_no_cuda`
+cfg and swaps every `extern "C"` CUDA launcher for its stub. If you only
+ever build locally with CUDA, duplicate stubs or cfg mismatches can slip
+through unnoticed. Before pushing Rust / FFI changes, run:
+
+```bash
+cd qdp
+QDP_NO_CUDA=1 cargo build --workspace --lib --release
+cargo check --workspace --tests
+cd ..
+```
+
+The first command is what `maturin develop --release` effectively runs on CI;
+the second verifies the tests type-check in the CUDA build.
+
 ## 4. Benchmarks

 From the repo root, set up and prepare benchmarks:
diff --git a/qdp/qdp-core/src/gpu/buffer_pool.rs b/qdp/qdp-core/src/gpu/buffer_pool.rs
index cfe7e0ef91..0b0ec24010 100644
--- a/qdp/qdp-core/src/gpu/buffer_pool.rs
+++ b/qdp/qdp-core/src/gpu/buffer_pool.rs
@@ -28,14 +28,14 @@ use crate::gpu::pool_metrics::PoolMetrics;

 /// Handle that automatically returns a buffer to the pool on drop.
 #[cfg(target_os = "linux")]
-pub struct PinnedBufferHandle {
-    buffer: Option<PinnedHostBuffer>,
-    pool: Arc<PinnedBufferPool>,
+pub struct PinnedBufferHandle<T> {
+    buffer: Option<PinnedHostBuffer<T>>,
+    pool: Arc<PinnedBufferPool<T>>,
 }

 #[cfg(target_os = "linux")]
-impl std::ops::Deref for PinnedBufferHandle {
-    type Target = PinnedHostBuffer;
+impl<T> std::ops::Deref for PinnedBufferHandle<T> {
+    type Target = PinnedHostBuffer<T>;

     fn deref(&self) -> &Self::Target {
         self.buffer
@@ -45,7 +45,7 @@
 }

 #[cfg(target_os = "linux")]
-impl std::ops::DerefMut for PinnedBufferHandle {
+impl<T> std::ops::DerefMut for PinnedBufferHandle<T> {
     fn deref_mut(&mut self) -> &mut Self::Target {
         self.buffer
             .as_mut()
@@ -54,7 +54,7 @@
 }

 #[cfg(target_os = "linux")]
-impl Drop for PinnedBufferHandle {
+impl<T> Drop for PinnedBufferHandle<T> {
     fn drop(&mut self) {
         if let Some(buf) = self.buffer.take() {
             let mut free = self.pool.lock_free();
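
Editor's aside: the `Drop` impl above is the entire recycling mechanism, so it is worth seeing in isolation. Below is a minimal, CUDA-free sketch of the same return-to-pool pattern; `Buffer`, `Pool`, and `Handle` are illustrative stand-ins, not the crate's API:

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for the pinned allocation; the real pool stores
// PinnedHostBuffer<T> backed by cudaHostAlloc.
struct Buffer(Vec<u8>);

struct Pool {
    free: Mutex<Vec<Buffer>>,
}

struct Handle {
    buffer: Option<Buffer>,
    pool: Arc<Pool>,
}

impl Drop for Handle {
    // On drop, move the buffer back into the pool's free list instead of
    // deallocating it, so the next acquire reuses the allocation.
    fn drop(&mut self) {
        if let Some(buf) = self.buffer.take() {
            self.pool.free.lock().unwrap().push(buf);
        }
    }
}

fn acquire(pool: &Arc<Pool>) -> Option<Handle> {
    pool.free.lock().unwrap().pop().map(|buffer| Handle {
        buffer: Some(buffer),
        pool: Arc::clone(pool),
    })
}

fn main() {
    let pool = Arc::new(Pool { free: Mutex::new(vec![Buffer(vec![0; 16])]) });
    {
        let _h = acquire(&pool).expect("one buffer available");
        assert!(pool.free.lock().unwrap().is_empty());
    } // handle dropped here: buffer goes back to the free list
    assert_eq!(pool.free.lock().unwrap().len(), 1);
}
```

The real handle also implements `Deref`/`DerefMut` to the pinned buffer, and the pool's `available_cv` condvar (see the struct in the next hunk) is what lets the blocking `acquire()` wake up when a handle is returned.
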
@@ -66,16 +66,16 @@

 /// Pool of pinned host buffers sized for a fixed batch shape.
 #[cfg(target_os = "linux")]
-pub struct PinnedBufferPool {
-    free: Mutex<Vec<PinnedHostBuffer>>,
+pub struct PinnedBufferPool<T> {
+    free: Mutex<Vec<PinnedHostBuffer<T>>>,
     available_cv: Condvar,
     capacity: usize,
     elements_per_buffer: usize,
 }

 #[cfg(target_os = "linux")]
-impl PinnedBufferPool {
-    /// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` f64 values.
+impl<T> PinnedBufferPool<T> {
+    /// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` values of `T`.
     pub fn new(pool_size: usize, elements_per_buffer: usize) -> Result<Arc<Self>> {
         if pool_size == 0 {
             return Err(MahoutError::InvalidInput(
@@ -90,7 +90,7 @@
         let mut buffers = Vec::with_capacity(pool_size);
         for _ in 0..pool_size {
-            buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+            buffers.push(PinnedHostBuffer::<T>::new(elements_per_buffer)?);
         }

         Ok(Arc::new(Self {
@@ -101,7 +101,7 @@
         }))
     }

-    fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer>> {
+    fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer<T>>> {
         // Ignore poisoning to keep the pool usable after a panic elsewhere.
         self.free
             .lock()
@@ -109,7 +109,7 @@
     }

     /// Acquire a pinned buffer, blocking until one is available.
-    pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+    pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle<T> {
         self.acquire_with_metrics(None)
     }

@@ -123,7 +123,7 @@
     pub fn acquire_with_metrics(
         self: &Arc<Self>,
         metrics: Option<&PoolMetrics>,
-    ) -> PinnedBufferHandle {
+    ) -> PinnedBufferHandle<T> {
         let mut free = self.lock_free();

         // Record available count while holding the lock to avoid TOCTOU race condition
@@ -161,7 +161,7 @@
     ///
     /// Returns `None` if the pool is currently empty; callers can choose to spin/wait
     /// or fall back to synchronous paths.
-    pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+    pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle<T>> {
         let mut free = self.lock_free();
         free.pop().map(|buffer| PinnedBufferHandle {
             buffer: Some(buffer),
diff --git a/qdp/qdp-core/src/gpu/encodings/angle.rs b/qdp/qdp-core/src/gpu/encodings/angle.rs
index b0c6773ff5..99a9d2998a 100644
--- a/qdp/qdp-core/src/gpu/encodings/angle.rs
+++ b/qdp/qdp-core/src/gpu/encodings/angle.rs
@@ -123,14 +123,26 @@ impl QuantumEncoder for AngleEncoder {
     ) -> Result<GpuStateVector> {
         crate::profile_scope!("AngleEncoder::encode_batch");

+        if num_samples == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Number of samples cannot be zero".into(),
+            ));
+        }
+        if sample_size == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Sample size cannot be zero".into(),
+            ));
+        }
         if sample_size != num_qubits {
             return Err(MahoutError::InvalidInput(format!(
                 "Angle encoding expects sample_size={} (one angle per qubit), got {}",
                 num_qubits, sample_size
             )));
         }
-
-        if batch_data.len() != num_samples * sample_size {
+        let expected_len = num_samples
+            .checked_mul(sample_size)
+            .ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
+        if batch_data.len() != expected_len {
             return Err(MahoutError::InvalidInput(format!(
                 "Batch data length {} doesn't match num_samples {} * sample_size {}",
                 batch_data.len(),
@@ -235,6 +247,18 @@ impl QuantumEncoder for AngleEncoder {
         validate_qubit_count(num_qubits)?;
         let state_len = 1 << num_qubits;
         let angles_d = input_d as *const f64;
+        {
+            crate::profile_scope!("GPU::AngleFiniteCheck");
+            unsafe {
+                crate::gpu::validation::assert_all_finite_f64(
+                    device,
+                    angles_d,
+                    input_len,
+                    stream,
+                    "Angle encoding",
+                )?;
+            }
+        }
         let state_vector = {
             crate::profile_scope!("GPU::Alloc");
             GpuStateVector::new(device, num_qubits, Precision::Float64)?
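
Editor's aside: a two-line illustration of what the `checked_mul` guard above buys. With a plain `num_samples * sample_size`, an overflow would wrap in release builds (or panic in debug) before the length comparison ever runs; `checked_mul` turns it into a clean `InvalidInput` error instead. Values below are arbitrary:

```rust
fn main() {
    // Mirrors the expected_len computation in the hunk above.
    let expected = |samples: usize, size: usize| samples.checked_mul(size);
    assert_eq!(expected(4, 16), Some(64));
    assert_eq!(expected(usize::MAX, 2), None); // overflow is caught, not wrapped
}
```
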
@@ -280,6 +304,11 @@ impl QuantumEncoder for AngleEncoder {
         num_qubits: usize,
         stream: *mut c_void,
     ) -> Result<GpuStateVector> {
+        if num_samples == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Number of samples cannot be zero".into(),
+            ));
+        }
         if sample_size == 0 {
             return Err(MahoutError::InvalidInput(
                 "Sample size cannot be zero".into(),
@@ -294,47 +323,19 @@ impl QuantumEncoder for AngleEncoder {
         validate_qubit_count(num_qubits)?;
         let state_len = 1 << num_qubits;
         let input_batch_d = input_batch_d as *const f64;
-        let angle_validation_buffer = {
+        let total_angles = num_samples
+            .checked_mul(sample_size)
+            .ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
+        {
             crate::profile_scope!("GPU::AngleFiniteCheckBatch");
-            use cudarc::driver::DevicePtrMut;
-            let mut buffer = device.alloc_zeros::<f64>(num_samples).map_err(|e| {
-                MahoutError::MemoryAllocation(format!(
-                    "Failed to allocate angle validation buffer: {:?}",
-                    e
-                ))
-            })?;
-            let ret = unsafe {
-                qdp_kernels::launch_l2_norm_batch(
+            unsafe {
+                crate::gpu::validation::assert_all_finite_f64(
+                    device,
                     input_batch_d,
-                    num_samples,
-                    sample_size,
-                    *buffer.device_ptr_mut() as *mut f64,
+                    total_angles,
                     stream,
-                )
-            };
-            if ret != 0 {
-                return Err(MahoutError::KernelLaunch(format!(
-                    "Angle validation norm kernel failed with CUDA error code: {} ({})",
-                    ret,
-                    cuda_error_to_string(ret)
-                )));
-            }
-            buffer
-        };
-        {
-            crate::profile_scope!("GPU::AngleFiniteValidationHostCopy");
-            let host_norms = device
-                .dtoh_sync_copy(&angle_validation_buffer)
-                .map_err(|e| {
-                    MahoutError::Cuda(format!(
-                        "Failed to copy angle validation norms to host: {:?}",
-                        e
-                    ))
-                })?;
-            if host_norms.iter().any(|v| !v.is_finite()) {
-                return Err(MahoutError::InvalidInput(
-                    "Angle encoding batch contains non-finite values (NaN or Inf)".to_string(),
-                ));
+                    "Angle encoding",
+                )?;
             }
         }
         let batch_state_vector = {
@@ -426,6 +427,22 @@ impl QuantumEncoder for AngleEncoder {
         }

         let state_len = 1 << num_qubits;
+
+        // For large batches, stream through dual-stream pipeline to overlap
+        // H2D copy with kernel compute. Threshold matches the f64 path (1MB
+        // worth of elements), scaled for f32's smaller element size.
+        const ASYNC_THRESHOLD_ELEMENTS: usize = 1024 * 1024 / std::mem::size_of::<f32>(); // 1MB
+        if batch_data.len() >= ASYNC_THRESHOLD_ELEMENTS {
+            return Self::encode_batch_async_pipeline_f32(
+                device,
+                batch_data,
+                num_samples,
+                sample_size,
+                num_qubits,
+                state_len,
+            );
+        }
+
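
Editor's aside: worked numbers for the threshold constant above (pure arithmetic, nothing library-specific; the sample size of 16 is an arbitrary example):

```rust
fn main() {
    // 1 MiB expressed in f32 elements, as in ASYNC_THRESHOLD_ELEMENTS.
    let threshold = 1024 * 1024 / std::mem::size_of::<f32>();
    assert_eq!(threshold, 262_144);
    // With e.g. 16 angles per sample, the async path engages at 16_384 samples.
    assert_eq!(threshold / 16, 16_384);
}
```
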
         let batch_state_vector = {
             crate::profile_scope!("GPU::AllocBatchF32");
             GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
@@ -510,46 +527,16 @@ impl QuantumEncoder for AngleEncoder {
         let total_angles = num_samples
             .checked_mul(sample_size)
             .ok_or_else(|| MahoutError::InvalidInput("Angle batch size overflow".to_string()))?;
-        let angle_validation_buffer = {
+        {
             crate::profile_scope!("GPU::AngleFiniteCheckBatchF32");
-            use cudarc::driver::DevicePtrMut;
-            let mut buffer = device.alloc_zeros::<i32>(1).map_err(|e| {
-                MahoutError::MemoryAllocation(format!(
-                    "Failed to allocate angle validation buffer: {:?}",
-                    e
-                ))
-            })?;
-            let ret = unsafe {
-                qdp_kernels::launch_check_finite_batch_f32(
+            unsafe {
+                crate::gpu::validation::assert_all_finite_f32(
+                    device,
                     input_batch_d,
                     total_angles,
-                    *buffer.device_ptr_mut() as *mut i32,
                     stream,
-                )
-            };
-            if ret != 0 {
-                return Err(MahoutError::KernelLaunch(format!(
-                    "Angle finite validation kernel (f32) failed with CUDA error code: {} ({})",
-                    ret,
-                    cuda_error_to_string(ret)
-                )));
-            }
-            buffer
-        };
-        {
-            crate::profile_scope!("GPU::AngleFiniteValidationHostCopyF32");
-            let host_flags = device
-                .dtoh_sync_copy(&angle_validation_buffer)
-                .map_err(|e| {
-                    MahoutError::Cuda(format!(
-                        "Failed to copy angle validation flags to host: {:?}",
-                        e
-                    ))
-                })?;
-            if host_flags.first().copied().unwrap_or_default() != 0 {
-                return Err(MahoutError::InvalidInput(
-                    "Angle encoding batch contains non-finite values (NaN or Inf)".to_string(),
-                ));
+                    "Angle encoding",
+                )?;
             }
         }
         let batch_state_vector = {
@@ -649,6 +636,18 @@ impl AngleEncoder {
         validate_qubit_count(num_qubits)?;
         let state_len = 1 << num_qubits;

+        {
+            crate::profile_scope!("GPU::AngleFiniteCheckF32");
+            unsafe {
+                crate::gpu::validation::assert_all_finite_f32(
+                    device,
+                    input_d,
+                    input_len,
+                    stream,
+                    "Angle encoding",
+                )?;
+            }
+        }
         let state_vector = {
             crate::profile_scope!("GPU::Alloc");
             GpuStateVector::new(device, num_qubits, Precision::Float32)?
@@ -780,4 +779,68 @@ impl AngleEncoder {

         Ok(batch_state_vector)
     }
+
+    #[cfg(target_os = "linux")]
+    fn encode_batch_async_pipeline_f32(
+        device: &Arc<CudaDevice>,
+        batch_data: &[f32],
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+        state_len: usize,
+    ) -> Result<GpuStateVector> {
+        let batch_state_vector = {
+            crate::profile_scope!("GPU::AllocBatchF32");
+            GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
+        };
+
+        let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
+            MahoutError::InvalidInput(
+                "Batch state vector precision mismatch (expected float32 buffer)".to_string(),
+            )
+        })?;
+
+        crate::gpu::pipeline::run_dual_stream_pipeline_aligned_f32(
+            device,
+            batch_data,
+            sample_size,
+            |stream, input_ptr, chunk_offset, chunk_len| {
+                if chunk_len % sample_size != 0 || chunk_offset % sample_size != 0 {
+                    return Err(MahoutError::InvalidInput(
+                        "Angle batch chunk is not aligned to sample size".to_string(),
+                    ));
+                }
+
+                let chunk_samples = chunk_len / sample_size;
+                let sample_offset = chunk_offset / sample_size;
+                let offset_elements = sample_offset.checked_mul(state_len).ok_or_else(|| {
+                    MahoutError::InvalidInput("Angle batch output offset overflow".to_string())
+                })?;
+
+                let state_ptr_offset = unsafe { state_ptr.add(offset_elements) as *mut c_void };
+                let ret = unsafe {
+                    qdp_kernels::launch_angle_encode_batch_f32(
+                        input_ptr,
+                        state_ptr_offset,
+                        chunk_samples,
+                        state_len,
+                        num_qubits as u32,
+                        stream.stream as *mut c_void,
+                    )
+                };
+
+                if ret != 0 {
+                    return Err(MahoutError::KernelLaunch(format!(
+                        "Batch angle encoding kernel (f32) failed: {} ({})",
+                        ret,
+                        cuda_error_to_string(ret)
+                    )));
+                }
+
+                Ok(())
+            },
+        )?;
+
+        Ok(batch_state_vector)
+    }
 }
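
Editor's aside: the closure's index arithmetic is easier to follow with concrete numbers. Each sample contributes `sample_size` input angles and owns `state_len` output elements, so a chunk starting at input offset `chunk_offset` writes starting at output element `(chunk_offset / sample_size) * state_len`. All values below are hypothetical:

```rust
fn main() {
    let sample_size = 16; // angles per sample (= num_qubits)
    let state_len = 1usize << 16; // output elements per sample's state
    let chunk_offset = 64 * sample_size; // chunk starts at sample 64
    let chunk_len = 32 * sample_size; // chunk carries 32 whole samples

    // The alignment invariant checked at the top of the closure.
    assert_eq!(chunk_len % sample_size, 0);
    assert_eq!(chunk_offset % sample_size, 0);

    let chunk_samples = chunk_len / sample_size;
    let sample_offset = chunk_offset / sample_size;
    let offset_elements = sample_offset * state_len; // where sample 64's state begins
    assert_eq!((chunk_samples, sample_offset), (32, 64));
    assert_eq!(offset_elements, 64 << 16);
}
```
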
diff --git a/qdp/qdp-core/src/gpu/encodings/basis.rs b/qdp/qdp-core/src/gpu/encodings/basis.rs
index 494b385af2..1db78cdc15 100644
--- a/qdp/qdp-core/src/gpu/encodings/basis.rs
+++ b/qdp/qdp-core/src/gpu/encodings/basis.rs
@@ -240,8 +240,21 @@ impl QuantumEncoder for BasisEncoder {
                 input_len
             )));
         }
+        validate_qubit_count(num_qubits)?;
         let state_len = 1 << num_qubits;
         let basis_indices_d = input_d as *const usize;
+        {
+            crate::profile_scope!("GPU::BasisIndexBoundsCheck");
+            unsafe {
+                crate::gpu::validation::assert_basis_indices_in_range_usize(
+                    device,
+                    basis_indices_d,
+                    1,
+                    state_len,
+                    stream,
+                )?;
+            }
+        }
         let state_vector = {
             crate::profile_scope!("GPU::Alloc");
             GpuStateVector::new(device, num_qubits, Precision::Float64)?
@@ -288,14 +301,32 @@ impl QuantumEncoder for BasisEncoder {
         num_qubits: usize,
         stream: *mut c_void,
     ) -> Result<GpuStateVector> {
+        if num_samples == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Number of samples cannot be zero".into(),
+            ));
+        }
         if sample_size != 1 {
             return Err(MahoutError::InvalidInput(format!(
                 "Basis encoding expects sample_size=1 (one index per sample), got {}",
                 sample_size
             )));
         }
+        validate_qubit_count(num_qubits)?;
         let state_len = 1 << num_qubits;
         let basis_indices_d = input_batch_d as *const usize;
+        {
+            crate::profile_scope!("GPU::BasisIndexBoundsCheck");
+            unsafe {
+                crate::gpu::validation::assert_basis_indices_in_range_usize(
+                    device,
+                    basis_indices_d,
+                    num_samples,
+                    state_len,
+                    stream,
+                )?;
+            }
+        }
         let batch_state_vector = {
             crate::profile_scope!("GPU::AllocBatch");
             GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float64)?
@@ -343,6 +374,173 @@ impl QuantumEncoder for BasisEncoder {
         Ok(())
     }

+    #[cfg(target_os = "linux")]
+    fn encode_batch_f32(
+        &self,
+        device: &Arc<CudaDevice>,
+        batch_data: &[f32],
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+    ) -> Result<GpuStateVector> {
+        crate::profile_scope!("BasisEncoder::encode_batch_f32");
+
+        if sample_size != 1 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Basis encoding expects sample_size=1 (one index per sample), got {}",
+                sample_size
+            )));
+        }
+        if batch_data.len() != num_samples {
+            return Err(MahoutError::InvalidInput(format!(
+                "Batch data length {} doesn't match num_samples {}",
+                batch_data.len(),
+                num_samples
+            )));
+        }
+        validate_qubit_count(num_qubits)?;
+
+        let state_len = 1 << num_qubits;
+
+        // Convert f32 indices to usize (reuse the same validation as F64)
+        let basis_indices: Vec<usize> = batch_data
+            .iter()
+            .enumerate()
+            .map(|(i, &val)| {
+                Self::validate_basis_index(val as f64, state_len)
+                    .map_err(|e| MahoutError::InvalidInput(format!("Sample {}: {}", i, e)))
+            })
+            .collect::<Result<Vec<usize>>>()?;
+
+        let batch_state_vector = {
+            crate::profile_scope!("GPU::AllocBatchF32");
+            GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
+        };
+
+        let indices_gpu = {
+            crate::profile_scope!("GPU::H2D_BasisIndicesF32");
+            device.htod_sync_copy(&basis_indices).map_err(|e| {
+                map_allocation_error(
+                    num_samples * std::mem::size_of::<usize>(),
+                    "basis indices f32 upload",
+                    Some(num_qubits),
+                    e,
+                )
+            })?
+        };
+
+        let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
+            MahoutError::InvalidInput(
+                "Batch state vector precision mismatch (expected float32 buffer)".to_string(),
+            )
+        })?;
+
+        {
+            crate::profile_scope!("GPU::BatchKernelLaunchF32");
+            let ret = unsafe {
+                qdp_kernels::launch_basis_encode_batch_f32(
+                    *indices_gpu.device_ptr() as *const usize,
+                    state_ptr as *mut c_void,
+                    num_samples,
+                    state_len,
+                    num_qubits as u32,
+                    std::ptr::null_mut(),
+                )
+            };
+            if ret != 0 {
+                return Err(MahoutError::KernelLaunch(format!(
+                    "Batch basis encoding f32 kernel failed: {} ({})",
+                    ret,
+                    cuda_error_to_string(ret)
+                )));
+            }
+        }
+
+        {
+            crate::profile_scope!("GPU::Synchronize");
+            device
+                .synchronize()
+                .map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?;
+        }
+
+        Ok(batch_state_vector)
+    }
+
+    #[cfg(target_os = "linux")]
+    unsafe fn encode_batch_from_gpu_ptr_f32(
+        &self,
+        device: &Arc<CudaDevice>,
+        input_batch_d: *const c_void,
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+        stream: *mut c_void,
+    ) -> Result<GpuStateVector> {
+        if num_samples == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Number of samples cannot be zero".into(),
+            ));
+        }
+        if sample_size != 1 {
+            return Err(MahoutError::InvalidInput(format!(
+                "Basis encoding expects sample_size=1 (one index per sample), got {}",
+                sample_size
+            )));
+        }
+        validate_qubit_count(num_qubits)?;
+        let state_len = 1 << num_qubits;
+        let input_batch_d = input_batch_d as *const f32;
+
+        let indices_gpu = {
+            crate::profile_scope!("GPU::BasisIndexValidateCastF32");
+            unsafe {
+                crate::gpu::validation::validate_and_cast_basis_indices_f32(
+                    device,
+                    input_batch_d,
+                    num_samples,
+                    state_len,
+                    stream,
+                )?
+            }
+        };
+
+        let batch_state_vector = {
+            crate::profile_scope!("GPU::AllocBatchF32");
+            GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
+        };
+        let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
+            MahoutError::InvalidInput(
+                "Batch state vector precision mismatch (expected float32 buffer)".to_string(),
+            )
+        })?;
+
+        {
+            crate::profile_scope!("GPU::BatchKernelLaunchF32");
+            let ret = unsafe {
+                qdp_kernels::launch_basis_encode_batch_f32(
+                    *indices_gpu.device_ptr() as *const usize,
+                    state_ptr as *mut c_void,
+                    num_samples,
+                    state_len,
+                    num_qubits as u32,
+                    stream,
+                )
+            };
+            if ret != 0 {
+                return Err(MahoutError::KernelLaunch(format!(
+                    "Batch basis encoding kernel (f32) failed: {} ({})",
+                    ret,
+                    cuda_error_to_string(ret)
+                )));
+            }
+        }
+        {
+            crate::profile_scope!("GPU::Synchronize");
+            crate::gpu::cuda_sync::sync_cuda_stream(stream, "CUDA stream synchronize failed")?;
+        }
+        Ok(batch_state_vector)
+    }
+
     fn name(&self) -> &'static str {
         "basis"
     }
@@ -353,7 +551,70 @@ impl QuantumEncoder for BasisEncoder {
 }

 impl BasisEncoder {
-    /// Validate and convert a f64 value to a valid basis index
+    /// Encode a batch of basis indices supplied as a device-resident `f32` buffer.
+    ///
+    /// Each sample is one index; the kernel validates and casts to `size_t` on the
+    /// device in a single pass before launching the encode kernel.
+    ///
+    /// # Safety
+    /// The caller must ensure that `input_batch_d` points to at least `num_samples`
+    /// contiguous `f32` values in GPU-accessible memory and remains valid for the
+    /// duration of this call. `stream` must be either null or a valid CUDA stream
+    /// handle associated with `device`.
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_batch_from_gpu_ptr_f32_with_stream(
+        device: &Arc<CudaDevice>,
+        input_batch_d: *const f32,
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+        stream: *mut c_void,
+    ) -> Result<GpuStateVector> {
+        unsafe {
+            BasisEncoder.encode_batch_from_gpu_ptr_f32(
+                device,
+                input_batch_d as *const c_void,
+                num_samples,
+                sample_size,
+                num_qubits,
+                stream,
+            )
+        }
+    }
+
+    /// Encode a single basis index supplied as a device-resident `f32`.
+    ///
+    /// Thin wrapper around the batch path with `num_samples = 1`.
+    ///
+    /// # Safety
+    /// `input_d` must point to one valid `f32` in GPU-accessible memory on `device`,
+    /// remain valid for the duration of this call, and `stream` must be either null
+    /// or a valid CUDA stream handle associated with `device`.
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_from_gpu_ptr_f32_with_stream(
+        device: &Arc<CudaDevice>,
+        input_d: *const f32,
+        num_qubits: usize,
+        stream: *mut c_void,
+    ) -> Result<GpuStateVector> {
+        unsafe {
+            BasisEncoder.encode_batch_from_gpu_ptr_f32(
+                device,
+                input_d as *const c_void,
+                1,
+                1,
+                num_qubits,
+                stream,
+            )
+        }
+    }
+
+    /// Validate and convert an f64 value to a valid basis index.
+    //
+    // Structural debt: the streaming encoder in `qdp-core/src/encoding/basis.rs`
+    // carries a near-identical host-side validator (with "Sample {idx}:" error
+    // prefix). These two trait hierarchies (`QuantumEncoder` vs `ChunkEncoder`)
+    // should be merged so the validator lives in exactly one place.
     fn validate_basis_index(value: f64, state_len: usize) -> Result<usize> {
         // Check for non-finite values
         if !value.is_finite() {
diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs b/qdp/qdp-core/src/gpu/encodings/mod.rs
index fa6362d4cf..e8e985e642 100644
--- a/qdp/qdp-core/src/gpu/encodings/mod.rs
+++ b/qdp/qdp-core/src/gpu/encodings/mod.rs
@@ -185,6 +185,14 @@ pub use iqp::IqpEncoder;
 pub use phase::PhaseEncoder;

 /// Create encoder by name: "amplitude", "angle", "basis", "iqp", or "iqp-z"
+//
+// Structural debt: the encoders are unit structs, yet this performs two heap
+// allocations per call (String from `to_lowercase`, Box<dyn QuantumEncoder>) to
+// dispatch to `'static` functions. A future refactor should replace this with a
+// `Copy` enum that returns `&'static dyn QuantumEncoder` for zero-alloc
+// static dispatch, and unify with `encoding::ChunkEncoder` (which has its own
+// parallel implementation of amplitude/angle/basis with a case-sensitive
+// dispatcher — see `qdp-core/src/encoding/mod.rs`).
 pub fn get_encoder(name: &str) -> Result<Box<dyn QuantumEncoder>> {
     match name.to_lowercase().as_str() {
         "amplitude" => Ok(Box::new(AmplitudeEncoder)),
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index baa9a29887..de5f74fb52 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -585,18 +585,18 @@ impl GpuStateVector {
 ///
 /// Allocates page-locked memory to maximize H2D throughput in streaming IO paths.
 #[cfg(target_os = "linux")]
-pub struct PinnedHostBuffer {
-    ptr: *mut f64,
+pub struct PinnedHostBuffer<T> {
+    ptr: *mut T,
     size_elements: usize,
 }

 #[cfg(target_os = "linux")]
-impl PinnedHostBuffer {
-    /// Allocate pinned memory
+impl<T> PinnedHostBuffer<T> {
+    /// Allocate pinned memory holding `elements` values of type `T`.
     pub fn new(elements: usize) -> Result<Self> {
         unsafe {
             let bytes = elements
-                .checked_mul(std::mem::size_of::<f64>())
+                .checked_mul(std::mem::size_of::<T>())
                 .ok_or_else(|| {
                     MahoutError::MemoryAllocation(format!(
                         "Requested pinned buffer allocation size overflow (elements={})",
@@ -615,24 +615,24 @@
             }

             Ok(Self {
-                ptr: ptr as *mut f64,
+                ptr: ptr as *mut T,
                 size_elements: elements,
             })
         }
     }

     /// Get mutable slice to write data into
-    pub fn as_slice_mut(&mut self) -> &mut [f64] {
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
         unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
     }

     /// Immutable slice view of the pinned region
-    pub fn as_slice(&self) -> &[f64] {
+    pub fn as_slice(&self) -> &[T] {
         unsafe { std::slice::from_raw_parts(self.ptr, self.size_elements) }
     }

     /// Get raw pointer for CUDA memcpy
-    pub fn ptr(&self) -> *const f64 {
+    pub fn ptr(&self) -> *const T {
         self.ptr
     }

@@ -646,7 +646,7 @@
 }

 #[cfg(target_os = "linux")]
-impl Drop for PinnedHostBuffer {
+impl<T> Drop for PinnedHostBuffer<T> {
     fn drop(&mut self) {
         unsafe {
             let result = cudaFreeHost(self.ptr as *mut c_void);
@@ -663,7 +663,7 @@

 // Safety: Pinned memory is accessible from any thread
 #[cfg(target_os = "linux")]
-unsafe impl Send for PinnedHostBuffer {}
+unsafe impl<T: Send> Send for PinnedHostBuffer<T> {}
 #[cfg(target_os = "linux")]
-unsafe impl Sync for PinnedHostBuffer {}
+unsafe impl<T: Sync> Sync for PinnedHostBuffer<T> {}
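
Editor's aside: a compilable sketch of the zero-alloc dispatch floated in the `get_encoder` structural-debt note above. Everything here (the enum, `parse`, the trait shape) is hypothetical, not the crate's current API; `eq_ignore_ascii_case` stands in for `to_lowercase` to avoid the `String` allocation, and const promotion of the unit structs provides the `&'static dyn` references:

```rust
trait QuantumEncoder {
    fn name(&self) -> &'static str;
}

struct AmplitudeEncoder;
struct AngleEncoder;
struct BasisEncoder;
impl QuantumEncoder for AmplitudeEncoder { fn name(&self) -> &'static str { "amplitude" } }
impl QuantumEncoder for AngleEncoder { fn name(&self) -> &'static str { "angle" } }
impl QuantumEncoder for BasisEncoder { fn name(&self) -> &'static str { "basis" } }

#[derive(Clone, Copy)]
enum EncoderKind {
    Amplitude,
    Angle,
    Basis,
}

impl EncoderKind {
    // Case-insensitive parse with no heap allocation.
    fn parse(name: &str) -> Option<Self> {
        if name.eq_ignore_ascii_case("amplitude") { Some(Self::Amplitude) }
        else if name.eq_ignore_ascii_case("angle") { Some(Self::Angle) }
        else if name.eq_ignore_ascii_case("basis") { Some(Self::Basis) }
        else { None }
    }

    // Unit structs promote to 'static, so no Box is needed.
    fn encoder(self) -> &'static dyn QuantumEncoder {
        match self {
            Self::Amplitude => &AmplitudeEncoder,
            Self::Angle => &AngleEncoder,
            Self::Basis => &BasisEncoder,
        }
    }
}

fn main() {
    assert_eq!(EncoderKind::parse("ANGLE").unwrap().encoder().name(), "angle");
    assert!(EncoderKind::parse("unknown").is_none());
}
```
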
diff --git a/qdp/qdp-core/src/gpu/metrics.rs b/qdp/qdp-core/src/gpu/metrics.rs
new file mode 100644
index 0000000000..a7ce54e312
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/metrics.rs
@@ -0,0 +1,218 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Fidelity and trace distance metrics for quantum state validation.
+//!
+//! These utilities compare quantum states encoded at different precisions
+//! or by different implementations. All computations download GPU data
+//! to the host and run on the CPU to produce a single scalar per sample.
+//! They are intended for **testing and validation**, not the hot path.
+
+#[cfg(target_os = "linux")]
+use cudarc::driver::CudaDevice;
+#[cfg(target_os = "linux")]
+use std::sync::Arc;
+
+#[cfg(target_os = "linux")]
+use qdp_kernels::{CuComplex, CuDoubleComplex};
+
+use crate::error::{MahoutError, Result};
+
+/// Compute the state fidelity |⟨ψ|φ⟩|² between two complex state vectors
+/// given as interleaved (re, im) f64 slices of equal length.
+///
+/// Both slices must have length `2 * state_dim` (re0, im0, re1, im1, ...).
+/// Inputs must be normalized; the result is clamped to `[0, 1]` to absorb
+/// floating-point rounding error. Fidelity == 1 means identical states (up
+/// to global phase).
+pub fn fidelity_f64(state_a: &[f64], state_b: &[f64]) -> Result<f64> {
+    if state_a.len() != state_b.len() {
+        return Err(MahoutError::InvalidInput(format!(
+            "fidelity: length mismatch ({} vs {})",
+            state_a.len(),
+            state_b.len()
+        )));
+    }
+    if !state_a.len().is_multiple_of(2) {
+        return Err(MahoutError::InvalidInput(
+            "fidelity: length must be even (interleaved re/im pairs)".to_string(),
+        ));
+    }
+
+    // ⟨ψ|φ⟩ = Σ_i conj(a_i) * b_i
+    let mut re_acc = 0.0_f64;
+    let mut im_acc = 0.0_f64;
+    for i in (0..state_a.len()).step_by(2) {
+        let a_re = state_a[i];
+        let a_im = state_a[i + 1];
+        let b_re = state_b[i];
+        let b_im = state_b[i + 1];
+        // conj(a) * b = (a_re - i*a_im)(b_re + i*b_im)
+        //             = (a_re*b_re + a_im*b_im) + i*(a_re*b_im - a_im*b_re)
+        re_acc += a_re * b_re + a_im * b_im;
+        im_acc += a_re * b_im - a_im * b_re;
+    }
+
+    Ok((re_acc * re_acc + im_acc * im_acc).clamp(0.0, 1.0))
+}
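
Editor's aside: a worked instance of the formula implemented above, computing the fidelity of |0⟩ against (|0⟩+|1⟩)/√2 by hand in the same interleaved layout; the expected value is |1/√2|² = 0.5:

```rust
fn main() {
    let zero = [1.0, 0.0, 0.0, 0.0]; // |0⟩ as (re0, im0, re1, im1)
    let h = std::f64::consts::FRAC_1_SQRT_2;
    let plus = [h, 0.0, h, 0.0]; // (|0⟩+|1⟩)/√2

    // Same accumulation as fidelity_f64: ⟨ψ|φ⟩ = Σ conj(a_i) * b_i.
    let (mut re, mut im) = (0.0_f64, 0.0_f64);
    for i in (0..zero.len()).step_by(2) {
        re += zero[i] * plus[i] + zero[i + 1] * plus[i + 1];
        im += zero[i] * plus[i + 1] - zero[i + 1] * plus[i];
    }
    let fidelity = (re * re + im * im).clamp(0.0, 1.0);
    assert!((fidelity - 0.5).abs() < 1e-12);

    // Pure-state trace distance, as in trace_distance_f64: √(1 − F) = 1/√2.
    assert!(((1.0 - fidelity).sqrt() - h).abs() < 1e-12);
}
```
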
+
+/// Compute fidelity from interleaved f32 data (promoted to f64 for accumulation).
+pub fn fidelity_f32(state_a: &[f32], state_b: &[f32]) -> Result<f64> {
+    if state_a.len() != state_b.len() {
+        return Err(MahoutError::InvalidInput(format!(
+            "fidelity_f32: length mismatch ({} vs {})",
+            state_a.len(),
+            state_b.len()
+        )));
+    }
+    if !state_a.len().is_multiple_of(2) {
+        return Err(MahoutError::InvalidInput(
+            "fidelity_f32: length must be even (interleaved re/im pairs)".to_string(),
+        ));
+    }
+
+    let mut re_acc = 0.0_f64;
+    let mut im_acc = 0.0_f64;
+    for i in (0..state_a.len()).step_by(2) {
+        let a_re = state_a[i] as f64;
+        let a_im = state_a[i + 1] as f64;
+        let b_re = state_b[i] as f64;
+        let b_im = state_b[i + 1] as f64;
+        re_acc += a_re * b_re + a_im * b_im;
+        im_acc += a_re * b_im - a_im * b_re;
+    }
+
+    Ok((re_acc * re_acc + im_acc * im_acc).clamp(0.0, 1.0))
+}
+
+/// Cross-precision fidelity: compare an f32 state against an f64 reference.
+/// Both are interleaved (re, im) with the same number of complex elements.
+pub fn fidelity_cross_precision(state_f32: &[f32], state_f64: &[f64]) -> Result<f64> {
+    if state_f32.len() != state_f64.len() {
+        return Err(MahoutError::InvalidInput(format!(
+            "fidelity_cross_precision: length mismatch ({} vs {})",
+            state_f32.len(),
+            state_f64.len()
+        )));
+    }
+    if !state_f32.len().is_multiple_of(2) {
+        return Err(MahoutError::InvalidInput(
+            "fidelity_cross_precision: length must be even".to_string(),
+        ));
+    }
+
+    let mut re_acc = 0.0_f64;
+    let mut im_acc = 0.0_f64;
+    for i in (0..state_f32.len()).step_by(2) {
+        let a_re = state_f32[i] as f64;
+        let a_im = state_f32[i + 1] as f64;
+        let b_re = state_f64[i];
+        let b_im = state_f64[i + 1];
+        re_acc += a_re * b_re + a_im * b_im;
+        im_acc += a_re * b_im - a_im * b_re;
+    }
+
+    Ok((re_acc * re_acc + im_acc * im_acc).clamp(0.0, 1.0))
+}
+
+/// Trace distance between two pure states: √(1 − |⟨ψ|φ⟩|²).
+/// Returns a value in [0, 1]. 0 means identical, 1 means orthogonal.
+pub fn trace_distance_f64(state_a: &[f64], state_b: &[f64]) -> Result<f64> {
+    let f = fidelity_f64(state_a, state_b)?;
+    Ok((1.0 - f.clamp(0.0, 1.0)).sqrt())
+}
+
+/// Trace distance for f32 states.
+pub fn trace_distance_f32(state_a: &[f32], state_b: &[f32]) -> Result<f64> {
+    let f = fidelity_f32(state_a, state_b)?;
+    Ok((1.0 - f.clamp(0.0, 1.0)).sqrt())
+}
+
+/// Trace distance cross-precision (f32 vs f64).
+pub fn trace_distance_cross_precision(state_f32: &[f32], state_f64: &[f64]) -> Result<f64> {
+    let f = fidelity_cross_precision(state_f32, state_f64)?;
+    Ok((1.0 - f.clamp(0.0, 1.0)).sqrt())
+}
+
+// ── GPU readback helpers (Linux/CUDA only) ──────────────────────────────
+
+/// Download f64 complex GPU data to host as interleaved (re, im) f64 vec.
+///
+/// `gpu_ptr` must point to `num_elements` `CuDoubleComplex` values on device.
+#[cfg(target_os = "linux")]
+pub fn download_complex_f64(
+    device: &Arc<CudaDevice>,
+    gpu_ptr: *const CuDoubleComplex,
+    num_elements: usize,
+) -> Result<Vec<f64>> {
+    if gpu_ptr.is_null() {
+        return Err(MahoutError::InvalidInput(
+            "download_complex_f64: null GPU pointer".to_string(),
+        ));
+    }
+
+    let byte_count = num_elements * std::mem::size_of::<CuDoubleComplex>();
+    let mut host_buf = vec![0.0_f64; num_elements * 2]; // interleaved re, im
+
+    unsafe {
+        let ret = cudarc::driver::sys::lib().cuMemcpyDtoH_v2(
+            host_buf.as_mut_ptr() as *mut _,
+            gpu_ptr as u64,
+            byte_count,
+        );
+        if ret != cudarc::driver::sys::CUresult::CUDA_SUCCESS {
+            return Err(MahoutError::Cuda(format!(
+                "cuMemcpyDtoH failed during f64 download: {:?}",
+                ret
+            )));
+        }
+    }
+    let _ = device; // keep device alive
+    Ok(host_buf)
+}
+
+/// Download f32 complex GPU data to host as interleaved (re, im) f32 vec.
+#[cfg(target_os = "linux")]
+pub fn download_complex_f32(
+    device: &Arc<CudaDevice>,
+    gpu_ptr: *const CuComplex,
+    num_elements: usize,
+) -> Result<Vec<f32>> {
+    if gpu_ptr.is_null() {
+        return Err(MahoutError::InvalidInput(
+            "download_complex_f32: null GPU pointer".to_string(),
+        ));
+    }
+
+    let byte_count = num_elements * std::mem::size_of::<CuComplex>();
+    let mut host_buf = vec![0.0_f32; num_elements * 2];
+
+    unsafe {
+        let ret = cudarc::driver::sys::lib().cuMemcpyDtoH_v2(
+            host_buf.as_mut_ptr() as *mut _,
+            gpu_ptr as u64,
+            byte_count,
+        );
+        if ret != cudarc::driver::sys::CUresult::CUDA_SUCCESS {
+            return Err(MahoutError::Cuda(format!(
+                "cuMemcpyDtoH failed during f32 download: {:?}",
+                ret
+            )));
+        }
+    }
+    let _ = device;
+    Ok(host_buf)
+}
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 7e16be7be3..d4c8d5dd0e 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -20,11 +20,14 @@ pub mod buffer_pool;
 pub(crate) mod cuda_sync;
 pub mod encodings;
 pub mod memory;
+pub mod metrics;
 #[cfg(target_os = "linux")]
 pub mod overlap_tracker;
 pub mod pipeline;
 #[cfg(target_os = "linux")]
 pub mod pool_metrics;
+#[cfg(target_os = "linux")]
+pub(crate) mod validation;

 #[cfg(target_os = "linux")]
 pub(crate) mod cuda_ffi;
@@ -33,6 +36,12 @@
 pub use buffer_pool::{PinnedBufferHandle, PinnedBufferPool};
 pub use encodings::{AmplitudeEncoder, AngleEncoder, BasisEncoder, QuantumEncoder, get_encoder};
 pub use memory::GpuStateVector;
+#[cfg(target_os = "linux")]
+pub use metrics::{download_complex_f32, download_complex_f64};
+pub use metrics::{
+    fidelity_cross_precision, fidelity_f32, fidelity_f64, trace_distance_cross_precision,
+    trace_distance_f32, trace_distance_f64,
+};
 pub use pipeline::run_dual_stream_pipeline;

 #[cfg(target_os = "linux")]
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 9a715df6f8..6d31cbdacd 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -39,7 +39,7 @@ use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
 use crate::gpu::overlap_tracker::OverlapTracker;
 #[cfg(target_os = "linux")]
 use crate::gpu::pool_metrics::PoolMetrics;
-use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, DeviceRepr, safe::CudaStream};
 use std::ffi::c_void;
 use std::sync::Arc;

@@ -98,21 +98,21 @@ impl PipelineContext {
     /// Async H2D copy on the copy stream.
     ///
     /// # Safety
-    /// `src` must be valid for `len_elements` `f64` values and properly aligned.
-    /// `dst` must point to device memory for `len_elements` `f64` values on the same device.
+    /// `src` must be valid for `len_bytes` bytes and properly aligned.
+    /// `dst` must point to device memory for `len_bytes` bytes on the same device.
     /// Both pointers must remain valid until the copy completes on `stream_copy`.
     pub unsafe fn async_copy_to_device(
         &self,
         src: *const c_void,
         dst: *mut c_void,
-        len_elements: usize,
+        len_bytes: usize,
     ) -> Result<()> {
         crate::profile_scope!("GPU::H2D_Copy");
         unsafe {
             let ret = cudaMemcpyAsync(
                 dst,
                 src,
-                len_elements * std::mem::size_of::<f64>(),
+                len_bytes,
                 CUDA_MEMCPY_HOST_TO_DEVICE,
                 self.stream_copy.stream as *mut c_void,
             );
@@ -261,7 +261,7 @@ where
     crate::profile_scope!("GPU::AsyncPipeline");

     const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
-    run_dual_stream_pipeline_with_chunk_size(
+    run_dual_stream_pipeline_with_chunk_size::<f64, F>(
         device,
         host_data,
         CHUNK_SIZE_ELEMENTS,
@@ -283,6 +283,46 @@
 pub fn run_dual_stream_pipeline_aligned<F>(
     device: &Arc<CudaDevice>,
     host_data: &[f64],
     align_elements: usize,
     kernel_launcher: F,
 ) -> Result<()>
 where
     F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+    run_dual_stream_pipeline_aligned_typed::<f64, F>(
+        device,
+        host_data,
+        align_elements,
+        kernel_launcher,
+    )
+}
+
+/// f32 variant of `run_dual_stream_pipeline_aligned`.
+#[cfg(target_os = "linux")]
+#[allow(clippy::manual_is_multiple_of)]
+pub fn run_dual_stream_pipeline_aligned_f32<F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[f32],
+    align_elements: usize,
+    kernel_launcher: F,
+) -> Result<()>
+where
+    F: FnMut(&CudaStream, *const f32, usize, usize) -> Result<()>,
+{
+    run_dual_stream_pipeline_aligned_typed::<f32, F>(
+        device,
+        host_data,
+        align_elements,
+        kernel_launcher,
+    )
+}
+
+#[cfg(target_os = "linux")]
+#[allow(clippy::manual_is_multiple_of)]
+fn run_dual_stream_pipeline_aligned_typed<T, F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[T],
+    align_elements: usize,
+    kernel_launcher: F,
+) -> Result<()>
+where
+    T: DeviceRepr + Copy + Send + Sync + Unpin + 'static,
+    F: FnMut(&CudaStream, *const T, usize, usize) -> Result<()>,
 {
     crate::profile_scope!("GPU::AsyncPipelineAligned");

@@ -299,14 +339,14 @@
         )));
     }

-    const BASE_CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
-    let chunk_size_elements = if align_elements >= BASE_CHUNK_SIZE_ELEMENTS {
+    let base_chunk_size_elements: usize = 8 * 1024 * 1024 / std::mem::size_of::<T>(); // 8MB
+    let chunk_size_elements = if align_elements >= base_chunk_size_elements {
         align_elements
     } else {
-        BASE_CHUNK_SIZE_ELEMENTS - (BASE_CHUNK_SIZE_ELEMENTS % align_elements)
+        base_chunk_size_elements - (base_chunk_size_elements % align_elements)
     };

-    run_dual_stream_pipeline_with_chunk_size(
+    run_dual_stream_pipeline_with_chunk_size::<T, F>(
         device,
         host_data,
         chunk_size_elements,
@@ -315,14 +355,15 @@
 }

 #[cfg(target_os = "linux")]
-fn run_dual_stream_pipeline_with_chunk_size<F>(
+fn run_dual_stream_pipeline_with_chunk_size<T, F>(
     device: &Arc<CudaDevice>,
-    host_data: &[f64],
+    host_data: &[T],
     chunk_size_elements: usize,
     mut kernel_launcher: F,
 ) -> Result<()>
 where
-    F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+    T: DeviceRepr + Copy + Send + Sync + Unpin + 'static,
+    F: FnMut(&CudaStream, *const T, usize, usize) -> Result<()>,
 {
     if chunk_size_elements == 0 {
         return Err(MahoutError::InvalidInput(
@@ -362,9 +403,9 @@
     //    TODO: tune dynamically based on GPU/PCIe bandwidth.
     // 3. Keep temporary buffers alive until all streams complete
     //    This prevents Rust from dropping them while GPU is still using them
-    let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+    let mut keep_alive_buffers: Vec<CudaSlice<T>> = Vec::new();
     // Keep pinned buffers alive until the copy stream has completed their H2D copy
-    let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
+    let mut in_flight_pinned: Vec<PinnedBufferHandle<T>> = Vec::new();

     // 4. Pipeline loop: copy on copy stream, compute on compute stream with event handoff
     let mut chunk_idx = 0usize;
@@ -383,7 +424,7 @@

         // Allocate temporary device buffer for this chunk
         #[allow(clippy::collapsible_if, clippy::manual_is_multiple_of)]
-        let input_chunk_dev = unsafe { device.alloc::<f64>(chunk.len()) }.map_err(|e| {
+        let input_chunk_dev = unsafe { device.alloc::<T>(chunk.len()) }.map_err(|e| {
             map_allocation_error(chunk_bytes, "pipeline chunk buffer allocation", None, e)
         })?;

@@ -420,7 +461,7 @@
             ctx.async_copy_to_device(
                 pinned_buf.ptr() as *const c_void,
                 *input_chunk_dev.device_ptr() as *mut c_void,
-                chunk.len(),
+                std::mem::size_of_val(chunk),
             )?;

             // Record copy end if overlap tracking enabled
@@ -452,7 +493,7 @@
         }

         // Get device pointer for kernel launch
-        let input_ptr = *input_chunk_dev.device_ptr() as *const f64;
+        let input_ptr = *input_chunk_dev.device_ptr() as *const T;

         // Invoke caller's kernel launcher (non-blocking)
         {
diff --git a/qdp/qdp-core/src/gpu/validation.rs b/qdp/qdp-core/src/gpu/validation.rs
new file mode 100644
index 0000000000..d7968599a8
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/validation.rs
@@ -0,0 +1,275 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! GPU-side validation helpers shared across encoders.
+//!
+//! These wrap the atomic-flag validation kernels in `qdp-kernels/src/validation.cu`.
+//! They allocate a 1-element i32 flag buffer, launch the relevant kernel, and copy
+//! the flag back to the host, converting non-zero states into `MahoutError::InvalidInput`.
+
+#![allow(unused_unsafe)]
+
+use crate::error::{MahoutError, Result, cuda_error_to_string};
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtrMut};
+use std::ffi::c_void;
+use std::sync::Arc;
+
+// Error bitmask flags emitted by `check_basis_indices_kernel_*`.
+// Must stay in sync with `qdp-kernels/src/validation.cu`.
+const BASIS_IDX_ERR_NON_FINITE: i32 = 0x1;
+const BASIS_IDX_ERR_NEGATIVE: i32 = 0x2;
+const BASIS_IDX_ERR_NON_INTEGER: i32 = 0x4;
+const BASIS_IDX_ERR_OUT_OF_RANGE: i32 = 0x8;
+
+/// Assert that every value in a device-resident f32 buffer is finite.
+///
+/// # Safety
+/// `input_d` must point to at least `total_values` `f32`s on `device`, and `stream` must
+/// be either null or a valid CUDA stream associated with `device`.
+pub unsafe fn assert_all_finite_f32(
+    device: &Arc<CudaDevice>,
+    input_d: *const f32,
+    total_values: usize,
+    stream: *mut c_void,
+    context: &str,
+) -> Result<()> {
+    if total_values == 0 {
+        return Ok(());
+    }
+    let mut flag = device.alloc_zeros::<i32>(1).map_err(|e| {
+        MahoutError::MemoryAllocation(format!(
+            "Failed to allocate finite-check flag buffer: {:?}",
+            e
+        ))
+    })?;
+    let ret = unsafe {
+        qdp_kernels::launch_check_finite_batch_f32(
+            input_d,
+            total_values,
+            *flag.device_ptr_mut() as *mut i32,
+            stream,
+        )
+    };
+    if ret != 0 {
+        return Err(MahoutError::KernelLaunch(format!(
+            "{}: finite validation kernel (f32) failed: {} ({})",
+            context,
+            ret,
+            cuda_error_to_string(ret)
+        )));
+    }
+    let host_flags = device.dtoh_sync_copy(&flag).map_err(|e| {
+        MahoutError::Cuda(format!(
+            "{}: failed to copy finite validation flag: {:?}",
+            context, e
+        ))
+    })?;
+    if host_flags.first().copied().unwrap_or_default() != 0 {
+        return Err(MahoutError::InvalidInput(format!(
+            "{}: batch contains non-finite values (NaN or Inf)",
+            context
+        )));
+    }
+    Ok(())
+}
+
+/// Assert that every value in a device-resident f64 buffer is finite.
+///
+/// # Safety
+/// See `assert_all_finite_f32`.
+pub unsafe fn assert_all_finite_f64(
+    device: &Arc<CudaDevice>,
+    input_d: *const f64,
+    total_values: usize,
+    stream: *mut c_void,
+    context: &str,
+) -> Result<()> {
+    if total_values == 0 {
+        return Ok(());
+    }
+    let mut flag = device.alloc_zeros::<i32>(1).map_err(|e| {
+        MahoutError::MemoryAllocation(format!(
+            "Failed to allocate finite-check flag buffer: {:?}",
+            e
+        ))
+    })?;
+    let ret = unsafe {
+        qdp_kernels::launch_check_finite_batch_f64(
+            input_d,
+            total_values,
+            *flag.device_ptr_mut() as *mut i32,
+            stream,
+        )
+    };
+    if ret != 0 {
+        return Err(MahoutError::KernelLaunch(format!(
+            "{}: finite validation kernel (f64) failed: {} ({})",
+            context,
+            ret,
+            cuda_error_to_string(ret)
+        )));
+    }
+    let host_flags = device.dtoh_sync_copy(&flag).map_err(|e| {
+        MahoutError::Cuda(format!(
+            "{}: failed to copy finite validation flag: {:?}",
+            context, e
+        ))
+    })?;
+    if host_flags.first().copied().unwrap_or_default() != 0 {
+        return Err(MahoutError::InvalidInput(format!(
+            "{}: batch contains non-finite values (NaN or Inf)",
+            context
+        )));
+    }
+    Ok(())
+}
+
+fn basis_index_error_message(flags: i32, state_len: usize) -> String {
+    let mut reasons: Vec<&'static str> = Vec::new();
+    if flags & BASIS_IDX_ERR_NON_FINITE != 0 {
+        reasons.push("non-finite");
+    }
+    if flags & BASIS_IDX_ERR_NEGATIVE != 0 {
+        reasons.push("negative");
+    }
+    if flags & BASIS_IDX_ERR_NON_INTEGER != 0 {
+        reasons.push("non-integer");
+    }
+    if flags & BASIS_IDX_ERR_OUT_OF_RANGE != 0 {
+        reasons.push("out of range");
+    }
+    format!(
+        "Basis index batch contains invalid values ({}); valid indices must be finite, \
+         non-negative integers in [0, {})",
+        reasons.join(", "),
+        state_len
+    )
+}
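
Editor's aside: because the flags compose as a bitmask, one bad buffer can report several reasons at once, and `basis_index_error_message` lists every bit that is set. A tiny self-contained check of that contract (constants duplicated here for illustration):

```rust
fn main() {
    // Mirrors the BASIS_IDX_ERR_* constants above.
    const NON_FINITE: i32 = 0x1;
    const NEGATIVE: i32 = 0x2;
    const NON_INTEGER: i32 = 0x4;
    const OUT_OF_RANGE: i32 = 0x8;

    // A buffer containing both a NaN and an index >= state_len sets two bits.
    let flags = NON_FINITE | OUT_OF_RANGE;
    assert_eq!(flags, 0x9);
    assert!(flags & NON_FINITE != 0); // reported
    assert!(flags & NEGATIVE == 0); // not reported
    assert!(flags & NON_INTEGER == 0); // not reported
    assert!(flags & OUT_OF_RANGE != 0); // reported
}
```
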
+
+/// Validate a device-resident f32 basis-index buffer and cast it to `size_t`.
+///
+/// Returns a newly allocated device buffer holding the truncated indices.
+/// Synchronizes on `stream` to make the error flag observable.
+///
+/// # Safety
+/// See `assert_all_finite_f32`.
+pub unsafe fn validate_and_cast_basis_indices_f32(
+    device: &Arc<CudaDevice>,
+    input_d: *const f32,
+    num_samples: usize,
+    state_len: usize,
+    stream: *mut c_void,
+) -> Result<CudaSlice<usize>> {
+    if num_samples == 0 {
+        return Err(MahoutError::InvalidInput(
+            "Number of samples cannot be zero".into(),
+        ));
+    }
+    let mut flag = device.alloc_zeros::<i32>(1).map_err(|e| {
+        MahoutError::MemoryAllocation(format!(
+            "Failed to allocate basis-index flag buffer: {:?}",
+            e
+        ))
+    })?;
+    let mut indices_out = device.alloc_zeros::<usize>(num_samples).map_err(|e| {
+        MahoutError::MemoryAllocation(format!(
+            "Failed to allocate basis-index cast buffer: {:?}",
+            e
+        ))
+    })?;
+    let ret = unsafe {
+        qdp_kernels::launch_validate_and_cast_basis_indices_f32(
+            input_d,
+            num_samples,
+            state_len,
+            *indices_out.device_ptr_mut() as *mut usize,
+            *flag.device_ptr_mut() as *mut i32,
+            stream,
+        )
+    };
+    if ret != 0 {
+        return Err(MahoutError::KernelLaunch(format!(
+            "Basis index validate+cast kernel (f32) failed: {} ({})",
+            ret,
+            cuda_error_to_string(ret)
+        )));
+    }
+    let host_flags = device.dtoh_sync_copy(&flag).map_err(|e| {
+        MahoutError::Cuda(format!(
+            "Failed to copy basis-index validation flag: {:?}",
+            e
+        ))
+    })?;
+    let bits = host_flags.first().copied().unwrap_or_default();
+    if bits != 0 {
+        return Err(MahoutError::InvalidInput(basis_index_error_message(
+            bits, state_len,
+        )));
+    }
+    Ok(indices_out)
+}
+
+/// Assert that every value in a device-resident `usize` buffer is a valid
+/// basis index (i.e. strictly less than `state_len`).
+///
+/// # Safety
+/// See `assert_all_finite_f32`.
+pub unsafe fn assert_basis_indices_in_range_usize(
+    device: &Arc<CudaDevice>,
+    indices_d: *const usize,
+    num_samples: usize,
+    state_len: usize,
+    stream: *mut c_void,
+) -> Result<()> {
+    if num_samples == 0 {
+        return Ok(());
+    }
+    let mut flag = device.alloc_zeros::<i32>(1).map_err(|e| {
+        MahoutError::MemoryAllocation(format!(
+            "Failed to allocate basis-index flag buffer: {:?}",
+            e
+        ))
+    })?;
+    let ret = unsafe {
+        qdp_kernels::launch_check_basis_indices_usize(
+            indices_d,
+            num_samples,
+            state_len,
+            *flag.device_ptr_mut() as *mut i32,
+            stream,
+        )
+    };
+    if ret != 0 {
+        return Err(MahoutError::KernelLaunch(format!(
+            "Basis index bounds-check kernel failed: {} ({})",
+            ret,
+            cuda_error_to_string(ret)
+        )));
+    }
+    let host_flags = device.dtoh_sync_copy(&flag).map_err(|e| {
+        MahoutError::Cuda(format!(
+            "Failed to copy basis-index validation flag: {:?}",
+            e
+        ))
+    })?;
+    let bits = host_flags.first().copied().unwrap_or_default();
+    if bits != 0 {
+        return Err(MahoutError::InvalidInput(basis_index_error_message(
+            bits, state_len,
+        )));
+    }
+    Ok(())
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 799eb7b180..2a531ca441 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -515,11 +515,16 @@ impl QdpEngine {
         Ok(state_vector.to_dlpack())
     }

-    /// Encode from existing GPU pointer (float32 input, amplitude encoding only)
+    /// Encode from existing GPU pointer (float32 input, **amplitude encoding only**).
     ///
     /// Zero-copy encoding from PyTorch CUDA float32 tensors. Uses the default CUDA stream.
     /// For stream interop use `encode_from_gpu_ptr_f32_with_stream`.
     ///
+    /// This method does **not** dispatch by `encoding_method` — it always runs amplitude.
+    /// For other encodings use the explicit variants
+    /// ([`encode_angle_from_gpu_ptr_f32`](Self::encode_angle_from_gpu_ptr_f32),
+    /// [`encode_basis_from_gpu_ptr_f32`](Self::encode_basis_from_gpu_ptr_f32)).
+    ///
     /// # Arguments
     /// * `input_d` - Device pointer to input data (f32 array on GPU)
     /// * `input_len` - Number of f32 elements in the input
@@ -696,11 +701,16 @@ impl QdpEngine {
         Ok(state_vector.to_dlpack())
     }

-    /// Encode a batch from an existing GPU pointer (float32 input, amplitude encoding only).
+    /// Encode a batch from an existing GPU pointer (float32 input, **amplitude encoding only**).
     ///
     /// Zero-copy batch encoding from PyTorch CUDA float32 tensors. Uses the default CUDA stream.
     /// For stream interop use `encode_batch_from_gpu_ptr_f32_with_stream`.
     ///
+    /// This method does **not** dispatch by `encoding_method` — it always runs amplitude.
+    /// For other encodings use the explicit variants
+    /// ([`encode_angle_batch_from_gpu_ptr_f32`](Self::encode_angle_batch_from_gpu_ptr_f32),
+    /// [`encode_basis_batch_from_gpu_ptr_f32`](Self::encode_basis_batch_from_gpu_ptr_f32)).
+    ///
     /// # Safety
     /// The input pointer must:
     /// - Point to valid GPU memory on the same device as the engine
@@ -840,6 +850,136 @@ impl QdpEngine {
         Ok(batch_state_vector.to_dlpack())
     }

+    /// Encode a single basis index from an existing GPU pointer (float32).
+    ///
+    /// The GPU validates the index (finite, non-negative, integer-valued,
+    /// `< 2^num_qubits`) before encoding. Uses the default CUDA stream.
+    ///
+    /// # Safety
+    /// The input pointer must:
+    /// - Point to one valid f32 in GPU memory on the same device as the engine
+    /// - Remain valid for the duration of this call
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_basis_from_gpu_ptr_f32(
+        &self,
+        input_d: *const f32,
+        num_qubits: usize,
+    ) -> Result<*mut DLManagedTensor> {
+        unsafe {
+            self.encode_basis_from_gpu_ptr_f32_with_stream(
+                input_d,
+                num_qubits,
+                std::ptr::null_mut(),
+            )
+        }
+    }
+
+    /// Encode a single basis index from an existing GPU pointer (float32) on a
+    /// specified CUDA stream.
+    ///
+    /// # Safety
+    /// In addition to the `encode_basis_from_gpu_ptr_f32` requirements, the
+    /// stream pointer must remain valid for the duration of this call.
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_basis_from_gpu_ptr_f32_with_stream(
+        &self,
+        input_d: *const f32,
+        num_qubits: usize,
+        stream: *mut c_void,
+    ) -> Result<*mut DLManagedTensor> {
+        crate::profile_scope!("Mahout::EncodeBasisFromGpuPtrF32");
+
+        validate_cuda_input_ptr(&self.device, input_d as *const c_void)?;
+
+        let state_vector = unsafe {
+            gpu::BasisEncoder::encode_from_gpu_ptr_f32_with_stream(
+                &self.device,
+                input_d,
+                num_qubits,
+                stream,
+            )
+        }?;
+        let state_vector = state_vector.to_precision(&self.device, self.precision)?;
+        Ok(state_vector.to_dlpack())
+    }
+
+    /// Encode a basis batch from an existing GPU pointer (float32 index input).
+    ///
+    /// Zero-copy batch encoding from CUDA float32 tensors. Each element is treated
+    /// as a basis index; the GPU validates (finite, non-negative, integer-valued,
+    /// `< 2^num_qubits`) and casts to `size_t` before encoding.
+    ///
+    /// Uses the default CUDA stream. For stream interop use
+    /// `encode_basis_batch_from_gpu_ptr_f32_with_stream`.
+    ///
+    /// # Safety
+    /// The input pointer must:
+    /// - Point to valid GPU memory on the same device as the engine
+    /// - Contain at least `num_samples` f32 elements
+    /// - Remain valid for the duration of this call
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_basis_batch_from_gpu_ptr_f32(
+        &self,
+        input_batch_d: *const f32,
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+    ) -> Result<*mut DLManagedTensor> {
+        unsafe {
+            self.encode_basis_batch_from_gpu_ptr_f32_with_stream(
+                input_batch_d,
+                num_samples,
+                sample_size,
+                num_qubits,
+                std::ptr::null_mut(),
+            )
+        }
+    }
+
+    /// Encode a basis batch from an existing GPU pointer (float32 index input)
+    /// on a specified CUDA stream.
+    ///
+    /// # Safety
+    /// In addition to the `encode_basis_batch_from_gpu_ptr_f32` requirements,
+    /// the stream pointer must remain valid for the duration of this call.
+    #[cfg(target_os = "linux")]
+    pub unsafe fn encode_basis_batch_from_gpu_ptr_f32_with_stream(
+        &self,
+        input_batch_d: *const f32,
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+        stream: *mut c_void,
+    ) -> Result<*mut DLManagedTensor> {
+        crate::profile_scope!("Mahout::EncodeBasisBatchFromGpuPtrF32");
+
+        if num_samples == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Number of samples cannot be zero".into(),
+            ));
+        }
+        if sample_size == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Sample size cannot be zero".into(),
+            ));
+        }
+
+        validate_cuda_input_ptr(&self.device, input_batch_d as *const c_void)?;
+
+        let batch_state_vector = unsafe {
+            gpu::BasisEncoder::encode_batch_from_gpu_ptr_f32_with_stream(
+                &self.device,
+                input_batch_d,
+                num_samples,
+                sample_size,
+                num_qubits,
+                stream,
+            )
+        }?;
+        let batch_state_vector = batch_state_vector.to_precision(&self.device, self.precision)?;
+        Ok(batch_state_vector.to_dlpack())
+    }
+
     /// Encode batch from existing GPU pointer (zero-copy for CUDA tensors)
     ///
     /// This method enables zero-copy batch encoding from PyTorch CUDA tensors.
diff --git a/qdp/qdp-core/src/pipeline_runner.rs b/qdp/qdp-core/src/pipeline_runner.rs
index 42bb5cc655..4c2358fbf8 100644
--- a/qdp/qdp-core/src/pipeline_runner.rs
+++ b/qdp/qdp-core/src/pipeline_runner.rs
@@ -45,7 +45,7 @@ pub struct PipelineConfig {

 impl PipelineConfig {
     /// Normalizes the configuration, such as falling back to f64 if f32 is requested
-    /// but the encoding doesn't support it.
+    /// but the encoding doesn't support it, and auto-computing prefetch depth if not set.
     pub fn normalize(&mut self) {
         if self.float32_pipeline && !encoding_supports_f32(&self.encoding_method) {
             log::info!(
@@ -54,6 +54,22 @@
             );
             self.float32_pipeline = false;
         }
+        if self.prefetch_depth == 0 {
+            self.prefetch_depth = compute_optimal_prefetch_depth(
+                self.num_qubits as usize,
+                self.batch_size,
+                &self.encoding_method,
+                self.float32_pipeline,
+            );
+            log::debug!(
+                "auto prefetch_depth={} (qubits={}, batch={}, encoding={}, f32={})",
+                self.prefetch_depth,
+                self.num_qubits,
+                self.batch_size,
+                self.encoding_method,
+                self.float32_pipeline,
+            );
+        }
     }
 }

@@ -69,7 +85,7 @@
             warmup_batches: 0,
             null_handling: NullHandling::FillZero,
             float32_pipeline: false,
-            prefetch_depth: 16,
+            prefetch_depth: 0, // 0 = auto-compute in normalize()
         }
     }
 }

@@ -99,10 +115,55 @@
     fn produce(&mut self, recycled: Option<BatchData>) -> Result<Option<Batch>>;
 }

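
Editor's aside: the `recycled` parameter in `BatchProducer::produce` above is a buffer-reuse protocol: the consumer hands back the previous batch's allocation so the producer can refill it instead of allocating fresh. A minimal sketch with simplified stand-ins for `Batch`/`BatchData` (the real types carry more than a `Vec`):

```rust
struct Batch {
    data: Vec<f64>,
}

struct CountingProducer {
    remaining: usize,
    len: usize,
}

impl CountingProducer {
    fn produce(&mut self, recycled: Option<Vec<f64>>) -> Option<Batch> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        // Reuse the returned allocation when available.
        let mut data = recycled.unwrap_or_default();
        data.clear();
        data.resize(self.len, 0.0); // refill in place, keeping capacity
        Some(Batch { data })
    }
}

fn main() {
    let mut p = CountingProducer { remaining: 2, len: 4 };
    let b1 = p.produce(None).unwrap();
    let first_alloc = b1.data.as_ptr();
    let b2 = p.produce(Some(b1.data)).unwrap(); // recycled allocation
    assert_eq!(b2.data.as_ptr(), first_alloc); // same backing buffer reused
    assert!(p.produce(Some(b2.data)).is_none()); // exhausted
}
```
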
+/// Compute optimal prefetch depth to keep the CPU-side prefetch buffer under ~256 MB,
+/// clamped to [1, 32].
+///
+/// The CPU prefetch buffer holds `prefetch_depth` batches of raw input data (not
+/// encoded state vectors, which live on the GPU). For amplitude encoding the input
+/// size dominates; for angle/basis it is tiny, so the cap kicks in at the upper bound.
+///
+/// | num_qubits | encoding    | bytes/batch (f64, bs=64) | auto depth |
+/// |------------|-------------|--------------------------|------------|
+/// | 8          | amplitude   | ~128 KB                  | 32         |
+/// | 12         | amplitude   | ~2 MB                    | 32         |
+/// | 16         | amplitude   | ~32 MB                   | 8          |
+/// | 20         | amplitude   | ~512 MB                  | 1          |
+/// | *          | angle/basis | tiny                     | 32         |
+fn compute_optimal_prefetch_depth(
+    num_qubits: usize,
+    batch_size: usize,
+    encoding_method: &str,
+    float32: bool,
+) -> usize {
+    const TARGET_BYTES: usize = 256 * 1024 * 1024; // 256 MB
+    const MIN_DEPTH: usize = 1;
+    const MAX_DEPTH: usize = 32;
+
+    let sample_len = match encoding_method.to_lowercase().as_str() {
+        "angle" => Some(num_qubits),
+        "basis" => Some(1),
+        _ => 1usize.checked_shl(num_qubits as u32), // amplitude / iqp; None on overflow
+    };
+    let bytes_per_element = if float32 { 4usize } else { 8usize };
+    // Use checked arithmetic throughout; treat any overflow as "extremely large batch"
+    // and return MIN_DEPTH so we never buffer more than one batch.
+    let bytes_per_batch = sample_len
+        .and_then(|s| batch_size.checked_mul(s))
+        .and_then(|b| b.checked_mul(bytes_per_element));
+
+    match bytes_per_batch {
+        None | Some(0) => MIN_DEPTH,
+        Some(bpb) => (TARGET_BYTES / bpb).clamp(MIN_DEPTH, MAX_DEPTH),
+    }
+}
+
 /// Returns true if the given encoding method has a native f32 GPU kernel.
 /// Used to auto-gate `float32_pipeline` so unsupported encodings fall back to f64.
 fn encoding_supports_f32(encoding_method: &str) -> bool {
-    matches!(encoding_method.to_lowercase().as_str(), "amplitude")
+    matches!(
+        encoding_method.to_lowercase().as_str(),
+        "amplitude" | "angle" | "basis"
+    )
 }

 pub struct SyntheticProducer {
@@ -355,12 +416,34 @@

 /// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption.
 /// Reads prefetched batches via a bounded channel.
+///
+/// # Thread safety
+/// `Receiver` is `!Sync`, so `rx` is wrapped in a `Mutex` to satisfy PyO3's `#[pyclass]`
+/// `Sync` bound. In practice, `PyRefMut` guarantees exclusive access, so the lock is
+/// never contended at runtime. `Sender` is already `Send + Sync` and needs no wrapper.
 pub struct PipelineIterator {
     engine: QdpEngine,
     config: PipelineConfig,
-    rx: std::sync::Mutex<Receiver<Result<Batch>>>,
-    recycle_tx: std::sync::Mutex<Sender<BatchData>>,
-    _producer_handle: std::sync::Mutex<JoinHandle<()>>,
+    rx: std::sync::Mutex<Option<Receiver<Result<Batch>>>>,
+    recycle_tx: Option<Sender<BatchData>>,
+    producer_handle: Option<JoinHandle<()>>,
+}
+
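
Editor's aside: the take-and-drop shutdown described in the `Drop` impl just below can be reproduced with plain `std::sync::mpsc` primitives. This is a standalone sketch, not the crate's code:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Taking the Receiver out of its Option and dropping it closes the channel,
// so a producer blocked in send() gets Err(SendError) and exits; join()
// therefore cannot deadlock, which is exactly the property Drop relies on.
fn main() {
    let (tx, rx) = sync_channel::<u64>(2);
    let handle = thread::spawn(move || {
        let mut i = 0u64;
        // Producer runs until the consumer side hangs up.
        while tx.send(i).is_ok() {
            i += 1;
        }
    });

    let mut rx = Some(rx);
    let _first = rx.as_ref().unwrap().recv().unwrap();
    drop(rx.take()); // close the channel: pending/future sends now fail
    handle.join().unwrap(); // producer observed the hangup and exited
}
```
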
+ drop(self.rx.lock().unwrap().take()); + if let Some(handle) = self.producer_handle.take() { + let _ = handle.join(); + } + } } impl PipelineIterator { @@ -373,9 +456,9 @@ impl PipelineIterator { Ok(Self { engine, config, - rx: std::sync::Mutex::new(rx), - recycle_tx: std::sync::Mutex::new(recycle_tx), - _producer_handle: std::sync::Mutex::new(_producer_handle), + rx: std::sync::Mutex::new(Some(rx)), + recycle_tx: Some(recycle_tx), + producer_handle: Some(_producer_handle), }) } @@ -425,9 +508,9 @@ impl PipelineIterator { Ok(Self { engine, config, - rx: std::sync::Mutex::new(rx), - recycle_tx: std::sync::Mutex::new(recycle_tx), - _producer_handle: std::sync::Mutex::new(_producer_handle), + rx: std::sync::Mutex::new(Some(rx)), + recycle_tx: Some(recycle_tx), + producer_handle: Some(_producer_handle), }) } @@ -497,15 +580,15 @@ impl PipelineIterator { Ok(Self { engine, config, - rx: std::sync::Mutex::new(rx), - recycle_tx: std::sync::Mutex::new(recycle_tx), - _producer_handle: std::sync::Mutex::new(_producer_handle), + rx: std::sync::Mutex::new(Some(rx)), + recycle_tx: Some(recycle_tx), + producer_handle: Some(_producer_handle), }) } /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted. pub fn next_batch(&mut self) -> Result> { - let batch = match self.rx.lock().unwrap().recv() { + let batch = match self.rx.lock().unwrap().as_ref().unwrap().recv() { Ok(Ok(b)) => b, Ok(Err(e)) => return Err(e), Err(_) => return Ok(None), @@ -526,7 +609,9 @@ impl PipelineIterator { &self.config.encoding_method, )?, }; - let _ = self.recycle_tx.lock().unwrap().send(batch.data); + if let Some(tx) = &self.recycle_tx { + let _ = tx.send(batch.data); + } Ok(Some(ptr)) } } @@ -1117,13 +1202,13 @@ mod tests { } #[test] - fn test_synthetic_producer_f32_fallback_for_angle() { + fn test_synthetic_producer_f32_for_angle() { let mut config = PipelineConfig { total_batches: 1, num_qubits: 3, batch_size: 4, encoding_method: "angle".to_string(), - float32_pipeline: true, // requested f32, but angle doesn't support it + float32_pipeline: true, ..Default::default() }; config.normalize(); @@ -1132,13 +1217,13 @@ mod tests { let batch = producer.produce(None).unwrap().unwrap(); assert!( - matches!(batch.data, BatchData::F64(_)), - "angle with float32_pipeline=true should fall back to F64 data" + matches!(batch.data, BatchData::F32(_)), + "angle with float32_pipeline=true should produce F32 data" ); } #[test] - fn test_synthetic_producer_f32_fallback_for_basis() { + fn test_synthetic_producer_f32_for_basis() { let mut config = PipelineConfig { total_batches: 1, num_qubits: 3, @@ -1153,8 +1238,8 @@ mod tests { let batch = producer.produce(None).unwrap().unwrap(); assert!( - matches!(batch.data, BatchData::F64(_)), - "basis with float32_pipeline=true should fall back to F64 data" + matches!(batch.data, BatchData::F32(_)), + "basis with float32_pipeline=true should produce F32 data" ); } @@ -1163,8 +1248,62 @@ mod tests { assert!(super::encoding_supports_f32("amplitude")); assert!(super::encoding_supports_f32("Amplitude")); assert!(super::encoding_supports_f32("AMPLITUDE")); - assert!(!super::encoding_supports_f32("angle")); - assert!(!super::encoding_supports_f32("basis")); + assert!(super::encoding_supports_f32("angle")); + assert!(super::encoding_supports_f32("basis")); assert!(!super::encoding_supports_f32("iqp")); } + + #[test] + fn test_compute_optimal_prefetch_depth_bounds() { + // Small qubit count → hits MAX_DEPTH cap (32) + let d = super::compute_optimal_prefetch_depth(4, 64, "amplitude", false); + 
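+        // Worked size: 2^4 elements * 64 samples * 8 bytes = 8 KiB per batch,
+        // so the 256 MiB target allows far more than 32 batches and the cap applies.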
assert_eq!(d, 32, "4 qubits/amplitude should hit max depth");
+
+        // angle/basis are tiny input → should also hit MAX_DEPTH
+        let d_angle = super::compute_optimal_prefetch_depth(16, 64, "angle", false);
+        assert_eq!(
+            d_angle, 32,
+            "angle encoding has small input, should hit max"
+        );
+
+        let d_basis = super::compute_optimal_prefetch_depth(16, 64, "basis", false);
+        assert_eq!(
+            d_basis, 32,
+            "basis encoding has 1-element input, should hit max"
+        );
+
+        // Large qubit count amplitude → depth should be ≥ 1 and ≤ 32
+        let d_large = super::compute_optimal_prefetch_depth(20, 64, "amplitude", false);
+        assert!(
+            (1..=32).contains(&d_large),
+            "20 qubits depth out of range: {d_large}"
+        );
+        // At 20 qubits, amplitude batch is ~512 MB — floor(256M/512M) = 0, clamped to 1
+        assert_eq!(d_large, 1);
+
+        // 16 qubits f64, bs=64: 65536*64*8 = 32 MB → 256M/32M = 8
+        let d16 = super::compute_optimal_prefetch_depth(16, 64, "amplitude", false);
+        assert_eq!(d16, 8);
+
+        // 16 qubits f32, bs=64: 65536*64*4 = 16 MB → 256M/16M = 16
+        let d16_f32 = super::compute_optimal_prefetch_depth(16, 64, "amplitude", true);
+        assert_eq!(d16_f32, 16);
+    }
+
+    #[test]
+    fn test_normalize_sets_prefetch_depth() {
+        // Default config has prefetch_depth=0; normalize() should compute it.
+        let mut config = PipelineConfig {
+            num_qubits: 16,
+            batch_size: 64,
+            encoding_method: "amplitude".to_string(),
+            ..Default::default()
+        };
+        assert_eq!(config.prefetch_depth, 0, "default should be 0 (auto)");
+        config.normalize();
+        assert!(
+            config.prefetch_depth > 0,
+            "normalize() must set prefetch_depth > 0"
+        );
+    }
 }
diff --git a/qdp/qdp-core/src/reader.rs b/qdp/qdp-core/src/reader.rs
index 5ea3f34867..fd36fa2314 100644
--- a/qdp/qdp-core/src/reader.rs
+++ b/qdp/qdp-core/src/reader.rs
@@ -96,6 +96,11 @@ pub fn handle_float64_nulls(
 ///
 /// This interface enables zero-copy streaming where possible and maintains
 /// memory efficiency for large datasets.
+//
+// Structural debt: this trait hard-codes `Vec<f64>` in `read_batch`, so
+// `float32_pipeline=true` still forces every file-backed source to materialise
+// as f64 before casting. A future refactor should parameterise over the element
+// type (`T: FloatElem`) so f32 pipelines stay f32 end-to-end.
 pub trait DataReader {
     /// Read all data from the source.
     ///
diff --git a/qdp/qdp-core/tests/gpu_angle_encoding.rs b/qdp/qdp-core/tests/gpu_angle_encoding.rs
index 6b60d51537..c3d4fa99b6 100644
--- a/qdp/qdp-core/tests/gpu_angle_encoding.rs
+++ b/qdp/qdp-core/tests/gpu_angle_encoding.rs
@@ -273,3 +273,33 @@ fn test_angle_batch_f32_rejects_length_overflow() {
         _ => panic!("expected InvalidInput, got {:?}", result),
     }
 }
+
+/// Exercises the dual-stream async path by feeding a batch above the
+/// `ASYNC_THRESHOLD_ELEMENTS` threshold (1 MB of f32 = 262144 angles).
+/// Validates that large-batch f32 angle encoding returns a correctly-shaped
+/// DLPack tensor via the async pipeline.
+#[test]
+fn test_angle_batch_f32_async_pipeline_path() {
+    let Some(engine) = common::qdp_engine_with_precision(qdp_core::Precision::Float32) else {
+        println!("SKIP: No GPU available");
+        return;
+    };
+
+    let num_qubits = 4;
+    // 2 * 1024 * 1024 / num_qubits = 524288 samples of 4 angles each, i.e.
+    // 2_097_152 f32 values in `data` below, well above the 262144-element threshold.
+ let num_samples = 2 * 1024 * 1024 / num_qubits; + let sample_size = num_qubits; + let data = vec![0.0_f32; num_samples * sample_size]; + + let dlpack_ptr = engine + .encode_batch_f32(&data, num_samples, sample_size, num_qubits, "angle") + .expect("angle batch f32 async path should succeed"); + + unsafe { + common::assert_dlpack_shape_2d_and_delete( + dlpack_ptr, + num_samples as i64, + (1 << num_qubits) as i64, + ); + } +} diff --git a/qdp/qdp-core/tests/gpu_fidelity.rs b/qdp/qdp-core/tests/gpu_fidelity.rs new file mode 100644 index 0000000000..cd38a5c97c --- /dev/null +++ b/qdp/qdp-core/tests/gpu_fidelity.rs @@ -0,0 +1,329 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for fidelity / trace-distance metrics and F32 vs F64 precision +//! comparison across different qubit counts. + +#![cfg(target_os = "linux")] + +use approx::assert_relative_eq; +use qdp_core::gpu::metrics::{ + fidelity_cross_precision, fidelity_f32, fidelity_f64, trace_distance_f64, +}; + +mod common; + +// ═══════════════════════════════════════════════════════════════════════ +// Unit tests for the fidelity / trace-distance functions themselves +// ═══════════════════════════════════════════════════════════════════════ + +#[test] +fn test_fidelity_identical_states() { + // |0⟩ = (1+0i, 0+0i) interleaved: [1, 0, 0, 0] + let state = vec![1.0, 0.0, 0.0, 0.0]; + let f = fidelity_f64(&state, &state).unwrap(); + assert_relative_eq!(f, 1.0, epsilon = 1e-12); +} + +#[test] +fn test_fidelity_orthogonal_states() { + // |0⟩ and |1⟩ + let state_0 = vec![1.0, 0.0, 0.0, 0.0]; + let state_1 = vec![0.0, 0.0, 1.0, 0.0]; + let f = fidelity_f64(&state_0, &state_1).unwrap(); + assert_relative_eq!(f, 0.0, epsilon = 1e-12); +} + +#[test] +fn test_fidelity_superposition() { + // |+⟩ = 1/√2 (|0⟩ + |1⟩) + let inv_sqrt2 = 1.0_f64 / 2.0_f64.sqrt(); + let plus = vec![inv_sqrt2, 0.0, inv_sqrt2, 0.0]; + let f = fidelity_f64(&plus, &plus).unwrap(); + assert_relative_eq!(f, 1.0, epsilon = 1e-12); + + // ⟨+|0⟩ = 1/√2, fidelity = 0.5 + let zero = vec![1.0, 0.0, 0.0, 0.0]; + let f2 = fidelity_f64(&plus, &zero).unwrap(); + assert_relative_eq!(f2, 0.5, epsilon = 1e-12); +} + +#[test] +fn test_fidelity_global_phase() { + // |ψ⟩ = (0+i) * |0⟩ = [0, 1, 0, 0] (global phase e^{iπ/2}) + let state_a = vec![1.0, 0.0, 0.0, 0.0]; + let state_b = vec![0.0, 1.0, 0.0, 0.0]; + let f = fidelity_f64(&state_a, &state_b).unwrap(); + // |⟨ψ|φ⟩|² = |⟨0| (i|0⟩)|² = |i|² = 1 + assert_relative_eq!(f, 1.0, epsilon = 1e-12); +} + +#[test] +fn test_trace_distance_identical() { + let state = vec![1.0, 0.0, 0.0, 0.0]; + let td = trace_distance_f64(&state, &state).unwrap(); + assert_relative_eq!(td, 0.0, epsilon = 1e-12); +} + +#[test] +fn test_trace_distance_orthogonal() { + let state_0 = vec![1.0, 0.0, 0.0, 0.0]; + let 
state_1 = vec![0.0, 0.0, 1.0, 0.0]; + let td = trace_distance_f64(&state_0, &state_1).unwrap(); + assert_relative_eq!(td, 1.0, epsilon = 1e-12); +} + +#[test] +fn test_fidelity_f32_basic() { + let state: Vec = vec![1.0, 0.0, 0.0, 0.0]; + let f = fidelity_f32(&state, &state).unwrap(); + assert_relative_eq!(f, 1.0, epsilon = 1e-6); +} + +#[test] +fn test_fidelity_cross_precision_basic() { + let state_f32: Vec = vec![1.0, 0.0, 0.0, 0.0]; + let state_f64: Vec = vec![1.0, 0.0, 0.0, 0.0]; + let f = fidelity_cross_precision(&state_f32, &state_f64).unwrap(); + assert_relative_eq!(f, 1.0, epsilon = 1e-6); +} + +#[test] +fn test_fidelity_length_mismatch_errors() { + let a = vec![1.0, 0.0]; + let b = vec![1.0, 0.0, 0.0, 0.0]; + assert!(fidelity_f64(&a, &b).is_err()); +} + +#[test] +fn test_fidelity_odd_length_errors() { + let a = vec![1.0, 0.0, 0.5]; + assert!(fidelity_f64(&a, &a).is_err()); +} + +// ═══════════════════════════════════════════════════════════════════════ +// GPU-backed F32 vs F64 precision comparison tests +// ═══════════════════════════════════════════════════════════════════════ + +use qdp_core::Precision; +use qdp_core::gpu::metrics::{download_complex_f32, download_complex_f64}; + +/// Encode the same data at F32 and F64, download both, compute cross-precision fidelity. +fn compare_f32_f64_amplitude(num_qubits: usize) -> Option { + let engine_f64 = common::qdp_engine_with_precision(Precision::Float64)?; + let engine_f32 = common::qdp_engine_with_precision(Precision::Float32)?; + let device = common::cuda_device()?; + + let state_dim = 1usize << num_qubits; + let num_samples = 4; + let sample_size = state_dim; + + // Build deterministic test data + let data_f64: Vec = (0..num_samples * sample_size) + .map(|i| ((i as f64) + 1.0) / (num_samples * sample_size) as f64) + .collect(); + let data_f32: Vec = data_f64.iter().map(|&x| x as f32).collect(); + + // Encode at F64 + let dlpack_f64 = engine_f64 + .encode_batch(&data_f64, num_samples, sample_size, num_qubits, "amplitude") + .expect("F64 encode_batch should succeed"); + + // Encode at F32 + let dlpack_f32 = engine_f32 + .encode_batch_f32(&data_f32, num_samples, sample_size, num_qubits, "amplitude") + .expect("F32 encode_batch should succeed"); + + // Read back from GPU + let f64_tensor = unsafe { &(*dlpack_f64).dl_tensor }; + let f32_tensor = unsafe { &(*dlpack_f32).dl_tensor }; + + let total_elements = num_samples * state_dim; + let host_f64 = + download_complex_f64(&device, f64_tensor.data as *const _, total_elements).unwrap(); + let host_f32 = + download_complex_f32(&device, f32_tensor.data as *const _, total_elements).unwrap(); + + // Compute per-sample fidelity, return minimum + let mut min_fidelity = 1.0_f64; + for s in 0..num_samples { + let offset_f64 = s * state_dim * 2; + let offset_f32 = s * state_dim * 2; + let sample_f64 = &host_f64[offset_f64..offset_f64 + state_dim * 2]; + let sample_f32 = &host_f32[offset_f32..offset_f32 + state_dim * 2]; + let f = fidelity_cross_precision(sample_f32, sample_f64).unwrap(); + if f < min_fidelity { + min_fidelity = f; + } + } + + // Clean up DLPack + unsafe { + common::take_deleter_and_delete(dlpack_f64); + common::take_deleter_and_delete(dlpack_f32); + } + + Some(min_fidelity) +} + +#[test] +fn test_f32_vs_f64_amplitude_8_qubits() { + let Some(fidelity) = compare_f32_f64_amplitude(8) else { + println!("SKIP: No GPU available"); + return; + }; + println!("F32 vs F64 fidelity @ 8 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Fidelity too low at 8 qubits: {fidelity}" 
+ ); +} + +#[test] +fn test_f32_vs_f64_amplitude_12_qubits() { + let Some(fidelity) = compare_f32_f64_amplitude(12) else { + println!("SKIP: No GPU available"); + return; + }; + println!("F32 vs F64 fidelity @ 12 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Fidelity too low at 12 qubits: {fidelity}" + ); +} + +#[test] +fn test_f32_vs_f64_amplitude_16_qubits() { + let Some(fidelity) = compare_f32_f64_amplitude(16) else { + println!("SKIP: No GPU available"); + return; + }; + println!("F32 vs F64 fidelity @ 16 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Fidelity too low at 16 qubits: {fidelity}" + ); +} + +#[test] +fn test_f32_vs_f64_amplitude_20_qubits() { + let Some(fidelity) = compare_f32_f64_amplitude(20) else { + println!("SKIP: No GPU available"); + return; + }; + println!("F32 vs F64 fidelity @ 20 qubits: {:.10}", fidelity); + // At 20 qubits (1M elements), F32 norm precision degrades. + // We use a relaxed threshold here to document the baseline. + assert!( + fidelity > 1.0 - 1e-2, + "Fidelity too low at 20 qubits: {fidelity}" + ); +} + +// ═════════════════════════════════════════════════════════════════════════ +// Angle encoding: F32 vs F64 fidelity +// ═══════════════════════════════════════════════════════════════════════ + +fn compare_f32_f64_angle(num_qubits: usize) -> Option { + let engine_f64 = common::qdp_engine_with_precision(Precision::Float64)?; + let engine_f32 = common::qdp_engine_with_precision(Precision::Float32)?; + let device = common::cuda_device()?; + + let state_dim = 1usize << num_qubits; + let num_samples = 4; + let sample_size = num_qubits; // angle encoding: one angle per qubit + + // Build deterministic angle data + let data_f64: Vec = (0..num_samples * sample_size) + .map(|i| (i as f64) * std::f64::consts::PI / (num_samples * sample_size) as f64) + .collect(); + let data_f32: Vec = data_f64.iter().map(|&x| x as f32).collect(); + + let dlpack_f64 = engine_f64 + .encode_batch(&data_f64, num_samples, sample_size, num_qubits, "angle") + .expect("F64 angle encode_batch should succeed"); + + let dlpack_f32 = engine_f32 + .encode_batch_f32(&data_f32, num_samples, sample_size, num_qubits, "angle") + .expect("F32 angle encode_batch should succeed"); + + let f64_tensor = unsafe { &(*dlpack_f64).dl_tensor }; + let f32_tensor = unsafe { &(*dlpack_f32).dl_tensor }; + + let total_elements = num_samples * state_dim; + let host_f64 = + download_complex_f64(&device, f64_tensor.data as *const _, total_elements).unwrap(); + let host_f32 = + download_complex_f32(&device, f32_tensor.data as *const _, total_elements).unwrap(); + + let mut min_fidelity = 1.0_f64; + for s in 0..num_samples { + let offset_f64 = s * state_dim * 2; + let offset_f32 = s * state_dim * 2; + let sample_f64 = &host_f64[offset_f64..offset_f64 + state_dim * 2]; + let sample_f32 = &host_f32[offset_f32..offset_f32 + state_dim * 2]; + let f = fidelity_cross_precision(sample_f32, sample_f64).unwrap(); + if f < min_fidelity { + min_fidelity = f; + } + } + + unsafe { + common::take_deleter_and_delete(dlpack_f64); + common::take_deleter_and_delete(dlpack_f32); + } + + Some(min_fidelity) +} + +#[test] +fn test_f32_vs_f64_angle_8_qubits() { + let Some(fidelity) = compare_f32_f64_angle(8) else { + println!("SKIP: No GPU available"); + return; + }; + println!("Angle F32 vs F64 fidelity @ 8 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Angle fidelity too low at 8 qubits: {fidelity}" + ); +} + +#[test] +fn test_f32_vs_f64_angle_12_qubits() { + let 
Some(fidelity) = compare_f32_f64_angle(12) else { + println!("SKIP: No GPU available"); + return; + }; + println!("Angle F32 vs F64 fidelity @ 12 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Angle fidelity too low at 12 qubits: {fidelity}" + ); +} + +#[test] +fn test_f32_vs_f64_angle_16_qubits() { + let Some(fidelity) = compare_f32_f64_angle(16) else { + println!("SKIP: No GPU available"); + return; + }; + println!("Angle F32 vs F64 fidelity @ 16 qubits: {:.10}", fidelity); + assert!( + fidelity > 1.0 - 1e-3, + "Angle fidelity too low at 16 qubits: {fidelity}" + ); +} diff --git a/qdp/qdp-core/tests/gpu_ptr_encoding.rs b/qdp/qdp-core/tests/gpu_ptr_encoding.rs index 88c65e752d..68ee26bed4 100644 --- a/qdp/qdp-core/tests/gpu_ptr_encoding.rs +++ b/qdp/qdp-core/tests/gpu_ptr_encoding.rs @@ -1526,3 +1526,228 @@ fn test_encode_angle_batch_from_gpu_ptr_f32_success_f64_engine() { }; unsafe { common::assert_dlpack_shape_2d_and_delete(dlpack_ptr, 2, 8) }; } + +// ── Basis f32 batch from GPU pointer ──────────────────────────────────── + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_success() { + let engine = match engine_f32() { + Some(e) => e, + None => { + println!("SKIP: No GPU"); + return; + } + }; + let num_samples = 3; + let num_qubits = 3; + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, 3.0_f32, 7.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let dlpack_ptr = unsafe { + engine + .encode_basis_batch_from_gpu_ptr_f32( + *input_d.device_ptr() as *const f32, + num_samples, + 1, + num_qubits, + ) + .expect("encode_basis_batch_from_gpu_ptr_f32") + }; + unsafe { common::assert_dlpack_shape_2d_and_delete(dlpack_ptr, num_samples as i64, 8) }; +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_with_stream_success() { + let engine = match engine_f32() { + Some(e) => e, + None => { + println!("SKIP: No GPU"); + return; + } + }; + let (device, input_d) = match common::copy_f32_to_device(&[1.0_f32, 2.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let stream = device.fork_default_stream().expect("fork_default_stream"); + let dlpack_ptr = unsafe { + engine + .encode_basis_batch_from_gpu_ptr_f32_with_stream( + *input_d.device_ptr() as *const f32, + 2, + 1, + 2, + stream.stream as *mut c_void, + ) + .expect("encode_basis_batch_from_gpu_ptr_f32_with_stream") + }; + unsafe { common::assert_dlpack_shape_2d_and_delete(dlpack_ptr, 2, 4) }; +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_null_pointer() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let result = unsafe { engine.encode_basis_batch_from_gpu_ptr_f32(std::ptr::null(), 2, 1, 2) }; + assert!(result.is_err()); +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_zero_samples() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let result = unsafe { engine.encode_basis_batch_from_gpu_ptr_f32(std::ptr::null(), 0, 1, 2) }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("samples"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_sample_size_mismatch() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, 1.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No 
CUDA device"); + return; + } + }; + let result = unsafe { + engine.encode_basis_batch_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 2, 2, 2) + }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("sample_size=1"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_non_finite_rejected() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, f32::NAN]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let result = unsafe { + engine.encode_basis_batch_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 2, 1, 2) + }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("non-finite"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_out_of_range_rejected() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + // state_len = 2^2 = 4; index 10 is out of range + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, 10.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let result = unsafe { + engine.encode_basis_batch_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 2, 1, 2) + }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("out of range"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_non_integer_rejected() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, 1.5_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let result = unsafe { + engine.encode_basis_batch_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 2, 1, 2) + }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("non-integer"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_batch_from_gpu_ptr_f32_negative_rejected() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let (_device, input_d) = match common::copy_f32_to_device(&[0.0_f32, -1.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let result = unsafe { + engine.encode_basis_batch_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 2, 1, 2) + }; + assert!(result.is_err()); + match result.unwrap_err() { + MahoutError::InvalidInput(msg) => assert!(msg.contains("negative"), "msg: {msg}"), + e => panic!("expected InvalidInput, got {:?}", e), + } +} + +#[test] +fn test_encode_basis_from_gpu_ptr_f32_single_sample_success() { + let Some(engine) = engine_f32() else { + println!("SKIP: No GPU"); + return; + }; + let (_device, input_d) = match common::copy_f32_to_device(&[5.0_f32]) { + Some(t) => t, + None => { + println!("SKIP: No CUDA device"); + return; + } + }; + let dlpack_ptr = unsafe { + engine + .encode_basis_from_gpu_ptr_f32(*input_d.device_ptr() as *const f32, 3) + .expect("encode_basis_from_gpu_ptr_f32") + }; + unsafe { 
common::assert_dlpack_shape_2d_and_delete(dlpack_ptr, 1, 8) }; +} diff --git a/qdp/qdp-kernels/src/basis.cu b/qdp/qdp-kernels/src/basis.cu index 63a2d12824..0698922f8a 100644 --- a/qdp/qdp-kernels/src/basis.cu +++ b/qdp/qdp-kernels/src/basis.cu @@ -80,6 +80,62 @@ __global__ void basis_encode_batch_kernel( } } +/// Single sample basis encoding kernel (F32) +/// +/// Sets state[basis_index] = 1.0 + 0.0i, all others = 0.0 + 0.0i +__global__ void basis_encode_kernel_f32( + size_t basis_index, + cuComplex* __restrict__ state, + size_t state_len +) { + size_t idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx >= state_len) return; + + if (idx == basis_index) { + state[idx] = make_cuComplex(1.0f, 0.0f); + } else { + state[idx] = make_cuComplex(0.0f, 0.0f); + } +} + +/// Batch basis encoding kernel (F32) +/// +/// Each sample has its own basis index, resulting in independent basis states. +/// Memory layout: +/// - basis_indices: [idx0, idx1, ..., idxN] +/// - state_batch: [sample0_state | sample1_state | ... | sampleN_state] +__global__ void basis_encode_batch_kernel_f32( + const size_t* __restrict__ basis_indices, + cuComplex* __restrict__ state_batch, + size_t num_samples, + size_t state_len, + unsigned int num_qubits +) { + // Grid-stride loop over all elements across all samples + const size_t total_elements = num_samples * state_len; + const size_t stride = gridDim.x * blockDim.x; + const size_t state_mask = state_len - 1; + + for (size_t global_idx = blockIdx.x * blockDim.x + threadIdx.x; + global_idx < total_elements; + global_idx += stride) { + // Decompose into (sample_idx, element_idx) + // state_len = 2^num_qubits, so division/modulo can use shift/mask + const size_t sample_idx = global_idx >> num_qubits; + const size_t element_idx = global_idx & state_mask; + + // Get basis index for this sample + const size_t basis_index = basis_indices[sample_idx]; + + // Set amplitude: 1.0 at basis_index, 0.0 elsewhere + if (element_idx == basis_index) { + state_batch[global_idx] = make_cuComplex(1.0f, 0.0f); + } else { + state_batch[global_idx] = make_cuComplex(0.0f, 0.0f); + } + } +} + extern "C" { /// Launch basis encoding kernel @@ -163,4 +219,85 @@ int launch_basis_encode_batch( return (int)cudaGetLastError(); } +/// Launch basis encoding kernel (F32) +/// +/// # Arguments +/// * basis_index - The computational basis state index (0 to state_len-1) +/// * state_d - Device pointer to output state vector +/// * state_len - Target state vector size (2^num_qubits) +/// * stream - CUDA stream for async execution (nullptr = default stream) +/// +/// # Returns +/// CUDA error code (0 = cudaSuccess) +int launch_basis_encode_f32( + size_t basis_index, + void* state_d, + size_t state_len, + cudaStream_t stream +) { + if (state_len == 0) { + return cudaErrorInvalidValue; + } + + if (basis_index >= state_len) { + return cudaErrorInvalidValue; + } + + cuComplex* state_complex_d = static_cast(state_d); + + const int blockSize = DEFAULT_BLOCK_SIZE; + const int gridSize = (state_len + blockSize - 1) / blockSize; + + basis_encode_kernel_f32<<>>( + basis_index, + state_complex_d, + state_len + ); + + return (int)cudaGetLastError(); +} + +/// Launch batch basis encoding kernel (F32) +/// +/// # Arguments +/// * basis_indices_d - Device pointer to array of basis indices (one per sample) +/// * state_batch_d - Device pointer to output batch state vectors +/// * num_samples - Number of samples in batch +/// * state_len - State vector size per sample (2^num_qubits) +/// * num_qubits - Number of qubits (for 
bit-shift optimization) +/// * stream - CUDA stream for async execution +/// +/// # Returns +/// CUDA error code (0 = cudaSuccess) +int launch_basis_encode_batch_f32( + const size_t* basis_indices_d, + void* state_batch_d, + size_t num_samples, + size_t state_len, + unsigned int num_qubits, + cudaStream_t stream +) { + if (num_samples == 0 || state_len == 0) { + return cudaErrorInvalidValue; + } + + cuComplex* state_complex_d = static_cast(state_batch_d); + + const int blockSize = DEFAULT_BLOCK_SIZE; + const size_t total_elements = num_samples * state_len; + const size_t blocks_needed = (total_elements + blockSize - 1) / blockSize; + const size_t max_blocks = MAX_GRID_BLOCKS; + const size_t gridSize = (blocks_needed < max_blocks) ? blocks_needed : max_blocks; + + basis_encode_batch_kernel_f32<<>>( + basis_indices_d, + state_complex_d, + num_samples, + state_len, + num_qubits + ); + + return (int)cudaGetLastError(); +} + } // extern "C" diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs index 271039fb12..9a1f832d85 100644 --- a/qdp/qdp-kernels/src/lib.rs +++ b/qdp/qdp-kernels/src/lib.rs @@ -187,12 +187,12 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; - /// Launch basis encoding kernel + /// Launch basis encoding kernel. /// Maps an integer index to a computational basis state. /// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointer, must sync before freeing + /// Requires valid GPU pointer; must sync before freeing. pub fn launch_basis_encode( basis_index: usize, state_d: *mut c_void, @@ -200,11 +200,11 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; - /// Launch batch basis encoding kernel + /// Launch batch basis encoding kernel. /// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointers, must sync before freeing + /// Requires valid GPU pointers; must sync before freeing. pub fn launch_basis_encode_batch( basis_indices_d: *const usize, state_batch_d: *mut c_void, @@ -214,37 +214,51 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; - /// Launch angle encoding kernel + /// Launch basis encoding kernel (float32). + /// Maps an integer index to a computational basis state. /// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointers, must sync before freeing - pub fn launch_angle_encode( - angles_d: *const f64, + /// Requires valid GPU pointer; must sync before freeing. + pub fn launch_basis_encode_f32( + basis_index: usize, state_d: *mut c_void, state_len: usize, + stream: *mut c_void, + ) -> i32; + + /// Launch batch basis encoding kernel (float32). + /// Returns CUDA error code (0 = success) + /// + /// # Safety + /// Requires valid GPU pointers; must sync before freeing. + pub fn launch_basis_encode_batch_f32( + basis_indices_d: *const usize, + state_batch_d: *mut c_void, + num_samples: usize, + state_len: usize, num_qubits: u32, stream: *mut c_void, ) -> i32; - /// Launch angle encoding kernel for float32 inputs. + /// Launch angle encoding kernel. /// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointers, must sync before freeing - pub fn launch_angle_encode_f32( - angles_d: *const f32, + /// Requires valid GPU pointers; must sync before freeing. + pub fn launch_angle_encode( + angles_d: *const f64, state_d: *mut c_void, state_len: usize, num_qubits: u32, stream: *mut c_void, ) -> i32; - /// Launch batch angle encoding kernel + /// Launch batch angle encoding kernel. 
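+    /// Input layout: `angles_batch_d` holds one angle per qubit per sample
+    /// (num_samples * num_qubits f64 values).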
/// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointers, must sync before freeing + /// Requires valid GPU pointers; must sync before freeing. pub fn launch_angle_encode_batch( angles_batch_d: *const f64, state_batch_d: *mut c_void, @@ -254,11 +268,24 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; - /// Launch batch angle encoding kernel for float32 inputs. + /// Launch angle encoding kernel (float32). /// Returns CUDA error code (0 = success) /// /// # Safety - /// Requires valid GPU pointers, must sync before freeing + /// Requires valid GPU pointers; must sync before freeing. + pub fn launch_angle_encode_f32( + angles_d: *const f32, + state_d: *mut c_void, + state_len: usize, + num_qubits: u32, + stream: *mut c_void, + ) -> i32; + + /// Launch batch angle encoding kernel (float32). + /// Returns CUDA error code (0 = success) + /// + /// # Safety + /// Requires valid GPU pointers; must sync before freeing. pub fn launch_angle_encode_batch_f32( angles_batch_d: *const f32, state_batch_d: *mut c_void, @@ -280,6 +307,45 @@ unsafe extern "C" { stream: *mut c_void, ) -> i32; + /// Launch float64 batch finite-value validation. + /// Returns CUDA error code (0 = success) + /// + /// # Safety + /// Requires valid GPU pointers, must sync before reading the output flag. + pub fn launch_check_finite_batch_f64( + input_batch_d: *const f64, + total_values: usize, + has_non_finite_d: *mut i32, + stream: *mut c_void, + ) -> i32; + + /// Launch combined validation + cast for float32 basis indices. + /// `error_flags_d` receives a bitmask (0 = all indices valid); `indices_out_d` + /// receives the truncated `size_t` indices (0 for invalid slots). + /// + /// # Safety + /// Requires valid GPU pointers, must sync before reading the output flag. + pub fn launch_validate_and_cast_basis_indices_f32( + input_batch_d: *const f32, + num_samples: usize, + state_len: usize, + indices_out_d: *mut usize, + error_flags_d: *mut i32, + stream: *mut c_void, + ) -> i32; + + /// Launch bounds-check for an existing `size_t` basis-index buffer. + /// + /// # Safety + /// Requires valid GPU pointers, must sync before reading the output flag. 
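+    /// After synchronizing, a nonzero `error_flags_d` means at least one
+    /// index was >= `state_len`.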
+ pub fn launch_check_basis_indices_usize( + indices_d: *const usize, + num_samples: usize, + state_len: usize, + error_flags_d: *mut i32, + stream: *mut c_void, + ) -> i32; + /// Launch IQP encoding kernel /// Returns CUDA error code (0 = success) /// @@ -487,10 +553,22 @@ pub extern "C" fn launch_basis_encode_batch( #[cfg(any(not(target_os = "linux"), qdp_no_cuda))] #[unsafe(no_mangle)] -pub extern "C" fn launch_angle_encode( - _angles_d: *const f64, +pub extern "C" fn launch_basis_encode_f32( + _basis_index: usize, _state_d: *mut c_void, _state_len: usize, + _stream: *mut c_void, +) -> i32 { + 999 +} + +#[cfg(any(not(target_os = "linux"), qdp_no_cuda))] +#[unsafe(no_mangle)] +pub extern "C" fn launch_basis_encode_batch_f32( + _basis_indices_d: *const usize, + _state_batch_d: *mut c_void, + _num_samples: usize, + _state_len: usize, _num_qubits: u32, _stream: *mut c_void, ) -> i32 { @@ -499,8 +577,8 @@ pub extern "C" fn launch_angle_encode( #[cfg(any(not(target_os = "linux"), qdp_no_cuda))] #[unsafe(no_mangle)] -pub extern "C" fn launch_angle_encode_f32( - _angles_d: *const f32, +pub extern "C" fn launch_angle_encode( + _angles_d: *const f64, _state_d: *mut c_void, _state_len: usize, _num_qubits: u32, @@ -522,6 +600,18 @@ pub extern "C" fn launch_angle_encode_batch( 999 } +#[cfg(any(not(target_os = "linux"), qdp_no_cuda))] +#[unsafe(no_mangle)] +pub extern "C" fn launch_angle_encode_f32( + _angles_d: *const f32, + _state_d: *mut c_void, + _state_len: usize, + _num_qubits: u32, + _stream: *mut c_void, +) -> i32 { + 999 +} + #[cfg(any(not(target_os = "linux"), qdp_no_cuda))] #[unsafe(no_mangle)] pub extern "C" fn launch_angle_encode_batch_f32( @@ -546,6 +636,42 @@ pub extern "C" fn launch_check_finite_batch_f32( 999 } +#[cfg(any(not(target_os = "linux"), qdp_no_cuda))] +#[unsafe(no_mangle)] +pub extern "C" fn launch_check_finite_batch_f64( + _input_batch_d: *const f64, + _total_values: usize, + _has_non_finite_d: *mut i32, + _stream: *mut c_void, +) -> i32 { + 999 +} + +#[cfg(any(not(target_os = "linux"), qdp_no_cuda))] +#[unsafe(no_mangle)] +pub extern "C" fn launch_validate_and_cast_basis_indices_f32( + _input_batch_d: *const f32, + _num_samples: usize, + _state_len: usize, + _indices_out_d: *mut usize, + _error_flags_d: *mut i32, + _stream: *mut c_void, +) -> i32 { + 999 +} + +#[cfg(any(not(target_os = "linux"), qdp_no_cuda))] +#[unsafe(no_mangle)] +pub extern "C" fn launch_check_basis_indices_usize( + _indices_d: *const usize, + _num_samples: usize, + _state_len: usize, + _error_flags_d: *mut i32, + _stream: *mut c_void, +) -> i32 { + 999 +} + #[cfg(any(not(target_os = "linux"), qdp_no_cuda))] #[unsafe(no_mangle)] pub extern "C" fn launch_iqp_encode( diff --git a/qdp/qdp-kernels/src/validation.cu b/qdp/qdp-kernels/src/validation.cu index e38b65665d..7d19af1d00 100644 --- a/qdp/qdp-kernels/src/validation.cu +++ b/qdp/qdp-kernels/src/validation.cu @@ -36,6 +36,87 @@ __global__ void check_finite_batch_kernel_f32( } } +__global__ void check_finite_batch_kernel_f64( + const double* __restrict__ input_batch, + size_t total_values, + int* __restrict__ has_non_finite +) { + const size_t stride = gridDim.x * blockDim.x; + for (size_t idx = blockIdx.x * blockDim.x + threadIdx.x; + idx < total_values; + idx += stride) { + if (!isfinite(input_batch[idx])) { + atomicExch(has_non_finite, 1); + return; + } + } +} + +// Validation error flags for basis indices (bitmask, written via atomicOr). +// 0 means valid. 
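+// The Rust wrappers surface these bits as InvalidInput errors whose messages
+// mention "non-finite", "negative", "non-integer", or "out of range" (see the
+// gpu_ptr_encoding tests above).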
+#define BASIS_IDX_ERR_NON_FINITE 0x1 +#define BASIS_IDX_ERR_NEGATIVE 0x2 +#define BASIS_IDX_ERR_NON_INTEGER 0x4 +#define BASIS_IDX_ERR_OUT_OF_RANGE 0x8 + +// Validate f32 basis indices and cast them to size_t in a single pass. +// `indices_out` receives the truncated indices (set to 0 when the sample is +// invalid to keep the downstream encode kernel bounded). +__global__ void validate_and_cast_basis_indices_kernel_f32( + const float* __restrict__ input_batch, + size_t num_samples, + size_t state_len, + size_t* __restrict__ indices_out, + int* __restrict__ error_flags +) { + const size_t stride = gridDim.x * blockDim.x; + for (size_t idx = blockIdx.x * blockDim.x + threadIdx.x; + idx < num_samples; + idx += stride) { + const float v = input_batch[idx]; + if (!isfinite(v)) { + atomicOr(error_flags, BASIS_IDX_ERR_NON_FINITE); + indices_out[idx] = 0; + continue; + } + if (v < 0.0f) { + atomicOr(error_flags, BASIS_IDX_ERR_NEGATIVE); + indices_out[idx] = 0; + continue; + } + const float truncated = truncf(v); + if (truncated != v) { + atomicOr(error_flags, BASIS_IDX_ERR_NON_INTEGER); + indices_out[idx] = 0; + continue; + } + if ((double)truncated >= (double)state_len) { + atomicOr(error_flags, BASIS_IDX_ERR_OUT_OF_RANGE); + indices_out[idx] = 0; + continue; + } + indices_out[idx] = (size_t)truncated; + } +} + +// Bounds-check existing size_t basis indices against state_len. +__global__ void check_basis_indices_kernel_usize( + const size_t* __restrict__ indices, + size_t num_samples, + size_t state_len, + int* __restrict__ error_flags +) { + const size_t stride = gridDim.x * blockDim.x; + for (size_t idx = blockIdx.x * blockDim.x + threadIdx.x; + idx < num_samples; + idx += stride) { + if (indices[idx] >= state_len) { + atomicOr(error_flags, BASIS_IDX_ERR_OUT_OF_RANGE); + return; + } + } +} + extern "C" { /// Launch batch finite-value validation for float32 input. @@ -79,4 +160,125 @@ int launch_check_finite_batch_f32( return (int)cudaGetLastError(); } +/// Launch batch finite-value validation for float64 input. +/// +/// Writes 1 to `has_non_finite_d` if any NaN/Inf is found, else leaves it at 0. +int launch_check_finite_batch_f64( + const double* input_batch_d, + size_t total_values, + int* has_non_finite_d, + cudaStream_t stream +) { + if (total_values == 0 || has_non_finite_d == nullptr) { + return cudaErrorInvalidValue; + } + + cudaError_t memset_status = cudaMemsetAsync( + has_non_finite_d, + 0, + sizeof(int), + stream + ); + if (memset_status != cudaSuccess) { + return memset_status; + } + + const int blockSize = DEFAULT_BLOCK_SIZE; + size_t gridSize = (total_values + blockSize - 1) / blockSize; + if (gridSize == 0) { + gridSize = 1; + } + if (gridSize > MAX_GRID_BLOCKS) { + gridSize = MAX_GRID_BLOCKS; + } + + check_finite_batch_kernel_f64<<>>( + input_batch_d, + total_values, + has_non_finite_d + ); + + return (int)cudaGetLastError(); +} + +/// Launch combined validation + cast for float32 basis indices. +/// +/// Writes a bitmask into `error_flags_d` (0 = all valid) and casts each valid +/// sample into `indices_out_d` (a `size_t` per sample). Invalid samples have +/// their output slot zeroed so the downstream encode kernel is bounded even +/// if the caller ignores the error flag. 
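+///
+/// Expected call sequence (a sketch, inferred from the zeroing behaviour above):
+/// launch this kernel, launch the batch basis-encode kernel on `indices_out_d`,
+/// then copy `error_flags_d` back and reject the batch if it is nonzero.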
+int launch_validate_and_cast_basis_indices_f32( + const float* input_batch_d, + size_t num_samples, + size_t state_len, + size_t* indices_out_d, + int* error_flags_d, + cudaStream_t stream +) { + if (num_samples == 0 || state_len == 0 + || error_flags_d == nullptr || indices_out_d == nullptr) { + return cudaErrorInvalidValue; + } + + cudaError_t memset_status = cudaMemsetAsync(error_flags_d, 0, sizeof(int), stream); + if (memset_status != cudaSuccess) { + return memset_status; + } + + const int blockSize = DEFAULT_BLOCK_SIZE; + size_t gridSize = (num_samples + blockSize - 1) / blockSize; + if (gridSize == 0) { + gridSize = 1; + } + if (gridSize > MAX_GRID_BLOCKS) { + gridSize = MAX_GRID_BLOCKS; + } + + validate_and_cast_basis_indices_kernel_f32<<>>( + input_batch_d, + num_samples, + state_len, + indices_out_d, + error_flags_d + ); + + return (int)cudaGetLastError(); +} + +/// Launch bounds-check for size_t basis indices. +int launch_check_basis_indices_usize( + const size_t* indices_d, + size_t num_samples, + size_t state_len, + int* error_flags_d, + cudaStream_t stream +) { + if (num_samples == 0 || state_len == 0 || error_flags_d == nullptr) { + return cudaErrorInvalidValue; + } + + cudaError_t memset_status = cudaMemsetAsync(error_flags_d, 0, sizeof(int), stream); + if (memset_status != cudaSuccess) { + return memset_status; + } + + const int blockSize = DEFAULT_BLOCK_SIZE; + size_t gridSize = (num_samples + blockSize - 1) / blockSize; + if (gridSize == 0) { + gridSize = 1; + } + if (gridSize > MAX_GRID_BLOCKS) { + gridSize = MAX_GRID_BLOCKS; + } + + check_basis_indices_kernel_usize<<>>( + indices_d, + num_samples, + state_len, + error_flags_d + ); + + return (int)cudaGetLastError(); +} + } // extern "C" diff --git a/qdp/qdp-python/benchmark/benchmark_latency.py b/qdp/qdp-python/benchmark/benchmark_latency.py index 5b53c56c86..2ba98d1a37 100644 --- a/qdp/qdp-python/benchmark/benchmark_latency.py +++ b/qdp/qdp-python/benchmark/benchmark_latency.py @@ -91,6 +91,7 @@ def run_mahout( batch_size: int, prefetch: int, encoding_method: str = "amplitude", + warmup: int = 5, ): """Run Mahout latency using the generic user API (QdpBenchmark).""" try: @@ -100,6 +101,7 @@ def run_mahout( .encoding(encoding_method) .batches(total_batches, size=batch_size) .prefetch(prefetch) + .warmup(warmup) .run_latency() ) except Exception as exc: @@ -239,6 +241,12 @@ def main() -> None: choices=["amplitude", "angle", "basis"], help="Encoding method to use for Mahout (amplitude, angle, or basis).", ) + parser.add_argument( + "--warmup", + type=int, + default=5, + help="Number of warm-up batches (excluded from timing).", + ) args = parser.parse_args() if not torch.cuda.is_available(): @@ -259,6 +267,7 @@ def main() -> None: print(f" Prefetch : {args.prefetch}") print(f" Frameworks : {', '.join(frameworks)}") print(f" Encode method: {args.encoding_method}") + print(f" Warmup : {args.warmup}") bytes_per_vec = vector_len * 8 print(f" Generated {total_vectors} samples") print( @@ -308,6 +317,7 @@ def main() -> None: args.batch_size, args.prefetch, args.encoding_method, + warmup=args.warmup, ) print() diff --git a/qdp/qdp-python/benchmark/benchmark_throughput.py b/qdp/qdp-python/benchmark/benchmark_throughput.py index 8abbf7c79b..3fb6920a29 100644 --- a/qdp/qdp-python/benchmark/benchmark_throughput.py +++ b/qdp/qdp-python/benchmark/benchmark_throughput.py @@ -82,6 +82,7 @@ def run_mahout( batch_size: int, prefetch: int, encoding_method: str = "amplitude", + warmup: int = 5, ): """Run Mahout throughput using the generic 
user API (QdpBenchmark).""" try: @@ -91,6 +92,7 @@ def run_mahout( .encoding(encoding_method) .batches(total_batches, size=batch_size) .prefetch(prefetch) + .warmup(warmup) .run_throughput() ) except Exception as exc: @@ -212,6 +214,12 @@ def main() -> None: choices=["amplitude", "angle", "basis"], help="Encoding method to use for Mahout (amplitude, angle, or basis).", ) + parser.add_argument( + "--warmup", + type=int, + default=5, + help="Number of warm-up batches (excluded from timing).", + ) args = parser.parse_args() try: @@ -229,6 +237,7 @@ def main() -> None: print(f" Prefetch : {args.prefetch}") print(f" Frameworks : {', '.join(frameworks)}") print(f" Encode method: {args.encoding_method}") + print(f" Warmup : {args.warmup}") bytes_per_vec = vector_len * 8 print(f" Generated {total_vectors} samples") print( @@ -268,6 +277,7 @@ def main() -> None: args.batch_size, args.prefetch, args.encoding_method, + warmup=args.warmup, ) print() diff --git a/qdp/qdp-python/qumat_qdp/loader.py b/qdp/qdp-python/qumat_qdp/loader.py index 9a180baf00..fb02ffab16 100644 --- a/qdp/qdp-python/qumat_qdp/loader.py +++ b/qdp/qdp-python/qumat_qdp/loader.py @@ -203,12 +203,16 @@ def source_file(self, path: str, streaming: bool = False) -> QuantumDataLoader: raise ValueError( "Remote URL query/fragment is not supported; use plain scheme://bucket/key path." ) - # For remote URLs, extract the key portion for extension checks. - check_path = path.split("?")[0].rsplit("/", 1)[-1] if "://" in path else path - if streaming and not (check_path.lower().endswith(".parquet")): - raise ValueError( - "streaming=True supports only .parquet files; use streaming=False for other formats." - ) + import os + + # Reject streaming=True for non-.parquet at builder time (not iteration time) so + # the error is immediate and the message is actionable before any data is read. + if streaming: + ext = os.path.splitext(path.split("?")[0].rsplit("/", 1)[-1])[1].lower() + if ext != ".parquet": + raise ValueError( + "streaming=True supports only .parquet files; use streaming=False for other formats." + ) self._file_path = path self._file_requested = True self._streaming_requested = streaming @@ -238,13 +242,18 @@ def null_handling(self, policy: str) -> QuantumDataLoader: return self def backend(self, name: str) -> QuantumDataLoader: - """Set encoding backend: ``'rust'`` or ``'pytorch'``. + """Set encoding backend: ``'rust'``, ``'pytorch'``, or ``'auto'``. - The PyTorch reference backend is intended for testing and must be - explicitly selected. Returns self for chaining. + ``'auto'``: tries the Rust backend first and falls back to the PyTorch + reference backend if the Rust extension is unavailable, emitting a + ``RuntimeWarning`` when the fallback occurs. ``'rust'`` raises if the + extension is missing. ``'pytorch'`` always uses the pure-PyTorch path. + Returns self for chaining. """ - if name not in ("rust", "pytorch"): - raise ValueError(f"backend must be 'rust' or 'pytorch', got {name!r}") + if name not in ("rust", "pytorch", "auto"): + raise ValueError( + f"backend must be 'rust', 'pytorch', or 'auto', got {name!r}" + ) self._backend_name = name return self @@ -259,6 +268,39 @@ def _create_iterator(self) -> Iterator[object]: "source_file() was not called with a path; set file source with .source_file(path)." 
) use_synthetic = not self._file_requested + if ( + not use_synthetic + and self._file_path is not None + and self._backend_name != "pytorch" + ): + import os + + _SUPPORTED_EXTS = { + ".parquet", + ".arrow", + ".feather", + ".ipc", + ".npy", + ".pt", + ".pth", + ".pb", + } + is_remote = "://" in self._file_path + check_path = ( + self._file_path.split("?")[0].rsplit("/", 1)[-1] + if is_remote + else self._file_path + ) + ext = os.path.splitext(check_path)[1].lower() + if ext not in _SUPPORTED_EXTS: + raise ValueError( + f"Unsupported file extension {ext!r}. " + f"Supported: {', '.join(sorted(_SUPPORTED_EXTS))}" + ) + if not is_remote and not os.path.exists(self._file_path): + raise FileNotFoundError( + f"File not found: {self._file_path!r}. Check the path and try again." + ) if use_synthetic: _validate_loader_args( device_id=self._device_id, @@ -270,14 +312,39 @@ def _create_iterator(self) -> Iterator[object]: ) if self._backend_name == "pytorch": return self._create_pytorch_iterator(use_synthetic) - # Rust backend (default). + # Rust backend (default) or auto-fallback. qdp = _get_qdp() QdpEngine = getattr(qdp, "QdpEngine", None) if qdp else None if QdpEngine is None: + if self._backend_name == "auto": + import sys + import warnings + + platform_hint = ( + f" (running on {sys.platform}; Rust GPU extension requires Linux with CUDA)" + if sys.platform != "linux" + else "" + ) + warnings.warn( + f"Rust extension (_qdp) is not available{platform_hint}; " + "falling back to PyTorch reference backend. " + "Build with: maturin develop to enable GPU acceleration.", + RuntimeWarning, + stacklevel=3, + ) + return self._create_pytorch_iterator(use_synthetic) + import sys + + platform_hint = "" + if sys.platform != "linux": + platform_hint = ( + f" Note: the Rust GPU extension requires Linux with CUDA; " + f"you are on {sys.platform}." + ) raise RuntimeError( "Rust extension (_qdp) is not available. " "Build with: maturin develop, or explicitly select the PyTorch " - "reference backend with .backend('pytorch')." + f"reference backend with .backend('pytorch').{platform_hint}" ) return self._create_rust_iterator(QdpEngine, use_synthetic) @@ -395,9 +462,17 @@ def _pytorch_file_iter(self, torch, encode_fn, device: str) -> Iterator[object]: ext = os.path.splitext(path)[1].lower() if self._streaming_requested: + import sys + + platform_hint = "" + if sys.platform != "linux": + platform_hint = ( + f" Note: streaming requires Linux with CUDA; " + f"you are on {sys.platform}." + ) raise RuntimeError( "Streaming file loading requires the _qdp Rust extension. " - "Build with: maturin develop" + f"Build with: maturin develop{platform_hint}" ) if ext not in _FALLBACK_FILE_EXTS: @@ -437,6 +512,54 @@ def _pytorch_file_iter(self, torch, encode_fn, device: str) -> Iterator[object]: batch = all_data[start:end] yield encode_fn(batch, num_qubits, encoding_method, device=device) + def as_torch_dataset(self): + """Wrap this loader as a ``torch.utils.data.IterableDataset``. + + Returns a dataset that yields one encoded batch (``torch.Tensor``) per + iteration step, compatible with ``torch.utils.data.DataLoader``. + + Example:: + + from qumat_qdp import QuantumDataLoader + import torch + + dataset = (QuantumDataLoader() + .qubits(16).encoding("amplitude") + .batches(100, size=64) + .source_synthetic() + .as_torch_dataset()) + loader = torch.utils.data.DataLoader(dataset, batch_size=None, num_workers=0) + for batch in loader: + ... 
# batch is torch.Tensor, shape (64, 2*2^16) + + Note: ``batch_size=None`` in DataLoader disables DataLoader's own batching; + ``num_workers=0`` is required because the Rust backend holds GPU state that + cannot be pickled for multi-process workers. + """ + try: + import torch # noqa: F401 — ensure torch is available before building the class + from torch.utils.data import IterableDataset as _IterableDataset + except ImportError: + raise RuntimeError( + "as_torch_dataset() requires PyTorch. Install with: pip install torch" + ) from None + + loader_ref = self + + class _QdpDataset(_IterableDataset): # type: ignore[misc] + """IterableDataset wrapping a QuantumDataLoader.""" + + def __iter__(self_) -> Iterator[object]: + import torch as _torch + + for batch in loader_ref: + # DLPack capsule from the Rust backend → tensor + if not isinstance(batch, _torch.Tensor): + batch = _torch.from_dlpack(batch) + yield batch + + return _QdpDataset() + def __iter__(self) -> Iterator[object]: """Return iterator that yields one encoded batch per step. diff --git a/qdp/qdp-python/src/dlpack.rs b/qdp/qdp-python/src/dlpack.rs index 4a1ad08ec5..9746292ee5 100644 --- a/qdp/qdp-python/src/dlpack.rs +++ b/qdp/qdp-python/src/dlpack.rs @@ -24,6 +24,10 @@ use std::ffi::c_void; /// /// This struct owns the DLManagedTensor pointer and ensures proper cleanup /// via the DLPack deleter when dropped (RAII pattern). +/// +/// Currently unused: `_encode_from_cuda_tensor` uses `extract_cuda_tensor_info` +/// from `pytorch.rs`. Preserved for a planned DLPack-first dispatcher refactor. +#[allow(dead_code)] pub struct DLPackTensorInfo { /// Raw DLManagedTensor pointer from PyTorch DLPack capsule /// This is owned by this struct and will be freed via deleter on drop @@ -63,6 +67,7 @@ impl Drop for DLPackTensorInfo { /// for the entire duration that `data_ptr` is in use. Python's GIL ensures /// the tensor won't be garbage collected during `encode()`, but the caller /// must not deallocate or resize the tensor while encoding is in progress. +#[allow(dead_code)] pub fn extract_dlpack_tensor( _py: Python<'_>, tensor: &Bound<'_, PyAny>, diff --git a/qdp/qdp-python/src/engine.rs b/qdp/qdp-python/src/engine.rs index 9d6a2a6ae6..7888d5485d 100644 --- a/qdp/qdp-python/src/engine.rs +++ b/qdp/qdp-python/src/engine.rs @@ -14,10 +14,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::dlpack::extract_dlpack_tensor; use crate::pytorch::{ - extract_cuda_tensor_info, get_tensor_device_id, get_torch_cuda_stream_ptr, is_cuda_tensor, - is_pytorch_tensor, validate_cuda_tensor_for_encoding, validate_shape, validate_tensor, + extract_cuda_tensor_info, get_torch_cuda_stream_ptr, is_cuda_tensor, is_pytorch_tensor, + validate_cuda_tensor_for_encoding, validate_shape, validate_tensor, }; use crate::tensor::QuantumTensor; use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods}; @@ -205,81 +204,11 @@ impl QdpEngine { num_qubits: usize, encoding_method: &str, ) -> PyResult { - // Check if it's a CUDA tensor - use zero-copy GPU encoding via DLPack + // CUDA tensors delegate to the central CUDA dispatcher so f32 angle/basis + // route to their dedicated zero-copy paths instead of the f64 c_void + // fallback, which would reinterpret the bytes. if is_cuda_tensor(data)? 
{ - // Validate CUDA tensor for direct GPU encoding - validate_cuda_tensor_for_encoding( - data, - self.engine.device().ordinal(), - encoding_method, - )?; - - // Extract GPU pointer via DLPack (RAII wrapper ensures deleter is called) - let dlpack_info = extract_dlpack_tensor(data.py(), data)?; - - // ensure PyTorch API and DLPack metadata agree on device ID - let pytorch_device_id = get_tensor_device_id(data)?; - if dlpack_info.device_id != pytorch_device_id { - return Err(PyRuntimeError::new_err(format!( - "Device ID mismatch: PyTorch reports device {}, but DLPack metadata reports {}. \ - This indicates an inconsistency between PyTorch and DLPack device information.", - pytorch_device_id, dlpack_info.device_id - ))); - } - - let ndim: usize = data.call_method0("dim")?.extract()?; - validate_shape(ndim, "CUDA tensor")?; - - match ndim { - 1 => { - // 1D CUDA tensor: single sample encoding - let input_len = dlpack_info.shape[0] as usize; - // SAFETY: dlpack_info.data_ptr was validated via DLPack protocol from a - // valid PyTorch CUDA tensor. The tensor remains alive during this call - // (held by Python's GIL), and we validated dtype/contiguity/device above. - // The DLPackTensorInfo RAII wrapper will call deleter when dropped. - let ptr = unsafe { - self.engine - .encode_from_gpu_ptr( - dlpack_info.data_ptr, - input_len, - num_qubits, - encoding_method, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!("Encoding failed: {}", e)) - })? - }; - return Ok(QuantumTensor { - ptr, - consumed: false, - }); - } - 2 => { - // 2D CUDA tensor: batch encoding - let num_samples = dlpack_info.shape[0] as usize; - let sample_size = dlpack_info.shape[1] as usize; - // SAFETY: Same as above - pointer from validated DLPack tensor - let ptr = unsafe { - self.engine - .encode_batch_from_gpu_ptr( - dlpack_info.data_ptr, - num_samples, - sample_size, - num_qubits, - encoding_method, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!("Encoding failed: {}", e)) - })? 
- }; - return Ok(QuantumTensor { - ptr, - consumed: false, - }); - } - _ => unreachable!("validate_shape() should have caught invalid ndim"), - } + return self._encode_from_cuda_tensor(data, num_qubits, encoding_method); } // CPU tensor path @@ -499,82 +428,87 @@ impl QdpEngine { let ndim: usize = data.call_method0("dim")?.extract()?; let tensor_info = extract_cuda_tensor_info(data)?; - if is_f32 && matches!(method.as_str(), "amplitude" | "angle") { - match ndim { - 1 => { - let input_len: usize = data.call_method0("numel")?.extract()?; - let stream_ptr = get_torch_cuda_stream_ptr(data)?; - let data_ptr_u64: u64 = data.call_method0("data_ptr")?.extract()?; - let data_ptr = data_ptr_u64 as *const f32; - - let ptr = unsafe { - match method.as_str() { - "amplitude" => self - .engine - .encode_from_gpu_ptr_f32_with_stream( - data_ptr, input_len, num_qubits, stream_ptr, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!( - "Encoding failed (float32 amplitude): {}", - e - )) - })?, - "angle" => self - .engine - .encode_angle_from_gpu_ptr_f32_with_stream( - data_ptr, input_len, num_qubits, stream_ptr, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!( - "Encoding failed (float32 angle): {}", - e - )) - })?, - _ => unreachable!("unreachable: unhandled f32 encoding method"), - } - }; + if is_f32 && matches!(method.as_str(), "amplitude" | "angle" | "basis") { + let stream_ptr = get_torch_cuda_stream_ptr(data)?; + let data_ptr_u64: u64 = data.call_method0("data_ptr")?.extract()?; + let data_ptr = data_ptr_u64 as *const f32; - Ok(QuantumTensor { - ptr, - consumed: false, - }) + let ptr = match (method.as_str(), ndim) { + ("amplitude", 1) => { + let input_len: usize = data.call_method0("numel")?.extract()?; + unsafe { + self.engine.encode_from_gpu_ptr_f32_with_stream( + data_ptr, input_len, num_qubits, stream_ptr, + ) + } } - 2 => { + ("amplitude", 2) => { let num_samples = tensor_info.shape[0] as usize; let sample_size = tensor_info.shape[1] as usize; - let stream_ptr = get_torch_cuda_stream_ptr(data)?; - let data_ptr_u64: u64 = data.call_method0("data_ptr")?.extract()?; - let data_ptr = data_ptr_u64 as *const f32; - - let ptr = unsafe { - self.engine - .encode_batch_from_gpu_ptr_f32_with_stream( - data_ptr, - num_samples, - sample_size, - num_qubits, - stream_ptr, - ) - .map_err(|e| { - PyRuntimeError::new_err(format!( - "Encoding failed (float32 amplitude batch): {}", - e - )) - })? 
diff --git a/qdp/qdp-python/src/pytorch.rs b/qdp/qdp-python/src/pytorch.rs
index 9b6645105c..2089130544 100644
--- a/qdp/qdp-python/src/pytorch.rs
+++ b/qdp/qdp-python/src/pytorch.rs
@@ -151,7 +151,6 @@ pub fn validate_cuda_tensor_for_encoding(
     encoding_method: &str,
 ) -> PyResult<()> {
     let method = encoding_method.to_ascii_lowercase();
-    let ndim: usize = tensor.call_method0("dim")?.extract()?;

     if !CUDA_ENCODING_METHODS.contains(&method.as_str()) {
         return Err(PyRuntimeError::new_err(format!(
@@ -176,15 +175,17 @@ pub fn validate_cuda_tensor_for_encoding(
                 )));
             }
         }
-        "angle" | "iqp" | "iqp-z" => {
-            if method == "angle" && dtype_str_lower.contains("float32") {
-                if ndim != 1 {
-                    return Err(PyRuntimeError::new_err(
-                        "CUDA tensor float32 angle encoding currently supports only 1D single-sample tensors. \
-                         Use tensor.to(torch.float64) for batch angle encoding.",
-                    ));
-                }
-            } else if !dtype_str_lower.contains("float64") {
+        "angle" => {
+            if !(dtype_str_lower.contains("float64") || dtype_str_lower.contains("float32")) {
+                return Err(PyRuntimeError::new_err(format!(
+                    "CUDA tensor must have dtype float64 or float32 for angle encoding, got {}. \
+                     Use tensor.to(torch.float64) or tensor.to(torch.float32)",
+                    dtype_str
+                )));
+            }
+        }
+        "iqp" | "iqp-z" => {
+            if !dtype_str_lower.contains("float64") {
                 return Err(PyRuntimeError::new_err(format!(
                     "CUDA tensor must have dtype float64 for {} encoding, got {}. \
                      Use tensor.to(torch.float64)",
@@ -193,10 +194,10 @@
             }
         }
         "basis" => {
-            if !dtype_str_lower.contains("int64") {
+            if !(dtype_str_lower.contains("int64") || dtype_str_lower.contains("float32")) {
                 return Err(PyRuntimeError::new_err(format!(
-                    "CUDA tensor must have dtype int64 for basis encoding, got {}. \
-                     Use tensor.to(torch.int64)",
+                    "CUDA tensor must have dtype int64 or float32 for basis encoding, got {}. \
+                     Use tensor.to(torch.int64) or tensor.to(torch.float32)",
                     dtype_str
                 )));
             }
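A sketch of the dtype matrix after this validator change, with error substrings taken from the messages above (`QdpEngine(0)` is again an assumed constructor): `angle` accepts float64 or float32, `iqp`/`iqp-z` stay float64-only, and `basis` accepts int64 or float32. Note the validator no longer inspects `ndim`; shape errors are reported by the encode path instead.

```python
import pytest
import torch
from _qdp import QdpEngine

engine = QdpEngine(0)  # assumed constructor, as in the sketch above

half = torch.rand(2, dtype=torch.float16, device="cuda:0")
with pytest.raises(RuntimeError, match="float64 or float32"):
    engine.encode(half, 2, "angle")  # float16 is still rejected for angle

wrong_basis = torch.tensor([1.0], dtype=torch.float64, device="cuda:0")
with pytest.raises(RuntimeError, match="int64 or float32"):
    engine.encode(wrong_basis, 2, "basis")  # basis allows only int64 or float32
```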
diff --git a/testing/qdp/test_bindings.py b/testing/qdp/test_bindings.py
index 7664cb61e0..0d2b40cf19 100644
--- a/testing/qdp/test_bindings.py
+++ b/testing/qdp/test_bindings.py
@@ -694,8 +694,8 @@ def test_angle_encode_cuda_tensor_float32_input_output_dtype(precision, expected
 @requires_qdp
 @pytest.mark.gpu
-def test_angle_encode_cuda_tensor_float32_batch_rejected():
-    """Test that float32 CUDA angle encoding stays limited to 1D single-sample tensors."""
+def test_angle_encode_cuda_tensor_float32_batch_supported():
+    """Float32 CUDA angle 2D batch is supported via the f32 zero-copy batch path."""
     pytest.importorskip("torch")
     from _qdp import QdpEngine
@@ -709,8 +709,11 @@ def test_angle_encode_cuda_tensor_float32_batch_rejected():
         device="cuda:0",
     )
-    with pytest.raises(RuntimeError, match="supports only 1D single-sample tensors"):
-        engine.encode(data, 2, "angle")
+    result = engine.encode(data, 2, "angle")
+    assert result is not None
+    qt = torch.from_dlpack(result)
+    assert qt.is_cuda
+    assert qt.shape == (2, 4)

 @requires_qdp

diff --git a/testing/qdp_python/test_dlpack_validation.py b/testing/qdp_python/test_dlpack_validation.py
index 8b90c41911..0785724ec2 100644
--- a/testing/qdp_python/test_dlpack_validation.py
+++ b/testing/qdp_python/test_dlpack_validation.py
@@ -138,8 +138,9 @@ def test_cuda_float32_angle_supported_single_sample() -> None:
 @pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
-def test_cuda_float32_angle_2d_rejected() -> None:
-    """Float32 CUDA angle encoding should remain single-sample only."""
+def test_cuda_float32_angle_2d_batch_supported() -> None:
+    """Float32 CUDA angle batch encoding (2D) is supported via the
+    f32 zero-copy batch path."""
     engine = _engine()
     t = torch.tensor(
         [[0.0, 0.0], [torch.pi / 2, 0.0]],
@@ -147,8 +148,13 @@ def test_cuda_float32_angle_2d_rejected() -> None:
         device="cuda",
     )
-    with pytest.raises(RuntimeError, match="1D single-sample"):
-        engine.encode(t, num_qubits=2, encoding_method="angle")
+    result = engine.encode(t, num_qubits=2, encoding_method="angle")
+    assert result is not None
+
+    qt = torch.from_dlpack(result)
+    assert qt.is_cuda
+    assert qt.shape == (2, 4)
+    assert qt.dtype == torch.complex64

 @pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")

diff --git a/testing/qdp_python/test_fallback.py b/testing/qdp_python/test_fallback.py
index e6aac2d614..a8400846ca 100644
--- a/testing/qdp_python/test_fallback.py
+++ b/testing/qdp_python/test_fallback.py
@@ -247,8 +247,8 @@ def test_streaming_raises(self):
     def test_invalid_backend_raises(self):
         from qumat_qdp.loader import QuantumDataLoader

-        with pytest.raises(ValueError, match="'rust' or 'pytorch'"):
-            QuantumDataLoader(device_id=0).backend("auto")
+        with pytest.raises(ValueError, match="'rust', 'pytorch', or 'auto'"):
+            QuantumDataLoader(device_id=0).backend("invalid")

 # ----------------------------------------------------------------------------
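Finally, a quick interactive check mirroring the updated fallback test: `backend()` now lists `"auto"` among the valid names, so only genuinely unknown backends raise. The fluent call shape and the error text are taken directly from the test above.

```python
import pytest
from qumat_qdp.loader import QuantumDataLoader

loader = QuantumDataLoader(device_id=0)
loader.backend("auto")  # previously rejected, now a valid selection

with pytest.raises(ValueError, match="'rust', 'pytorch', or 'auto'"):
    loader.backend("invalid")  # unknown names still raise ValueError
```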