Skip to content

maintainer,dispatcher, coordinator: fence stale generation requests#5182

Open
hongyunyan wants to merge 69 commits into
pingcap:masterfrom
hongyunyan:codex/fix-5083-generation-fence
Open

maintainer,dispatcher, coordinator: fence stale generation requests#5182
hongyunyan wants to merge 69 commits into
pingcap:masterfrom
hongyunyan:codex/fix-5083-generation-fence

Conversation

@hongyunyan

@hongyunyan hongyunyan commented Jun 1, 2026

Copy link
Copy Markdown
Collaborator

What problem does this PR solve?

Issue Number: close #5083

During maintainer failover, a delayed schedule request from the previous
maintainer can still reach a dispatcher manager after the new maintainer has
already bootstrapped and recreated the same table span. Without a receiver-side
ownership fence, the stale request can create an orphan dispatcher that enters
Working and writes to the downstream sink before the new maintainer observes
and removes it.

What is changed and how it works?

This PR adds a receiver-local maintainer generation fence:

  • Adds generation to maintainer bootstrap, schedule, post-bootstrap, and close
    heartbeat messages.
  • Bumps and persists the changefeed epoch before new maintainer ownership is
    scheduled through add/move operators, and before resume/retry scheduling.
  • Serializes persisted epoch bumps in the backend by reading the latest stored
    ChangeFeedInfo and job status, advancing with max(candidate, persisted+1),
    preserving stored status by default, and writing info/job under info-key and
    job-key ModRevision compares.
  • Writes warning retry state/error through the same epoch bump boundary instead
    of first doing an ordinary no-CAS changefeed update.
  • Generates epochs from PD TSO without silent production fallback, and keeps each
    changefeed's generation strictly monotonic with max(candidate, current+1).
  • Keeps AddMaintainerRequest.Config bytes synchronized with the latest
    ChangeFeedInfo.
  • Stamps maintainer outbound control messages with the changefeed epoch.
  • Makes dispatcher managers track the active maintainer owner plus explicit
    request generation and reject stale schedule/post-bootstrap/close requests
    locally.
  • Reconciles existing dispatcher-manager table trigger event and redo
    dispatcher IDs during same-capture higher-epoch maintainer replacement so
    post-bootstrap uses the new maintainer trigger IDs.
  • Waits for mismatched old table trigger event and redo dispatchers to pass
    TryClose before removing them, so takeover bootstrap retries instead of
    dropping pending trigger events.
  • Preserves the old table trigger start timestamp across async drain retries so
    the replacement trigger resumes from the same recovery boundary.
  • Serializes dispatcher-manager control requests with maintainer generation
    changes, and keeps currentOperatorMap keyed by dispatcher ID and generation.
  • Keeps rolling-upgrade compatibility by allowing generation 0 only while the
    receiver has not observed a non-zero generation for the changefeed, and only
    for the current compatibility-mode maintainer owner.

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No expected performance regression. The new mutex only serializes per-changefeed
dispatcher-manager control operations such as bootstrap, close, and dispatcher
create/remove scheduling; it is not in the event write path.

The change is wire-compatible. New fields are optional protobuf fields, and a
new receiver still allows generation 0 from the current maintainer owner while
it remains in compatibility mode for that changefeed.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

Fix a bug where delayed stale maintainer requests could create duplicate dispatchers during maintainer failover.

Validation

  • make generate-protobuf
  • make fmt
  • tools/bin/golangci-lint run --timeout 10m0s --new-from-rev=upstream/master
  • go test ./coordinator/changefeed ./coordinator/operator ./coordinator ./pkg/pdutil
  • go test ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./coordinator ./coordinator/changefeed ./coordinator/operator ./pkg/pdutil ./maintainer ./maintainer/replica ./maintainer/operator
  • go test ./api/v1 ./coordinator ./coordinator/changefeed ./coordinator/drain ./coordinator/operator ./coordinator/scheduler ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator ./maintainer ./maintainer/replica ./maintainer/operator ./pkg/bootstrap ./pkg/server ./pkg/pdutil
  • go test -count=1 ./downstreamadapter/dispatcherorchestrator ./downstreamadapter/dispatchermanager
  • git diff --check

Summary by CodeRabbit

  • New Features

    • Maintainer-epoch is globally propagated and included in heartbeat, scheduling and maintainer lifecycle messages; epoch-aware ownership bumps are persisted with options to update status/checkpoint.
    • Message and operator flows now carry maintainer epoch to coordinate adds/removes/moves safely.
  • Bug Fixes

    • Stale or out-of-order maintainer messages are dropped to prevent races during bootstrap, resume, handover, and removals.
    • Epoch-bump retries and CAS handling reduce lost or conflicting ownership updates.
  • Tests

    • Expanded unit and integration tests covering epoch semantics, bump/retry behavior, fencing, dispatcher bootstrap/close, and operator concurrency.

@ti-chi-bot

ti-chi-bot Bot commented Jun 1, 2026

Copy link
Copy Markdown

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/needs-triage-completed labels Jun 1, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 1, 2026

Copy link
Copy Markdown

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign charlescheung96 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai

coderabbitai Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Threads maintainer_epoch through proto, coordinator, backend, maintainer, operators, dispatcher manager/orchestrator, and span replication; adds Backend.BumpChangefeedEpoch with EpochBumpOptions; gates bootstrap/close/messages by maintainer epoch; implements dispatcher replacement fencing; updates tests.

Changes

Maintainer-epoch fencing and propagation

