From 4bf750bc14bfd8d00a77ea21c935ae4065a22b25 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 9 May 2026 23:46:33 +0800 Subject: [PATCH] eventstore: fix checkpoint update race --- logservice/eventstore/event_store.go | 100 +++++++++++----------- logservice/eventstore/event_store_test.go | 4 +- 2 files changed, 52 insertions(+), 52 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 4138cb3609..30b07fa8ca 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -117,7 +117,7 @@ type dispatcherStat struct { resolvedTs atomic.Uint64 // the max ts of events which is not needed by this dispatcher - checkpointTs uint64 + checkpointTs atomic.Uint64 // the difference between `subStat`, `pendingSubStat` and `removingSubStat`: // 1) if there is no existing subscriptions which can be reused, // or there is a existing subscription with exact span match, @@ -500,8 +500,8 @@ func (e *eventStore) RegisterDispatcher( stat := &dispatcherStat{ dispatcherID: dispatcherID, tableSpan: dispatcherSpan, - checkpointTs: startTs, } + stat.checkpointTs.Store(startTs) stat.resolvedTs.Store(startTs) wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) { @@ -712,7 +712,7 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if !ok { return } - dispatcherStat.checkpointTs = checkpointTs + dispatcherStat.checkpointTs.Store(checkpointTs) updateSubStatCheckpoint := func(subStat *subscriptionStat) { if subStat == nil { @@ -732,8 +732,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( continue } - if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs { - newCheckpointTs = dispatcherStat.checkpointTs + dispatcherCheckpointTs := dispatcherStat.checkpointTs.Load() + if newCheckpointTs == 0 || dispatcherCheckpointTs < newCheckpointTs { + newCheckpointTs = dispatcherCheckpointTs } } @@ -746,44 +747,39 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if newCheckpointTs == 0 { return } - oldCheckpointTs := subStat.checkpointTs.Load() - if newCheckpointTs == oldCheckpointTs { - return - } - if newCheckpointTs < oldCheckpointTs { - log.Panic("should not happen", - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) - } - // If there is no dml event after old checkpoint ts, then there is no data to be deleted. - // So we can skip adding gc item. - lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load() - if lastReceiveDMLTime > 0 { - oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs) - if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() { - e.gcManager.addGCItem( - subStat.dbIndex, - uint64(subStat.subID), - subStat.tableSpan.TableID, - oldCheckpointTs, - newCheckpointTs, - ) + for { + oldCheckpointTs := subStat.checkpointTs.Load() + if newCheckpointTs == oldCheckpointTs { + return } - } - e.subscriptionChangeCh.In() <- SubscriptionChange{ - ChangeType: SubscriptionChangeTypeUpdate, - SubID: uint64(subStat.subID), - Span: subStat.tableSpan, - CheckpointTs: newCheckpointTs, - ResolvedTs: subStat.resolvedTs.Load(), - } - subStat.checkpointTs.Store(newCheckpointTs) - if log.GetLevel() <= zap.DebugLevel { - log.Debug("update checkpoint ts", - zap.Any("dispatcherID", dispatcherID), - zap.Uint64("subscriptionID", uint64(subStat.subID)), - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + if newCheckpointTs < oldCheckpointTs { + return + } + if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) { + continue + } + e.gcManager.addGCItem( + subStat.dbIndex, + uint64(subStat.subID), + subStat.tableSpan.TableID, + oldCheckpointTs, + newCheckpointTs, + ) + e.subscriptionChangeCh.In() <- SubscriptionChange{ + ChangeType: SubscriptionChangeTypeUpdate, + SubID: uint64(subStat.subID), + Span: subStat.tableSpan, + CheckpointTs: newCheckpointTs, + ResolvedTs: subStat.resolvedTs.Load(), + } + if log.GetLevel() <= zap.DebugLevel { + log.Debug("update checkpoint ts", + zap.Any("dispatcherID", dispatcherID), + zap.Uint64("subscriptionID", uint64(subStat.subID)), + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + } + return } } updateSubStatCheckpoint(dispatcherStat.subStat) @@ -1506,7 +1502,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { } comparableKey := common.ToComparableKey(rawKV.Key) if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 && - bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 { + bytes.Compare(comparableKey, iter.tableSpan.EndKey) < 0 { break } log.Debug("event store iter skip kv not in table span", @@ -1650,17 +1646,21 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error { log.Warn("cannot find subscription state", zap.Uint64("subscriptionID", change.SubID)) continue } - if change.CheckpointTs < tableState.Subscriptions[targetIndex].CheckpointTs || - change.ResolvedTs < tableState.Subscriptions[targetIndex].ResolvedTs { - log.Panic("should not happen", + subState := tableState.Subscriptions[targetIndex] + if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs { + log.Warn("ignore stale subscription state update", zap.Uint64("subscriptionID", change.SubID), - zap.Uint64("oldCheckpointTs", tableState.Subscriptions[targetIndex].CheckpointTs), - zap.Uint64("oldResolvedTs", tableState.Subscriptions[targetIndex].ResolvedTs), + zap.Uint64("oldCheckpointTs", subState.CheckpointTs), + zap.Uint64("oldResolvedTs", subState.ResolvedTs), zap.Uint64("newCheckpointTs", change.CheckpointTs), zap.Uint64("newResolvedTs", change.ResolvedTs)) } - tableState.Subscriptions[targetIndex].CheckpointTs = change.CheckpointTs - tableState.Subscriptions[targetIndex].ResolvedTs = change.ResolvedTs + if change.CheckpointTs > subState.CheckpointTs { + subState.CheckpointTs = change.CheckpointTs + } + if change.ResolvedTs > subState.ResolvedTs { + subState.ResolvedTs = change.ResolvedTs + } default: log.Panic("invalid subscription change type", zap.Int("changeType", int(change.ChangeType))) } diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index 5b3a8ac9ec..618c481f8c 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { var tableID int64 = 42 iteratorSpan := &heartbeatpb.TableSpan{ TableID: tableID, - StartKey: []byte("keyB"), - EndKey: []byte("keyD"), + StartKey: common.ToComparableKey([]byte("keyB")), + EndKey: common.ToComparableKey([]byte("keyD")), } // This test now focuses on a single, more comprehensive scenario.