Skip to content

Commit 67f59e2

Browse files
authored
perf(qdp): Implement async prefetching and native f32 dispatch pipelines (#1242)
* Perf: Implement asynchronous background prefetching to eliminate GPU starvation in QDP pipeline * perf(qdp): Implement async prefetching and native f32 dispatch pipelines * fix ci errors * fix ci errors * update and improve * fix ci errors * fix ci errors * update and improve * update and improve * fix ci errors * update * fix ci errors * fix ci errors
1 parent c2676eb commit 67f59e2

File tree

14 files changed

+1006
-315
lines changed

14 files changed

+1006
-315
lines changed

qdp/qdp-core/src/dlpack.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,16 @@ impl GpuStateVector {
252252
/// # Safety
253253
/// Freed by DLPack deleter when PyTorch releases tensor.
254254
/// Do not free manually.
255+
#[allow(clippy::manual_is_multiple_of)]
255256
pub fn to_dlpack(&self) -> *mut DLManagedTensor {
256257
// Always return 2D tensor: Batch [num_samples, state_len], Single [1, state_len]
257258
let (shape, strides) = if let Some(num_samples) = self.num_samples {
258259
// Batch: [num_samples, state_len_per_sample]
259260
debug_assert!(
260-
num_samples > 0 && self.size_elements.is_multiple_of(num_samples),
261-
"Batch state vector size must be divisible by num_samples"
261+
num_samples > 0 && self.size_elements % num_samples == 0,
262+
"Batch mismatch: {} elements cannot be evenly divided into {} samples",
263+
self.size_elements,
264+
num_samples
262265
);
263266
let state_len_per_sample = self.size_elements / num_samples;
264267
let shape = vec![num_samples as i64, state_len_per_sample as i64];

qdp/qdp-core/src/gpu/encodings/amplitude.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,232 @@ impl QuantumEncoder for AmplitudeEncoder {
457457
Ok(batch_state_vector)
458458
}
459459

460+
/// Encode multiple samples in a single GPU allocation and kernel launch for f32 inputs
461+
#[cfg(target_os = "linux")]
462+
fn encode_batch_f32(
463+
&self,
464+
device: &Arc<CudaDevice>,
465+
batch_data: &[f32],
466+
num_samples: usize,
467+
sample_size: usize,
468+
num_qubits: usize,
469+
) -> Result<GpuStateVector> {
470+
crate::profile_scope!("AmplitudeEncoder::encode_batch_f32");
471+
472+
let state_len = 1 << num_qubits;
473+
474+
if sample_size == 0 {
475+
return Err(MahoutError::InvalidInput(
476+
"sample_size cannot be zero".into(),
477+
));
478+
}
479+
if sample_size > state_len {
480+
return Err(MahoutError::InvalidInput(format!(
481+
"sample_size {} exceeds state vector length {} (2^{} qubits)",
482+
sample_size, state_len, num_qubits
483+
)));
484+
}
485+
if batch_data.len() != num_samples * sample_size {
486+
return Err(MahoutError::InvalidInput(format!(
487+
"batch_data length mismatch (expected {} * {} = {}, got {})",
488+
num_samples,
489+
sample_size,
490+
num_samples * sample_size,
491+
batch_data.len()
492+
)));
493+
}
494+
495+
let batch_state_vector = {
496+
crate::profile_scope!("GPU::AllocBatch_f32");
497+
GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
498+
};
499+
500+
// Upload input data to GPU
501+
let input_batch_gpu = {
502+
crate::profile_scope!("GPU::H2D_InputBatch_f32");
503+
device.htod_sync_copy(batch_data).map_err(|e| {
504+
MahoutError::MemoryAllocation(format!("Failed to upload batch input: {:?}", e))
505+
})?
506+
};
507+
508+
// Compute inverse norms on GPU using warp-reduced kernel
509+
let inv_norms_gpu = {
510+
crate::profile_scope!("GPU::BatchNormKernel_f32");
511+
use cudarc::driver::DevicePtrMut;
512+
let mut buffer = device.alloc_zeros::<f32>(num_samples).map_err(|e| {
513+
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
514+
})?;
515+
516+
let ret = unsafe {
517+
launch_l2_norm_batch_f32(
518+
*input_batch_gpu.device_ptr() as *const f32,
519+
num_samples,
520+
sample_size,
521+
*buffer.device_ptr_mut() as *mut f32,
522+
std::ptr::null_mut(), // default stream
523+
)
524+
};
525+
526+
if ret != 0 {
527+
return Err(MahoutError::KernelLaunch(format!(
528+
"Norm reduction kernel failed: {} ({})",
529+
ret,
530+
cuda_error_to_string(ret)
531+
)));
532+
}
533+
buffer
534+
};
535+
536+
// Validate norms on host
537+
{
538+
crate::profile_scope!("GPU::NormValidation_f32");
539+
let host_inv_norms = device
540+
.dtoh_sync_copy(&inv_norms_gpu)
541+
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?;
542+
543+
if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
544+
return Err(MahoutError::InvalidInput(
545+
"One or more samples have zero or invalid norm".to_string(),
546+
));
547+
}
548+
}
549+
550+
// Launch batch kernel
551+
{
552+
crate::profile_scope!("GPU::BatchKernelLaunch_f32");
553+
use cudarc::driver::DevicePtr;
554+
let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
555+
MahoutError::InvalidInput(
556+
"Batch state vector precision mismatch (expected float32 buffer)".to_string(),
557+
)
558+
})?;
559+
let ret = unsafe {
560+
launch_amplitude_encode_batch_f32(
561+
*input_batch_gpu.device_ptr() as *const f32,
562+
state_ptr as *mut c_void,
563+
*inv_norms_gpu.device_ptr() as *const f32,
564+
num_samples,
565+
sample_size,
566+
state_len,
567+
std::ptr::null_mut(), // default stream
568+
)
569+
};
570+
571+
if ret != 0 {
572+
return Err(MahoutError::KernelLaunch(format!(
573+
"Batch kernel launch failed: {} ({})",
574+
ret,
575+
cuda_error_to_string(ret)
576+
)));
577+
}
578+
}
579+
580+
{
581+
crate::profile_scope!("GPU::Synchronize");
582+
device
583+
.synchronize()
584+
.map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?;
585+
}
586+
587+
Ok(batch_state_vector)
588+
}
589+
590+
#[cfg(target_os = "linux")]
591+
unsafe fn encode_batch_from_gpu_ptr_f32(
592+
&self,
593+
device: &Arc<CudaDevice>,
594+
input_batch_d: *const c_void,
595+
num_samples: usize,
596+
sample_size: usize,
597+
num_qubits: usize,
598+
stream: *mut c_void,
599+
) -> Result<GpuStateVector> {
600+
let state_len = 1 << num_qubits;
601+
if sample_size == 0 {
602+
return Err(MahoutError::InvalidInput(
603+
"Sample size cannot be zero".into(),
604+
));
605+
}
606+
if sample_size > state_len {
607+
return Err(MahoutError::InvalidInput(format!(
608+
"Sample size {} exceeds state vector size {} (2^{} qubits)",
609+
sample_size, state_len, num_qubits
610+
)));
611+
}
612+
let input_batch_d = input_batch_d as *const f32;
613+
let batch_state_vector = {
614+
crate::profile_scope!("GPU::AllocBatch_f32");
615+
GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
616+
};
617+
let inv_norms_gpu = {
618+
crate::profile_scope!("GPU::BatchNormKernel_f32");
619+
use cudarc::driver::DevicePtrMut;
620+
let mut buffer = device.alloc_zeros::<f32>(num_samples).map_err(|e| {
621+
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
622+
})?;
623+
let ret = unsafe {
624+
launch_l2_norm_batch_f32(
625+
input_batch_d,
626+
num_samples,
627+
sample_size,
628+
*buffer.device_ptr_mut() as *mut f32,
629+
stream,
630+
)
631+
};
632+
if ret != 0 {
633+
return Err(MahoutError::KernelLaunch(format!(
634+
"Norm reduction kernel failed with CUDA error code: {} ({})",
635+
ret,
636+
cuda_error_to_string(ret)
637+
)));
638+
}
639+
buffer
640+
};
641+
{
642+
crate::profile_scope!("GPU::NormValidation_f32");
643+
let host_inv_norms = device
644+
.dtoh_sync_copy(&inv_norms_gpu)
645+
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?;
646+
if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
647+
return Err(MahoutError::InvalidInput(
648+
"One or more samples have zero or invalid norm".to_string(),
649+
));
650+
}
651+
}
652+
{
653+
crate::profile_scope!("GPU::BatchKernelLaunch_f32");
654+
use cudarc::driver::DevicePtr;
655+
let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
656+
MahoutError::InvalidInput(
657+
"Batch state vector precision mismatch (expected float32 buffer)".to_string(),
658+
)
659+
})?;
660+
let ret = unsafe {
661+
launch_amplitude_encode_batch_f32(
662+
input_batch_d,
663+
state_ptr as *mut c_void,
664+
*inv_norms_gpu.device_ptr() as *const f32,
665+
num_samples,
666+
sample_size,
667+
state_len,
668+
stream,
669+
)
670+
};
671+
if ret != 0 {
672+
return Err(MahoutError::KernelLaunch(format!(
673+
"Batch kernel launch failed with CUDA error code: {} ({})",
674+
ret,
675+
cuda_error_to_string(ret)
676+
)));
677+
}
678+
}
679+
{
680+
crate::profile_scope!("GPU::Synchronize");
681+
sync_cuda_stream(stream, "CUDA stream synchronize failed")?;
682+
}
683+
Ok(batch_state_vector)
684+
}
685+
460686
fn name(&self) -> &'static str {
461687
"amplitude"
462688
}

