Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 50 additions & 50 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type dispatcherStat struct {

resolvedTs atomic.Uint64
// the max ts of events which is not needed by this dispatcher
checkpointTs uint64
checkpointTs atomic.Uint64
// the difference between `subStat`, `pendingSubStat` and `removingSubStat`:
// 1) if there is no existing subscriptions which can be reused,
// or there is a existing subscription with exact span match,
Expand Down Expand Up @@ -500,8 +500,8 @@ func (e *eventStore) RegisterDispatcher(
stat := &dispatcherStat{
dispatcherID: dispatcherID,
tableSpan: dispatcherSpan,
checkpointTs: startTs,
}
stat.checkpointTs.Store(startTs)
stat.resolvedTs.Store(startTs)

wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) {
Expand Down Expand Up @@ -712,7 +712,7 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
if !ok {
return
}
dispatcherStat.checkpointTs = checkpointTs
dispatcherStat.checkpointTs.Store(checkpointTs)

Comment on lines +715 to 716
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep the per-dispatcher checkpoint monotonic too.

Line 715 can still overwrite a newer dispatcher checkpoint with a stale one. After that, the min recomputation can stay pinned below the real progress, so the subscription checkpoint and GC stop advancing until this dispatcher reports again.

Suggested fix
-	dispatcherStat.checkpointTs.Store(checkpointTs)
+	util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
dispatcherStat.checkpointTs.Store(checkpointTs)
util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@logservice/eventstore/event_store.go` around lines 715 - 716, The dispatcher
checkpoint write at dispatcherStat.checkpointTs.Store(checkpointTs) can regress
a newer per-dispatcher value; change the update to only advance the
per-dispatcher checkpoint if checkpointTs is greater than the currently stored
value by reading dispatcherStat.checkpointTs (via its Load or atomic read) and
using an atomic compare-and-swap loop to store the new checkpoint only when it
is strictly larger (i.e., retry CAS until success or current >= checkpointTs) so
dispatcherStat.checkpointTs never moves backward and min recomputation reflects
true progress.

updateSubStatCheckpoint := func(subStat *subscriptionStat) {
if subStat == nil {
Expand All @@ -732,8 +732,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
continue
}

if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs {
newCheckpointTs = dispatcherStat.checkpointTs
dispatcherCheckpointTs := dispatcherStat.checkpointTs.Load()
if newCheckpointTs == 0 || dispatcherCheckpointTs < newCheckpointTs {
newCheckpointTs = dispatcherCheckpointTs
}
}

Expand All @@ -746,44 +747,39 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
if newCheckpointTs == 0 {
return
}
oldCheckpointTs := subStat.checkpointTs.Load()
if newCheckpointTs == oldCheckpointTs {
return
}
if newCheckpointTs < oldCheckpointTs {
log.Panic("should not happen",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
}
// If there is no dml event after old checkpoint ts, then there is no data to be deleted.
// So we can skip adding gc item.
lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load()
if lastReceiveDMLTime > 0 {
oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs)
if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() {
e.gcManager.addGCItem(
subStat.dbIndex,
uint64(subStat.subID),
subStat.tableSpan.TableID,
oldCheckpointTs,
newCheckpointTs,
)
for {
oldCheckpointTs := subStat.checkpointTs.Load()
if newCheckpointTs == oldCheckpointTs {
return
}
}
e.subscriptionChangeCh.In() <- SubscriptionChange{
ChangeType: SubscriptionChangeTypeUpdate,
SubID: uint64(subStat.subID),
Span: subStat.tableSpan,
CheckpointTs: newCheckpointTs,
ResolvedTs: subStat.resolvedTs.Load(),
}
subStat.checkpointTs.Store(newCheckpointTs)
if log.GetLevel() <= zap.DebugLevel {
log.Debug("update checkpoint ts",
zap.Any("dispatcherID", dispatcherID),
zap.Uint64("subscriptionID", uint64(subStat.subID)),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
if newCheckpointTs < oldCheckpointTs {
return
}
if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) {
continue
}
e.gcManager.addGCItem(
subStat.dbIndex,
uint64(subStat.subID),
subStat.tableSpan.TableID,
oldCheckpointTs,
newCheckpointTs,
)
e.subscriptionChangeCh.In() <- SubscriptionChange{
ChangeType: SubscriptionChangeTypeUpdate,
SubID: uint64(subStat.subID),
Span: subStat.tableSpan,
CheckpointTs: newCheckpointTs,
ResolvedTs: subStat.resolvedTs.Load(),
}
if log.GetLevel() <= zap.DebugLevel {
log.Debug("update checkpoint ts",
zap.Any("dispatcherID", dispatcherID),
zap.Uint64("subscriptionID", uint64(subStat.subID)),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
}
return
}
}
updateSubStatCheckpoint(dispatcherStat.subStat)
Expand Down Expand Up @@ -1506,7 +1502,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
}
comparableKey := common.ToComparableKey(rawKV.Key)
if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 &&
bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 {
bytes.Compare(comparableKey, iter.tableSpan.EndKey) < 0 {
break
}
log.Debug("event store iter skip kv not in table span",
Expand Down Expand Up @@ -1650,17 +1646,21 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
log.Warn("cannot find subscription state", zap.Uint64("subscriptionID", change.SubID))
continue
}
if change.CheckpointTs < tableState.Subscriptions[targetIndex].CheckpointTs ||
change.ResolvedTs < tableState.Subscriptions[targetIndex].ResolvedTs {
log.Panic("should not happen",
subState := tableState.Subscriptions[targetIndex]
if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs {
log.Warn("ignore stale subscription state update",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message "ignore stale subscription state update" is slightly misleading because the code does not ignore the update. It proceeds to update any timestamps that are advancing monotonically. A more accurate log message would be "received stale subscription state update", which correctly indicates that a stale value was detected while still processing the valid parts of the update. This would be clearer for anyone debugging potential message ordering issues.

Suggested change
log.Warn("ignore stale subscription state update",
log.Warn("received stale subscription state update",

zap.Uint64("subscriptionID", change.SubID),
zap.Uint64("oldCheckpointTs", tableState.Subscriptions[targetIndex].CheckpointTs),
zap.Uint64("oldResolvedTs", tableState.Subscriptions[targetIndex].ResolvedTs),
zap.Uint64("oldCheckpointTs", subState.CheckpointTs),
zap.Uint64("oldResolvedTs", subState.ResolvedTs),
zap.Uint64("newCheckpointTs", change.CheckpointTs),
zap.Uint64("newResolvedTs", change.ResolvedTs))
}
tableState.Subscriptions[targetIndex].CheckpointTs = change.CheckpointTs
tableState.Subscriptions[targetIndex].ResolvedTs = change.ResolvedTs
if change.CheckpointTs > subState.CheckpointTs {
subState.CheckpointTs = change.CheckpointTs
}
if change.ResolvedTs > subState.ResolvedTs {
subState.ResolvedTs = change.ResolvedTs
}
default:
log.Panic("invalid subscription change type", zap.Int("changeType", int(change.ChangeType)))
}
Expand Down
4 changes: 2 additions & 2 deletions logservice/eventstore/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) {
var tableID int64 = 42
iteratorSpan := &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("keyB"),
EndKey: []byte("keyD"),
StartKey: common.ToComparableKey([]byte("keyB")),
EndKey: common.ToComparableKey([]byte("keyD")),
}

// This test now focuses on a single, more comprehensive scenario.
Expand Down
Loading