Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ public static boolean isSchemaChangeEventRedundant(
return true;
},
createTableEvent -> {
// It has been applied if such table already exists
return latestSchema.isPresent();
// Redundant only if the table exists AND the schema is identical
return latestSchema.isPresent()
&& latestSchema.get().equals(createTableEvent.getSchema());
},
dropColumnEvent -> {
// It has not been applied if schema does not even exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,74 @@ TABLE_ID, of("id", BIGINT), of("name", VARCHAR(17), "id", BIGINT)))
ImmutableMap.of("foo", INT, "baz", FLOAT)));
}

@Test
void testGetSchemaDifferenceForProjectionChanges() {
// Simulate projection change: new schema has an added column
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", STRING, "age", INT)))
.as("projection change adding a column should produce AddColumnEvent")
.containsExactly(
new AddColumnEvent(
TABLE_ID,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("age", INT),
AddColumnEvent.ColumnPosition.AFTER,
"name"))));

// Simulate projection change: new schema has a removed column
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING, "age", INT),
of("id", BIGINT, "name", STRING)))
.as("projection change removing a column should produce DropColumnEvent")
.containsExactly(new DropColumnEvent(TABLE_ID, Collections.singletonList("age")));

// Simulate projection change: column type changed (e.g. STRING -> VARCHAR(255))
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", DataTypes.VARCHAR(255))))
.as(
"projection change with different column type should produce AlterColumnTypeEvent")
.containsExactly(
new AlterColumnTypeEvent(
TABLE_ID,
Collections.singletonMap("name", DataTypes.VARCHAR(255)),
Collections.singletonMap("name", STRING)));

// Simulate projection change: both added and removed columns (column swap)
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "email", STRING)))
.as("projection change swapping columns should produce Add + Drop events")
.containsExactly(
new AddColumnEvent(
TABLE_ID,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("email", STRING),
AddColumnEvent.ColumnPosition.AFTER,
"id"))),
new DropColumnEvent(TABLE_ID, Collections.singletonList("name")));

// Simulate projection change: identical schemas should produce no diff
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", STRING)))
.as("identical schemas should produce no schema change events")
.isEmpty();
}

@Test
void testMergeAndDiff() {
Assertions.assertThat(mergeAndDiff(null, of("id", BIGINT, "name", VARCHAR(17))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
Expand Down Expand Up @@ -484,4 +485,66 @@ void testInferWiderSchema() {
.build()))
.isExactlyInstanceOf(IllegalStateException.class);
}

@Test
void testIsSchemaChangeEventRedundantForCreateTableEvent() {
TableId tableId = TableId.tableId("test_db", "test_table");

Schema schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.primaryKey("id")
.build();

// CreateTableEvent is NOT redundant when table doesn't exist (null schema)
CreateTableEvent createEvent = new CreateTableEvent(tableId, schema);
Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(null, createEvent))
.as("CreateTableEvent should not be redundant when table does not exist")
.isFalse();

// CreateTableEvent IS redundant when table exists with the same schema
Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(schema, createEvent))
.as("CreateTableEvent should be redundant when table exists with same schema")
.isTrue();

// CreateTableEvent is NOT redundant when table exists with different schema (extra column)
Schema schemaWithExtraColumn =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
CreateTableEvent createEventWithExtraColumn =
new CreateTableEvent(tableId, schemaWithExtraColumn);
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schema, createEventWithExtraColumn))
.as("CreateTableEvent should not be redundant when new schema has extra columns")
.isFalse();

// CreateTableEvent is NOT redundant when column types differ
Schema schemaWithDifferentType =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.primaryKey("id")
.build();
CreateTableEvent createEventWithDifferentType =
new CreateTableEvent(tableId, schemaWithDifferentType);
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schema, createEventWithDifferentType))
.as("CreateTableEvent should not be redundant when column types differ")
.isFalse();

// CreateTableEvent is NOT redundant when existing schema has more columns than new schema
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schemaWithExtraColumn, createEvent))
.as(
"CreateTableEvent should not be redundant when existing schema has more columns")
.isFalse();
}
}
Loading