Layer / File(s) Summary
Protocol and contracts
heartbeatpb/heartbeat.proto, pkg/common/maintainer_epoch.go
Adds maintainer_epoch fields to heartbeat messages and MaintainerEpochMatches helper.
Epoch bump persistence and Backend API
coordinator/changefeed/changefeed_db_backend.go, coordinator/changefeed/etcd_backend.go, pkg/pdutil/*
Adds EpochBumpOptions and Backend.BumpChangefeedEpoch; implements EtcdBackend.BumpChangefeedEpoch and pkg/pdutil.AdvanceChangefeedEpoch. Tests added for bump, status updates, and CAS-retry behavior.
Coordinator and controller flows
coordinator/controller.go, coordinator/coordinator.go, coordinator/*_test.go, coordinator/operator/*
Resume/stop/state-change/bootstrap flows now call BumpChangefeedEpoch, track multiple remote maintainers, select bootstrap maintainer, propagate maintainer epochs into stop/remove messages, and handle warning-state epoch bumps.
Changefeed message wiring
coordinator/changefeed/changefeed.go, coordinator/changefeed/*_test.go
Changefeed no longer caches serialized config; Add/Remove maintainer messages marshal config on demand and include MaintainerEpoch/KeyspaceId in requests.
Maintainer core and lifecycle
maintainer/*, maintainer/maintainer_manager_maintainers.go, maintainer/*_test.go
Controller tracks maintainer epoch, filters/stales bootstrap/post-bootstrap/close responses by epoch, serializes registry mutations, and updates add/remove admission to be epoch-aware with replacement semantics.
Operators and span replication
maintainer/operator/*, maintainer/replica/replication_span.go, coordinator/operator/*
Add/Remove/Move/Merge/Split/Stop operators and span replication now accept/propagate maintainerEpoch into ScheduleDispatcherRequest/maintainer requests and operator constructors.
Dispatcher manager & orchestrator
downstreamadapter/dispatchermanager/*, downstreamadapter/dispatcherorchestrator/*
DispatcherManager gains MaintainerFenceMu; Orchestrator tracks closed maintainer epochs, gates bootstraps/closes by epoch, reconciles replacements, and clones appended operator requests.
Message handling and streams
downstreamadapter/dispatchermanager/helper.go, heartbeat_collector.go
Wrapper types carry From node.ID and requests now include maintainer epoch; handlers acquire MaintainerFenceMu and drop stale requests/responses.
Tests, utilities, and misc wiring
many _test.go, pkg/pdutil/utils_test.go, tests/integration_tests/*
Extensive test updates asserting epoch behavior, PD helper tests, gomock helper changes, and minor integration script log-scan adjustments.

Estimated code review effort
🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels
lgtm, approved

Suggested reviewers

  • asddongmen
  • wk989898
  • flowbehappy

"🐇 Epochs hop in orderly queues,
stale maintainers wait their dues.
One fence keeps duplicates at bay,
handovers safe through night and day.
🥕"

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jun 1, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a maintainer generation/epoch fencing mechanism to prevent stale maintainer requests from affecting dispatcher managers. It adds generation fields to heartbeat protobuf messages, implements fencing logic in the dispatcher manager and orchestrator, and stamps outgoing requests with the current maintainer generation. The review feedback highlights two critical head-of-line blocking issues in dispatcher_orchestrator.go where the orchestrator-wide lock m.mutex is held while waiting for the per-changefeed lock manager.LockControl(), and provides suggestions to safely release the lock before acquiring the per-changefeed lock.

Comment on lines +224 to +235
} else {
manager.LockControl()
defer manager.UnlockControl()
if !manager.TryUpdateMaintainer(from, generation) {
log.Warn("drop stale maintainer bootstrap request",
zap.String("changefeed", cfId.Name()),
zap.String("from", from.String()),
zap.Uint64("requestGeneration", generation),
zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
return nil
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.

To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.

	} else {
		m.mutex.Unlock()
		manager.LockControl()
		defer manager.UnlockControl()

		m.mutex.Lock()
		currentManager, stillExists := m.dispatcherManagers[cfId]
		if !stillExists || currentManager != manager {
			m.mutex.Unlock()
			return nil
		}
		m.mutex.Unlock()

		if !manager.TryUpdateMaintainer(from, generation) {
			log.Warn("drop stale maintainer bootstrap request",
				zap.String("changefeed", cfId.Name()),
				zap.String("from", from.String()),
				zap.Uint64("requestGeneration", generation),
				zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
				zap.String("currentMaintainer", manager.GetMaintainerID().String()))
			return nil
		}

Comment on lines 389 to 410
m.mutex.Lock()
if manager, ok := m.dispatcherManagers[cfId]; ok {
if closed := manager.TryClose(req.Removed); closed {
delete(m.dispatcherManagers, cfId)
metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
response.Success = true
manager.LockControl()
if manager.IsMaintainerRequestAllowed(from, req.Generation) {
if closed := manager.TryClose(req.Removed); closed {
delete(m.dispatcherManagers, cfId)
metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
response.Success = true
} else {
response.Success = false
}
} else {
response.Success = false
log.Warn("drop stale maintainer close request",
zap.String("changefeed", cfId.Name()),
zap.String("from", from.String()),
zap.Uint64("requestGeneration", req.Generation),
zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
}
manager.UnlockControl()
}
m.mutex.Unlock()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Holding the orchestrator-wide lock m.mutex while waiting for the per-changefeed lock manager.LockControl() can cause head-of-line blocking. If a single changefeed's dispatcher manager is slow or blocked, all other changefeeds on this node will be blocked from bootstrapping or closing.

To avoid this, we should unlock m.mutex as soon as we retrieve the manager from m.dispatcherManagers, and then acquire manager.LockControl(). To prevent races with concurrent close/delete operations, we can re-verify under m.mutex that the manager is still the active one in m.dispatcherManagers before proceeding.

	m.mutex.Lock()
	manager, ok := m.dispatcherManagers[cfId]
	if !ok {
		m.mutex.Unlock()
		return response
	}
	m.mutex.Unlock()

	manager.LockControl()
	defer manager.UnlockControl()

	m.mutex.Lock()
	currentManager, stillExists := m.dispatcherManagers[cfId]
	if !stillExists || currentManager != manager {
		m.mutex.Unlock()
		response.Success = false
		return response
	}

	if manager.IsMaintainerRequestAllowed(from, req.Generation) {
		if closed := manager.TryClose(req.Removed); closed {
			delete(m.dispatcherManagers, cfId)
			metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Dec()
			response.Success = true
		} else {
			response.Success = false
		}
	} else {
		log.Warn("drop stale maintainer close request",
			zap.String("changefeed", cfId.Name()),
			zap.String("from", from.String()),
			zap.Uint64("requestGeneration", req.Generation),
			zap.Uint64("currentGeneration", manager.GetMaintainerEpoch()),
			zap.String("currentMaintainer", manager.GetMaintainerID().String()))
	}
	m.mutex.Unlock()

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/test all

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/test pull-error-log-review

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/test all

@hongyunyan hongyunyan marked this pull request as ready for review June 9, 2026 11:52
@ti-chi-bot ti-chi-bot Bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 9, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (2)
maintainer/barrier_test.go (1)

932-934: ⚡ Quick win

Assert the propagated maintainer epoch in this test.

This call now passes an explicit epoch (7), but the test only asserts StartTs. Add a direct assertion on ScheduleDispatcherRequest.MaintainerEpoch so this behavior is locked by test.

Suggested test update
 scheduleMsg := ddlSpan.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7)
-require.Equal(t, uint64(9), scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).Config.StartTs, false)
-require.NotEqual(t, uint64(0), scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).Config.StartTs, false)
+req := scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
+require.Equal(t, uint64(7), req.MaintainerEpoch)
+require.Equal(t, uint64(9), req.Config.StartTs)
+require.NotEqual(t, uint64(0), req.Config.StartTs)

As per coding guidelines, **/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@maintainer/barrier_test.go` around lines 932 - 934, The test currently
creates scheduleMsg via ddlSpan.NewAddDispatcherMessage("node1",
heartbeatpb.OperatorType_O_Add, 7) but only asserts StartTs; add an explicit
assertion that the dispatched request's MaintainerEpoch equals the passed epoch
(7). Locate where scheduleMsg is used (variable scheduleMsg and type
heartbeatpb.ScheduleDispatcherRequest) and add a require.Equal(t, uint64(7),
scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).MaintainerEpoch)
(or the equivalent assertion style used in the file) to lock the behavior.

Source: Coding guidelines

coordinator/operator/operator_stop_test.go (1)

62-82: ⚡ Quick win

Add a stale-epoch Check test for the new fencing path.

Line 75 and Line 79 update constructor epochs, but this file still doesn’t assert that Check ignores non-working status when MaintainerEpoch mismatches. A focused test here would lock in the new safety contract.

Proposed test shape
+func TestStopChangefeedOperator_CheckIgnoresMismatchedEpoch(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	backend := mock_changefeed.NewMockBackend(ctrl)
+	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
+	op := NewStopChangefeedOperator(common.DefaultKeyspaceID, cfID, "n1", "n2", backend, true, 10)
+
+	op.Check("n1", &heartbeatpb.MaintainerStatus{
+		State:           heartbeatpb.ComponentState_Stopped,
+		MaintainerEpoch: 9,
+	})
+	require.False(t, op.IsFinished())
+
+	op.Check("n1", &heartbeatpb.MaintainerStatus{
+		State:           heartbeatpb.ComponentState_Stopped,
+		MaintainerEpoch: 10,
+	})
+	require.True(t, op.IsFinished())
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/operator/operator_stop_test.go` around lines 62 - 82, Add a
focused unit test in TestStopChangefeedOperator_PostFinish to cover the new
fencing path: create a StopChangefeedOperator via NewStopChangefeedOperator with
a non-matching MaintainerEpoch (simulate epoch mismatch), set the changefeed in
changefeedDB to a non-working status, then call the operator's Check method and
assert it ignores the non-working status (i.e., does not treat the changefeed as
owned/active or proceed with stop actions). Use the existing changefeedDB, cfID
and maintainers used in the test to locate where to add this assertion and
verify Check's behavior when MaintainerEpoch differs.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@coordinator/controller.go`:
- Around line 1008-1018: The resumed changefeed still carries the previous
failure in ChangeFeedInfo.Error; after calling
c.backend.BumpChangefeedEpoch(...) and before cf.SetInfo(info), clear the
persisted error (e.g., set info.Error = "" or nil) so the resumed state
(StateNormal) no longer shows the old failure. Update the resume path around the
BumpChangefeedEpoch return handling (the variables/functions:
c.backend.BumpChangefeedEpoch, changefeed.EpochBumpOptions, ProgressNone,
UpdateStatus, normalState, and cf.SetInfo(info)) to explicitly clear
ChangeFeedInfo.Error on the returned info before calling cf.SetInfo(info).

