Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions pkg/eventservice/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,25 @@ func (a *dispatcherStat) setHandshaked() {
}

// updateSentResolvedTs advances the resolved ts that has been sent to the
// dispatcher and moves the scan range forward to the same point, so data at
// or below the sent resolved ts is not scanned again.
func (a *dispatcherStat) updateSentResolvedTs(resolvedTs uint64) {
	a.updateSentResolvedTsOnly(resolvedTs)
	a.updateScanRange(resolvedTs, 0)
}

func (a *dispatcherStat) updateSentResolvedTsOnly(resolvedTs uint64) {
// Keep sentResolvedTs monotonic when resolvedTs can be emitted by multiple goroutines.
for {
old := a.sentResolvedTs.Load()
if resolvedTs < old {
resolvedTs = old
break
}
if old == resolvedTs || a.sentResolvedTs.CompareAndSwap(old, resolvedTs) {
break
}
}
Comment on lines +213 to +223

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic in this loop to ensure sentResolvedTs is monotonic is correct, but it could be simplified for better readability and maintainability. The current implementation with two separate if conditions and modification of the resolvedTs parameter can be made more straightforward.

Suggested change
// Keep sentResolvedTs monotonic when resolvedTs can be emitted by multiple goroutines.
for {
old := a.sentResolvedTs.Load()
if resolvedTs < old {
resolvedTs = old
break
}
if old == resolvedTs || a.sentResolvedTs.CompareAndSwap(old, resolvedTs) {
break
}
}
for {
old := a.sentResolvedTs.Load()
if resolvedTs <= old {
resolvedTs = old
break
}
if a.sentResolvedTs.CompareAndSwap(old, resolvedTs) {
break
}
}

a.lastSentResolvedTsTime.Store(time.Now())
}

func (a *dispatcherStat) updateScanRange(txnCommitTs, txnStartTs uint64) {
a.lastScannedCommitTs.Store(txnCommitTs)
a.lastScannedStartTs.Store(txnStartTs)
Expand Down
5 changes: 5 additions & 0 deletions pkg/eventservice/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func TestDispatcherStatGetDataRange(t *testing.T) {
r, ok = stat.getDataRange()
require.False(t, ok)

// case 3.1: sent resolved ts should not go backward.
stat.updateSentResolvedTs(150)
require.Equal(t, uint64(200), stat.sentResolvedTs.Load())
require.Equal(t, uint64(200), stat.lastScannedCommitTs.Load())

// case 4: get range after resolved ts update again
stat.onResolvedTs(300)
r, ok = stat.getDataRange()
Expand Down
73 changes: 73 additions & 0 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (

defaultMaxBatchSize = 128
defaultFlushResolvedTsInterval = 25 * time.Millisecond
// defaultBootstrapResolvedTsInterval controls the synthetic resolvedTs tick
// before upstream resolvedTs catches up.
defaultBootstrapResolvedTsInterval = time.Second
bootstrapResolvedTsStep = time.Second

defaultReportDispatcherStatToStoreInterval = time.Second * 10

Expand Down Expand Up @@ -173,6 +177,10 @@ func newEventBroker(
return c.logUninitializedDispatchers(ctx)
})

g.Go(func() error {
return c.tickBootstrapResolvedTs(ctx)
})

g.Go(func() error {
return c.reportDispatcherStatToStore(ctx, defaultReportDispatcherStatToStoreInterval)
})
Expand Down Expand Up @@ -297,6 +305,25 @@ func (c *eventBroker) sendResolvedTs(d *dispatcherStat, watermark uint64) {
updateMetricEventServiceSendResolvedTsCount(d.info.GetMode())
}

// sendBootstrapResolvedTs tries to enqueue a synthetic resolved-ts event for
// the dispatcher without blocking. It returns true when the event was queued
// and the sent watermark advanced, and false when the message channel is full.
func (c *eventBroker) sendBootstrapResolvedTs(d *dispatcherStat, watermark uint64) bool {
	target := node.ID(d.info.GetServerID())
	c.emitSyncPointEventIfNeeded(watermark, d, target)

	resolved := event.NewResolvedEvent(watermark, d.id, d.epoch)
	resolved.Seq = d.seq.Load()
	wrapped := newWrapResolvedEvent(target, resolved)

	msgCh := c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode()))
	select {
	case msgCh <- wrapped:
		d.updateSentResolvedTs(watermark)
		updateMetricEventServiceSendResolvedTsCount(d.info.GetMode())
		return true
	default:
		// The channel is full: drop the event rather than block the broker
		// tick path. The next ticker round will retry.
		wrapped.reset()
		return false
	}
}

