Skip to content

Commit fddfa19

Browse files
committed
refactor: rename is_mutation to is_merge_into, improve comments and tests
- Rename RowFetch::is_mutation to is_merge_into for clarity - Remove unnecessary serde(default) annotations - Improve comments: explain why SELECT+LIMIT skips repartition, use "reducing" instead of "eliminating" for duplicate reads - Rewrite test with proper structure, comments, and CREATE OR REPLACE - Avoid unwrap in lazy_columns handling
1 parent 4cbc49b commit fddfa19

File tree

6 files changed

+103
-90
lines changed

6 files changed

+103
-90
lines changed

src/query/service/src/physical_plans/physical_limit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ impl PhysicalPlanBuilder {
296296
cols_to_fetch,
297297
fetched_fields,
298298
need_wrap_nullable: false,
299-
is_mutation: false,
299+
is_merge_into: false,
300300
stat_info: Some(stat_info.clone()),
301301
});
302302
}

src/query/service/src/physical_plans/physical_mutation.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -453,28 +453,23 @@ impl PhysicalPlanBuilder {
453453

454454
// If the mutation type is FullOperation, we use row_id column to split a block
455455
// into matched and not matched parts.
456-
let has_lazy_columns = self
456+
let lazy_columns = self
457457
.metadata
458458
.read()
459459
.get_table_lazy_columns(target_table_index)
460-
.is_some_and(|cols| !cols.is_empty());
460+
.filter(|cols| !cols.is_empty());
461461

462462
if matches!(strategy, MutationStrategy::MixedMatched) {
463463
plan = PhysicalPlan::new(MutationSplit {
464464
input: plan,
465465
split_index: row_id_offset,
466-
has_row_fetch: has_lazy_columns,
466+
has_row_fetch: lazy_columns.is_some(),
467467
meta: PhysicalPlanMeta::new("MutationSplit"),
468468
});
469469
}
470470

471471
// Construct row fetch plan for lazy columns.
472-
if has_lazy_columns {
473-
let lazy_columns = self
474-
.metadata
475-
.read()
476-
.get_table_lazy_columns(target_table_index)
477-
.unwrap();
472+
if let Some(lazy_columns) = lazy_columns {
478473
plan = build_mutation_row_fetch(
479474
plan,
480475
metadata.clone(),
@@ -816,7 +811,7 @@ fn build_mutation_row_fetch(
816811
cols_to_fetch,
817812
fetched_fields,
818813
need_wrap_nullable,
819-
is_mutation: true,
814+
is_merge_into: true,
820815
stat_info: None,
821816
meta: PhysicalPlanMeta::new("RowFetch"),
822817
})

src/query/service/src/physical_plans/physical_mutation_into_split.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub struct MutationSplit {
3333
pub meta: PhysicalPlanMeta,
3434
pub input: PhysicalPlan,
3535
pub split_index: IndexType,
36-
/// Whether RowFetch follows this MutationSplit (lazy columns exist).
37-
/// Block_id repartition is only beneficial when RowFetch is present.
36+
/// When true, a block_id repartition is inserted before the split to reduce
37+
/// duplicate block reads in the downstream RowFetch stage.
3838
pub has_row_fetch: bool,
3939
}
4040

@@ -79,9 +79,8 @@ impl IPhysicalPlan for MutationSplit {
7979

8080
let max_threads = builder.settings.get_max_threads()? as usize;
8181

82-
// Add block_id repartition before split so each downstream RowFetch
83-
// processor sees rows from a disjoint set of blocks, eliminating
84-
// duplicate block reads. Only useful when RowFetch follows.
82+
// Repartition by block_id so each downstream RowFetch processor handles
83+
// a disjoint set of blocks, reducing duplicate block reads.
8584
if self.has_row_fetch
8685
&& max_threads > 1
8786
&& builder

src/query/service/src/physical_plans/physical_row_fetch.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ pub struct RowFetch {
5151
pub row_id_col_offset: usize,
5252
pub fetched_fields: Vec<DataField>,
5353
pub need_wrap_nullable: bool,
54-
/// True when this RowFetch is part of a MERGE INTO pipeline (not SELECT+LIMIT).
55-
#[serde(default)]
56-
pub is_mutation: bool,
54+
/// True when this RowFetch is part of a MERGE INTO pipeline.
55+
pub is_merge_into: bool,
5756

5857
/// Only used for explain
5958
pub stat_info: Option<PlanStatsInfo>,
@@ -113,7 +112,7 @@ impl IPhysicalPlan for RowFetch {
113112
row_id_col_offset: self.row_id_col_offset,
114113
fetched_fields: self.fetched_fields.clone(),
115114
need_wrap_nullable: self.need_wrap_nullable,
116-
is_mutation: self.is_mutation,
115+
is_merge_into: self.is_merge_into,
117116
stat_info: self.stat_info.clone(),
118117
})
119118
}
@@ -132,10 +131,10 @@ impl IPhysicalPlan for RowFetch {
132131
if !MutationSplit::check_physical_plan(&self.input) {
133132
// For MatchedOnly MERGE INTO, add block_id repartition before RowFetch
134133
// to reduce duplicate block reads.
135-
// Not applicable to SELECT+LIMIT: the exchange would destroy the sort
136-
// order produced by Sort+Limit (MergePartitionProcessor uses Random
137-
// strategy with non-deterministic output order).
138-
if self.is_mutation {
134+
// Not applicable to SELECT+LIMIT: pipeline.exchange() merges partitions
135+
// with non-deterministic output order, which would destroy the sort
136+
// order produced by Sort+Limit.
137+
if self.is_merge_into {
139138
let max_threads = builder.settings.get_max_threads()? as usize;
140139
if max_threads > 1
141140
&& builder

src/query/storages/fuse/src/operations/merge_into/processors/block_id_partition_exchange.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ use databend_common_pipeline::basic::Exchange;
2525
/// Partitions data blocks by block_id extracted from the `_row_id` column.
2626
///
2727
/// This ensures that rows belonging to the same physical block are routed
28-
/// to the same downstream processor, eliminating duplicate block reads
28+
/// to the same downstream processor, reducing duplicate block reads
2929
/// in the RowFetch stage of MERGE INTO.
3030
pub struct BlockIdPartitionExchange {
3131
row_id_col_offset: usize,
32-
/// Round-robin counter for NULL row_ids (unmatched rows in MixedMatched).
32+
/// Incrementing counter used by `partition()` to spread NULL row_ids
33+
/// (unmatched rows in MixedMatched) evenly across partitions.
3334
null_counter: AtomicU64,
3435
}
3536

0 commit comments

Comments
 (0)