Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,9 @@ func (e *DispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatus boo
}

if needCompleteStatus {
if dispatcherItem.GetTryRemoving() {
return
}
Comment on lines +746 to +748

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check is useful to avoid sending heartbeats for dispatchers that are being removed. The same logic should also be applied to the dispatcherMap loop further down in this function (around line 773) for consistency and to prevent meaningless heartbeats from regular event dispatchers.

if watermark != nil {
eventServiceDispatcherHeartbeat.Append(event.NewDispatcherProgress(id, watermark.CheckpointTs))
} else {
Expand Down