Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ databend-storages-common-table-meta = { workspace = true }
dyn-clone = { workspace = true }
jsonb = { workspace = true }
log = { workspace = true }
opendal = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
rand = { workspace = true }
Expand Down
42 changes: 42 additions & 0 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use databend_common_exception::Result;
use databend_common_expression::Column;
use databend_common_expression::Expr;
use databend_common_expression::types::Bitmap;
use opendal::Operator;
use tokio::sync::watch;
use tokio::sync::watch::Receiver;
use tokio::sync::watch::Sender;

use crate::plan::PartInfoPtr;
use crate::sbbf::Sbbf;

pub type RuntimeBloomFilter = Arc<Sbbf>;
Expand Down Expand Up @@ -175,3 +181,39 @@ impl Default for RuntimeFilterReady {
}
}
}

/// Runtime filter that prunes partitions using only partition metadata (e.g. min/max stats).
/// No IO required. Applied in PartitionStreamSource.
pub trait PartitionRuntimeFilter: Send + Sync {
/// Returns true if the partition should be pruned (skipped).
fn prune(&self, part: &PartInfoPtr) -> bool;
}

pub type PartitionRuntimeFilters = Vec<Arc<dyn PartitionRuntimeFilter>>;
pub type IndexRuntimeFilters = Vec<Arc<dyn IndexRuntimeFilter>>;
pub type RowRuntimeFilters = Vec<Arc<dyn RowRuntimeFilter>>;

/// Runtime filter that prunes partitions by loading index files (bloom index, spatial index).
/// Requires async IO. Applied in ReadDataTransform.
/// Split into load_index (IO) and prune (computation) for caller-controlled IO scheduling.
/// ReadSettings should be embedded at construction time.
#[async_trait::async_trait]
pub trait IndexRuntimeFilter: Send + Sync {
/// Load index data for the given partition.
async fn load_index(
&self,
part: &PartInfoPtr,
op: &Operator,
) -> Result<Option<Box<dyn Any + Send>>>;

/// Returns true if the partition should be pruned (skipped).
/// `index` is the data returned by `load_index`, None if no index available.
fn prune(&self, part: &PartInfoPtr, index: Option<&dyn Any>) -> Result<bool>;
}

/// Runtime filter applied per-row during block deserialization (e.g. Sbbf bloom filter).
/// Applied in NativeDeserializeDataTransform / ReadState.
pub trait RowRuntimeFilter: Send + Sync {
fn column_name(&self) -> &str;
fn apply(&self, column: Column) -> Result<Bitmap>;
}
15 changes: 15 additions & 0 deletions src/query/catalog/src/table_context/runtime_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::sync::Arc;
use databend_common_exception::Result;
use databend_common_expression::Expr;

