Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
f2aa776
add first round commit
3AceShowHand Feb 14, 2026
f9b46db
add all code
3AceShowHand Feb 14, 2026
fb577e0
add code
3AceShowHand Feb 14, 2026
efcf279
docs: consolidate storage sink documentation
3AceShowHand Feb 14, 2026
c86a33f
maintainer,dispatcher: match barrier done with action
3AceShowHand Mar 7, 2026
9e9cf90
fix the code
3AceShowHand Mar 9, 2026
3f8026a
only enable flush if using the storage sink
3AceShowHand Mar 9, 2026
b1f1a46
fix test
3AceShowHand Mar 9, 2026
783ed7f
revert flush
3AceShowHand Mar 10, 2026
4192711
simplify the test
3AceShowHand Mar 10, 2026
85bdd22
revert all changes in the maintainer
3AceShowHand Mar 10, 2026
4611804
adjust the code
3AceShowHand Mar 10, 2026
67262a5
adjust the code
3AceShowHand Mar 10, 2026
9adef26
adjust the code
3AceShowHand Mar 10, 2026
b2d790f
adjust the code
3AceShowHand Mar 11, 2026
a7070ab
adjust the code
3AceShowHand Mar 11, 2026
699a681
Merge branch 'master' into codex/fix-barrier-done-action
3AceShowHand Mar 11, 2026
5b6eec9
update comments
3AceShowHand Mar 12, 2026
f0b46de
Merge branch 'master' into enhance-storage-sink
3AceShowHand Mar 12, 2026
3c9e247
Merge branch 'codex/fix-barrier-done-action' into enhance-storage-sink
3AceShowHand Mar 12, 2026
dd4f780
docs: remove markdown notes from branch
3AceShowHand Mar 12, 2026
bc5d6ff
Merge branch 'master' into enhance-storage-sink
3AceShowHand Mar 12, 2026
a13c16d
fix conflicts
3AceShowHand Mar 12, 2026
4add4e7
revert all unrelated changes
3AceShowHand Mar 12, 2026
c72b390
run with the spool, first round
3AceShowHand Mar 12, 2026
73ecbb3
fix format and read codec package finished
3AceShowHand Mar 12, 2026
eaa3409
update code
3AceShowHand Mar 12, 2026
db38b62
refactor the storage sink code
3AceShowHand Mar 13, 2026
e647e82
update path.go
3AceShowHand Mar 13, 2026
9c7b684
fix code
3AceShowHand Mar 13, 2026
2b79acd
add a ut
3AceShowHand Mar 13, 2026
fe5db6a
Add related docs
3AceShowHand Mar 13, 2026
7c3d2bc
add more docs
3AceShowHand Mar 14, 2026
8d18029
adjust code
3AceShowHand Mar 18, 2026
843fda6
add budget
3AceShowHand Mar 19, 2026
c16404d
update spool code
3AceShowHand Mar 19, 2026
f785432
remove files
3AceShowHand Mar 19, 2026
8d57e56
rename to spool
3AceShowHand Mar 19, 2026
b697302
Merge branch 'master' into enhance-storage-sink
3AceShowHand Mar 19, 2026
a1df8a0
adjust code
3AceShowHand Mar 19, 2026
712b83f
fix the code
3AceShowHand Mar 19, 2026
1e8bac0
add more comment
3AceShowHand Mar 19, 2026
16f18de
fix test
3AceShowHand Mar 19, 2026
30f71f5
adjust option settings
3AceShowHand Mar 20, 2026
cba9bb0
add buffer manager
3AceShowHand Mar 20, 2026
fec627d
fix all code
3AceShowHand Mar 23, 2026
1ce0476
add more code
3AceShowHand Mar 23, 2026
0920444
update code
3AceShowHand Mar 23, 2026
1a8a0a9
fix read the quota controller
3AceShowHand Mar 23, 2026
bfef51d
finish review the spool
3AceShowHand Mar 24, 2026
087cfbb
adjust the code
3AceShowHand Mar 24, 2026
bbebd18
adjust the code
3AceShowHand Mar 24, 2026
3854c54
read buffer manager code
3AceShowHand Mar 24, 2026
fc5186e
read buffer manager code
3AceShowHand Mar 24, 2026
79fec1d
adjust code
3AceShowHand Mar 24, 2026
217dbd9
adjust code
3AceShowHand Mar 24, 2026
751cd60
adjust code
3AceShowHand Mar 24, 2026
d4ab3fa
make sure the channel close
3AceShowHand Mar 24, 2026
a1bd56e
format code
3AceShowHand Mar 24, 2026
e045446
adjust code
3AceShowHand Mar 25, 2026
083d004
fix the first round
3AceShowHand Mar 25, 2026
7d64545
fix more code
3AceShowHand Mar 25, 2026
2b10ea2
fix bytes calculation
3AceShowHand Mar 25, 2026
6fe776f
use histogram
3AceShowHand Mar 25, 2026
e7c9c59
add waiter count and duration
3AceShowHand Mar 25, 2026
15b9082
optimize code
3AceShowHand Mar 25, 2026
45ae58d
add changes
3AceShowHand Mar 25, 2026
d7def44
add changes
3AceShowHand Mar 25, 2026
e433c12
fix code
3AceShowHand Mar 25, 2026
99b18cb
adjust metrics
3AceShowHand Mar 26, 2026
40d3f6c
add more code
3AceShowHand Mar 26, 2026
d132930
Add grafana
3AceShowHand Mar 26, 2026
12e3041
Add grafana
3AceShowHand Mar 26, 2026
a62a5b5
Merge branch 'master' into enhance-storage-sink
3AceShowHand Mar 26, 2026
e622de8
Add grafana
3AceShowHand Mar 26, 2026
cf3bc1c
Add grafana
3AceShowHand Mar 26, 2026
aa354b1
Grafana is ready to review
3AceShowHand Mar 26, 2026
b7bb03c
next grafana generated
3AceShowHand Mar 26, 2026
aa27812
fix a lot of code
3AceShowHand Mar 27, 2026
fc87d68
Add one unit test
3AceShowHand Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 333 additions & 0 deletions downstreamadapter/sink/cloudstorage/buffer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudstorage

