maintainer,dispatcher, coordinator: fence stale generation requests#5182
maintainer,dispatcher, coordinator: fence stale generation requests#5182hongyunyan wants to merge 69 commits into
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThreads maintainer_epoch through proto, coordinator, backend, maintainer, operators, dispatcher manager/orchestrator, and span replication; adds Backend.BumpChangefeedEpoch with EpochBumpOptions; gates bootstrap/close/messages by maintainer epoch; implements dispatcher replacement fencing; updates tests. ChangesMaintainer-epoch fencing and propagation
Estimated code review effort Suggested labels Suggested reviewers
✨ Finishing Touches🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Code Review
This pull request introduces a maintainer generation/epoch fencing mechanism to prevent stale maintainer requests from affecting dispatcher managers. It adds generation fields to heartbeat protobuf messages, implements fencing logic in the dispatcher manager and orchestrator, and stamps outgoing requests with the current maintainer generation. The review feedback highlights two critical head-of-line blocking issues in dispatcher_orchestrator.go where the orchestrator-wide lock m.mutex is held while waiting for the per-changefeed lock manager.LockControl(), and provides suggestions to safely release the lock before acquiring the per-changefeed lock.
| } else { | ||
| manager.LockControl() | ||
| defer manager.UnlockControl() | ||
| if !manager.TryUpdateMaintainer(from, generation) { | ||
| log.Warn("drop stale maintainer bootstrap request", | ||
| zap.String("changefeed", cfId.Name()), | ||
| zap.String("from", from.String()), | ||
| zap.Uint64("requestGeneration", generation), | ||
| zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()), | ||
| zap.String("currentMaintainer", manager.GetMaintainerID().String())) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.
To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.
} else {
m.mutex.Unlock()
manager.LockControl()
defer manager.UnlockControl()
m.mutex.Lock()
currentManager, stillExists := m.dispatcherManagers[cfId]
if !stillExists || currentManager != manager {
m.mutex.Unlock()
return nil
}
m.mutex.Unlock()
if !manager.TryUpdateMaintainer(from, generation) {
log.Warn("drop stale maintainer bootstrap request",
zap.String("changefeed", cfId.Name()),
zap.String("from", from.String()),
zap.Uint64("requestGeneration", generation),
zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
return nil
}| m.mutex.Lock() | ||
| if manager, ok := m.dispatcherManagers[cfId]; ok { | ||
| if closed := manager.TryClose(req.Removed); closed { | ||
| delete(m.dispatcherManagers, cfId) | ||
| metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec() | ||
| response.Success = true | ||
| manager.LockControl() | ||
| if manager.IsMaintainerRequestAllowed(from, req.Generation) { | ||
| if closed := manager.TryClose(req.Removed); closed { | ||
| delete(m.dispatcherManagers, cfId) | ||
| metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec() | ||
| response.Success = true | ||
| } else { | ||
| response.Success = false | ||
| } | ||
| } else { | ||
| response.Success = false | ||
| log.Warn("drop stale maintainer close request", | ||
| zap.String("changefeed", cfId.Name()), | ||
| zap.String("from", from.String()), | ||
| zap.Uint64("requestGeneration", req.Generation), | ||
| zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()), | ||
| zap.String("currentMaintainer", manager.GetMaintainerID().String())) | ||
| } | ||
| manager.UnlockControl() | ||
| } | ||
| m.mutex.Unlock() |
There was a problem hiding this comment.
Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.
To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.
m.mutex.Lock()
manager, ok := m.dispatcherManagers[cfId]
if !ok {
m.mutex.Unlock()
return response
}
m.mutex.Unlock()
manager.LockControl()
defer manager.UnlockControl()
m.mutex.Lock()
currentManager, stillExists := m.dispatcherManagers[cfId]
if !stillExists || currentManager != manager {
m.mutex.Unlock()
response.Success = false
return response
}
if manager.IsMaintainerRequestAllowed(from, req.Generation) {
if closed := manager.TryClose(req.Removed); closed {
delete(m.dispatcherManagers, cfId)
metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
response.Success = true
} else {
response.Success = false
}
} else {
log.Warn("drop stale maintainer close request",
zap.String("changefeed", cfId.Name()),
zap.String("from", from.String()),
zap.Uint64("requestGeneration", req.Generation),
zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
}
m.mutex.Unlock()…neration-fence # Conflicts: # coordinator/controller.go # heartbeatpb/heartbeat.pb.go
|
/test all |
|
/test pull-error-log-review |
This reverts commit 7587ff1.
|
/test all |
There was a problem hiding this comment.
Actionable comments posted: 8
🧹 Nitpick comments (2)
maintainer/barrier_test.go (1)
932-934: ⚡ Quick winAssert the propagated maintainer epoch in this test.
This call now passes an explicit epoch (
7), but the test only assertsStartTs. Add a direct assertion onScheduleDispatcherRequest.MaintainerEpochso this behavior is locked by test.Suggested test update
scheduleMsg := ddlSpan.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7) -require.Equal(t, uint64(9), scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).Config.StartTs, false) -require.NotEqual(t, uint64(0), scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).Config.StartTs, false) +req := scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest) +require.Equal(t, uint64(7), req.MaintainerEpoch) +require.Equal(t, uint64(9), req.Config.StartTs) +require.NotEqual(t, uint64(0), req.Config.StartTs)As per coding guidelines,
**/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@maintainer/barrier_test.go` around lines 932 - 934, The test currently creates scheduleMsg via ddlSpan.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7) but only asserts StartTs; add an explicit assertion that the dispatched request's MaintainerEpoch equals the passed epoch (7). Locate where scheduleMsg is used (variable scheduleMsg and type heartbeatpb.ScheduleDispatcherRequest) and add a require.Equal(t, uint64(7), scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).MaintainerEpoch) (or the equivalent assertion style used in the file) to lock the behavior.Source: Coding guidelines
coordinator/operator/operator_stop_test.go (1)
62-82: ⚡ Quick winAdd a stale-epoch
Checktest for the new fencing path.Line 75 and Line 79 update constructor epochs, but this file still doesn’t assert that
Checkignores non-working status whenMaintainerEpochmismatches. A focused test here would lock in the new safety contract.Proposed test shape
+func TestStopChangefeedOperator_CheckIgnoresMismatchedEpoch(t *testing.T) { + ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + op := NewStopChangefeedOperator(common.DefaultKeyspaceID, cfID, "n1", "n2", backend, true, 10) + + op.Check("n1", &heartbeatpb.MaintainerStatus{ + State: heartbeatpb.ComponentState_Stopped, + MaintainerEpoch: 9, + }) + require.False(t, op.IsFinished()) + + op.Check("n1", &heartbeatpb.MaintainerStatus{ + State: heartbeatpb.ComponentState_Stopped, + MaintainerEpoch: 10, + }) + require.True(t, op.IsFinished()) +}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/operator/operator_stop_test.go` around lines 62 - 82, Add a focused unit test in TestStopChangefeedOperator_PostFinish to cover the new fencing path: create a StopChangefeedOperator via NewStopChangefeedOperator with a non-matching MaintainerEpoch (simulate epoch mismatch), set the changefeed in changefeedDB to a non-working status, then call the operator's Check method and assert it ignores the non-working status (i.e., does not treat the changefeed as owned/active or proceed with stop actions). Use the existing changefeedDB, cfID and maintainers used in the test to locate where to add this assertion and verify Check's behavior when MaintainerEpoch differs.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@coordinator/controller.go`:
- Around line 1008-1018: The resumed changefeed still carries the previous
failure in ChangeFeedInfo.Error; after calling
c.backend.BumpChangefeedEpoch(...) and before cf.SetInfo(info), clear the
persisted error (e.g., set info.Error = "" or nil) so the resumed state
(StateNormal) no longer shows the old failure. Update the resume path around the
BumpChangefeedEpoch return handling (the variables/functions:
c.backend.BumpChangefeedEpoch, changefeed.EpochBumpOptions, ProgressNone,
UpdateStatus, normalState, and cf.SetInfo(info)) to explicitly clear
ChangeFeedInfo.Error on the returned info before calling cf.SetInfo(info).
In `@coordinator/coordinator.go`:
- Around line 273-285: The warning-branch currently fences cleanup using
currentInfo.Epoch (currentMaintainerEpoch) which can be advanced during an
in-flight move and causes RemoveMaintainerRequest to be sent with the
destination epoch to the old owner; instead reuse the same origin/target and
origin-epoch resolution that stopChangefeed (and moveOriginStopTargetLocked)
uses so the stop request is built with the correct origin maintainer epoch.
Change the block handling event.state == config.StateWarning to stop using
currentInfo.Epoch directly and call the same resolution helper used by
stopChangefeed/moveOriginStopTargetLocked (or delegate to
StopChangefeedWithMaintainerEpoch via that helper) to compute the correct
maintainer epoch before invoking
c.controller.operatorController.StopChangefeedWithMaintainerEpoch; ensure
updateChangefeedEpoch still sets the warning state and UpdateError but do not
fence the cleanup with the possibly-advanced currentInfo.Epoch.
In `@downstreamadapter/dispatchermanager/dispatcher_manager_test.go`:
- Around line 266-278: The test constructs a RedoDispatcher using manager.sink
but production uses manager.redoSink; update the test to pass manager.redoSink
when calling dispatcher.NewRedoDispatcher (the instance created as oldTrigger
with oldTriggerID and common.KeyspaceDDLSpan(manager.keyspaceID)) so the test
exercises redo-specific close/flush behavior; ensure any setup that populates
manager.redoSink is present or mocked similarly to production before creating
the RedoDispatcher.
In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go`:
- Line 313: Rename the test functions to remove underscores and use camelCase:
change TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose to
TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose, and
likewise rename the other test at line 352 to a camelCase variant (remove
underscores). Update any references or calls to those functions in the file so
the new names are used.
In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go`:
- Around line 604-642: The bootstrap response currently appends a cloned
ScheduleDispatcherRequest (req.ScheduleDispatcherRequest) including the original
req.MaintainerEpoch into response.Operators, which can be replayed by the
maintainer and get fenced out; to fix, ensure the maintainer-side consumer that
processes MaintainerBootstrapResponse (the code that reads response.Operators
and forwards to dispatcher-manager) does not forward the cloned MaintainerEpoch
verbatim but instead reconstructs the request or overwrites
ScheduleDispatcherRequest.MaintainerEpoch with the maintainer's current epoch
(e.g., manager.GetMaintainerEpoch() or equivalent) before calling
dispatcher-manager methods; locate the maintainer bootstrap consumer handling
MaintainerBootstrapResponse and update it to set the current MaintainerEpoch on
each ScheduleDispatcherRequest prior to dispatching.
In `@pkg/common/maintainer_epoch.go`:
- Around line 21-23: The current MaintainerEpochMatches allows reportedEpoch==0
unconditionally which accepts epoch-0 messages after a strict epoch is
established; update the function MaintainerEpochMatches so it only treats
epoch-0 as a match when the currentEpoch is also 0 (i.e., return true if
reportedEpoch == currentEpoch OR both reportedEpoch and currentEpoch are 0),
thereby rejecting reportedEpoch==0 when currentEpoch > 0 to restore strict
fencing.
In `@pkg/pdutil/utils.go`:
- Around line 70-79: GenerateChangefeedEpoch currently returns a fallback
time.Now().UnixNano() and only logs a warning when pdClient.GetTS(ctx) fails,
allowing coordinator/operator code to call BumpChangefeedEpoch with an
unreliable candidateEpoch; change GenerateChangefeedEpoch to propagate and
return the pdClient.GetTS error instead of returning a fallback value, update
its signature and all callers (coordinator/operator epoch-bump paths that call
GenerateChangefeedEpoch) to handle/abort on that error before calling
BumpChangefeedEpoch, and ensure BumpChangefeedEpoch and AdvanceChangefeedEpoch
remain unchanged in logic so only callers pass a valid candidateEpoch obtained
from PD.
In `@tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh`:
- Around line 122-123: The two shell commands using $WORK_DIR (the ensure/...
grep line and the merge_line=... assignment) should wrap the variable in double
quotes while leaving the cdc*.log glob unquoted so paths with spaces don't
break; update the ensure invocation that greps "$WORK_DIR"/cdc*.log and the
merge_line assignment to reference "$WORK_DIR"/cdc*.log (preserving the grep
patterns and tail logic) so the glob still expands but the base directory is
safely quoted.
---
Nitpick comments:
In `@coordinator/operator/operator_stop_test.go`:
- Around line 62-82: Add a focused unit test in
TestStopChangefeedOperator_PostFinish to cover the new fencing path: create a
StopChangefeedOperator via NewStopChangefeedOperator with a non-matching
MaintainerEpoch (simulate epoch mismatch), set the changefeed in changefeedDB to
a non-working status, then call the operator's Check method and assert it
ignores the non-working status (i.e., does not treat the changefeed as
owned/active or proceed with stop actions). Use the existing changefeedDB, cfID
and maintainers used in the test to locate where to add this assertion and
verify Check's behavior when MaintainerEpoch differs.
In `@maintainer/barrier_test.go`:
- Around line 932-934: The test currently creates scheduleMsg via
ddlSpan.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7) but
only asserts StartTs; add an explicit assertion that the dispatched request's
MaintainerEpoch equals the passed epoch (7). Locate where scheduleMsg is used
(variable scheduleMsg and type heartbeatpb.ScheduleDispatcherRequest) and add a
require.Equal(t, uint64(7),
scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).MaintainerEpoch)
(or the equivalent assertion style used in the file) to lock the behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2127b28c-fcd6-4e68-99f4-544aee0798c9
⛔ Files ignored due to path filters (1)
heartbeatpb/heartbeat.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (70)
coordinator/changefeed/changefeed.gocoordinator/changefeed/changefeed_db_backend.gocoordinator/changefeed/changefeed_test.gocoordinator/changefeed/etcd_backend.gocoordinator/changefeed/etcd_backend_test.gocoordinator/changefeed/mock/changefeed_db_backend.gocoordinator/controller.gocoordinator/controller_drain_test.gocoordinator/controller_test.gocoordinator/coordinator.gocoordinator/coordinator_test.gocoordinator/create_changefeed_gc_test.gocoordinator/operator/operator_add.gocoordinator/operator/operator_add_test.gocoordinator/operator/operator_controller.gocoordinator/operator/operator_controller_test.gocoordinator/operator/operator_move.gocoordinator/operator/operator_move_test.gocoordinator/operator/operator_stop.gocoordinator/operator/operator_stop_test.gocoordinator/scheduler/balance_test.gocoordinator/scheduler/basic_test.gocoordinator/scheduler/drain_test.godownstreamadapter/dispatchermanager/dispatcher_manager.godownstreamadapter/dispatchermanager/dispatcher_manager_helper.godownstreamadapter/dispatchermanager/dispatcher_manager_info.godownstreamadapter/dispatchermanager/dispatcher_manager_redo.godownstreamadapter/dispatchermanager/dispatcher_manager_test.godownstreamadapter/dispatchermanager/heartbeat_collector.godownstreamadapter/dispatchermanager/helper.godownstreamadapter/dispatchermanager/helper_test.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.godownstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.godownstreamadapter/dispatcherorchestrator/helper.goheartbeatpb/heartbeat.protomaintainer/barrier.gomaintainer/barrier_event.gomaintainer/barrier_test.gomaintainer/maintainer.gomaintainer/maintainer_controller.gomaintainer/maintainer_controller_bootstrap.gomaintainer/maintainer_controller_helper.gomaintainer/maintainer_manager_maintainers.gomaintainer/maintainer_manager_test.gomaintainer/maintainer_test.gomaintainer/operator/operator_add.gomaintainer/operator/operator_add_test.gomaintainer/operator/operator_controller.gomaintainer/operator/operator_controller_test.gomaintainer/operator/operator_merge.gomaintainer/operator/operator_merge_test.gomaintainer/operator/operator_move.gomaintainer/operator/operator_move_test.gomaintainer/operator/operator_remove.gomaintainer/operator/operator_remove_test.gomaintainer/operator/operator_split.gomaintainer/operator/operator_split_test.gomaintainer/replica/replication_span.gomaintainer/replica/replication_span_test.gomaintainer/scheduler/balance.gomaintainer/scheduler/balance_splits.gomaintainer/scheduler/basic.gomaintainer/scheduler/drain.gomaintainer/scheduler/drain_test.gomaintainer/span/span_controller_test.gopkg/common/format.gopkg/common/maintainer_epoch.gopkg/pdutil/utils.gopkg/pdutil/utils_test.gotests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh
| info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{ | ||
| CheckpointTs: checkpointTs, | ||
| Progress: config.ProgressNone, | ||
| UpdateStatus: true, | ||
| State: &normalState, | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| return errors.Trace(err) | ||
| } | ||
|
|
||
| clone.State = config.StateNormal | ||
| clone.Epoch = pdutil.GenerateChangefeedEpoch(ctx, c.pdClient) | ||
| cf.SetInfo(clone) | ||
| cf.SetInfo(info) | ||
|
|
There was a problem hiding this comment.
Clear the persisted changefeed error when resuming.
This resume path bumps epoch, state, and status, but it never clears ChangeFeedInfo.Error. Since Line 1017 writes the returned info back into memory, a resumed failed changefeed can stay annotated with its old failure in GetChangefeed/metrics even after moving back to StateNormal.
Suggested fix
info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{
CheckpointTs: checkpointTs,
Progress: config.ProgressNone,
UpdateStatus: true,
State: &normalState,
+ UpdateError: true,
+ Error: nil,
})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{ | |
| CheckpointTs: checkpointTs, | |
| Progress: config.ProgressNone, | |
| UpdateStatus: true, | |
| State: &normalState, | |
| }) | |
| if err != nil { | |
| return err | |
| return errors.Trace(err) | |
| } | |
| clone.State = config.StateNormal | |
| clone.Epoch = pdutil.GenerateChangefeedEpoch(ctx, c.pdClient) | |
| cf.SetInfo(clone) | |
| cf.SetInfo(info) | |
| info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{ | |
| CheckpointTs: checkpointTs, | |
| Progress: config.ProgressNone, | |
| UpdateStatus: true, | |
| State: &normalState, | |
| UpdateError: true, | |
| Error: nil, | |
| }) | |
| if err != nil { | |
| return errors.Trace(err) | |
| } | |
| cf.SetInfo(info) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@coordinator/controller.go` around lines 1008 - 1018, The resumed changefeed
still carries the previous failure in ChangeFeedInfo.Error; after calling
c.backend.BumpChangefeedEpoch(...) and before cf.SetInfo(info), clear the
persisted error (e.g., set info.Error = "" or nil) so the resumed state
(StateNormal) no longer shows the old failure. Update the resume path around the
BumpChangefeedEpoch return handling (the variables/functions:
c.backend.BumpChangefeedEpoch, changefeed.EpochBumpOptions, ProgressNone,
UpdateStatus, normalState, and cf.SetInfo(info)) to explicitly clear
ChangeFeedInfo.Error on the returned info before calling cf.SetInfo(info).
| if event.state == config.StateWarning { | ||
| warningState := config.StateWarning | ||
| currentMaintainerEpoch := currentInfo.Epoch | ||
| if err := c.controller.updateChangefeedEpoch(ctx, event.changefeedID, changefeed.EpochBumpOptions{ | ||
| State: &warningState, | ||
| Error: event.err, | ||
| UpdateError: true, | ||
| }); err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
| c.controller.operatorController.StopChangefeedWithMaintainerEpoch(ctx, event.changefeedID, false, currentMaintainerEpoch) | ||
| c.controller.moveChangefeedToSchedulingQueue(event.changefeedID, false, false) | ||
| return nil |
There was a problem hiding this comment.
Don't fence warning cleanup with currentInfo.Epoch during an in-flight move.
currentInfo.Epoch is already advanced once ownership has been bumped for a move/add, but the origin maintainer can still be the one reporting the warning. This branch bypasses the new moveOriginStopTargetLocked logic in stopChangefeed, so it can emit a RemoveMaintainerRequest with the destination epoch to the old owner. That request will be ignored, leaving the stale writer alive—the exact downstream-conflict case this PR is trying to fence. Reuse the same origin-target/origin-epoch resolution that StopChangefeed uses before building the stop request here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@coordinator/coordinator.go` around lines 273 - 285, The warning-branch
currently fences cleanup using currentInfo.Epoch (currentMaintainerEpoch) which
can be advanced during an in-flight move and causes RemoveMaintainerRequest to
be sent with the destination epoch to the old owner; instead reuse the same
origin/target and origin-epoch resolution that stopChangefeed (and
moveOriginStopTargetLocked) uses so the stop request is built with the correct
origin maintainer epoch. Change the block handling event.state ==
config.StateWarning to stop using currentInfo.Epoch directly and call the same
resolution helper used by stopChangefeed/moveOriginStopTargetLocked (or delegate
to StopChangefeedWithMaintainerEpoch via that helper) to compute the correct
maintainer epoch before invoking
c.controller.operatorController.StopChangefeedWithMaintainerEpoch; ensure
updateChangefeedEpoch still sets the warning state and UpdateError but do not
fence the cleanup with the possibly-advanced currentInfo.Epoch.
| require.True(t, req.Removed) | ||
| } | ||
|
|
||
| func TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) { |
There was a problem hiding this comment.
Rename new test functions to camelCase (no underscores).
Line 313 and Line 352 introduce new test function names with underscores, which violates the repository Go naming rule.
As per coding guidelines, **/*.go functions should use camelCase naming and do not include underscores.
🔧 Proposed rename
-func TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) {
+func TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) {
@@
-func TestPendingMessageQueue_NewerMaintainerEpochOverridesPendingRequest(t *testing.T) {
+func TestPendingMessageQueueNewerMaintainerEpochOverridesPendingRequest(t *testing.T) {Also applies to: 352-352
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go` at
line 313, Rename the test functions to remove underscores and use camelCase:
change TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose to
TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose, and
likewise rename the other test at line 352 to a camelCase variant (remove
underscores). Update any references or calls to those functions in the file so
the new names are used.
Source: Coding guidelines
| manager.GetCurrentOperatorMap().Range(func(_, value any) bool { | ||
| req := value.(dispatchermanager.SchedulerDispatcherRequest) | ||
| requestAllowed := manager.IsMaintainerRequestAllowed(req.From, req.MaintainerEpoch) | ||
| dispatcherID := common.NewDispatcherIDFromPB(req.Config.DispatcherID) | ||
| if common.IsRedoMode(req.Config.Mode) { | ||
| if manager.IsRedoReady() { | ||
| _, ok := manager.GetRedoDispatcherMap().Get(dispatcherID) | ||
| // Log error if dispatcher not found and action is not create | ||
| // It's possible that the dispatcher is not found when the action is create | ||
| // because the dispatcher may be created after the operator is stored | ||
| if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { | ||
| log.Error("Redo dispatcher not found, this should not happen", | ||
| zap.String("changefeed", changefeedID.String()), | ||
| zap.String("dispatcherID", req.Config.DispatcherID.String()), | ||
| ) | ||
| } | ||
| dispatcherExistsKnown := !common.IsRedoMode(req.Config.Mode) || manager.IsRedoReady() | ||
| _, dispatcherReported := reportedDispatchers[reportedDispatcherKey{ | ||
| id: dispatcherID, | ||
| mode: req.Config.Mode, | ||
| }] | ||
| if !requestAllowed { | ||
| // Restore stale remove only when the same bootstrap snapshot reports the dispatcher. | ||
| // This keeps the working span and cleanup intent consistent even if live maps change during cleanup. | ||
| if req.ScheduleAction != heartbeatpb.ScheduleAction_Remove || !dispatcherReported { | ||
| return true | ||
| } | ||
| } else { | ||
| _, ok := manager.GetDispatcherMap().Get(dispatcherID) | ||
| // Log error if dispatcher not found and action is not create | ||
| // It's possible that the dispatcher is not found when the action is create | ||
| // because the dispatcher may be created after the operator is stored | ||
| if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { | ||
| log.Info("include stale remove operator in bootstrap response", | ||
| zap.String("changefeed", changefeedID.String()), | ||
| zap.String("dispatcherID", req.Config.DispatcherID.String()), | ||
| zap.String("from", req.From.String()), | ||
| zap.Uint64("requestMaintainerEpoch", req.MaintainerEpoch), | ||
| zap.Uint64("currentMaintainerEpoch", manager.GetMaintainerEpoch()), | ||
| zap.String("currentMaintainer", manager.GetMaintainerID().String())) | ||
| } | ||
| // Log error if dispatcher not found and action is not create. | ||
| // It's possible that the dispatcher is not found when the action is create | ||
| // because the dispatcher may be created after the operator is stored. | ||
| if dispatcherExistsKnown && !dispatcherReported && req.ScheduleAction != heartbeatpb.ScheduleAction_Create { | ||
| if common.IsRedoMode(req.Config.Mode) { | ||
| log.Error("Redo dispatcher not found, this should not happen", | ||
| zap.String("changefeed", changefeedID.String()), | ||
| zap.String("dispatcherID", req.Config.DispatcherID.String())) | ||
| } else { | ||
| log.Error("Dispatcher not found, this should not happen", | ||
| zap.String("changefeed", changefeedID.String()), | ||
| zap.String("dispatcherID", req.Config.DispatcherID.String()), | ||
| ) | ||
| zap.String("dispatcherID", req.Config.DispatcherID.String())) | ||
| } | ||
| } | ||
| response.Operators = append(response.Operators, &heartbeatpb.ScheduleDispatcherRequest{ | ||
| ChangefeedID: req.ChangefeedID, | ||
| Config: req.Config, | ||
| ScheduleAction: req.ScheduleAction, | ||
| OperatorType: req.OperatorType, | ||
| }) | ||
| response.Operators = append(response.Operators, | ||
| proto.Clone(req.ScheduleDispatcherRequest).(*heartbeatpb.ScheduleDispatcherRequest)) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C3 'MaintainerBootstrapResponse|\.Operators\b|ScheduleDispatcherRequest|MaintainerEpoch' maintainer downstreamadapter coordinatorRepository: pingcap/ticdc
Length of output: 50370
Flag/adjust bootstrap replay epoch handling: confirm maintainer-side rewrites cloned MaintainerEpoch
The dispatcher-orchestrator bootstrap response clones ScheduleDispatcherRequest (including req.MaintainerEpoch) when building response.Operators; if the maintainer-side bootstrap restore path replays those requests verbatim, the next schedule hop could fence them out and the “stale remove” cleanup intent would be lost. Inspect the maintainer bootstrap consumer for MaintainerBootstrapResponse / response.Operators and ensure it reconstructs requests with the current maintainer epoch (or overwrites ScheduleDispatcherRequest.MaintainerEpoch) before sending to dispatcher-manager.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go` around
lines 604 - 642, The bootstrap response currently appends a cloned
ScheduleDispatcherRequest (req.ScheduleDispatcherRequest) including the original
req.MaintainerEpoch into response.Operators, which can be replayed by the
maintainer and get fenced out; to fix, ensure the maintainer-side consumer that
processes MaintainerBootstrapResponse (the code that reads response.Operators
and forwards to dispatcher-manager) does not forward the cloned MaintainerEpoch
verbatim but instead reconstructs the request or overwrites
ScheduleDispatcherRequest.MaintainerEpoch with the maintainer's current epoch
(e.g., manager.GetMaintainerEpoch() or equivalent) before calling
dispatcher-manager methods; locate the maintainer bootstrap consumer handling
MaintainerBootstrapResponse and update it to set the current MaintainerEpoch on
each ScheduleDispatcherRequest prior to dispatching.
| func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool { | ||
| return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch | ||
| } |
There was a problem hiding this comment.
Epoch-0 responses are still accepted after strict epoch is established.
Line 22 currently returns true when reportedEpoch == 0, even if currentEpoch > 0. That weakens stale-message fencing in strict mode and can let legacy/stale responses pass epoch checks after failover.
Proposed fix
func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool {
- return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch
+ if currentEpoch == 0 {
+ // Compatibility-mode maintainer accepts both legacy and strict peers.
+ return true
+ }
+ // Strict mode: only exact epoch match is valid.
+ return reportedEpoch == currentEpoch
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool { | |
| return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch | |
| } | |
| func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool { | |
| if currentEpoch == 0 { | |
| // Compatibility-mode maintainer accepts both legacy and strict peers. | |
| return true | |
| } | |
| // Strict mode: only exact epoch match is valid. | |
| return reportedEpoch == currentEpoch | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/common/maintainer_epoch.go` around lines 21 - 23, The current
MaintainerEpochMatches allows reportedEpoch==0 unconditionally which accepts
epoch-0 messages after a strict epoch is established; update the function
MaintainerEpochMatches so it only treats epoch-0 as a match when the
currentEpoch is also 0 (i.e., return true if reportedEpoch == currentEpoch OR
both reportedEpoch and currentEpoch are 0), thereby rejecting reportedEpoch==0
when currentEpoch > 0 to restore strict fencing.
| // AdvanceChangefeedEpoch returns max(candidate, current+1). | ||
| func AdvanceChangefeedEpoch(candidate, current uint64) (uint64, error) { | ||
| if candidate > current { | ||
| return candidate, nil | ||
| } | ||
| if current == ^uint64(0) { | ||
| return 0, cerror.ErrSchedulerRequestFailed.GenWithStackByArgs("changefeed epoch overflow") | ||
| } | ||
| return current + 1, nil | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Call sites: GenerateChangefeedEpoch (fallback-capable) ==="
rg -n -C2 --type go '\bGenerateChangefeedEpoch\s*\('
echo
echo "=== Call sites: AdvanceChangefeedEpoch (monotonic helper) ==="
rg -n -C2 --type go '\bAdvanceChangefeedEpoch\s*\('
echo
echo "=== Direct PD TS usage (GetTS) in Go files ==="
rg -n -C2 --type go '\.GetTS\s*\('Repository: pingcap/ticdc
Length of output: 6754
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Show GenerateChangefeedEpoch implementation ==="
sed -n '1,140p' pkg/pdutil/utils.go | nl -ba | sed -n '40,120p'
echo
echo "=== Find bump/fence related epoch paths ==="
rg -n --type go '\bBumpChangefeedEpoch\s*\(' coordinator pkg | head -n 50
echo
echo "=== Search for 'fence' terminology around epochs ==="
rg -n --type go '\bfence(d|r|ing)?\b' coordinator pkg | head -n 80
echo
echo "=== Show call sites context for GenerateChangefeedEpoch ==="
for f in coordinator/controller.go coordinator/operator/operator_controller.go; do
echo "--- $f : context around GenerateChangefeedEpoch ---"
rg -n -C3 --type go 'GenerateChangefeedEpoch\s*\(' "$f"
done
echo
echo "=== Show call sites context for AdvanceChangefeedEpoch ==="
for f in coordinator/changefeed/etcd_backend.go coordinator/coordinator_test.go; do
echo "--- $f : context around AdvanceChangefeedEpoch ---"
rg -n -C3 --type go 'AdvanceChangefeedEpoch\s*\(' "$f"
doneRepository: pingcap/ticdc
Length of output: 170
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Show GenerateChangefeedEpoch implementation (pkg/pdutil/utils.go) ==="
sed -n '40,120p' pkg/pdutil/utils.go
echo
echo "=== Find bump/fence related epoch paths (BumpChangefeedEpoch) ==="
rg -n --type go '\bBumpChangefeedEpoch\s*\(' coordinator pkg | head -n 80
echo
echo "=== Search for 'fence' terminology around epochs ==="
rg -n --type go '\bfence(d|r|ing)?\b' coordinator pkg | head -n 80 || true
echo
echo "=== Call sites context for GenerateChangefeedEpoch ==="
for f in coordinator/controller.go coordinator/operator/operator_controller.go; do
echo "--- $f : context around GenerateChangefeedEpoch ---"
rg -n -C5 --type go 'GenerateChangefeedEpoch\s*\(' "$f" || true
done
echo
echo "=== Call sites context for AdvanceChangefeedEpoch ==="
for f in coordinator/changefeed/etcd_backend.go coordinator/coordinator_test.go pkg/pdutil/utils_test.go; do
echo "--- $f : context around AdvanceChangefeedEpoch ---"
rg -n -C5 --type go 'AdvanceChangefeedEpoch\s*\(' "$f" || true
doneRepository: pingcap/ticdc
Length of output: 7703
Update: epoch fencing paths still use the fallback timestamp when PD GetTS fails.
pdutil.GenerateChangefeedEpoch falls back to time.Now().UnixNano() (and logs a warn) on pdClient.GetTS(ctx) error, and the coordinator/operator epoch-bump paths still call GenerateChangefeedEpoch to produce the candidateEpoch passed into BumpChangefeedEpoch (which only uses AdvanceChangefeedEpoch to reconcile with the stored epoch).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/pdutil/utils.go` around lines 70 - 79, GenerateChangefeedEpoch currently
returns a fallback time.Now().UnixNano() and only logs a warning when
pdClient.GetTS(ctx) fails, allowing coordinator/operator code to call
BumpChangefeedEpoch with an unreliable candidateEpoch; change
GenerateChangefeedEpoch to propagate and return the pdClient.GetTS error instead
of returning a fallback value, update its signature and all callers
(coordinator/operator epoch-bump paths that call GenerateChangefeedEpoch) to
handle/abort on that error before calling BumpChangefeedEpoch, and ensure
BumpChangefeedEpoch and AdvanceChangefeedEpoch remain unchanged in logic so only
callers pass a valid candidateEpoch obtained from PD.
| ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\"" | ||
| merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true) |
There was a problem hiding this comment.
Quote $WORK_DIR while preserving the cdc*.log glob.
Line 122 and Line 123 use unquoted $WORK_DIR, which can break command parsing if the path contains spaces and make this test flaky.
Proposed fix
- ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
- merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)
+ ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" \"$WORK_DIR\"/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
+ merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" "$WORK_DIR"/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\"" | |
| merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true) | |
| ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" \"$WORK_DIR\"/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\"" | |
| merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" "$WORK_DIR"/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true) |
🧰 Tools
🪛 Shellcheck (0.11.0)
[info] 123-123: Double quote to prevent globbing and word splitting.
(SC2086)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh` around
lines 122 - 123, The two shell commands using $WORK_DIR (the ensure/... grep
line and the merge_line=... assignment) should wrap the variable in double
quotes while leaving the cdc*.log glob unquoted so paths with spaces don't
break; update the ensure invocation that greps "$WORK_DIR"/cdc*.log and the
merge_line assignment to reference "$WORK_DIR"/cdc*.log (preserving the grep
patterns and tail logic) so the glob still expands but the base directory is
safely quoted.
Source: Linters/SAST tools
There was a problem hiding this comment.
🧹 Nitpick comments (2)
coordinator/operator/operator_controller_test.go (2)
91-91: ⚡ Quick winRename the newly added tests to camelCase.
These new test functions introduce underscores, which conflicts with the repo's Go naming rule for functions. Please keep the new additions in camelCase even if older tests in this file still use the legacy style.
As per coding guidelines, "Functions should use camelCase naming and do not include underscores (e.g.,
getPartitionNum, notget_partition_num)."Also applies to: 111-111, 226-226, 309-309, 367-367, 446-446
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/operator/operator_controller_test.go` at line 91, The test function names use underscores and must be renamed to camelCase; specifically rename TestController_StopChangefeedWithMaintainerEpoch to TestControllerStopChangefeedWithMaintainerEpoch and apply the same transformation to the other new tests mentioned (remove underscores and PascalCase each word), then update any references in the test file (t.Run names or direct calls) to match the new identifiers so the tests compile and follow the repo naming guideline.Source: Coding guidelines
273-285: ⚡ Quick winAssert the exact PD-derived candidate epoch here.
This test injects a deterministic PD client, but the mock only checks that
candidateEpochis non-zero. That still misses regressions whereAddOperatorstops using the PD TSO and falls back to some other non-zero value, which is one of the key contracts this PR is meant to lock down.Suggested assertion
backend.EXPECT(). BumpChangefeedEpoch(gomock.Any(), cfID, gomock.Any(), changefeed.EpochBumpOptions{}). DoAndReturn(func(ctx context.Context, id common.ChangeFeedID, candidateEpoch uint64, options changefeed.EpochBumpOptions) (*config.ChangeFeedInfo, error) { - require.NotZero(t, candidateEpoch) + require.Equal(t, oracle.ComposeTS(100, 1), candidateEpoch) require.Equal(t, cfID, id) require.False(t, options.UpdateStatus)As per coding guidelines, "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@coordinator/operator/operator_controller_test.go`:
- Line 91: The test function names use underscores and must be renamed to
camelCase; specifically rename TestController_StopChangefeedWithMaintainerEpoch
to TestControllerStopChangefeedWithMaintainerEpoch and apply the same
transformation to the other new tests mentioned (remove underscores and
PascalCase each word), then update any references in the test file (t.Run names
or direct calls) to match the new identifiers so the tests compile and follow
the repo naming guideline.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bc22ecc0-26a9-4730-86ce-6fa8402cb687
📒 Files selected for processing (3)
coordinator/controller_test.gocoordinator/operator/operator_controller_test.godownstreamadapter/dispatchermanager/dispatcher_manager_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- coordinator/controller_test.go
|
/test all |
|
@hongyunyan: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #5083
During maintainer failover, a delayed schedule request from the previous
maintainer can still reach a dispatcher manager after the new maintainer has
already bootstrapped and recreated the same table span. Without a receiver-side
ownership fence, the stale request can create an orphan dispatcher that enters
Workingand writes to the downstream sink before the new maintainer observesand removes it.
What is changed and how it works?
This PR adds a receiver-local maintainer generation fence:
generationto maintainer bootstrap, schedule, post-bootstrap, and closeheartbeat messages.
scheduled through add/move operators, and before resume/retry scheduling.
ChangeFeedInfoand job status, advancing withmax(candidate, persisted+1),preserving stored status by default, and writing info/job under info-key and
job-key
ModRevisioncompares.of first doing an ordinary no-CAS changefeed update.
changefeed's generation strictly monotonic with
max(candidate, current+1).AddMaintainerRequest.Configbytes synchronized with the latestChangeFeedInfo.request generation and reject stale schedule/post-bootstrap/close requests
locally.
dispatcher IDs during same-capture higher-epoch maintainer replacement so
post-bootstrap uses the new maintainer trigger IDs.
TryClosebefore removing them, so takeover bootstrap retries instead ofdropping pending trigger events.
the replacement trigger resumes from the same recovery boundary.
changes, and keeps
currentOperatorMapkeyed by dispatcher ID and generation.0only while thereceiver has not observed a non-zero generation for the changefeed, and only
for the current compatibility-mode maintainer owner.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No expected performance regression. The new mutex only serializes per-changefeed
dispatcher-manager control operations such as bootstrap, close, and dispatcher
create/remove scheduling; it is not in the event write path.
The change is wire-compatible. New fields are optional protobuf fields, and a
new receiver still allows generation
0from the current maintainer owner whileit remains in compatibility mode for that changefeed.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Validation
make generate-protobufmake fmttools/bin/golangci-lint run --timeout 10m0s --new-from-rev=upstream/mastergo test ./coordinator/changefeed ./coordinator/operator ./coordinator ./pkg/pdutilgo test ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./coordinator ./coordinator/changefeed ./coordinator/operator ./pkg/pdutil ./maintainer ./maintainer/replica ./maintainer/operatorgo test ./api/v1 ./coordinator ./coordinator/changefeed ./coordinator/drain ./coordinator/operator ./coordinator/scheduler ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./maintainer ./maintainer/replica ./maintainer/operator ./pkg/bootstrap ./pkg/server ./pkg/pdutilgo test -count=1 ./downstreamadapter/dispatcherorchestrator ./downstreamadapter/dispatchermanagergit diff --checkSummary by CodeRabbit
New Features
Bug Fixes
Tests