From 0596fe9830a0fc78c7d13ad912ed147e2b5d86f1 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 4 Mar 2026 10:56:33 +0800 Subject: [PATCH 1/4] for test Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/dispatcher_stat.go | 12 +++++- pkg/eventservice/dispatcher_stat_test.go | 5 +++ pkg/eventservice/event_broker.go | 47 +++++++++++++++++++++ pkg/eventservice/event_broker_test.go | 54 ++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 1 deletion(-) diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index a99767a704..63d389f8c6 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -205,7 +205,17 @@ func (a *dispatcherStat) setHandshaked() { } func (a *dispatcherStat) updateSentResolvedTs(resolvedTs uint64) { - a.sentResolvedTs.Store(resolvedTs) + // Keep sentResolvedTs monotonic when resolvedTs can be emitted by multiple goroutines. + for { + old := a.sentResolvedTs.Load() + if resolvedTs < old { + resolvedTs = old + break + } + if old == resolvedTs || a.sentResolvedTs.CompareAndSwap(old, resolvedTs) { + break + } + } a.lastSentResolvedTsTime.Store(time.Now()) a.updateScanRange(resolvedTs, 0) } diff --git a/pkg/eventservice/dispatcher_stat_test.go b/pkg/eventservice/dispatcher_stat_test.go index 0be560ab78..1b06e0b36e 100644 --- a/pkg/eventservice/dispatcher_stat_test.go +++ b/pkg/eventservice/dispatcher_stat_test.go @@ -107,6 +107,11 @@ func TestDispatcherStatGetDataRange(t *testing.T) { r, ok = stat.getDataRange() require.False(t, ok) + // case 3.1: sent resolved ts should not go backward. + stat.updateSentResolvedTs(150) + require.Equal(t, uint64(200), stat.sentResolvedTs.Load()) + require.Equal(t, uint64(200), stat.lastScannedCommitTs.Load()) + // case 4: get range after resolved ts update again stat.onResolvedTs(300) r, ok = stat.getDataRange() diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index a1439f2204..3c13a343db 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -45,6 +45,10 @@ const ( defaultMaxBatchSize = 128 defaultFlushResolvedTsInterval = 25 * time.Millisecond + // defaultBootstrapResolvedTsInterval controls the synthetic resolvedTs tick + // before upstream resolvedTs catches up. + defaultBootstrapResolvedTsInterval = time.Second + bootstrapResolvedTsStep = time.Second defaultReportDispatcherStatToStoreInterval = time.Second * 10 @@ -173,6 +177,10 @@ func newEventBroker( return c.logUninitializedDispatchers(ctx) }) + g.Go(func() error { + return c.tickBootstrapResolvedTs(ctx) + }) + g.Go(func() error { return c.reportDispatcherStatToStore(ctx, defaultReportDispatcherStatToStoreInterval) }) @@ -402,6 +410,41 @@ func (c *eventBroker) logUninitializedDispatchers(ctx context.Context) error { } } +func (c *eventBroker) tickBootstrapResolvedTs(ctx context.Context) error { + ticker := time.NewTicker(defaultBootstrapResolvedTsInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-ticker.C: + c.dispatchers.Range(func(_, value interface{}) bool { + dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() + c.sendBootstrapResolvedTsIfNeeded(dispatcher) + return true + }) + } + } +} + +func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat) { + if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.epoch == 0 { + return + } + + sentResolvedTs := dispatcher.sentResolvedTs.Load() + receivedResolvedTs := dispatcher.receivedResolvedTs.Load() + // Stop synthetic advancement once upstream resolvedTs has caught up. + if dispatcher.hasReceivedFirstResolvedTs.Load() && receivedResolvedTs >= sentResolvedTs { + return + } + + c.sendHandshakeIfNeed(dispatcher) + nextResolvedTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(sentResolvedTs).Add(bootstrapResolvedTsStep)) + c.sendResolvedTs(dispatcher, nextResolvedTs) +} + // getScanTaskDataRange determines the valid data range for scanning a given task. // It checks various conditions (dispatcher status, DDL state, max commit ts of dml event) // to decide whether scanning is needed and returns the appropriate time range. @@ -1043,6 +1086,10 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { } c.dispatchers.Store(id, dispatcherPtr) c.metricsCollector.metricDispatcherCount.Inc() + if dispatcher.epoch > 0 { + c.sendHandshakeIfNeed(dispatcher) + c.sendResolvedTs(dispatcher, dispatcher.startTs) + } log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), zap.Stringer("changefeedID", changefeedID), diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 0c5281ef24..ac9ed9ae91 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -349,6 +349,60 @@ func TestCURDDispatcher(t *testing.T) { require.False(t, ok, "changefeedStatus should be removed after the last dispatcher is removed") } +func TestAddDispatcherSendsStartTsResolvedAfterRegister(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + // Close the broker so we can assert the raw message channel ordering. + broker.close() + + dispInfo := newMockDispatcherInfoForTest(t) + dispInfo.epoch = 1 + dispInfo.startTs = 100 + + require.NoError(t, broker.addDispatcher(dispInfo)) + + disp := broker.getDispatcher(dispInfo.GetID()).Load() + require.NotNil(t, disp) + require.True(t, disp.isHandshaked()) + + handshakeEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeHandshakeEvent, handshakeEvent.msgType) + + resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType) + require.Equal(t, dispInfo.startTs, resolvedEvent.resolvedTsEvent.ResolvedTs) + require.Equal(t, uint64(1), resolvedEvent.resolvedTsEvent.Seq) +} + +func TestSendBootstrapResolvedTsIfNeeded(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + // Close the broker so we can assert the raw message channel ordering. + broker.close() + + dispInfo := newMockDispatcherInfoForTest(t) + dispInfo.epoch = 1 + dispInfo.startTs = 100 + changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID(), dispInfo.GetSyncPointInterval()) + disp := newDispatcherStat(dispInfo, 1, 1, nil, changefeedStatus) + + // Upstream hasn't caught up yet, broker should synthesize handshake + resolvedTs. + broker.sendBootstrapResolvedTsIfNeeded(disp) + handshakeEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeHandshakeEvent, handshakeEvent.msgType) + resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType) + require.Equal(t, oracle.GoTimeToTS(oracle.GetTimeFromTS(dispInfo.startTs).Add(time.Second)), resolvedEvent.resolvedTsEvent.ResolvedTs) + + // Upstream catches up, synthetic advancement should stop. + disp.hasReceivedFirstResolvedTs.Store(true) + disp.receivedResolvedTs.Store(disp.sentResolvedTs.Load()) + broker.sendBootstrapResolvedTsIfNeeded(disp) + select { + case <-broker.messageCh[disp.messageWorkerIndex]: + require.Fail(t, "unexpected synthetic resolvedTs after upstream catch up") + default: + } +} + func TestResetDispatcher(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close() From 0b46ad8d88d03e5e36521b04aa08859cfb857f23 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 4 Mar 2026 14:41:00 +0800 Subject: [PATCH 2/4] for test 2 Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/event_broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 3c13a343db..0d61a6221e 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -429,7 +429,7 @@ func (c *eventBroker) tickBootstrapResolvedTs(ctx context.Context) error { } func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat) { - if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.epoch == 0 { + if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.epoch > 0 { return } From f2cb0b23b6f29ad6b3c7d151f6672ef0f2d5b809 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 4 Mar 2026 15:38:35 +0800 Subject: [PATCH 3/4] for test 3 Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/dispatcher_stat.go | 6 ++++- pkg/eventservice/event_broker.go | 33 +++++++++++++++++++++++---- pkg/eventservice/event_broker_test.go | 11 +++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 63d389f8c6..e0d4d2f93d 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -205,6 +205,11 @@ func (a *dispatcherStat) setHandshaked() { } func (a *dispatcherStat) updateSentResolvedTs(resolvedTs uint64) { + a.updateSentResolvedTsOnly(resolvedTs) + a.updateScanRange(resolvedTs, 0) +} + +func (a *dispatcherStat) updateSentResolvedTsOnly(resolvedTs uint64) { // Keep sentResolvedTs monotonic when resolvedTs can be emitted by multiple goroutines. for { old := a.sentResolvedTs.Load() @@ -217,7 +222,6 @@ func (a *dispatcherStat) updateSentResolvedTs(resolvedTs uint64) { } } a.lastSentResolvedTsTime.Store(time.Now()) - a.updateScanRange(resolvedTs, 0) } func (a *dispatcherStat) updateScanRange(txnCommitTs, txnStartTs uint64) { diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 0d61a6221e..4437cba752 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -305,6 +305,25 @@ func (c *eventBroker) sendResolvedTs(d *dispatcherStat, watermark uint64) { updateMetricEventServiceSendResolvedTsCount(d.info.GetMode()) } +func (c *eventBroker) sendBootstrapResolvedTs(d *dispatcherStat, watermark uint64) bool { + remoteID := node.ID(d.info.GetServerID()) + c.emitSyncPointEventIfNeeded(watermark, d, remoteID) + re := event.NewResolvedEvent(watermark, d.id, d.epoch) + re.Seq = d.seq.Load() + resolvedEvent := newWrapResolvedEvent(remoteID, re) + ch := c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) + select { + case ch <- resolvedEvent: + d.updateSentResolvedTs(watermark) + updateMetricEventServiceSendResolvedTsCount(d.info.GetMode()) + return true + default: + // Avoid blocking broker tick path. The next ticker round will retry. + resolvedEvent.reset() + return false + } +} + func (c *eventBroker) sendNotReusableEvent( server node.ID, d *dispatcherStat, @@ -429,20 +448,26 @@ func (c *eventBroker) tickBootstrapResolvedTs(ctx context.Context) error { } func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat) { - if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.epoch > 0 { + if dispatcher == nil || dispatcher.isRemoved.Load() { + return + } + + // For epoch 0 dispatchers, drive the ready/reset handshake first. + if dispatcher.epoch == 0 { + c.checkAndSendReady(dispatcher) return } sentResolvedTs := dispatcher.sentResolvedTs.Load() receivedResolvedTs := dispatcher.receivedResolvedTs.Load() - // Stop synthetic advancement once upstream resolvedTs has caught up. + // Stop synthetic advancement once upstream resolved-ts catches up. if dispatcher.hasReceivedFirstResolvedTs.Load() && receivedResolvedTs >= sentResolvedTs { return } c.sendHandshakeIfNeed(dispatcher) nextResolvedTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(sentResolvedTs).Add(bootstrapResolvedTsStep)) - c.sendResolvedTs(dispatcher, nextResolvedTs) + c.sendBootstrapResolvedTs(dispatcher, nextResolvedTs) } // getScanTaskDataRange determines the valid data range for scanning a given task. @@ -1088,7 +1113,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { c.metricsCollector.metricDispatcherCount.Inc() if dispatcher.epoch > 0 { c.sendHandshakeIfNeed(dispatcher) - c.sendResolvedTs(dispatcher, dispatcher.startTs) + c.sendBootstrapResolvedTs(dispatcher, dispatcher.startTs) } log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index ac9ed9ae91..7bd74b922d 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -378,6 +378,15 @@ func TestSendBootstrapResolvedTsIfNeeded(t *testing.T) { // Close the broker so we can assert the raw message channel ordering. broker.close() + // For epoch 0 dispatcher, broker should drive ready/reset flow first. + dispInfoEpoch0 := newMockDispatcherInfoForTest(t) + dispInfoEpoch0.epoch = 0 + changefeedStatusEpoch0 := broker.getOrSetChangefeedStatus(dispInfoEpoch0.GetChangefeedID(), dispInfoEpoch0.GetSyncPointInterval()) + dispEpoch0 := newDispatcherStat(dispInfoEpoch0, 1, 1, nil, changefeedStatusEpoch0) + broker.sendBootstrapResolvedTsIfNeeded(dispEpoch0) + readyEvent := <-broker.messageCh[dispEpoch0.messageWorkerIndex] + require.Equal(t, event.TypeReadyEvent, readyEvent.msgType) + dispInfo := newMockDispatcherInfoForTest(t) dispInfo.epoch = 1 dispInfo.startTs = 100 @@ -391,6 +400,8 @@ func TestSendBootstrapResolvedTsIfNeeded(t *testing.T) { resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex] require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType) require.Equal(t, oracle.GoTimeToTS(oracle.GetTimeFromTS(dispInfo.startTs).Add(time.Second)), resolvedEvent.resolvedTsEvent.ResolvedTs) + // Synthetic resolvedTs should move scan progress to skip data. + require.Equal(t, resolvedEvent.resolvedTsEvent.ResolvedTs, disp.lastScannedCommitTs.Load()) // Upstream catches up, synthetic advancement should stop. disp.hasReceivedFirstResolvedTs.Store(true) From 7ecd02a637982c29332646d0c638cf276b3793b4 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 4 Mar 2026 15:49:22 +0800 Subject: [PATCH 4/4] add log Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/event_broker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 4437cba752..93fce0d202 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -468,6 +468,7 @@ func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat c.sendHandshakeIfNeed(dispatcher) nextResolvedTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(sentResolvedTs).Add(bootstrapResolvedTsStep)) c.sendBootstrapResolvedTs(dispatcher, nextResolvedTs) + log.Debug("fizz send resolvedTs", zap.Any("dispatcherID", dispatcher.id), zap.Any("resolvedTs", nextResolvedTs)) } // getScanTaskDataRange determines the valid data range for scanning a given task.