func (c *eventBroker) sendNotReusableEvent(
server node.ID,
d *dispatcherStat,
Expand Down Expand Up @@ -402,6 +429,48 @@ func (c *eventBroker) logUninitializedDispatchers(ctx context.Context) error {
}
}

func (c *eventBroker) tickBootstrapResolvedTs(ctx context.Context) error {
ticker := time.NewTicker(defaultBootstrapResolvedTsInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-ticker.C:
c.dispatchers.Range(func(_, value interface{}) bool {
dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
c.sendBootstrapResolvedTsIfNeeded(dispatcher)
return true
})
Comment on lines +441 to +445

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This loop only iterates over c.dispatchers. However, other periodic functions in this file, such as logUninitializedDispatchers and reportDispatcherStatToStore, iterate over both c.dispatchers and c.tableTriggerDispatchers.

tableTriggerDispatchers could also stall if they don't receive updates from the schema store. To prevent this, they should also be included in the bootstrap resolved timestamp mechanism. Please consider adding a similar loop for c.tableTriggerDispatchers.

			c.dispatchers.Range(func(_, value interface{}) bool {
				dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
				c.sendBootstrapResolvedTsIfNeeded(dispatcher)
				return true
			})
			c.tableTriggerDispatchers.Range(func(_, value interface{}) bool {
				dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
				c.sendBootstrapResolvedTsIfNeeded(dispatcher)
				return true
			})

}
}
}

// sendBootstrapResolvedTsIfNeeded advances a dispatcher's resolved ts
// synthetically (one bootstrapResolvedTsStep per call) while upstream
// resolved ts has not yet caught up with what was already sent.
func (c *eventBroker) sendBootstrapResolvedTsIfNeeded(dispatcher *dispatcherStat) {
	if dispatcher == nil || dispatcher.isRemoved.Load() {
		return
	}

	// For epoch 0 dispatchers, drive the ready/reset handshake first.
	if dispatcher.epoch == 0 {
		c.checkAndSendReady(dispatcher)
		return
	}

	sentResolvedTs := dispatcher.sentResolvedTs.Load()
	receivedResolvedTs := dispatcher.receivedResolvedTs.Load()
	// Stop synthetic advancement once upstream resolved-ts catches up.
	if dispatcher.hasReceivedFirstResolvedTs.Load() && receivedResolvedTs >= sentResolvedTs {
		return
	}

	c.sendHandshakeIfNeed(dispatcher)
	nextResolvedTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(sentResolvedTs).Add(bootstrapResolvedTsStep))
	// Log only when the event was actually enqueued; a full channel means the
	// send was dropped and the next tick will retry.
	if c.sendBootstrapResolvedTs(dispatcher, nextResolvedTs) {
		log.Debug("send bootstrap resolvedTs",
			zap.Any("dispatcherID", dispatcher.id),
			zap.Uint64("resolvedTs", nextResolvedTs))
	}
}

