dispatcher,dispatchermanager: deduplicate pending block statuses #5028
hongyunyan wants to merge 5 commits into pingcap/ticdc:master
Conversation
📝 Walkthrough
Adds a bounded BlockStatusBuffer that deduplicates pending WAITING/DONE statuses and replaces direct channel usage with Offer/Take APIs on SharedInfo/Dispatcher; adds BlockStatusRequestQueue deduplication with in-flight tracking and batching/splitting in the dispatcher manager; adds fanout pass-action quiet-interval throttling in BarrierEvent and related tests. A sketch of the Offer/Take dedup pattern follows the change list below.
Changes
- Block Status Buffer Infrastructure and Dispatcher Refactoring
- Request Queue Deduplication and Dispatcher Manager Integration
- Barrier Event Fanout Pass-Action Throttling and Maintainer Logging
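To make the Offer/Take deduplication concrete, here is a minimal, self-contained Go sketch of a bounded buffer that coalesces identical pending statuses until they are taken. The type and field names (`dedupBuffer`, `statusKey`, its key fields) are illustrative assumptions, not the PR's actual `BlockStatusBuffer` definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// statusKey identifies a pending block status; the real key fields in the PR
// (dispatcher ID, block timestamp, stage) are assumptions here.
type statusKey struct {
	dispatcherID string
	commitTs     uint64
	stage        string // "WAITING" or "DONE"
}

// dedupBuffer is a bounded FIFO that drops an Offer when an identical
// status is already pending, and releases the key again on Take.
type dedupBuffer struct {
	mu      sync.Mutex
	pending map[statusKey]struct{}
	queue   chan statusKey
}

func newDedupBuffer(capacity int) *dedupBuffer {
	return &dedupBuffer{
		pending: make(map[statusKey]struct{}),
		queue:   make(chan statusKey, capacity),
	}
}

// Offer enqueues the status unless an identical one is already pending.
func (b *dedupBuffer) Offer(k statusKey) bool {
	b.mu.Lock()
	if _, dup := b.pending[k]; dup {
		b.mu.Unlock()
		return false
	}
	b.pending[k] = struct{}{}
	b.mu.Unlock()
	b.queue <- k
	return true
}

// Take dequeues the oldest status and clears its pending reservation,
// so a later identical status can be offered again.
func (b *dedupBuffer) Take() statusKey {
	k := <-b.queue
	b.mu.Lock()
	delete(b.pending, k)
	b.mu.Unlock()
	return k
}

func main() {
	buf := newDedupBuffer(16)
	k := statusKey{dispatcherID: "d1", commitTs: 100, stage: "WAITING"}
	fmt.Println(buf.Offer(k)) // true: first copy queued
	fmt.Println(buf.Offer(k)) // false: identical status coalesced
	_ = buf.Take()
	fmt.Println(buf.Offer(k)) // true: key released after Take
}
```

The point of releasing the key only on Take is that ordering is preserved while at most one copy of any identical pending status sits in the queue at a time.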
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request optimizes the handling of dispatcher block statuses and heartbeats by introducing a BlockStatusBuffer that coalesces identical WAITING and DONE statuses to reduce memory amplification. It also implements deduplication for queued and in-flight status requests in the BlockStatusRequestQueue and adds a buffering/merging mechanism in the Maintainer to process incoming status updates more efficiently. Batch sizes for status requests are now capped, and resending of fan-out pass actions is throttled. Review feedback suggests that the Maintainer should buffer status requests even before initialization is finished to avoid unnecessary delays caused by dropping early messages.
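As a rough illustration of the queued-plus-in-flight deduplication described above, the sketch below shows the general shape such a request queue can take. All names here (`requestQueue`, `MarkSent`, the string keys) are placeholders, not the PR's `BlockStatusRequestQueue` API.

```go
package main

import (
	"fmt"
	"sync"
)

// requestQueue drops a request if an identical one is already queued or
// still in flight; MarkSent must be called once the send completes so the
// next identical request can be enqueued again. Names are illustrative.
type requestQueue struct {
	mu       sync.Mutex
	queued   map[string]struct{}
	inFlight map[string]struct{}
	order    []string
}

func newRequestQueue() *requestQueue {
	return &requestQueue{
		queued:   make(map[string]struct{}),
		inFlight: make(map[string]struct{}),
	}
}

// Enqueue adds key unless it is already queued or in flight.
func (q *requestQueue) Enqueue(key string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.queued[key]; ok {
		return false
	}
	if _, ok := q.inFlight[key]; ok {
		return false
	}
	q.queued[key] = struct{}{}
	q.order = append(q.order, key)
	return true
}

// Dequeue pops the oldest key and moves it to the in-flight set.
func (q *requestQueue) Dequeue() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.queued, key)
	q.inFlight[key] = struct{}{}
	return key, true
}

// MarkSent is the explicit send-complete cleanup: it releases the
// in-flight reservation for key.
func (q *requestQueue) MarkSent(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.inFlight, key)
}

func main() {
	q := newRequestQueue()
	fmt.Println(q.Enqueue("span-1/waiting")) // true
	fmt.Println(q.Enqueue("span-1/waiting")) // false: still queued
	key, _ := q.Dequeue()
	fmt.Println(q.Enqueue(key)) // false: in flight, not yet sent
	q.MarkSent(key)
	fmt.Println(q.Enqueue(key)) // true: send completed, can re-enqueue
}
```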
if !m.initialized.Load() {
	m.recordDroppedStatusRequest(event.message.Type, "maintainer not initialized")
	return true, false
}
Dropping heartbeats and block status requests when the maintainer is not yet initialized can lead to unnecessary delays in DDL processing and status tracking. While dispatchers will eventually resend these statuses (typically every 1 second), it would be more efficient to buffer them even before initialization is complete, and then process the accumulated statuses once the maintainer is ready. This avoids waiting for the next resend cycle.
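A minimal sketch of the buffer-then-replay approach suggested here, assuming a heavily simplified maintainer. The types and method names below are hypothetical and are not taken from the PR's maintainer implementation.

```go
package main

import "fmt"

// statusEvent stands in for an incoming heartbeat/block-status message.
type statusEvent struct{ payload string }

// maintainer buffers status events that arrive before initialization and
// replays them once initialization finishes, instead of dropping them and
// waiting for the dispatcher's next resend cycle. Field and method names
// are illustrative only.
type maintainer struct {
	initialized bool
	buffered    []statusEvent
}

func (m *maintainer) handleEvent(ev statusEvent) {
	if !m.initialized {
		// Buffer instead of dropping; a real implementation should bound
		// this buffer and deduplicate identical statuses.
		m.buffered = append(m.buffered, ev)
		return
	}
	m.process(ev)
}

// finishInit marks the maintainer ready and drains everything accumulated
// while it was still initializing.
func (m *maintainer) finishInit() {
	m.initialized = true
	for _, ev := range m.buffered {
		m.process(ev)
	}
	m.buffered = nil
}

func (m *maintainer) process(ev statusEvent) {
	fmt.Println("processed:", ev.payload)
}

func main() {
	m := &maintainer{}
	m.handleEvent(statusEvent{"block status from dispatcher A"}) // buffered
	m.handleEvent(statusEvent{"heartbeat from dispatcher B"})    // buffered
	m.finishInit() // replays both buffered events
}
```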
Actionable comments posted: 3
🧹 Nitpick comments (1)
downstreamadapter/dispatcher/block_status_buffer.go (1)
61-71: ⚡ Quick win: Avoid recomputing WAITING dedup keys during dequeue.
`materialize` currently derives the WAITING key from `entry.status` again. If that protobuf gets mutated after `Offer`, the original pending key may never be removed, causing persistent over-deduplication for that key.
♻️ Suggested localized fix
 type blockStatusQueueEntry struct {
 	status  *heartbeatpb.TableSpanBlockStatus
+	waitingKey *blockStatusKey
 	doneKey *blockStatusKey
 }

 func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
@@
 	if isWaitingBlockStatus(status) {
 		key := newBlockStatusKey(status)
 		if !b.reserveWaiting(key) {
 			return
 		}
-		b.queue <- blockStatusQueueEntry{status: status}
+		b.queue <- blockStatusQueueEntry{status: status, waitingKey: &key}
 		return
 	}
@@
 func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus {
 	if entry.status != nil {
-		if isWaitingBlockStatus(entry.status) {
-			key := newBlockStatusKey(entry.status)
-			b.mu.Lock()
-			delete(b.pendingWaiting, key)
-			b.mu.Unlock()
-		}
+		if entry.waitingKey != nil {
+			b.mu.Lock()
+			delete(b.pendingWaiting, *entry.waitingKey)
+			b.mu.Unlock()
+		}
 		return entry.status
 	}

Also applies to: 145-151
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@downstreamadapter/dispatcher/block_status_buffer.go` around lines 61 - 71, The WAITING dedup key is recomputed from entry.status during dequeue (in materialize), which breaks if the protobuf is mutated after Offer; modify blockStatusQueueEntry to include and carry the precomputed waiting key (created by newBlockStatusKey in Offer when isWaitingBlockStatus is true), push that key into the queue entry in Offer (instead of recomputing later), and update materialize and any dequeue logic (the code referencing entry.status to re-derive the key around reserveWaiting/newBlockStatusKey usage) to use the carried key for removing the pending reservation; ensure functions/structs touched include BlockStatusBuffer.Offer, blockStatusQueueEntry, reserveWaiting, and materialize so the original pending key is reliably removed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go`:
- Around line 612-621: The loop is assigning message.BlockStatuses =
blockStatusMessage[start:end] which retains the large backing array; instead
allocate a new slice and copy the chunk contents before enqueuing to avoid
holding the full backing array in memory. For each iteration, create a new slice
(length = end-start), copy blockStatusMessage[start:end] into it, set
message.BlockStatuses to that new slice, then wrap into
BlockStatusRequestWithTargetID and call e.blockStatusRequestQueue.Enqueue; keep
the other fields (message.ChangefeedID = e.changefeedID.ToPB(), message.Mode =
mode, TargetID via e.GetMaintainerID()) unchanged.
In `@maintainer/status_request_buffer.go`:
- Around line 245-261: The drain logic is dropping buffered heartbeats that only
carry the completion flag; update the condition that decides to emit an event so
that a HeartBeatRequest with entry.completeStatus set is also forwarded. Locate
the loop over b.heartbeats in drain that constructs heartbeatpb.HeartBeatRequest
(fields: ChangefeedID, Statuses, Watermark, RedoWatermark, Err, CompeleteStatus)
and modify the final if-check (currently testing len(req.Statuses),
req.Watermark, req.RedoWatermark, req.Err) to also include req.CompeleteStatus
(or entry.completeStatus) so that mergeHeartbeat-preserved completion signals
are not dropped. Ensure newBufferedStatusEvent still receives the request
unchanged.
- Around line 118-121: Buffered status events are being replayed even after
shutdown begins; update handleBufferedStatusRequests to drop any buffered events
once m.removing or m.removed is set instead of calling HandleEvent on them.
Concretely, after calling takeBufferedStatusRequestEvents() (or before
processing each event) check the maintainer flags m.removing || m.removed and if
true discard the events (return/skip loop) so no stale heartbeat/block-status
events call HandleEvent; you can also filter the slice to keep events only while
removal is false, but ensure no calls to HandleEvent occur for dropped events.
---
Nitpick comments:
In `@downstreamadapter/dispatcher/block_status_buffer.go`:
- Around line 61-71: The WAITING dedup key is recomputed from entry.status
during dequeue (in materialize), which breaks if the protobuf is mutated after
Offer; modify blockStatusQueueEntry to include and carry the precomputed waiting
key (created by newBlockStatusKey in Offer when isWaitingBlockStatus is true),
push that key into the queue entry in Offer (instead of recomputing later), and
update materialize and any dequeue logic (the code referencing entry.status to
re-derive the key around reserveWaiting/newBlockStatusKey usage) to use the
carried key for removing the pending reservation; ensure functions/structs
touched include BlockStatusBuffer.Offer, blockStatusQueueEntry, reserveWaiting,
and materialize so the original pending key is reliably removed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c475db00-27be-4f38-bd91-2e5a3d3e7fee
📒 Files selected for processing (17)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/dispatcher/basic_dispatcher_info.go
- downstreamadapter/dispatcher/block_status_buffer.go
- downstreamadapter/dispatcher/block_status_buffer_test.go
- downstreamadapter/dispatcher/event_dispatcher.go
- downstreamadapter/dispatcher/event_dispatcher_test.go
- downstreamadapter/dispatcher/helper.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatchermanager/heartbeat_collector.go
- downstreamadapter/dispatchermanager/heartbeat_queue.go
- downstreamadapter/dispatchermanager/heartbeat_queue_test.go
- maintainer/barrier_event.go
- maintainer/barrier_event_test.go
- maintainer/maintainer.go
- maintainer/maintainer_test.go
- maintainer/status_request_buffer.go
💤 Files with no reviewable changes (1)
- downstreamadapter/dispatcher/event_dispatcher.go
for start := 0; start < len(blockStatusMessage); start += maxBlockStatusesPerRequest {
	end := start + maxBlockStatusesPerRequest
	if end > len(blockStatusMessage) {
		end = len(blockStatusMessage)
	}
	var message heartbeatpb.BlockStatusRequest
	message.ChangefeedID = e.changefeedID.ToPB()
	message.BlockStatuses = blockStatusMessage[start:end]
	message.Mode = mode
	e.blockStatusRequestQueue.Enqueue(&BlockStatusRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
Split chunks should not share one large backing slice.
At Line 619, message.BlockStatuses = blockStatusMessage[start:end] keeps references to the full batch backing array. Under queueing pressure, this can retain much more memory than intended even after splitting.
🛠️ Suggested fix
for start := 0; start < len(blockStatusMessage); start += maxBlockStatusesPerRequest {
end := start + maxBlockStatusesPerRequest
if end > len(blockStatusMessage) {
end = len(blockStatusMessage)
}
+ chunk := make([]*heartbeatpb.TableSpanBlockStatus, end-start)
+ copy(chunk, blockStatusMessage[start:end])
var message heartbeatpb.BlockStatusRequest
message.ChangefeedID = e.changefeedID.ToPB()
- message.BlockStatuses = blockStatusMessage[start:end]
+ message.BlockStatuses = chunk
message.Mode = mode
e.blockStatusRequestQueue.Enqueue(&BlockStatusRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go` around lines 612 -
621, The loop is assigning message.BlockStatuses = blockStatusMessage[start:end]
which retains the large backing array; instead allocate a new slice and copy the
chunk contents before enqueuing to avoid holding the full backing array in
memory. For each iteration, create a new slice (length = end-start), copy
blockStatusMessage[start:end] into it, set message.BlockStatuses to that new
slice, then wrap into BlockStatusRequestWithTargetID and call
e.blockStatusRequestQueue.Enqueue; keep the other fields (message.ChangefeedID =
e.changefeedID.ToPB(), message.Mode = mode, TargetID via e.GetMaintainerID())
unchanged.
for from, entry := range b.heartbeats {
	req := &heartbeatpb.HeartBeatRequest{
		ChangefeedID:    entry.changefeedID,
		Statuses:        make([]*heartbeatpb.TableSpanStatus, 0, len(entry.order)),
		Watermark:       entry.watermark,
		RedoWatermark:   entry.redoWatermark,
		Err:             entry.err,
		CompeleteStatus: entry.completeStatus,
	}
	for _, key := range entry.order {
		if status, ok := entry.statuses[key]; ok {
			req.Statuses = append(req.Statuses, status)
		}
	}
	if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil {
		events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeHeartBeatRequest, req))
	}
Don't discard heartbeat batches whose only signal is CompeleteStatus.
mergeHeartbeat preserves entry.completeStatus, but drain only emits a heartbeat event when statuses, watermarks, or Err exist. A buffered heartbeat with just CompeleteStatus=true is dropped here, so that flag never reaches normal handling.
Suggested fix
- if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil {
+ if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil || req.CompeleteStatus {
events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeHeartBeatRequest, req))
	}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
for from, entry := range b.heartbeats {
	req := &heartbeatpb.HeartBeatRequest{
		ChangefeedID:    entry.changefeedID,
		Statuses:        make([]*heartbeatpb.TableSpanStatus, 0, len(entry.order)),
		Watermark:       entry.watermark,
		RedoWatermark:   entry.redoWatermark,
		Err:             entry.err,
		CompeleteStatus: entry.completeStatus,
	}
	for _, key := range entry.order {
		if status, ok := entry.statuses[key]; ok {
			req.Statuses = append(req.Statuses, status)
		}
	}
	if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil || req.CompeleteStatus {
		events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeHeartBeatRequest, req))
	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@maintainer/status_request_buffer.go` around lines 245 - 261, The drain logic
is dropping buffered heartbeats that only carry the completion flag; update the
condition that decides to emit an event so that a HeartBeatRequest with
entry.completeStatus set is also forwarded. Locate the loop over b.heartbeats in
drain that constructs heartbeatpb.HeartBeatRequest (fields: ChangefeedID,
Statuses, Watermark, RedoWatermark, Err, CompeleteStatus) and modify the final
if-check (currently testing len(req.Statuses), req.Watermark, req.RedoWatermark,
req.Err) to also include req.CompeleteStatus (or entry.completeStatus) so that
mergeHeartbeat-preserved completion signals are not dropped. Ensure
newBufferedStatusEvent still receives the request unchanged.
What problem does this PR solve?
Issue Number: ref #0
This PR recreates the useful part of #4814 on top of the latest pingcap/ticdc:master, while excluding the effects of the following commits from that PR branch:
- 632330358254d147d548d568711c62a9ee509e4d
- 6cc8b8280cc7da0996c6c2433987fe6b73bf0d32
Background:
What is changed and how it works?
Summary:
- Add a bounded BlockStatusBuffer between dispatchers and the dispatcher manager to keep ordering while coalescing identical pending WAITING and DONE statuses.
- Deduplicate queued and in-flight requests in BlockStatusRequestQueue, with explicit send-complete cleanup.
- Buffer and merge incoming status updates in the maintainer before onHeartbeatRequest, including strict same-sequence checkpoint handling and maxLastSyncedTs preservation.
Check List
Tests
Unit test
- make fmt
- git diff --check upstream/master...HEAD
- go test -count=1 ./maintainer
- go test -count=1 ./downstreamadapter/dispatchermanager
- go test -count=1 ./downstreamadapter/dispatcher -run 'TestBlockStatusBuffer|TestIgnoredBlockStatus|TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain|TestHandleDispatcherStatus|TestDealWithBlockEvent'
Note:
`go test -count=1 --tags=intest ./downstreamadapter/dispatcher -run '^TestRedoBatchDMLEventsPartialFlush$'` fails in this local environment on both this branch and latest upstream/master, because /tmp/tidb is owned by root:root with mode 700, causing `stat /tmp/tidb/tmp_ddl-4000: permission denied` during TiDB DDL initialization.
Will it cause performance regression or break compatibility?
No protocol or persistent format change is introduced. The change coalesces duplicate pending local statuses and bounds block status request size. It is intended to reduce memory pressure and local queue amplification.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
Bug Fixes
Improvements
Tests