Skip to content

Commit 87757db

Browse files
committed
feat(query): add spill backoff with sleep for low-memory queries under global pressure
When global memory pressure triggers spill but the current query's memory usage is low, sleep with exponential backoff before spilling to give other queries a chance to release memory. Rename low_query_threshold to low_query_memory_bytes for clarity and add observability logging.
1 parent 801c2c1 commit 87757db

File tree

6 files changed

+320
-4
lines changed

6 files changed

+320
-4
lines changed

src/query/pipeline/transforms/src/processors/memory_settings.rs

Lines changed: 222 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,62 @@ use bytesize::ByteSize;
2020
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2121
use databend_common_base::runtime::MemStat;
2222
use databend_common_base::runtime::ThreadTracker;
23+
use log::info;
24+
25+
const GLOBAL_PRESSURE_SLEEP_BACKOFF_INIT_MS: u64 = 200;
26+
27+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28+
pub enum SpillDecision {
29+
NoSpill,
30+
SpillNow,
31+
Sleep(u64),
32+
}
33+
34+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35+
pub struct SpillBackoffSettings {
36+
pub max_sleep_ms: u64,
37+
pub low_query_memory_bytes: u64,
38+
}
39+
40+
impl SpillBackoffSettings {
41+
fn should_backoff(&self, query_usage: usize) -> bool {
42+
query_usage <= self.low_query_memory_bytes as usize
43+
}
44+
}
45+
46+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
47+
pub struct SpillBackoffState {
48+
consumed_sleep_ms: u64,
49+
attempts: u32,
50+
}
51+
52+
impl SpillBackoffState {
53+
fn reset(&mut self) {
54+
self.consumed_sleep_ms = 0;
55+
self.attempts = 0;
56+
}
57+
58+
fn next_sleep_ms(&mut self, max_sleep_ms: u64) -> Option<u64> {
59+
if max_sleep_ms == 0 {
60+
return None;
61+
}
62+
63+
let remaining_budget = max_sleep_ms.saturating_sub(self.consumed_sleep_ms);
64+
if remaining_budget == 0 {
65+
return None;
66+
}
67+
68+
let mut delay_ms = GLOBAL_PRESSURE_SLEEP_BACKOFF_INIT_MS;
69+
for _ in 0..self.attempts.min(16) {
70+
delay_ms = delay_ms.saturating_mul(2);
71+
}
72+
73+
let actual_sleep_ms = delay_ms.min(remaining_budget);
74+
self.consumed_sleep_ms = self.consumed_sleep_ms.saturating_add(actual_sleep_ms);
75+
self.attempts = self.attempts.saturating_add(1);
76+
Some(actual_sleep_ms)
77+
}
78+
}
2379

2480
#[derive(Clone)]
2581
#[non_exhaustive]
@@ -35,6 +91,8 @@ pub struct MemorySettings {
3591
pub enable_query_level_spill: bool,
3692
pub max_query_memory_usage: usize,
3793
pub query_memory_tracking: Option<Arc<MemStat>>,
94+
95+
pub spill_backoff: Option<SpillBackoffSettings>,
3896
}
3997

4098
impl Debug for MemorySettings {
@@ -69,6 +127,7 @@ impl Debug for MemorySettings {
69127
}
70128

71129
f.field("enable_query_level_spill", &self.enable_query_level_spill)
130+
.field("spill_backoff", &self.spill_backoff)
72131
.field("spill_unit_size", &self.spill_unit_size)
73132
.finish()
74133
}
@@ -82,6 +141,8 @@ pub struct MemorySettingsBuilder {
82141
max_query_memory_usage: Option<usize>,
83142
query_memory_tracking: Option<Arc<MemStat>>,
84143

144+
spill_backoff: Option<SpillBackoffSettings>,
145+
85146
spill_unit_size: Option<usize>,
86147
}
87148

@@ -111,6 +172,11 @@ impl MemorySettingsBuilder {
111172
self
112173
}
113174

175+
pub fn with_spill_backoff(mut self, spill_backoff: Option<SpillBackoffSettings>) -> Self {
176+
self.spill_backoff = spill_backoff;
177+
self
178+
}
179+
114180
pub fn build(self) -> MemorySettings {
115181
MemorySettings {
116182
enable_group_spill: self.enable_group_spill,
@@ -123,6 +189,8 @@ impl MemorySettingsBuilder {
123189
max_query_memory_usage: self.max_query_memory_usage.unwrap_or(usize::MAX),
124190
query_memory_tracking: self.query_memory_tracking,
125191

192+
spill_backoff: self.spill_backoff,
193+
126194
spill_unit_size: self.spill_unit_size.unwrap_or(0),
127195
}
128196
}
@@ -138,6 +206,8 @@ impl MemorySettings {
138206
max_query_memory_usage: None,
139207
query_memory_tracking: None,
140208

209+
spill_backoff: None,
210+
141211
spill_unit_size: None,
142212
}
143213
}
@@ -164,6 +234,55 @@ impl MemorySettings {
164234
}
165235
}
166236