import (
"bytes"
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/spool"
"github.com/pingcap/ticdc/downstreamadapter/sink/metrics"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
)

const (
	// defaultBufferManagerChannelSize bounds the bufferManager input queue;
	// enqueueTask blocks once this many tasks are pending.
	defaultBufferManagerChannelSize = 64

	// Flush-reason labels recorded on the CloudStorageFlushReasonCounter metric.
	flushReasonSize     = "size"      // one table batch reached config.FileSize
	flushReasonInterval = "interval"  // periodic flush driven by config.FlushInterval
	flushReasonBarrier  = "barrier"   // a barrier task drained one dispatcher's batches
	flushReasonQuota    = "quota"     // spool disk-quota pressure forced a full flush
	flushReasonOversize = "oversized" // the spool accepted an oversized entry
)

// bufferManager owns pending DML batches for one writer shard. It decides
// when to emit a flush batch and how to react when local spool disk quota is tight.
type bufferManager struct {
	changeFeedID common.ChangeFeedID
	config       *cloudstorage.Config
	// spool persists encoded messages on local disk until they are flushed.
	spool *spool.Spool

	// inputCh is a bounded task queue owned by bufferManager.
	// Producers stop on ctx cancellation, so the channel does not need to be closed.
	inputCh chan *task
	// enqueueFlushTask hands completed flush batches (or barrier markers) to
	// the downstream writer.
	enqueueFlushTask func(context.Context, flushTask) error
	// buffer accumulates per-table batches between flushes.
	buffer tableBatches
}