use crate::runtime_filter_info::IndexRuntimeFilters;
use crate::runtime_filter_info::PartitionRuntimeFilters;
use crate::runtime_filter_info::RowRuntimeFilters;
use crate::runtime_filter_info::RuntimeBloomFilter;
use crate::runtime_filter_info::RuntimeFilterEntry;
use crate::runtime_filter_info::RuntimeFilterInfo;
Expand Down Expand Up @@ -50,4 +53,16 @@ pub trait TableContextRuntimeFilter: Send + Sync {
fn runtime_filter_reports(&self) -> HashMap<usize, Vec<RuntimeFilterReport>>;

fn has_bloom_runtime_filters(&self, id: usize) -> bool;

fn add_partition_runtime_filters(&self, _: usize, _: PartitionRuntimeFilters);

fn add_index_runtime_filters(&self, _: usize, _: IndexRuntimeFilters);

fn add_row_runtime_filters(&self, _: usize, _: RowRuntimeFilters);

fn get_partition_runtime_filters(&self, _: usize) -> PartitionRuntimeFilters;

fn get_row_runtime_filters(&self, _: usize) -> RowRuntimeFilters;

fn get_index_runtime_filters(&self, _: usize) -> IndexRuntimeFilters;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::time::Instant;

use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_exception::Result;
use databend_common_storages_fuse::pruning::BloomRowFilter;

use super::convert::build_runtime_filter_infos;
use super::global::get_global_runtime_filter_packet;
Expand Down Expand Up @@ -72,6 +74,25 @@ pub async fn build_and_push_down_runtime_filter(
runtime_filter_infos
);

// Extract BloomRowFilter trait objects from the entries
for (scan_id, info) in &runtime_filter_infos {
let row_filters: RowRuntimeFilters = info
.filters
.iter()
.filter_map(|entry| {
let bloom = entry.bloom.as_ref()?;
Some(BloomRowFilter::create(
bloom.column_name.clone(),
bloom.filter.clone(),
))
})
.collect();

if !row_filters.is_empty() {
join.ctx.add_row_runtime_filters(*scan_id, row_filters);
}
}

join.ctx.set_runtime_filter(runtime_filter_infos);
join.set_bloom_filter_ready()?;
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::sync::Arc;

use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FunctionContext;
use databend_common_storages_fuse::pruning::BloomRowFilter;

use crate::physical_plans::HashJoin;
use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
Expand Down Expand Up @@ -108,7 +110,25 @@ impl RuntimeFiltersDesc {
)
.await?;

self.ctx.set_runtime_filter(runtime_filter_infos);
self.ctx.set_runtime_filter(runtime_filter_infos.clone());

// Extract BloomRowFilter trait objects for the new trait-based API
for (scan_id, info) in &runtime_filter_infos {
let row_filters: RowRuntimeFilters = info
.filters
.iter()
.filter_map(|entry| {
let bloom = entry.bloom.as_ref()?;
Some(BloomRowFilter::create(
bloom.column_name.clone(),
bloom.filter.clone(),
))
})
.collect();
if !row_filters.is_empty() {
self.ctx.add_row_runtime_filters(*scan_id, row_filters);
}
}

for runtime_filter_ready in self.runtime_filters_ready.iter() {
runtime_filter_ready
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::IndexRuntimeFilters;
use databend_common_catalog::runtime_filter_info::PartitionRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
Expand Down
36 changes: 36 additions & 0 deletions src/query/service/src/sessions/query_ctx/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,42 @@ impl TableContextRuntimeFilter for QueryContext {
.runtime_filter_state
.has_bloom_runtime_filters(id)
}

fn add_partition_runtime_filters(&self, scan_id: usize, filters: PartitionRuntimeFilters) {
self.shared
.runtime_filter_state
.add_partition_runtime_filters(scan_id, filters);
}

fn add_index_runtime_filters(&self, scan_id: usize, filters: IndexRuntimeFilters) {
self.shared
.runtime_filter_state
.add_index_runtime_filters(scan_id, filters);
}

fn add_row_runtime_filters(&self, scan_id: usize, filters: RowRuntimeFilters) {
self.shared
.runtime_filter_state
.add_row_runtime_filters(scan_id, filters);
}

fn get_partition_runtime_filters(&self, scan_id: usize) -> PartitionRuntimeFilters {
self.shared
.runtime_filter_state
.get_partition_runtime_filters(scan_id)
}

fn get_index_runtime_filters(&self, scan_id: usize) -> IndexRuntimeFilters {
self.shared
.runtime_filter_state
.get_index_runtime_filters(scan_id)
}

fn get_row_runtime_filters(&self, scan_id: usize) -> RowRuntimeFilters {
self.shared
.runtime_filter_state
.get_row_runtime_filters(scan_id)
}
}

impl TableContextResultCache for QueryContext {
Expand Down
48 changes: 48 additions & 0 deletions src/query/service/src/sessions/runtime_filter_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use databend_common_catalog::runtime_filter_info::IndexRuntimeFilters;
use databend_common_catalog::runtime_filter_info::PartitionRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
Expand All @@ -33,6 +36,9 @@ pub struct RuntimeFilterState {
runtime_filters: RwLock<HashMap<usize, RuntimeFilterInfo>>,
runtime_filter_ready: RwLock<HashMap<usize, Vec<Arc<RuntimeFilterReady>>>>,
runtime_filter_logged: AtomicBool,
partition_runtime_filters: RwLock<HashMap<usize, PartitionRuntimeFilters>>,
index_runtime_filters: RwLock<HashMap<usize, IndexRuntimeFilters>>,
row_runtime_filters: RwLock<HashMap<usize, RowRuntimeFilters>>,
}

impl RuntimeFilterState {
Expand All @@ -44,6 +50,9 @@ impl RuntimeFilterState {
self.runtime_filters.write().clear();
self.runtime_filter_ready.write().clear();
self.runtime_filter_logged.store(false, Ordering::SeqCst);
self.partition_runtime_filters.write().clear();
self.index_runtime_filters.write().clear();
self.row_runtime_filters.write().clear();
}

pub fn assert_empty(&self, query_id: &str) -> Result<()> {
Expand Down Expand Up @@ -157,4 +166,43 @@ impl RuntimeFilterState {
})
.unwrap_or(false)
}

pub fn add_partition_runtime_filters(&self, scan_id: usize, filters: PartitionRuntimeFilters) {
let mut map = self.partition_runtime_filters.write();
map.insert(scan_id, filters);
}

pub fn add_index_runtime_filters(&self, scan_id: usize, filters: IndexRuntimeFilters) {
let mut map = self.index_runtime_filters.write();
map.insert(scan_id, filters);
}

pub fn add_row_runtime_filters(&self, scan_id: usize, filters: RowRuntimeFilters) {
let mut map = self.row_runtime_filters.write();
map.insert(scan_id, filters);
}

pub fn get_partition_runtime_filters(&self, scan_id: usize) -> PartitionRuntimeFilters {
self.partition_runtime_filters
.read()
.get(&scan_id)
.cloned()
.unwrap_or_default()
}

pub fn get_index_runtime_filters(&self, scan_id: usize) -> IndexRuntimeFilters {
self.index_runtime_filters
.read()
.get(&scan_id)
.cloned()
.unwrap_or_default()
}

pub fn get_row_runtime_filters(&self, scan_id: usize) -> RowRuntimeFilters {
self.row_runtime_filters
.read()
.get(&scan_id)
.cloned()
.unwrap_or_default()
}
}
22 changes: 22 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::IndexRuntimeFilters;
use databend_common_catalog::runtime_filter_info::PartitionRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
Expand Down Expand Up @@ -1105,6 +1108,25 @@ impl TableContextRuntimeFilter for CtxDelegation {
fn has_bloom_runtime_filters(&self, _id: usize) -> bool {
todo!()
}

fn add_partition_runtime_filters(&self, _: usize, _: PartitionRuntimeFilters) {
todo!()
}
fn add_index_runtime_filters(&self, _: usize, _: IndexRuntimeFilters) {
todo!()
}
fn add_row_runtime_filters(&self, _: usize, _: RowRuntimeFilters) {
todo!()
}
fn get_partition_runtime_filters(&self, _: usize) -> PartitionRuntimeFilters {
todo!()
}
fn get_row_runtime_filters(&self, _: usize) -> RowRuntimeFilters {
todo!()
}
fn get_index_runtime_filters(&self, _: usize) -> IndexRuntimeFilters {
todo!()
}
}

impl TableContextResultCache for CtxDelegation {
Expand Down
22 changes: 22 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::IndexRuntimeFilters;
use databend_common_catalog::runtime_filter_info::PartitionRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RowRuntimeFilters;
use databend_common_catalog::runtime_filter_info::RuntimeBloomFilter;
use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
Expand Down Expand Up @@ -926,6 +929,25 @@ impl TableContextRuntimeFilter for CtxDelegation {
fn has_bloom_runtime_filters(&self, _id: usize) -> bool {
todo!()
}

fn add_partition_runtime_filters(&self, _: usize, _: PartitionRuntimeFilters) {
todo!()
}
fn add_index_runtime_filters(&self, _: usize, _: IndexRuntimeFilters) {
todo!()
}
fn add_row_runtime_filters(&self, _: usize, _: RowRuntimeFilters) {
todo!()
}
fn get_partition_runtime_filters(&self, _: usize) -> PartitionRuntimeFilters {
todo!()
}
fn get_row_runtime_filters(&self, _: usize) -> RowRuntimeFilters {
todo!()
}
fn get_index_runtime_filters(&self, _: usize) -> IndexRuntimeFilters {
todo!()
}
}

impl TableContextResultCache for CtxDelegation {
Expand Down
Loading
Loading