Skip to content

Commit 703023b

Browse files
authored
fix(query): guard metadata consistency for flashback, time travel, and DDL column operations (#19653)
* fix fix fix * fix review comments
1 parent da54159 commit 703023b

File tree

35 files changed

+1101
-46
lines changed

35 files changed

+1101
-46
lines changed

src/query/service/src/interpreters/interpreter_table_drop_column.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use databend_common_expression::DataSchema;
2121
use databend_common_meta_app::schema::DatabaseType;
2222
use databend_common_sql::ApproxDistinctColumns;
2323
use databend_common_sql::BloomIndexColumns;
24+
use databend_common_sql::binder::validate_constraints_by_schema;
25+
use databend_common_sql::binder::validate_table_indexes_not_referencing_columns;
2426
use databend_common_sql::plans::DropTableColumnPlan;
2527
use databend_common_storages_basic::view_table::VIEW_ENGINE;
2628
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
@@ -159,6 +161,17 @@ impl Interpreter for DropTableColumnInterpreter {
159161
}
160162
let new_schema = new_table_meta.schema.as_ref().clone();
161163

164+
let dropped_column_ids = field.column_ids().into_iter().collect();
165+
validate_table_indexes_not_referencing_columns(
166+
self.ctx.clone(),
167+
catalog.as_ref(),
168+
&self.ctx.get_tenant(),
169+
table.get_id(),
170+
&dropped_column_ids,
171+
)
172+
.await?;
173+
validate_constraints_by_schema(self.ctx.clone(), &new_table_meta.constraints, &new_schema)?;
174+
162175
commit_table_meta(
163176
&self.ctx,
164177
table.as_ref(),

src/query/service/src/interpreters/interpreter_table_modify_column.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ use databend_common_sql::BloomIndexColumns;
4444
use databend_common_sql::DefaultExprBinder;
4545
use databend_common_sql::Planner;
4646
use databend_common_sql::analyze_cluster_keys;
47+
use databend_common_sql::binder::validate_constraints_by_schema;
48+
use databend_common_sql::binder::validate_table_indexes_not_referencing_columns;
4749
use databend_common_sql::plans::ModifyColumnAction;
4850
use databend_common_sql::plans::ModifyTableColumnPlan;
4951
use databend_common_sql::plans::Plan;
@@ -200,6 +202,7 @@ impl ModifyTableColumnInterpreter {
200202
let table_info = table.get_table_info();
201203
let mut new_schema = schema.as_ref().clone();
202204
let mut modified_cols = HashSet::with_capacity(field_and_comments.len());
205+
let mut modified_column_ids = HashSet::new();
203206
// first check default expr before lock table
204207
for (field, _comment) in field_and_comments {
205208
if let Some((i, old_field)) = schema.column_with_name(&field.name) {
@@ -224,6 +227,10 @@ impl ModifyTableColumnInterpreter {
224227

225228
if old_field.data_type != field.data_type {
226229
modified_cols.insert(field.name.clone());
230+
// Aggregating indexes still bind to the existing table column ids, so
231+
// MODIFY COLUMN must check the ids from the current schema instead of the
232+
// freshly analyzed field definition.
233+
modified_column_ids.extend(old_field.column_ids());
227234
// Check if this column is referenced by computed columns.
228235
let data_schema = DataSchema::from(&new_schema);
229236
check_referenced_computed_columns(
@@ -265,6 +272,20 @@ impl ModifyTableColumnInterpreter {
265272
let catalog_name = table_info.catalog();
266273
let catalog = self.ctx.get_catalog(catalog_name).await?;
267274

275+
validate_table_indexes_not_referencing_columns(
276+
self.ctx.clone(),
277+
catalog.as_ref(),
278+
&self.ctx.get_tenant(),
279+
table.get_id(),
280+
&modified_column_ids,
281+
)
282+
.await?;
283+
validate_constraints_by_schema(
284+
self.ctx.clone(),
285+
&table_info.meta.constraints,
286+
new_schema.as_ref(),
287+
)?;
288+
268289
let base_snapshot = fuse_table.read_table_snapshot().await?;
269290
let prev_snapshot_id = base_snapshot.snapshot_id().map(|(id, _)| id);
270291
let table_meta_timestamps = self

src/query/service/src/interpreters/interpreter_table_set_options.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ async fn set_segment_format(
278278
table_snapshot.summary.clone(),
279279
new_segment_locations,
280280
fuse_table.cluster_key_meta(),
281+
fuse_table.cluster_type(),
281282
table_snapshot.table_statistics_location(),
282283
table_meta_timestamps,
283284
)?;

src/query/service/src/test_kits/fuse.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
276276
locations,
277277
None,
278278
None,
279+
None,
279280
TestFixture::default_table_meta_timestamps(),
280281
)?;
281282
snapshot_1.timestamp = Some(now - Duration::hours(12));
@@ -460,6 +461,7 @@ pub async fn generate_snapshot_v4(
460461
segments.iter().map(|s| s.0.clone()).collect(),
461462
None,
462463
None,
464+
None,
463465
TestFixture::default_table_meta_timestamps(),
464466
)?;
465467
let new_snapshot_location =

src/query/service/tests/it/storages/fuse/conflict.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ fn test_unresolvable_delete_conflict() {
6363
let result = generator.do_generate_new_snapshot(
6464
&TableInfo::default(),
6565
None,
66+
None,
6667
&Some(Arc::new(latest_snapshot)),
6768
TestFixture::default_table_meta_timestamps(),
6869
Default::default(),
@@ -193,6 +194,7 @@ fn test_resolvable_delete_conflict() {
193194
let result = generator.do_generate_new_snapshot(
194195
&TableInfo::default(),
195196
None,
197+
None,
196198
&Some(Arc::new(latest_snapshot)),
197199
TestFixture::default_table_meta_timestamps(),
198200
Default::default(),
@@ -349,6 +351,7 @@ fn test_resolvable_replace_conflict() {
349351
let result = generator.do_generate_new_snapshot(
350352
&TableInfo::default(),
351353
None,
354+
None,
352355
&Some(Arc::new(latest_snapshot)),
353356
TestFixture::default_table_meta_timestamps(),
354357
Default::default(),

src/query/service/tests/it/storages/fuse/meta/snapshot.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ fn default_snapshot() -> TableSnapshot {
3636
vec![],
3737
None,
3838
None,
39+
None,
3940
TestFixture::default_table_meta_timestamps(),
4041
)
4142
.unwrap()
@@ -59,6 +60,7 @@ fn snapshot_timestamp_monotonic_increase() {
5960
vec![],
6061
None,
6162
None,
63+
None,
6264
TestFixture::default_table_meta_timestamps(),
6365
)
6466
.unwrap();
@@ -84,6 +86,7 @@ fn snapshot_timestamp_time_skew_tolerance() {
8486
vec![],
8587
None,
8688
None,
89+
None,
8790
table_meta_timestamps,
8891
)
8992
.unwrap();

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ async fn test_commit_to_meta_server() -> anyhow::Result<()> {
292292
new_segments,
293293
None,
294294
None,
295+
None,
295296
TestFixture::default_table_meta_timestamps(),
296297
)
297298
.unwrap();

src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ async fn test_safety() -> anyhow::Result<()> {
232232
locations.clone(),
233233
None,
234234
None,
235+
None,
235236
TestFixture::default_table_meta_timestamps(),
236237
)?;
237238

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> {
254254
locations.clone(),
255255
None,
256256
None,
257+
None,
257258
TestFixture::default_table_meta_timestamps(),
258259
)?);
259260

src/query/service/tests/it/storages/fuse/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub fn new_empty_snapshot(schema: TableSchema) -> TableSnapshot {
3434
vec![],
3535
None,
3636
None,
37+
None,
3738
TableMetaTimestamps::new(None, Duration::hours(1)),
3839
)
3940
.unwrap()

0 commit comments

Comments
 (0)