// newBufferManager builds the buffer manager for one writer shard with a
// bounded input queue and an empty set of table batches.
func newBufferManager(
	changefeedID common.ChangeFeedID,
	config *cloudstorage.Config,
	spoolBuffer *spool.Spool,
	enqueueFlushTask func(context.Context, flushTask) error,
) *bufferManager {
	manager := &bufferManager{
		changeFeedID:     changefeedID,
		config:           config,
		spool:            spoolBuffer,
		enqueueFlushTask: enqueueFlushTask,
		buffer:           newTableBatches(),
	}
	manager.inputCh = make(chan *task, defaultBufferManagerChannelSize)
	return manager
}

// run is the bufferManager event loop. It drains inputCh and a periodic flush
// ticker until ctx is cancelled, returning the first error encountered.
func (c *bufferManager) run(ctx context.Context) error {
	ticker := time.NewTicker(c.config.FlushInterval)
	defer ticker.Stop()

	for {
		// Test hook: consume one tick before selecting so tests can
		// deterministically exercise the interval-flush branch.
		failpoint.Inject("passTickerOnce", func() {
			<-ticker.C
		})

		select {
		case <-ctx.Done():
			return errors.Trace(context.Cause(ctx))
		case <-ticker.C:
			// Periodic flush of everything buffered so far.
			if err := c.emitBatch(ctx, flushReasonInterval); err != nil {
				return err
			}
		case task := <-c.inputCh:
			if task.isFlushTask() {
				// Barrier task: first drain every batch owned by the
				// requesting dispatcher so its data precedes the marker...
				dispatcherBatch := c.buffer.detachByDispatcher(task.dispatcherID)
				if !dispatcherBatch.isEmpty() {
					if err := c.emitFlushTask(ctx, dispatcherBatch, flushReasonBarrier); err != nil {
						return err
					}
				}
				// ...then forward the barrier marker itself downstream.
				if err := c.enqueueFlushTask(ctx, flushTask{marker: task.marker}); err != nil {
					return err
				}
				continue
			}
			if err := c.handleDMLTask(ctx, task); err != nil {
				return err
			}
		}
	}
}

// handleDMLTask spools a DML task's encoded messages and buffers the resulting
// entry. It emits a flush immediately when the entry is oversized or its table
// reaches the configured file size, and flushes everything under disk-quota
// pressure before retrying the enqueue.
func (c *bufferManager) handleDMLTask(ctx context.Context, task *task) error {
	if len(task.encodedMsgs) == 0 {
		// Nothing to persist, but still run the post-enqueue hook so upstream
		// bookkeeping is not skipped.
		task.event.PostEnqueue()
		return nil
	}

	for {
		action, entry, err := c.spool.TryEnqueue(task.encodedMsgs, task.event.PostEnqueue)
		if err != nil {
			return err
		}
		switch action {
		case spool.EnqueueActionAcceptedOversized:
			// The entry alone exceeds normal limits: flush its table right away.
			c.addEntry(task, entry)
			return c.emitTableBatch(ctx, task.versionedTable, flushReasonOversize)
		case spool.EnqueueActionWaitDiskQuota:
			// Quota pressure: flush all buffered batches to free spool space,
			// wait for quota to become available, then retry the enqueue.
			if err := c.emitBatch(ctx, flushReasonQuota); err != nil {
				return err
			}
			if err := c.spool.WaitForDiskQuota(ctx, task.encodedMsgs); err != nil {
				return err
			}
			continue
		default:
			// Accepted normally; keep buffering.
			c.addEntry(task, entry)
		}

		version := task.versionedTable
		// addEntry above guarantees buffer.tables[version] exists.
		if c.buffer.tables[version].size < uint64(c.config.FileSize) {
			return nil
		}
		return c.emitTableBatch(ctx, version, flushReasonSize)
	}
}

// emitBatch flushes every buffered table batch with the given reason and
// resets the buffer on success. It is a no-op when nothing is buffered.
func (c *bufferManager) emitBatch(ctx context.Context, reason string) error {
	if c.buffer.isEmpty() {
		return nil
	}

	err := c.emitFlushTask(ctx, c.buffer, reason)
	if err != nil {
		return err
	}

	// Start accumulating into a fresh, empty batch set.
	c.buffer = newTableBatches()
	return nil
}

