@@ -351,6 +351,152 @@ void testRestartingJobFromSavepointInSnapshotMode(TarballFetcher.CdcVersion migr
351351 LOG .info ("Snapshot stage finished successfully." );
352352 }
353353
354+ @ Test
355+ void testProjectionChangeAfterSavepointRestart () throws Exception {
356+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
357+
358+ // Phase 1: Start with narrow projection (id, name only)
359+ String contentV1 = buildCustomersProjectionPipeline ("id, name" );
360+ JobID jobID = submitPipelineJob (contentV1 );
361+ Assertions .assertThat (jobID ).isNotNull ();
362+ LOG .info ("Submitted Job ID is {} " , jobID );
363+
364+ // Phase 2: Validate snapshot with narrow projection
365+ validateResult (
366+ dbNameFormatter ,
367+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink'}, primaryKeys=id, options=()}" ,
368+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1], op=INSERT, meta=()}" ,
369+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2], op=INSERT, meta=()}" ,
370+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3], op=INSERT, meta=()}" ,
371+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4], op=INSERT, meta=()}" );
372+ LOG .info ("Snapshot stage finished successfully." );
373+
374+ // Phase 3: Validate incremental with narrow projection
375+ executeMySqlStatements (
376+ mysqlInventoryDatabase ,
377+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
378+ validateResult (
379+ dbNameFormatter ,
380+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5], op=INSERT, meta=()}" );
381+ LOG .info ("Incremental stage 1 finished successfully." );
382+
383+ // Phase 4: Stop with savepoint
384+ String savepointPath = stopJobWithSavepoint (jobID );
385+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
386+
387+ // Phase 5: Restart with expanded projection (id, name, address)
388+ String contentV2 = buildCustomersProjectionPipeline ("id, name, address" );
389+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
390+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
391+
392+ // Phase 6: Validate schema evolution - AddColumnEvent for address
393+ validateResult (
394+ dbNameFormatter ,
395+ "AddColumnEvent{tableId=%s.customers, addedColumns=[ColumnWithPosition{column=`address` VARCHAR(1024), position=LAST, existedColumnName=null}]}" );
396+ LOG .info ("Schema evolution (AddColumn) validated successfully." );
397+
398+ // Phase 7: Validate new data includes address
399+ executeMySqlStatements (
400+ mysqlInventoryDatabase ,
401+ "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');" );
402+ validateResult (
403+ dbNameFormatter ,
404+ "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6, Shenzhen], op=INSERT, meta=()}" );
405+ LOG .info ("Incremental stage 2 with expanded projection finished successfully." );
406+
407+ // Cleanup
408+ cancelJob (newJobID );
409+ }
410+
411+ @ Test
412+ void testProjectionShrinkAfterSavepointRestart () throws Exception {
413+ runInContainerAsRoot (jobManager , "chmod" , "0777" , "-R" , "/tmp/cdc/" );
414+
415+ // Phase 1: Start with wider projection (id, name, address)
416+ String contentV1 = buildCustomersProjectionPipeline ("id, name, address" );
417+ JobID jobID = submitPipelineJob (contentV1 );
418+ Assertions .assertThat (jobID ).isNotNull ();
419+ LOG .info ("Submitted Job ID is {} " , jobID );
420+
421+ // Phase 2: Validate snapshot with wider projection
422+ validateResult (
423+ dbNameFormatter ,
424+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024)}, primaryKeys=id, options=()}" ,
425+ "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai], op=INSERT, meta=()}" ,
426+ "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai], op=INSERT, meta=()}" ,
427+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai], op=INSERT, meta=()}" ,
428+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai], op=INSERT, meta=()}" );
429+ LOG .info ("Snapshot stage finished successfully." );
430+
431+ // Phase 3: Validate incremental with wider projection
432+ executeMySqlStatements (
433+ mysqlInventoryDatabase ,
434+ "INSERT INTO customers VALUES (105, 'user_5', 'Beijing', '123567891235');" );
435+ validateResult (
436+ dbNameFormatter ,
437+ "DataChangeEvent{tableId=%s.customers, before=[], after=[105, user_5, Beijing], op=INSERT, meta=()}" );
438+ LOG .info ("Incremental stage 1 finished successfully." );
439+
440+ // Phase 4: Stop with savepoint
441+ String savepointPath = stopJobWithSavepoint (jobID );
442+ LOG .info ("Stopped Job {} and created a savepoint at {}." , jobID , savepointPath );
443+
444+ // Phase 5: Restart with narrower projection (id, name only)
445+ String contentV2 = buildCustomersProjectionPipeline ("id, name" );
446+ JobID newJobID = submitPipelineJob (contentV2 , savepointPath , true );
447+ LOG .info ("Reincarnated Job {} has been submitted successfully." , newJobID );
448+
449+ // Phase 6: Validate schema evolution - DropColumnEvent for address
450+ validateResult (
451+ dbNameFormatter ,
452+ "DropColumnEvent{tableId=%s.customers, droppedColumnNames=[address]}" );
453+ LOG .info ("Schema evolution (DropColumn) validated successfully." );
454+
455+ // Phase 7: Validate new data has only id and name
456+ executeMySqlStatements (
457+ mysqlInventoryDatabase ,
458+ "INSERT INTO customers VALUES (106, 'user_6', 'Shenzhen', '123567891236');" );
459+ validateResult (
460+ dbNameFormatter ,
461+ "DataChangeEvent{tableId=%s.customers, before=[], after=[106, user_6], op=INSERT, meta=()}" );
462+ LOG .info ("Incremental stage 2 with narrowed projection finished successfully." );
463+
464+ // Cleanup
465+ cancelJob (newJobID );
466+ }
467+
468+ private String buildCustomersProjectionPipeline (String projection ) {
469+ return String .format (
470+ "source:\n "
471+ + " type: mysql\n "
472+ + " hostname: %s\n "
473+ + " port: %d\n "
474+ + " username: %s\n "
475+ + " password: %s\n "
476+ + " tables: %s.customers\n "
477+ + " server-id: 5400-5404\n "
478+ + " server-time-zone: UTC\n "
479+ + "\n "
480+ + "sink:\n "
481+ + " type: values\n "
482+ + "\n "
483+ + "transform:\n "
484+ + " - source-table: %s.customers\n "
485+ + " projection: %s\n "
486+ + "\n "
487+ + "pipeline:\n "
488+ + " parallelism: %d\n "
489+ + " schema.change.behavior: evolve\n " ,
490+ INTER_CONTAINER_MYSQL_ALIAS ,
491+ MySqlContainer .MYSQL_PORT ,
492+ MYSQL_TEST_USER ,
493+ MYSQL_TEST_PASSWORD ,
494+ mysqlInventoryDatabase .getDatabaseName (),
495+ mysqlInventoryDatabase .getDatabaseName (),
496+ projection ,
497+ parallelism );
498+ }
499+
354500 private void generateIncrementalEventsPhaseOne () {
355501 executeMySqlStatements (
356502 mysqlInventoryDatabase ,
0 commit comments