diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index d35c61e422..66e058fbb2 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -16,6 +16,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/sequencers/solo" ) const ( @@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { Id: []byte(r.chainID), Batch: &coresequencer.Batch{Transactions: newTxs}, }) + if errors.Is(err, solo.ErrQueueFull) { + // Sequencer queue is full — backpressure signal. Mark the + // batch as "seen" so we don't waste cycles re-hashing the + // same dropped txs every reaper tick, and surface the drop + // as a warning rather than tearing down the daemon. The + // loadgen sees lower acceptance via /tx flow control once + // the executor's own mempool fills up. + r.cache.SetTxsSeen(newHashes) + r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)") + break + } if err != nil { return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 2c613327f7..fa6aa5bba0 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -52,9 +52,18 @@ type Submitter struct { // DA state daIncludedHeight *atomic.Uint64 - // Submission state to prevent concurrent submissions - headerSubmissionMtx sync.Mutex - dataSubmissionMtx sync.Mutex + // Submission concurrency: each semaphore is a buffered channel + // sized to MaxPendingHeadersAndData. A zero value disables the + // limit and falls back to a single in-flight submission per type + // (= cap 1) so callers that opt out of pending-cap don't get + // unbounded fan-out. 
Tickets are acquired non-blocking via + // `select` and released by the goroutine that started the + // submission. Replaces the previous single-flight Mutex which + // pinned data-upload throughput at the latency of a single + // gRPC round-trip — under sustained load that capped DA at + // ~20 MB/s even though Fibre's per-blob upload took ≤1.5 s. + headerSubmissionSem chan struct{} + dataSubmissionSem chan struct{} // Batching strategy state lastHeaderSubmit atomic.Int64 // stores Unix nanoseconds @@ -95,20 +104,31 @@ func NewSubmitter( strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1) } + // Pool size = pending-cap. Each pending blob gets up to one + // in-flight submission; if the cap is 0 (unbounded pending) we + // keep a single slot — a zero-capacity semaphore channel could + // never be acquired, which would silently disable submissions. + poolSize := int(config.Node.MaxPendingHeadersAndData) + if poolSize <= 0 { + poolSize = 1 + } + submitter := &Submitter{ - store: store, - exec: exec, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - daSubmitter: daSubmitter, - sequencer: sequencer, - signer: signer, - daIncludedHeight: &atomic.Uint64{}, - batchingStrategy: strategy, - errorCh: errorCh, - logger: submitterLogger, + store: store, + exec: exec, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + daSubmitter: daSubmitter, + sequencer: sequencer, + signer: signer, + daIncludedHeight: &atomic.Uint64{}, + batchingStrategy: strategy, + errorCh: errorCh, + logger: submitterLogger, + headerSubmissionSem: make(chan struct{}, poolSize), + dataSubmissionSem: make(chan struct{}, poolSize), } now := time.Now().UnixNano() @@ -194,12 +214,13 @@ func (s *Submitter) daSubmissionLoop() { // For strategy decision, we need to estimate the size // We'll fetch headers to check, but only submit if strategy approves - if s.headerSubmissionMtx.TryLock() { + select { + case s.headerSubmissionSem <- struct{}{}: s.logger.Debug().Time("t", 
time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress") s.wg.Add(1) go func() { defer func() { - s.headerSubmissionMtx.Unlock() + <-s.headerSubmissionSem s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed") s.wg.Done() }() @@ -266,6 +287,8 @@ func (s *Submitter) daSubmissionLoop() { s.logger.Error().Err(err).Msg("failed to enqueue header submission") } }() + default: + // All header workers busy; try again on the next tick. } } @@ -274,12 +297,13 @@ func (s *Submitter) daSubmissionLoop() { if dataNb > 0 { lastSubmitNanos := s.lastDataSubmit.Load() timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos)) - if s.dataSubmissionMtx.TryLock() { + select { + case s.dataSubmissionSem <- struct{}{}: s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress") s.wg.Add(1) go func() { defer func() { - s.dataSubmissionMtx.Unlock() + <-s.dataSubmissionSem s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed") s.wg.Done() }() @@ -346,6 +370,8 @@ func (s *Submitter) daSubmissionLoop() { s.logger.Error().Err(err).Msg("failed to enqueue data submission") } }() + default: + // All data workers busy; try again on the next tick. 
} } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 6e684176be..2231d19f30 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -361,17 +361,19 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { require.NoError(t, err) s := &Submitter{ - store: st, - exec: exec, - cache: cm, - metrics: metrics, - config: cfg, - genesis: genesis.Genesis{}, - daSubmitter: fakeDA, - signer: &fakeSigner{}, - daIncludedHeight: &atomic.Uint64{}, - batchingStrategy: batchingStrategy, - logger: zerolog.Nop(), + store: st, + exec: exec, + cache: cm, + metrics: metrics, + config: cfg, + genesis: genesis.Genesis{}, + daSubmitter: fakeDA, + signer: &fakeSigner{}, + daIncludedHeight: &atomic.Uint64{}, + batchingStrategy: batchingStrategy, + logger: zerolog.Nop(), + headerSubmissionSem: make(chan struct{}, 1), + dataSubmissionSem: make(chan struct{}, 1), } // Set last submit times far in past so strategy allows submission diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. + c.Node.MaxPendingHeadersAndData = 10 } // GetNamespace returns the namespace for header submissions. 
diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go index 86fa08d45d..f8524157e6 100644 --- a/pkg/sequencers/solo/sequencer.go +++ b/pkg/sequencers/solo/sequencer.go @@ -14,7 +14,15 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" ) -var ErrInvalidID = errors.New("invalid chain id") +var ( + ErrInvalidID = errors.New("invalid chain id") + // ErrQueueFull is returned from SubmitBatchTxs when the in-memory + // queue is at its byte cap (see SetMaxQueueBytes). Callers should + // treat this as transient backpressure (drop or retry); the + // reaper bridging executor mempool → sequencer matches it via + // errors.Is and downgrades to a warning. + ErrQueueFull = errors.New("sequencer queue full") +) var ( emptyBatch = &coresequencer.Batch{} @@ -27,6 +35,15 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil) // SoloSequencer is a single-leader sequencer without forced inclusion // support. It maintains a simple in-memory queue of mempool transactions and // produces batches on demand. +// +// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is +// strongly recommended in any high-throughput configuration: under +// sustained ingest above the block-production drain rate the queue +// otherwise grows monotonically until OOM. With a bound set, +// SubmitBatchTxs admits only as many incoming txs as fit and returns +// ErrQueueFull if the bound rejected at least one tx, so callers can +// surface backpressure (e.g. via HTTP 503) instead of silently +// retaining bytes. type SoloSequencer struct { logger zerolog.Logger id []byte @@ -34,8 +51,10 @@ type SoloSequencer struct { daHeight atomic.Uint64 - mu sync.Mutex - queue [][]byte + mu sync.Mutex + queue [][]byte + queueBytes uint64 + maxQueueBytes uint64 // 0 = unbounded (legacy default) } func NewSoloSequencer( @@ -51,6 +70,16 @@ func NewSoloSequencer( } } +// SetMaxQueueBytes sets a soft cap on the sequencer's in-memory tx +// queue. 
SubmitBatchTxs rejects the entire incoming batch with +// ErrQueueFull when it would push the queue past the cap (all-or- +// nothing; see SubmitBatchTxs). A zero value disables the cap. Intended to be called once at startup. +func (s *SoloSequencer) SetMaxQueueBytes(n uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.maxQueueBytes = n +} + func (s *SoloSequencer) isValid(id []byte) bool { return bytes.Equal(s.id, id) } @@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su s.mu.Lock() defer s.mu.Unlock() + if s.maxQueueBytes == 0 { + // Unbounded path (legacy). Suitable for tests and small + // deployments; in production use SetMaxQueueBytes. + s.queue = append(s.queue, req.Batch.Transactions...) + return submitBatchResp, nil + } + + // All-or-nothing: if the whole incoming batch doesn't fit, reject + // it untouched. Partial admission would force the caller (e.g. + // the reaper bridging executor mempool → sequencer) to reason + // about which prefix was admitted and re-feed only the suffix on + // retry, which it doesn't currently do — leading to duplicate-tx + // resubmission on each retry. Rejecting the whole batch gives the + // caller one clean drop-or-retry decision for the entire batch + // (the reaper currently drops it with a warning; it does not retry). + var batchBytes uint64 + for _, tx := range req.Batch.Transactions { + batchBytes += uint64(len(tx)) + } + if s.queueBytes+batchBytes > s.maxQueueBytes { + return submitBatchResp, ErrQueueFull + } s.queue = append(s.queue, req.Batch.Transactions...) + s.queueBytes += batchBytes return submitBatchResp, nil } @@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN s.mu.Lock() txs := s.queue s.queue = nil + s.queueBytes = 0 s.mu.Unlock() if len(txs) == 0 { @@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN if len(postponedTxs) > 0 { s.mu.Lock() s.queue = append(postponedTxs, s.queue...) 
+ // Postponed txs were already in the queue's byte count when + // SubmitBatchTxs admitted them. We zeroed queueBytes on drain + // above, so re-queuing requires re-counting whatever survived. + var requeued uint64 + for _, tx := range postponedTxs { + requeued += uint64(len(tx)) + } + s.queueBytes += requeued s.mu.Unlock() } diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 77322116bd..c8fd9f88d5 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -51,6 +51,7 @@ import ( "github.com/celestiaorg/celestia-app/v9/app" "github.com/celestiaorg/celestia-app/v9/app/encoding" + appfibre "github.com/celestiaorg/celestia-app/v9/fibre" "github.com/celestiaorg/celestia-node/api/client" cnp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" @@ -210,14 +211,16 @@ func run(cli cliFlags) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Construct the celestia-node-fiber adapter. We don't override - // SubmitConfig.Fibre — the Fibre client defaults (UploadMemoryBudget - // 512 MiB, RPCTimeout 15 s) are sized for the FSP-side concurrency - // the validators can actually absorb. We tried bumping the budget - // to 4 GiB to allow more in-flight blobs; with 16 upload workers - // the FSPs couldn't keep up and the box OOM'd at 63.9 GB. Leaving - // the defaults in place means the upload pipeline self-bounds at - // roughly what the FSPs can sustain. + // Construct the celestia-node-fiber adapter. The Fibre client + // defaults (UploadMemoryBudget 512 MiB, RPCTimeout 15 s) are sized + // for FSP-side concurrency. 
Bumping the budget alone caused 64 GiB + // OOMs (4 GiB budget × 16 workers), so we leave that conservative + // AND raise RPCTimeout to 30 s so a slow-but-healthy validator + // signature collection isn't cut short under load — under busy + // conditions a 32 MiB row upload + sig aggregation can exceed the + // 15 s default. + fibreCfg := appfibre.DefaultClientConfig() + fibreCfg.RPCTimeout = 30 * time.Second adapter, err := cnfiber.New(ctx, cnfiber.Config{ Client: client.Config{ ReadConfig: client.ReadConfig{ @@ -231,6 +234,7 @@ func run(cli cliFlags) error { CoreGRPCConfig: client.CoreGRPCConfig{ Addr: cli.coreGRPCAddr, }, + Fibre: &fibreCfg, }, }, }, kr) @@ -327,6 +331,19 @@ func run(cli cliFlags) error { executor := newInMemExecutor() sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor) + // Cap the sequencer's in-memory queue at 10× the per-block tx + // budget. Above this, SubmitBatchTxs returns ErrQueueFull and the + // runner's reaper-bridge / tx-ingress applies backpressure (txs + // stay in the executor's txChan until the sequencer drains, and + // the chan's bound 503's /tx). Without this cap a fast loadgen + // (32 vCPU pushing >100 MB/s) outruns the 1 block/s drain and + // the queue grows monotonically — observed pre-fix as 24 GB of + // retained io.ReadAll bytes in heap snapshots before the daemon + // hit the 64 GiB box ceiling and OOM-killed. + // Sized at 10× the per-block tx budget (matches SetMaxBlobSize + // above; both anchor at the per-blob Fibre cap). 
+ const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1 GiB + sequencer.SetMaxQueueBytes(seqQueueBytes) daClient := block.NewFiberDAClient(adapter, cfg, logger, 0) p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil) if err != nil { @@ -479,23 +496,23 @@ type inMemExecutor struct { totalTxs atomic.Uint64 } -// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots -// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine — -// which is exactly the backpressure we want against a hot loadgen. -// Reaper drains every 100 ms into the solo sequencer, which then -// accumulates batches between block-production ticks; without a tight -// cap a single block can balloon past the 120 MiB DA blob limit and -// the rest of the daemon's per-block allocations push the box past -// its RAM budget within seconds. +// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K +// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can +// absorb a full max-size block's worth of txs without /tx blocking +// the loadgen. Earlier we used 500 slots (~5 MiB) which forced +// backpressure at ~5,000 tx/s — that turned txsim into the limiting +// factor at ~22 MB/s rather than DA upload. With the per-block +// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize= +// 100 MiB) and the submitter chunker now enforcing the actual blob +// budget, the executor doesn't need an extra ingest-side cap. // -// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches -// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we -// stay an order of magnitude under the DA cap so headers/data signing -// + envelope cache + retry buffers all fit. +// maxBlockTxs caps GetTxs's per-call return; pairs with the channel +// size so a reaper poll can fully drain a 100 MiB-block-worth of +// queued txs in a single call instead of needing 20× cycles. 
func newInMemExecutor() *inMemExecutor { return &inMemExecutor{ - txChan: make(chan []byte, 500), - maxBlockTxs: 500, + txChan: make(chan []byte, 10000), + maxBlockTxs: 10000, } } @@ -547,10 +564,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu return coreexecution.ExecutionInfo{MaxGas: 0}, nil } -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { +// FilterTxs enforces the per-block maxBytes budget: any tx that would +// push the running total past the budget is postponed back to the +// sequencer queue (later, smaller txs may still be admitted — not a +// strict prefix). Skipping this enforcement (a previous version +// returned FilterOK unconditionally) lets a single block sweep up the +// entire mempool — under sustained txsim load that produced 369 MiB +// blocks that exceeded Fibre's per-upload cap and crashed the node +// with `single item exceeds DA blob size limit`. +func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { + var used uint64 + for i, tx := range txs { + size := uint64(len(tx)) + if maxBytes > 0 && used+size > maxBytes { + st[i] = coreexecution.FilterPostpone + continue + } st[i] = coreexecution.FilterOK + used += size } return st, nil }