In `@coordinator/coordinator.go`:
- Around line 273-285: The warning-branch currently fences cleanup using
currentInfo.Epoch (currentMaintainerEpoch) which can be advanced during an
in-flight move and causes RemoveMaintainerRequest to be sent with the
destination epoch to the old owner; instead reuse the same origin/target and
origin-epoch resolution that stopChangefeed (and moveOriginStopTargetLocked)
uses so the stop request is built with the correct origin maintainer epoch.
Change the block handling event.state == config.StateWarning to stop using
currentInfo.Epoch directly and call the same resolution helper used by
stopChangefeed/moveOriginStopTargetLocked (or delegate to
StopChangefeedWithMaintainerEpoch via that helper) to compute the correct
maintainer epoch before invoking
c.controller.operatorController.StopChangefeedWithMaintainerEpoch; ensure
updateChangefeedEpoch still sets the warning state and UpdateError but do not
fence the cleanup with the possibly-advanced currentInfo.Epoch.

In `@downstreamadapter/dispatchermanager/dispatcher_manager_test.go`:
- Around line 266-278: The test constructs a RedoDispatcher using manager.sink
but production uses manager.redoSink; update the test to pass manager.redoSink
when calling dispatcher.NewRedoDispatcher (the instance created as oldTrigger
with oldTriggerID and common.KeyspaceDDLSpan(manager.keyspaceID)) so the test
exercises redo-specific close/flush behavior; ensure any setup that populates
manager.redoSink is present or mocked similarly to production before creating
the RedoDispatcher.