qdp/qdp-core/src/gpu/encodings/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,41 @@ pub trait QuantumEncoder: Send + Sync {
134134
self.name()
135135
)))
136136
}
137+
138+
/// Encode multiple samples in a single GPU allocation and kernel launch using f32 inputs.
139+
fn encode_batch_f32(
140+
&self,
141+
_device: &Arc<CudaDevice>,
142+
_batch_data: &[f32],
143+
_num_samples: usize,
144+
_sample_size: usize,
145+
_num_qubits: usize,
146+
) -> Result<GpuStateVector> {
147+
Err(MahoutError::NotImplemented(format!(
148+
"encode_batch_f32 not implemented for {}",
149+
self.name()
150+
)))
151+
}
152+
153+
/// Encode batch from existing GPU pointer (zero-copy) for f32 inputs.
154+
///
155+
/// # Safety
156+
/// Caller must ensure `input_batch_d` points to valid GPU memory (f32).
157+
#[cfg(target_os = "linux")]
158+
unsafe fn encode_batch_from_gpu_ptr_f32(
159+
&self,
160+
_device: &Arc<CudaDevice>,
161+
_input_batch_d: *const c_void,
162+
_num_samples: usize,
163+
_sample_size: usize,
164+
_num_qubits: usize,
165+
_stream: *mut c_void,
166+
) -> Result<GpuStateVector> {
167+
Err(MahoutError::NotImplemented(format!(
168+
"encode_batch_from_gpu_ptr_f32 not supported for {}",
169+
self.name()
170+
)))
171+
}
137172
}
138173

139174
// Encoding implementations

0 commit comments

Comments
 (0)