Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 129 additions & 31 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
Expand Down Expand Up @@ -169,36 +170,87 @@ func (p *persistentStorage) getGcSafePoint(ctx context.Context) (uint64, error)
return gc.UnifyGetServiceGCSafepoint(ctx, p.pdCli, p.keyspaceID, defaultSchemaStoreGcServiceID)
}

func (p *persistentStorage) initialize(ctx context.Context) error {
var gcSafePoint uint64
// ensuredGcSafePoint describes one bootstrap attempt:
// ts is the gc safepoint observed before reading schema metadata, and cleanup
// removes the temporary GC protection created for ts+1 by EnsureChangefeedStartTsSafety.
// It only protects the next restart point during bootstrap. The current
// bootstrap still reads metadata at ts, so a GC race can still force us to
// discard it and acquire a newer one.
type ensuredGcSafePoint struct {
	ts      uint64
	cleanup func()
}

// close releases the temporary GC protection associated with this safepoint.
// It is safe to call on a nil receiver and is idempotent: the cleanup closure
// (which issues an undo RPC to PD) runs at most once, so an explicit close
// followed by a deferred close does not send a redundant undo request.
func (g *ensuredGcSafePoint) close() {
	if g == nil || g.cleanup == nil {
		return
	}
	// Clear cleanup before running it so a re-entrant or repeated close
	// becomes a no-op.
	cleanup := g.cleanup
	g.cleanup = nil
	cleanup()
}

func (p *persistentStorage) acquireEnsuredGcSafePoint(ctx context.Context) (*ensuredGcSafePoint, error) {
fakeChangefeedID := common.NewChangefeedID(defaultSchemaStoreGcServiceID)
for {
var gcSafePoint uint64
err := retry.Do(ctx, func() error {
var err error
gcSafePoint, err = p.getGcSafePoint(ctx)
if err == nil {
log.Info("get gc safepoint success", zap.Uint32("keyspaceID", p.keyspaceID), zap.Any("gcSafePoint", gcSafePoint))
// Ensure the start ts is valid during the gc service ttl
err = gc.EnsureChangefeedStartTsSafety(
ctx,
p.pdCli,
defaultSchemaStoreGcServiceID,
p.keyspaceID,
fakeChangefeedID,
defaultGcServiceTTL, gcSafePoint+1)
if err == nil {
break
}
if err != nil {
log.Warn("get ts failed, will retry in 1s", zap.Error(err))
return err
}

log.Warn("get ts failed, will retry in 1s", zap.Error(err))
select {
case <-ctx.Done():
return errors.Trace(err)
case <-time.After(time.Second):
log.Info("get gc safepoint success", zap.Uint32("keyspaceID", p.keyspaceID), zap.Any("gcSafePoint", gcSafePoint))
// Protect gcSafePoint+1 from future GC before bootstrap proceeds so the next
// restart point does not become invalid while schema store is initializing.
err = gc.EnsureChangefeedStartTsSafety(
ctx,
p.pdCli,
defaultSchemaStoreGcServiceID,
p.keyspaceID,
fakeChangefeedID,
defaultGcServiceTTL,
gcSafePoint+1,
)
if err != nil {
log.Warn("ensure gc start ts safety failed, will retry in 1s",
zap.Uint32("keyspaceID", p.keyspaceID),
zap.Uint64("startTs", gcSafePoint+1),
zap.Error(err))
}
return err
},
retry.WithBackoffBaseDelay(1000),
retry.WithBackoffMaxDelay(1000),
retry.WithIsRetryableErr(errors.IsRetryableError),
)
if err != nil {
return nil, errors.Trace(err)
}
cleanup := func() {
cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := gc.UndoEnsureChangefeedStartTsSafety(
cleanupCtx,
p.pdCli,
p.keyspaceID,
defaultSchemaStoreGcServiceID,
fakeChangefeedID,
); err != nil {
log.Warn("undo ensure gc start ts safety failed",
zap.Uint32("keyspaceID", p.keyspaceID),
zap.Error(err))
}
}
return &ensuredGcSafePoint{
ts: gcSafePoint,
cleanup: cleanup,
}, nil
Comment on lines +190 to +245
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't tear down the refreshed GC barrier.

Line 314 closes the old handle after Line 310 has already acquired a new one, but acquireEnsuredGcSafePoint always derives the same GC service ID from defaultSchemaStoreGcServiceID. That means this undo call deletes the freshly refreshed protection too, so the next retry is no longer guarded against GC and can hit the same bootstrap failure again.

🔧 Proposed fix
 			newEnsuredGcTs, refreshErr := p.acquireEnsuredGcSafePoint(ctx)
 			if refreshErr != nil {
 				return refreshErr
 			}
-			ensuredGcTs.close()
 			ensuredGcTs = newEnsuredGcTs
 			return err

Also applies to: 249-253, 310-315

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/schemastore/persist_storage.go` around lines 190 - 245, The
current acquireEnsuredGcSafePoint uses a fixed fakeChangefeedID
(common.NewChangefeedID(defaultSchemaStoreGcServiceID)) so the cleanup's
UndoEnsureChangefeedStartTsSafety can delete a freshly refreshed GC barrier; fix
by generating a unique changefeed ID inside the retry/ensure call (use a new
local e.g., generatedChangefeedID each attempt) and only capture that exact
generated ID when constructing the cleanup closure after
EnsureChangefeedStartTsSafety returns success; ensure
EnsureChangefeedStartTsSafety and UndoEnsureChangefeedStartTsSafety are called
with the same generatedChangefeedID (not the constant
defaultSchemaStoreGcServiceID-based one) so cleanup only removes the specific
barrier you created, and move creation of the cleanup function to after the
retry loop where gcSafePoint and the successful generated changefeed ID are
known.

}

defer gc.UndoEnsureChangefeedStartTsSafety(ctx, p.pdCli, p.keyspaceID, defaultSchemaStoreGcServiceID, fakeChangefeedID)
func (p *persistentStorage) initialize(ctx context.Context) error {
ensuredGcTs, err := p.acquireEnsuredGcSafePoint(ctx)
if err != nil {
return err
}
defer ensuredGcTs.close()

dbPath := fmt.Sprintf("%s/%s/%d", p.rootDir, dataDir, p.keyspaceID)

Expand All @@ -216,14 +268,14 @@ func (p *persistentStorage) initialize(ctx context.Context) error {
if err != nil {
isDataReusable = false
}
if gcSafePoint < gcTs {
return errors.New(fmt.Sprintf("gc safe point %d is smaller than gcTs %d on disk", gcSafePoint, gcTs))
if ensuredGcTs.ts < gcTs {
return errors.New(fmt.Sprintf("gc safe point %d is smaller than gcTs %d on disk", ensuredGcTs.ts, gcTs))
}
upperBound, err := readUpperBoundMeta(db)
if err != nil {
isDataReusable = false
}
if gcSafePoint >= upperBound.ResolvedTs {
if ensuredGcTs.ts >= upperBound.ResolvedTs {
isDataReusable = false
}

Expand All @@ -237,15 +289,47 @@ func (p *persistentStorage) initialize(ctx context.Context) error {
}
}
if !isDataReusable {
p.initializeFromKVStorage(dbPath, gcSafePoint)
// Why retry here:
// EnsureChangefeedStartTsSafety protects startTs=gcSafePoint+1 from future GC,
// but bootstrap snapshot is still read at gcSafePoint. If upstream GC advances
// while reading metadata, TiDB can return "GC life time is shorter than
// transaction duration". In that case, refresh gcSafePoint and retry bootstrap
// instead of crashing the process.
err := retry.Do(ctx, func() error {
err := p.initializeFromKVStorage(dbPath, ensuredGcTs.ts)
if err == nil {
return nil
}
if !isRetryableInitializeFromKVStorageError(err) {
return err
}
log.Warn("initialize from kv snapshot failed due to stale snapshot, will retry with latest gc safepoint",
zap.Uint32("keyspaceID", p.keyspaceID),
zap.Uint64("snapTs", ensuredGcTs.ts),
zap.Error(err))
newEnsuredGcTs, refreshErr := p.acquireEnsuredGcSafePoint(ctx)
if refreshErr != nil {
return refreshErr
}
ensuredGcTs.close()
ensuredGcTs = newEnsuredGcTs
return err
},
retry.WithBackoffBaseDelay(1000),
retry.WithBackoffMaxDelay(1000),
retry.WithIsRetryableErr(isRetryableInitializeFromKVStorageError),
)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (p *persistentStorage) initializeFromKVStorage(dbPath string, gcTs uint64) {
func (p *persistentStorage) initializeFromKVStorage(dbPath string, gcTs uint64) error {
now := time.Now()
if err := os.RemoveAll(dbPath); err != nil {
log.Fatal("fail to remove path in initializeFromKVStorage")
return errors.Trace(err)
}
p.db = openDB(dbPath)

Expand All @@ -254,8 +338,10 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, gcTs uint64)

var err error
if p.databaseMap, p.tableMap, p.partitionMap, err = persistSchemaSnapshot(p.db, p.kvStorage, gcTs, true); err != nil {
// TODO: retry
log.Fatal("fail to initialize from kv snapshot", zap.Error(err))
if closeErr := p.db.Close(); closeErr != nil {
log.Warn("close db failed after initialize from kv snapshot failed", zap.Error(closeErr))
}
return errors.Trace(err)
}

p.gcTs = gcTs
Expand All @@ -269,6 +355,18 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, gcTs uint64)
zap.Int("databaseMapLen", len(p.databaseMap)),
zap.Int("tableMapLen", len(p.tableMap)),
zap.Any("duration(s)", time.Since(now).Seconds()))
return nil
}

// isRetryableInitializeFromKVStorageError reports whether a bootstrap-snapshot
// read failure is transient and worth retrying with a refreshed gc safepoint.
// Retryable failures are the "GC life time is shorter than transaction
// duration" class of errors and ErrSnapshotLostByGC; everything else
// (including nil) is treated as permanent.
func isRetryableInitializeFromKVStorageError(err error) bool {
	switch {
	case err == nil:
		return false
	case isGCLifeTimeError(err):
		return true
	default:
		code, ok := errors.RFCCode(err)
		return ok && code == errors.ErrSnapshotLostByGC.RFCCode()
	}
}

func (p *persistentStorage) initializeFromDisk() {
Expand Down
13 changes: 13 additions & 0 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -2894,6 +2895,18 @@ func TestRegisterTable(t *testing.T) {
}
}

// TestIsRetryableInitializeFromKVStorageError verifies that GC-related
// snapshot failures are classified as retryable and arbitrary errors are not.
func TestIsRetryableInitializeFromKVStorageError(t *testing.T) {
	cases := []struct {
		err       error
		retryable bool
	}{
		{fmt.Errorf("snapshot is lost because GC life time is shorter than transaction duration"), true},
		{cerror.ErrSnapshotLostByGC.GenWithStackByArgs(100, 200), true},
		{fmt.Errorf("non retryable error"), false},
	}
	for _, tc := range cases {
		require.Equal(t, tc.retryable, isRetryableInitializeFromKVStorageError(tc.err))
	}
}

func TestGCPersistStorage(t *testing.T) {
dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
err := os.RemoveAll(dbPath)
Expand Down
Loading