// emitTableBatch detaches the batch of a single versioned table from the
// buffer and flushes it with the given reason. Returns an error if the table
// has no buffered batch.
func (c *bufferManager) emitTableBatch(
	ctx context.Context,
	table cloudstorage.VersionedTableName,
	reason string,
) error {
	detached, err := c.buffer.detachByTable(table)
	if err != nil {
		return err
	}
	return c.emitFlushTask(ctx, detached, reason)
}

// enqueueTask hands a task to the bufferManager loop, blocking until the
// bounded input channel has room or ctx is cancelled.
func (c *bufferManager) enqueueTask(ctx context.Context, t *task) error {
	select {
	case c.inputCh <- t:
		return nil
	case <-ctx.Done():
		return errors.Trace(context.Cause(ctx))
	}
}

// tableBatches is the set of pending batches, keyed by versioned table, plus
// the aggregate byte size across all of them.
type tableBatches struct {
	tables map[cloudstorage.VersionedTableName]*tableBatch
	// nBytes is the sum of size over all entries in tables.
	nBytes uint64
}

// tableBatch accumulates the spool entries of one versioned table between
// flushes.
type tableBatch struct {
	// size is the total file bytes of all entries in this batch.
	size uint64
	// tableInfo is the schema captured from the first entry's event.
	tableInfo *common.TableInfo
	entries   []*spool.Entry
}

// newTableBatches returns an empty batch set ready to accept entries.
func newTableBatches() tableBatches {
	var batches tableBatches
	batches.tables = make(map[cloudstorage.VersionedTableName]*tableBatch)
	return batches
}

// isEmpty reports whether no table currently has a buffered batch.
func (t *tableBatches) isEmpty() bool {
	return len(t.tables) == 0
}

// addEntry appends a spool entry to the batch of the task's versioned table,
// creating the per-table batch on first use, and maintains both the per-table
// and aggregate byte counters.
//
// Compared to the original, this performs a single map lookup (comma-ok)
// instead of three, and calls entry.FileBytes() once instead of twice.
func (t *tableBatches) addEntry(event *task, entry *spool.Entry) {
	table := event.versionedTable
	batch, ok := t.tables[table]
	if !ok {
		// First entry for this table: capture its schema for payload building.
		batch = &tableBatch{
			size:      0,
			tableInfo: event.event.TableInfo,
		}
		t.tables[table] = batch
	}

	// Count the entry's bytes once and credit both the table and the total.
	fileBytes := entry.FileBytes()
	batch.size += fileBytes
	batch.entries = append(batch.entries, entry)
	t.nBytes += fileBytes
}

// addEntry records an accepted spool entry into the manager's pending buffer.
func (c *bufferManager) addEntry(event *task, entry *spool.Entry) {
	c.buffer.addEntry(event, entry)
}

// detachByTable removes the batch of one versioned table from t and returns
// it wrapped as a standalone tableBatches, adjusting t's aggregate byte count.
// It fails with an internal-check error when the table has no batch.
func (t *tableBatches) detachByTable(version cloudstorage.VersionedTableName) (tableBatches, error) {
	batch, ok := t.tables[version]
	if !ok || batch == nil {
		return tableBatches{}, errors.ErrInternalCheckFailed.GenWithStack(
			"table batch not found: %+v",
			version,
		)
	}
	delete(t.tables, version)
	t.nBytes -= batch.size

	detached := newTableBatches()
	detached.tables[version] = batch
	detached.nBytes = batch.size
	return detached, nil
}

// detachByDispatcher moves every batch owned by the given dispatcher out of t
// and returns them as a standalone tableBatches, keeping both byte counters
// consistent. Used when a barrier must drain exactly one dispatcher's data.
func (t *tableBatches) detachByDispatcher(dispatcherID common.DispatcherID) tableBatches {
	moved := newTableBatches()
	// Deleting map entries while ranging is safe in Go.
	for version, batch := range t.tables {
		if version.DispatcherID == dispatcherID {
			delete(t.tables, version)
			t.nBytes -= batch.size
			moved.tables[version] = batch
			moved.nBytes += batch.size
		}
	}
	return moved
}