In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go`:
- Line 313: Rename the test functions to remove underscores and use camelCase:
change TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose to
TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose, and
likewise rename the other test at line 352 to a camelCase variant (remove
underscores). Update any references or calls to those functions in the file so
the new names are used.

In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go`:
- Around line 604-642: The bootstrap response currently appends a cloned
ScheduleDispatcherRequest (req.ScheduleDispatcherRequest) including the original
req.MaintainerEpoch into response.Operators, which can be replayed by the
maintainer and get fenced out; to fix, ensure the maintainer-side consumer that
processes MaintainerBootstrapResponse (the code that reads response.Operators
and forwards to dispatcher-manager) does not forward the cloned MaintainerEpoch
verbatim but instead reconstructs the request or overwrites
ScheduleDispatcherRequest.MaintainerEpoch with the maintainer's current epoch
(e.g., manager.GetMaintainerEpoch() or equivalent) before calling
dispatcher-manager methods; locate the maintainer bootstrap consumer handling
MaintainerBootstrapResponse and update it to set the current MaintainerEpoch on
each ScheduleDispatcherRequest prior to dispatching.

In `@pkg/common/maintainer_epoch.go`:
- Around line 21-23: The current MaintainerEpochMatches allows reportedEpoch==0
unconditionally which accepts epoch-0 messages after a strict epoch is
established; update the function MaintainerEpochMatches so it only treats
epoch-0 as a match when the currentEpoch is also 0 (i.e., return true if
reportedEpoch == currentEpoch OR both reportedEpoch and currentEpoch are 0),
thereby rejecting reportedEpoch==0 when currentEpoch > 0 to restore strict
fencing.

In `@pkg/pdutil/utils.go`:
- Around line 70-79: GenerateChangefeedEpoch currently returns a fallback
time.Now().UnixNano() and only logs a warning when pdClient.GetTS(ctx) fails,
allowing coordinator/operator code to call BumpChangefeedEpoch with an
unreliable candidateEpoch; change GenerateChangefeedEpoch to propagate and
return the pdClient.GetTS error instead of returning a fallback value, update
its signature and all callers (coordinator/operator epoch-bump paths that call
GenerateChangefeedEpoch) to handle/abort on that error before calling
BumpChangefeedEpoch, and ensure BumpChangefeedEpoch and AdvanceChangefeedEpoch
remain unchanged in logic so only callers pass a valid candidateEpoch obtained
from PD.

In `@tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh`:
- Around line 122-123: The two shell commands using $WORK_DIR (the ensure/...
grep line and the merge_line=... assignment) should wrap the variable in double
quotes while leaving the cdc*.log glob unquoted so paths with spaces don't
break; update the ensure invocation that greps "$WORK_DIR"/cdc*.log and the
merge_line assignment to reference "$WORK_DIR"/cdc*.log (preserving the grep
patterns and tail logic) so the glob still expands but the base directory is
safely quoted.

---

Nitpick comments:
In `@coordinator/operator/operator_stop_test.go`:
- Around line 62-82: Add a focused unit test in
TestStopChangefeedOperator_PostFinish to cover the new fencing path: create a
StopChangefeedOperator via NewStopChangefeedOperator with a non-matching
MaintainerEpoch (simulate epoch mismatch), set the changefeed in changefeedDB to
a non-working status, then call the operator's Check method and assert it
ignores the non-working status (i.e., does not treat the changefeed as
owned/active or proceed with stop actions). Use the existing changefeedDB, cfID
and maintainers used in the test to locate where to add this assertion and
verify Check's behavior when MaintainerEpoch differs.

In `@maintainer/barrier_test.go`:
- Around line 932-934: The test currently creates scheduleMsg via
ddlSpan.NewAddDispatcherMessage("node1", heartbeatpb.OperatorType_O_Add, 7) but
only asserts StartTs; add an explicit assertion that the dispatched request's
MaintainerEpoch equals the passed epoch (7). Locate where scheduleMsg is used
(variable scheduleMsg and type heartbeatpb.ScheduleDispatcherRequest) and add a
require.Equal(t, uint64(7),
scheduleMsg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest).MaintainerEpoch)
(or the equivalent assertion style used in the file) to lock the behavior.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2127b28c-fcd6-4e68-99f4-544aee0798c9

📥 Commits

Reviewing files that changed from the base of the PR and between 6cd6024 and 066dcef.

