diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index a99767a704..e0d4d2f93d 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -205,11 +205,25 @@ func (a *dispatcherStat) setHandshaked() { } func (a *dispatcherStat) updateSentResolvedTs(resolvedTs uint64) { - a.sentResolvedTs.Store(resolvedTs) - a.lastSentResolvedTsTime.Store(time.Now()) + resolvedTs = a.updateSentResolvedTsOnly(resolvedTs) a.updateScanRange(resolvedTs, 0) } +func (a *dispatcherStat) updateSentResolvedTsOnly(resolvedTs uint64) uint64 { + // Keep sentResolvedTs monotonic when resolvedTs can be emitted by multiple goroutines; return the effective (clamped) value. + for { + old := a.sentResolvedTs.Load() + if resolvedTs <= old { + resolvedTs = old + } else if !a.sentResolvedTs.CompareAndSwap(old, resolvedTs) { + continue + } + break + } + a.lastSentResolvedTsTime.Store(time.Now()) + return resolvedTs +} + func (a *dispatcherStat) updateScanRange(txnCommitTs, txnStartTs uint64) { a.lastScannedCommitTs.Store(txnCommitTs) a.lastScannedStartTs.Store(txnStartTs) diff --git a/pkg/eventservice/dispatcher_stat_test.go b/pkg/eventservice/dispatcher_stat_test.go index 0be560ab78..1b06e0b36e 100644 --- a/pkg/eventservice/dispatcher_stat_test.go +++ b/pkg/eventservice/dispatcher_stat_test.go @@ -107,6 +107,11 @@ func TestDispatcherStatGetDataRange(t *testing.T) { r, ok = stat.getDataRange() require.False(t, ok) + // case 3.1: sent resolved ts should not go backward. 
+ stat.updateSentResolvedTs(150) + require.Equal(t, uint64(200), stat.sentResolvedTs.Load()) + require.Equal(t, uint64(200), stat.lastScannedCommitTs.Load()) + // case 4: get range after resolved ts update again stat.onResolvedTs(300) r, ok = stat.getDataRange() diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index a1439f2204..93fce0d202 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -45,6 +45,10 @@ const ( defaultMaxBatchSize = 128 defaultFlushResolvedTsInterval = 25 * time.Millisecond + // defaultBootstrapResolvedTsInterval controls the synthetic resolvedTs tick + // before upstream resolvedTs catches up. + defaultBootstrapResolvedTsInterval = time.Second + bootstrapResolvedTsStep = time.Second defaultReportDispatcherStatToStoreInterval = time.Second * 10 @@ -173,6 +177,10 @@ func newEventBroker( return c.logUninitializedDispatchers(ctx) }) + g.Go(func() error { + return c.tickBootstrapResolvedTs(ctx) + }) + g.Go(func() error { return c.reportDispatcherStatToStore(ctx, defaultReportDispatcherStatToStoreInterval) }) @@ -297,6 +305,25 @@ func (c *eventBroker) sendResolvedTs(d *dispatcherStat, watermark uint64) { updateMetricEventServiceSendResolvedTsCount(d.info.GetMode()) } +func (c *eventBroker) sendBootstrapResolvedTs(d *dispatcherStat, watermark uint64) bool { + remoteID := node.ID(d.info.GetServerID()) + c.emitSyncPointEventIfNeeded(watermark, d, remoteID) + re := event.NewResolvedEvent(watermark, d.id, d.epoch) + re.Seq = d.seq.Load() + resolvedEvent := newWrapResolvedEvent(remoteID, re) + ch := c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) + select { + case ch <- resolvedEvent: + d.updateSentResolvedTs(watermark) + updateMetricEventServiceSendResolvedTsCount(d.info.GetMode()) + return true + default: + // Avoid blocking broker tick path. The next ticker round will retry. 
+ resolvedEvent.reset() + return false + } +} + func (c *eventBroker) sendNotReusableEvent( server node.ID, d *dispatcherStat, @@ -402,6 +429,48 @@ func (c *eventBroker) logUninitializedDispatchers(ctx context.Context) error { } } +func (c *eventBroker) tickBootstrapResolvedTs(ctx context.Context) error { + ticker := time.NewTicker(defaultBootstrapResolvedTsInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-ticker.C: + c.dispatchers.Range(func(_, value interface{}) bool { + dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() + c.sendBootstrapResolvedTsIfNeeded(dispatcher) + return true + }) + } + } +} + +func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat) { + if dispatcher == nil || dispatcher.isRemoved.Load() { + return + } + + // For epoch 0 dispatchers, drive the ready/reset handshake first. + if dispatcher.epoch == 0 { + c.checkAndSendReady(dispatcher) + return + } + + sentResolvedTs := dispatcher.sentResolvedTs.Load() + receivedResolvedTs := dispatcher.receivedResolvedTs.Load() + // Stop synthetic advancement once upstream resolved-ts catches up. + if dispatcher.hasReceivedFirstResolvedTs.Load() && receivedResolvedTs >= sentResolvedTs { + return + } + + c.sendHandshakeIfNeed(dispatcher) + nextResolvedTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(sentResolvedTs).Add(bootstrapResolvedTsStep)) + c.sendBootstrapResolvedTs(dispatcher, nextResolvedTs) + log.Debug("send bootstrap resolvedTs", zap.Any("dispatcherID", dispatcher.id), zap.Any("resolvedTs", nextResolvedTs)) +} + // getScanTaskDataRange determines the valid data range for scanning a given task. // It checks various conditions (dispatcher status, DDL state, max commit ts of dml event) // to decide whether scanning is needed and returns the appropriate time range. 
@@ -1043,6 +1112,10 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { } c.dispatchers.Store(id, dispatcherPtr) c.metricsCollector.metricDispatcherCount.Inc() + if dispatcher.epoch > 0 { + c.sendHandshakeIfNeed(dispatcher) + c.sendBootstrapResolvedTs(dispatcher, dispatcher.startTs) + } log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), zap.Stringer("changefeedID", changefeedID), diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 0c5281ef24..7bd74b922d 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -349,6 +349,71 @@ func TestCURDDispatcher(t *testing.T) { require.False(t, ok, "changefeedStatus should be removed after the last dispatcher is removed") } +func TestAddDispatcherSendsStartTsResolvedAfterRegister(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + // Close the broker so we can assert the raw message channel ordering. + broker.close() + + dispInfo := newMockDispatcherInfoForTest(t) + dispInfo.epoch = 1 + dispInfo.startTs = 100 + + require.NoError(t, broker.addDispatcher(dispInfo)) + + disp := broker.getDispatcher(dispInfo.GetID()).Load() + require.NotNil(t, disp) + require.True(t, disp.isHandshaked()) + + handshakeEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeHandshakeEvent, handshakeEvent.msgType) + + resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType) + require.Equal(t, dispInfo.startTs, resolvedEvent.resolvedTsEvent.ResolvedTs) + require.Equal(t, uint64(1), resolvedEvent.resolvedTsEvent.Seq) +} + +func TestSendBootstrapResolvedTsIfNeeded(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + // Close the broker so we can assert the raw message channel ordering. + broker.close() + + // For epoch 0 dispatcher, broker should drive ready/reset flow first. 
+ dispInfoEpoch0 := newMockDispatcherInfoForTest(t) + dispInfoEpoch0.epoch = 0 + changefeedStatusEpoch0 := broker.getOrSetChangefeedStatus(dispInfoEpoch0.GetChangefeedID(), dispInfoEpoch0.GetSyncPointInterval()) + dispEpoch0 := newDispatcherStat(dispInfoEpoch0, 1, 1, nil, changefeedStatusEpoch0) + broker.sendBootstrapResolvedTsIfNeeded(dispEpoch0) + readyEvent := <-broker.messageCh[dispEpoch0.messageWorkerIndex] + require.Equal(t, event.TypeReadyEvent, readyEvent.msgType) + + dispInfo := newMockDispatcherInfoForTest(t) + dispInfo.epoch = 1 + dispInfo.startTs = 100 + changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID(), dispInfo.GetSyncPointInterval()) + disp := newDispatcherStat(dispInfo, 1, 1, nil, changefeedStatus) + + // Upstream hasn't caught up yet, broker should synthesize handshake + resolvedTs. + broker.sendBootstrapResolvedTsIfNeeded(disp) + handshakeEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeHandshakeEvent, handshakeEvent.msgType) + resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType) + require.Equal(t, oracle.GoTimeToTS(oracle.GetTimeFromTS(dispInfo.startTs).Add(time.Second)), resolvedEvent.resolvedTsEvent.ResolvedTs) + // Synthetic resolvedTs should move scan progress to skip data. + require.Equal(t, resolvedEvent.resolvedTsEvent.ResolvedTs, disp.lastScannedCommitTs.Load()) + + // Upstream catches up, synthetic advancement should stop. + disp.hasReceivedFirstResolvedTs.Store(true) + disp.receivedResolvedTs.Store(disp.sentResolvedTs.Load()) + broker.sendBootstrapResolvedTsIfNeeded(disp) + select { + case <-broker.messageCh[disp.messageWorkerIndex]: + require.Fail(t, "unexpected synthetic resolvedTs after upstream catch up") + default: + } +} + func TestResetDispatcher(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close()