diff --git a/qdp/qdp-core/src/dlpack.rs b/qdp/qdp-core/src/dlpack.rs index 1181dbd8e5..0108f4f19c 100644 --- a/qdp/qdp-core/src/dlpack.rs +++ b/qdp/qdp-core/src/dlpack.rs @@ -252,13 +252,16 @@ impl GpuStateVector { /// # Safety /// Freed by DLPack deleter when PyTorch releases tensor. /// Do not free manually. + #[allow(clippy::manual_is_multiple_of)] pub fn to_dlpack(&self) -> *mut DLManagedTensor { // Always return 2D tensor: Batch [num_samples, state_len], Single [1, state_len] let (shape, strides) = if let Some(num_samples) = self.num_samples { // Batch: [num_samples, state_len_per_sample] debug_assert!( - num_samples > 0 && self.size_elements.is_multiple_of(num_samples), - "Batch state vector size must be divisible by num_samples" + num_samples > 0 && self.size_elements % num_samples == 0, + "Batch mismatch: {} elements cannot be evenly divided into {} samples", + self.size_elements, + num_samples ); let state_len_per_sample = self.size_elements / num_samples; let shape = vec![num_samples as i64, state_len_per_sample as i64]; diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs index 1be318cfe3..e3081b65d6 100644 --- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs +++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs @@ -457,6 +457,232 @@ impl QuantumEncoder for AmplitudeEncoder { Ok(batch_state_vector) } + /// Encode multiple samples in a single GPU allocation and kernel launch for f32 inputs + #[cfg(target_os = "linux")] + fn encode_batch_f32( + &self, + device: &Arc, + batch_data: &[f32], + num_samples: usize, + sample_size: usize, + num_qubits: usize, + ) -> Result { + crate::profile_scope!("AmplitudeEncoder::encode_batch_f32"); + + let state_len = 1 << num_qubits; + + if sample_size == 0 { + return Err(MahoutError::InvalidInput( + "sample_size cannot be zero".into(), + )); + } + if sample_size > state_len { + return Err(MahoutError::InvalidInput(format!( + "sample_size {} exceeds state vector length {} (2^{} qubits)", + sample_size, state_len, num_qubits + ))); + } + if batch_data.len() != num_samples * sample_size { + return Err(MahoutError::InvalidInput(format!( + "batch_data length mismatch (expected {} * {} = {}, got {})", + num_samples, + sample_size, + num_samples * sample_size, + batch_data.len() + ))); + } + + let batch_state_vector = { + crate::profile_scope!("GPU::AllocBatch_f32"); + GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)? + }; + + // Upload input data to GPU + let input_batch_gpu = { + crate::profile_scope!("GPU::H2D_InputBatch_f32"); + device.htod_sync_copy(batch_data).map_err(|e| { + MahoutError::MemoryAllocation(format!("Failed to upload batch input: {:?}", e)) + })? + }; + + // Compute inverse norms on GPU using warp-reduced kernel + let inv_norms_gpu = { + crate::profile_scope!("GPU::BatchNormKernel_f32"); + use cudarc::driver::DevicePtrMut; + let mut buffer = device.alloc_zeros::(num_samples).map_err(|e| { + MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e)) + })?; + + let ret = unsafe { + launch_l2_norm_batch_f32( + *input_batch_gpu.device_ptr() as *const f32, + num_samples, + sample_size, + *buffer.device_ptr_mut() as *mut f32, + std::ptr::null_mut(), // default stream + ) + }; + + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!( + "Norm reduction kernel failed: {} ({})", + ret, + cuda_error_to_string(ret) + ))); + } + buffer + }; + + // Validate norms on host + { + crate::profile_scope!("GPU::NormValidation_f32"); + let host_inv_norms = device + .dtoh_sync_copy(&inv_norms_gpu) + .map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?; + + if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) { + return Err(MahoutError::InvalidInput( + "One or more samples have zero or invalid norm".to_string(), + )); + } + } + + // Launch batch kernel + { + crate::profile_scope!("GPU::BatchKernelLaunch_f32"); + use cudarc::driver::DevicePtr; + let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| { + MahoutError::InvalidInput( + "Batch state vector precision mismatch (expected float32 buffer)".to_string(), + ) + })?; + let ret = unsafe { + launch_amplitude_encode_batch_f32( + *input_batch_gpu.device_ptr() as *const f32, + state_ptr as *mut c_void, + *inv_norms_gpu.device_ptr() as *const f32, + num_samples, + sample_size, + state_len, + std::ptr::null_mut(), // default stream + ) + }; + + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!( + "Batch kernel launch failed: {} ({})", + ret, + cuda_error_to_string(ret) + ))); + } + } + + { + crate::profile_scope!("GPU::Synchronize"); + device + .synchronize() + .map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?; + } + + Ok(batch_state_vector) + } + + #[cfg(target_os = "linux")] + unsafe fn encode_batch_from_gpu_ptr_f32( + &self, + device: &Arc, + input_batch_d: *const c_void, + num_samples: usize, + sample_size: usize, + num_qubits: usize, + stream: *mut c_void, + ) -> Result { + let state_len = 1 << num_qubits; + if sample_size == 0 { + return Err(MahoutError::InvalidInput( + "Sample size cannot be zero".into(), + )); + } + if sample_size > state_len { + return Err(MahoutError::InvalidInput(format!( + "Sample size {} exceeds state vector size {} (2^{} qubits)", + sample_size, state_len, num_qubits + ))); + } + let input_batch_d = input_batch_d as *const f32; + let batch_state_vector = { + crate::profile_scope!("GPU::AllocBatch_f32"); + GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)? + }; + let inv_norms_gpu = { + crate::profile_scope!("GPU::BatchNormKernel_f32"); + use cudarc::driver::DevicePtrMut; + let mut buffer = device.alloc_zeros::(num_samples).map_err(|e| { + MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e)) + })?; + let ret = unsafe { + launch_l2_norm_batch_f32( + input_batch_d, + num_samples, + sample_size, + *buffer.device_ptr_mut() as *mut f32, + stream, + ) + }; + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!( + "Norm reduction kernel failed with CUDA error code: {} ({})", + ret, + cuda_error_to_string(ret) + ))); + } + buffer + }; + { + crate::profile_scope!("GPU::NormValidation_f32"); + let host_inv_norms = device + .dtoh_sync_copy(&inv_norms_gpu) + .map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?; + if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) { + return Err(MahoutError::InvalidInput( + "One or more samples have zero or invalid norm".to_string(), + )); + } + } + { + crate::profile_scope!("GPU::BatchKernelLaunch_f32"); + use cudarc::driver::DevicePtr; + let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| { + MahoutError::InvalidInput( + "Batch state vector precision mismatch (expected float32 buffer)".to_string(), + ) + })?; + let ret = unsafe { + launch_amplitude_encode_batch_f32( + input_batch_d, + state_ptr as *mut c_void, + *inv_norms_gpu.device_ptr() as *const f32, + num_samples, + sample_size, + state_len, + stream, + ) + }; + if ret != 0 { + return Err(MahoutError::KernelLaunch(format!( + "Batch kernel launch failed with CUDA error code: {} ({})", + ret, + cuda_error_to_string(ret) + ))); + } + } + { + crate::profile_scope!("GPU::Synchronize"); + sync_cuda_stream(stream, "CUDA stream synchronize failed")?; + } + Ok(batch_state_vector) + } + fn name(&self) -> &'static str { "amplitude" } diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs b/qdp/qdp-core/src/gpu/encodings/mod.rs index b94e4f4627..fa6362d4cf 100644 --- a/qdp/qdp-core/src/gpu/encodings/mod.rs +++ b/qdp/qdp-core/src/gpu/encodings/mod.rs @@ -134,6 +134,41 @@ pub trait QuantumEncoder: Send + Sync { self.name() ))) } + + /// Encode multiple samples in a single GPU allocation and kernel launch using f32 inputs. + fn encode_batch_f32( + &self, + _device: &Arc, + _batch_data: &[f32], + _num_samples: usize, + _sample_size: usize, + _num_qubits: usize, + ) -> Result { + Err(MahoutError::NotImplemented(format!( + "encode_batch_f32 not implemented for {}", + self.name() + ))) + } + + /// Encode batch from existing GPU pointer (zero-copy) for f32 inputs. + /// + /// # Safety + /// Caller must ensure `input_batch_d` points to valid GPU memory (f32). + #[cfg(target_os = "linux")] + unsafe fn encode_batch_from_gpu_ptr_f32( + &self, + _device: &Arc, + _input_batch_d: *const c_void, + _num_samples: usize, + _sample_size: usize, + _num_qubits: usize, + _stream: *mut c_void, + ) -> Result { + Err(MahoutError::NotImplemented(format!( + "encode_batch_from_gpu_ptr_f32 not supported for {}", + self.name() + ))) + } } // Encoding implementations diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs index 1756216037..9a715df6f8 100644 --- a/qdp/qdp-core/src/gpu/pipeline.rs +++ b/qdp/qdp-core/src/gpu/pipeline.rs @@ -274,6 +274,7 @@ where /// `align_elements` must evenly divide the host data length and ensures chunks do not /// split logical records (e.g., per-sample data in batch encoding). #[cfg(target_os = "linux")] +#[allow(clippy::manual_is_multiple_of)] pub fn run_dual_stream_pipeline_aligned( device: &Arc, host_data: &[f64], @@ -290,7 +291,7 @@ where "Alignment must be greater than zero".to_string(), )); } - if !host_data.len().is_multiple_of(align_elements) { + if host_data.len() % align_elements != 0 { return Err(MahoutError::InvalidInput(format!( "Host data length {} is not aligned to {} elements", host_data.len(), @@ -381,6 +382,7 @@ where ensure_device_memory_available(chunk_bytes, "pipeline chunk buffer allocation", None)?; // Allocate temporary device buffer for this chunk + #[allow(clippy::collapsible_if, clippy::manual_is_multiple_of)] let input_chunk_dev = unsafe { device.alloc::(chunk.len()) }.map_err(|e| { map_allocation_error(chunk_bytes, "pipeline chunk buffer allocation", None, e) })?; @@ -403,14 +405,15 @@ where // Record copy start if overlap tracking enabled // Note: Overlap tracking is optional observability - failures should not stop the pipeline - if let Some(ref tracker) = overlap_tracker - && let Err(e) = tracker.record_copy_start(&ctx.stream_copy, event_slot) - { - log::warn!( - "Chunk {}: Failed to record copy start event: {}. Overlap tracking may be incomplete.", - chunk_idx, - e - ); + #[allow(clippy::collapsible_if)] + if let Some(ref tracker) = overlap_tracker { + if let Err(e) = tracker.record_copy_start(&ctx.stream_copy, event_slot) { + log::warn!( + "Chunk {}: Failed to record copy start event: {}. Overlap tracking may be incomplete.", + chunk_idx, + e + ); + } } unsafe { @@ -422,14 +425,15 @@ where // Record copy end if overlap tracking enabled // Note: Overlap tracking is optional observability - failures should not stop the pipeline - if let Some(ref tracker) = overlap_tracker - && let Err(e) = tracker.record_copy_end(&ctx.stream_copy, event_slot) - { - log::warn!( - "Chunk {}: Failed to record copy end event: {}. Overlap tracking may be incomplete.", - chunk_idx, - e - ); + #[allow(clippy::collapsible_if)] + if let Some(ref tracker) = overlap_tracker { + if let Err(e) = tracker.record_copy_end(&ctx.stream_copy, event_slot) { + log::warn!( + "Chunk {}: Failed to record copy end event: {}. Overlap tracking may be incomplete.", + chunk_idx, + e + ); + } } ctx.record_copy_done(event_slot)?; @@ -456,28 +460,30 @@ where // Record compute start if overlap tracking enabled // Note: Overlap tracking is optional observability - failures should not stop the pipeline - if let Some(ref tracker) = overlap_tracker - && let Err(e) = tracker.record_compute_start(&ctx.stream_compute, event_slot) - { - log::warn!( - "Chunk {}: Failed to record compute start event: {}. Overlap tracking may be incomplete.", - chunk_idx, - e - ); + #[allow(clippy::collapsible_if)] + if let Some(ref tracker) = overlap_tracker { + if let Err(e) = tracker.record_compute_start(&ctx.stream_compute, event_slot) { + log::warn!( + "Chunk {}: Failed to record compute start event: {}. Overlap tracking may be incomplete.", + chunk_idx, + e + ); + } } kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset, chunk.len())?; // Record compute end if overlap tracking enabled // Note: Overlap tracking is optional observability - failures should not stop the pipeline - if let Some(ref tracker) = overlap_tracker - && let Err(e) = tracker.record_compute_end(&ctx.stream_compute, event_slot) - { - log::warn!( - "Chunk {}: Failed to record compute end event: {}. Overlap tracking may be incomplete.", - chunk_idx, - e - ); + #[allow(clippy::collapsible_if)] + if let Some(ref tracker) = overlap_tracker { + if let Err(e) = tracker.record_compute_end(&ctx.stream_compute, event_slot) { + log::warn!( + "Chunk {}: Failed to record compute end event: {}. Overlap tracking may be incomplete.", + chunk_idx, + e + ); + } } } @@ -489,24 +495,25 @@ where // Note: log_overlap now handles both success and failure cases internally, // logging at appropriate levels (INFO for visibility, DEBUG for details). #[allow(clippy::manual_is_multiple_of)] - if let Some(ref tracker) = overlap_tracker - && (chunk_idx % 10 == 0 || chunk_idx == 0) - { - // Only log every Nth chunk to avoid excessive logging - // Note: log_overlap waits for events to complete, which may take time - // If events fail (e.g., invalid resource handle), log_overlap will log - // at INFO level so it's visible in both debug and info modes - if let Err(e) = tracker.log_overlap(chunk_idx) { - // log_overlap already logged the error at INFO level - // We only need to log additional details at DEBUG level if needed - if log::log_enabled!(log::Level::Debug) { - log::debug!( - "Overlap tracking failed for chunk {}: {}. Pipeline continues normally.", - chunk_idx, - e - ); + #[allow(clippy::collapsible_if)] + if let Some(ref tracker) = overlap_tracker { + if chunk_idx % 10 == 0 || chunk_idx == 0 { + // Only log every Nth chunk to avoid excessive logging + // Note: log_overlap waits for events to complete, which may take time + // If events fail (e.g., invalid resource handle), log_overlap will log + // at INFO level so it's visible in both debug and info modes + if let Err(e) = tracker.log_overlap(chunk_idx) { + // log_overlap already logged the error at INFO level + // We only need to log additional details at DEBUG level if needed + if log::log_enabled!(log::Level::Debug) { + log::debug!( + "Overlap tracking failed for chunk {}: {}. Pipeline continues normally.", + chunk_idx, + e + ); + } + // Don't fail the pipeline - overlap tracking is optional observability } - // Don't fail the pipeline - overlap tracking is optional observability } } diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs index 49406b3d4b..0153f8719f 100644 --- a/qdp/qdp-core/src/lib.rs +++ b/qdp/qdp-core/src/lib.rs @@ -44,7 +44,7 @@ mod pipeline_runner; #[cfg(target_os = "linux")] pub use pipeline_runner::{ - DataSource, PipelineConfig, PipelineIterator, PipelineRunResult, run_latency_pipeline, + PipelineConfig, PipelineIterator, PipelineRunResult, run_latency_pipeline, run_throughput_pipeline, }; @@ -222,6 +222,31 @@ impl QdpEngine { Ok(dlpack_ptr) } + /// Encode multiple samples in a single fused kernel (most efficient) using f32 host input. + pub fn encode_batch_f32( + &self, + batch_data: &[f32], + num_samples: usize, + sample_size: usize, + num_qubits: usize, + encoding_method: &str, + ) -> Result<*mut DLManagedTensor> { + crate::profile_scope!("Mahout::EncodeBatchF32"); + + let encoder = get_encoder(encoding_method)?; + let state_vector = encoder.encode_batch_f32( + &self.device, + batch_data, + num_samples, + sample_size, + num_qubits, + )?; + + let state_vector = state_vector.to_precision(&self.device, self.precision)?; + let dlpack_ptr = state_vector.to_dlpack(); + Ok(dlpack_ptr) + } + /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Exposes gpu::pipeline::run_dual_stream_pipeline. /// Currently supports amplitude encoding (1D host_data). Does not return a tensor; /// use for throughput measurement or when the encoded state is not needed. diff --git a/qdp/qdp-core/src/pipeline_runner.rs b/qdp/qdp-core/src/pipeline_runner.rs index df1f61a239..42bb5cc655 100644 --- a/qdp/qdp-core/src/pipeline_runner.rs +++ b/qdp/qdp-core/src/pipeline_runner.rs @@ -19,7 +19,6 @@ use std::f64::consts::PI; use std::path::Path; -use std::sync::Mutex; use std::time::Instant; use crate::QdpEngine; @@ -40,6 +39,22 @@ pub struct PipelineConfig { pub seed: Option, pub warmup_batches: usize, pub null_handling: NullHandling, + pub float32_pipeline: bool, + pub prefetch_depth: usize, +} + +impl PipelineConfig { + /// Normalizes the configuration, such as falling back to f64 if f32 is requested + /// but the encoding doesn't support it. + pub fn normalize(&mut self) { + if self.float32_pipeline && !encoding_supports_f32(&self.encoding_method) { + log::info!( + "float32_pipeline requested but encoding '{}' does not support f32; falling back to f64", + self.encoding_method + ); + self.float32_pipeline = false; + } + } } impl Default for PipelineConfig { @@ -53,6 +68,8 @@ impl Default for PipelineConfig { seed: None, warmup_batches: 0, null_handling: NullHandling::FillZero, + float32_pipeline: false, + prefetch_depth: 16, } } } @@ -65,82 +82,235 @@ pub struct PipelineRunResult { pub latency_ms_per_vector: f64, } -/// Data source for the pipeline iterator (Phase 1: Synthetic; Phase 2a: InMemory; Phase 2b: Streaming). -pub enum DataSource { - Synthetic { - seed: u64, - batch_index: usize, - total_batches: usize, - }, - /// Phase 2a: full file loaded once; iterator slices by batch_size. - InMemory { - data: Vec, - cursor: usize, - num_samples: usize, - sample_size: usize, - batches_yielded: usize, - batch_limit: usize, - }, - /// Phase 2b: stream from Parquet in chunks; iterator refills buffer and encodes by batch. - /// Reader is in Mutex so PipelineIterator remains Sync (required by PyO3 pyclass). - Streaming { - reader: Mutex, - buffer: Vec, - buffer_cursor: usize, - read_chunk_scratch: Vec, - sample_size: usize, - batch_limit: usize, - batches_yielded: usize, - }, +#[derive(Clone, Debug, PartialEq)] +pub enum BatchData { + F32(Vec), + F64(Vec), } -impl std::fmt::Debug for DataSource { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DataSource::Synthetic { - seed, - batch_index, - total_batches, - } => f - .debug_struct("Synthetic") - .field("seed", seed) - .field("batch_index", batch_index) - .field("total_batches", total_batches) - .finish(), - DataSource::InMemory { - cursor, - num_samples, - sample_size, - batches_yielded, - batch_limit, - .. - } => f - .debug_struct("InMemory") - .field("cursor", cursor) - .field("num_samples", num_samples) - .field("sample_size", sample_size) - .field("batches_yielded", batches_yielded) - .field("batch_limit", batch_limit) - .finish(), - DataSource::Streaming { - buffer, - buffer_cursor, - sample_size, - batch_limit, - batches_yielded, - .. - } => f - .debug_struct("Streaming") - .field("buffer_len", &buffer.len()) - .field("buffer_cursor", buffer_cursor) - .field("sample_size", sample_size) - .field("batch_limit", batch_limit) - .field("batches_yielded", batches_yielded) - .finish(), +pub struct PrefetchedBatch { + pub data: BatchData, + pub batch_n: usize, + pub sample_size: usize, + pub num_qubits: usize, +} + +pub trait BatchProducer: Send + 'static { + fn produce(&mut self, recycled: Option) -> Result>; +} + +/// Returns true if the given encoding method has a native f32 GPU kernel. +/// Used to auto-gate `float32_pipeline` so unsupported encodings fall back to f64. +fn encoding_supports_f32(encoding_method: &str) -> bool { + matches!(encoding_method.to_lowercase().as_str(), "amplitude") +} + +pub struct SyntheticProducer { + pub config: PipelineConfig, + pub vector_len: usize, + pub batch_index: usize, + pub total_batches: usize, +} + +impl SyntheticProducer { + pub fn new(config: PipelineConfig, vector_len: usize) -> Self { + let total_batches = config.total_batches; + Self { + config, + vector_len, + batch_index: 0, + total_batches, } } } +impl BatchProducer for SyntheticProducer { + fn produce(&mut self, recycled: Option) -> Result> { + if self.batch_index >= self.total_batches { + return Ok(None); + } + + let mut data = match recycled { + Some(BatchData::F32(mut buf)) if self.config.float32_pipeline => { + buf.resize(self.config.batch_size * self.vector_len, 0.0); + BatchData::F32(buf) + } + Some(BatchData::F64(mut buf)) if !self.config.float32_pipeline => { + buf.resize(self.config.batch_size * self.vector_len, 0.0); + BatchData::F64(buf) + } + _ => { + if self.config.float32_pipeline { + BatchData::F32(vec![0.0f32; self.config.batch_size * self.vector_len]) + } else { + BatchData::F64(vec![0.0f64; self.config.batch_size * self.vector_len]) + } + } + }; + + match &mut data { + BatchData::F32(buf) => { + fill_batch_inplace_f32(&self.config, self.batch_index, self.vector_len, buf) + } + BatchData::F64(buf) => { + fill_batch_inplace(&self.config, self.batch_index, self.vector_len, buf) + } + } + + self.batch_index += 1; + Ok(Some(PrefetchedBatch { + data, + batch_n: self.config.batch_size, + sample_size: self.vector_len, + num_qubits: self.config.num_qubits as usize, + })) + } +} + +pub struct InMemoryProducer { + pub data: Vec, + pub cursor: usize, + pub sample_size: usize, + pub batch_size: usize, + pub num_qubits: usize, + pub batches_yielded: usize, + pub batch_limit: usize, +} + +impl BatchProducer for InMemoryProducer { + fn produce(&mut self, recycled: Option) -> Result> { + if self.batches_yielded >= self.batch_limit { + return Ok(None); + } + let remaining = (self.data.len() - self.cursor) / self.sample_size; + if remaining == 0 { + return Ok(None); + } + + let batch_n = remaining.min(self.batch_size); + let start = self.cursor; + let end = start + batch_n * self.sample_size; + self.cursor = end; + self.batches_yielded += 1; + let slice = &self.data[start..end]; + + let data = match recycled { + Some(BatchData::F64(mut buf)) => { + buf.clear(); + buf.extend_from_slice(slice); + BatchData::F64(buf) + } + _ => BatchData::F64(slice.to_vec()), + }; + + Ok(Some(PrefetchedBatch { + data, + batch_n, + sample_size: self.sample_size, + num_qubits: self.num_qubits, + })) + } +} + +pub struct StreamingProducer { + pub reader: ParquetStreamingReader, + pub buffer: Vec, + pub buffer_cursor: usize, + pub read_chunk_scratch: Vec, + pub sample_size: usize, + pub batch_size: usize, + pub num_qubits: usize, + pub batches_yielded: usize, + pub batch_limit: usize, +} + +impl BatchProducer for StreamingProducer { + fn produce(&mut self, recycled: Option) -> Result> { + if self.batches_yielded >= self.batch_limit { + return Ok(None); + } + let required = self.batch_size * self.sample_size; + while (self.buffer.len() - self.buffer_cursor) < required { + let written = self.reader.read_chunk(&mut self.read_chunk_scratch)?; + if written == 0 { + break; + } + self.buffer + .extend_from_slice(&self.read_chunk_scratch[..written]); + } + let available = self.buffer.len() - self.buffer_cursor; + let available_samples = available / self.sample_size; + + if available_samples == 0 { + return Ok(None); + } + + let batch_n = available_samples.min(self.batch_size); + let start = self.buffer_cursor; + let end = start + batch_n * self.sample_size; + self.buffer_cursor = end; + self.batches_yielded += 1; + + let data = match recycled { + Some(BatchData::F64(mut buf)) => { + buf.clear(); + buf.extend_from_slice(&self.buffer[start..end]); + BatchData::F64(buf) + } + _ => BatchData::F64(self.buffer[start..end].to_vec()), + }; + + if self.buffer_cursor >= self.buffer.len() / BUFFER_COMPACT_DENOM { + self.buffer.drain(..self.buffer_cursor); + self.buffer_cursor = 0; + } + + Ok(Some(PrefetchedBatch { + data, + batch_n, + sample_size: self.sample_size, + num_qubits: self.num_qubits, + })) + } +} + +type ProducerHandles = ( + std::sync::mpsc::Receiver>, + std::sync::mpsc::Sender, + std::thread::JoinHandle<()>, +); + +fn spawn_producer( + mut producer: impl BatchProducer, + prefetch_depth: usize, +) -> Result { + // If prefetch_depth is 0, default to a minimum of 1 to ensure channel can hold at least 1 item + let depth = prefetch_depth.max(1); + let (tx, rx) = std::sync::mpsc::sync_channel(depth); + let (recycle_tx, recycle_rx) = std::sync::mpsc::channel::(); + let handle = std::thread::Builder::new() + .name("qdp-prefetch".into()) + .spawn(move || { + loop { + let recycled = recycle_rx.try_recv().ok(); + match producer.produce(recycled) { + Ok(Some(batch)) => { + if tx.send(Ok(batch)).is_err() { + break; + } + } + Ok(None) => break, + Err(e) => { + let _ = tx.send(Err(e)); + break; + } + } + } + }) + .map_err(|e| MahoutError::Io(format!("Failed to spawn prefetch thread: {}", e)))?; + Ok((rx, recycle_tx, handle)) +} + /// Default Parquet row group size for streaming reader (tunable). const DEFAULT_PARQUET_ROW_GROUP_SIZE: usize = 2048; @@ -184,31 +354,28 @@ fn read_file_by_extension( } /// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption. -/// Holds a clone of QdpEngine, PipelineConfig, and source state; reuses generate_batch and encode_batch. +/// Reads prefetched batches via a bounded channel. pub struct PipelineIterator { engine: QdpEngine, config: PipelineConfig, - source: DataSource, - vector_len: usize, + rx: std::sync::Mutex>>, + recycle_tx: std::sync::Mutex>, + _producer_handle: std::sync::Mutex>, } -/// (batch_data, batch_n, sample_size, num_qubits) from one source pull. -type BatchFromSource = (Vec, usize, usize, usize); - impl PipelineIterator { - /// Create a new synthetic-data pipeline iterator. - pub fn new_synthetic(engine: QdpEngine, config: PipelineConfig) -> Result { + pub fn new_synthetic(engine: QdpEngine, mut config: PipelineConfig) -> Result { + config.normalize(); let vector_len = vector_len(config.num_qubits, &config.encoding_method); - let source = DataSource::Synthetic { - seed: config.seed.unwrap_or(0), - batch_index: 0, - total_batches: config.total_batches, - }; + let producer = SyntheticProducer::new(config.clone(), vector_len); + let prefetch_depth = config.prefetch_depth; + let (rx, recycle_tx, _producer_handle) = spawn_producer(producer, prefetch_depth)?; Ok(Self { engine, config, - source, - vector_len, + rx: std::sync::Mutex::new(rx), + recycle_tx: std::sync::Mutex::new(recycle_tx), + _producer_handle: std::sync::Mutex::new(_producer_handle), }) } @@ -220,9 +387,10 @@ impl PipelineIterator { pub fn new_from_file>( engine: QdpEngine, path: P, - config: PipelineConfig, + mut config: PipelineConfig, batch_limit: usize, ) -> Result { + config.normalize(); let path = path.as_ref(); let (data, num_samples, sample_size) = read_file_by_extension(path, config.null_handling)?; let vector_len = vector_len(config.num_qubits, &config.encoding_method); @@ -243,19 +411,23 @@ impl PipelineIterator { ))); } - let source = DataSource::InMemory { + let producer = InMemoryProducer { data, cursor: 0, - num_samples, sample_size, + batch_size: config.batch_size, + num_qubits: config.num_qubits as usize, batches_yielded: 0, batch_limit, }; + let prefetch_depth = config.prefetch_depth; + let (rx, recycle_tx, _producer_handle) = spawn_producer(producer, prefetch_depth)?; Ok(Self { engine, config, - source, - vector_len, + rx: std::sync::Mutex::new(rx), + recycle_tx: std::sync::Mutex::new(recycle_tx), + _producer_handle: std::sync::Mutex::new(_producer_handle), }) } @@ -265,9 +437,10 @@ impl PipelineIterator { pub fn new_from_file_streaming>( engine: QdpEngine, path: P, - config: PipelineConfig, + mut config: PipelineConfig, batch_limit: usize, ) -> Result { + config.normalize(); let path = path.as_ref(); if path_extension_lower(path).as_deref() != Some("parquet") { return Err(MahoutError::InvalidInput(format!( @@ -308,138 +481,52 @@ impl PipelineIterator { buffer.truncate(written); let read_chunk_scratch = vec![0.0; INITIAL_CHUNK_CAP]; - let source = DataSource::Streaming { - reader: Mutex::new(reader), + let producer = StreamingProducer { + reader, buffer, buffer_cursor: 0, read_chunk_scratch, sample_size, - batch_limit, + batch_size: config.batch_size, + num_qubits: config.num_qubits as usize, batches_yielded: 0, + batch_limit, }; + let prefetch_depth = config.prefetch_depth; + let (rx, recycle_tx, _producer_handle) = spawn_producer(producer, prefetch_depth)?; Ok(Self { engine, config, - source, - vector_len, - }) - } - - /// Yields the next batch data from the current source; `None` when exhausted. - /// Returns (batch_data, batch_n, sample_size, num_qubits). - fn take_batch_from_source(&mut self) -> Result> { - Ok(match &mut self.source { - DataSource::Synthetic { - batch_index, - total_batches, - .. - } => { - if *batch_index >= *total_batches { - None - } else { - let data = generate_batch(&self.config, *batch_index, self.vector_len); - *batch_index += 1; - Some(( - data, - self.config.batch_size, - self.vector_len, - self.config.num_qubits as usize, - )) - } - } - DataSource::InMemory { - data, - cursor, - sample_size, - batches_yielded, - batch_limit, - .. - } => { - if *batches_yielded >= *batch_limit { - None - } else { - let remaining = (data.len() - *cursor) / *sample_size; - if remaining == 0 { - None - } else { - let batch_n = remaining.min(self.config.batch_size); - let start = *cursor; - let end = start + batch_n * *sample_size; - *cursor = end; - *batches_yielded += 1; - let slice = data[start..end].to_vec(); - Some(( - slice, - batch_n, - *sample_size, - self.config.num_qubits as usize, - )) - } - } - } - DataSource::Streaming { - reader, - buffer, - buffer_cursor, - read_chunk_scratch, - sample_size, - batch_limit, - batches_yielded, - } => { - if *batches_yielded >= *batch_limit { - None - } else { - let required = self.config.batch_size * *sample_size; - while (buffer.len() - *buffer_cursor) < required { - let r = reader.get_mut().map_err(|e| { - MahoutError::Io(format!("Streaming reader mutex poisoned: {}", e)) - })?; - let written = r.read_chunk(read_chunk_scratch)?; - if written == 0 { - break; - } - buffer.extend_from_slice(&read_chunk_scratch[..written]); - } - let available = buffer.len() - *buffer_cursor; - let available_samples = available / *sample_size; - if available_samples == 0 { - None - } else { - let batch_n = available_samples.min(self.config.batch_size); - let start = *buffer_cursor; - let end = start + batch_n * *sample_size; - *buffer_cursor = end; - *batches_yielded += 1; - let slice = buffer[start..end].to_vec(); - if *buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM { - buffer.drain(..*buffer_cursor); - *buffer_cursor = 0; - } - Some(( - slice, - batch_n, - *sample_size, - self.config.num_qubits as usize, - )) - } - } - } + rx: std::sync::Mutex::new(rx), + recycle_tx: std::sync::Mutex::new(recycle_tx), + _producer_handle: std::sync::Mutex::new(_producer_handle), }) } /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted. pub fn next_batch(&mut self) -> Result> { - let Some((batch_data, batch_n, sample_size, num_qubits)) = self.take_batch_from_source()? - else { - return Ok(None); + let batch = match self.rx.lock().unwrap().recv() { + Ok(Ok(b)) => b, + Ok(Err(e)) => return Err(e), + Err(_) => return Ok(None), }; - let ptr = self.engine.encode_batch( - &batch_data, - batch_n, - sample_size, - num_qubits, - &self.config.encoding_method, - )?; + let ptr = match &batch.data { + BatchData::F64(buf) => self.engine.encode_batch( + buf, + batch.batch_n, + batch.sample_size, + batch.num_qubits, + &self.config.encoding_method, + )?, + BatchData::F32(buf) => self.engine.encode_batch_f32( + buf, + batch.batch_n, + batch.sample_size, + batch.num_qubits, + &self.config.encoding_method, + )?, + }; + let _ = self.recycle_tx.lock().unwrap().send(batch.data); Ok(Some(ptr)) } } @@ -489,6 +576,7 @@ fn fill_sample(seed: u64, out: &mut [f64], encoding_method: &str, num_qubits: us } /// Generate one batch (batch_size * vector_len elements, or batch_size * 1 for basis). +#[cfg(test)] fn generate_batch(config: &PipelineConfig, batch_idx: usize, vector_len: usize) -> Vec { let mut batch = vec![0.0f64; config.batch_size * vector_len]; fill_batch_inplace(config, batch_idx, vector_len, &mut batch); @@ -518,6 +606,66 @@ fn fill_batch_inplace( } } +/// Deterministic sample generation for f32. +fn fill_sample_f32( + seed: u64, + out: &mut [f32], + encoding_method: &str, + num_qubits: usize, +) -> Result<()> { + let len = out.len(); + if len == 0 { + return Ok(()); + } + match encoding_method.to_lowercase().as_str() { + "basis" => { + let state_space_size = 1 << num_qubits; + let mask = (state_space_size - 1) as u64; + let idx = seed & mask; + out[0] = idx as f32; + } + "angle" => { + let scale = (2.0 * std::f32::consts::PI) / len as f32; + for (i, v) in out.iter_mut().enumerate() { + let mixed = (i as u64 + seed) % (len as u64); + *v = mixed as f32 * scale; + } + } + _ => { + // amplitude + let mask = (len - 1) as u64; + let scale = 1.0 / len as f32; + for (i, v) in out.iter_mut().enumerate() { + let mixed = (i as u64 + seed) & mask; + *v = mixed as f32 * scale; + } + } + } + Ok(()) +} + +fn fill_batch_inplace_f32( + config: &PipelineConfig, + batch_idx: usize, + vector_len: usize, + batch_buf: &mut [f32], +) { + debug_assert_eq!(batch_buf.len(), config.batch_size * vector_len); + let seed_base = config + .seed + .unwrap_or(0) + .wrapping_add((batch_idx * config.batch_size) as u64); + for i in 0..config.batch_size { + let offset = i * vector_len; + let _ = fill_sample_f32( + seed_base + i as u64, + &mut batch_buf[offset..offset + vector_len], + &config.encoding_method, + config.num_qubits as usize, + ); + } +} + /// Release DLPack tensor (call deleter so GPU memory is freed). unsafe fn release_dlpack(ptr: *mut DLManagedTensor) { if ptr.is_null() { @@ -531,47 +679,85 @@ unsafe fn release_dlpack(ptr: *mut DLManagedTensor) { /// Run throughput pipeline: warmup, then timed encode_batch loop; returns stats. pub fn run_throughput_pipeline(config: &PipelineConfig) -> Result { + let mut config = config.clone(); + config.normalize(); + let engine = QdpEngine::new(config.device_id)?; let vector_len = vector_len(config.num_qubits, &config.encoding_method); let num_qubits = config.num_qubits as usize; - // Reuse a single CPU batch buffer to avoid per-iteration allocations in throughput benchmarks. - let mut batch_buf = vec![0.0f64; config.batch_size * vector_len]; - // Warmup - for b in 0..config.warmup_batches { - fill_batch_inplace(config, b, vector_len, &mut batch_buf); - let ptr = engine.encode_batch( - &batch_buf, - config.batch_size, - vector_len, - num_qubits, - &config.encoding_method, - )?; - unsafe { release_dlpack(ptr) }; + if config.float32_pipeline { + let mut batch_buf = vec![0.0f32; config.batch_size * vector_len]; + for b in 0..config.warmup_batches { + fill_batch_inplace_f32(&config, b, vector_len, &mut batch_buf); + let ptr = engine.encode_batch_f32( + &batch_buf, + config.batch_size, + vector_len, + num_qubits, + &config.encoding_method, + )?; + unsafe { release_dlpack(ptr) }; + } + } else { + let mut batch_buf = vec![0.0f64; config.batch_size * vector_len]; + for b in 0..config.warmup_batches { + fill_batch_inplace(&config, b, vector_len, &mut batch_buf); + let ptr = engine.encode_batch( + &batch_buf, + config.batch_size, + vector_len, + num_qubits, + &config.encoding_method, + )?; + unsafe { release_dlpack(ptr) }; + } } - #[cfg(target_os = "linux")] - engine.synchronize()?; - let start = Instant::now(); - for b in 0..config.total_batches { - fill_batch_inplace(config, b, vector_len, &mut batch_buf); - let ptr = engine.encode_batch( - &batch_buf, - config.batch_size, - vector_len, - num_qubits, - &config.encoding_method, - )?; + + let producer = SyntheticProducer::new(config.clone(), vector_len); + let prefetch_depth = config.prefetch_depth; + let (rx, recycle_tx, producer_handle) = spawn_producer(producer, prefetch_depth)?; + + // Iteration loop + let mut total_batches = 0; + while let Ok(result) = rx.recv() { + let batch = result?; + let ptr = match &batch.data { + BatchData::F64(buf) => engine.encode_batch( + buf, + batch.batch_n, + batch.sample_size, + batch.num_qubits, + &config.encoding_method, + )?, + BatchData::F32(buf) => engine.encode_batch_f32( + buf, + batch.batch_n, + batch.sample_size, + batch.num_qubits, + &config.encoding_method, + )?, + }; unsafe { release_dlpack(ptr) }; + total_batches += 1; + let _ = recycle_tx.send(batch.data); } + let _ = producer_handle.join(); + #[cfg(target_os = "linux")] engine.synchronize()?; let duration_sec = start.elapsed().as_secs_f64().max(1e-9); - let total_vectors = config.total_batches * config.batch_size; + let total_vectors = total_batches * config.batch_size; + if total_vectors == 0 { + return Err(MahoutError::InvalidInput( + "No vectors processed in pipeline".into(), + )); + } let vectors_per_sec = total_vectors as f64 / duration_sec; let latency_ms_per_vector = (duration_sec / total_vectors as f64) * 1000.0; @@ -792,4 +978,193 @@ mod tests { ); } } + #[test] + fn test_synthetic_producer_batch_count() { + let config = PipelineConfig { + total_batches: 5, + num_qubits: 3, + batch_size: 4, + encoding_method: "amplitude".to_string(), + ..Default::default() + }; + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let mut producer = SyntheticProducer::new(config, vector_len); + + let mut count = 0; + while let Ok(Some(_)) = producer.produce(None) { + count += 1; + } + assert_eq!(count, 5); + } + + #[test] + fn test_synthetic_producer_data_consistency() { + let config = PipelineConfig { + total_batches: 1, + num_qubits: 3, + batch_size: 4, + encoding_method: "amplitude".to_string(), + ..Default::default() + }; + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let mut producer = SyntheticProducer::new(config.clone(), vector_len); + + let batch_from_producer = producer.produce(None).unwrap().unwrap(); + let expected_data = generate_batch(&config, 0, vector_len); + + assert_eq!(batch_from_producer.data, BatchData::F64(expected_data)); + } + + #[test] + fn test_inmemory_producer_partial_last_batch() { + let config = PipelineConfig { + batch_size: 5, + num_qubits: 2, + encoding_method: "amplitude".to_string(), + ..Default::default() + }; + let sample_size = 4; // 2^2 + let data = vec![0.0f64; 16]; // 16 elements = 4 samples + + let mut producer = InMemoryProducer { + data, + cursor: 0, + sample_size, + batch_size: config.batch_size, + num_qubits: config.num_qubits as usize, + batches_yielded: 0, + batch_limit: 10, + }; + + let batch1 = producer.produce(None).unwrap().unwrap(); + assert_eq!(batch1.batch_n, 4); + + let batch2 = producer.produce(None).unwrap(); + assert!(batch2.is_none()); + } + + #[test] + fn test_spawn_producer_channel_exhaustion() { + let config = PipelineConfig { + total_batches: 3, + prefetch_depth: 16, + ..Default::default() + }; + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let producer = SyntheticProducer::new(config, vector_len); + + let (rx, _recycle_tx, handle) = spawn_producer(producer, 16).unwrap(); + + // We expect 3 batches + assert!(rx.recv().unwrap().is_ok()); + assert!(rx.recv().unwrap().is_ok()); + assert!(rx.recv().unwrap().is_ok()); + + // Iterator should be exhausted, channel should be closed down successfully + assert!(rx.recv().is_err()); + handle.join().unwrap(); + } + + #[test] + fn test_spawn_producer_early_consumer_drop() { + let config = PipelineConfig { + total_batches: 1000, // Very large so producer definitely tries to send multiple + prefetch_depth: 16, + ..Default::default() + }; + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let producer = SyntheticProducer::new(config, vector_len); + + let (rx, _recycle_tx, handle) = spawn_producer(producer, 16).unwrap(); + + // Let it start + assert!(rx.recv().unwrap().is_ok()); + + // Drop rx, closing the channel + drop(rx); + + // Thread should cleanly exit instead of panicking + handle.join().unwrap(); + } + + #[test] + fn test_synthetic_producer_f32_amplitude() { + let mut config = PipelineConfig { + total_batches: 2, + num_qubits: 3, + batch_size: 4, + encoding_method: "amplitude".to_string(), + float32_pipeline: true, + ..Default::default() + }; + config.normalize(); + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let mut producer = SyntheticProducer::new(config, vector_len); + + let batch = producer.produce(None).unwrap().unwrap(); + assert!( + matches!(batch.data, BatchData::F32(_)), + "amplitude with float32_pipeline=true should produce F32 data" + ); + + // Verify data is non-zero (was actually filled) + if let BatchData::F32(ref buf) = batch.data { + assert!( + !buf.iter().all(|&v| v == 0.0), + "batch data should be non-zero" + ); + } + } + + #[test] + fn test_synthetic_producer_f32_fallback_for_angle() { + let mut config = PipelineConfig { + total_batches: 1, + num_qubits: 3, + batch_size: 4, + encoding_method: "angle".to_string(), + float32_pipeline: true, // requested f32, but angle doesn't support it + ..Default::default() + }; + config.normalize(); + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let mut producer = SyntheticProducer::new(config, vector_len); + + let batch = producer.produce(None).unwrap().unwrap(); + assert!( + matches!(batch.data, BatchData::F64(_)), + "angle with float32_pipeline=true should fall back to F64 data" + ); + } + + #[test] + fn test_synthetic_producer_f32_fallback_for_basis() { + let mut config = PipelineConfig { + total_batches: 1, + num_qubits: 3, + batch_size: 4, + encoding_method: "basis".to_string(), + float32_pipeline: true, + ..Default::default() + }; + config.normalize(); + let vector_len = super::vector_len(config.num_qubits, &config.encoding_method); + let mut producer = SyntheticProducer::new(config, vector_len); + + let batch = producer.produce(None).unwrap().unwrap(); + assert!( + matches!(batch.data, BatchData::F64(_)), + "basis with float32_pipeline=true should fall back to F64 data" + ); + } + + #[test] + fn test_encoding_supports_f32() { + assert!(super::encoding_supports_f32("amplitude")); + assert!(super::encoding_supports_f32("Amplitude")); + assert!(super::encoding_supports_f32("AMPLITUDE")); + assert!(!super::encoding_supports_f32("angle")); + assert!(!super::encoding_supports_f32("basis")); + assert!(!super::encoding_supports_f32("iqp")); + } } diff --git a/qdp/qdp-core/src/readers/parquet.rs b/qdp/qdp-core/src/readers/parquet.rs index 97c9787808..ddc0a16ed2 100644 --- a/qdp/qdp-core/src/readers/parquet.rs +++ b/qdp/qdp-core/src/readers/parquet.rs @@ -537,16 +537,18 @@ impl StreamingDataReader for ParquetStreamingReader { } }; - if self.sample_size.is_none() { - self.sample_size = Some(current_sample_size); - limit = calc_limit(current_sample_size); - } else if let Some(expected_size) = self.sample_size - && current_sample_size != expected_size - { - return Err(MahoutError::InvalidInput(format!( - "Inconsistent sample sizes: expected {}, got {}", - expected_size, current_sample_size - ))); + match self.sample_size { + Some(expected_size) if current_sample_size != expected_size => { + return Err(MahoutError::InvalidInput(format!( + "Inconsistent sample sizes: expected {}, got {}", + expected_size, current_sample_size + ))); + } + None => { + self.sample_size = Some(current_sample_size); + limit = calc_limit(current_sample_size); + } + _ => {} } let available = batch_values.len(); diff --git a/qdp/qdp-core/src/readers/tensorflow.rs b/qdp/qdp-core/src/readers/tensorflow.rs index 8ba8efcbd8..0db45245ab 100644 --- a/qdp/qdp-core/src/readers/tensorflow.rs +++ b/qdp/qdp-core/src/readers/tensorflow.rs @@ -202,13 +202,14 @@ impl TensorFlowReader { /// 2. Length check (must be multiple of 8) /// 3. Alignment check (f64 needs 8-byte alignment, Vec handles this automatically) /// 4. Overflow check (ensures no overflow) + #[allow(clippy::manual_is_multiple_of)] fn bytes_to_f64_vec(bytes: &Bytes) -> Result> { if !cfg!(target_endian = "little") { return Err(MahoutError::NotImplemented( "Big-endian platforms are not supported for TensorFlow tensor_content".into(), )); } - if !bytes.len().is_multiple_of(8) { + if bytes.len() % 8 != 0 { return Err(MahoutError::InvalidInput(format!( "tensor_content length {} is not a multiple of 8", bytes.len() diff --git a/qdp/qdp-core/tests/common/mod.rs b/qdp/qdp-core/tests/common/mod.rs index 402a896171..7114ad613a 100644 --- a/qdp/qdp-core/tests/common/mod.rs +++ b/qdp/qdp-core/tests/common/mod.rs @@ -45,11 +45,12 @@ pub fn create_test_data_f32(size: usize) -> Vec { /// Writes a FixedSizeList Parquet file for streaming encoder tests. /// Each `sample_size` consecutive values in `data` form one row. #[allow(dead_code)] +#[allow(clippy::manual_is_multiple_of)] pub fn write_fixed_size_list_parquet(path: &str, data: &[f64], sample_size: usize) { assert!(sample_size > 0, "sample_size must be > 0"); assert!( - data.len().is_multiple_of(sample_size), - "data.len() ({}) must be a multiple of sample_size ({})", + data.len() % sample_size == 0, + "Data length ({}) must be a multiple of sample size ({})", data.len(), sample_size ); diff --git a/qdp/qdp-core/tests/gpu_iqp_encoding.rs b/qdp/qdp-core/tests/gpu_iqp_encoding.rs index 6ca8e987aa..e441f3640d 100644 --- a/qdp/qdp-core/tests/gpu_iqp_encoding.rs +++ b/qdp/qdp-core/tests/gpu_iqp_encoding.rs @@ -811,15 +811,17 @@ fn test_iqp_encoder_via_factory() { // Clean up unsafe { - if let Ok(ptr) = result1 - && let Some(d) = (*ptr).deleter - { - d(ptr); + #[allow(clippy::collapsible_if)] + if let Ok(ptr) = result1 { + if let Some(d) = (*ptr).deleter { + d(ptr); + } } - if let Ok(ptr) = result2 - && let Some(d) = (*ptr).deleter - { - d(ptr); + #[allow(clippy::collapsible_if)] + if let Ok(ptr) = result2 { + if let Some(d) = (*ptr).deleter { + d(ptr); + } } } @@ -843,10 +845,11 @@ fn test_iqp_z_encoder_via_factory() { assert!(result.is_ok(), "'iqp-z' should work"); unsafe { - if let Ok(ptr) = result - && let Some(d) = (*ptr).deleter - { - d(ptr); + #[allow(clippy::collapsible_if)] + if let Ok(ptr) = result { + if let Some(d) = (*ptr).deleter { + d(ptr); + } } } diff --git a/qdp/qdp-python/qumat_qdp/api.py b/qdp/qdp-python/qumat_qdp/api.py index 3661ede56c..ec8715d5ac 100644 --- a/qdp/qdp-python/qumat_qdp/api.py +++ b/qdp/qdp-python/qumat_qdp/api.py @@ -32,6 +32,7 @@ from __future__ import annotations from dataclasses import dataclass +from typing import Any @dataclass @@ -51,7 +52,7 @@ class LatencyResult: # Cached reference to Rust pipeline (avoids repeated import). -_run_throughput_pipeline_py: object | None = None +_run_throughput_pipeline_py: Any = None def _get_run_throughput_pipeline_py(): @@ -127,6 +128,7 @@ def run_throughput(self) -> ThroughputResult: encoding_method=self._encoding_method, warmup_batches=self._warmup_batches, seed=None, + float32_pipeline=True, ) return ThroughputResult( duration_sec=duration_sec, vectors_per_sec=vectors_per_sec @@ -148,6 +150,7 @@ def run_latency(self) -> LatencyResult: encoding_method=self._encoding_method, warmup_batches=self._warmup_batches, seed=None, + float32_pipeline=True, ) return LatencyResult( duration_sec=duration_sec, diff --git a/qdp/qdp-python/src/engine.rs b/qdp/qdp-python/src/engine.rs index 69b857298a..b2b006ff78 100644 --- a/qdp/qdp-python/src/engine.rs +++ b/qdp/qdp-python/src/engine.rs @@ -633,6 +633,7 @@ impl QdpEngine { total_batches, seed, nh, + true, ); let iter = qdp_core::PipelineIterator::new_synthetic(self.engine.clone(), config).map_err( |e| PyRuntimeError::new_err(format!("create_synthetic_loader failed: {}", e)), @@ -665,6 +666,7 @@ impl QdpEngine { 0, None, nh, + true, // float32_pipeline ); let engine = self.engine.clone(); // Resolve remote URLs before detaching from GIL. The _resolved guard keeps the @@ -713,6 +715,7 @@ impl QdpEngine { 0, None, nh, + true, // float32_pipeline ); let engine = self.engine.clone(); // Resolve remote URLs before detaching from GIL. The _resolved guard keeps the diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs index 014f992baf..5348c3f4af 100644 --- a/qdp/qdp-python/src/lib.rs +++ b/qdp/qdp-python/src/lib.rs @@ -31,7 +31,7 @@ use loader::PyQuantumLoader; #[cfg(target_os = "linux")] #[pyfunction] -#[pyo3(signature = (device_id, num_qubits, batch_size, total_batches, encoding_method, warmup_batches=0, seed=None))] +#[pyo3(signature = (device_id, num_qubits, batch_size, total_batches, encoding_method, warmup_batches=0, seed=None, float32_pipeline=false))] #[allow(clippy::too_many_arguments)] fn run_throughput_pipeline_py( py: Python<'_>, @@ -42,6 +42,7 @@ fn run_throughput_pipeline_py( encoding_method: String, warmup_batches: usize, seed: Option, + float32_pipeline: bool, ) -> PyResult<(f64, f64, f64)> { let config = qdp_core::PipelineConfig { device_id, @@ -52,6 +53,8 @@ fn run_throughput_pipeline_py( seed, warmup_batches, null_handling: qdp_core::NullHandling::default(), + float32_pipeline, + prefetch_depth: 16, }; let result = py .detach(|| qdp_core::run_throughput_pipeline(&config)) diff --git a/qdp/qdp-python/src/loader.rs b/qdp/qdp-python/src/loader.rs index 87749d688a..7ad7632cb9 100644 --- a/qdp/qdp-python/src/loader.rs +++ b/qdp/qdp-python/src/loader.rs @@ -84,6 +84,7 @@ mod loader_impl { } /// Build PipelineConfig from Python args. device_id is 0 (engine does not expose it); iterator uses engine clone with correct device. + #[allow(clippy::too_many_arguments)] pub fn config_from_args( _engine: &CoreEngine, batch_size: usize, @@ -92,6 +93,7 @@ mod loader_impl { total_batches: usize, seed: Option, null_handling: NullHandling, + float32_pipeline: bool, ) -> PipelineConfig { PipelineConfig { device_id: 0, @@ -102,6 +104,8 @@ mod loader_impl { seed, warmup_batches: 0, null_handling, + float32_pipeline, + prefetch_depth: 16, } }