237+
pub fn check_spill_with_backoff(&self, backoff_state: &mut SpillBackoffState) -> SpillDecision {
238+
let decision = (|| {
239+
if !self.check_spill() {
240+
return SpillDecision::NoSpill;
241+
}
242+
243+
let Some(spill_backoff) = self.spill_backoff else {
244+
return SpillDecision::SpillNow;
245+
};
246+
247+
let Some(query_usage) = self.current_query_usage() else {
248+
return SpillDecision::SpillNow;
249+
};
250+
251+
if !spill_backoff.should_backoff(query_usage) {
252+
return SpillDecision::SpillNow;
253+
}
254+
255+
match backoff_state.next_sleep_ms(spill_backoff.max_sleep_ms) {
256+
Some(sleep_ms) => {
257+
info!(
258+
"Spill backoff: sleeping {}ms (query_usage={}, threshold={}, consumed={}ms/{}ms, attempt={})",
259+
sleep_ms,
260+
ByteSize(query_usage as u64),
261+
ByteSize(spill_backoff.low_query_memory_bytes),
262+
backoff_state.consumed_sleep_ms,
263+
spill_backoff.max_sleep_ms,
264+
backoff_state.attempts,
265+
);
266+
SpillDecision::Sleep(sleep_ms)
267+
}
268+
None => {
269+
info!(
270+
"Spill backoff: budget exhausted, proceeding to spill (query_usage={}, consumed={}ms/{}ms)",
271+
ByteSize(query_usage as u64),
272+
backoff_state.consumed_sleep_ms,
273+
spill_backoff.max_sleep_ms,
274+
);
275+
SpillDecision::SpillNow
276+
}
277+
}
278+
})();
279+
280+
if !matches!(decision, SpillDecision::Sleep(_)) {
281+
backoff_state.reset();
282+
}
283+
decision
284+
}
285+
167286
fn check_global(&self) -> Option<isize> {
168287
self.enable_global_level_spill.then(|| {
169288
let usage = self.global_memory_tracking.get_memory_usage();
@@ -200,7 +319,7 @@ impl MemorySettings {
200319
return None;
201320
}
202321

203-
let usage = self.query_memory_tracking.as_ref()?.get_memory_usage();
322+
let usage = self.current_query_usage()?;
204323

205324
Some(if usage >= self.max_query_memory_usage {
206325
-((usage - self.max_query_memory_usage) as isize)
@@ -219,6 +338,12 @@ impl MemorySettings {
219338
.flatten()
220339
.reduce(|a, b| a.min(b))
221340
}
341+
342+
fn current_query_usage(&self) -> Option<usize> {
343+
self.query_memory_tracking
344+
.as_ref()
345+
.map(|tracking| tracking.get_memory_usage())
346+
}
222347
}
223348

224349
#[cfg(test)]
@@ -239,6 +364,7 @@ mod tests {
239364
max_query_memory_usage: 0,
240365
query_memory_tracking: None,
241366
enable_query_level_spill: false,
367+
spill_backoff: None,
242368
spill_unit_size: 4096,
243369
}
244370
}
@@ -370,4 +496,99 @@ mod tests {
370496
};
371497
assert!(!settings.check_spill());
372498
}
499+
500+
#[test]
501+
fn backoff_sleeps_when_spill_triggered_and_query_is_low_memory() {
502+
let query_mem = create_mem_stat(40);
503+
let global_mem = create_static_mem_stat(100);
504+
let settings = MemorySettings {
505+
enable_global_level_spill: true,
506+
global_memory_tracking: global_mem,
507+
max_memory_usage: 100,
508+
query_memory_tracking: Some(query_mem),
509+
spill_backoff: Some(SpillBackoffSettings {
510+
max_sleep_ms: 500,
511+
low_query_memory_bytes: 50,
512+
}),
513+
..Default::default()
514+
};
515+
516+
let mut backoff_state = SpillBackoffState::default();
517+
518+
assert_eq!(
519+
settings.check_spill_with_backoff(&mut backoff_state),
520+
SpillDecision::Sleep(200)
521+
);
522+
assert_eq!(
523+
settings.check_spill_with_backoff(&mut backoff_state),
524+
SpillDecision::Sleep(300)
525+
);
526+
assert_eq!(
527+
settings.check_spill_with_backoff(&mut backoff_state),
528+
SpillDecision::SpillNow
529+
);
530+
}
531+
532+
#[test]
533+
fn backoff_spills_immediately_when_query_is_not_low_memory() {
534+
let query_mem = create_mem_stat(60);
535+
let global_mem = create_static_mem_stat(100);
536+
let settings = MemorySettings {
537+
enable_global_level_spill: true,
538+
global_memory_tracking: global_mem,
539+
max_memory_usage: 100,
540+
query_memory_tracking: Some(query_mem),
541+
spill_backoff: Some(SpillBackoffSettings {
542+
max_sleep_ms: 500,
543+
low_query_memory_bytes: 50,
544+
}),
545+
..Default::default()
546+
};
547+
548+
let mut backoff_state = SpillBackoffState::default();
549+
550+
assert_eq!(
551+
settings.check_spill_with_backoff(&mut backoff_state),
552+
SpillDecision::SpillNow
553+
);
554+
}
555+
556+
#[test]
557+
fn backoff_resets_after_pressure_is_relieved() {
558+
let query_mem = create_mem_stat(40);
559+
let pressured_global_mem = create_static_mem_stat(100);
560+
let relaxed_global_mem = create_static_mem_stat(80);
561+
562+
let pressured_settings = MemorySettings {
563+
enable_global_level_spill: true,
564+
global_memory_tracking: pressured_global_mem,
565+
max_memory_usage: 100,
566+
query_memory_tracking: Some(query_mem.clone()),
567+
spill_backoff: Some(SpillBackoffSettings {
568+
max_sleep_ms: 1000,
569+
low_query_memory_bytes: 50,
570+
}),
571+
..Default::default()
572+
};
573+
574+
let relaxed_settings = MemorySettings {
575+
global_memory_tracking: relaxed_global_mem,
576+
..pressured_settings.clone()
577+
};
578+
579+
let mut backoff_state = SpillBackoffState::default();
580+
581+
assert_eq!(
582+
pressured_settings.check_spill_with_backoff(&mut backoff_state),
583+
SpillDecision::Sleep(200)
584+
);
585+
assert_eq!(
586+
relaxed_settings.check_spill_with_backoff(&mut backoff_state),
587+
SpillDecision::NoSpill
588+
);
589+
assert_eq!(
590+
pressured_settings.check_spill_with_backoff(&mut backoff_state),
591+
SpillDecision::Sleep(200)
592+
);
593+
}
373594
}

src/query/pipeline/transforms/src/processors/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,7 @@ pub mod traits;
1717
mod transforms;
1818

1919
pub use memory_settings::MemorySettings;
20+
pub use memory_settings::SpillBackoffSettings;
21+
pub use memory_settings::SpillBackoffState;
22+
pub use memory_settings::SpillDecision;
2023
pub use transforms::*;

src/query/service/src/pipelines/memory_settings.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use databend_common_catalog::table_context::TableContext;
1616
use databend_common_exception::Result;
1717
use databend_common_pipeline_transforms::MemorySettings;
18+
use databend_common_pipeline_transforms::SpillBackoffSettings;
1819
use databend_common_settings::OutofMemoryBehavior;
1920

2021
use crate::sessions::QueryContext;
@@ -154,6 +155,17 @@ impl MemorySettingsExt for MemorySettings {
154155
);
155156
}
156157

