Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.25.5

require (
cloud.google.com/go/storage v1.39.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0
github.com/BurntSushi/toml v1.5.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/IBM/sarama v1.41.2
Expand Down Expand Up @@ -65,7 +65,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.7
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20250304121540-cc8b9491145b
github.com/tikv/pd v1.1.0-beta.0.20251113050911-303c6c3b403e
Expand Down Expand Up @@ -105,11 +105,11 @@ require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
Expand Down
30 changes: 20 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -829,20 +829,24 @@ github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo8
github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA=
github.com/AthenZ/athenz v1.10.39 h1:mtwHTF/v62ewY2Z5KWhuZgVXftBej1/Tn80zx4DcawY=
github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGfCwhHNEA=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 h1:JZg6HRh6W6U4OLl6lk7BZ7BLisIzM9dG1R50zUk9C/M=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0/go.mod h1:YL1xnZ6QejvQHWJrX/AvhFl4WW4rqHVoKspWNVwFk0M=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0 h1:JXg2dwJUmPB9JmtVmdEB16APJ7jurfbY5jnfXpJoRMc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0/go.mod h1:YD5h/ldMsG0XiIw7PdyNhLxaM317eFh5yNLccNfGdyw=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 h1:B+blDbyVIG3WaikNxPnhPiJ1MThR03b3vKGtER95TP4=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1/go.mod h1:JdM5psgjfBf5fo2uWOZhflPWyDBZ/O/CNAH9CtsuZE4=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2 h1:yz1bePFlP5Vws5+8ez6T3HWXPmwOK7Yvq8QxDBD3SKY=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2/go.mod h1:Pa9ZNPuoNu/GztvBSKk9J1cDJW6vk/n0zLtV4mgd8N8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM=
github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mod h1:tCcJZ0uHAmvjsVYzEFivsRTN00oz5BEsRgQHu5JZ9WE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
Expand Down Expand Up @@ -1064,6 +1068,8 @@ github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUn
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
Expand Down Expand Up @@ -1504,6 +1510,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU=
github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down Expand Up @@ -1754,6 +1762,8 @@ github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI=
github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
Expand Down Expand Up @@ -1857,8 +1867,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 h1:+iNTcqQJy0OZ5jk6a5NLib47eqXK8uYcPX+O4+cBpEM=
github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w=
github.com/swaggo/gin-swagger v1.2.0 h1:YskZXEiv51fjOMTsXrOetAjrMDfFaXD79PEoQBOe2W0=
Expand Down
99 changes: 91 additions & 8 deletions pkg/util/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func getExternalStorage(
})
if err != nil {
retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
return nil, retErr.GenWithStackByArgs("creating ExternalStorage")
}

// Check the connection and ignore the returned bool value, since we don't care if the file exists.
_, err = ret.FileExists(ctx, "test")
if err != nil {
retErr := errors.ErrFailToCreateExternalStorage.Wrap(errors.Trace(err))
return nil, retErr.GenWithStackByArgs("creating ExternalStorage for s3")
return nil, retErr.GenWithStackByArgs("creating ExternalStorage")
}
return ret, nil
}
Expand Down Expand Up @@ -244,21 +244,104 @@ func (s *extStorageWithTimeout) WalkDir(
return err
}

func withTimeoutIfNoDeadline(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
// Some call sites pass context.Background() down to external storage APIs.
// For cloud providers, that can translate into "wait forever" on network stalls.
// We only apply a default timeout when the caller didn't set a deadline.
if _, ok := ctx.Deadline(); ok {
return ctx, nil
}
return context.WithTimeout(ctx, timeout)
}

// Create opens a file writer by path. path is relative path to storage base path
func (s *extStorageWithTimeout) Create(
ctx context.Context, path string, option *storage.WriterOption,
) (storage.ExternalFileWriter, error) {
if option.Concurrency <= 1 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
// Some backends (notably S3 multipart uploads) spawn background goroutines which
// are bound to the context passed to Create(). If the caller uses a context
// without deadline, those goroutines can hang indefinitely on network stalls.
//
// To keep callers simple and avoid hidden goroutine leaks, we:
// - wrap Write/Close calls with a default timeout if the caller didn't set one;
// - for multipart uploads (Concurrency > 1), pass a cancellable context to Create()
// and cancel it when Write/Close times out/cancels.
concurrency := 1
if option != nil && option.Concurrency > 0 {
concurrency = option.Concurrency
}

var cancelCreate context.CancelFunc
if concurrency > 1 {
ctx, cancelCreate = context.WithCancel(ctx)
}
// multipart uploading spawns a background goroutine, can't set timeout

writer, err := s.ExternalStorage.Create(ctx, path, option)
if err != nil {
if cancelCreate != nil {
cancelCreate()
}
err = errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("Create")
return nil, err
}
return &writerWithCancelAndTimeout{
ExternalFileWriter: writer,
timeout: s.timeout,
cancelCreate: cancelCreate,
}, nil
}

type writerWithCancelAndTimeout struct {
storage.ExternalFileWriter
timeout time.Duration
cancelCreate context.CancelFunc
}

