diff --git a/Cargo.lock b/Cargo.lock index d416eaef5..32d09de43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,7 +321,7 @@ dependencies = [ "proptest", "rand 0.9.2", "ruint", - "rustc-hash", + "rustc-hash 2.1.1", "serde", "sha3", "tiny-keccak", @@ -1795,7 +1795,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.109", ] @@ -2759,6 +2759,22 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dhat" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" +dependencies = [ + "backtrace", + "lazy_static", + "mintex", + "parking_lot", + "rustc-hash 1.1.0", + "serde", + "serde_json", + "thousands", +] + [[package]] name = "difflib" version = "0.4.0" @@ -4229,6 +4245,7 @@ dependencies = [ "ciborium", "clap", "console_error_panic_hook", + "dhat", "enum_dispatch", "futures-util", "hex", @@ -4606,6 +4623,12 @@ dependencies = [ "adler2", ] +[[package]] +name = "mintex" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c505b3e17ed6b70a7ed2e67fbb2c560ee327353556120d6e72f5232b6880d536" + [[package]] name = "mio" version = "1.1.0" @@ -5844,7 +5867,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.1", "rustls 0.23.31", "socket2 0.6.1", "thiserror 2.0.12", @@ -5864,7 +5887,7 @@ dependencies = [ "lru-slab", "rand 0.9.2", "ring", - "rustc-hash", + "rustc-hash 2.1.1", "rustls 0.23.31", "rustls-pki-types", "slab", @@ -6401,6 +6424,12 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -7557,6 +7586,12 @@ dependencies = [ "syn 2.0.109", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "thread_local" version = "1.1.9" diff --git a/Cargo.toml b/Cargo.toml index 80b130292..ed9d05081 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ crypto-bigint = { version = "=0.6.1", features = ["serde", "rand_core", "extra-s ctor = "=0.4.2" # Constructor functions - HIGH RISK: Individual maintainer (mmastrac), test-only dependency dashmap = "=6.1.0" # Concurrent hashmap - HIGH RISK: Individual maintainer (xacrimon), despite 156M+ downloads derive_more = { version = "=2.0.1", features = ["display"] } # Derive macros for common traits - HIGH RISK: Individual maintainer (JelteF), despite 180M+ downloads +dhat = "=0.3.3" # Heap profiling - MEDIUM RISK: David Tolnay adjacent (rustacean), useful for memory debugging, dev-only enum_dispatch = "=0.3.13" # Enum dispatch optimization - HIGH RISK: Individual maintainer (Anton Lazarev), despite 29M+ downloads futures = "=0.3.31" # Async futures - LOW RISK: rust-lang team futures-util = "=0.3.31" # Futures utilities - LOW RISK: rust-lang team diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 68b4e212f..333295c5c 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -65,6 +65,7 @@ cfg-if.workspace = true ciborium.workspace = true clap = { workspace = true, features = ["derive"] } console_error_panic_hook.workspace = true +dhat = { workspace = true, optional = true } enum_dispatch.workspace = true futures-util.workspace = true hex.workspace = true @@ -183,3 +184,7 @@ insecure = [ "threshold-fhe/testing", "dep:nsm-nitro-enclave-utils" ] +# Memory profiling feature for debugging memory consumption issues +# Enable with: cargo build --features memory-profiling +# Produces dhat-heap.json file that can be viewed with https://nnethercote.github.io/dh_view/dh_view.html +memory-profiling = ["dep:dhat"] diff --git a/core/service/src/bin/kms-server.rs b/core/service/src/bin/kms-server.rs index 3f9413f08..5b74a8e3c 100644 --- a/core/service/src/bin/kms-server.rs +++ b/core/service/src/bin/kms-server.rs @@ -331,7 +331,18 @@ async fn build_tls_config( Ok((server_config, client_config, verifier)) } +// Memory profiling with dhat (enable with --features memory-profiling) +// View results at: https://nnethercote.github.io/dh_view/dh_view.html +#[cfg(feature = "memory-profiling")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + fn main() -> anyhow::Result<()> { + // Initialize dhat profiler when memory-profiling feature is enabled + // This will produce a dhat-heap.json file on program exit + #[cfg(feature = "memory-profiling")] + let _profiler = dhat::Profiler::new_heap(); + let args = KmsArgs::parse(); // NOTE: this config is only needed to set up the tokio runtime // we read it again in [main_exec] to set up the rest of the server diff --git a/core/service/src/engine/threshold/service/key_generator.rs b/core/service/src/engine/threshold/service/key_generator.rs index 5809ddd85..9116f0652 100644 --- a/core/service/src/engine/threshold/service/key_generator.rs +++ b/core/service/src/engine/threshold/service/key_generator.rs @@ -53,10 +53,7 @@ use crate::{ BaseKmsStruct, KeyGenMetadata, DSEP_PUBDATA_KEY, }, keyset_configuration::InternalKeySetConfig, - threshold::{ - service::{session::ImmutableSessionMaker, ThresholdFheKeys}, - traits::KeyGenerator, - }, + threshold::{service::session::ImmutableSessionMaker, traits::KeyGenerator}, utils::MetricedError, validation::{ parse_optional_proto_request_id, parse_proto_request_id, RequestIdParsingErr, @@ -1092,39 +1089,14 @@ impl< } }; - let (integer_server_key, decompression_key, sns_key) = { - let ( - raw_server_key, - _raw_ksk_material, - _raw_compression_key, - raw_decompression_key, - raw_noise_squashing_key, - _raw_noise_squashing_compression_key, - _raw_rerandomization_key, - _raw_tag, - ) = pub_key_set.server_key.clone().into_raw_parts(); - ( - raw_server_key, - raw_decompression_key, - raw_noise_squashing_key, - ) - }; - - let threshold_fhe_keys = ThresholdFheKeys { - private_keys: Arc::new(private_keys), - integer_server_key: Arc::new(integer_server_key), - sns_key: sns_key.map(Arc::new), - decompression_key: decompression_key.map(Arc::new), - meta_data: info.clone(), - }; - - //Note: We can't easily check here whether we succeeded writing to the meta store - //thus we can't increment the error counter if it fails + // Memory optimization: Storage function now serializes server_key first, + // then consumes it to extract components. This eliminates the need to clone the large + // server_key structure (~tens of GiB with production parameters). crypto_storage .write_threshold_keys_with_dkg_meta_store( req_id, epoch_id, - threshold_fhe_keys, + private_keys, pub_key_set, info, meta_store, diff --git a/core/service/src/engine/threshold/service/kms_impl.rs b/core/service/src/engine/threshold/service/kms_impl.rs index 8ab9bdf07..5a63c000d 100644 --- a/core/service/src/engine/threshold/service/kms_impl.rs +++ b/core/service/src/engine/threshold/service/kms_impl.rs @@ -770,5 +770,29 @@ mod tests { (priv_key_set, pub_key_set) } + + /// Initializes dummy keys for storage tests. + /// Returns (PrivateKeySet, FhePubKeySet) for the new memory-optimized storage API. + pub fn init_dummy_for_storage( + param: threshold_fhe::execution::tfhe_internals::parameters::DKGParams, + tag: tfhe::Tag, + rng: &mut R, + ) -> ( + PrivateKeySet<{ ResiduePolyF4Z128::EXTENSION_DEGREE }>, + FhePubKeySet, + ) { + let keyset = threshold_fhe::execution::tfhe_internals::test_feature::gen_key_set( + param, tag, rng, + ); + + let pub_key_set = FhePubKeySet { + public_key: keyset.public_keys.public_key, + server_key: keyset.public_keys.server_key, + }; + + let priv_key_set = PrivateKeySet::init_dummy(param); + + (priv_key_set, pub_key_set) + } } } diff --git a/core/service/src/engine/threshold/service/public_decryptor.rs b/core/service/src/engine/threshold/service/public_decryptor.rs index bc31d5d03..e1af9dbbf 100644 --- a/core/service/src/engine/threshold/service/public_decryptor.rs +++ b/core/service/src/engine/threshold/service/public_decryptor.rs @@ -871,9 +871,9 @@ mod tests { let key_id = RequestId::new_random(rng); - // make a dummy private keyset - let (threshold_fhe_keys, fhe_key_set) = - ThresholdFheKeys::init_dummy(param, key_id.into(), rng); + // make a dummy private keyset for storage + let (private_keys, fhe_key_set) = + ThresholdFheKeys::init_dummy_for_storage(param, key_id.into(), rng); // Not a huge deal if we clone this server key since we only use small/test parameters tfhe::set_server_key(fhe_key_set.server_key.clone()); @@ -912,7 +912,7 @@ mod tests { .write_threshold_keys_with_dkg_meta_store( &key_id, &epoch_id, - threshold_fhe_keys, + private_keys, fhe_key_set, info, Arc::clone(&key_meta_store), diff --git a/core/service/src/engine/threshold/service/resharer.rs b/core/service/src/engine/threshold/service/resharer.rs index be9ce14e7..79bd48fb8 100644 --- a/core/service/src/engine/threshold/service/resharer.rs +++ b/core/service/src/engine/threshold/service/resharer.rs @@ -5,10 +5,7 @@ use crate::{ compute_info_standard_keygen, retrieve_parameters, BaseKmsStruct, KeyGenMetadata, DSEP_PUBDATA_KEY, }, - threshold::{ - service::{session::ImmutableSessionMaker, ThresholdFheKeys}, - traits::Resharer, - }, + threshold::{service::session::ImmutableSessionMaker, traits::Resharer}, utils::MetricedError, validation::{ parse_optional_proto_request_id, parse_proto_request_id, RequestIdParsingErr, @@ -570,9 +567,6 @@ impl::new(1, 1)); @@ -618,12 +604,14 @@ impl ( ThresholdCryptoMaterialStorage, - ThresholdFheKeys, + PrivateKeySet<4>, FhePubKeySet, ) { let crypto_storage = ThresholdCryptoMaterialStorage::new( @@ -555,16 +556,7 @@ fn setup_threshold_store( keygen_all_party_shares_from_keyset(&keyset, pbs_params, &mut rng, 4, 1).unwrap(); let fhe_key_set = keyset.public_keys.clone(); + let private_keys = key_shares[0].to_owned(); - let (integer_server_key, _, _, _, sns_key, _, _, _) = - keyset.public_keys.server_key.clone().into_raw_parts(); - - let threshold_fhe_keys = ThresholdFheKeys { - private_keys: Arc::new(key_shares[0].to_owned()), - integer_server_key: Arc::new(integer_server_key), - sns_key: sns_key.map(Arc::new), - decompression_key: None, - meta_data: dummy_info(), - }; - (crypto_storage, threshold_fhe_keys, fhe_key_set) + (crypto_storage, private_keys, fhe_key_set) } diff --git a/core/service/src/vault/storage/crypto_material/threshold.rs b/core/service/src/vault/storage/crypto_material/threshold.rs index d67f4cba3..2086a6570 100644 --- a/core/service/src/vault/storage/crypto_material/threshold.rs +++ b/core/service/src/vault/storage/crypto_material/threshold.rs @@ -12,7 +12,10 @@ use kms_grpc::{ RequestId, }; use tfhe::{integer::compression_keys::DecompressionKey, zk::CompactPkeCrs}; -use threshold_fhe::execution::tfhe_internals::public_keysets::FhePubKeySet; +use threshold_fhe::{ + algebra::{galois_rings::degree_4::ResiduePolyF4Z128, structure_traits::Ring}, + execution::tfhe_internals::{private_keysets::PrivateKeySet, public_keysets::FhePubKeySet}, +}; use crate::{ engine::{ @@ -93,23 +96,73 @@ impl::crs_exists(&self.inner, req_id).await } + /// Internal function to write threshold keys with memory optimization. + /// + /// Memory optimization: Serializes server_key FIRST, then consumes it + /// with into_raw_parts() to extract components for ThresholdFheKeys. This eliminates + /// the need to clone the large server_key structure (~tens of GiB with production params). #[allow(clippy::too_many_arguments)] async fn inner_write_threshold_keys( &self, reshare_id: Option<&RequestId>, key_id: &RequestId, epoch_id: &EpochId, - threshold_fhe_keys: ThresholdFheKeys, + private_keys: PrivateKeySet<{ ResiduePolyF4Z128::EXTENSION_DEGREE }>, fhe_key_set: FhePubKeySet, info: KeyGenMetadata, meta_store: Arc>>, ) { // use guarded_meta_store as the synchronization point - // all other locks are taken as needed so that we don't lock up - // other function calls too much let mut guarded_meta_storage = meta_store.write().await; + + // Step 1: Serialize server_key to public storage FIRST (before consuming it) + let server_result = { + let mut pub_storage = self.inner.public_storage.lock().await; + let result = store_versioned_at_request_id( + &mut (*pub_storage), + key_id, + &fhe_key_set.server_key, + &PubDataType::ServerKey.to_string(), + ) + .await; + + if let Err(e) = &result { + tracing::error!("Failed to store server key for request {}: {}", key_id, e); + } else { + log_storage_success( + key_id, + pub_storage.info(), + &PubDataType::ServerKey.to_string(), + true, + true, + ); + } + result + }; // pub_storage lock released here + + // Step 2: Consume server_key to extract components + let ( + integer_server_key, + _raw_ksk_material, + _raw_compression_key, + decompression_key, + sns_key, + _raw_noise_squashing_compression_key, + _raw_rerandomization_key, + _raw_tag, + ) = fhe_key_set.server_key.into_raw_parts(); + + // Step 3: Construct ThresholdFheKeys with extracted components + let threshold_fhe_keys = ThresholdFheKeys { + private_keys: Arc::new(private_keys), + integer_server_key: Arc::new(integer_server_key), + sns_key: sns_key.map(Arc::new), + decompression_key: decompression_key.map(Arc::new), + meta_data: info.clone(), + }; + + // Step 4: Run remaining storage operations in parallel let (r1, r2, r3) = { - // Lock the storage components in the correct order to avoid deadlocks. let mut pub_storage = self.inner.public_storage.lock().await; let mut priv_storage = self.inner.private_storage.lock().await; let back_vault = match self.inner.backup_vault { @@ -144,6 +197,7 @@ impl, fhe_key_set: FhePubKeySet, info: KeyGenMetadata, meta_store: Arc>>, @@ -282,24 +316,28 @@ impl, fhe_key_set: FhePubKeySet, info: KeyGenMetadata, meta_store: Arc>>, @@ -308,7 +346,7 @@ impl>, cpu_load_gauge: TaggedMetric>, memory_usage_gauge: TaggedMetric>, + process_memory_gauge: TaggedMetric>, // Process-specific memory usage (more accurate for cross-party comparison) file_descriptor_gauge: TaggedMetric>, // Number of file descriptors of the KMS socat_file_descriptor_gauge: TaggedMetric>, // Number of socat file descriptors - socat_task_gauge: TaggedMetric>, // Number of socat file descriptors - task_gauge: TaggedMetric>, // Numbers active child processes of the KMS + socat_task_gauge: TaggedMetric>, // Number of socat file descriptors + task_gauge: TaggedMetric>, // Numbers active child processes of the KMS // Internal system gauges // TODO rate limiter, session gauge and meta store should actually be counters but we need to add decorators to ensure it is always updated rate_limiter_gauge: TaggedMetric>, // Number tokens used in the rate limiter @@ -113,6 +114,8 @@ impl CoreMetrics { let cpu_load_metric: Cow<'static, str> = format!("{}_cpu_load", config.prefix).into(); let memory_usage_metric: Cow<'static, str> = format!("{}_memory_usage", config.prefix).into(); + let process_memory_metric: Cow<'static, str> = + format!("{}_process_memory", config.prefix).into(); let network_rx_metric: Cow<'static, str> = format!("{}_network_rx_bytes", config.prefix).into(); let network_tx_metric: Cow<'static, str> = @@ -193,12 +196,22 @@ impl CoreMetrics { let memory_gauge = meter .u64_gauge(memory_usage_metric) - .with_description("Memory used for KMS") + .with_description("Total system memory used (may vary by instance type)") .with_unit("bytes") .build(); //Record 0 just to make sure the gauge is exported memory_gauge.record(0, &[]); + let process_memory_gauge = meter + .u64_gauge(process_memory_metric) + .with_description( + "Memory used by the KMS process (accurate for cross-party comparison)", + ) + .with_unit("bytes") + .build(); + //Record 0 just to make sure the gauge is exported + process_memory_gauge.record(0, &[]); + let file_descriptor_gauge = meter .u64_gauge(file_descriptors_metric) .with_description("File descriptor usage for the KMS") @@ -288,6 +301,7 @@ impl CoreMetrics { size_histogram: TaggedMetric::new(size_histogram), cpu_load_gauge: TaggedMetric::new(cpu_gauge), memory_usage_gauge: TaggedMetric::new(memory_gauge), + process_memory_gauge: TaggedMetric::new(process_memory_gauge), file_descriptor_gauge: TaggedMetric::new(file_descriptor_gauge), socat_file_descriptor_gauge: TaggedMetric::new(socat_file_descriptor_gauge), socat_task_gauge: TaggedMetric::new(socat_task_gauge), @@ -403,13 +417,22 @@ impl CoreMetrics { .record(load, &self.cpu_load_gauge.with_tags(&[])); } - /// Record the current memory usage into the gauge + /// Record the current memory usage into the gauge (total system memory) pub fn record_memory_usage(&self, usage: u64) { self.memory_usage_gauge .metric .record(usage, &self.memory_usage_gauge.with_tags(&[])); } + /// Record the current process-specific memory usage into the gauge + /// This is more accurate for cross-party comparison as it excludes + /// system memory usage that varies by instance type + pub fn record_process_memory(&self, usage: u64) { + self.process_memory_gauge + .metric + .record(usage, &self.process_memory_gauge.with_tags(&[])); + } + /// Record the current number of tasks into the gauge pub fn record_tasks(&self, count: u64) { self.task_gauge diff --git a/observability/src/sys_metrics.rs b/observability/src/sys_metrics.rs index 8663ccc0b..b5601a82c 100644 --- a/observability/src/sys_metrics.rs +++ b/observability/src/sys_metrics.rs @@ -7,11 +7,17 @@ pub fn start_sys_metrics_collection(refresh_interval: Duration) -> anyhow::Resul let specifics = RefreshKind::nothing() .with_cpu(CpuRefreshKind::nothing()) .with_memory(MemoryRefreshKind::nothing().with_ram()) - .with_processes(ProcessRefreshKind::nothing()); + .with_processes(ProcessRefreshKind::nothing().with_memory()); let mut system = sysinfo::System::new_with_specifics(specifics); let num_cpus = system.cpus().len(); + // Get current process PID for process-specific memory tracking + let current_pid = sysinfo::get_current_pid().ok(); + if current_pid.is_none() { + tracing::warn!("Could not get current PID for process memory tracking"); + } + let total_ram = system.total_memory(); let free_ram = system.free_memory(); tracing::info!("Starting system metrics collection...\n Running on {} CPUs. Total memory: {} bytes, Free memory: {} bytes.", @@ -31,9 +37,18 @@ pub fn start_sys_metrics_collection(refresh_interval: Duration) -> anyhow::Resul METRICS.record_cpu_load(cpus_load_avg); - // Update memory metrics + // Update memory metrics (total system memory) METRICS.record_memory_usage(system.used_memory()); + // Update process-specific memory (more accurate for cross-party comparison) + if let Some(pid) = current_pid { + if let Some(process) = system.process(pid) { + METRICS.record_process_memory(process.memory()); + } else { + tracing::debug!("Could not find process {:?} for memory tracking", pid); + } + } + // Update network metrics networks.refresh(true); let (total_tx, total_rx) = networks.iter().fold((0u64, 0u64), |(tx, rx), net| {