⛔ Files ignored due to path filters (1)
  • heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (70)
  • coordinator/changefeed/changefeed.go
  • coordinator/changefeed/changefeed_db_backend.go
  • coordinator/changefeed/changefeed_test.go
  • coordinator/changefeed/etcd_backend.go
  • coordinator/changefeed/etcd_backend_test.go
  • coordinator/changefeed/mock/changefeed_db_backend.go
  • coordinator/controller.go
  • coordinator/controller_drain_test.go
  • coordinator/controller_test.go
  • coordinator/coordinator.go
  • coordinator/coordinator_test.go
  • coordinator/create_changefeed_gc_test.go
  • coordinator/operator/operator_add.go
  • coordinator/operator/operator_add_test.go
  • coordinator/operator/operator_controller.go
  • coordinator/operator/operator_controller_test.go
  • coordinator/operator/operator_move.go
  • coordinator/operator/operator_move_test.go
  • coordinator/operator/operator_stop.go
  • coordinator/operator/operator_stop_test.go
  • coordinator/scheduler/balance_test.go
  • coordinator/scheduler/basic_test.go
  • coordinator/scheduler/drain_test.go
  • downstreamadapter/dispatchermanager/dispatcher_manager.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_helper.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_info.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_test.go
  • downstreamadapter/dispatchermanager/heartbeat_collector.go
  • downstreamadapter/dispatchermanager/helper.go
  • downstreamadapter/dispatchermanager/helper_test.go
  • downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
  • downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
  • downstreamadapter/dispatcherorchestrator/helper.go
  • heartbeatpb/heartbeat.proto
  • maintainer/barrier.go
  • maintainer/barrier_event.go
  • maintainer/barrier_test.go
  • maintainer/maintainer.go
  • maintainer/maintainer_controller.go
  • maintainer/maintainer_controller_bootstrap.go
  • maintainer/maintainer_controller_helper.go
  • maintainer/maintainer_manager_maintainers.go
  • maintainer/maintainer_manager_test.go
  • maintainer/maintainer_test.go
  • maintainer/operator/operator_add.go
  • maintainer/operator/operator_add_test.go
  • maintainer/operator/operator_controller.go
  • maintainer/operator/operator_controller_test.go
  • maintainer/operator/operator_merge.go
  • maintainer/operator/operator_merge_test.go
  • maintainer/operator/operator_move.go
  • maintainer/operator/operator_move_test.go
  • maintainer/operator/operator_remove.go
  • maintainer/operator/operator_remove_test.go
  • maintainer/operator/operator_split.go
  • maintainer/operator/operator_split_test.go
  • maintainer/replica/replication_span.go
  • maintainer/replica/replication_span_test.go
  • maintainer/scheduler/balance.go
  • maintainer/scheduler/balance_splits.go
  • maintainer/scheduler/basic.go
  • maintainer/scheduler/drain.go
  • maintainer/scheduler/drain_test.go
  • maintainer/span/span_controller_test.go
  • pkg/common/format.go
  • pkg/common/maintainer_epoch.go
  • pkg/pdutil/utils.go
  • pkg/pdutil/utils_test.go
  • tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh

Comment thread coordinator/controller.go
Comment on lines +1008 to 1018
info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{
CheckpointTs: checkpointTs,
Progress: config.ProgressNone,
UpdateStatus: true,
State: &normalState,
})
if err != nil {
return err
return errors.Trace(err)
}

clone.State = config.StateNormal
clone.Epoch = pdutil.GenerateChangefeedEpoch(ctx, c.pdClient)
cf.SetInfo(clone)
cf.SetInfo(info)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Clear the persisted changefeed error when resuming.

This resume path bumps epoch, state, and status, but it never clears ChangeFeedInfo.Error. Since Line 1017 writes the returned info back into memory, a resumed failed changefeed can stay annotated with its old failure in GetChangefeed/metrics even after moving back to StateNormal.

Suggested fix
 	info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{
 		CheckpointTs: checkpointTs,
 		Progress:     config.ProgressNone,
 		UpdateStatus: true,
 		State:        &normalState,
+		UpdateError:  true,
+		Error:        nil,
 	})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{
CheckpointTs: checkpointTs,
Progress: config.ProgressNone,
UpdateStatus: true,
State: &normalState,
})
if err != nil {
return err
return errors.Trace(err)
}
clone.State = config.StateNormal
clone.Epoch = pdutil.GenerateChangefeedEpoch(ctx, c.pdClient)
cf.SetInfo(clone)
cf.SetInfo(info)
info, err := c.backend.BumpChangefeedEpoch(ctx, id, epoch, changefeed.EpochBumpOptions{
CheckpointTs: checkpointTs,
Progress: config.ProgressNone,
UpdateStatus: true,
State: &normalState,
UpdateError: true,
Error: nil,
})
if err != nil {
return errors.Trace(err)
}
cf.SetInfo(info)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/controller.go` around lines 1008 - 1018, The resumed changefeed
still carries the previous failure in ChangeFeedInfo.Error; after calling
c.backend.BumpChangefeedEpoch(...) and before cf.SetInfo(info), clear the
persisted error (e.g., set info.Error = "" or nil) so the resumed state
(StateNormal) no longer shows the old failure. Update the resume path around the
BumpChangefeedEpoch return handling (the variables/functions:
c.backend.BumpChangefeedEpoch, changefeed.EpochBumpOptions, ProgressNone,
UpdateStatus, normalState, and cf.SetInfo(info)) to explicitly clear
ChangeFeedInfo.Error on the returned info before calling cf.SetInfo(info).

Comment on lines +273 to +285
if event.state == config.StateWarning {
warningState := config.StateWarning
currentMaintainerEpoch := currentInfo.Epoch
if err := c.controller.updateChangefeedEpoch(ctx, event.changefeedID, changefeed.EpochBumpOptions{
State: &warningState,
Error: event.err,
UpdateError: true,
}); err != nil {
return errors.Trace(err)
}
c.controller.operatorController.StopChangefeedWithMaintainerEpoch(ctx, event.changefeedID, false, currentMaintainerEpoch)
c.controller.moveChangefeedToSchedulingQueue(event.changefeedID, false, false)
return nil

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't fence warning cleanup with currentInfo.Epoch during an in-flight move.

currentInfo.Epoch is already advanced once ownership has been bumped for a move/add, but the origin maintainer can still be the one reporting the warning. This branch bypasses the new moveOriginStopTargetLocked logic in stopChangefeed, so it can emit a RemoveMaintainerRequest with the destination epoch to the old owner. That request will be ignored, leaving the stale writer alive—the exact downstream-conflict case this PR is trying to fence. Reuse the same origin-target/origin-epoch resolution that StopChangefeed uses before building the stop request here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/coordinator.go` around lines 273 - 285, The warning-branch
currently fences cleanup using currentInfo.Epoch (currentMaintainerEpoch) which
can be advanced during an in-flight move and causes RemoveMaintainerRequest to
be sent with the destination epoch to the old owner; instead reuse the same
origin/target and origin-epoch resolution that stopChangefeed (and
moveOriginStopTargetLocked) uses so the stop request is built with the correct
origin maintainer epoch. Change the block handling event.state ==
config.StateWarning to stop using currentInfo.Epoch directly and call the same
resolution helper used by stopChangefeed/moveOriginStopTargetLocked (or delegate
to StopChangefeedWithMaintainerEpoch via that helper) to compute the correct
maintainer epoch before invoking
c.controller.operatorController.StopChangefeedWithMaintainerEpoch; ensure
updateChangefeedEpoch still sets the warning state and UpdateError but do not
fence the cleanup with the possibly-advanced currentInfo.Epoch.