// emitFlushTask materializes every table batch into a payload, enqueues the
// resulting flush task downstream, and records the flush-reason metric once
// the enqueue succeeds.
func (c *bufferManager) emitFlushTask(ctx context.Context, batch tableBatches, reason string) error {
	payloads := make([]tablePayload, 0, len(batch.tables))
	for table, tb := range batch.tables {
		data, err := c.buildPayload(tb)
		if err != nil {
			return err
		}
		payloads = append(payloads, tablePayload{table: table, payload: data})
	}

	if err := c.enqueueFlushTask(ctx, flushTask{batches: payloads}); err != nil {
		return err
	}
	metrics.CloudStorageFlushReasonCounter.
		WithLabelValues(c.changeFeedID.Keyspace(), c.changeFeedID.Name(), reason).
		Inc()
	return nil
}

// buildPayload concatenates all spool entries of one table batch into a single
// payload using a payloadBuilder.
func (c *bufferManager) buildPayload(batch *tableBatch) (*payload, error) {
	b := newPayloadBuilder(batch)
	for _, e := range batch.entries {
		if err := b.appendEntry(c.spool, e); err != nil {
			return nil, err
		}
	}
	return b.Build(), nil
}

// payloadBuilder incrementally concatenates the encoded messages of one table
// batch into a single flushable byte buffer, tracking rows, bytes, and
// post-flush callbacks along the way.
type payloadBuilder struct {
	// buf holds the accumulated file contents (optional leading key/header
	// followed by message values).
	buf   bytes.Buffer
	batch *tableBatch
	// rowsCount is the total row count across all appended messages.
	rowsCount int
	// nBytes is the total number of bytes written into buf.
	nBytes int64
	// shouldWriteKey is true until the first message is consumed; only that
	// first message may contribute the leading key/header.
	shouldWriteKey bool
	// postFlushCallbacks are invoked after the payload is flushed downstream.
	postFlushCallbacks []func()
}

// newPayloadBuilder prepares a builder for one table batch, pre-growing the
// output buffer to the batch's known byte size to avoid regrowth copies.
func newPayloadBuilder(batch *tableBatch) *payloadBuilder {
	b := &payloadBuilder{
		batch:          batch,
		shouldWriteKey: true,
	}
	b.buf.Grow(int(batch.size))
	return b
}

// Build assembles the final payload from the accumulated buffer, counters,
// entries, and callbacks.
func (b *payloadBuilder) Build() *payload {
	result := &payload{
		tableInfo:          b.batch.tableInfo,
		data:               b.buf.Bytes(),
		rowsCount:          b.rowsCount,
		nBytes:             b.nBytes,
		entries:            b.batch.entries,
		postFlushCallbacks: b.postFlushCallbacks,
	}
	return result
}

// appendEntry streams one spool entry's encoded messages into the builder's
// buffer, accumulating row counts, byte counts, and post-flush callbacks.
func (b *payloadBuilder) appendEntry(spoolBuffer *spool.Spool, entry *spool.Entry) error {
	reader, err := spoolBuffer.NewMessageReader(entry)
	if err != nil {
		return err
	}
	for {
		key, value, rowCount, ok, err := reader.Next()
		if err != nil {
			return err
		}
		if !ok {
			// Entry exhausted.
			break
		}
		// Only the first encoded message contributes the leading key/header.
		// NOTE(review): the flag is cleared even when that first key is nil,
		// so a later non-nil key would never be written — confirm intended.
		if b.shouldWriteKey {
			b.shouldWriteKey = false
			if key != nil {
				b.buf.Write(key)
				b.nBytes += int64(len(key))
			}
		}
		b.buf.Write(value)
		b.nBytes += int64(len(value))
		b.rowsCount += rowCount
	}
	// Collect the reader's callbacks to run after this payload is flushed.
	b.postFlushCallbacks = append(b.postFlushCallbacks, reader.PostFlushCallbacks()...)
	return nil
}
Loading
Loading