[Fix][Zeta] Guard state cleanup races after node failure#10687

Open
zhangshenghang wants to merge 14 commits into apache:dev from zhangshenghang:fix/zeta-state-cleanup-convergence

Conversation


@zhangshenghang zhangshenghang commented Apr 1, 2026

Purpose of this pull request

This PR fixes an engine-side terminal-state convergence bug after worker node failure.

When a worker goes offline, the engine can start cleaning distributed state from the running job state maps before all asynchronous task/pipeline/job callbacks have finished. In the current code path, PhysicalVertex, SubPlan, and PhysicalPlan can observe missing state entries and throw NullPointerException, which interrupts terminal-state convergence and may leave the job hanging in an intermediate state.

This PR changes the cleanup strategy so that it no longer relies on rebuilding local fallback state:

  • keep terminal job/pipeline/task state in distributed maps for a short cleanup delay window
  • remove runningJobInfoIMap immediately so terminal jobs are not restored on master switch
  • delay physical removal of distributed state maps until late callbacks have time to drain
  • treat already-cleaned state as a no-op defensive path instead of rebuilding distributed state
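The strategy above can be illustrated with a minimal, self-contained sketch. ConcurrentHashMap and a ScheduledExecutorService stand in for the distributed IMaps and the monitor service; the method names (onJobTerminal, readStateOrNoop) are illustrative only, not the actual SeaTunnel API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the delayed-cleanup idea: remove job info immediately, keep the
// terminal state entry as a tombstone for a delay window, and treat
// already-cleaned state as a no-op for late callbacks.
public class DelayedCleanupSketch {
    final Map<Long, String> runningJobInfoIMap = new ConcurrentHashMap<>();
    final Map<Long, String> runningJobStateIMap = new ConcurrentHashMap<>();
    final ScheduledExecutorService monitor =
            Executors.newSingleThreadScheduledExecutor();

    void onJobTerminal(long jobId, long delayMillis) {
        // 1. Remove the job info right away so a new master never restores
        //    an already-finished job after a master switch.
        runningJobInfoIMap.remove(jobId);
        // 2. Keep the terminal state entry for the delay window, so late
        //    task/pipeline/job callbacks still find a state entry to read.
        monitor.schedule(() -> {
            runningJobStateIMap.remove(jobId);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Late-callback path: missing state is a defensive no-op, not an NPE.
    String readStateOrNoop(long jobId) {
        String state = runningJobStateIMap.get(jobId);
        return state == null ? "NOOP" : state;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Within the delay window a late callback still observes the terminal state; after the window the state map is empty and the callback degrades to a no-op instead of throwing.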

It also adds:

  • targeted regression tests for terminal tombstone behavior
  • a delay-based cleanup regression test
  • an engine E2E scenario for the BATCH + no checkpoint + job.retry.times=0 no-restore path

Does this PR introduce any user-facing change?

No user-facing API/config change in normal operation. This improves failure handling so jobs are less likely to hang in an intermediate state after node failure.

How was this patch tested?

Verified locally:

  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-common spotless:check
  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-server spotless:check
  • ./mvnw -nsu -pl seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base spotless:check

Additional notes:

  • Added targeted regression tests: StateTransitionCleanupTest, JobStateCleanupDelayTest
  • Added engine E2E coverage: ClusterFailureNoRestoreIT
  • Full Maven test/compile validation in this checkout is currently blocked by unrelated upstream build issues in other modules (for example, seatunnel-engine-server references missing types in the current checkout, and reactor builds are blocked by seatunnel-config-shade compilation issues), so this PR remains a draft.

@zhangshenghang zhangshenghang marked this pull request as ready for review April 1, 2026 09:52

@DanielLeens DanielLeens left a comment


I agree with the tombstone approach for late callbacks, but I think the delayed cleanup currently loses its cleanup owner across a second master failover.

scheduleRemoveJobStateMaps() removes runningJobInfoIMap immediately and then schedules removeJobStateMaps() only in the local monitorService. If that master dies during the delay window, the scheduled task disappears with it. When the next master restores, there is no runningJobInfoIMap entry left to rediscover this job, and restoreJobFromMasterActiveSwitch() just returns for terminal states.

That leaves the terminal entries in runningJobStateIMap / runningJobStateTimestampsIMap orphaned permanently. I think the delayed-cleanup intent needs to be persisted in distributed state (or another recoverable cleanup record), otherwise this closes the race only as long as the same master survives until the timer fires.


@DanielLeens DanielLeens left a comment


Thanks for the update. I re-reviewed the latest HEAD locally, and I still see the same failover hole during the delayed-cleanup window.

cleanJob() still calls scheduleRemoveJobStateMaps() (JobMaster.java:778-782), and that method still removes runningJobInfoIMap immediately (JobMaster.java:644-647) before scheduling the delayed cleanup on the local monitorService (JobMaster.java:658-672). But master-switch restore only scans runningJobInfoIMap.entrySet() (CoordinatorService.java:636-642, 665-681).

So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned. The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap, because it only runs for jobs that still have a runningJobInfoIMap entry.

The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null immediately after terminal completion, which effectively codifies the same gap instead of covering the second-master-failover case.

I think the delayed-cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to remain until delayed cleanup actually executes, before this can merge.


@DanielLeens DanielLeens left a comment


Thanks for the update. I pulled the latest HEAD locally and re-reviewed the delayed-cleanup path.

I still see the same blocking failover hole during the cleanup-delay window. cleanJob() still calls scheduleRemoveJobStateMaps(), and that method still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs only by scanning runningJobInfoIMap. So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned.

The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap because it only runs for jobs that still have a runningJobInfoIMap entry. The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null during the delay window, which codifies the same hole instead of covering the second-master-failover case.

I think the delayed cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to stay until the delayed cleanup actually executes. After that, this will be much closer.


@DanielLeens DanielLeens left a comment


I pulled the latest HEAD locally again and I still see the same blocking failover hole during the cleanup-delay window.

JobMaster.scheduleRemoveJobStateMaps() still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs by scanning runningJobInfoIMap in CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch(). So if the active master dies before the delayed task fires, the next master still has no distributed record left to rediscover this terminal job, and the remaining state maps can still be orphaned.

The new stateCleanupDelayMillis=0 test config and the late-checkpoint guard do not close that gap, because they do not persist the delayed-cleanup intent across a second master failover.

I still think this needs one of these two directions before merge:

  • keep runningJobInfoIMap until the delayed cleanup actually executes, or
  • persist the delayed-cleanup intent in recoverable distributed state.

After that, I am happy to re-review.


@DanielLeens DanielLeens left a comment


I pulled the latest HEAD locally again and re-checked the terminal-cleanup / master-switch path.

The previous failover hole looks closed now: JobMaster.scheduleRemoveJobStateMaps() persists a JobCleanupRecord in IMAP_PENDING_JOB_CLEANUP, CoordinatorService.restoreJobFromMasterActiveSwitch() reschedules terminal cleanup instead of dropping the job blindly, and the REST / overview paths filter delayed-cleanup tombstones so finished jobs are not shown as running. With the new unit / E2E coverage around delayed cleanup and no-restore cluster failure, I do not see the previous blocker in the current revision.
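The persisted-cleanup-intent fix described in this review round can be sketched as follows. ConcurrentHashMap again stands in for the distributed IMaps; the PR's actual JobCleanupRecord is simplified here to a bare deadline timestamp, and all method names are illustrative assumptions rather than the real SeaTunnel signatures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: persist the cleanup intent in "distributed" state before arming a
// local timer, so a successor master can rediscover and re-arm the cleanup
// if the active master dies inside the delay window.
public class PendingCleanupSketch {
    // jobId -> cleanup deadline (stand-in for IMAP_PENDING_JOB_CLEANUP)
    final Map<Long, Long> pendingJobCleanup = new ConcurrentHashMap<>();
    final Map<Long, String> runningJobStateIMap = new ConcurrentHashMap<>();
    final ScheduledExecutorService monitor =
            Executors.newSingleThreadScheduledExecutor();

    // Active master: record the intent first, then arm the local timer.
    void scheduleCleanup(long jobId, long delayMillis) {
        pendingJobCleanup.put(jobId, System.currentTimeMillis() + delayMillis);
        monitor.schedule(() -> {
            executeCleanup(jobId);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // New master after a switch: re-arm every persisted pending cleanup,
    // firing immediately if its deadline has already passed.
    void rescheduleOnMasterSwitch() {
        long now = System.currentTimeMillis();
        pendingJobCleanup.forEach((jobId, deadline) -> monitor.schedule(() -> {
            executeCleanup(jobId);
        }, Math.max(0, deadline - now), TimeUnit.MILLISECONDS));
    }

    void executeCleanup(long jobId) {
        runningJobStateIMap.remove(jobId);
        pendingJobCleanup.remove(jobId);
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the pending-cleanup record lives in distributed state, it survives the death of the master that created it: the replacement master scans the record map on restore and re-arms the timers, closing the orphaned-tombstone window.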