158+
let max_sleep_ms = settings.get_spill_global_backoff_max_sleep_ms()?;
159+
let low_query_ratio = settings.get_spill_global_backoff_low_query_ratio()? as f64 / 100.0;
160+
let low_query_memory_bytes = (max_memory_usage as f64 * low_query_ratio) as u64;
161+
162+
if max_sleep_ms > 0 && max_memory_usage != 0 {
163+
builder = builder.with_spill_backoff(Some(SpillBackoffSettings {
164+
max_sleep_ms,
165+
low_query_memory_bytes,
166+
}));
167+
}
168+
157169
Ok(builder.build())
158170
}
159171
}
@@ -163,6 +175,7 @@ mod tests {
163175
use databend_common_catalog::table_context::TableContext;
164176
use databend_common_exception::Result;
165177
use databend_common_pipeline_transforms::MemorySettings;
178+
use databend_common_pipeline_transforms::SpillBackoffSettings;
166179

167180
use crate::pipelines::memory_settings::MemorySettingsExt;
168181
use crate::test_kits::TestFixture;
@@ -384,4 +397,40 @@ mod tests {
384397
assert_eq!(memory_settings.spill_unit_size, 3 * 1024 * 1024);
385398
Ok(())
386399
}
400+
401+
#[tokio::test(flavor = "multi_thread")]
402+
async fn test_aggregate_spill_backoff_enabled_from_settings() -> Result<()> {
403+
let fixture = TestFixture::setup().await?;
404+
let ctx = fixture.new_query_ctx().await?;
405+
let settings = ctx.get_settings();
406+
407+
settings.set_setting("max_memory_usage".into(), "1000".into())?;
408+
settings.set_setting("spill_global_backoff_max_sleep_ms".into(), "100".into())?;
409+
settings.set_setting("spill_global_backoff_low_query_ratio".into(), "25".into())?;
410+
411+
let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?;
412+
413+
assert_eq!(
414+
memory_settings.spill_backoff,
415+
Some(SpillBackoffSettings {
416+
max_sleep_ms: 100,
417+
low_query_memory_bytes: 250,
418+
})
419+
);
420+
Ok(())
421+
}
422+
423+
#[tokio::test(flavor = "multi_thread")]
424+
async fn test_aggregate_spill_backoff_disabled_when_sleep_is_zero() -> Result<()> {
425+
let fixture = TestFixture::setup().await?;
426+
let ctx = fixture.new_query_ctx().await?;
427+
let settings = ctx.get_settings();
428+
429+
settings.set_setting("spill_global_backoff_max_sleep_ms".into(), "0".into())?;
430+
431+
let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?;
432+
433+
assert_eq!(memory_settings.spill_backoff, None);
434+
Ok(())
435+
}
387436
}

0 commit comments

Comments
 (0)