// getScanTaskDataRange determines the valid data range for scanning a given task.
// It checks various conditions (dispatcher status, DDL state, max commit ts of dml event)
// to decide whether scanning is needed and returns the appropriate time range.
Expand Down Expand Up @@ -1043,6 +1112,10 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
}
c.dispatchers.Store(id, dispatcherPtr)
c.metricsCollector.metricDispatcherCount.Inc()
if dispatcher.epoch > 0 {
c.sendHandshakeIfNeed(dispatcher)
c.sendBootstrapResolvedTs(dispatcher, dispatcher.startTs)
}
log.Info("register dispatcher",
zap.Uint64("clusterID", c.tidbClusterID),
zap.Stringer("changefeedID", changefeedID),
Expand Down
65 changes: 65 additions & 0 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,71 @@ func TestCURDDispatcher(t *testing.T) {
require.False(t, ok, "changefeedStatus should be removed after the last dispatcher is removed")
}

// TestAddDispatcherSendsStartTsResolvedAfterRegister verifies that registering
// a dispatcher with a non-zero epoch immediately enqueues a handshake event
// followed by a resolved-ts event carrying the dispatcher's startTs.
func TestAddDispatcherSendsStartTsResolvedAfterRegister(t *testing.T) {
	broker, _, _, _ := newEventBrokerForTest()
	// Close the broker so the raw message channel ordering can be asserted
	// without worker goroutines draining it.
	broker.close()

	info := newMockDispatcherInfoForTest(t)
	info.epoch = 1
	info.startTs = 100

	require.NoError(t, broker.addDispatcher(info))

	stat := broker.getDispatcher(info.GetID()).Load()
	require.NotNil(t, stat)
	require.True(t, stat.isHandshaked())

	first := <-broker.messageCh[stat.messageWorkerIndex]
	require.Equal(t, event.TypeHandshakeEvent, first.msgType)

	second := <-broker.messageCh[stat.messageWorkerIndex]
	require.Equal(t, event.TypeResolvedEvent, second.msgType)
	require.Equal(t, info.startTs, second.resolvedTsEvent.ResolvedTs)
	require.Equal(t, uint64(1), second.resolvedTsEvent.Seq)
}

// TestSendBootstrapResolvedTsIfNeeded covers the three states of the
// bootstrap resolved-ts path: epoch 0 (ready/reset flow), upstream behind
// (synthetic handshake + resolvedTs emitted), and upstream caught up
// (synthetic advancement stops).
func TestSendBootstrapResolvedTsIfNeeded(t *testing.T) {
	broker, _, _, _ := newEventBrokerForTest()
	// Close the broker so we can assert the raw message channel ordering.
	broker.close()

	// For epoch 0 dispatcher, broker should drive ready/reset flow first.
	dispInfoEpoch0 := newMockDispatcherInfoForTest(t)
	dispInfoEpoch0.epoch = 0
	changefeedStatusEpoch0 := broker.getOrSetChangefeedStatus(dispInfoEpoch0.GetChangefeedID(), dispInfoEpoch0.GetSyncPointInterval())
	dispEpoch0 := newDispatcherStat(dispInfoEpoch0, 1, 1, nil, changefeedStatusEpoch0)
	broker.sendBootstrapResolvedTsIfNeeded(dispEpoch0)
	readyEvent := <-broker.messageCh[dispEpoch0.messageWorkerIndex]
	require.Equal(t, event.TypeReadyEvent, readyEvent.msgType)

	// A handshaked (epoch > 0) dispatcher whose upstream is still behind.
	dispInfo := newMockDispatcherInfoForTest(t)
	dispInfo.epoch = 1
	dispInfo.startTs = 100
	changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID(), dispInfo.GetSyncPointInterval())
	disp := newDispatcherStat(dispInfo, 1, 1, nil, changefeedStatus)

	// Upstream hasn't caught up yet, broker should synthesize handshake + resolvedTs.
	broker.sendBootstrapResolvedTsIfNeeded(disp)
	handshakeEvent := <-broker.messageCh[disp.messageWorkerIndex]
	require.Equal(t, event.TypeHandshakeEvent, handshakeEvent.msgType)
	resolvedEvent := <-broker.messageCh[disp.messageWorkerIndex]
	require.Equal(t, event.TypeResolvedEvent, resolvedEvent.msgType)
	// The synthetic watermark advances by one bootstrap step (one second).
	require.Equal(t, oracle.GoTimeToTS(oracle.GetTimeFromTS(dispInfo.startTs).Add(time.Second)), resolvedEvent.resolvedTsEvent.ResolvedTs)
	// Synthetic resolvedTs should move scan progress to skip data.
	require.Equal(t, resolvedEvent.resolvedTsEvent.ResolvedTs, disp.lastScannedCommitTs.Load())

	// Upstream catches up, synthetic advancement should stop.
	disp.hasReceivedFirstResolvedTs.Store(true)
	disp.receivedResolvedTs.Store(disp.sentResolvedTs.Load())
	broker.sendBootstrapResolvedTsIfNeeded(disp)
	// Non-blocking receive: the channel must stay empty after catch-up.
	select {
	case <-broker.messageCh[disp.messageWorkerIndex]:
		require.Fail(t, "unexpected synthetic resolvedTs after upstream catch up")
	default:
	}
}

func TestResetDispatcher(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
defer broker.close()
Expand Down
Loading