diff --git a/qdp/qdp-core/examples/distributed_multigpu_q34_probe.rs b/qdp/qdp-core/examples/distributed_multigpu_q34_probe.rs new file mode 100644 index 0000000000..9eaa226a1a --- /dev/null +++ b/qdp/qdp-core/examples/distributed_multigpu_q34_probe.rs @@ -0,0 +1,135 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::Instant; + +use qdp_core::gpu::LocalCollectiveCommunicator; +use qdp_core::{ + DistributedExecutionContext, DistributionMode, MahoutError, PlacementRequest, Precision, + QdpEngine, ShardPolicy, +}; + +fn gib(bytes: usize) -> f64 { + bytes as f64 / (1024.0 * 1024.0 * 1024.0) +} + +fn parse_device_ids() -> Result<Vec<usize>, MahoutError> { + let raw = std::env::var("GPU_IDS").unwrap_or_else(|_| "0,1,2,3,4,5".to_string()); + let mut ids = Vec::new(); + for piece in raw.split(',') { + let trimmed = piece.trim(); + if trimmed.is_empty() { + continue; + } + ids.push(trimmed.parse::<usize>().map_err(|err| { + MahoutError::InvalidInput(format!("Invalid GPU ID '{trimmed}': {err}")) + })?); + } + + if ids.is_empty() { + return Err(MahoutError::InvalidInput( + "GPU_IDS must contain at least one CUDA device ID".to_string(), + )); + } + + Ok(ids) +} + +fn main() -> Result<(), MahoutError> { + let num_qubits = std::env::var("QUBITS") + .ok() + .and_then(|value| value.parse::<usize>().ok()) + .unwrap_or(34); + let host_len = std::env::var("HOST_LEN") + .ok() + .and_then(|value| value.parse::<usize>().ok()) + .unwrap_or(1); + let precision = match std::env::var("PRECISION").ok().as_deref() { + Some("f64") | Some("float64") => Precision::Float64, + _ => Precision::Float32, + }; + let shard_policy = match std::env::var("SHARD_POLICY").ok().as_deref() { + Some("equal") => ShardPolicy::Equal, + _ => ShardPolicy::BalancedUneven, + }; + let device_ids = parse_device_ids()?; + let request = + PlacementRequest::new(num_qubits, DistributionMode::ShardedCapacity, shard_policy); + let host_data = vec![1.0f64; host_len]; + + println!( + "Starting distributed amplitude probe: qubits={}, host_len={}, gpus={:?}, precision={:?}, shard_policy={:?}, collectives=in-process", + num_qubits, host_len, device_ids, precision, shard_policy + ); + + let collectives = LocalCollectiveCommunicator; + let execution = DistributedExecutionContext::single_process(device_ids.clone(), &collectives)?; + + let prepare_start =
Instant::now(); + let prepared = QdpEngine::prepare_distributed_amplitude_on( + &execution, + &host_data, + num_qubits, + precision, + Some(request.clone()), + )?; + let prepare_elapsed = prepare_start.elapsed(); + + println!( + "Prepared in {:.3}s; global_len={}; shards={}; max_local_len={}; estimated_max_shard_gib={:.2}; gather_device={:?}", + prepare_elapsed.as_secs_f64(), + prepared.plan.global_len, + prepared.layout.num_shards(), + prepared.plan.max_local_len(), + gib(prepared.plan.estimated_max_shard_bytes(precision)?), + prepared.layout.recommended_gather_device_id() + ); + + for shard in &prepared.layout.shards { + let shard_bytes = match precision { + Precision::Float32 => shard.local_len * 8, + Precision::Float64 => shard.local_len * 16, + }; + println!( + " shard {} -> cuda:{} range=[{}, {}) local_len={} (~{:.2} GiB)", + shard.shard_id, + shard.device_id, + shard.start_idx, + shard.end_idx, + shard.local_len, + gib(shard_bytes) + ); + } + + let encode_start = Instant::now(); + let state = QdpEngine::encode_distributed_amplitude_to_shards_on( + &execution, + &host_data, + num_qubits, + precision, + Some(request), + )?; + let encode_elapsed = encode_start.elapsed(); + + println!( + "Encoded in {:.3}s; state_shards={}; placement={:?}", + encode_elapsed.as_secs_f64(), + state.num_shards(), + state.recommended_placement_device_ids() + ); + + Ok(()) +} diff --git a/qdp/qdp-core/src/gpu/communicator.rs b/qdp/qdp-core/src/gpu/communicator.rs new file mode 100644 index 0000000000..591cc81d32 --- /dev/null +++ b/qdp/qdp-core/src/gpu/communicator.rs @@ -0,0 +1,43 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{MahoutError, Result}; + +/// Abstracts cross-shard collective operations. +/// +/// The current implementation executes collectives inside one process. A future +/// MPI-backed implementation can provide the same interface while mapping the +/// partial contributions to rank-local shards and performing a real all-reduce. +pub trait CollectiveCommunicator: Send + Sync { + fn all_reduce_sum_f64(&self, values: &[f64]) -> Result<f64>; +} + +/// In-process collective implementation for the current single-process +/// distributed path. +#[derive(Default, Debug, Clone, Copy)] +pub struct LocalCollectiveCommunicator; + +impl CollectiveCommunicator for LocalCollectiveCommunicator { + fn all_reduce_sum_f64(&self, values: &[f64]) -> Result<f64> { + if values.is_empty() { + return Err(MahoutError::InvalidInput( + "Collective reduction requires at least one partial contribution".to_string(), + )); + } + + Ok(values.iter().copied().sum()) + } +} diff --git a/qdp/qdp-core/src/gpu/cuda_ffi.rs b/qdp/qdp-core/src/gpu/cuda_ffi.rs index 2ed60c311e..efafe7c2b0 100644 --- a/qdp/qdp-core/src/gpu/cuda_ffi.rs +++ b/qdp/qdp-core/src/gpu/cuda_ffi.rs @@ -54,7 +54,14 @@ unsafe extern "C" { ptr: *const c_void, ) -> i32; + pub(crate) fn cudaGetDevice(device: *mut i32) -> i32; + pub(crate) fn cudaSetDevice(device: i32) -> i32; pub(crate) fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32; + pub(crate) fn cudaDeviceCanAccessPeer( + can_access_peer: *mut i32, + device: i32, + peer_device: i32, + ) -> i32; pub(crate) fn cudaMemcpyAsync( dst: *mut c_void, @@ -63,7
+70,6 @@ unsafe extern "C" { kind: u32, stream: *mut c_void, ) -> i32; - pub(crate) fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32; pub(crate) fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32; pub(crate) fn cudaEventDestroy(event: *mut c_void) -> i32; diff --git a/qdp/qdp-core/src/gpu/distributed/amplitude.rs b/qdp/qdp-core/src/gpu/distributed/amplitude.rs new file mode 100644 index 0000000000..fdc41635a4 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/amplitude.rs @@ -0,0 +1,163 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{MahoutError, Result}; +use crate::gpu::distributed::{ + DistributionMode, PlacementPlan, PlacementPlanner, PlacementRequest, ShardPlacement, + ShardPolicy, +}; +use crate::gpu::memory::Precision; +use crate::gpu::topology::DeviceMesh; + +/// Shared planning math for amplitude-sharded state construction. 
+#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DistributedAmplitudePlan { + pub request: PlacementRequest, + pub placement: PlacementPlan, + pub num_qubits: usize, + pub global_len: usize, + pub num_devices: usize, + pub shard_bits: Option<usize>, + pub uniform_shard_len: Option<usize>, +} + +/// Result of preparing a distributed amplitude encode without yet allocating +/// concrete shard buffers. This fixes the public API surface for later PRs that +/// will populate `state` with real device allocations. +#[derive(Clone)] +pub struct PreparedDistributedAmplitudeEncode { + pub mesh: DeviceMesh, + pub plan: DistributedAmplitudePlan, + pub inv_norm: f64, + pub layout: super::layout::DistributedStateLayout, +} + +impl DistributedAmplitudePlan { + pub fn for_request(mesh: &DeviceMesh, request: PlacementRequest) -> Result<Self> { + if request.num_qubits == 0 { + return Err(MahoutError::InvalidInput( + "Number of qubits must be at least 1 for distributed amplitude planning" + .to_string(), + )); + } + if mesh.num_devices() == 0 { + return Err(MahoutError::InvalidInput( + "Distributed amplitude planning requires at least one device".to_string(), + )); + } + if request.mode != DistributionMode::ShardedCapacity { + return Err(MahoutError::InvalidInput(format!( + "Distributed amplitude planning currently supports only {:?}, got {:?}", + DistributionMode::ShardedCapacity, + request.mode + ))); + } + + let num_devices = mesh.num_devices(); + let placement = PlacementPlanner::plan(mesh, &request)?; + Self::validate_local_shard_shape(request.num_qubits, &placement)?; + let global_len = placement.global_len; + let num_qubits = request.num_qubits; + let (shard_bits, uniform_shard_len) = match request.shard_policy { + ShardPolicy::Equal => { + debug_assert!(num_devices.is_power_of_two()); + let shard_bits = num_devices.trailing_zeros() as usize; + if shard_bits > request.num_qubits { + return Err(MahoutError::InvalidInput(format!( + "Cannot shard {} qubits across {} devices: shard bits {} exceed
qubit count", + request.num_qubits, num_devices, shard_bits + ))); + } + (Some(shard_bits), Some(placement.shard_len()?)) + } + ShardPolicy::BalancedUneven => (None, None), + }; + + Ok(Self { + request, + placement, + num_qubits, + global_len, + num_devices, + shard_bits, + uniform_shard_len, + }) + } + + pub fn shard_range(&self, shard_id: usize) -> Result<(usize, usize)> { + let placement = self.placement.placements.get(shard_id).ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Shard ID {} out of range for {} devices", + shard_id, self.num_devices + )) + })?; + Ok((placement.start_idx, placement.end_idx)) + } + + pub fn max_local_len(&self) -> usize { + self.placement + .placements + .iter() + .map(ShardPlacement::local_len) + .max() + .unwrap_or(0) + } + + pub fn estimated_max_shard_bytes(&self, precision: Precision) -> Result<usize> { + estimated_amplitude_bytes(self.max_local_len(), precision) + } + + fn validate_local_shard_shape(num_qubits: usize, placement: &PlacementPlan) -> Result<()> { + let required_local_len = placement + .placements + .iter() + .map(ShardPlacement::local_len) + .max() + .ok_or_else(|| { + MahoutError::InvalidInput( + "Placement plan must contain at least one shard".to_string(), + ) + })?; + + if required_local_len == 0 { + return Err(MahoutError::InvalidInput(format!( + "Distributed amplitude request for {} qubits produced an empty local shard", + num_qubits + ))); + } + + let _ = estimated_amplitude_bytes(required_local_len, Precision::Float32)?; + let _ = estimated_amplitude_bytes(required_local_len, Precision::Float64)?; + + Ok(()) + } +} + +fn estimated_amplitude_bytes(local_len: usize, precision: Precision) -> Result<usize> { + let bytes_per_amplitude = match precision { + Precision::Float32 => 8usize, + Precision::Float64 => 16usize, + }; + + local_len + .checked_mul(bytes_per_amplitude) + .ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Distributed amplitude shard byte estimate overflowed for local_len={} and precision={:?}",
+ local_len, precision + )) + }) +} diff --git a/qdp/qdp-core/src/gpu/distributed/context.rs b/qdp/qdp-core/src/gpu/distributed/context.rs new file mode 100644 index 0000000000..f103db5bc2 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/context.rs @@ -0,0 +1,51 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Result; +use crate::gpu::communicator::CollectiveCommunicator; +use crate::gpu::topology::DeviceMesh; + +/// Bundles the device mesh with the collective implementation that coordinates +/// those devices. +/// +/// The current branch uses one process with one mesh covering all participating +/// devices. A future MPI implementation can construct one context per rank with +/// a rank-local mesh and an MPI-backed collective implementation.
+pub struct DistributedExecutionContext<'a> { + mesh: DeviceMesh, + collectives: &'a dyn CollectiveCommunicator, +} + +impl<'a> DistributedExecutionContext<'a> { + pub fn new(mesh: DeviceMesh, collectives: &'a dyn CollectiveCommunicator) -> Self { + Self { mesh, collectives } + } + + pub fn single_process( + device_ids: Vec<usize>, + collectives: &'a dyn CollectiveCommunicator, + ) -> Result<Self> { + Ok(Self::new(DeviceMesh::new(device_ids)?, collectives)) + } + + pub fn mesh(&self) -> &DeviceMesh { + &self.mesh + } + + pub(crate) fn collectives(&self) -> &dyn CollectiveCommunicator { + self.collectives + } +} diff --git a/qdp/qdp-core/src/gpu/distributed/layout.rs b/qdp/qdp-core/src/gpu/distributed/layout.rs new file mode 100644 index 0000000000..feb86258a0 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/layout.rs @@ -0,0 +1,123 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cudarc::driver::CudaDevice; + +use crate::error::{MahoutError, Result}; +use crate::gpu::memory::Precision; +use crate::gpu::topology::{DeviceMesh, GpuTopology}; + +use super::DistributedAmplitudePlan; +use super::shared; + +/// One shard of a logically distributed state vector.
+#[derive(Clone)] +pub struct StateShardLayout { + pub device: Arc<CudaDevice>, + pub device_id: usize, + pub shard_id: usize, + pub start_idx: usize, + pub end_idx: usize, + pub local_len: usize, +} + +/// Metadata describing how one distributed state is mapped onto one execution +/// context. +#[derive(Clone)] +pub struct DistributedStateLayout { + pub num_qubits: usize, + pub precision: Precision, + pub global_len: usize, + pub shard_bits: Option<usize>, + pub topology: GpuTopology, + pub shards: Vec<StateShardLayout>, +} + +impl DistributedStateLayout { + #[cfg(target_os = "linux")] + pub fn recommended_gather_device_id(&self) -> Option<usize> { + shared::policy_device_ids( + &self.topology, + self.shards.iter().map(|shard| shard.device_id), + ) + .0 + } + + #[cfg(target_os = "linux")] + pub fn recommended_placement_device_ids(&self) -> Vec<usize> { + shared::policy_device_ids( + &self.topology, + self.shards.iter().map(|shard| shard.device_id), + ) + .1 + } + + pub fn num_shards(&self) -> usize { + self.shards.len() + } + + pub fn new( + mesh: &DeviceMesh, + plan: &DistributedAmplitudePlan, + precision: Precision, + ) -> Result<Self> { + if mesh.num_devices() != plan.num_devices { + return Err(MahoutError::InvalidInput(format!( + "Device mesh / amplitude plan mismatch: {} devices vs {} planned shards", + mesh.num_devices(), + plan.num_devices + ))); + } + if mesh.devices.len() != plan.num_devices { + return Err(MahoutError::InvalidInput(format!( + "Device mesh / device handles mismatch: {} handles for {} planned shards", + mesh.devices.len(), + plan.num_devices + ))); + } + if plan.placement.placements.len() != plan.num_devices { + return Err(MahoutError::InvalidInput(format!( + "Placement plan mismatch: {} placements for {} planned shards", + plan.placement.placements.len(), + plan.num_devices + ))); + } + + let mut shards = Vec::with_capacity(mesh.num_devices()); + for placement in &plan.placement.placements { + let (start_idx, end_idx) = (placement.start_idx, placement.end_idx); + shards.push(StateShardLayout { +
device: mesh.device_for_id(placement.device_id)?, + device_id: placement.device_id, + shard_id: placement.shard_id, + start_idx, + end_idx, + local_len: placement.local_len(), + }); + } + + Ok(Self { + num_qubits: plan.num_qubits, + precision, + global_len: plan.global_len, + shard_bits: plan.shard_bits, + topology: mesh.topology.clone(), + shards, + }) + } +} diff --git a/qdp/qdp-core/src/gpu/distributed/mod.rs b/qdp/qdp-core/src/gpu/distributed/mod.rs new file mode 100644 index 0000000000..47249b5b64 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/mod.rs @@ -0,0 +1,32 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod amplitude; +mod context; +mod layout; +mod planner; +pub(crate) mod runtime; +mod shared; +mod state; + +pub use amplitude::{DistributedAmplitudePlan, PreparedDistributedAmplitudeEncode}; +pub use context::DistributedExecutionContext; +pub use layout::{DistributedStateLayout, StateShardLayout}; +pub use planner::{ + DistributionMode, PlacementPlan, PlacementPlanner, PlacementRequest, ShardPlacement, + ShardPolicy, +}; +pub use state::{DistributedStateVector, StateShard}; diff --git a/qdp/qdp-core/src/gpu/distributed/planner.rs b/qdp/qdp-core/src/gpu/distributed/planner.rs new file mode 100644 index 0000000000..060d001a80 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/planner.rs @@ -0,0 +1,248 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{MahoutError, Result}; +use crate::gpu::topology::DeviceMesh; + +/// Runtime distribution modes for distributed state construction. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum DistributionMode { + Single, + ShardedCapacity, + Replicated, +} + +/// Placement policy for slicing the logical output state across devices. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ShardPolicy { + /// Evenly partition the logical state into contiguous shards. + /// + /// Because the global state length is always `2^n`, equal-width integer + /// shards are only possible for power-of-two device counts. + Equal, + BalancedUneven, +} + +/// Planner input describing the logical distributed state request. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PlacementRequest { + pub num_qubits: usize, + pub mode: DistributionMode, + pub shard_policy: ShardPolicy, +} + +impl PlacementRequest { + pub fn new(num_qubits: usize, mode: DistributionMode, shard_policy: ShardPolicy) -> Self { + Self { + num_qubits, + mode, + shard_policy, + } + } + + pub fn global_len(&self) -> Result<usize> { + 1usize.checked_shl(self.num_qubits as u32).ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Global amplitude length overflow for {} qubits", + self.num_qubits + )) + }) + } +} + +/// One logical placement decision produced by the planner. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardPlacement { + pub device_id: usize, + pub shard_id: usize, + pub start_idx: usize, + pub end_idx: usize, +} + +impl ShardPlacement { + pub fn local_len(&self) -> usize { + self.end_idx - self.start_idx + } +} + +/// Planner output consumed by distributed encoders and by any later +/// gather/export layer that needs stable shard ranges.
+#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PlacementPlan { + pub mode: DistributionMode, + pub global_len: usize, + pub placements: Vec<ShardPlacement>, + pub gather_device_id: Option<usize>, +} + +impl PlacementPlan { + pub fn num_devices(&self) -> usize { + self.placements.len() + } + + pub fn shard_len(&self) -> Result<usize> { + let Some(first) = self.placements.first() else { + return Err(MahoutError::InvalidInput( + "Placement plan must contain at least one shard".to_string(), + )); + }; + let shard_len = first.local_len(); + if self + .placements + .iter() + .any(|placement| placement.local_len() != shard_len) + { + return Err(MahoutError::InvalidInput( + "Placement plan contains uneven shard lengths".to_string(), + )); + } + Ok(shard_len) + } +} + +/// Stateless planner for device placement decisions. +#[derive(Clone, Debug, Default)] +pub struct PlacementPlanner; + +impl PlacementPlanner { + pub fn plan(mesh: &DeviceMesh, request: &PlacementRequest) -> Result<PlacementPlan> { + if mesh.num_devices() == 0 { + return Err(MahoutError::InvalidInput( + "Placement planner requires at least one device".to_string(), + )); + } + + let global_len = request.global_len()?; + match request.mode { + DistributionMode::Single => { + let device_ids = Self::select_device_ids(mesh, request)?; + let device_id = *device_ids.first().ok_or_else(|| { + MahoutError::InvalidInput( + "Single-device placement requires one device ID".to_string(), + ) + })?; + Ok(PlacementPlan { + mode: request.mode, + global_len, + placements: vec![ShardPlacement { + device_id, + shard_id: 0, + start_idx: 0, + end_idx: global_len, + }], + gather_device_id: Some(device_id), + }) + } + DistributionMode::ShardedCapacity => { + let device_ids = Self::select_device_ids(mesh, request)?; + let shard_lengths = + Self::plan_shard_lengths(global_len, device_ids.len(), request.shard_policy)?; + let placements = Self::build_shard_placements(&device_ids, &shard_lengths); + + Ok(PlacementPlan { + mode: request.mode, + global_len, + placements, +
gather_device_id: mesh.recommended_gather_device_id(), + }) + } + DistributionMode::Replicated => Err(MahoutError::NotImplemented( + "Replicated placement is not implemented yet".to_string(), + )), + } + } + + fn select_device_ids(mesh: &DeviceMesh, request: &PlacementRequest) -> Result<Vec<usize>> { + match request.mode { + DistributionMode::Single => mesh + .device_ids + .first() + .copied() + .map(|device_id| vec![device_id]) + .ok_or_else(|| { + MahoutError::InvalidInput( + "Single-device placement requires one device ID".to_string(), + ) + }), + DistributionMode::ShardedCapacity => { + let recommended = mesh.recommended_placement_device_ids(); + if recommended.is_empty() { + Ok(mesh.device_ids.clone()) + } else { + Ok(recommended) + } + } + DistributionMode::Replicated => Err(MahoutError::NotImplemented( + "Replicated placement is not implemented yet".to_string(), + )), + } + } + + fn plan_shard_lengths( + global_len: usize, + num_devices: usize, + shard_policy: ShardPolicy, + ) -> Result<Vec<usize>> { + if num_devices == 0 { + return Err(MahoutError::InvalidInput( + "Shard planning requires at least one device".to_string(), + )); + } + + match shard_policy { + ShardPolicy::Equal => { + if !num_devices.is_power_of_two() { + return Err(MahoutError::InvalidInput(format!( + "Equal shard policy requires a power-of-two device count, got {}", + num_devices + ))); + } + Ok(vec![global_len / num_devices; num_devices]) + } + ShardPolicy::BalancedUneven => { + let base_len = global_len / num_devices; + let remainder = global_len % num_devices; + Ok((0..num_devices) + .map(|shard_id| base_len + usize::from(shard_id < remainder)) + .collect()) + } + } + } + + fn build_shard_placements( + device_ids: &[usize], + shard_lengths: &[usize], + ) -> Vec<ShardPlacement> { + let mut start_idx = 0usize; + device_ids + .iter() + .copied() + .zip(shard_lengths.iter().copied()) + .enumerate() + .map(|(shard_id, (device_id, local_len))| { + let end_idx = start_idx + local_len; + let placement = ShardPlacement { +
device_id, + shard_id, + start_idx, + end_idx, + }; + start_idx = end_idx; + placement + }) + .collect() + } +} diff --git a/qdp/qdp-core/src/gpu/distributed/runtime.rs b/qdp/qdp-core/src/gpu/distributed/runtime.rs new file mode 100644 index 0000000000..f7b0e7df9b --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/runtime.rs @@ -0,0 +1,336 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +#[cfg(target_os = "linux")] +use crate::error::cuda_error_to_string; +use crate::error::{MahoutError, Result}; +#[cfg(target_os = "linux")] +use crate::gpu::cuda_ffi::{CUDA_SUCCESS, cudaGetDevice, cudaSetDevice}; +use crate::gpu::distributed::DistributedExecutionContext; +#[cfg(target_os = "linux")] +use crate::gpu::memory::{BufferStorage, GpuBufferRaw}; +use crate::gpu::memory::{Precision, ensure_device_memory_available, map_allocation_error}; +use crate::gpu::{ + DistributedAmplitudePlan, DistributedStateLayout, DistributedStateVector, PlacementRequest, +}; +#[cfg(target_os = "linux")] +use cudarc::driver::{DevicePtr, DevicePtrMut}; +#[cfg(target_os = "linux")] +use qdp_kernels::{ + CuComplex, CuDoubleComplex, launch_amplitude_encode, launch_amplitude_encode_f32, +}; +#[cfg(target_os = "linux")] +use std::ffi::c_void; + +#[cfg(target_os = "linux")] +struct DistributedDeviceContextGuard { + original_device: i32, +} + +#[cfg(target_os = "linux")] +impl DistributedDeviceContextGuard { + fn switch_to(device_id: usize) -> Result<Self> { + let mut original_device = 0i32; + let get_ret = unsafe { cudaGetDevice(&mut original_device as *mut i32) }; + if get_ret != CUDA_SUCCESS { + return Err(MahoutError::Cuda(format!( + "cudaGetDevice failed before distributed shard launch: {} ({})", + get_ret, + cuda_error_to_string(get_ret) + ))); + } + + let set_ret = unsafe { cudaSetDevice(device_id as i32) }; + if set_ret != CUDA_SUCCESS { + return Err(MahoutError::Cuda(format!( + "cudaSetDevice(cuda:{}) failed before distributed shard launch: {} ({})", + device_id, + set_ret, + cuda_error_to_string(set_ret) + ))); + } + + Ok(Self { original_device }) + } +} + +#[cfg(target_os = "linux")] +impl Drop for DistributedDeviceContextGuard { + fn drop(&mut self) { + let _ = unsafe { cudaSetDevice(self.original_device) }; + } +} + +pub(crate) fn validate_distributed_input( + host_data: &[f64], + request: &PlacementRequest, +) -> Result<()> { + if request.num_qubits == 0 {
+ return Err(MahoutError::InvalidInput( + "Number of qubits must be at least 1 for distributed amplitude planning".to_string(), + )); + } + + if host_data.is_empty() { + return Err(MahoutError::InvalidInput( + "Input data cannot be empty".to_string(), + )); + } + + let state_len = request.global_len()?; + if host_data.len() > state_len { + return Err(MahoutError::InvalidInput(format!( + "Input data length {} exceeds state vector size {}", + host_data.len(), + state_len + ))); + } + + Ok(()) +} + +pub(crate) fn plan_distributed_encode( + execution: &DistributedExecutionContext<'_>, + host_data: &[f64], + request: PlacementRequest, +) -> Result<DistributedAmplitudePlan> { + validate_distributed_input(host_data, &request)?; + DistributedAmplitudePlan::for_request(execution.mesh(), request) +} + +pub(crate) fn calculate_local_norm_sq( + host_data: &[f64], + start_idx: usize, + end_idx: usize, +) -> Result<f64> { + if start_idx > end_idx { + return Err(MahoutError::InvalidInput(format!( + "Invalid shard range: start {} exceeds end {}", + start_idx, end_idx + ))); + } + + let slice_end = end_idx.min(host_data.len()); + if start_idx >= slice_end { + return Ok(0.0); + } + + let mut local_sum = 0.0f64; + for &value in &host_data[start_idx..slice_end] { + if !value.is_finite() { + return Err(MahoutError::InvalidInput( + "Input data contains NaN or Inf".to_string(), + )); + } + local_sum += value * value; + } + Ok(local_sum) +} + +pub(crate) fn calculate_inv_norm_distributed( + plan: &DistributedAmplitudePlan, + host_data: &[f64], + execution: &DistributedExecutionContext<'_>, +) -> Result<f64> { + let mut partials = Vec::with_capacity(plan.num_devices); + for shard_id in 0..plan.num_devices { + let (start_idx, end_idx) = plan.shard_range(shard_id)?; + partials.push(calculate_local_norm_sq(host_data, start_idx, end_idx)?); + } + + let global_norm_sq = execution.collectives().all_reduce_sum_f64(&partials)?; + if global_norm_sq <= 0.0 || !global_norm_sq.is_finite() { + return Err(MahoutError::InvalidInput( +
"Input data has zero or non-finite norm (contains NaN, Inf, or all zeros)".to_string(), + )); + } + + Ok(1.0 / global_norm_sq.sqrt()) +} + +pub(crate) fn prepare_distributed_encode( + execution: &DistributedExecutionContext<'_>, + host_data: &[f64], + precision: Precision, + request: PlacementRequest, +) -> Result<(DistributedAmplitudePlan, f64, DistributedStateLayout)> { + let plan = plan_distributed_encode(execution, host_data, request)?; + let inv_norm = calculate_inv_norm_distributed(&plan, host_data, execution)?; + let layout = DistributedStateLayout::new(execution.mesh(), &plan, precision)?; + Ok((plan, inv_norm, layout)) +} + +#[cfg(target_os = "linux")] +pub(crate) fn encode_distributed_to_shards( + execution: &DistributedExecutionContext<'_>, + host_data: &[f64], + precision: Precision, + request: PlacementRequest, +) -> Result { + let (plan, inv_norm, layout) = + prepare_distributed_encode(execution, host_data, precision, request)?; + let num_qubits = plan.request.num_qubits; + + let mut buffers = Vec::with_capacity(plan.num_devices); + for placement in &plan.placement.placements { + let device = execution.mesh().device_for_id(placement.device_id)?; + let _device_guard = DistributedDeviceContextGuard::switch_to(placement.device_id)?; + let (start_idx, end_idx) = (placement.start_idx, placement.end_idx); + let local_len = end_idx - start_idx; + let slice_end = end_idx.min(host_data.len()); + let present_len = slice_end.saturating_sub(start_idx); + + let buffer = match precision { + Precision::Float32 => { + let requested_bytes = distributed_shard_bytes::(local_len)?; + ensure_device_memory_available( + requested_bytes, + "distributed amplitude shard allocation (f32)", + Some(num_qubits), + Some(placement.device_id), + )?; + let mut state_slice = device.alloc_zeros::(local_len).map_err(|e| { + map_allocation_error( + requested_bytes, + "distributed amplitude shard allocation (f32)", + Some(num_qubits), + Some(placement.device_id), + e, + ) + })?; + + if 
present_len > 0 { + let host_input = host_data[start_idx..slice_end] + .iter() + .map(|&value| value as f32) + .collect::>(); + let input_slice = device.htod_sync_copy(&host_input).map_err(|e| { + MahoutError::MemoryAllocation(format!( + "Failed to upload distributed amplitude shard input (f32): {:?}", + e + )) + })?; + + let ret = unsafe { + launch_amplitude_encode_f32( + *input_slice.device_ptr() as *const f32, + *state_slice.device_ptr_mut() as *mut c_void, + present_len, + local_len, + inv_norm as f32, + std::ptr::null_mut(), + ) + }; + + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!( + "Distributed amplitude shard kernel failed on cuda:{} with CUDA error code: {} ({})", + placement.device_id, + ret, + cuda_error_to_string(ret) + ))); + } + + device.synchronize().map_err(|e| { + MahoutError::Cuda(format!( + "Distributed amplitude shard synchronize failed on cuda:{}: {:?}", + placement.device_id, e + )) + })?; + } + + Arc::new(BufferStorage::F32(GpuBufferRaw { slice: state_slice })) + } + Precision::Float64 => { + let requested_bytes = distributed_shard_bytes::(local_len)?; + ensure_device_memory_available( + requested_bytes, + "distributed amplitude shard allocation", + Some(num_qubits), + Some(placement.device_id), + )?; + let mut state_slice = + device + .alloc_zeros::(local_len) + .map_err(|e| { + map_allocation_error( + requested_bytes, + "distributed amplitude shard allocation", + Some(num_qubits), + Some(placement.device_id), + e, + ) + })?; + + if present_len > 0 { + let input_slice = device + .htod_sync_copy(&host_data[start_idx..slice_end]) + .map_err(|e| { + MahoutError::MemoryAllocation(format!( + "Failed to upload distributed amplitude shard input: {:?}", + e + )) + })?; + + let ret = unsafe { + launch_amplitude_encode( + *input_slice.device_ptr() as *const f64, + *state_slice.device_ptr_mut() as *mut c_void, + present_len, + local_len, + inv_norm, + std::ptr::null_mut(), + ) + }; + + if ret != 0 { + return 
Err(MahoutError::KernelLaunch(format!( + "Distributed amplitude shard kernel failed on cuda:{} with CUDA error code: {} ({})", + placement.device_id, + ret, + cuda_error_to_string(ret) + ))); + } + + device.synchronize().map_err(|e| { + MahoutError::Cuda(format!( + "Distributed amplitude shard synchronize failed on cuda:{}: {:?}", + placement.device_id, e + )) + })?; + } + + Arc::new(BufferStorage::F64(GpuBufferRaw { slice: state_slice })) + } + }; + + buffers.push(buffer); + } + + DistributedStateVector::new_with_buffers(layout, buffers) +} + +#[cfg(target_os = "linux")] +fn distributed_shard_bytes(len: usize) -> Result { + len.checked_mul(std::mem::size_of::()).ok_or_else(|| { + MahoutError::MemoryAllocation(format!( + "Distributed shard allocation size overflow (elements={})", + len + )) + }) +} diff --git a/qdp/qdp-core/src/gpu/distributed/shared.rs b/qdp/qdp-core/src/gpu/distributed/shared.rs new file mode 100644 index 0000000000..28f5708de6 --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/shared.rs @@ -0,0 +1,64 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::gpu::topology::GpuTopology; + +pub(crate) fn policy_device_ids( + topology: &GpuTopology, + shard_device_ids: impl IntoIterator, +) -> (Option, Vec) { + let shard_device_ids = shard_device_ids.into_iter().collect::>(); + let recommended_gather_device_id = topology + .preferred_gather_index() + .and_then(|idx| shard_device_ids.get(idx).copied()); + let recommended_placement_device_ids = topology + .recommended_placement_order() + .into_iter() + .filter_map(|idx| shard_device_ids.get(idx).copied()) + .collect(); + + ( + recommended_gather_device_id, + recommended_placement_device_ids, + ) +} + +#[cfg(test)] +mod tests { + use super::policy_device_ids; + use crate::gpu::topology::{GpuTopology, LinkKind}; + + #[test] + fn policy_device_ids_skips_out_of_range_topology_indices() { + let topology = GpuTopology { + peer_access: vec![ + vec![true, true, false], + vec![true, true, true], + vec![false, true, true], + ], + links: vec![ + vec![LinkKind::SameDevice, LinkKind::Pix, LinkKind::Unknown], + vec![LinkKind::Pix, LinkKind::SameDevice, LinkKind::Node], + vec![LinkKind::Unknown, LinkKind::Node, LinkKind::SameDevice], + ], + }; + + let (gather, placement) = policy_device_ids(&topology, [17, 23]); + + assert_eq!(gather, Some(23)); + assert_eq!(placement, vec![23, 17]); + } +} diff --git a/qdp/qdp-core/src/gpu/distributed/state.rs b/qdp/qdp-core/src/gpu/distributed/state.rs new file mode 100644 index 0000000000..9f1cf4a38b --- /dev/null +++ b/qdp/qdp-core/src/gpu/distributed/state.rs @@ -0,0 +1,162 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cudarc::driver::CudaDevice; +#[cfg(target_os = "linux")] +use qdp_kernels::{CuComplex, CuDoubleComplex}; + +use crate::error::{MahoutError, Result}; +use crate::gpu::memory::{BufferStorage, Precision}; + +use super::DistributedStateLayout; +use super::shared; + +/// One materialized shard of a distributed state vector. +#[derive(Clone)] +pub struct StateShard { + pub device: Arc, + pub device_id: usize, + pub shard_id: usize, + pub start_idx: usize, + pub end_idx: usize, + pub local_len: usize, + pub buffer: Arc, +} + +/// Materialized multi-GPU state vector with one live buffer per shard. 
+#[derive(Clone)] +pub struct DistributedStateVector { + pub num_qubits: usize, + pub precision: Precision, + pub global_len: usize, + pub shard_bits: Option, + pub topology: crate::gpu::topology::GpuTopology, + pub shards: Vec, +} + +impl DistributedStateVector { + pub fn recommended_gather_device_id(&self) -> Option { + shared::policy_device_ids( + &self.topology, + self.shards.iter().map(|shard| shard.device_id), + ) + .0 + } + + pub fn recommended_placement_device_ids(&self) -> Vec { + shared::policy_device_ids( + &self.topology, + self.shards.iter().map(|shard| shard.device_id), + ) + .1 + } + + pub fn num_shards(&self) -> usize { + self.shards.len() + } + + #[cfg(target_os = "linux")] + pub fn copy_shard_to_host_f64(&self, shard_id: usize) -> Result> { + let shard = self.shards.get(shard_id).ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Shard ID {} out of range for {} shards", + shard_id, + self.shards.len() + )) + })?; + match shard.buffer.as_ref() { + BufferStorage::F64(buf) => shard.device.dtoh_sync_copy(&buf.slice).map_err(|e| { + MahoutError::Cuda(format!( + "Failed to copy distributed float64 shard {} to host: {:?}", + shard_id, e + )) + }), + BufferStorage::F32(_) => Err(MahoutError::InvalidInput(format!( + "Shard {} stores float32 data, not float64", + shard_id + ))), + } + } + + #[cfg(target_os = "linux")] + pub fn copy_shard_to_host_f32(&self, shard_id: usize) -> Result> { + let shard = self.shards.get(shard_id).ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Shard ID {} out of range for {} shards", + shard_id, + self.shards.len() + )) + })?; + match shard.buffer.as_ref() { + BufferStorage::F32(buf) => shard.device.dtoh_sync_copy(&buf.slice).map_err(|e| { + MahoutError::Cuda(format!( + "Failed to copy distributed float32 shard {} to host: {:?}", + shard_id, e + )) + }), + BufferStorage::F64(_) => Err(MahoutError::InvalidInput(format!( + "Shard {} stores float64 data, not float32", + shard_id + ))), + } + } + + /// Construct a 
distributed state vector with concrete shard buffers already allocated. + pub fn new_with_buffers( + layout: DistributedStateLayout, + buffers: Vec>, + ) -> Result { + if buffers.len() != layout.shards.len() { + return Err(MahoutError::InvalidInput(format!( + "Distributed state buffer mismatch: {} buffers for {} shards", + buffers.len(), + layout.shards.len() + ))); + } + + let mut shards = Vec::with_capacity(layout.shards.len()); + for (shard_layout, buffer) in layout.shards.into_iter().zip(buffers.into_iter()) { + if buffer.precision() != layout.precision { + return Err(MahoutError::InvalidInput(format!( + "Distributed shard precision mismatch on shard {}: expected {:?}, got {:?}", + shard_layout.shard_id, + layout.precision, + buffer.precision() + ))); + } + shards.push(StateShard { + device: shard_layout.device, + device_id: shard_layout.device_id, + shard_id: shard_layout.shard_id, + start_idx: shard_layout.start_idx, + end_idx: shard_layout.end_idx, + local_len: shard_layout.local_len, + buffer, + }); + } + + Ok(Self { + num_qubits: layout.num_qubits, + precision: layout.precision, + global_len: layout.global_len, + shard_bits: layout.shard_bits, + topology: layout.topology, + shards, + }) + } +} diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs index e3081b65d6..aa187649a1 100644 --- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs +++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs @@ -86,6 +86,7 @@ impl QuantumEncoder for AmplitudeEncoder { input_bytes, "input staging buffer", Some(num_qubits), + Some(_device.ordinal()), )?; let input_slice = { @@ -95,6 +96,7 @@ impl QuantumEncoder for AmplitudeEncoder { input_bytes, "input staging buffer", Some(num_qubits), + Some(_device.ordinal()), e, ) })? 
@@ -693,6 +695,29 @@ impl QuantumEncoder for AmplitudeEncoder { } impl AmplitudeEncoder { + #[cfg(target_os = "linux")] + fn async_chunk_state_len( + state_len: usize, + chunk_offset: usize, + chunk_len: usize, + ) -> Result { + let remaining = state_len.checked_sub(chunk_offset).ok_or_else(|| { + MahoutError::InvalidInput(format!( + "Async amplitude chunk offset {} exceeds state length {}", + chunk_offset, state_len + )) + })?; + + if chunk_len > remaining { + return Err(MahoutError::InvalidInput(format!( + "Async amplitude chunk length {} at offset {} exceeds remaining state length {}", + chunk_len, chunk_offset, remaining + ))); + } + + Ok(chunk_len) + } + /// Async pipeline encoding for large data /// /// Uses the generic dual-stream pipeline infrastructure to overlap @@ -720,6 +745,8 @@ impl AmplitudeEncoder { device, host_data, |stream, input_ptr, chunk_offset, chunk_len| { + let chunk_state_len = + Self::async_chunk_state_len(state_len, chunk_offset, chunk_len)?; // Calculate offset pointer for state vector (type-safe pointer arithmetic) // Offset is in complex numbers (CuDoubleComplex), not f64 elements let state_ptr_offset = unsafe { @@ -735,7 +762,7 @@ impl AmplitudeEncoder { input_ptr, state_ptr_offset, chunk_len, - state_len, + chunk_state_len, inv_norm, stream.stream as *mut c_void, ) @@ -1093,3 +1120,62 @@ impl AmplitudeEncoder { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::AmplitudeEncoder; + #[cfg(target_os = "linux")] + use crate::gpu::QuantumEncoder; + #[cfg(target_os = "linux")] + use cudarc::driver::CudaDevice; + + #[test] + #[cfg(target_os = "linux")] + fn async_chunk_state_len_uses_chunk_local_extent() { + assert_eq!( + AmplitudeEncoder::async_chunk_state_len(1024, 256, 128).unwrap(), + 128 + ); + } + + #[test] + #[cfg(target_os = "linux")] + fn async_chunk_state_len_rejects_overrun() { + let err = AmplitudeEncoder::async_chunk_state_len(1024, 1000, 64).unwrap_err(); + assert!(matches!(err, crate::error::MahoutError::InvalidInput(_))); 
+ } + + #[test] + #[cfg(target_os = "linux")] + fn async_pipeline_large_encode_q22_matches_reference_slices() { + let device = match CudaDevice::new(0) { + Ok(device) => device, + Err(_) => return, + }; + + let num_qubits = 22usize; + let state_len = 1usize << num_qubits; + let mut input = vec![0.0f64; state_len]; + for (idx, value) in input.iter_mut().enumerate() { + *value = ((idx % 1024) as f64) + 1.0; + } + + let encoder = AmplitudeEncoder; + let state = encoder.encode(&device, &input, num_qubits).unwrap(); + let host = state.copy_to_host_f64(&device).unwrap(); + let norm = input.iter().map(|value| value * value).sum::().sqrt(); + + for idx in [0usize, 1, 1024, state_len / 2, state_len - 1] { + let expected = input[idx] / norm; + let actual = host[idx].x; + assert!( + (actual - expected).abs() < 1e-12, + "Mismatch at amplitude {}: actual={} expected={}", + idx, + actual, + expected + ); + assert_eq!(host[idx].y, 0.0); + } + } +} diff --git a/qdp/qdp-core/src/gpu/encodings/angle.rs b/qdp/qdp-core/src/gpu/encodings/angle.rs index b0c6773ff5..ef39f14611 100644 --- a/qdp/qdp-core/src/gpu/encodings/angle.rs +++ b/qdp/qdp-core/src/gpu/encodings/angle.rs @@ -57,7 +57,13 @@ impl QuantumEncoder for AngleEncoder { let angles_gpu = { crate::profile_scope!("GPU::H2D_Angles"); device.htod_sync_copy(data).map_err(|e| { - map_allocation_error(input_bytes, "angle input upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "angle input upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? }; @@ -175,7 +181,13 @@ impl QuantumEncoder for AngleEncoder { let angles_gpu = { crate::profile_scope!("GPU::H2D_BatchAngles"); device.htod_sync_copy(batch_data).map_err(|e| { - map_allocation_error(input_bytes, "angle batch upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "angle batch upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? 
}; @@ -435,7 +447,13 @@ impl QuantumEncoder for AngleEncoder { let angles_gpu = { crate::profile_scope!("GPU::H2D_BatchAnglesF32"); device.htod_sync_copy(batch_data).map_err(|e| { - map_allocation_error(input_bytes, "angle batch upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "angle batch upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? }; diff --git a/qdp/qdp-core/src/gpu/encodings/basis.rs b/qdp/qdp-core/src/gpu/encodings/basis.rs index 494b385af2..db07c74b4f 100644 --- a/qdp/qdp-core/src/gpu/encodings/basis.rs +++ b/qdp/qdp-core/src/gpu/encodings/basis.rs @@ -180,6 +180,7 @@ impl QuantumEncoder for BasisEncoder { num_samples * std::mem::size_of::(), "basis indices upload", Some(num_qubits), + Some(device.ordinal()), e, ) })? diff --git a/qdp/qdp-core/src/gpu/encodings/iqp.rs b/qdp/qdp-core/src/gpu/encodings/iqp.rs index c6ecf17624..fc925c2393 100644 --- a/qdp/qdp-core/src/gpu/encodings/iqp.rs +++ b/qdp/qdp-core/src/gpu/encodings/iqp.rs @@ -82,7 +82,13 @@ impl QuantumEncoder for IqpEncoder { let data_gpu = { crate::profile_scope!("GPU::H2D_IqpData"); device.htod_sync_copy(data).map_err(|e| { - map_allocation_error(input_bytes, "IQP input upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "IQP input upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? }; @@ -193,7 +199,13 @@ impl QuantumEncoder for IqpEncoder { let data_gpu = { crate::profile_scope!("GPU::H2D_BatchIqpData"); device.htod_sync_copy(batch_data).map_err(|e| { - map_allocation_error(input_bytes, "IQP batch upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "IQP batch upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? 
}; diff --git a/qdp/qdp-core/src/gpu/encodings/phase.rs b/qdp/qdp-core/src/gpu/encodings/phase.rs index b329a2429e..335bcfd85d 100644 --- a/qdp/qdp-core/src/gpu/encodings/phase.rs +++ b/qdp/qdp-core/src/gpu/encodings/phase.rs @@ -80,7 +80,13 @@ impl QuantumEncoder for PhaseEncoder { let phases_gpu = { crate::profile_scope!("GPU::H2D_Phases"); device.htod_sync_copy(data).map_err(|e| { - map_allocation_error(input_bytes, "phase input upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "phase input upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? }; @@ -199,7 +205,13 @@ impl QuantumEncoder for PhaseEncoder { let phases_gpu = { crate::profile_scope!("GPU::H2D_BatchPhases"); device.htod_sync_copy(batch_data).map_err(|e| { - map_allocation_error(input_bytes, "phase batch upload", Some(num_qubits), e) + map_allocation_error( + input_bytes, + "phase batch upload", + Some(num_qubits), + Some(device.ordinal()), + e, + ) })? }; diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs index 661060b550..c39f6c5616 100644 --- a/qdp/qdp-core/src/gpu/memory.rs +++ b/qdp/qdp-core/src/gpu/memory.rs @@ -43,7 +43,9 @@ pub enum GpuDeviceType { } #[cfg(target_os = "linux")] -use crate::gpu::cuda_ffi::{cudaFreeHost, cudaHostAlloc, cudaMemGetInfo}; +use crate::gpu::cuda_ffi::{ + CUDA_SUCCESS, cudaFreeHost, cudaGetDevice, cudaHostAlloc, cudaMemGetInfo, cudaSetDevice, +}; #[cfg(target_os = "linux")] fn bytes_to_mib(bytes: usize) -> f64 { @@ -51,7 +53,49 @@ fn bytes_to_mib(bytes: usize) -> f64 { } #[cfg(target_os = "linux")] -fn query_cuda_mem_info() -> Result<(usize, usize)> { +struct DeviceContextGuard { + original_device: i32, +} + +#[cfg(target_os = "linux")] +impl DeviceContextGuard { + fn switch_to(target_device: Option) -> Result { + let mut original_device = 0i32; + let get_ret = unsafe { cudaGetDevice(&mut original_device as *mut i32) }; + if get_ret != CUDA_SUCCESS { + return Err(MahoutError::Cuda(format!( + "cudaGetDevice 
failed: {} ({})", + get_ret, + cuda_error_to_string(get_ret) + ))); + } + + if let Some(device_id) = target_device { + let set_ret = unsafe { cudaSetDevice(device_id as i32) }; + if set_ret != CUDA_SUCCESS { + return Err(MahoutError::Cuda(format!( + "cudaSetDevice(cuda:{}) failed: {} ({})", + device_id, + set_ret, + cuda_error_to_string(set_ret) + ))); + } + } + + Ok(Self { original_device }) + } +} + +#[cfg(target_os = "linux")] +impl Drop for DeviceContextGuard { + fn drop(&mut self) { + let _ = unsafe { cudaSetDevice(self.original_device) }; + } +} + +#[cfg(target_os = "linux")] +fn query_cuda_mem_info(device_id: Option) -> Result<(usize, usize)> { + let _guard = DeviceContextGuard::switch_to(device_id)?; unsafe { let mut free_bytes: usize = 0; let mut total_bytes: usize = 0; @@ -101,8 +145,9 @@ pub(crate) fn ensure_device_memory_available( requested_bytes: usize, context: &str, qubits: Option, + device_id: Option, ) -> Result<()> { - let (free, total) = query_cuda_mem_info()?; + let (free, total) = query_cuda_mem_info(device_id)?; if (requested_bytes as u64) > (free as u64) { return Err(MahoutError::MemoryAllocation(build_oom_message( @@ -123,9 +168,10 @@ pub(crate) fn map_allocation_error( requested_bytes: usize, context: &str, qubits: Option, + device_id: Option, source: impl std::fmt::Debug, ) -> MahoutError { - match query_cuda_mem_info() { + match query_cuda_mem_info(device_id) { Ok((free, total)) => { if (requested_bytes as u64) > (free as u64) { MahoutError::MemoryAllocation(build_oom_message( @@ -175,28 +221,28 @@ pub enum BufferStorage { } impl BufferStorage { - fn precision(&self) -> Precision { + pub(crate) fn precision(&self) -> Precision { match self { BufferStorage::F32(_) => Precision::Float32, BufferStorage::F64(_) => Precision::Float64, } } - fn ptr_void(&self) -> *mut c_void { + pub(crate) fn ptr_void(&self) -> *mut c_void { match self { BufferStorage::F32(buf) => buf.ptr() as *mut c_void, BufferStorage::F64(buf) => buf.ptr() as *mut c_void, } 
} - fn ptr_f64(&self) -> Option<*mut CuDoubleComplex> { + pub(crate) fn ptr_f64(&self) -> Option<*mut CuDoubleComplex> { match self { BufferStorage::F64(buf) => Some(buf.ptr()), _ => None, } } - fn ptr_f32(&self) -> Option<*mut CuComplex> { + pub(crate) fn ptr_f32(&self) -> Option<*mut CuComplex> { match self { BufferStorage::F32(buf) => Some(buf.ptr()), _ => None, @@ -250,6 +296,7 @@ impl GpuStateVector { requested_bytes, "state vector allocation (f32)", Some(qubits), + Some(_device.ordinal()), )?; let slice = unsafe { _device.alloc::(_size_elements) }.map_err(|e| { @@ -257,6 +304,7 @@ impl GpuStateVector { requested_bytes, "state vector allocation (f32)", Some(qubits), + Some(_device.ordinal()), e, ) })?; @@ -277,6 +325,7 @@ impl GpuStateVector { requested_bytes, "state vector allocation", Some(qubits), + Some(_device.ordinal()), )?; let slice = @@ -285,6 +334,7 @@ impl GpuStateVector { requested_bytes, "state vector allocation", Some(qubits), + Some(_device.ordinal()), e, ) })?; @@ -339,6 +389,30 @@ impl GpuStateVector { self.num_qubits } + #[cfg(target_os = "linux")] + pub fn copy_to_host_f64(&self, device: &Arc) -> Result> { + match self.buffer.as_ref() { + BufferStorage::F64(buf) => device.dtoh_sync_copy(&buf.slice).map_err(|e| { + MahoutError::Cuda(format!("Failed to copy float64 state to host: {:?}", e)) + }), + BufferStorage::F32(_) => Err(MahoutError::InvalidInput( + "State vector stores float32 data, not float64".to_string(), + )), + } + } + + #[cfg(target_os = "linux")] + pub fn copy_to_host_f32(&self, device: &Arc) -> Result> { + match self.buffer.as_ref() { + BufferStorage::F32(buf) => device.dtoh_sync_copy(&buf.slice).map_err(|e| { + MahoutError::Cuda(format!("Failed to copy float32 state to host: {:?}", e)) + }), + BufferStorage::F64(_) => Err(MahoutError::InvalidInput( + "State vector stores float64 data, not float32".to_string(), + )), + } + } + /// Get the size in elements (2^n where n is number of qubits) pub fn size_elements(&self) -> usize { 
self.size_elements @@ -374,11 +448,22 @@ impl GpuStateVector { })?; let context = "batch state vector allocation (f32)"; - ensure_device_memory_available(requested_bytes, context, Some(qubits))?; + ensure_device_memory_available( + requested_bytes, + context, + Some(qubits), + Some(_device.ordinal()), + )?; let slice = unsafe { _device.alloc::(total_elements) }.map_err(|e| { - map_allocation_error(requested_bytes, context, Some(qubits), e) + map_allocation_error( + requested_bytes, + context, + Some(qubits), + Some(_device.ordinal()), + e, + ) })?; BufferStorage::F32(GpuBufferRaw { slice }) @@ -394,11 +479,22 @@ impl GpuStateVector { })?; let context = "batch state vector allocation"; - ensure_device_memory_available(requested_bytes, context, Some(qubits))?; + ensure_device_memory_available( + requested_bytes, + context, + Some(qubits), + Some(_device.ordinal()), + )?; let slice = unsafe { _device.alloc::(total_elements) } .map_err(|e| { - map_allocation_error(requested_bytes, context, Some(qubits), e) + map_allocation_error( + requested_bytes, + context, + Some(qubits), + Some(_device.ordinal()), + e, + ) })?; BufferStorage::F64(GpuBufferRaw { slice }) @@ -450,6 +546,7 @@ impl GpuStateVector { requested_bytes, "state vector precision conversion", Some(self.num_qubits), + Some(device.ordinal()), )?; let slice = unsafe { device.alloc::(self.size_elements) } @@ -458,6 +555,7 @@ impl GpuStateVector { requested_bytes, "state vector precision conversion", Some(self.num_qubits), + Some(device.ordinal()), e, ) })?; @@ -526,6 +624,7 @@ impl GpuStateVector { requested_bytes, "state vector precision conversion", Some(self.num_qubits), + Some(device.ordinal()), )?; let slice = @@ -534,6 +633,7 @@ impl GpuStateVector { requested_bytes, "state vector precision conversion", Some(self.num_qubits), + Some(device.ordinal()), e, ) })?; diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs index 7e16be7be3..ec2dff3123 100644 --- a/qdp/qdp-core/src/gpu/mod.rs +++ 
b/qdp/qdp-core/src/gpu/mod.rs @@ -16,8 +16,10 @@ #[cfg(target_os = "linux")] pub mod buffer_pool; +pub mod communicator; #[cfg(target_os = "linux")] pub(crate) mod cuda_sync; +pub mod distributed; pub mod encodings; pub mod memory; #[cfg(target_os = "linux")] @@ -25,15 +27,23 @@ pub mod overlap_tracker; pub mod pipeline; #[cfg(target_os = "linux")] pub mod pool_metrics; +pub mod topology; #[cfg(target_os = "linux")] pub(crate) mod cuda_ffi; #[cfg(target_os = "linux")] pub use buffer_pool::{PinnedBufferHandle, PinnedBufferPool}; +pub use communicator::{CollectiveCommunicator, LocalCollectiveCommunicator}; +pub use distributed::{ + DistributedAmplitudePlan, DistributedExecutionContext, DistributedStateLayout, + DistributedStateVector, DistributionMode, PlacementPlan, PlacementPlanner, PlacementRequest, + PreparedDistributedAmplitudeEncode, ShardPlacement, ShardPolicy, StateShard, StateShardLayout, +}; pub use encodings::{AmplitudeEncoder, AngleEncoder, BasisEncoder, QuantumEncoder, get_encoder}; pub use memory::GpuStateVector; pub use pipeline::run_dual_stream_pipeline; +pub use topology::{DeviceMesh, GpuTopology, LinkKind}; #[cfg(target_os = "linux")] pub use overlap_tracker::OverlapTracker; diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs index 9a715df6f8..d960d0ce95 100644 --- a/qdp/qdp-core/src/gpu/pipeline.rs +++ b/qdp/qdp-core/src/gpu/pipeline.rs @@ -379,12 +379,23 @@ where crate::profile_scope!("GPU::ChunkProcess"); let chunk_bytes = std::mem::size_of_val(chunk); - ensure_device_memory_available(chunk_bytes, "pipeline chunk buffer allocation", None)?; + ensure_device_memory_available( + chunk_bytes, + "pipeline chunk buffer allocation", + None, + Some(device.ordinal()), + )?; // Allocate temporary device buffer for this chunk #[allow(clippy::collapsible_if, clippy::manual_is_multiple_of)] let input_chunk_dev = unsafe { device.alloc::(chunk.len()) }.map_err(|e| { - map_allocation_error(chunk_bytes, "pipeline chunk buffer 
allocation", None, e) + map_allocation_error( + chunk_bytes, + "pipeline chunk buffer allocation", + None, + Some(device.ordinal()), + e, + ) })?; // Acquire pinned staging buffer and populate it with the current chunk diff --git a/qdp/qdp-core/src/gpu/topology.rs b/qdp/qdp-core/src/gpu/topology.rs new file mode 100644 index 0000000000..f956c7bd9b --- /dev/null +++ b/qdp/qdp-core/src/gpu/topology.rs @@ -0,0 +1,308 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cudarc::driver::CudaDevice; + +use crate::error::{MahoutError, Result}; +#[cfg(target_os = "linux")] +use crate::{ + cuda_error_to_string, + gpu::cuda_ffi::{CUDA_SUCCESS, cudaDeviceCanAccessPeer}, +}; + +/// Coarse-grained GPU interconnect classification used by placement policies. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum LinkKind { + SameDevice, + Pix, + Node, + Sys, + Unknown, +} + +impl LinkKind { + fn score(self) -> usize { + match self { + LinkKind::SameDevice => 4, + LinkKind::Pix => 3, + LinkKind::Node => 2, + LinkKind::Sys => 1, + LinkKind::Unknown => 0, + } + } +} + +/// Runtime topology metadata for a device mesh. 
+#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GpuTopology { + pub peer_access: Vec>, + pub links: Vec>, +} + +impl GpuTopology { + /// Create a placeholder topology where only self-access is guaranteed. + pub fn placeholder(num_devices: usize) -> Self { + let mut peer_access = vec![vec![false; num_devices]; num_devices]; + let mut links = vec![vec![LinkKind::Unknown; num_devices]; num_devices]; + + for idx in 0..num_devices { + peer_access[idx][idx] = true; + links[idx][idx] = LinkKind::SameDevice; + } + + Self { peer_access, links } + } + + #[cfg(target_os = "linux")] + pub fn probe(device_ids: &[usize]) -> Result { + let num_devices = device_ids.len(); + let mut topology = Self::placeholder(num_devices); + + for (src_idx, &src_device_id) in device_ids.iter().enumerate() { + for (dst_idx, &dst_device_id) in device_ids.iter().enumerate() { + if src_idx == dst_idx { + continue; + } + + let mut can_access_peer = 0i32; + let ret = unsafe { + cudaDeviceCanAccessPeer( + &mut can_access_peer as *mut i32, + src_device_id as i32, + dst_device_id as i32, + ) + }; + + if ret != CUDA_SUCCESS { + return Err(MahoutError::Cuda(format!( + "cudaDeviceCanAccessPeer(cuda:{}, cuda:{}) failed: {} ({})", + src_device_id, + dst_device_id, + ret, + cuda_error_to_string(ret) + ))); + } + + topology.peer_access[src_idx][dst_idx] = can_access_peer != 0; + } + } + + Ok(topology) + } + + pub fn preferred_gather_index(&self) -> Option { + if self.peer_access.is_empty() { + return None; + } + + let mut best_idx = 0usize; + let mut best_score = 0usize; + for idx in 0..self.peer_access.len() { + let mut score = 0usize; + for peer_idx in 0..self.peer_access.len() { + if self.peer_access[idx][peer_idx] { + score += 10 + self.links[idx][peer_idx].score(); + } + if self.peer_access[peer_idx][idx] { + score += 10 + self.links[peer_idx][idx].score(); + } + } + + if score > best_score { + best_score = score; + best_idx = idx; + } + } + + Some(best_idx) + } + + pub fn 
recommended_placement_order(&self) -> Vec<usize> {
+        let Some(root_idx) = self.preferred_gather_index() else {
+            return Vec::new();
+        };
+
+        // Sort device indices by connectivity to the chosen gather root,
+        // best-connected first (score is inverted because sort is ascending).
+        let mut indices: Vec<usize> = (0..self.peer_access.len()).collect();
+        indices.sort_by_key(|&idx| {
+            let mut score = 0usize;
+            if self.peer_access[root_idx][idx] {
+                score += 10 + self.links[root_idx][idx].score();
+            }
+            if self.peer_access[idx][root_idx] {
+                score += 10 + self.links[idx][root_idx].score();
+            }
+            (usize::MAX - score, idx)
+        });
+        indices
+    }
+}
+
+/// A validated collection of CUDA devices that back one distributed execution context.
+#[derive(Clone)]
+pub struct DeviceMesh {
+    pub device_ids: Vec<usize>,
+    pub devices: Vec<Arc<CudaDevice>>,
+    pub topology: GpuTopology,
+}
+
+impl DeviceMesh {
+    /// Build a mesh from CUDA device IDs.
+    ///
+    /// On Linux this probes peer-access reachability so placement policies can
+    /// prefer the most connected devices without changing the caller-provided
+    /// device set.
+    pub fn new(device_ids: Vec<usize>) -> Result<Self> {
+        Self::validate_device_ids(&device_ids)?;
+
+        let mut devices = Vec::with_capacity(device_ids.len());
+        for &device_id in &device_ids {
+            let device = CudaDevice::new(device_id).map_err(|e| {
+                MahoutError::Cuda(format!(
+                    "Failed to initialize CUDA device {} for device mesh: {:?}",
+                    device_id, e
+                ))
+            })?;
+            devices.push(device);
+        }
+
+        #[cfg(target_os = "linux")]
+        let topology = GpuTopology::probe(&device_ids)?;
+        #[cfg(not(target_os = "linux"))]
+        let topology = GpuTopology::placeholder(device_ids.len());
+
+        Ok(Self {
+            device_ids,
+            devices,
+            topology,
+        })
+    }
+
+    /// Build a mesh from explicit parts. Intended for tests and injected
+    /// topology metadata.
+    pub fn from_parts(
+        device_ids: Vec<usize>,
+        devices: Vec<Arc<CudaDevice>>,
+        topology: GpuTopology,
+    ) -> Result<Self> {
+        Self::validate_device_ids(&device_ids)?;
+
+        if device_ids.len() != devices.len() {
+            return Err(MahoutError::InvalidInput(format!(
+                "Device mesh mismatch: {} device IDs but {} device handles",
+                device_ids.len(),
+                devices.len()
+            )));
+        }
+        for (&device_id, device) in device_ids.iter().zip(devices.iter()) {
+            if device.ordinal() != device_id {
+                return Err(MahoutError::InvalidInput(format!(
+                    "Device mesh ordinal mismatch: declared cuda:{} but handle targets cuda:{}",
+                    device_id,
+                    device.ordinal()
+                )));
+            }
+        }
+
+        let num_devices = device_ids.len();
+        if topology.peer_access.len() != num_devices || topology.links.len() != num_devices {
+            return Err(MahoutError::InvalidInput(format!(
+                "Topology dimension mismatch for {} devices",
+                num_devices
+            )));
+        }
+        if topology
+            .peer_access
+            .iter()
+            .any(|row| row.len() != num_devices)
+            || topology.links.iter().any(|row| row.len() != num_devices)
+        {
+            return Err(MahoutError::InvalidInput(format!(
+                "Topology row width mismatch for {} devices",
+                num_devices
+            )));
+        }
+
+        Ok(Self {
+            device_ids,
+            devices,
+            topology,
+        })
+    }
+
+    pub fn num_devices(&self) -> usize {
+        self.device_ids.len()
+    }
+
+    pub fn validate_power_of_two(&self) -> Result<()> {
+        let num_devices = self.num_devices();
+        if !num_devices.is_power_of_two() {
+            return Err(MahoutError::InvalidInput(format!(
+                "Distributed QDP currently requires a power-of-two device count, got {}",
+                num_devices
+            )));
+        }
+        Ok(())
+    }
+
+    pub fn recommended_gather_device_id(&self) -> Option<usize> {
+        self.topology
+            .preferred_gather_index()
+            .map(|idx| self.device_ids[idx])
+    }
+
+    pub fn recommended_placement_device_ids(&self) -> Vec<usize> {
+        self.topology
+            .recommended_placement_order()
+            .into_iter()
+            .map(|idx| self.device_ids[idx])
+            .collect()
+    }
+
+    pub fn device_for_id(&self, device_id: usize) -> Result<Arc<CudaDevice>> {
+        let index = self
+            .device_ids
+            .iter()
.position(|&candidate| candidate == device_id)
+            .ok_or_else(|| {
+                MahoutError::InvalidInput(format!(
+                    "Device mesh does not contain cuda:{}",
+                    device_id
+                ))
+            })?;
+        Ok(Arc::clone(&self.devices[index]))
+    }
+
+    fn validate_device_ids(device_ids: &[usize]) -> Result<()> {
+        if device_ids.is_empty() {
+            return Err(MahoutError::InvalidInput(
+                "Device mesh requires at least one device ID".to_string(),
+            ));
+        }
+
+        // Reject duplicates: sort a copy, dedup, and compare lengths.
+        let mut sorted = device_ids.to_vec();
+        sorted.sort_unstable();
+        sorted.dedup();
+        if sorted.len() != device_ids.len() {
+            return Err(MahoutError::InvalidInput(
+                "Device mesh contains duplicate device IDs".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 799eb7b180..fb0ff4af96 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -36,6 +36,11 @@ mod profiling;
 
 pub use error::{MahoutError, Result, cuda_error_to_string};
 pub use gpu::memory::Precision;
+pub use gpu::{
+    DeviceMesh, DistributedAmplitudePlan, DistributedExecutionContext, DistributedStateVector,
+    DistributionMode, GpuTopology, LinkKind, PlacementRequest, PreparedDistributedAmplitudeEncode,
+    ShardPolicy,
+};
 pub use reader::{NullHandling, handle_float64_nulls};
 
 // Throughput/latency pipeline runner: single path using QdpEngine and encode_batch in Rust.
@@ -52,6 +57,7 @@ use std::ffi::c_void;
 use std::sync::Arc;
 
 use crate::dlpack::DLManagedTensor;
+use crate::gpu::distributed::runtime;
 use crate::gpu::get_encoder;
 use cudarc::driver::CudaDevice;
@@ -138,6 +144,124 @@ impl QdpEngine {
         })
     }
 
+    /// Build a validated device mesh for one distributed execution context.
+    pub fn new_distributed_mesh(device_ids: Vec<usize>) -> Result<DeviceMesh> {
+        DeviceMesh::new(device_ids)
+    }
+
+    /// Prepare a distributed amplitude encode on a single-process device mesh.
+    ///
+    /// This is a convenience wrapper over `prepare_distributed_amplitude_on`
+    /// using an in-process collective implementation.
+    #[doc(hidden)]
+    pub fn prepare_distributed_amplitude(
+        device_ids: Vec<usize>,
+        data: &[f64],
+        num_qubits: usize,
+        precision: Precision,
+        request: Option<PlacementRequest>,
+    ) -> Result<PreparedDistributedAmplitudeEncode> {
+        // Validate input before building the mesh so bad data fails fast.
+        let request = Self::resolve_distributed_request(num_qubits, request)?;
+        runtime::validate_distributed_input(data, &request)?;
+        let collectives = crate::gpu::LocalCollectiveCommunicator;
+        let execution = DistributedExecutionContext::single_process(device_ids, &collectives)?;
+        Self::prepare_distributed_amplitude_on(
+            &execution,
+            data,
+            num_qubits,
+            precision,
+            Some(request),
+        )
+    }
+
+    /// Prepare a distributed amplitude encode on an explicit execution context.
+    ///
+    /// The execution context owns the participating device mesh and the
+    /// collective implementation that coordinates those shards.
+    #[doc(hidden)]
+    pub fn prepare_distributed_amplitude_on(
+        execution: &DistributedExecutionContext<'_>,
+        data: &[f64],
+        num_qubits: usize,
+        precision: Precision,
+        request: Option<PlacementRequest>,
+    ) -> Result<PreparedDistributedAmplitudeEncode> {
+        let request = Self::resolve_distributed_request(num_qubits, request)?;
+        runtime::validate_distributed_input(data, &request)?;
+        let (plan, inv_norm, layout) =
+            runtime::prepare_distributed_encode(execution, data, precision, request)?;
+
+        Ok(PreparedDistributedAmplitudeEncode {
+            mesh: execution.mesh().clone(),
+            plan,
+            inv_norm,
+            layout,
+        })
+    }
+
+    /// Encode amplitude data into real per-device shard buffers on a
+    /// single-process device mesh.
+    ///
+    /// This is a convenience wrapper over `encode_distributed_amplitude_to_shards_on`
+    /// using an in-process collective implementation.
+    #[doc(hidden)]
+    pub fn encode_distributed_amplitude_to_shards(
+        device_ids: Vec<usize>,
+        data: &[f64],
+        num_qubits: usize,
+        precision: Precision,
+        request: Option<PlacementRequest>,
+    ) -> Result<DistributedStateVector> {
+        // Validate input before building the mesh so bad data fails fast.
+        let request = Self::resolve_distributed_request(num_qubits, request)?;
+        runtime::validate_distributed_input(data, &request)?;
+        let collectives = crate::gpu::LocalCollectiveCommunicator;
+        let execution = DistributedExecutionContext::single_process(device_ids, &collectives)?;
+        Self::encode_distributed_amplitude_to_shards_on(
+            &execution,
+            data,
+            num_qubits,
+            precision,
+            Some(request),
+        )
+    }
+
+    /// Materialize a distributed amplitude state on an explicit execution
+    /// context.
+    #[doc(hidden)]
+    pub fn encode_distributed_amplitude_to_shards_on(
+        execution: &DistributedExecutionContext<'_>,
+        data: &[f64],
+        num_qubits: usize,
+        precision: Precision,
+        request: Option<PlacementRequest>,
+    ) -> Result<DistributedStateVector> {
+        let request = Self::resolve_distributed_request(num_qubits, request)?;
+        runtime::validate_distributed_input(data, &request)?;
+        runtime::encode_distributed_to_shards(execution, data, precision, request)
+    }
+
+    /// Resolve the caller-supplied request against the positional qubit count,
+    /// defaulting to an equal sharded-capacity placement when none is given.
+    fn resolve_distributed_request(
+        num_qubits: usize,
+        request: Option<PlacementRequest>,
+    ) -> Result<PlacementRequest> {
+        match request {
+            Some(request) => {
+                if request.num_qubits != num_qubits {
+                    return Err(MahoutError::InvalidInput(format!(
+                        "Distributed request qubit mismatch: argument specifies {} qubits but request specifies {}",
+                        num_qubits, request.num_qubits
+                    )));
+                }
+                Ok(request)
+            }
+            None => Ok(PlacementRequest::new(
+                num_qubits,
+                DistributionMode::ShardedCapacity,
+                ShardPolicy::Equal,
+            )),
+        }
+    }
+
     /// Encode classical data into quantum state
     ///
     /// Selects encoding strategy, executes on GPU, returns DLPack pointer.
diff --git a/qdp/qdp-core/tests/gpu_api_workflow.rs b/qdp/qdp-core/tests/gpu_api_workflow.rs index 6c8e651669..cd79ac32a6 100644 --- a/qdp/qdp-core/tests/gpu_api_workflow.rs +++ b/qdp/qdp-core/tests/gpu_api_workflow.rs @@ -18,9 +18,9 @@ #[cfg(target_os = "linux")] use qdp_core::MahoutError; -use qdp_core::QdpEngine; #[cfg(target_os = "linux")] use qdp_core::gpu::pipeline::run_dual_stream_pipeline_aligned; +use qdp_core::{Precision, QdpEngine}; mod common; @@ -63,7 +63,6 @@ fn test_amplitude_encoding_workflow() { assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null"); println!("PASS: Encoding succeeded, DLPack pointer valid"); - // Simulate PyTorch behavior: manually call deleter to free GPU memory unsafe { println!("Calling deleter to free GPU memory"); common::take_deleter_and_delete(dlpack_ptr); @@ -81,7 +80,6 @@ fn test_amplitude_encoding_async_pipeline() { return; }; - // Use 200000 elements to trigger async pipeline path (ASYNC_THRESHOLD = 131072) let data = common::create_test_data(200000); println!("Created test data: {} elements", data.len()); @@ -109,7 +107,7 @@ fn test_angle_encoding_async_pipeline() { let num_qubits = 4; let sample_size = num_qubits; - let num_samples = 32768; // 32768 * 4 = 131072 elements (>= 1MB threshold) + let num_samples = 32768; let batch_data = common::create_test_data(num_samples * sample_size); let result = engine.encode_batch(&batch_data, num_samples, sample_size, num_qubits, "angle"); @@ -162,7 +160,6 @@ fn test_batch_dlpack_2d_shape() { return; }; - // Create batch data: 3 samples, each with 4 elements (2 qubits) let num_samples = 3; let num_qubits = 2; let sample_size = 4; @@ -248,6 +245,54 @@ fn test_single_encode_dlpack_2d_shape() { } } +#[test] +#[cfg(target_os = "linux")] +fn test_distributed_amplitude_two_gpu_smoke() { + println!("Testing distributed amplitude shard encode on two GPUs..."); + + if common::cuda_device().is_none() { + println!("SKIP: No GPU available"); + return; + } + + if 
cudarc::driver::CudaDevice::new(1).is_err() { + println!("SKIP: Second GPU unavailable"); + return; + } + + let data = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; + let distributed = QdpEngine::encode_distributed_amplitude_to_shards( + vec![0, 1], + &data, + 3, + Precision::Float64, + None, + ) + .expect("Distributed amplitude shard encode should succeed"); + + assert_eq!(distributed.num_shards(), 2, "Expected two shards"); + assert_eq!(distributed.shards[0].device_id, 0); + assert_eq!(distributed.shards[1].device_id, 1); + + let shard0 = distributed.copy_shard_to_host_f64(0).unwrap(); + let shard1 = distributed.copy_shard_to_host_f64(1).unwrap(); + + assert_eq!(shard0.len(), 4); + assert_eq!(shard1.len(), 4); + + let total_prob: f64 = shard0 + .iter() + .chain(shard1.iter()) + .map(|value| value.x * value.x + value.y * value.y) + .sum(); + assert!( + (total_prob - 1.0).abs() < 1e-10, + "Distributed state should remain normalized" + ); + + println!("PASS: Distributed amplitude shard encode succeeded on two GPUs"); +} + #[test] #[cfg(target_os = "linux")] fn test_dlpack_device_id() { @@ -269,13 +314,11 @@ fn test_dlpack_device_id() { let managed = &*dlpack_ptr; let tensor = &managed.dl_tensor; - // Verify device_id is correctly set (0 for device 0) assert_eq!( tensor.device.device_id, 0, "device_id should be 0 for device 0" ); - // Verify device_type is CUDA (kDLCUDA = 2) use qdp_core::dlpack::DLDeviceType; match tensor.device.device_type { DLDeviceType::kDLCUDA => println!("PASS: Device type is CUDA"), @@ -287,7 +330,6 @@ fn test_dlpack_device_id() { tensor.device.device_id ); - // Free memory common::take_deleter_and_delete(dlpack_ptr); } } diff --git a/qdp/qdp-core/tests/gpu_distributed_amplitude_plan.rs b/qdp/qdp-core/tests/gpu_distributed_amplitude_plan.rs new file mode 100644 index 0000000000..ba14064330 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_distributed_amplitude_plan.rs @@ -0,0 +1,167 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more 
+// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use qdp_core::Precision; +use qdp_core::gpu::{ + DeviceMesh, DistributedAmplitudePlan, DistributedStateLayout, DistributionMode, GpuTopology, + PlacementRequest, ShardPolicy, +}; + +#[test] +fn distributed_amplitude_plan_has_expected_shard_math() { + let topology = GpuTopology::placeholder(4); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2, 3], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(4, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + assert_eq!(plan.global_len, 16); + assert_eq!(plan.shard_bits, Some(2)); + assert_eq!(plan.uniform_shard_len, Some(4)); + assert_eq!(plan.shard_range(0).unwrap(), (0, 4)); + assert_eq!(plan.shard_range(3).unwrap(), (12, 16)); +} + +#[test] +fn distributed_amplitude_plan_rejects_too_many_devices_for_qubits() { + let topology = GpuTopology::placeholder(4); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2, 3], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(1, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let err = DistributedAmplitudePlan::for_request(&mesh, request).unwrap_err(); + assert!(matches!(err, 
qdp_core::MahoutError::InvalidInput(_))); +} + +#[test] +fn distributed_amplitude_plan_allows_extra_global_qubit_when_shards_fit() { + let topology = GpuTopology::placeholder(2); + let mesh = DeviceMesh { + device_ids: vec![0, 1], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(31, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + assert_eq!(plan.global_len, 1usize << 31); + assert_eq!(plan.uniform_shard_len, Some(1usize << 30)); +} + +#[test] +fn distributed_amplitude_plan_supports_q34_with_balanced_six_gpu_shards() { + let topology = GpuTopology::placeholder(6); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2, 3, 4, 5], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new( + 34, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + ); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + + assert_eq!(plan.global_len, 1usize << 34); + assert_eq!(plan.num_devices, 6); + assert_eq!(plan.shard_bits, None); + assert_eq!(plan.uniform_shard_len, None); + assert_eq!(plan.shard_range(0).unwrap(), (0, 2_863_311_531)); + assert_eq!( + plan.shard_range(5).unwrap(), + (14_316_557_654, 17_179_869_184) + ); +} + +#[test] +fn distributed_amplitude_plan_rejects_zero_qubits() { + let topology = GpuTopology::placeholder(1); + let mesh = DeviceMesh { + device_ids: vec![0], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(0, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let err = DistributedAmplitudePlan::for_request(&mesh, request).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("at least 1") + )); +} + +#[test] +fn distributed_amplitude_plan_reports_out_of_range_shard_ids() { + let topology = GpuTopology::placeholder(2); + let mesh = DeviceMesh { + device_ids: vec![0, 1], + devices: Vec::new(), + topology, 
+ }; + let request = PlacementRequest::new(2, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + let err = plan.shard_range(2).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("out of range") + )); +} + +#[test] +fn balanced_uneven_distributed_amplitude_plan_omits_uniform_shard_metadata() { + let topology = GpuTopology::placeholder(3); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new( + 3, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + ); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + assert_eq!(plan.shard_bits, None); + assert_eq!(plan.uniform_shard_len, None); + assert_eq!(plan.shard_range(2).unwrap(), (6, 8)); +} + +#[test] +fn distributed_state_layout_rejects_mesh_with_missing_device_handles() { + let topology = GpuTopology::placeholder(2); + let mesh = DeviceMesh { + device_ids: vec![0, 1], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(2, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + + let err = match DistributedStateLayout::new(&mesh, &plan, Precision::Float64) { + Ok(_) => panic!("expected malformed mesh to be rejected"), + Err(err) => err, + }; + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("device handles") + )); +} diff --git a/qdp/qdp-core/tests/gpu_distributed_communicator.rs b/qdp/qdp-core/tests/gpu_distributed_communicator.rs new file mode 100644 index 0000000000..ffd1904671 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_distributed_communicator.rs @@ -0,0 +1,35 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use qdp_core::gpu::{CollectiveCommunicator, LocalCollectiveCommunicator}; + +#[test] +fn local_collective_reduce_sum_returns_total() { + let comm = LocalCollectiveCommunicator; + let values = vec![1.0, 2.0, 3.0]; + assert_eq!(comm.all_reduce_sum_f64(&values).unwrap(), 6.0); +} + +#[test] +fn local_collective_reduce_sum_rejects_empty_inputs() { + let comm = LocalCollectiveCommunicator; + let err = comm.all_reduce_sum_f64(&[]).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("at least one partial contribution") + )); +} diff --git a/qdp/qdp-core/tests/gpu_distributed_engine.rs b/qdp/qdp-core/tests/gpu_distributed_engine.rs new file mode 100644 index 0000000000..3caebdd3c7 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_distributed_engine.rs @@ -0,0 +1,363 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use qdp_core::gpu::{ + DeviceMesh, DistributedAmplitudePlan, DistributedExecutionContext, DistributedStateLayout, + DistributionMode, GpuTopology, LinkKind, LocalCollectiveCommunicator, PlacementRequest, + QuantumEncoder, ShardPolicy, +}; +use qdp_core::{Precision, QdpEngine}; + +mod common; + +#[test] +fn prepare_distributed_amplitude_returns_expected_metadata() { + #[cfg(target_os = "linux")] + if cudarc::driver::CudaDevice::new(1).is_err() { + return; + } + + let prepared = QdpEngine::prepare_distributed_amplitude( + vec![0, 1], + &[1.0, 2.0, 3.0], + 2, + Precision::Float32, + None, + ) + .unwrap(); + + assert_eq!(prepared.mesh.num_devices(), 2); + assert_eq!(prepared.plan.uniform_shard_len, Some(2)); + assert_eq!(prepared.layout.num_shards(), 2); + assert_eq!(prepared.layout.global_len, 4); + + let expected = 1.0 / 14.0f64.sqrt(); + assert!((prepared.inv_norm - expected).abs() < 1e-12); +} + +#[test] +fn prepare_distributed_amplitude_on_execution_context_returns_expected_metadata() { + #[cfg(target_os = "linux")] + if cudarc::driver::CudaDevice::new(1).is_err() { + return; + } + + let collectives = LocalCollectiveCommunicator; + let execution = DistributedExecutionContext::single_process(vec![0, 1], &collectives).unwrap(); + let prepared = QdpEngine::prepare_distributed_amplitude_on( + &execution, + &[1.0, 2.0, 3.0], + 2, + Precision::Float32, + None, + ) + .unwrap(); + + assert_eq!(prepared.mesh.num_devices(), 2); + assert_eq!(prepared.plan.uniform_shard_len, Some(2)); + assert_eq!(prepared.layout.num_shards(), 2); + 
assert_eq!(prepared.layout.global_len, 4); +} + +#[cfg(target_os = "linux")] +fn reordered_three_device_mesh() -> Option { + let device0 = cudarc::driver::CudaDevice::new(0).ok()?; + let device1 = cudarc::driver::CudaDevice::new(1).ok()?; + let device2 = cudarc::driver::CudaDevice::new(2).ok()?; + let topology = GpuTopology { + peer_access: vec![ + vec![true, true, false], + vec![true, true, true], + vec![false, true, true], + ], + links: vec![ + vec![LinkKind::SameDevice, LinkKind::Pix, LinkKind::Unknown], + vec![LinkKind::Pix, LinkKind::SameDevice, LinkKind::Node], + vec![LinkKind::Unknown, LinkKind::Node, LinkKind::SameDevice], + ], + }; + + DeviceMesh::from_parts(vec![0, 1, 2], vec![device0, device1, device2], topology).ok() +} + +#[test] +#[cfg(target_os = "linux")] +fn distributed_layout_uses_device_handles_for_reordered_placements() { + let Some(mesh) = reordered_three_device_mesh() else { + return; + }; + + let request = PlacementRequest::new( + 2, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + ); + let plan = DistributedAmplitudePlan::for_request(&mesh, request).unwrap(); + assert_eq!(plan.placement.placements[0].device_id, 1); + + let layout = DistributedStateLayout::new(&mesh, &plan, Precision::Float32).unwrap(); + assert_eq!(layout.shards[0].device_id, 1); + assert_eq!(layout.shards[0].device.ordinal(), 1); + assert_eq!(layout.shards[1].device_id, 0); + assert_eq!(layout.shards[1].device.ordinal(), 0); + assert_eq!(layout.shards[2].device_id, 2); + assert_eq!(layout.shards[2].device.ordinal(), 2); +} + +#[test] +#[cfg(target_os = "linux")] +fn distributed_encoding_uses_device_handles_for_reordered_placements() { + let Some(mesh) = reordered_three_device_mesh() else { + return; + }; + let device0 = mesh.device_for_id(0).unwrap(); + + let collectives = LocalCollectiveCommunicator; + let execution = DistributedExecutionContext::new(mesh, &collectives); + let input = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; + let single_state = 
qdp_core::gpu::AmplitudeEncoder + .encode(&device0, &input, 3) + .unwrap(); + let single_host = single_state + .copy_to_host_f64(&device0) + .unwrap() + .into_iter() + .map(|value| value.x) + .collect::>(); + + let state = QdpEngine::encode_distributed_amplitude_to_shards_on( + &execution, + &input, + 3, + Precision::Float64, + Some(PlacementRequest::new( + 3, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + )), + ) + .unwrap(); + + assert_eq!(state.shards[0].device_id, 1); + assert_eq!(state.shards[0].device.ordinal(), 1); + assert_eq!(state.shards[1].device_id, 0); + assert_eq!(state.shards[1].device.ordinal(), 0); + assert_eq!(state.shards[2].device_id, 2); + assert_eq!(state.shards[2].device.ordinal(), 2); + + let mut distributed_host = Vec::new(); + for shard_id in 0..state.num_shards() { + distributed_host.extend( + state + .copy_shard_to_host_f64(shard_id) + .unwrap() + .into_iter() + .map(|value| value.x), + ); + } + + assert_eq!(distributed_host, single_host); +} + +#[test] +#[cfg(target_os = "linux")] +fn encode_distributed_amplitude_to_shards_returns_real_buffers() { + let Some(device0) = common::cuda_device() else { + return; + }; + let device1 = match cudarc::driver::CudaDevice::new(1) { + Ok(device) => device, + Err(_) => return, + }; + + let _ = device1; + + let state = QdpEngine::encode_distributed_amplitude_to_shards( + vec![0, 1], + &[1.0, 2.0, 3.0], + 2, + Precision::Float64, + None, + ) + .unwrap(); + + assert_eq!(state.num_shards(), 2); + + let shard0 = state.copy_shard_to_host_f64(0).unwrap(); + let shard1 = state.copy_shard_to_host_f64(1).unwrap(); + + let expected = 1.0 / 14.0f64.sqrt(); + let shard0 = shard0.iter().map(|value| value.x).collect::>(); + let shard1 = shard1.iter().map(|value| value.x).collect::>(); + + assert!((shard0[0] - expected).abs() < 1e-12); + assert!((shard0[1] - (2.0 * expected)).abs() < 1e-12); + assert!((shard1[0] - (3.0 * expected)).abs() < 1e-12); + assert_eq!(shard1[1], 0.0); + + let _ = 
device0; +} + +#[test] +#[cfg(target_os = "linux")] +fn distributed_amplitude_matches_single_gpu_reference_on_two_gpus() { + let device0 = match cudarc::driver::CudaDevice::new(0) { + Ok(device) => device, + Err(_) => return, + }; + let device1 = match cudarc::driver::CudaDevice::new(1) { + Ok(device) => device, + Err(_) => return, + }; + + let input = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; + let encoder = qdp_core::gpu::AmplitudeEncoder; + let single_state = encoder.encode(&device0, &input, 3).unwrap(); + let single_host = single_state + .copy_to_host_f64(&device0) + .unwrap() + .into_iter() + .map(|value| value.x) + .collect::>(); + + let distributed = QdpEngine::encode_distributed_amplitude_to_shards( + vec![0, 1], + &input, + 3, + Precision::Float64, + None, + ) + .unwrap(); + + let mut distributed_host = Vec::new(); + distributed_host.extend( + distributed + .copy_shard_to_host_f64(0) + .unwrap() + .into_iter() + .map(|value| value.x), + ); + distributed_host.extend( + distributed + .copy_shard_to_host_f64(1) + .unwrap() + .into_iter() + .map(|value| value.x), + ); + + assert_eq!(distributed_host.len(), single_host.len()); + for (idx, (distributed_value, single_value)) in + distributed_host.iter().zip(single_host.iter()).enumerate() + { + assert!( + (*distributed_value - *single_value).abs() < 1e-12, + "Mismatch at amplitude {}: distributed={} single={}", + idx, + distributed_value, + single_value + ); + } + + let _ = device1; +} + +#[test] +fn prepare_distributed_amplitude_rejects_oversized_inputs() { + let err = match QdpEngine::prepare_distributed_amplitude( + vec![0], + &[1.0, 2.0, 3.0], + 1, + Precision::Float64, + None, + ) { + Ok(_) => panic!("expected oversized input to be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("exceeds state vector size") + )); +} + +#[test] +fn prepare_distributed_amplitude_validates_input_before_building_mesh() { + let err = match 
QdpEngine::prepare_distributed_amplitude( + vec![9999], + &[], + 1, + Precision::Float64, + None, + ) { + Ok(_) => panic!("expected empty input to be rejected before mesh creation"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("cannot be empty") + )); +} + +#[test] +#[cfg(target_os = "linux")] +fn distributed_state_rejects_out_of_range_shard_reads() { + let Some(_device0) = common::cuda_device() else { + return; + }; + + let state = QdpEngine::encode_distributed_amplitude_to_shards( + vec![0], + &[1.0, 2.0], + 1, + Precision::Float64, + None, + ) + .unwrap(); + + let err = state.copy_shard_to_host_f64(1).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("out of range") + )); +} + +#[test] +#[cfg(target_os = "linux")] +fn distributed_state_rejects_precision_mismatched_host_reads() { + let Some(_device0) = common::cuda_device() else { + return; + }; + + let state = QdpEngine::encode_distributed_amplitude_to_shards( + vec![0], + &[1.0, 2.0], + 1, + Precision::Float32, + None, + ) + .unwrap(); + + let err = state.copy_shard_to_host_f64(0).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("float32 data, not float64") + )); +} diff --git a/qdp/qdp-core/tests/gpu_distributed_planner.rs b/qdp/qdp-core/tests/gpu_distributed_planner.rs new file mode 100644 index 0000000000..1ac04ef632 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_distributed_planner.rs @@ -0,0 +1,142 @@ +use qdp_core::gpu::{ + DeviceMesh, DistributionMode, GpuTopology, LinkKind, PlacementPlanner, PlacementRequest, + ShardPolicy, +}; + +#[test] +fn placement_planner_emits_contiguous_ranges() { + let topology = GpuTopology::placeholder(2); + let mesh = DeviceMesh { + device_ids: vec![3, 7], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(3, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let plan = 
PlacementPlanner::plan(&mesh, &request).unwrap(); + assert_eq!(plan.gather_device_id, Some(3)); + assert_eq!(plan.placements.len(), 2); + assert_eq!(plan.placements[0].device_id, 3); + assert_eq!( + (plan.placements[0].start_idx, plan.placements[0].end_idx), + (0, 4) + ); + assert_eq!(plan.placements[1].device_id, 7); + assert_eq!( + (plan.placements[1].start_idx, plan.placements[1].end_idx), + (4, 8) + ); +} + +#[test] +fn balanced_uneven_policy_supports_non_power_of_two_device_counts() { + let topology = GpuTopology::placeholder(3); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new( + 3, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + ); + let plan = PlacementPlanner::plan(&mesh, &request).unwrap(); + assert_eq!(plan.placements.len(), 3); + assert_eq!( + (plan.placements[0].start_idx, plan.placements[0].end_idx), + (0, 3) + ); + assert_eq!( + (plan.placements[1].start_idx, plan.placements[1].end_idx), + (3, 6) + ); + assert_eq!( + (plan.placements[2].start_idx, plan.placements[2].end_idx), + (6, 8) + ); +} + +#[test] +fn equal_policy_rejects_non_power_of_two_device_counts() { + let topology = GpuTopology::placeholder(3); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(3, DistributionMode::ShardedCapacity, ShardPolicy::Equal); + let err = PlacementPlanner::plan(&mesh, &request).unwrap_err(); + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("power-of-two") + )); +} + +#[test] +fn single_mode_uses_only_first_device() { + let topology = GpuTopology::placeholder(3); + let mesh = DeviceMesh { + device_ids: vec![4, 8, 15], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(3, DistributionMode::Single, ShardPolicy::Equal); + let plan = PlacementPlanner::plan(&mesh, &request).unwrap(); + assert_eq!(plan.placements.len(), 
1); + assert_eq!(plan.gather_device_id, Some(4)); + assert_eq!(plan.placements[0].device_id, 4); + assert_eq!( + (plan.placements[0].start_idx, plan.placements[0].end_idx), + (0, 8) + ); +} + +#[test] +fn replicated_mode_is_not_implemented() { + let topology = GpuTopology::placeholder(2); + let mesh = DeviceMesh { + device_ids: vec![0, 1], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new(2, DistributionMode::Replicated, ShardPolicy::Equal); + let err = PlacementPlanner::plan(&mesh, &request).unwrap_err(); + assert!(matches!(err, qdp_core::MahoutError::NotImplemented(_))); +} + +#[test] +fn sharded_capacity_prefers_topology_recommended_device_order() { + let topology = GpuTopology { + peer_access: vec![ + vec![true, true, false], + vec![true, true, true], + vec![false, true, true], + ], + links: vec![ + vec![LinkKind::SameDevice, LinkKind::Pix, LinkKind::Unknown], + vec![LinkKind::Pix, LinkKind::SameDevice, LinkKind::Node], + vec![LinkKind::Unknown, LinkKind::Node, LinkKind::SameDevice], + ], + }; + let mesh = DeviceMesh { + device_ids: vec![10, 11, 12], + devices: Vec::new(), + topology, + }; + let request = PlacementRequest::new( + 3, + DistributionMode::ShardedCapacity, + ShardPolicy::BalancedUneven, + ); + + let plan = PlacementPlanner::plan(&mesh, &request).unwrap(); + let ordered_ids = plan + .placements + .iter() + .map(|placement| placement.device_id) + .collect::>(); + + assert_eq!(plan.gather_device_id, Some(11)); + assert_eq!(ordered_ids, vec![11, 10, 12]); +} diff --git a/qdp/qdp-core/tests/gpu_distributed_runtime.rs b/qdp/qdp-core/tests/gpu_distributed_runtime.rs new file mode 100644 index 0000000000..fcb9609cd4 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_distributed_runtime.rs @@ -0,0 +1,139 @@ +use qdp_core::{DistributionMode, PlacementRequest, Precision, QdpEngine, ShardPolicy}; + +#[test] +fn prepare_distributed_amplitude_handles_padding_tail_in_norm() { + let prepared = QdpEngine::prepare_distributed_amplitude( + 
vec![0], + &[1.0, 2.0, 3.0], + 2, + Precision::Float64, + None, + ) + .unwrap(); + + let expected = 1.0 / 14.0f64.sqrt(); + assert!((prepared.inv_norm - expected).abs() < 1e-12); +} + +#[test] +fn prepare_distributed_amplitude_accepts_custom_request() { + #[cfg(target_os = "linux")] + if cudarc::driver::CudaDevice::new(1).is_err() { + return; + } + + let prepared = QdpEngine::prepare_distributed_amplitude( + vec![0, 1], + &[1.0, 2.0, 3.0], + 2, + Precision::Float32, + Some(PlacementRequest::new( + 2, + DistributionMode::ShardedCapacity, + ShardPolicy::Equal, + )), + ) + .unwrap(); + + assert_eq!(prepared.plan.num_qubits, 2); + assert_eq!(prepared.plan.placement.num_devices(), 2); +} + +#[test] +fn prepare_distributed_amplitude_rejects_request_qubit_mismatch() { + let err = match QdpEngine::prepare_distributed_amplitude( + vec![0], + &[1.0, 2.0], + 2, + Precision::Float64, + Some(PlacementRequest::new( + 3, + DistributionMode::ShardedCapacity, + ShardPolicy::Equal, + )), + ) { + Ok(_) => panic!("expected qubit mismatch to be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("qubit mismatch") + )); +} + +#[test] +fn prepare_distributed_amplitude_rejects_empty_input() { + let err = + match QdpEngine::prepare_distributed_amplitude(vec![0], &[], 1, Precision::Float64, None) { + Ok(_) => panic!("expected empty input to be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("cannot be empty") + )); +} + +#[test] +fn prepare_distributed_amplitude_rejects_zero_norm_input() { + let err = match QdpEngine::prepare_distributed_amplitude( + vec![0], + &[0.0, 0.0], + 1, + Precision::Float64, + None, + ) { + Ok(_) => panic!("expected zero-norm input to be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("zero or non-finite norm") + )); +} + +#[test] +fn 
prepare_distributed_amplitude_rejects_non_finite_input() { + let err = match QdpEngine::prepare_distributed_amplitude( + vec![0], + &[1.0, f64::NAN], + 1, + Precision::Float64, + None, + ) { + Ok(_) => panic!("expected non-finite input to be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("NaN or Inf") + )); +} + +#[test] +fn prepare_distributed_amplitude_validates_input_before_building_mesh() { + let err = match QdpEngine::prepare_distributed_amplitude( + vec![9999], + &[], + 1, + Precision::Float64, + None, + ) { + Ok(_) => panic!("expected invalid input to be rejected before mesh creation"), + Err(err) => err, + }; + + assert!(matches!( + err, + qdp_core::MahoutError::InvalidInput(msg) + if msg.contains("cannot be empty") + )); +} diff --git a/qdp/qdp-core/tests/gpu_topology.rs b/qdp/qdp-core/tests/gpu_topology.rs new file mode 100644 index 0000000000..8b0b2427f0 --- /dev/null +++ b/qdp/qdp-core/tests/gpu_topology.rs @@ -0,0 +1,79 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use qdp_core::{DeviceMesh, GpuTopology, LinkKind}; + +#[test] +fn placeholder_topology_marks_self_edges() { + let topology = GpuTopology::placeholder(2); + assert!(topology.peer_access[0][0]); + assert!(topology.peer_access[1][1]); + assert_eq!(topology.links[0][0], LinkKind::SameDevice); + assert_eq!(topology.links[1][1], LinkKind::SameDevice); + assert!(!topology.peer_access[0][1]); + assert_eq!(topology.links[0][1], LinkKind::Unknown); +} + +#[test] +fn duplicate_device_ids_are_rejected() { + let err = DeviceMesh::new(vec![0, 0]).err().unwrap(); + assert!(matches!(err, qdp_core::MahoutError::InvalidInput(_))); +} + +#[test] +fn power_of_two_validation_rejects_three_devices() { + let topology = GpuTopology::placeholder(3); + let mesh = DeviceMesh { + device_ids: vec![0, 1, 2], + devices: Vec::new(), + topology, + }; + let err = mesh.validate_power_of_two().unwrap_err(); + assert!(matches!(err, qdp_core::MahoutError::InvalidInput(_))); +} + +#[test] +#[cfg(target_os = "linux")] +fn from_parts_rejects_device_handle_mismatch() { + let device = match cudarc::driver::CudaDevice::new(0) { + Ok(device) => device, + Err(_) => return, + }; + let topology = GpuTopology::placeholder(1); + let err = DeviceMesh::from_parts(vec![1], vec![device], topology) + .err() + .unwrap(); + assert!(matches!(err, qdp_core::MahoutError::InvalidInput(_))); +} + +#[test] +fn preferred_gather_index_prefers_best_connected_device() { + let topology = GpuTopology { + peer_access: vec![ + vec![true, true, false], + vec![true, true, true], + vec![false, true, true], + ], + links: vec![ + vec![LinkKind::SameDevice, LinkKind::Pix, LinkKind::Unknown], + vec![LinkKind::Pix, LinkKind::SameDevice, LinkKind::Node], + vec![LinkKind::Unknown, LinkKind::Node, LinkKind::SameDevice], + ], + }; + + assert_eq!(topology.preferred_gather_index(), Some(1)); + assert_eq!(topology.recommended_placement_order(), vec![1, 0, 2]); +} diff --git a/qdp/qdp-kernels/build.rs b/qdp/qdp-kernels/build.rs index 
097b57e15f..f12fdf9876 100644 --- a/qdp/qdp-kernels/build.rs +++ b/qdp/qdp-kernels/build.rs @@ -86,18 +86,20 @@ fn main() { // SM 75 = Turing (T4, RTX 2000 series) // SM 80 = Ampere (A100, RTX 3000 series) // SM 86 = Ampere (RTX 3090, A40) - // SM 89 = Ada Lovelace (RTX 4000 series) + // SM 89 = Ada Lovelace (RTX 4090 / RTX 4000 series) // SM 90 = Hopper (H100) - // Support both Turing (sm_75) and Ampere+ architectures + // SM 120 = Blackwell workstation parts + // Support the GPU generations currently present in the QDP test host. .flag("-gencode") .flag("arch=compute_75,code=sm_75") .flag("-gencode") .flag("arch=compute_80,code=sm_80") .flag("-gencode") .flag("arch=compute_86,code=sm_86") - // Optional: Add more architectures for production - // .flag("-gencode") - // .flag("arch=compute_89,code=sm_89") + .flag("-gencode") + .flag("arch=compute_89,code=sm_89") + .flag("-gencode") + .flag("arch=compute_120,code=sm_120") .file("src/amplitude.cu") .file("src/basis.cu") .file("src/angle.cu")