fix(*): merge operator inconsistent after maintainer move #3769
wlwilliamx wants to merge 17 commits into pingcap:master
Conversation
…e, split after maintainer restart
…st-due-to-maintainer-move-operator-lost
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of the affected files. The full list of commands accepted by this bot can be found in the bot's documentation.
Summary of Changes (Gemini Code Assist)

Hello @wlwilliamx, I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request significantly enhances the robustness and consistency of dispatcher operations, particularly in distributed environments where maintainer nodes might restart or fail over. By introducing explicit tracking of different operator types and implementing comprehensive recovery logic for in-flight operations during bootstrap, the system can now maintain a more accurate state and seamlessly resume complex tasks like merging dispatchers, even after disruptions. This change addresses potential inconsistencies that could arise from untracked operations, leading to a more reliable and fault-tolerant system.
…r-inconsistent-after-maintainer-move
/test pull-cdc-mysql-integration-heavy

/test pull-cdc-mysql-integration-light
Code Review
This pull request introduces a mechanism to track and restore in-flight dispatcher operations (add, remove, move, split, merge) during maintainer failover and bootstrap. Key changes include adding currentOperatorMap, redoCurrentOperatorMap, and mergeOperatorMap to DispatcherManager to store ongoing operations, and updating the protobuf definitions to include OperatorType and lists of in-flight operators in the MaintainerBootstrapResponse. The HeartBeatCollector now tracks merge operators, and the SchedulerDispatcherRequestHandler prevents concurrent operations on the same span by checking these new operator maps. During bootstrap, the maintainer now restores these in-flight operators. Review comments highlight that the OperatorType should be correctly propagated and not hardcoded, especially for move and split operations, and suggest simplifying the concurrent operator check logic by potentially unifying the currentOperatorMap and redoCurrentOperatorMap.
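The duplicate-operator check described above can be sketched in isolation. The following is a minimal, self-contained Go sketch, not the PR's actual code: `dispatcherManager`, `tryTrack`, and the `OperatorType` constants here are illustrative stand-ins for the real `DispatcherManager` maps and the `heartbeatpb` enum.

```go
package main

import (
	"fmt"
	"sync"
)

// OperatorType mirrors the protobuf enum values mentioned above (assumed names).
type OperatorType int

const (
	OpAdd OperatorType = iota
	OpRemove
	OpMove
	OpSplit
	OpMerge
)

// dispatcherManager is a minimal stand-in for DispatcherManager, keeping only
// the two operator-tracking maps this PR introduces.
type dispatcherManager struct {
	currentOperatorMap     sync.Map // operatorKey -> OperatorType
	redoCurrentOperatorMap sync.Map
}

// tryTrack registers an operator unless one is already in flight for the same
// key in either map, mirroring the duplicate check in
// SchedulerDispatcherRequestHandler.
func (m *dispatcherManager) tryTrack(key string, typ OperatorType, redo bool) bool {
	target := &m.currentOperatorMap
	if redo {
		target = &m.redoCurrentOperatorMap
	}
	if _, exists := m.currentOperatorMap.Load(key); exists {
		return false // a normal-mode operator already owns this span
	}
	if _, exists := m.redoCurrentOperatorMap.Load(key); exists {
		return false // a redo-mode operator already owns this span
	}
	target.Store(key, typ)
	return true
}

func main() {
	m := &dispatcherManager{}
	fmt.Println(m.tryTrack("span-1", OpMove, false)) // first operator wins
	fmt.Println(m.tryTrack("span-1", OpSplit, true)) // span busy, request skipped
}
```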
```go
case heartbeatpb.ScheduleAction_Create:
	switch req.OperatorType {
	case heartbeatpb.OperatorType_O_Add, heartbeatpb.OperatorType_O_Move, heartbeatpb.OperatorType_O_Split:
		op := operator.NewAddDispatcherOperator(spanController, replicaSet, node, heartbeatpb.OperatorType_O_Add)
```
When restoring an add operator, the original operator type from the request (req.OperatorType) should be preserved. Hardcoding OperatorType_O_Add here will cause move and split operators to be incorrectly restored as simple add operators, breaking the operator restoration logic.
Suggested change:

```diff
-op := operator.NewAddDispatcherOperator(spanController, replicaSet, node, heartbeatpb.OperatorType_O_Add)
+op := operator.NewAddDispatcherOperator(spanController, replicaSet, node, req.OperatorType)
```
maintainer/operator/operator_move.go (outdated)
```go
		return m.replicaSet.NewAddDispatcherMessage(m.dest, heartbeatpb.OperatorType_O_Add)
	case moveStateRemoveOrigin, moveStateAbortRemoveOrigin:
		return m.replicaSet.NewRemoveDispatcherMessage(m.origin, heartbeatpb.OperatorType_O_Remove)
```
The add and remove parts of a move operation should both be typed as O_Move. Using O_Add and O_Remove is incorrect and will break operator restoration logic on maintainer failover, as the new maintainer will not recognize these as parts of a single move operation.
Suggested change:

```go
		return m.replicaSet.NewAddDispatcherMessage(m.dest, heartbeatpb.OperatorType_O_Move)
	case moveStateRemoveOrigin, moveStateAbortRemoveOrigin:
		return m.replicaSet.NewRemoveDispatcherMessage(m.origin, heartbeatpb.OperatorType_O_Move)
```
```go
_, exists := dispatcherManager.currentOperatorMap.Load(operatorKey)
if exists {
	log.Warn("operator key exists, skip this request",
		zap.String("changefeedID", req.ChangefeedID.String()),
		zap.String("dispatcherID", common.NewDispatcherIDFromPB(req.Config.DispatcherID).String()),
		zap.String("operatorKey", operatorKey),
		zap.Any("operator", req),
	)
	continue
}
_, redoExists := dispatcherManager.redoCurrentOperatorMap.Load(operatorKey)
if redoExists {
	log.Warn("redo operator key exists, skip this request",
		zap.String("changefeedID", req.ChangefeedID.String()),
		zap.String("dispatcherID", common.NewDispatcherIDFromPB(req.Config.DispatcherID).String()),
		zap.String("operatorKey", operatorKey),
		zap.Any("operator", req),
	)
	continue
}
```
The logic to prevent concurrent operators on the same span or dispatcher is split between checking currentOperatorMap and redoCurrentOperatorMap. This could be simplified by using a single map for both, with a composite key or value to distinguish between redo and normal modes. This would reduce code duplication and make the logic easier to follow.
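One way to realize the unification suggested above is a single map whose value records the scheduling mode. This is a hedged sketch under assumed names (`unifiedOperatorMap` and `mode` are hypothetical, not from the PR). As a side benefit, `sync.Map.LoadOrStore` makes the check-then-store atomic, which the two separate Load-then-Store checks are not.

```go
package main

import (
	"fmt"
	"sync"
)

// mode distinguishes the normal and redo scheduling paths (illustrative names).
type mode int

const (
	modeNormal mode = iota
	modeRedo
)

// unifiedOperatorMap folds currentOperatorMap and redoCurrentOperatorMap into
// one map whose value records the mode, so a single lookup answers both checks.
type unifiedOperatorMap struct {
	m sync.Map // operatorKey -> mode
}

// tryStore atomically claims the key; it fails if any operator (either mode)
// already owns it.
func (u *unifiedOperatorMap) tryStore(key string, md mode) bool {
	_, loaded := u.m.LoadOrStore(key, md)
	return !loaded
}

// modeOf reports which mode owns the key, if any.
func (u *unifiedOperatorMap) modeOf(key string) (mode, bool) {
	v, ok := u.m.Load(key)
	if !ok {
		return 0, false
	}
	return v.(mode), true
}

func main() {
	var u unifiedOperatorMap
	fmt.Println(u.tryStore("span-1", modeRedo))   // first claim succeeds
	fmt.Println(u.tryStore("span-1", modeNormal)) // same span, any mode: rejected
	md, _ := u.modeOf("span-1")
	fmt.Println(md == modeRedo)
}
```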
/test pull-cdc-mysql-integration-heavy

/test pull-cdc-mysql-integration-light
@wlwilliamx: The following test failed.
…r-inconsistent-after-maintainer-move

Conflicts:
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
- downstreamadapter/dispatchermanager/helper.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- heartbeatpb/heartbeat.pb.go
- heartbeatpb/heartbeat.proto
- maintainer/maintainer_controller.go
- maintainer/maintainer_controller_bootstrap.go
- maintainer/maintainer_controller_helper.go
- maintainer/maintainer_manager_test.go
- maintainer/maintainer_test.go
- maintainer/operator/operator_add.go
- maintainer/operator/operator_move.go
- maintainer/operator/operator_remove.go
- maintainer/replica/replication_span.go
- maintainer/replica/replication_span_test.go
📝 Walkthrough

This PR introduces in-flight merge operator tracking to enable bootstrap recovery after maintainer failover. It adds a mergeOperatorMap to DispatcherManager, new public methods for tracking and querying merge operations, integration into heartbeat collection and bootstrap responses, and bootstrap restoration logic to recreate merge operations from persisted state.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Helper as Merge Helper
    participant DM as DispatcherManager
    participant HC as HeartBeatCollector
    participant DO as DispatcherOrchestrator
    participant Maintainer as Maintainer Controller
    Helper->>DM: TrackMergeOperator(request)
    activate DM
    DM->>DM: Store request in mergeOperatorMap
    deactivate DM
    HC->>DM: GetMergeOperators()
    activate DM
    DM-->>HC: Return tracked requests
    deactivate DM
    DO->>DM: GetMergeOperators()
    activate DM
    DM-->>DO: Return tracked requests
    deactivate DM
    DO->>Maintainer: BootstrapResponse with merge_operators
    activate Maintainer
    Maintainer->>Maintainer: Restore merge operators from state
    Maintainer->>Maintainer: Create SpanReplications
    Maintainer->>Maintainer: Invoke AddRestoredMergeOperator
    Maintainer->>Maintainer: Start restored merge operation
    deactivate Maintainer
```

```mermaid
sequenceDiagram
    participant Task as Merge Task
    participant DM as DispatcherManager
    participant Cleanup as Cleanup Handler
    Task->>Task: doMerge() completes
    activate Task
    Task->>DM: RemoveMergeOperator(mergedDispatcherID)
    activate DM
    DM->>DM: Remove from mergeOperatorMap
    deactivate DM
    Task->>Cleanup: Mark merge complete
    deactivate Task
    Task->>Task: abortMerge() executes
    activate Task
    Task->>DM: RemoveMergeOperator(mergedDispatcherID)
    activate DM
    DM->>DM: Remove from mergeOperatorMap
    deactivate DM
    Task->>Cleanup: Log abort event
    deactivate Task
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed, ❌ 2 failed (2 warnings)
[FORMAT CHECKER NOTIFICATION] 📖 For more info, check the "Contribute Code" section in the development guide.
Actionable comments posted: 3
🧹 Nitpick comments (2)

downstreamadapter/dispatchermanager/heartbeat_collector.go (1)

294-296: Duplicate tracking; consider consolidating. This `TrackMergeOperator` call is redundant with the one in `MergeDispatcherRequestHandler.Handle` (helper.go, line 788). Both track the same request. While idempotent and harmless, consolidating to a single call would reduce confusion. Consider removing this early tracking and relying solely on the handler, or vice versa.

downstreamadapter/dispatchermanager/helper.go (1)

788-800: Redundant `TrackMergeOperator` call. `TrackMergeOperator` is already invoked in `RecvMessages` (heartbeat_collector.go, line 295) before pushing to the dynamic stream. Calling it again here is idempotent but unnecessary. Consider removing one of the calls to avoid confusion; keeping it in the handler (here) is preferable since it is closer to the actual merge logic and ensures tracking even if the request arrives through a different path. The nil-check and `MaybeCleanupMergeOperator` call for failed merges is correct and ensures no stale entries remain in `mergeOperatorMap`.
ℹ️ Review info

⚙️ Run configuration
- Configuration used: Organization UI
- Review profile: CHILL
- Plan: Pro
- Run ID: 0a0daa28-8fc8-4a2c-b916-05a14a5723d0

⛔ Files ignored due to path filters (1)
- heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go

📒 Files selected for processing (10)
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_merge.go
- downstreamadapter/dispatchermanager/heartbeat_collector.go
- downstreamadapter/dispatchermanager/helper.go
- downstreamadapter/dispatchermanager/task.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- heartbeatpb/heartbeat.proto
- maintainer/maintainer_controller_bootstrap.go
- maintainer/operator/operator_controller.go
- maintainer/operator/operator_merge.go
```go
func (e *DispatcherManager) TrackMergeOperator(req *heartbeatpb.MergeDispatcherRequest) {
	if req == nil || req.MergedDispatcherID == nil {
		return
	}
	mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID)
	e.mergeOperatorMap.Store(mergedID.String(), cloneMergeDispatcherRequest(req))
```
Reject zero-valued merged dispatcher IDs before touching mergeOperatorMap.
A non-nil protobuf ID can still decode to DispatcherID{}. Right now those requests all collapse to the same key, and the bootstrap side later treats that zero ID as a real merged dispatcher, so one malformed request can poison recovery state.
🔒 Suggested guard

```diff
 func (e *DispatcherManager) TrackMergeOperator(req *heartbeatpb.MergeDispatcherRequest) {
 	if req == nil || req.MergedDispatcherID == nil {
 		return
 	}
 	mergedID := common.NewDispatcherIDFromPB(req.MergedDispatcherID)
+	if mergedID.IsZero() {
+		log.Warn("merge operator has invalid merged dispatcher ID",
+			zap.Stringer("changefeedID", e.changefeedID))
+		return
+	}
 	e.mergeOperatorMap.Store(mergedID.String(), cloneMergeDispatcherRequest(req))
 }
```

Apply the same mergedID.IsZero() guard in MaybeCleanupMergeOperator.
Also applies to: 38-43
```go
// Restore merge operators after task state is rebuilt from bootstrap spans/operators.
// Merge restoration needs the per-dispatcher task map from buildTaskInfo, but must run
// before we discard any leftover working tasks as dropped-table artifacts.
if err := c.restoreCurrentMergeOperators(allNodesResp, buildTableSplitMap(tables)); err != nil {
```
Pass a mode-specific split map into merge restoration.
Line 136 always calls restoreCurrentMergeOperators with the normal tables view, but the restore path rebuilds both default and redo merges via mergeReq.Mode. If redoStartTs and startTs diverge, redo-only tables get recreated with the wrong splitEnabled flag after failover.
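A minimal illustration of the mode-specific selection this comment asks for. The names here (`splitMapForMode`, `opMode`) and the table IDs are hypothetical; the real fix would branch on `mergeReq.Mode` and pass the map built from the matching tables view (`buildTableSplitMap(tables)` vs. a redo-mode equivalent).

```go
package main

import "fmt"

// opMode stands in for the request's Mode field (illustrative).
type opMode int

const (
	modeDefault opMode = iota
	modeRedo
)

// splitMapForMode picks the split map matching the merge request's mode
// instead of always using the normal tables view.
func splitMapForMode(md opMode, normal, redo map[int64]bool) map[int64]bool {
	if md == modeRedo {
		return redo
	}
	return normal
}

func main() {
	normal := map[int64]bool{100: true}
	redo := map[int64]bool{100: false} // redo-only view may disagree after failover
	fmt.Println(splitMapForMode(modeDefault, normal, redo)[100])
	fmt.Println(splitMapForMode(modeRedo, normal, redo)[100])
}
```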
```go
spanInfo := spanInfoByID[dispatcherID]
if spanInfo == nil {
	sourceComplete = false
	break
}
splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[spanInfo.Span.TableID])
replicaSet = c.createSpanReplication(spanInfo, nodeID, splitEnabled)
spanController.AddReplicatingSpan(replicaSet)
```
Guard nil Span before dereferencing bootstrap entries.
indexBootstrapSpans only filters nil IDs, not nil Spans. The new recovery path then reads spanInfo.Span.TableID / mergedSpanInfo.Span.TableID in several places, so one malformed bootstrap snapshot can panic the maintainer during bootstrap.
🛡️ Suggested guard

```diff
 replicaSet := spanController.GetTaskByID(dispatcherID)
 if replicaSet == nil {
 	spanInfo := spanInfoByID[dispatcherID]
-	if spanInfo == nil {
+	if spanInfo == nil || spanInfo.Span == nil {
 		sourceComplete = false
 		break
 	}
 	splitEnabled := spanController.ShouldEnableSplit(tableSplitMap[spanInfo.Span.TableID])
 	replicaSet = c.createSpanReplication(spanInfo, nodeID, splitEnabled)
@@
 	mergedSpanInfo := spanInfoByID[mergedDispatcherID]
+	if mergedSpanInfo != nil && mergedSpanInfo.Span == nil {
+		log.Warn("merge operator missing merged span, skip restoring it",
+			zap.String("nodeID", nodeID.String()),
+			zap.String("changefeed", resp.ChangefeedID.String()),
+			zap.String("dispatcher", mergedDispatcherID.String()))
+		continue
+	}
```

Also applies to: 972-974, 1001-1004, 1025-1027
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
None
Do you need to update user documentation, design documentation or monitoring documentation?
None
Release note
Summary by CodeRabbit