diff --git a/.mockery.yaml b/.mockery.yaml index f1d971730..b5e092cd9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -80,3 +80,10 @@ packages: dir: ./pkg/da/jsonrpc/mocks pkgname: mocks filename: header_module_mock.go + github.com/evstack/ev-node/block: + interfaces: + ForcedInclusionRetriever: + config: + dir: ./pkg/sequencers/common + pkgname: common + filename: forced_inclusion_retriever_mock.go diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index 08dcd8109..eef0fa379 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -167,8 +167,7 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) - basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) + basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -185,8 +184,8 @@ func createSequencer( logger, datastore, daClient, + nodeConfig, []byte(genesis.ChainID), - nodeConfig.Node.BlockTime.Duration, 1000, genesis, ) diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 021a3bb8f..2cd4d5610 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -127,8 +127,7 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) - basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) + basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -145,8 +144,8 @@ func createSequencer( logger, datastore, daClient, + nodeConfig, []byte(genesis.ChainID), - nodeConfig.Node.BlockTime.Duration, 1000, genesis, ) diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index 122ca9279..bc8ab889d 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -128,8 +128,7 @@ func createSequencer( return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled") } - fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) - basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger) + basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger) if err != nil { return nil, fmt.Errorf("failed to create based sequencer: %w", err) } @@ -146,8 +145,8 @@ func createSequencer( logger, datastore, daClient, + nodeConfig, []byte(genesis.ChainID), - nodeConfig.Node.BlockTime.Duration, 1000, genesis, ) diff --git a/block/internal/da/async_block_retriever.go b/block/internal/da/async_block_retriever.go new file mode 100644 index 000000000..5c147de30 --- /dev/null +++ b/block/internal/da/async_block_retriever.go @@ -0,0 +1,332 @@ +package da + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + dsync "github.com/ipfs/go-datastore/sync" + "github.com/rs/zerolog" + "google.golang.org/protobuf/proto" + + "github.com/evstack/ev-node/pkg/config" + datypes "github.com/evstack/ev-node/pkg/da/types" + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +// AsyncBlockRetriever provides background prefetching of DA blocks +type AsyncBlockRetriever interface { + Start() + Stop() + GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error) + UpdateCurrentHeight(height uint64) +} + +// BlockData contains data retrieved from a single DA height +type BlockData struct { + Height uint64 + Timestamp time.Time + Blobs [][]byte +} + +// asyncBlockRetriever handles background prefetching of individual DA blocks +// from a specific namespace. +type asyncBlockRetriever struct { + client Client + logger zerolog.Logger + namespace []byte + daStartHeight uint64 + + // In-memory cache for prefetched block data + cache ds.Batching + + // Background fetcher control + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // Current DA height tracking (accessed atomically) + currentDAHeight atomic.Uint64 + + // Prefetch window - how many blocks ahead to prefetch + prefetchWindow uint64 + + // Polling interval for checking new DA heights + pollInterval time.Duration +} + +// NewAsyncBlockRetriever creates a new async block retriever with in-memory cache. +func NewAsyncBlockRetriever( + client Client, + logger zerolog.Logger, + namespace []byte, + config config.Config, + daStartHeight uint64, + prefetchWindow uint64, +) AsyncBlockRetriever { + if prefetchWindow == 0 { + prefetchWindow = 10 // Default: prefetch next 10 blocks + } + + ctx, cancel := context.WithCancel(context.Background()) + + fetcher := &asyncBlockRetriever{ + client: client, + logger: logger.With().Str("component", "async_block_retriever").Logger(), + namespace: namespace, + daStartHeight: daStartHeight, + cache: dsync.MutexWrap(ds.NewMapDatastore()), + ctx: ctx, + cancel: cancel, + prefetchWindow: prefetchWindow, + pollInterval: config.DA.BlockTime.Duration, + } + fetcher.currentDAHeight.Store(daStartHeight) + return fetcher +} + +// Start begins the background prefetching process. +func (f *asyncBlockRetriever) Start() { + f.wg.Add(1) + go f.backgroundFetchLoop() + f.logger.Debug(). + Uint64("da_start_height", f.daStartHeight). + Uint64("prefetch_window", f.prefetchWindow). + Dur("poll_interval", f.pollInterval). + Msg("async block retriever started") +} + +// Stop gracefully stops the background prefetching process. +func (f *asyncBlockRetriever) Stop() { + f.logger.Debug().Msg("stopping async block retriever") + f.cancel() + f.wg.Wait() +} + +// UpdateCurrentHeight updates the current DA height for prefetching. +func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) { + // Use atomic compare-and-swap to update only if the new height is greater + for { + current := f.currentDAHeight.Load() + if height <= current { + return + } + if f.currentDAHeight.CompareAndSwap(current, height) { + f.logger.Debug(). + Uint64("new_height", height). + Msg("updated current DA height") + return + } + } +} + +func newBlockDataKey(height uint64) ds.Key { + return ds.NewKey(fmt.Sprintf("/block/%d", height)) +} + +// GetCachedBlock retrieves a cached block from memory. +// Returns nil if the block is not cached. +func (f *asyncBlockRetriever) GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error) { + if len(f.namespace) == 0 { + return nil, nil + } + + if daHeight < f.daStartHeight { + return nil, fmt.Errorf("DA height %d is before the configured start height %d", daHeight, f.daStartHeight) + } + + key := newBlockDataKey(daHeight) + data, err := f.cache.Get(ctx, key) + if err != nil { + if errors.Is(err, ds.ErrNotFound) { + return nil, nil // Not cached yet + } + return nil, fmt.Errorf("failed to get cached block: %w", err) + } + + // Deserialize the cached block + var pbBlock pb.BlockData + if err := proto.Unmarshal(data, &pbBlock); err != nil { + return nil, fmt.Errorf("failed to unmarshal cached block: %w", err) + } + + block := &BlockData{ + Height: pbBlock.Height, + Timestamp: time.Unix(pbBlock.Timestamp, 0).UTC(), + Blobs: pbBlock.Blobs, + } + + f.logger.Debug(). + Uint64("da_height", daHeight). + Int("blob_count", len(block.Blobs)). + Msg("retrieved block from cache") + + return block, nil +} + +// backgroundFetchLoop runs in the background and prefetches blocks ahead of time. +func (f *asyncBlockRetriever) backgroundFetchLoop() { + defer f.wg.Done() + + ticker := time.NewTicker(f.pollInterval) + defer ticker.Stop() + + for { + select { + case <-f.ctx.Done(): + return + case <-ticker.C: + f.prefetchBlocks() + } + } +} + +// prefetchBlocks prefetches blocks within the prefetch window. +func (f *asyncBlockRetriever) prefetchBlocks() { + if len(f.namespace) == 0 { + return + } + + currentHeight := f.currentDAHeight.Load() + + // Prefetch upcoming blocks + for i := uint64(0); i < f.prefetchWindow; i++ { + targetHeight := currentHeight + i + + // Check if already cached + key := newBlockDataKey(targetHeight) + _, err := f.cache.Get(f.ctx, key) + if err == nil { + // Already cached + continue + } + + // Fetch and cache the block + f.fetchAndCacheBlock(targetHeight) + } + + // Clean up old blocks from cache to prevent memory growth + f.cleanupOldBlocks(currentHeight) +} + +// fetchAndCacheBlock fetches a block and stores it in the cache. +func (f *asyncBlockRetriever) fetchAndCacheBlock(height uint64) { + f.logger.Debug(). + Uint64("height", height). + Msg("prefetching block") + + result := f.client.Retrieve(f.ctx, height, f.namespace) + + block := &BlockData{ + Height: height, + Timestamp: result.Timestamp, + Blobs: [][]byte{}, + } + + switch result.Code { + case datypes.StatusHeightFromFuture: + f.logger.Debug(). + Uint64("height", height). + Msg("block height not yet available - will retry") + return + case datypes.StatusNotFound: + f.logger.Debug(). + Uint64("height", height). + Msg("no blobs at height") + // Cache empty result to avoid re-fetching + case datypes.StatusSuccess: + // Process each blob + for _, blob := range result.Data { + if len(blob) > 0 { + block.Blobs = append(block.Blobs, blob) + } + } + f.logger.Debug(). + Uint64("height", height). + Int("blob_count", len(result.Data)). + Msg("processed blobs for prefetch") + default: + f.logger.Debug(). + Uint64("height", height). + Str("status", result.Message). + Msg("failed to retrieve block - will retry") + return + } + + // Serialize and cache the block + pbBlock := &pb.BlockData{ + Height: block.Height, + Timestamp: block.Timestamp.Unix(), + Blobs: block.Blobs, + } + data, err := proto.Marshal(pbBlock) + if err != nil { + f.logger.Error(). + Err(err). + Uint64("height", height). + Msg("failed to marshal block for caching") + return + } + + key := newBlockDataKey(height) + err = f.cache.Put(f.ctx, key, data) + if err != nil { + f.logger.Error(). + Err(err). + Uint64("height", height). + Msg("failed to cache block") + return + } + + f.logger.Debug(). + Uint64("height", height). + Int("blob_count", len(block.Blobs)). + Msg("successfully prefetched and cached block") +} + +// cleanupOldBlocks removes blocks older than a threshold from cache. +func (f *asyncBlockRetriever) cleanupOldBlocks(currentHeight uint64) { + // Remove blocks older than current - prefetchWindow + if currentHeight < f.prefetchWindow { + return + } + + cleanupThreshold := currentHeight - f.prefetchWindow + + // Query all keys + query := dsq.Query{Prefix: "/block/"} + results, err := f.cache.Query(f.ctx, query) + if err != nil { + f.logger.Debug().Err(err).Msg("failed to query cache for cleanup") + return + } + defer results.Close() + + for result := range results.Next() { + if result.Error != nil { + continue + } + + key := ds.NewKey(result.Key) + // Extract height from key + var height uint64 + _, err := fmt.Sscanf(key.String(), "/block/%d", &height) + if err != nil { + continue + } + + if height < cleanupThreshold { + if err := f.cache.Delete(f.ctx, key); err != nil { + f.logger.Debug(). + Err(err). + Uint64("height", height). + Msg("failed to delete old block from cache") + } + } + } +} diff --git a/block/internal/da/async_block_retriever_test.go b/block/internal/da/async_block_retriever_test.go new file mode 100644 index 000000000..82525ea31 --- /dev/null +++ b/block/internal/da/async_block_retriever_test.go @@ -0,0 +1,271 @@ +package da + +import ( + "context" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/evstack/ev-node/pkg/config" + datypes "github.com/evstack/ev-node/pkg/da/types" + pb "github.com/evstack/ev-node/types/pb/evnode/v1" + + mocks "github.com/evstack/ev-node/test/mocks" +) + +func TestAsyncBlockRetriever_GetCachedBlock_NoNamespace(t *testing.T) { + client := &mocks.MockClient{} + + logger := zerolog.Nop() + fetcher := NewAsyncBlockRetriever(client, logger, nil, config.DefaultConfig(), 100, 10) + + ctx := context.Background() + block, err := fetcher.GetCachedBlock(ctx, 100) + assert.NoError(t, err) + assert.Nil(t, block) +} + +func TestAsyncBlockRetriever_GetCachedBlock_CacheMiss(t *testing.T) { + client := &mocks.MockClient{} + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + + logger := zerolog.Nop() + fetcher := NewAsyncBlockRetriever(client, logger, fiNs, config.DefaultConfig(), 100, 10) + + ctx := context.Background() + + // Nothing cached yet + block, err := fetcher.GetCachedBlock(ctx, 100) + require.NoError(t, err) + assert.Nil(t, block) // Cache miss +} + +func TestAsyncBlockRetriever_FetchAndCache(t *testing.T) { + testBlobs := [][]byte{ + []byte("tx1"), + []byte("tx2"), + []byte("tx3"), + } + + client := &mocks.MockClient{} + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + + // Mock Retrieve call for height 100 + client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Timestamp: time.Unix(1000, 0), + }, + Data: testBlobs, + }).Once() + + // Mock other heights that will be prefetched + for height := uint64(101); height <= 109; height++ { + client.On("Retrieve", mock.Anything, height, fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound}, + }).Maybe() + } + + logger := zerolog.Nop() + // Use a short poll interval for faster test execution + cfg := config.DefaultConfig() + cfg.DA.BlockTime.Duration = 100 * time.Millisecond + fetcher := NewAsyncBlockRetriever(client, logger, fiNs, cfg, 100, 10) + fetcher.Start() + defer fetcher.Stop() + + // Update current height to trigger prefetch + fetcher.UpdateCurrentHeight(100) + + // Wait for the background fetch to complete by polling the cache + ctx := context.Background() + var block *BlockData + var err error + + // Poll for up to 2 seconds for the block to be cached + for i := 0; i < 40; i++ { + block, err = fetcher.GetCachedBlock(ctx, 100) + require.NoError(t, err) + if block != nil { + break + } + time.Sleep(50 * time.Millisecond) + } + + require.NotNil(t, block, "block should be cached after background fetch") + assert.Equal(t, uint64(100), block.Height) + assert.Equal(t, 3, len(block.Blobs)) + for i, tb := range testBlobs { + assert.Equal(t, tb, block.Blobs[i]) + } +} + +func TestAsyncBlockRetriever_BackgroundPrefetch(t *testing.T) { + testBlobs := [][]byte{ + []byte("tx1"), + []byte("tx2"), + } + + client := &mocks.MockClient{} + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + + // Mock for heights 100-110 (current + prefetch window) + for height := uint64(100); height <= 110; height++ { + if height == 105 { + client.On("Retrieve", mock.Anything, height, fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Timestamp: time.Unix(2000, 0), + }, + Data: testBlobs, + }).Maybe() + } else { + client.On("Retrieve", mock.Anything, height, fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound}, + }).Maybe() + } + } + + logger := zerolog.Nop() + cfg := config.DefaultConfig() + cfg.DA.BlockTime.Duration = 100 * time.Millisecond + fetcher := NewAsyncBlockRetriever(client, logger, fiNs, cfg, 100, 10) + + fetcher.Start() + defer fetcher.Stop() + + // Update current height to trigger prefetch + fetcher.UpdateCurrentHeight(100) + + // Wait for background prefetch to happen (wait for at least one poll cycle) + time.Sleep(250 * time.Millisecond) + + // Check if block was prefetched + ctx := context.Background() + block, err := fetcher.GetCachedBlock(ctx, 105) + require.NoError(t, err) + assert.NotNil(t, block) + assert.Equal(t, uint64(105), block.Height) + assert.Equal(t, 2, len(block.Blobs)) + +} + +func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) { + client := &mocks.MockClient{} + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + + // All heights in prefetch window not available yet + for height := uint64(100); height <= 109; height++ { + client.On("Retrieve", mock.Anything, height, fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + } + + logger := zerolog.Nop() + cfg := config.DefaultConfig() + cfg.DA.BlockTime.Duration = 100 * time.Millisecond + fetcher := NewAsyncBlockRetriever(client, logger, fiNs, cfg, 100, 10) + fetcher.Start() + defer fetcher.Stop() + + fetcher.UpdateCurrentHeight(100) + + // Wait for at least one poll cycle + time.Sleep(250 * time.Millisecond) + + // Cache should be empty + ctx := context.Background() + block, err := fetcher.GetCachedBlock(ctx, 100) + require.NoError(t, err) + assert.Nil(t, block) +} + +func TestAsyncBlockRetriever_StopGracefully(t *testing.T) { + client := &mocks.MockClient{} + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + + logger := zerolog.Nop() + fetcher := NewAsyncBlockRetriever(client, logger, fiNs, config.DefaultConfig(), 100, 10) + + fetcher.Start() + time.Sleep(100 * time.Millisecond) + + // Should stop gracefully without panic + fetcher.Stop() +} + +func TestBlockData_Serialization(t *testing.T) { + block := &BlockData{ + Height: 100, + Timestamp: time.Unix(12345, 0).UTC(), + Blobs: [][]byte{ + []byte("blob1"), + []byte("blob2"), + []byte("another_blob"), + }, + } + + // Serialize using protobuf + pbBlock := &pb.BlockData{ + Height: block.Height, + Timestamp: block.Timestamp.Unix(), + Blobs: block.Blobs, + } + data, err := proto.Marshal(pbBlock) + require.NoError(t, err) + assert.Greater(t, len(data), 0) + + // Deserialize using protobuf + var decodedPb pb.BlockData + err = proto.Unmarshal(data, &decodedPb) + require.NoError(t, err) + + decoded := &BlockData{ + Height: decodedPb.Height, + Timestamp: time.Unix(decodedPb.Timestamp, 0).UTC(), + Blobs: decodedPb.Blobs, + } + + assert.Equal(t, block.Timestamp.Unix(), decoded.Timestamp.Unix()) + assert.Equal(t, block.Height, decoded.Height) + assert.Equal(t, len(block.Blobs), len(decoded.Blobs)) + for i := range block.Blobs { + assert.Equal(t, block.Blobs[i], decoded.Blobs[i]) + } +} + +func TestBlockData_SerializationEmpty(t *testing.T) { + block := &BlockData{ + Height: 100, + Timestamp: time.Unix(0, 0).UTC(), + Blobs: [][]byte{}, + } + + // Serialize using protobuf + pbBlock := &pb.BlockData{ + Height: block.Height, + Timestamp: block.Timestamp.Unix(), + Blobs: block.Blobs, + } + data, err := proto.Marshal(pbBlock) + require.NoError(t, err) + + // Deserialize using protobuf + var decodedPb pb.BlockData + err = proto.Unmarshal(data, &decodedPb) + require.NoError(t, err) + + decoded := &BlockData{ + Height: decodedPb.Height, + Timestamp: time.Unix(decodedPb.Timestamp, 0).UTC(), + Blobs: decodedPb.Blobs, + } + + assert.Equal(t, uint64(100), decoded.Height) + assert.Equal(t, 0, len(decoded.Blobs)) +} diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 651c071a3..7b07b7d5d 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -8,6 +8,7 @@ import ( "github.com/rs/zerolog" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/types" ) @@ -21,6 +22,7 @@ type ForcedInclusionRetriever struct { logger zerolog.Logger daEpochSize uint64 daStartHeight uint64 + asyncFetcher AsyncBlockRetriever } // ForcedInclusionEvent contains forced inclusion transactions retrieved from DA. @@ -32,21 +34,43 @@ type ForcedInclusionEvent struct { } // NewForcedInclusionRetriever creates a new forced inclusion retriever. +// It internally creates and manages an AsyncBlockRetriever for background prefetching. func NewForcedInclusionRetriever( client Client, logger zerolog.Logger, + cfg config.Config, daStartHeight, daEpochSize uint64, ) *ForcedInclusionRetriever { + retrieverLogger := logger.With().Str("component", "forced_inclusion_retriever").Logger() + + // Create async block retriever for background prefetching + asyncFetcher := NewAsyncBlockRetriever( + client, + logger, + client.GetForcedInclusionNamespace(), + cfg, + daStartHeight, + daEpochSize*2, // prefetch window: 2x epoch size + ) + asyncFetcher.Start() + return &ForcedInclusionRetriever{ client: client, - logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(), + logger: retrieverLogger, daStartHeight: daStartHeight, daEpochSize: daEpochSize, + asyncFetcher: asyncFetcher, } } +// Stop stops the background prefetcher. +func (r *ForcedInclusionRetriever) Stop() { + r.asyncFetcher.Stop() +} + // RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height. -// It respects epoch boundaries and only fetches at epoch start. +// It respects epoch boundaries and only fetches at epoch end. +// It tries to get blocks from the async fetcher cache first, then falls back to sync fetching. func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { // when daStartHeight is not set or no namespace is configured, we retrieve nothing. if !r.client.HasForcedInclusionNamespace() { @@ -59,6 +83,9 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize) + // Update the async fetcher's current height so it knows what to prefetch + r.asyncFetcher.UpdateCurrentHeight(daHeight) + if daHeight != epochEnd { r.logger.Debug(). Uint64("da_height", daHeight). @@ -72,54 +99,113 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context }, nil } + r.logger.Debug(). + Uint64("da_height", daHeight). + Uint64("epoch_start", epochStart). + Uint64("epoch_end", epochEnd). + Uint64("epoch_num", currentEpochNumber). + Msg("retrieving forced included transactions from DA epoch") + event := &ForcedInclusionEvent{ StartDaHeight: epochStart, EndDaHeight: epochEnd, Txs: [][]byte{}, } - epochEndResult := r.client.Retrieve(ctx, epochEnd, r.client.GetForcedInclusionNamespace()) - if epochEndResult.Code == datypes.StatusHeightFromFuture { - r.logger.Debug(). - Uint64("epoch_end", epochEnd). - Msg("epoch end height not yet available on DA - backoff required") - return nil, fmt.Errorf("%w: epoch end height %d not yet available", datypes.ErrHeightFromFuture, epochEnd) + // Collect all heights in this epoch + var heights []uint64 + for h := epochStart; h <= epochEnd; h++ { + heights = append(heights, h) } - epochStartResult := epochEndResult - if epochStart != epochEnd { - epochStartResult = r.client.Retrieve(ctx, epochStart, r.client.GetForcedInclusionNamespace()) - if epochStartResult.Code == datypes.StatusHeightFromFuture { + // Try to get blocks from cache first + cachedBlocks := make(map[uint64]*BlockData) + var missingHeights []uint64 + + for _, h := range heights { + block, err := r.asyncFetcher.GetCachedBlock(ctx, h) + if err != nil { r.logger.Debug(). - Uint64("epoch_start", epochStart). - Msg("epoch start height not yet available on DA - backoff required") - return nil, fmt.Errorf("%w: epoch start height %d not yet available", datypes.ErrHeightFromFuture, epochStart) + Err(err). + Uint64("height", h). + Msg("error getting cached block, will fetch synchronously") + missingHeights = append(missingHeights, h) + continue + } + if block == nil { // Cache miss + missingHeights = append(missingHeights, h) + } else { // Cache hit + cachedBlocks[h] = block } } - r.logger.Debug(). - Uint64("da_height", daHeight). - Uint64("epoch_start", epochStart). - Uint64("epoch_end", epochEnd). - Uint64("epoch_num", currentEpochNumber). - Msg("retrieving forced included transactions from DA") - + // Fetch missing heights synchronously and store in map + syncFetchedBlocks := make(map[uint64]*BlockData) var processErrs error - err := r.processForcedInclusionBlobs(event, epochStartResult, epochStart) - processErrs = errors.Join(processErrs, err) + for _, h := range missingHeights { + result := r.client.Retrieve(ctx, h, r.client.GetForcedInclusionNamespace()) + if result.Code == datypes.StatusHeightFromFuture { + r.logger.Debug(). + Uint64("height", h). + Msg("height not yet available on DA - backoff required") + return nil, fmt.Errorf("%w: height %d not yet available", datypes.ErrHeightFromFuture, h) + } + + if result.Code == datypes.StatusNotFound { + r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height") + continue + } - // Process heights between start and end (exclusive) - for epochHeight := epochStart + 1; epochHeight < epochEnd; epochHeight++ { - result := r.client.Retrieve(ctx, epochHeight, r.client.GetForcedInclusionNamespace()) + if result.Code != datypes.StatusSuccess { + err := fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", h, result.Message) + processErrs = errors.Join(processErrs, err) + continue + } - err = r.processForcedInclusionBlobs(event, result, epochHeight) - processErrs = errors.Join(processErrs, err) + // Store the sync-fetched block data + syncFetchedBlocks[h] = &BlockData{ + Blobs: result.Data, + Timestamp: result.Timestamp, + } } - // Process epoch end (only if different from start) - if epochEnd != epochStart { - err = r.processForcedInclusionBlobs(event, epochEndResult, epochEnd) - processErrs = errors.Join(processErrs, err) + // Process all blocks in height order + for _, h := range heights { + var block *BlockData + var source string + + // Check cached blocks first, then sync-fetched + if cachedBlock, ok := cachedBlocks[h]; ok { + block = cachedBlock + source = "cache" + } else if syncBlock, ok := syncFetchedBlocks[h]; ok { + block = syncBlock + source = "sync" + } + + if block != nil { + // Add blobs from block + for _, blob := range block.Blobs { + if len(blob) > 0 { + event.Txs = append(event.Txs, blob) + } + } + + // Update timestamp if newer + if block.Timestamp.After(event.Timestamp) { + event.Timestamp = block.Timestamp + } + + r.logger.Debug(). + Uint64("height", h). + Int("blob_count", len(block.Blobs)). + Str("source", source). + Msg("added blobs from block") + } + + // Clean up maps to prevent unbounded memory growth + delete(cachedBlocks, h) + delete(syncFetchedBlocks, h) } // any error during process, need to retry at next call @@ -141,37 +227,3 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context return event, nil } - -// processForcedInclusionBlobs processes blobs from a single DA height for forced inclusion. -func (r *ForcedInclusionRetriever) processForcedInclusionBlobs( - event *ForcedInclusionEvent, - result datypes.ResultRetrieve, - height uint64, -) error { - if result.Code == datypes.StatusNotFound { - r.logger.Debug().Uint64("height", height).Msg("no forced inclusion blobs at height") - return nil - } - - if result.Code != datypes.StatusSuccess { - return fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", height, result.Message) - } - - // Process each blob as a transaction - for _, blob := range result.Data { - if len(blob) > 0 { - event.Txs = append(event.Txs, blob) - } - } - - if result.Timestamp.After(event.Timestamp) { - event.Timestamp = result.Timestamp - } - - r.logger.Debug(). - Uint64("height", height). - Int("blob_count", len(result.Data)). - Msg("processed forced inclusion blobs") - - return nil -} diff --git a/block/internal/da/forced_inclusion_retriever_test.go b/block/internal/da/forced_inclusion_retriever_test.go index 6f35d0783..446b655be 100644 --- a/block/internal/da/forced_inclusion_retriever_test.go +++ b/block/internal/da/forced_inclusion_retriever_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/mock" "gotest.tools/v3/assert" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/test/mocks" @@ -16,7 +17,6 @@ import ( func TestNewForcedInclusionRetriever(t *testing.T) { client := mocks.NewMockClient(t) - client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(datypes.NamespaceFromString("test-fi-ns").Bytes()).Maybe() gen := genesis.Genesis{ @@ -24,20 +24,23 @@ func TestNewForcedInclusionRetriever(t *testing.T) { DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) assert.Assert(t, retriever != nil) + retriever.Stop() } func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testing.T) { client := mocks.NewMockClient(t) - client.On("HasForcedInclusionNamespace").Return(false).Once() + client.On("HasForcedInclusionNamespace").Return(false).Maybe() + client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() gen := genesis.Genesis{ DAStartHeight: 100, DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() _, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -48,7 +51,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testi func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *testing.T) { client := mocks.NewMockClient(t) fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() - client.On("HasForcedInclusionNamespace").Return(true).Once() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() gen := genesis.Genesis{ @@ -56,7 +59,8 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() // Height 105 is not an epoch start (100, 110, 120, etc. are epoch starts) @@ -77,19 +81,20 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t client := mocks.NewMockClient(t) fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() - client.On("HasForcedInclusionNamespace").Return(true).Once() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: []datypes.ID{[]byte("id1"), []byte("id2"), []byte("id3")}, Timestamp: time.Now()}, Data: testBlobs, - }).Once() + }).Maybe() gen := genesis.Genesis{ DAStartHeight: 100, DAEpochForcedInclusion: 1, // Single height epoch } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() // Height 100 is an epoch start @@ -105,9 +110,11 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailable(t *testing.T) { client := mocks.NewMockClient(t) fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() - client.On("HasForcedInclusionNamespace").Return(true).Once() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() - client.On("Retrieve", mock.Anything, uint64(109), fiNs).Return(datypes.ResultRetrieve{ + + // Mock the first height in epoch as not available + client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, }).Once() @@ -116,10 +123,11 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab DAEpochForcedInclusion: 10, } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() - // Epoch boundaries: [100, 109] - retrieval happens at epoch end (109) + // Epoch boundaries: [100, 109] - now tries to fetch all blocks in epoch _, err := retriever.RetrieveForcedIncludedTxs(ctx, 109) assert.Assert(t, err != nil) assert.ErrorContains(t, err, "not yet available") @@ -128,7 +136,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *testing.T) { client := mocks.NewMockClient(t) fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() - client.On("HasForcedInclusionNamespace").Return(true).Once() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound}, @@ -139,7 +147,8 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t DAEpochForcedInclusion: 1, // Single height epoch } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) @@ -157,7 +166,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t * client := mocks.NewMockClient(t) fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() - client.On("HasForcedInclusionNamespace").Return(true).Once() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() client.On("Retrieve", mock.Anything, uint64(102), fiNs).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, @@ -177,7 +186,8 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t * DAEpochForcedInclusion: 3, // Epoch: 100-102 } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() ctx := context.Background() // Epoch boundaries: [100, 102] - retrieval happens at epoch end (102) @@ -193,86 +203,112 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t * assert.Equal(t, len(event.Txs), expectedTxCount) } -func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) { +func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_ErrorHandling(t *testing.T) { client := mocks.NewMockClient(t) + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() + client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() + client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: "test error", + }, + }).Once() gen := genesis.Genesis{ DAStartHeight: 100, - DAEpochForcedInclusion: 10, + DAEpochForcedInclusion: 1, // Single height epoch } - retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) - - tests := []struct { - name string - result datypes.ResultRetrieve - height uint64 - expectedTxCount int - expectError bool - }{ - { - name: "success with blobs", - result: datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusSuccess, - }, - Data: [][]byte{[]byte("tx1"), []byte("tx2")}, - }, - height: 100, - expectedTxCount: 2, - expectError: false, - }, - { - name: "not found", - result: datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusNotFound, - }, - }, - height: 100, - expectedTxCount: 0, - expectError: false, - }, - { - name: "error status", - result: datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusError, - Message: "test error", - }, - }, - height: 100, - expectError: true, - }, - { - name: "empty blobs are skipped", - result: datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{ - Code: datypes.StatusSuccess, - }, - Data: [][]byte{[]byte("tx1"), {}, []byte("tx2")}, - }, - height: 100, - expectedTxCount: 2, - expectError: false, - }, + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() + ctx := context.Background() + + // Should return empty event with no error (errors are logged and retried later) + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + assert.NilError(t, err) + assert.Assert(t, event != nil) + assert.Equal(t, len(event.Txs), 0) +} + +func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyBlobsSkipped(t *testing.T) { + client := mocks.NewMockClient(t) + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() + client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() + client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("tx1"), {}, []byte("tx2"), nil, []byte("tx3")}, + }).Once() + + gen := genesis.Genesis{ + DAStartHeight: 100, + DAEpochForcedInclusion: 1, // Single height epoch } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - event := &ForcedInclusionEvent{ - Txs: [][]byte{}, - } - - err := retriever.processForcedInclusionBlobs(event, tt.result, tt.height) - - if tt.expectError { - assert.Assert(t, err != nil) - } else { - assert.NilError(t, err) - assert.Equal(t, len(event.Txs), tt.expectedTxCount) - assert.Equal(t, event.Timestamp, time.Time{}) - } - }) + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + assert.NilError(t, err) + assert.Assert(t, event != nil) + // Should skip empty and nil blobs + assert.Equal(t, len(event.Txs), 3) + assert.DeepEqual(t, event.Txs[0], []byte("tx1")) + assert.DeepEqual(t, event.Txs[1], []byte("tx2")) + assert.DeepEqual(t, event.Txs[2], []byte("tx3")) +} + +func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_OrderPreserved(t *testing.T) { + // Test that transactions are returned in height order even when fetched out of order + testBlobsByHeight := map[uint64][][]byte{ + 100: {[]byte("tx-100-1"), []byte("tx-100-2")}, + 101: {[]byte("tx-101-1")}, + 102: {[]byte("tx-102-1"), []byte("tx-102-2")}, + } + + client := mocks.NewMockClient(t) + fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("HasForcedInclusionNamespace").Return(true).Maybe() + client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe() + // Return heights out of order to test ordering is preserved + client.On("Retrieve", mock.Anything, uint64(102), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: testBlobsByHeight[102], + }).Once() + client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: testBlobsByHeight[100], + }).Once() + client.On("Retrieve", mock.Anything, uint64(101), fiNs).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: testBlobsByHeight[101], + }).Once() + + gen := genesis.Genesis{ + DAStartHeight: 100, + DAEpochForcedInclusion: 3, // Epoch: 100-102 + } + + retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer retriever.Stop() + ctx := context.Background() + + event, err := retriever.RetrieveForcedIncludedTxs(ctx, 102) + assert.NilError(t, err) + assert.Assert(t, event != nil) + + // Verify transactions are in height order: 100, 100, 101, 102, 102 + expectedOrder := [][]byte{ + []byte("tx-100-1"), + []byte("tx-100-2"), + []byte("tx-101-1"), + []byte("tx-102-1"), + []byte("tx-102-2"), + } + assert.Equal(t, len(event.Txs), len(expectedOrder)) + for i, expected := range expectedOrder { + assert.DeepEqual(t, event.Txs[i], expected) } } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 266bc55e4..0d6d56f0d 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -187,7 +187,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) - s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) + s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 8b0622a42..61c556e83 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -14,7 +14,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" - da "github.com/evstack/ev-node/block/internal/da" + "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" @@ -370,7 +370,8 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -443,7 +444,8 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -546,7 +548,8 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -653,7 +656,8 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -718,8 +722,10 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { client.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() + client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -784,7 +790,8 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -907,7 +914,8 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe() client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, @@ -996,7 +1004,8 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { client.On("HasForcedInclusionNamespace").Return(true).Maybe() daRetriever := NewDARetriever(client, cm, gen, zerolog.Nop()) - fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + fiRetriever := da.NewForcedInclusionRetriever(client, zerolog.Nop(), config.DefaultConfig(), gen.DAStartHeight, gen.DAEpochForcedInclusion) + defer fiRetriever.Stop() s := NewSyncer( st, diff --git a/block/public.go b/block/public.go index 61a1e068a..1e3e0de07 100644 --- a/block/public.go +++ b/block/public.go @@ -67,14 +67,17 @@ type ForcedInclusionEvent = da.ForcedInclusionEvent // ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA type ForcedInclusionRetriever interface { - RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error) + RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) + Stop() } -// NewForcedInclusionRetriever creates a new forced inclusion retriever +// NewForcedInclusionRetriever creates a new forced inclusion retriever. +// It internally creates and manages an AsyncBlockRetriever for background prefetching. func NewForcedInclusionRetriever( client DAClient, + cfg config.Config, logger zerolog.Logger, daStartHeight, daEpochSize uint64, ) ForcedInclusionRetriever { - return da.NewForcedInclusionRetriever(client, logger, daStartHeight, daEpochSize) + return da.NewForcedInclusionRetriever(client, logger, cfg, daStartHeight, daEpochSize) } diff --git a/pkg/p2p/utils_test.go b/pkg/p2p/utils_test.go index dadb151c7..5bd5664dd 100644 --- a/pkg/p2p/utils_test.go +++ b/pkg/p2p/utils_test.go @@ -113,13 +113,19 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos nodeKey, err := key.GenerateNodeKey() require.NoError(err) + // Use chainID from conf if specified, otherwise default to "test-chain" + chainID := "test-chain" + if descr, ok := conf[i]; ok && descr.chainID != "" { + chainID = descr.chainID + } + client, err := NewClient( config.P2PConfig{ Peers: seeds[i], }, nodeKey.PrivKey, sync.MutexWrap(datastore.NewMapDatastore()), - "test-chain", + chainID, logger, NopMetrics(), ) diff --git a/pkg/sequencers/based/sequencer.go b/pkg/sequencers/based/sequencer.go index feaca53f5..1b6cd519e 100644 --- a/pkg/sequencers/based/sequencer.go +++ b/pkg/sequencers/based/sequencer.go @@ -12,6 +12,7 @@ import ( "github.com/evstack/ev-node/block" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" @@ -38,7 +39,8 @@ type BasedSequencer struct { // NewBasedSequencer creates a new based sequencer instance func NewBasedSequencer( - fiRetriever block.ForcedInclusionRetriever, + daClient block.FullDAClient, + cfg config.Config, db ds.Batching, genesis genesis.Genesis, logger zerolog.Logger, @@ -46,7 +48,6 @@ func NewBasedSequencer( bs := &BasedSequencer{ logger: logger.With().Str("component", "based_sequencer").Logger(), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/based/checkpoint")), - fiRetriever: fiRetriever, } // based sequencers need community consensus about the da start height given no submission are done bs.SetDAHeight(genesis.DAStartHeight) @@ -78,6 +79,8 @@ func NewBasedSequencer( } } + bs.fiRetriever = block.NewForcedInclusionRetriever(daClient, cfg, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) + return bs, nil } diff --git a/pkg/sequencers/based/sequencer_test.go b/pkg/sequencers/based/sequencer_test.go index 60525f09c..493694227 100644 --- a/pkg/sequencers/based/sequencer_test.go +++ b/pkg/sequencers/based/sequencer_test.go @@ -14,37 +14,45 @@ import ( "github.com/evstack/ev-node/block" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/sequencers/common" + "github.com/evstack/ev-node/test/mocks" ) -// MockForcedInclusionRetriever is a mock implementation of ForcedInclusionRetriever for testing -type MockForcedInclusionRetriever struct { - mock.Mock +// MockFullDAClient combines MockClient and MockVerifier to implement FullDAClient +type MockFullDAClient struct { + *mocks.MockClient + *mocks.MockVerifier } -func (m *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) { - args := m.Called(ctx, daHeight) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*block.ForcedInclusionEvent), args.Error(1) -} - -// createTestSequencer is a helper function to create a sequencer for testing -func createTestSequencer(t *testing.T, mockRetriever *MockForcedInclusionRetriever, gen genesis.Genesis) *BasedSequencer { +func createTestSequencer(t *testing.T, mockRetriever *common.MockForcedInclusionRetriever, gen genesis.Genesis) *BasedSequencer { t.Helper() // Create in-memory datastore db := syncds.MutexWrap(ds.NewMapDatastore()) - seq, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) + // Create mock DA client + mockDAClient := &MockFullDAClient{ + MockClient: mocks.NewMockClient(t), + MockVerifier: mocks.NewMockVerifier(t), + } + // Mock the forced inclusion namespace call + mockDAClient.MockClient.On("GetForcedInclusionNamespace").Return([]byte("test-forced-inclusion-ns")).Maybe() + mockDAClient.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + seq, err := NewBasedSequencer(mockDAClient, config.DefaultConfig(), db, gen, zerolog.Nop()) require.NoError(t, err) + + // Replace the fiRetriever with our mock so tests work as before + seq.fiRetriever = mockRetriever + return seq } func TestBasedSequencer_SubmitBatchTxs(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) gen := genesis.Genesis{ ChainID: "test-chain", DAEpochForcedInclusion: 10, @@ -71,7 +79,7 @@ func TestBasedSequencer_SubmitBatchTxs(t *testing.T) { func TestBasedSequencer_GetNextBatch_WithForcedTxs(t *testing.T) { testBlobs := [][]byte{[]byte("tx1"), []byte("tx2")} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -107,7 +115,7 @@ func TestBasedSequencer_GetNextBatch_WithForcedTxs(t *testing.T) { } func TestBasedSequencer_GetNextBatch_EmptyDA(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: [][]byte{}, StartDaHeight: 100, @@ -138,7 +146,7 @@ func TestBasedSequencer_GetNextBatch_EmptyDA(t *testing.T) { } func TestBasedSequencer_GetNextBatch_NotConfigured(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, block.ErrForceInclusionNotConfigured) gen := genesis.Genesis{ @@ -169,7 +177,7 @@ func TestBasedSequencer_GetNextBatch_WithMaxBytes(t *testing.T) { tx3 := make([]byte, 200) testBlobs := [][]byte{tx1, tx2, tx3} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -223,7 +231,7 @@ func TestBasedSequencer_GetNextBatch_MultipleDABlocks(t *testing.T) { testBlobs1 := [][]byte{[]byte("tx1"), []byte("tx2")} testBlobs2 := [][]byte{[]byte("tx3"), []byte("tx4")} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) // First DA block mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs1, @@ -274,7 +282,7 @@ func TestBasedSequencer_GetNextBatch_MultipleDABlocks(t *testing.T) { func TestBasedSequencer_GetNextBatch_ResumesFromCheckpoint(t *testing.T) { testBlobs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) gen := genesis.Genesis{ ChainID: "test-chain", @@ -312,7 +320,7 @@ func TestBasedSequencer_GetNextBatch_ForcedInclusionExceedsMaxBytes(t *testing.T largeTx := make([]byte, 2000) testBlobs := [][]byte{largeTx} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -343,7 +351,7 @@ func TestBasedSequencer_GetNextBatch_ForcedInclusionExceedsMaxBytes(t *testing.T } func TestBasedSequencer_VerifyBatch(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) gen := genesis.Genesis{ ChainID: "test-chain", DAEpochForcedInclusion: 10, @@ -364,7 +372,7 @@ func TestBasedSequencer_VerifyBatch(t *testing.T) { } func TestBasedSequencer_SetDAHeight(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) gen := genesis.Genesis{ ChainID: "test-chain", DAStartHeight: 100, @@ -382,7 +390,7 @@ func TestBasedSequencer_SetDAHeight(t *testing.T) { } func TestBasedSequencer_GetNextBatch_ErrorHandling(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, block.ErrForceInclusionNotConfigured) gen := genesis.Genesis{ @@ -407,7 +415,7 @@ func TestBasedSequencer_GetNextBatch_ErrorHandling(t *testing.T) { } func TestBasedSequencer_GetNextBatch_HeightFromFuture(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(nil, datypes.ErrHeightFromFuture) gen := genesis.Genesis{ @@ -438,7 +446,7 @@ func TestBasedSequencer_GetNextBatch_HeightFromFuture(t *testing.T) { func TestBasedSequencer_CheckpointPersistence(t *testing.T) { testBlobs := [][]byte{[]byte("tx1"), []byte("tx2")} - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -454,10 +462,21 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) { // Create persistent datastore db := syncds.MutexWrap(ds.NewMapDatastore()) + // Create mock DA client + mockDAClient := &MockFullDAClient{ + MockClient: mocks.NewMockClient(t), + MockVerifier: mocks.NewMockVerifier(t), + } + mockDAClient.MockClient.On("GetForcedInclusionNamespace").Return([]byte("test-forced-inclusion-ns")).Maybe() + mockDAClient.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // Create first sequencer - seq1, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) + seq1, err := NewBasedSequencer(mockDAClient, config.DefaultConfig(), db, gen, zerolog.Nop()) require.NoError(t, err) + // Replace the fiRetriever with our mock so tests work as before + seq1.fiRetriever = mockRetriever + req := coresequencer.GetNextBatchRequest{ MaxBytes: 1000000, LastBatchData: nil, @@ -470,9 +489,18 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) { assert.Equal(t, 2, len(resp.Batch.Transactions)) // Create a new sequencer with the same datastore (simulating restart) - seq2, err := NewBasedSequencer(mockRetriever, db, gen, zerolog.Nop()) + mockDAClient2 := &MockFullDAClient{ + MockClient: mocks.NewMockClient(t), + MockVerifier: mocks.NewMockVerifier(t), + } + mockDAClient2.MockClient.On("GetForcedInclusionNamespace").Return([]byte("test-forced-inclusion-ns")).Maybe() + mockDAClient2.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + seq2, err := NewBasedSequencer(mockDAClient2, config.DefaultConfig(), db, gen, zerolog.Nop()) require.NoError(t, err) + // Replace the fiRetriever with our mock so tests work as before + seq2.fiRetriever = mockRetriever + // Checkpoint should be loaded from DB assert.Equal(t, uint64(101), seq2.checkpoint.DAHeight) assert.Equal(t, uint64(0), seq2.checkpoint.TxIndex) @@ -481,7 +509,7 @@ func TestBasedSequencer_CheckpointPersistence(t *testing.T) { } func TestBasedSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) // First DA block returns empty transactions mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ @@ -548,7 +576,7 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment(t *testing.T) { testBlobs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")} daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -590,7 +618,7 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment_PartialBatch(t *testing testBlobs := [][]byte{tx1, tx2, tx3} daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: testBlobs, StartDaHeight: 100, @@ -646,7 +674,7 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment_EmptyBatch(t *testing.T // Test that timestamp is zero when batch is empty daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) - mockRetriever := new(MockForcedInclusionRetriever) + mockRetriever := common.NewMockForcedInclusionRetriever(t) mockRetriever.On("RetrieveForcedIncludedTxs", mock.Anything, uint64(100)).Return(&block.ForcedInclusionEvent{ Txs: [][]byte{}, StartDaHeight: 100, diff --git a/pkg/sequencers/common/forced_inclusion_retriever_mock.go b/pkg/sequencers/common/forced_inclusion_retriever_mock.go new file mode 100644 index 000000000..c90b84f7f --- /dev/null +++ b/pkg/sequencers/common/forced_inclusion_retriever_mock.go @@ -0,0 +1,140 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package common + +import ( + "context" + + "github.com/evstack/ev-node/block" + mock "github.com/stretchr/testify/mock" +) + +// NewMockForcedInclusionRetriever creates a new instance of MockForcedInclusionRetriever. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockForcedInclusionRetriever(t interface { + mock.TestingT + Cleanup(func()) +}) *MockForcedInclusionRetriever { + mock := &MockForcedInclusionRetriever{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockForcedInclusionRetriever is an autogenerated mock type for the ForcedInclusionRetriever type +type MockForcedInclusionRetriever struct { + mock.Mock +} + +type MockForcedInclusionRetriever_Expecter struct { + mock *mock.Mock +} + +func (_m *MockForcedInclusionRetriever) EXPECT() *MockForcedInclusionRetriever_Expecter { + return &MockForcedInclusionRetriever_Expecter{mock: &_m.Mock} +} + +// RetrieveForcedIncludedTxs provides a mock function for the type MockForcedInclusionRetriever +func (_mock *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) { + ret := _mock.Called(ctx, daHeight) + + if len(ret) == 0 { + panic("no return value specified for RetrieveForcedIncludedTxs") + } + + var r0 *block.ForcedInclusionEvent + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) (*block.ForcedInclusionEvent, error)); ok { + return returnFunc(ctx, daHeight) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) *block.ForcedInclusionEvent); ok { + r0 = returnFunc(ctx, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*block.ForcedInclusionEvent) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = returnFunc(ctx, daHeight) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveForcedIncludedTxs' +type MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call struct { + *mock.Call +} + +// RetrieveForcedIncludedTxs is a helper method to define mock.On call +// - ctx context.Context +// - daHeight uint64 +func (_e *MockForcedInclusionRetriever_Expecter) RetrieveForcedIncludedTxs(ctx interface{}, daHeight interface{}) *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call { + return &MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call{Call: _e.mock.On("RetrieveForcedIncludedTxs", ctx, daHeight)} +} + +func (_c *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call) Run(run func(ctx context.Context, daHeight uint64)) *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call) Return(v *block.ForcedInclusionEvent, err error) *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call) RunAndReturn(run func(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error)) *MockForcedInclusionRetriever_RetrieveForcedIncludedTxs_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function for the type MockForcedInclusionRetriever +func (_mock *MockForcedInclusionRetriever) Stop() { + _mock.Called() + return +} + +// MockForcedInclusionRetriever_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockForcedInclusionRetriever_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockForcedInclusionRetriever_Expecter) Stop() *MockForcedInclusionRetriever_Stop_Call { + return &MockForcedInclusionRetriever_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockForcedInclusionRetriever_Stop_Call) Run(run func()) *MockForcedInclusionRetriever_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockForcedInclusionRetriever_Stop_Call) Return() *MockForcedInclusionRetriever_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockForcedInclusionRetriever_Stop_Call) RunAndReturn(run func()) *MockForcedInclusionRetriever_Stop_Call { + _c.Run(run) + return _c +} diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 45baf17cb..07c32c835 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -16,6 +16,7 @@ import ( "github.com/evstack/ev-node/block" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" @@ -32,6 +33,7 @@ type Sequencer struct { logger zerolog.Logger genesis genesis.Genesis db ds.Batching + cfg config.Config Id []byte daClient block.FullDAClient @@ -55,8 +57,8 @@ func NewSequencer( logger zerolog.Logger, db ds.Batching, daClient block.FullDAClient, + cfg config.Config, id []byte, - batchTime time.Duration, maxQueueSize int, genesis genesis.Genesis, ) (*Sequencer, error) { @@ -64,7 +66,8 @@ func NewSequencer( db: db, logger: logger, daClient: daClient, - batchTime: batchTime, + cfg: cfg, + batchTime: cfg.Node.BlockTime.Duration, Id: id, queue: NewBatchQueue(db, "batches", maxQueueSize), checkpointStore: seqcommon.NewCheckpointStore(db, ds.NewKey("/single/checkpoint")), @@ -90,8 +93,6 @@ func NewSequencer( DAHeight: s.GetDAHeight(), TxIndex: 0, } - - s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, s.GetDAHeight(), genesis.DAEpochForcedInclusion) } else { return nil, fmt.Errorf("failed to load checkpoint from DB: %w", err) } @@ -105,10 +106,13 @@ func NewSequencer( Uint64("da_height", checkpoint.DAHeight). Msg("resuming from checkpoint within DA epoch") } - - s.fiRetriever = block.NewForcedInclusionRetriever(daClient, logger, s.getInitialDAStartHeight(context.Background()), genesis.DAEpochForcedInclusion) } + // Determine initial DA height for forced inclusion + initialDAHeight := s.getInitialDAStartHeight(context.Background()) + + s.fiRetriever = block.NewForcedInclusionRetriever(daClient, cfg, logger, initialDAHeight, genesis.DAEpochForcedInclusion) + return s, nil } @@ -175,7 +179,12 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB TxIndex: 0, } - c.fiRetriever = block.NewForcedInclusionRetriever(c.daClient, c.logger, c.getInitialDAStartHeight(ctx), c.genesis.DAEpochForcedInclusion) + // override forced inclusion retriever, as the da start height have been updated + // Stop the old retriever first + if c.fiRetriever != nil { + c.fiRetriever.Stop() + } + c.fiRetriever = block.NewForcedInclusionRetriever(c.daClient, c.cfg, c.logger, c.getInitialDAStartHeight(ctx), c.genesis.DAEpochForcedInclusion) } // If we have no cached transactions or we've consumed all from the current cache, diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index 3c0afdbf5..b73d49bfd 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -15,35 +15,23 @@ import ( "github.com/evstack/ev-node/block" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" - damocks "github.com/evstack/ev-node/test/mocks" + "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/test/testda" ) -// MockForcedInclusionRetriever is a mock implementation of DARetriever for testing -type MockForcedInclusionRetriever struct { - mock.Mock -} - -func (m *MockForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*block.ForcedInclusionEvent, error) { - args := m.Called(ctx, daHeight) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*block.ForcedInclusionEvent), args.Error(1) -} - // MockFullDAClient combines MockClient and MockVerifier to implement FullDAClient type MockFullDAClient struct { - *damocks.MockClient - *damocks.MockVerifier + *mocks.MockClient + *mocks.MockVerifier } func newMockFullDAClient(t *testing.T) *MockFullDAClient { return &MockFullDAClient{ - MockClient: damocks.NewMockClient(t), - MockVerifier: damocks.NewMockVerifier(t), + MockClient: mocks.NewMockClient(t), + MockVerifier: mocks.NewMockVerifier(t), } } @@ -64,8 +52,8 @@ func newTestSequencer(t *testing.T, db ds.Batching, daClient block.FullDAClient) logger, db, daClient, + config.DefaultConfig(), []byte("test"), - 1*time.Second, 0, // unlimited queue gen, ) @@ -78,7 +66,7 @@ func TestSequencer_SubmitBatchTxs(t *testing.T) { db := ds.NewMapDatastore() Id := []byte("test1") logger := zerolog.Nop() - seq, err := NewSequencer(logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, config.DefaultConfig(), Id, 1000, genesis.Genesis{}) if err != nil { t.Fatalf("Failed to create sequencer: %v", err) } @@ -127,7 +115,7 @@ func TestSequencer_SubmitBatchTxs_EmptyBatch(t *testing.T) { db := ds.NewMapDatastore() Id := []byte("test1") logger := zerolog.Nop() - seq, err := NewSequencer(logger, db, dummyDA, Id, 10*time.Second, 1000, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, config.DefaultConfig(), Id, 1000, genesis.Genesis{}) require.NoError(t, err, "Failed to create sequencer") defer func() { err := db.Close() @@ -178,8 +166,8 @@ func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { logger, db, dummyDA, + config.DefaultConfig(), []byte("test"), - 1*time.Second, 0, // unlimited queue gen, ) @@ -298,7 +286,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { dummyDA := newDummyDA(100_000_000) db := ds.NewMapDatastore() logger := zerolog.Nop() - seq, err := NewSequencer(logger, db, dummyDA, []byte("test1"), 1*time.Second, 1000, genesis.Genesis{}) + seq, err := NewSequencer(logger, db, dummyDA, config.DefaultConfig(), []byte("test1"), 1000, genesis.Genesis{}) if err != nil { t.Fatalf("Failed to create sequencer: %v", err) } @@ -376,8 +364,8 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { logger, db, mockDA, + config.DefaultConfig(), []byte("test-chain"), - 1*time.Second, 100, gen, ) @@ -469,8 +457,8 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { logger, db, mockDA, + config.DefaultConfig(), []byte("test-chain"), - 1*time.Second, 100, gen, ) @@ -544,8 +532,8 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) logger, db, mockDA, + config.DefaultConfig(), []byte("test-chain"), - 1*time.Second, 100, gen, ) @@ -618,8 +606,8 @@ func TestSequencer_QueueLimit_Integration(t *testing.T) { logger, db, dummyDA, + config.DefaultConfig(), []byte("test"), - time.Second, 2, // Very small limit for testing gen, ) @@ -733,8 +721,8 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { logger, db, dummyDA, + config.DefaultConfig(), []byte("test-chain"), - 100*time.Millisecond, queueSize, genesis.Genesis{}, ) @@ -878,8 +866,8 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { logger, db, mockDA, + config.DefaultConfig(), []byte("test-chain"), - 1*time.Second, 100, gen, ) @@ -918,8 +906,8 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { logger, db, mockDA, + config.DefaultConfig(), []byte("test-chain"), - 1*time.Second, 100, gen, ) @@ -979,8 +967,8 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { zerolog.Nop(), db, mockDA, + config.DefaultConfig(), []byte("test"), - 1*time.Second, 1000, gen, ) diff --git a/proto/evnode/v1/da.proto b/proto/evnode/v1/da.proto new file mode 100644 index 000000000..1fcd0e112 --- /dev/null +++ b/proto/evnode/v1/da.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; +package evnode.v1; + +option go_package = "github.com/evstack/ev-node/types/pb/evnode/v1"; + +// BlockData contains data retrieved from a single DA height. +message BlockData { + uint64 height = 1; + int64 timestamp = 2; // Unix timestamp in seconds + repeated bytes blobs = 3; +} diff --git a/types/pb/evnode/v1/da.pb.go b/types/pb/evnode/v1/da.pb.go new file mode 100644 index 000000000..a70d235f2 --- /dev/null +++ b/types/pb/evnode/v1/da.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc (unknown) +// source: evnode/v1/da.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// BlockData contains data retrieved from a single DA height. +type BlockData struct { + state protoimpl.MessageState `protogen:"open.v1"` + Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Unix timestamp in seconds + Blobs [][]byte `protobuf:"bytes,3,rep,name=blobs,proto3" json:"blobs,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BlockData) Reset() { + *x = BlockData{} + mi := &file_evnode_v1_da_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BlockData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlockData) ProtoMessage() {} + +func (x *BlockData) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_da_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlockData.ProtoReflect.Descriptor instead. +func (*BlockData) Descriptor() ([]byte, []int) { + return file_evnode_v1_da_proto_rawDescGZIP(), []int{0} +} + +func (x *BlockData) GetHeight() uint64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *BlockData) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *BlockData) GetBlobs() [][]byte { + if x != nil { + return x.Blobs + } + return nil +} + +var File_evnode_v1_da_proto protoreflect.FileDescriptor + +const file_evnode_v1_da_proto_rawDesc = "" + + "\n" + + "\x12evnode/v1/da.proto\x12\tevnode.v1\"W\n" + + "\tBlockData\x12\x16\n" + + "\x06height\x18\x01 \x01(\x04R\x06height\x12\x1c\n" + + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x14\n" + + "\x05blobs\x18\x03 \x03(\fR\x05blobsB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + +var ( + file_evnode_v1_da_proto_rawDescOnce sync.Once + file_evnode_v1_da_proto_rawDescData []byte +) + +func file_evnode_v1_da_proto_rawDescGZIP() []byte { + file_evnode_v1_da_proto_rawDescOnce.Do(func() { + file_evnode_v1_da_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_evnode_v1_da_proto_rawDesc), len(file_evnode_v1_da_proto_rawDesc))) + }) + return file_evnode_v1_da_proto_rawDescData +} + +var file_evnode_v1_da_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_evnode_v1_da_proto_goTypes = []any{ + (*BlockData)(nil), // 0: evnode.v1.BlockData +} +var file_evnode_v1_da_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_evnode_v1_da_proto_init() } +func file_evnode_v1_da_proto_init() { + if File_evnode_v1_da_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_da_proto_rawDesc), len(file_evnode_v1_da_proto_rawDesc)), + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_evnode_v1_da_proto_goTypes, + DependencyIndexes: file_evnode_v1_da_proto_depIdxs, + MessageInfos: file_evnode_v1_da_proto_msgTypes, + }.Build() + File_evnode_v1_da_proto = out.File + file_evnode_v1_da_proto_goTypes = nil + file_evnode_v1_da_proto_depIdxs = nil +}