PostTransformChangeInfo.java
@@ -22,10 +22,20 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,6 +46,8 @@
*/
public class PostTransformChangeInfo {

public static final Serializer SERIALIZER = new Serializer();

private final TableId tableId;

private final Schema preTransformedSchema;
@@ -140,4 +152,57 @@ public RecordData.FieldGetter[] getPreTransformedFieldGetters() {
public BinaryRecordDataGenerator getPostTransformedRecordDataGenerator() {
return postTransformedRecordDataGenerator;
}

/** Serializer for {@link PostTransformChangeInfo} with backward-compatible deserialization. */
public static class Serializer implements SimpleVersionedSerializer<PostTransformChangeInfo> {

private static final int CURRENT_VERSION = 2;
private static final TableId MAGIC_TABLE_ID = TableId.tableId("__magic_post_transform__");

@Override
public int getVersion() {
return CURRENT_VERSION;
}

@Override
public byte[] serialize(PostTransformChangeInfo changeInfo) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper out =
new DataOutputViewStreamWrapper(new DataOutputStream(baos));
// Write magic marker for format detection
TableIdSerializer.INSTANCE.serialize(MAGIC_TABLE_ID, out);
out.writeInt(CURRENT_VERSION);
// Write actual data
TableIdSerializer.INSTANCE.serialize(changeInfo.getTableId(), out);
SchemaSerializer.INSTANCE.serialize(changeInfo.getPreTransformedSchema(), out);
SchemaSerializer.INSTANCE.serialize(changeInfo.getPostTransformedSchema(), out);
return baos.toByteArray();
}

@Override
public PostTransformChangeInfo deserialize(int version, byte[] serialized)
throws IOException {
DataInputViewStreamWrapper in =
new DataInputViewStreamWrapper(
new DataInputStream(new ByteArrayInputStream(serialized)));
// Read first TableId to detect format
TableId firstTableId = TableIdSerializer.INSTANCE.deserialize(in);
TableId tableId;
Schema preSchema;
Schema postSchema;
if (MAGIC_TABLE_ID.equals(firstTableId)) {
// New format: magic marker present
// Consume the embedded data version; reserved for future format evolution.
in.readInt();
tableId = TableIdSerializer.INSTANCE.deserialize(in);
preSchema = SchemaSerializer.INSTANCE.deserialize(in);
postSchema = SchemaSerializer.INSTANCE.deserialize(in);
} else {
// Old format: first TableId is the actual tableId
tableId = firstTableId;
preSchema = SchemaSerializer.INSTANCE.deserialize(in);
postSchema = SchemaSerializer.INSTANCE.deserialize(in);
}
return PostTransformChangeInfo.of(tableId, preSchema, postSchema);
}
}
}
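
For intuition, a minimal round-trip sketch of the serializer above (a fragment, assumed to run inside a method declaring throws IOException; Schema.newBuilder(), DataTypes, and the two-argument TableId.tableId overload from flink-cdc-common are assumed, and all table and column names are illustrative):

// New format: serialize() prepends MAGIC_TABLE_ID plus a version before the payload.
TableId tableId = TableId.tableId("mydb", "orders");
Schema schema =
        Schema.newBuilder()
                .physicalColumn("id", DataTypes.BIGINT())
                .physicalColumn("name", DataTypes.STRING())
                .build();
byte[] bytes =
        PostTransformChangeInfo.SERIALIZER.serialize(
                PostTransformChangeInfo.of(tableId, schema, schema));
PostTransformChangeInfo restored =
        PostTransformChangeInfo.SERIALIZER.deserialize(
                PostTransformChangeInfo.SERIALIZER.getVersion(), bytes);

// Legacy format: a real TableId followed by two schemas, with no marker.
// deserialize() sees a non-magic first TableId and takes the old-format branch.
ByteArrayOutputStream legacyBytes = new ByteArrayOutputStream();
DataOutputViewStreamWrapper legacyOut =
        new DataOutputViewStreamWrapper(new DataOutputStream(legacyBytes));
TableIdSerializer.INSTANCE.serialize(tableId, legacyOut);
SchemaSerializer.INSTANCE.serialize(schema, legacyOut);
SchemaSerializer.INSTANCE.serialize(schema, legacyOut);
PostTransformChangeInfo legacy =
        PostTransformChangeInfo.SERIALIZER.deserialize(1, legacyBytes.toByteArray());

Note that the detection scheme relies on no real pipeline table being named __magic_post_transform__, which the double-underscore naming makes unlikely in practice.
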
PostTransformOperator.java
@@ -17,18 +17,24 @@

package org.apache.flink.cdc.runtime.operators.transform;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.converter.JavaObjectConverter;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
@@ -39,21 +45,29 @@
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.BinaryInternalObjectConverter;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,6 +82,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>, Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PostTransformOperator.class);

private final String timezone;
private final List<TransformRule> transformRules;
@@ -88,6 +103,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
projectionProcessors;
private transient Table<TableId, PostTransformer, TransformFilterProcessor> filterProcessors;

private transient ListState<byte[]> state;
private transient Map<TableId, List<SchemaChangeEvent>> pendingSchemaChanges;

public static PostTransformOperatorBuilder newBuilder() {
return new PostTransformOperatorBuilder();
}
@@ -104,6 +122,106 @@ public static PostTransformOperatorBuilder newBuilder() {
this.udfFunctions = udfFunctions;
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
pendingSchemaChanges = new HashMap<>();

// Initialize transformers early; they are needed for schema recalculation
initializeUdf();
this.transformers = createTransformers();

ListStateDescriptor<byte[]> descriptor =
new ListStateDescriptor<>(
"postTransformSchemaState", BytePrimitiveArraySerializer.INSTANCE);
state = context.getOperatorStateStore().getUnionListState(descriptor);

if (context.isRestored()) {
for (byte[] serialized : state.get()) {
try {
PostTransformChangeInfo restored =
PostTransformChangeInfo.SERIALIZER.deserialize(
PostTransformChangeInfo.SERIALIZER.getVersion(), serialized);
TableId tableId = restored.getTableId();
Schema preSchema = restored.getPreTransformedSchema();
Schema oldPostSchema = restored.getPostTransformedSchema();

// Recalculate post-transform schema using current projection rules
List<PostTransformer> effectiveTransformers = getEffectiveTransformers(tableId);
Schema recalculatedPostSchema = preSchema;
boolean hasAsterisk = true;
List<String> projectedColumns = new ArrayList<>();

if (!effectiveTransformers.isEmpty()) {
List<Schema> schemas =
effectiveTransformers.stream()
.map(trans -> transformSchema(preSchema, trans))
.collect(Collectors.toList());
recalculatedPostSchema =
SchemaUtils.ensurePkNonNull(
SchemaMergingUtils.strictlyMergeSchemas(schemas));
hasAsterisk =
effectiveTransformers.stream()
.map(PostTransformer::getProjection)
.flatMap(this::optionalToStream)
.map(TransformProjection::getProjection)
.anyMatch(TransformParser::hasAsterisk);
projectedColumns =
preSchema.getColumnNames().stream()
.filter(recalculatedPostSchema.getColumnNames()::contains)
.collect(Collectors.toList());
}

// Detect schema changes
if (!oldPostSchema.equals(recalculatedPostSchema)) {
List<SchemaChangeEvent> events =
generateSchemaChangeEvents(
tableId, oldPostSchema, recalculatedPostSchema);
if (!events.isEmpty()) {
pendingSchemaChanges.put(tableId, events);
LOG.info(
"Detected projection change for table {}: "
+ "{} schema change events pending",
tableId,
events.size());
}
}

// Update operator state with recalculated schema
postTransformInfoMap.put(
tableId,
PostTransformChangeInfo.of(tableId, preSchema, recalculatedPostSchema));
hasAsteriskMap.put(tableId, hasAsterisk);
projectedColumnsMap.put(tableId, projectedColumns);

} catch (Exception e) {
LOG.warn(
"Failed to deserialize PostTransformChangeInfo from state, "
+ "skipping entry (may be upgrading from version "
+ "without state)",
e);
}
}
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
List<byte[]> serialized = new ArrayList<>();
for (Map.Entry<TableId, PostTransformChangeInfo> entry : postTransformInfoMap.entrySet()) {
try {
serialized.add(PostTransformChangeInfo.SERIALIZER.serialize(entry.getValue()));
} catch (IOException e) {
LOG.warn(
"Failed to serialize PostTransformChangeInfo for table {}, skipping",
entry.getKey(),
e);
}
}
state.update(serialized);
}
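
To make the restore path concrete: state is only trusted for the pre-transform schema; the post-transform schema is always recomputed from the current rules, so a projection edited between restarts surfaces as pending schema-change events rather than as a silently stale schema. A hedged sketch of one restored entry (illustrative names, types, and projection expression):

// Checkpoint stored: preSchema = (id BIGINT, name STRING) and
// oldPostSchema = (id BIGINT, name STRING). The restarted job now projects
// "id, name, UPPER(name) AS name_upper" for this table, so
// transformSchema(preSchema, transformer) yields
//   recalculatedPostSchema = (id BIGINT, name STRING, name_upper STRING)
// and generateSchemaChangeEvents(...) queues one AddColumnEvent for
// name_upper, emitted just before the table's first post-restore DataChangeEvent.
//
// hasAsterisk is re-derived from the projection text itself, e.g.:
boolean explicit = TransformParser.hasAsterisk("id, name, UPPER(name) AS name_upper"); // false
boolean wildcard = TransformParser.hasAsterisk("*, UPPER(name) AS name_upper"); // true
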

@Override
public void open() throws Exception {
super.open();
@@ -112,10 +230,18 @@ public void open() throws Exception {
this.projectionProcessors = HashBasedTable.create();
this.filterProcessors = HashBasedTable.create();

// Be sure to initialize UDF related fields before creating transformers
initializeUdf();

this.transformers = createTransformers();
if (pendingSchemaChanges == null) {
pendingSchemaChanges = new HashMap<>();
}
if (this.transformers == null) {
initializeUdf();
this.transformers = createTransformers();
}
if (!pendingSchemaChanges.isEmpty()) {
LOG.info(
"PostTransformOperator restored with {} tables having pending schema changes",
pendingSchemaChanges.size());
}
}

@Override
@@ -171,16 +297,24 @@ private void processElementInternal(StreamRecord<Event> element) {
}

if (event instanceof CreateTableEvent) {
pendingSchemaChanges.remove(tableId);
invalidateCache(tableId);
processCreateTableEvent((CreateTableEvent) event, transformers)
.map(StreamRecord::new)
.ifPresent(output::collect);
invalidateCache(tableId);
} else if (event instanceof SchemaChangeEvent) {
invalidateCache(tableId);
processSchemaChangeEvent((SchemaChangeEvent) event, transformers)
.map(StreamRecord::new)
.ifPresent(output::collect);
invalidateCache(tableId);
} else if (event instanceof DataChangeEvent) {
// Emit pending schema changes before first data event for this table
List<SchemaChangeEvent> pending = pendingSchemaChanges.remove(tableId);
if (pending != null) {
for (SchemaChangeEvent pendingEvent : pending) {
output.collect(new StreamRecord<>(pendingEvent));
}
}
processDataChangeEvent((DataChangeEvent) event, transformers)
.map(StreamRecord::new)
.ifPresent(output::collect);
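
The DataChangeEvent branch above drains pendingSchemaChanges exactly once per table, because Map.remove both reads and clears the entry. A self-contained sketch of that flush-once pattern, stripped of all Flink types (hypothetical class, plain Java):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFlushModel {
    // tableId -> schema-change events discovered at restore time
    private final Map<String, List<String>> pendingSchemaChanges = new HashMap<>();

    public PendingFlushModel(Map<String, List<String>> restored) {
        pendingSchemaChanges.putAll(restored);
    }

    /** Returns everything to emit downstream for one incoming data event. */
    public List<String> onDataChange(String tableId, String dataEvent) {
        List<String> out = new ArrayList<>();
        // remove() guarantees pending schema changes are emitted exactly once,
        // strictly before the table's first post-restore data event
        List<String> pending = pendingSchemaChanges.remove(tableId);
        if (pending != null) {
            out.addAll(pending);
        }
        out.add(dataEvent);
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> restored = new HashMap<>();
        restored.put("mydb.orders", Arrays.asList("AddColumnEvent(name_upper)"));
        PendingFlushModel model = new PendingFlushModel(restored);
        System.out.println(model.onDataChange("mydb.orders", "insert#1"));
        // -> [AddColumnEvent(name_upper), insert#1]
        System.out.println(model.onDataChange("mydb.orders", "insert#2"));
        // -> [insert#2]
    }
}
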
@@ -552,4 +686,51 @@ private void destroyUdf() {
private <T> Stream<T> optionalToStream(Optional<T> optional) {
return optional.map(Stream::of).orElseGet(Stream::empty);
}

/**
* Generates {@link SchemaChangeEvent}s to evolve from {@code oldSchema} to {@code newSchema}.
*/
private List<SchemaChangeEvent> generateSchemaChangeEvents(
TableId tableId, Schema oldSchema, Schema newSchema) {
List<SchemaChangeEvent> events = new ArrayList<>();

// Detect added columns
Set<String> oldColumnNames = new HashSet<>(oldSchema.getColumnNames());
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
List<Column> newColumns = newSchema.getColumns();
for (int i = 0; i < newColumns.size(); i++) {
Column col = newColumns.get(i);
if (!oldColumnNames.contains(col.getName())) {
if (i == 0) {
addedColumns.add(AddColumnEvent.first(col));
} else if (i == newColumns.size() - 1) {
addedColumns.add(AddColumnEvent.last(col));
} else {
String prevColName = newColumns.get(i - 1).getName();
if (oldColumnNames.contains(prevColName)) {
addedColumns.add(AddColumnEvent.after(col, prevColName));
} else {
addedColumns.add(AddColumnEvent.last(col));
}
}
}
}
if (!addedColumns.isEmpty()) {
events.add(new AddColumnEvent(tableId, addedColumns));
}

// Detect dropped columns
Set<String> newColumnNames = new HashSet<>(newSchema.getColumnNames());
List<String> droppedColumns = new ArrayList<>();
for (String oldColName : oldSchema.getColumnNames()) {
if (!newColumnNames.contains(oldColName)) {
droppedColumns.add(oldColName);
}
}
if (!droppedColumns.isEmpty()) {
events.add(new DropColumnEvent(tableId, droppedColumns));
}

return events;
}
}
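
A worked example of the diffing logic above (a sketch with illustrative schemas; Schema.newBuilder() and DataTypes from flink-cdc-common are assumed):

Schema oldSchema =
        Schema.newBuilder()
                .physicalColumn("id", DataTypes.BIGINT())
                .physicalColumn("name", DataTypes.STRING())
                .physicalColumn("age", DataTypes.INT())
                .build();
Schema newSchema =
        Schema.newBuilder()
                .physicalColumn("id", DataTypes.BIGINT())
                .physicalColumn("age", DataTypes.INT())
                .physicalColumn("email", DataTypes.STRING())
                .build();
// generateSchemaChangeEvents(tableId, oldSchema, newSchema) yields, in order:
//   1. AddColumnEvent(tableId, [last(email)])  -- email sits at the end of newSchema
//   2. DropColumnEvent(tableId, [name])        -- name is no longer projected
// When the preceding column in newSchema is itself newly added, the method
// falls back to AddColumnEvent.last() instead of after(), since that
// column's final position is not yet established downstream.

Note that only additions and removals are detected here; type changes for columns present in both schemas are not converted into events.
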