func (w *writerWithCancelAndTimeout) Write(ctx context.Context, p []byte) (int, error) {
ctx, cancel := withTimeoutIfNoDeadline(ctx, w.timeout)
var stop func() bool
if w.cancelCreate != nil {
// If the backend binds background uploads to the Create() context, ensure a
// Write timeout/cancel also aborts the background work so the call unblocks.
stop = context.AfterFunc(ctx, w.cancelCreate)
}

n, err := w.ExternalFileWriter.Write(ctx, p)

if stop != nil {
stop()
}
return writer, err
if cancel != nil {
cancel()
}
if err != nil {
err = errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("Write")
}
return n, err
}

func (w *writerWithCancelAndTimeout) Close(ctx context.Context) error {
ctx, cancel := withTimeoutIfNoDeadline(ctx, w.timeout)
var stop func() bool
if w.cancelCreate != nil {
// Same rationale as Write(): on multipart backends the ctx argument is often
// ignored in Close(), so we must cancel the Create() context to abort uploads.
stop = context.AfterFunc(ctx, w.cancelCreate)
defer w.cancelCreate()
}

err := w.ExternalFileWriter.Close(ctx)

if stop != nil {
stop()
}
if cancel != nil {
cancel()
}
if err != nil {
err = errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("Close")
}
return err
}

// Rename file name from oldFileName to newFileName
Expand Down
96 changes: 96 additions & 0 deletions pkg/util/external_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,99 @@ func TestExtStorageOpenReaderRespectsCallerCancel(t *testing.T) {
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded))
}

// blockingCtxWriter blocks in Write/Close until the passed ctx is done.
// It is used to verify extStorageWithTimeout wraps streaming writer operations
// with default deadlines when callers use context without deadline.
type blockingCtxWriter struct{}

func (*blockingCtxWriter) Write(ctx context.Context, _ []byte) (int, error) {
<-ctx.Done()
return 0, ctx.Err()
}

func (*blockingCtxWriter) Close(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

// blockingCreateCtxWriter blocks in Write/Close until the ctx passed to Create() is done.
// This simulates multipart backends (e.g., S3 uploader) where background work is bound to
// the Create() context rather than the Write/Close context.
type blockingCreateCtxWriter struct {
createCtx context.Context
}

func (w *blockingCreateCtxWriter) Write(_ context.Context, _ []byte) (int, error) {
<-w.createCtx.Done()
return 0, w.createCtx.Err()
}

func (w *blockingCreateCtxWriter) Close(_ context.Context) error {
<-w.createCtx.Done()
return w.createCtx.Err()
}

type mockCreateExternalStorage struct {
storage.ExternalStorage
writer storage.ExternalFileWriter
}

func (m *mockCreateExternalStorage) Create(ctx context.Context, _ string, _ *storage.WriterOption) (storage.ExternalFileWriter, error) {
if w, ok := m.writer.(*blockingCreateCtxWriter); ok {
w.createCtx = ctx
}
return m.writer, nil
}

func TestExtStorageCreateWriterWriteTimeout(t *testing.T) {
// Scenario: a streaming writer should not hang forever when the caller passes
// a context without deadline.
//
// Steps:
// 1) Use a writer that blocks until the Write() ctx is done.
// 2) Call extStorageWithTimeout.Create and then writer.Write with context.Background().
// 3) Verify the call fails within the default timeout.
testTimeout := 50 * time.Millisecond
timedStore := &extStorageWithTimeout{
ExternalStorage: &mockCreateExternalStorage{writer: &blockingCtxWriter{}},
timeout: testTimeout,
}

w, err := timedStore.Create(context.Background(), "file", &storage.WriterOption{Concurrency: 1})
require.NoError(t, err)

start := time.Now()
_, err = w.Write(context.Background(), []byte("x"))
elapsed := time.Since(start)

require.Error(t, err)
require.True(t, errors.Is(err, context.DeadlineExceeded), "got %v", err)
require.InDelta(t, testTimeout, elapsed, float64(testTimeout)*0.5)
}

func TestExtStorageCreateMultipartWriteCancelsCreateCtxOnTimeout(t *testing.T) {
// Scenario: multipart backends can bind background work to the Create() context.
// When Write() times out, TiCDC should cancel that Create() context so the call unblocks.
//
// Steps:
// 1) Use a writer that blocks until the Create() ctx is canceled.
// 2) Call extStorageWithTimeout.Create with Concurrency > 1.
// 3) Call Write() with a ctx without deadline and verify it returns in time.
testTimeout := 50 * time.Millisecond
timedStore := &extStorageWithTimeout{
ExternalStorage: &mockCreateExternalStorage{writer: &blockingCreateCtxWriter{}},
timeout: testTimeout,
}

w, err := timedStore.Create(context.Background(), "file", &storage.WriterOption{Concurrency: 2})
require.NoError(t, err)

start := time.Now()
_, err = w.Write(context.Background(), []byte("x"))
elapsed := time.Since(start)

require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "got %v", err)
require.InDelta(t, testTimeout, elapsed, float64(testTimeout)*0.5)
}
Loading