Comment thread downstreamadapter/dispatchermanager/dispatcher_manager_test.go Outdated
require.True(t, req.Removed)
}

func TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Rename new test functions to camelCase (no underscores).

Line 313 and Line 352 introduce new test function names with underscores, which violates the repository Go naming rule.

As per coding guidelines, **/*.go functions should use camelCase naming and do not include underscores.

🔧 Proposed rename
-func TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) {
+func TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose(t *testing.T) {
@@
-func TestPendingMessageQueue_NewerMaintainerEpochOverridesPendingRequest(t *testing.T) {
+func TestPendingMessageQueueNewerMaintainerEpochOverridesPendingRequest(t *testing.T) {

Also applies to: 352-352

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go` at
line 313, Rename the test functions to remove underscores and use camelCase:
change TestPendingMessageQueue_StaleRemovedCloseCannotOverrideNewerEpochClose to
TestPendingMessageQueueStaleRemovedCloseCannotOverrideNewerEpochClose, and
likewise rename the other test at line 352 to a camelCase variant (remove
underscores). Update any references or calls to those functions in the file so
the new names are used.

Source: Coding guidelines

Comment on lines +604 to +642
manager.GetCurrentOperatorMap().Range(func(_, value any) bool {
req := value.(dispatchermanager.SchedulerDispatcherRequest)
requestAllowed := manager.IsMaintainerRequestAllowed(req.From, req.MaintainerEpoch)
dispatcherID := common.NewDispatcherIDFromPB(req.Config.DispatcherID)
if common.IsRedoMode(req.Config.Mode) {
if manager.IsRedoReady() {
_, ok := manager.GetRedoDispatcherMap().Get(dispatcherID)
// Log error if dispatcher not found and action is not create
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored
if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
log.Error("Redo dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
)
}
dispatcherExistsKnown := !common.IsRedoMode(req.Config.Mode) || manager.IsRedoReady()
_, dispatcherReported := reportedDispatchers[reportedDispatcherKey{
id: dispatcherID,
mode: req.Config.Mode,
}]
if !requestAllowed {
// Restore stale remove only when the same bootstrap snapshot reports the dispatcher.
// This keeps the working span and cleanup intent consistent even if live maps change during cleanup.
if req.ScheduleAction != heartbeatpb.ScheduleAction_Remove || !dispatcherReported {
return true
}
} else {
_, ok := manager.GetDispatcherMap().Get(dispatcherID)
// Log error if dispatcher not found and action is not create
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored
if !ok && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
log.Info("include stale remove operator in bootstrap response",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
zap.String("from", req.From.String()),
zap.Uint64("requestMaintainerEpoch", req.MaintainerEpoch),
zap.Uint64("currentMaintainerEpoch", manager.GetMaintainerEpoch()),
zap.String("currentMaintainer", manager.GetMaintainerID().String()))
}
// Log error if dispatcher not found and action is not create.
// It's possible that the dispatcher is not found when the action is create
// because the dispatcher may be created after the operator is stored.
if dispatcherExistsKnown && !dispatcherReported && req.ScheduleAction != heartbeatpb.ScheduleAction_Create {
if common.IsRedoMode(req.Config.Mode) {
log.Error("Redo dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()))
} else {
log.Error("Dispatcher not found, this should not happen",
zap.String("changefeed", changefeedID.String()),
zap.String("dispatcherID", req.Config.DispatcherID.String()),
)
zap.String("dispatcherID", req.Config.DispatcherID.String()))
}
}
response.Operators = append(response.Operators, &heartbeatpb.ScheduleDispatcherRequest{
ChangefeedID: req.ChangefeedID,
Config: req.Config,
ScheduleAction: req.ScheduleAction,
OperatorType: req.OperatorType,
})
response.Operators = append(response.Operators,
proto.Clone(req.ScheduleDispatcherRequest).(*heartbeatpb.ScheduleDispatcherRequest))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 'MaintainerBootstrapResponse|\.Operators\b|ScheduleDispatcherRequest|MaintainerEpoch' maintainer downstreamadapter coordinator

Repository: pingcap/ticdc

Length of output: 50370


Flag/adjust bootstrap replay epoch handling: confirm maintainer-side rewrites cloned MaintainerEpoch

The dispatcher-orchestrator bootstrap response clones ScheduleDispatcherRequest (including req.MaintainerEpoch) when building response.Operators; if the maintainer-side bootstrap restore path replays those requests verbatim, the next schedule hop could fence them out and the “stale remove” cleanup intent would be lost. Inspect the maintainer bootstrap consumer for MaintainerBootstrapResponse / response.Operators and ensure it reconstructs requests with the current maintainer epoch (or overwrites ScheduleDispatcherRequest.MaintainerEpoch) before sending to dispatcher-manager.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go` around
lines 604 - 642, The bootstrap response currently appends a cloned
ScheduleDispatcherRequest (req.ScheduleDispatcherRequest) including the original
req.MaintainerEpoch into response.Operators, which can be replayed by the
maintainer and get fenced out; to fix, ensure the maintainer-side consumer that
processes MaintainerBootstrapResponse (the code that reads response.Operators
and forwards to dispatcher-manager) does not forward the cloned MaintainerEpoch
verbatim but instead reconstructs the request or overwrites
ScheduleDispatcherRequest.MaintainerEpoch with the maintainer's current epoch
(e.g., manager.GetMaintainerEpoch() or equivalent) before calling
dispatcher-manager methods; locate the maintainer bootstrap consumer handling
MaintainerBootstrapResponse and update it to set the current MaintainerEpoch on
each ScheduleDispatcherRequest prior to dispatching.

Comment on lines +21 to +23
func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool {
return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Epoch-0 responses are still accepted after strict epoch is established.

Line 22 currently returns true when reportedEpoch == 0, even if currentEpoch > 0. That weakens stale-message fencing in strict mode and can let legacy/stale responses pass epoch checks after failover.

Proposed fix
 func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool {
-	return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch
+	if currentEpoch == 0 {
+		// Compatibility-mode maintainer accepts both legacy and strict peers.
+		return true
+	}
+	// Strict mode: only exact epoch match is valid.
+	return reportedEpoch == currentEpoch
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool {
return reportedEpoch == 0 || currentEpoch == 0 || reportedEpoch == currentEpoch
}
func MaintainerEpochMatches(reportedEpoch, currentEpoch uint64) bool {
if currentEpoch == 0 {
// Compatibility-mode maintainer accepts both legacy and strict peers.
return true
}
// Strict mode: only exact epoch match is valid.
return reportedEpoch == currentEpoch
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/common/maintainer_epoch.go` around lines 21 - 23, The current
MaintainerEpochMatches allows reportedEpoch==0 unconditionally which accepts
epoch-0 messages after a strict epoch is established; update the function
MaintainerEpochMatches so it only treats epoch-0 as a match when the
currentEpoch is also 0 (i.e., return true if reportedEpoch == currentEpoch OR
both reportedEpoch and currentEpoch are 0), thereby rejecting reportedEpoch==0
when currentEpoch > 0 to restore strict fencing.

Comment thread pkg/pdutil/utils.go
Comment on lines +70 to +79
// AdvanceChangefeedEpoch returns max(candidate, current+1).
func AdvanceChangefeedEpoch(candidate, current uint64) (uint64, error) {
if candidate > current {
return candidate, nil
}
if current == ^uint64(0) {
return 0, cerror.ErrSchedulerRequestFailed.GenWithStackByArgs("changefeed epoch overflow")
}
return current + 1, nil
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Call sites: GenerateChangefeedEpoch (fallback-capable) ==="
rg -n -C2 --type go '\bGenerateChangefeedEpoch\s*\('

echo
echo "=== Call sites: AdvanceChangefeedEpoch (monotonic helper) ==="
rg -n -C2 --type go '\bAdvanceChangefeedEpoch\s*\('

echo
echo "=== Direct PD TS usage (GetTS) in Go files ==="
rg -n -C2 --type go '\.GetTS\s*\('

Repository: pingcap/ticdc

Length of output: 6754


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Show GenerateChangefeedEpoch implementation ==="
sed -n '1,140p' pkg/pdutil/utils.go | nl -ba | sed -n '40,120p'

echo
echo "=== Find bump/fence related epoch paths ==="
rg -n --type go '\bBumpChangefeedEpoch\s*\(' coordinator pkg | head -n 50

echo
echo "=== Search for 'fence' terminology around epochs ==="
rg -n --type go '\bfence(d|r|ing)?\b' coordinator pkg | head -n 80

echo
echo "=== Show call sites context for GenerateChangefeedEpoch ==="
for f in coordinator/controller.go coordinator/operator/operator_controller.go; do
  echo "--- $f : context around GenerateChangefeedEpoch ---"
  rg -n -C3 --type go 'GenerateChangefeedEpoch\s*\(' "$f"
done

echo
echo "=== Show call sites context for AdvanceChangefeedEpoch ==="
for f in coordinator/changefeed/etcd_backend.go coordinator/coordinator_test.go; do
  echo "--- $f : context around AdvanceChangefeedEpoch ---"
  rg -n -C3 --type go 'AdvanceChangefeedEpoch\s*\(' "$f"
done

Repository: pingcap/ticdc

Length of output: 170


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Show GenerateChangefeedEpoch implementation (pkg/pdutil/utils.go) ==="
sed -n '40,120p' pkg/pdutil/utils.go

echo
echo "=== Find bump/fence related epoch paths (BumpChangefeedEpoch) ==="
rg -n --type go '\bBumpChangefeedEpoch\s*\(' coordinator pkg | head -n 80

echo
echo "=== Search for 'fence' terminology around epochs ==="
rg -n --type go '\bfence(d|r|ing)?\b' coordinator pkg | head -n 80 || true

echo
echo "=== Call sites context for GenerateChangefeedEpoch ==="
for f in coordinator/controller.go coordinator/operator/operator_controller.go; do
  echo "--- $f : context around GenerateChangefeedEpoch ---"
  rg -n -C5 --type go 'GenerateChangefeedEpoch\s*\(' "$f" || true
done

echo
echo "=== Call sites context for AdvanceChangefeedEpoch ==="
for f in coordinator/changefeed/etcd_backend.go coordinator/coordinator_test.go pkg/pdutil/utils_test.go; do
  echo "--- $f : context around AdvanceChangefeedEpoch ---"
  rg -n -C5 --type go 'AdvanceChangefeedEpoch\s*\(' "$f" || true
done

Repository: pingcap/ticdc

Length of output: 7703


Update: epoch fencing paths still use the fallback timestamp when PD GetTS fails.
pdutil.GenerateChangefeedEpoch falls back to time.Now().UnixNano() (and logs a warn) on pdClient.GetTS(ctx) error, and the coordinator/operator epoch-bump paths still call GenerateChangefeedEpoch to produce the candidateEpoch passed into BumpChangefeedEpoch (which only uses AdvanceChangefeedEpoch to reconcile with the stored epoch).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/pdutil/utils.go` around lines 70 - 79, GenerateChangefeedEpoch currently
returns a fallback time.Now().UnixNano() and only logs a warning when
pdClient.GetTS(ctx) fails, allowing coordinator/operator code to call
BumpChangefeedEpoch with an unreliable candidateEpoch; change
GenerateChangefeedEpoch to propagate and return the pdClient.GetTS error instead
of returning a fallback value, update its signature and all callers
(coordinator/operator epoch-bump paths that call GenerateChangefeedEpoch) to
handle/abort on that error before calling BumpChangefeedEpoch, and ensure
BumpChangefeedEpoch and AdvanceChangefeedEpoch remain unchanged in logic so only
callers pass a valid candidateEpoch obtained from PD.

Comment on lines +122 to +123
ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Quote $WORK_DIR while preserving the cdc*.log glob.

Line 122 and Line 123 use unquoted $WORK_DIR, which can break command parsing if the path contains spaces and make this test flaky.

Proposed fix
-	ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
-	merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)
+	ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" \"$WORK_DIR\"/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
+	merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" "$WORK_DIR"/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" $WORK_DIR/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" $WORK_DIR/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)
ensure 60 "grep \"merge dispatcher uses pending block event to calculate start ts\" \"$WORK_DIR\"/cdc*.log | grep -Eq \"pendingCommitTs[^0-9]*${syncpoint_ts}\""
merge_line=$(grep "merge dispatcher uses pending block event to calculate start ts" "$WORK_DIR"/cdc*.log | grep -E "pendingCommitTs[^0-9]*${syncpoint_ts}" | tail -n 1 || true)
🧰 Tools
🪛 Shellcheck (0.11.0)

[info] 123-123: Double quote to prevent globbing and word splitting.

(SC2086)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration_tests/in_flight_syncpoint_during_scheduling/run.sh` around
lines 122 - 123, The two shell commands using $WORK_DIR (the ensure/... grep
line and the merge_line=... assignment) should wrap the variable in double
quotes while leaving the cdc*.log glob unquoted so paths with spaces don't
break; update the ensure invocation that greps "$WORK_DIR"/cdc*.log and the
merge_line assignment to reference "$WORK_DIR"/cdc*.log (preserving the grep
patterns and tail logic) so the glob still expands but the base directory is
safely quoted.

Source: Linters/SAST tools

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
coordinator/operator/operator_controller_test.go (2)

91-91: ⚡ Quick win

Rename the newly added tests to camelCase.

These new test functions introduce underscores, which conflicts with the repo's Go naming rule for functions. Please keep the new additions in camelCase even if older tests in this file still use the legacy style.

As per coding guidelines, "Functions should use camelCase naming and do not include underscores (e.g., getPartitionNum, not get_partition_num)."

Also applies to: 111-111, 226-226, 309-309, 367-367, 446-446

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@coordinator/operator/operator_controller_test.go` at line 91, The test
function names use underscores and must be renamed to camelCase; specifically
rename TestController_StopChangefeedWithMaintainerEpoch to
TestControllerStopChangefeedWithMaintainerEpoch and apply the same
transformation to the other new tests mentioned (remove underscores and
PascalCase each word), then update any references in the test file (t.Run names
or direct calls) to match the new identifiers so the tests compile and follow
the repo naming guideline.

Source: Coding guidelines


273-285: ⚡ Quick win

Assert the exact PD-derived candidate epoch here.

This test injects a deterministic PD client, but the mock only checks that candidateEpoch is non-zero. That still misses regressions where AddOperator stops using the PD TSO and falls back to some other non-zero value, which is one of the key contracts this PR is meant to lock down.

Suggested assertion
 			backend.EXPECT().
 				BumpChangefeedEpoch(gomock.Any(), cfID, gomock.Any(), changefeed.EpochBumpOptions{}).
 				DoAndReturn(func(ctx context.Context, id common.ChangeFeedID, candidateEpoch uint64, options changefeed.EpochBumpOptions) (*config.ChangeFeedInfo, error) {
-					require.NotZero(t, candidateEpoch)
+					require.Equal(t, oracle.ComposeTS(100, 1), candidateEpoch)
 					require.Equal(t, cfID, id)
 					require.False(t, options.UpdateStatus)

As per coding guidelines, "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@coordinator/operator/operator_controller_test.go`:
- Line 91: The test function names use underscores and must be renamed to
camelCase; specifically rename TestController_StopChangefeedWithMaintainerEpoch
to TestControllerStopChangefeedWithMaintainerEpoch and apply the same
transformation to the other new tests mentioned (remove underscores and
PascalCase each word), then update any references in the test file (t.Run names
or direct calls) to match the new identifiers so the tests compile and follow
the repo naming guideline.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bc22ecc0-26a9-4730-86ce-6fa8402cb687

📥 Commits

Reviewing files that changed from the base of the PR and between 998c7e0 and f9c6625.

📒 Files selected for processing (3)
  • coordinator/controller_test.go
  • coordinator/operator/operator_controller_test.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • downstreamadapter/dispatchermanager/dispatcher_manager_test.go
  • coordinator/controller_test.go

@hongyunyan

Copy link
Copy Markdown
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot Bot commented Jun 10, 2026

Copy link
Copy Markdown

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 2845011 link true /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@hongyunyan hongyunyan changed the title maintainer,dispatcher: fence stale generation requests maintainer,dispatcher, coordinator: fence stale generation requests Jun 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/needs-triage-completed release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Duplicate dispatcher can be created during maintainer failover before orphan dispatcher drains

1 participant