[Fix][Zeta] Guard state cleanup races after node failure#10687
zhangshenghang wants to merge 14 commits into apache:dev
Conversation
DanielLeens left a comment
I agree with the tombstone approach for late callbacks, but I think the delayed cleanup currently loses its cleanup owner across a second master failover.
scheduleRemoveJobStateMaps() removes runningJobInfoIMap immediately and then schedules removeJobStateMaps() only in the local monitorService. If that master dies during the delay window, the scheduled task disappears with it. When the next master restores, there is no runningJobInfoIMap entry left to rediscover this job, and restoreJobFromMasterActiveSwitch() just returns for terminal states.
That leaves the terminal entries in runningJobStateIMap / runningJobStateTimestampsIMap orphaned permanently. I think the delayed-cleanup intent needs to be persisted in distributed state (or another recoverable cleanup record), otherwise this closes the race only as long as the same master survives until the timer fires.
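The failure mode described above can be sketched with local stand-ins — plain maps in place of the distributed IMaps, and a `ScheduledExecutorService` in place of the master-local monitorService. All names here are illustrative, not the actual SeaTunnel API:

```java
import java.util.Map;
import java.util.concurrent.*;

public class DelayedCleanupRaceSketch {
    // Illustrative stand-ins: plain maps model the distributed IMaps and a
    // ScheduledExecutorService models the master-local monitorService.
    static final Map<Long, String> runningJobInfoIMap = new ConcurrentHashMap<>();
    static final Map<Long, String> runningJobStateIMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        long jobId = 1L;
        runningJobInfoIMap.put(jobId, "job-info");
        runningJobStateIMap.put(jobId, "FINISHED");

        ScheduledExecutorService monitorService = Executors.newSingleThreadScheduledExecutor();

        // scheduleRemoveJobStateMaps(): remove runningJobInfoIMap immediately...
        runningJobInfoIMap.remove(jobId);
        // ...and schedule the rest of the cleanup only on the local executor.
        monitorService.schedule(() -> runningJobStateIMap.remove(jobId), 60, TimeUnit.SECONDS);

        // The active master dies during the delay window: the timer dies with it.
        monitorService.shutdownNow();

        // The next master restores by scanning runningJobInfoIMap only, so the
        // job can no longer be rediscovered while its terminal state entry remains.
        System.out.println("rediscoverable=" + runningJobInfoIMap.containsKey(jobId)
                + " orphaned=" + runningJobStateIMap.containsKey(jobId));
    }
}
```

The point is that the only record a restoring master scans is gone, while the cleanup timer existed solely in the dead process.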
DanielLeens left a comment
Thanks for the update. I re-reviewed the latest HEAD locally, and I still see the same failover hole during the delayed-cleanup window.
cleanJob() still calls scheduleRemoveJobStateMaps() (JobMaster.java:778-782), and that method still removes runningJobInfoIMap immediately (JobMaster.java:644-647) before scheduling the delayed cleanup on the local monitorService (JobMaster.java:658-672). But master-switch restore only scans runningJobInfoIMap.entrySet() (CoordinatorService.java:636-642, 665-681).
So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned. The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap, because it only runs for jobs that still have a runningJobInfoIMap entry.
The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null immediately after terminal completion, which effectively codifies the same gap instead of covering the second-master-failover case.
I think the delayed-cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to remain until delayed cleanup actually executes, before this can merge.
DanielLeens left a comment
Thanks for the update. I pulled the latest HEAD locally and re-reviewed the delayed-cleanup path.
I still see the same blocking failover hole during the cleanup-delay window. cleanJob() still calls scheduleRemoveJobStateMaps(), and that method still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs only by scanning runningJobInfoIMap. So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned.
The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap because it only runs for jobs that still have a runningJobInfoIMap entry. The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null during the delay window, which codifies the same hole instead of covering the second-master-failover case.
I think the delayed cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to stay until the delayed cleanup actually executes. After that, this will be much closer.
DanielLeens left a comment
I pulled the latest HEAD locally again and I still see the same blocking failover hole during the cleanup-delay window.
JobMaster.scheduleRemoveJobStateMaps() still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs by scanning runningJobInfoIMap in CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch(). So if the active master dies before the delayed task fires, the next master still has no distributed record left to rediscover this terminal job, and the remaining state maps can still be orphaned.
The new stateCleanupDelayMillis=0 test config and the late-checkpoint guard do not close that gap, because they do not persist the delayed-cleanup intent across a second master failover.
I still think this needs one of these two directions before merge:
- keep runningJobInfoIMap until the delayed cleanup actually executes, or
- persist the delayed-cleanup intent in recoverable distributed state.
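The second direction — persisting the cleanup intent so a new master can resume it — might look roughly like this. These are illustrative stand-ins (plain maps for the IMaps, executors for the per-master monitorService), not the actual SeaTunnel types:

```java
import java.util.Map;
import java.util.concurrent.*;

public class PersistedCleanupIntentSketch {
    // Illustrative stand-ins for the distributed maps.
    static final Map<Long, String> runningJobStateIMap = new ConcurrentHashMap<>();
    static final Map<Long, Long> pendingJobCleanup = new ConcurrentHashMap<>(); // jobId -> due time

    // On the active master: persist the cleanup intent BEFORE scheduling locally,
    // so the intent survives the master process.
    static void scheduleRemoveJobStateMaps(long jobId, ScheduledExecutorService monitor, long delayMs) {
        pendingJobCleanup.put(jobId, System.currentTimeMillis() + delayMs);
        monitor.schedule(() -> cleanup(jobId), delayMs, TimeUnit.MILLISECONDS);
    }

    static void cleanup(long jobId) {
        runningJobStateIMap.remove(jobId);
        pendingJobCleanup.remove(jobId); // clear the intent only once cleanup really ran
    }

    // On the next master: rediscover unfinished cleanups from the persisted records.
    static void restoreOnMasterSwitch(ScheduledExecutorService newMonitor) {
        pendingJobCleanup.forEach((jobId, dueAt) -> {
            long remaining = Math.max(0, dueAt - System.currentTimeMillis());
            newMonitor.schedule(() -> cleanup(jobId), remaining, TimeUnit.MILLISECONDS);
        });
    }

    public static void main(String[] args) throws Exception {
        long jobId = 42L;
        runningJobStateIMap.put(jobId, "FINISHED");

        ScheduledExecutorService firstMaster = Executors.newSingleThreadScheduledExecutor();
        scheduleRemoveJobStateMaps(jobId, firstMaster, 60_000);
        firstMaster.shutdownNow(); // the first master dies during the delay window

        // For the demo, pretend the due time has already passed at restore time.
        pendingJobCleanup.put(jobId, System.currentTimeMillis());

        ScheduledExecutorService secondMaster = Executors.newSingleThreadScheduledExecutor();
        restoreOnMasterSwitch(secondMaster); // rediscovers the job from the persisted record
        secondMaster.shutdown();
        secondMaster.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println("state cleaned after failover="
                + (runningJobStateIMap.isEmpty() && pendingJobCleanup.isEmpty()));
    }
}
```

The ordering matters: writing the intent record before scheduling the timer means there is no window in which the timer is the only owner of the cleanup.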
After that, I am happy to re-review.
DanielLeens left a comment
I pulled the latest HEAD locally again and re-checked the terminal-cleanup / master-switch path.
The previous failover hole looks closed now: JobMaster.scheduleRemoveJobStateMaps() persists a JobCleanupRecord in IMAP_PENDING_JOB_CLEANUP, CoordinatorService.restoreJobFromMasterActiveSwitch() reschedules terminal cleanup instead of dropping the job blindly, and the REST / overview paths filter delayed-cleanup tombstones so finished jobs are not shown as running. With the new unit / E2E coverage around delayed cleanup and no-restore cluster failure, I do not see the previous blocker in the current revision.
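The tombstone filtering mentioned for the REST / overview paths can be sketched as below. The names are hypothetical stand-ins (the actual revision keeps its records in IMAP_PENDING_JOB_CLEANUP and its own job-info types):

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class OverviewTombstoneFilterSketch {
    // Illustrative stand-ins for the distributed maps.
    static final Map<Long, String> runningJobInfoIMap = new ConcurrentHashMap<>();
    static final Map<Long, Long> pendingJobCleanup = new ConcurrentHashMap<>();

    // Jobs whose entry only survives as a delayed-cleanup tombstone should not
    // be reported as running by the REST / overview endpoints.
    static List<Long> listRunningJobs() {
        return runningJobInfoIMap.keySet().stream()
                .filter(jobId -> !pendingJobCleanup.containsKey(jobId))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        runningJobInfoIMap.put(1L, "streaming-job");      // genuinely running
        runningJobInfoIMap.put(2L, "finished-batch-job"); // terminal, awaiting delayed cleanup
        pendingJobCleanup.put(2L, System.currentTimeMillis() + 60_000);

        System.out.println("running=" + listRunningJobs());
    }
}
```

With this shape, keeping the entry until cleanup executes does not make finished jobs reappear as running in the overview.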
Purpose of this pull request
This PR fixes an engine-side terminal-state convergence bug after worker node failure.
When a worker goes offline, the engine can start cleaning distributed state from the running job state maps before all asynchronous task/pipeline/job callbacks have finished. In the current code path, PhysicalVertex, SubPlan, and PhysicalPlan can observe missing state entries and throw NullPointerException, which interrupts terminal-state convergence and may leave the job hanging in an intermediate state.

This PR changes the cleanup strategy instead of relying on local fallback state:
- removes runningJobInfoIMap immediately so terminal jobs are not restored on master switch

It also adds:
- coverage for the BATCH + no checkpoint + job.retry.times=0 no-restore path

Does this PR introduce any user-facing change?
No user-facing API/config change in normal operation. This improves failure handling so jobs are less likely to hang in an intermediate state after node failure.
How was this patch tested?
Verified locally:
./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-common spotless:check
./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-server spotless:check
./mvnw -nsu -pl seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base spotless:check

Additional notes:
- Added tests: StateTransitionCleanupTest, JobStateCleanupDelayTest, ClusterFailureNoRestoreIT
- Full local test runs are currently blocked (seatunnel-engine-server references missing types in the current checkout, and reactor builds are also blocked by seatunnel-config-shade compilation issues), so this PR remains draft.