diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a1ad3059e..280c62712 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -18,7 +18,8 @@ import ( // mockDA implements block/internal/da.Client for testing type mockDA struct { - submitFunc func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit + submitFunc func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit + subscribeFunc func(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error) } func (m *mockDA) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit { @@ -29,6 +30,13 @@ func (m *mockDA) Submit(ctx context.Context, data [][]byte, gasPrice float64, na return da.ResultSubmit{BaseResult: da.BaseResult{Code: da.StatusSuccess, Height: 1}} } +func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error) { + if m.subscribeFunc != nil { + return m.subscribeFunc(ctx, namespace) + } + return nil, nil +} + func (m *mockDA) Retrieve(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve { return da.ResultRetrieve{} } diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e..5b4bda474 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -442,3 +442,61 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype return results, nil } + +// Subscribe subscribes to the DA layer for new blobs at the specified namespace. +// It bridges the jsonrpc subscription to the ResultRetrieve channel for internal consumption. +func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.ResultRetrieve, error) { + ns, err := share.NewNamespaceFromBytes(namespace) + if err != nil { + return nil, fmt.Errorf("invalid namespace: %w", err) + } + + subCh, err := c.blobAPI.Internal.Subscribe(ctx, ns) + if err != nil { + return nil, fmt.Errorf("failed to subscribe to blob namespace: %w", err) + } + + outCh := make(chan datypes.ResultRetrieve, 10) + + // Start a goroutine to bridge events + go func() { + defer close(outCh) + + for { + select { + case <-ctx.Done(): + return + case resp, ok := <-subCh: + if !ok { + return + } + if resp == nil { + continue + } + + // Convert Blobs to ResultRetrieve + ids := make([]datypes.ID, len(resp.Blobs)) + data := make([]datypes.Blob, len(resp.Blobs)) + for i, b := range resp.Blobs { + ids[i] = blobrpc.MakeID(resp.Height, b.Commitment) + data[i] = b.Data() + } + + // Ideally we would get the block timestamp here but that would require an extra RPC call. + // For subscription feed, we might use local time or 0 as it's mostly for triggering catchup. + // Using 0 or Now() is a trade-off. Let's use Now() for liveness. + outCh <- datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + IDs: ids, + Height: resp.Height, + //Timestamp: // TODO: set proper value + }, + Data: data, + } + } + } + }() + + return outCh, nil +} diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7..c5a0e6248 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -22,6 +22,9 @@ type Client interface { GetDataNamespace() []byte GetForcedInclusionNamespace() []byte HasForcedInclusionNamespace() bool + + // Subscribe subscribes to the DA layer for new blobs at the specified namespace. + Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.ResultRetrieve, error) } // Verifier defines the interface for DA proof verification operations. diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 1307b3968..130fd304e 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -5,6 +5,8 @@ import ( "context" "errors" "fmt" + "sort" + "sync" "github.com/rs/zerolog" "google.golang.org/protobuf/proto" @@ -21,6 +23,7 @@ import ( // DARetriever defines the interface for retrieving events from the DA layer type DARetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) + Subscribe(ctx context.Context, ch chan common.DAHeightEvent) error } // daRetriever handles DA retrieval operations for syncing @@ -34,6 +37,7 @@ type daRetriever struct { // on restart, will be refetch as da height is updated by syncer pendingHeaders map[uint64]*types.SignedHeader pendingData map[uint64]*types.Data + mu sync.Mutex // strictMode indicates if the node has seen a valid DAHeaderEnvelope // and should now reject all legacy/unsigned headers. @@ -75,6 +79,70 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co return r.processBlobs(ctx, blobsResp.Data, daHeight), nil } +// Subscribe subscribes to specific DA namespace +func (r *daRetriever) Subscribe(ctx context.Context, outCh chan common.DAHeightEvent) error { + subChHeader, err := r.client.Subscribe(ctx, r.client.GetHeaderNamespace()) + if err != nil { + return fmt.Errorf("subscribe to headers: %w", err) + } + + var subChData <-chan datypes.ResultRetrieve + if !bytes.Equal(r.client.GetHeaderNamespace(), r.client.GetDataNamespace()) { + var err error + subChData, err = r.client.Subscribe(ctx, r.client.GetDataNamespace()) + if err != nil { + return fmt.Errorf("subscribe to data: %w", err) + } + } + + go func() { + defer close(outCh) + for { + var blobs [][]byte + var height uint64 + var errCode datypes.StatusCode + + select { + case <-ctx.Done(): + return + case res, ok := <-subChHeader: + if !ok { + return + } + blobs = res.Data + height = res.Height + errCode = res.Code + case res, ok := <-subChData: + if subChData == nil { + continue + } + if !ok { + return + } + blobs = res.Data + height = res.Height + errCode = res.Code + } + + if errCode != datypes.StatusSuccess { + r.logger.Error().Uint64("code", uint64(errCode)).Msg("subscription error") + continue + } + + events := r.processBlobs(ctx, blobs, height) + for _, ev := range events { + select { + case <-ctx.Done(): + return + case outCh <- ev: + } + } + } + }() + + return nil +} + // fetchBlobs retrieves blobs from both header and data namespaces func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (datypes.ResultRetrieve, error) { // Retrieve from both namespaces using the DA client @@ -150,6 +218,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight // processBlobs processes retrieved blobs to extract headers and data and returns height events func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + r.mu.Lock() + defer r.mu.Unlock() + // Decode all blobs for _, bz := range blobs { if len(bz) == 0 { @@ -212,18 +283,17 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight events = append(events, event) } + // Sort events by height to match execution order + sort.Slice(events, func(i, j int) bool { + if events[i].DaHeight != events[j].DaHeight { + return events[i].DaHeight < events[j].DaHeight + } + return events[i].Header.Height() < events[j].Header.Height() + }) + if len(events) > 0 { startHeight := events[0].Header.Height() - endHeight := events[0].Header.Height() - for _, event := range events { - h := event.Header.Height() - if h < startHeight { - startHeight = h - } - if h > endHeight { - endHeight = h - } - } + endHeight := events[len(events)-1].Header.Height() r.logger.Info().Uint64("da_height", daHeight).Uint64("start_height", startHeight).Uint64("end_height", endHeight).Msg("processed blocks from DA") } diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go index d94dff4d6..595eaff5c 100644 --- a/block/internal/syncing/da_retriever_mock.go +++ b/block/internal/syncing/da_retriever_mock.go @@ -105,3 +105,60 @@ func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context _c.Call.Return(run) return _c } + +// Subscribe provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) Subscribe(ctx context.Context, ch chan common.DAHeightEvent) error { + ret := _mock.Called(ctx, ch) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, chan common.DAHeightEvent) error); ok { + r0 = returnFunc(ctx, ch) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockDARetriever_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' +type MockDARetriever_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - ch chan common.DAHeightEvent +func (_e *MockDARetriever_Expecter) Subscribe(ctx interface{}, ch interface{}) *MockDARetriever_Subscribe_Call { + return &MockDARetriever_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, ch)} +} + +func (_c *MockDARetriever_Subscribe_Call) Run(run func(ctx context.Context, ch chan common.DAHeightEvent)) *MockDARetriever_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 chan common.DAHeightEvent + if args[1] != nil { + arg1 = args[1].(chan common.DAHeightEvent) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockDARetriever_Subscribe_Call) Return(err error) *MockDARetriever_Subscribe_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockDARetriever_Subscribe_Call) RunAndReturn(run func(ctx context.Context, ch chan common.DAHeightEvent) error) *MockDARetriever_Subscribe_Call { + _c.Call.Return(run) + return _c +} diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 8b27513a8..4c7baa5b3 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -374,3 +374,32 @@ func Test_isEmptyDataExpected(t *testing.T) { h.DataHash = common.DataHashForEmptyTxs assert.True(t, isEmptyDataExpected(h)) } + +func TestDARetriever_ProcessBlobs_Sorting(t *testing.T) { + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) + + // Event A: Block Height 10 + // Event B: Block Height 5 + // Although DaHeight is currently identical for all events in a single processBlobs call, + // this test ensures that the secondary sort key (Block Height) behaves correctly. + + data1Bin, data1 := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 1) + data2Bin, data2 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1) + + hdr1Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 10, addr, pub, signer, nil, &data1.Data, nil) + hdr2Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data2.Data, nil) + + // Process blobs. + daHeight := uint64(100) + // We pass them in mixed order to ensure sorting happens. + events := r.processBlobs(context.Background(), [][]byte{hdr1Bin, data1Bin, hdr2Bin, data2Bin}, daHeight) + + require.Len(t, events, 2) + assert.Equal(t, uint64(5), events[0].Header.Height(), "Events should be sorted by block height asc") + assert.Equal(t, uint64(10), events[1].Header.Height()) + + assert.Equal(t, daHeight, events[0].DaHeight) + assert.Equal(t, daHeight, events[1].DaHeight) +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 266bc55e4..48fc3dcf9 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -316,26 +316,89 @@ func (s *Syncer) daWorkerLoop() { s.logger.Info().Msg("starting DA worker") defer s.logger.Info().Msg("DA worker stopped") + backoff := s.config.DA.BlockTime.Duration + if backoff <= 0 { + backoff = 2 * time.Second + } for { - err := s.fetchDAUntilCaughtUp() + // 1. Catch up mode: fetch sequentially until we are up to date + if err := s.fetchDAUntilCaughtUp(); err != nil { + s.logger.Error().Err(err).Msg("DA catchup failed, retrying after backoff") + select { + case <-s.ctx.Done(): + return + case <-time.After(backoff): + } + continue + } - var backoff time.Duration - if err == nil { - // No error, means we are caught up. - backoff = s.config.DA.BlockTime.Duration - } else { - // Error, back off for a shorter duration. - backoff = s.config.DA.BlockTime.Duration - if backoff <= 0 { - backoff = 2 * time.Second + // 2. Follow mode: use subscription to receive new blocks + // If subscription fails or gap is detected, we fall back to catchup mode + if err := s.followDA(); err != nil { + if errors.Is(err, context.Canceled) { + return } + s.logger.Warn().Err(err).Msg("DA follow disrupted") + // We don't need explicit backoff here as we'll switch to catchup immediately, + // checking if there are new blocks to fetch. } + s.logger.Info().Msg("DA follow mode resumed") + } +} + +// followDA subscribes to DA events and processes them until a gap is detected or error occurs +func (s *Syncer) followDA() error { + s.logger.Info().Msg("entering DA follow mode") + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + subCh := make(chan common.DAHeightEvent, 1) + + if err := s.daRetriever.Subscribe(ctx, subCh); err != nil { + return fmt.Errorf("subscribe to DA: %w", err) + } + + for { select { - case <-s.ctx.Done(): - return - case <-time.After(backoff): + case <-ctx.Done(): + return ctx.Err() + case event, ok := <-subCh: + if !ok { + return errors.New("DA subscription channel closed") + } + + // Calculate expected height + nextExpectedHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight()) + + // If we receive an event for a future height (gap), break to trigger catchup + if event.DaHeight > nextExpectedHeight { + s.logger.Info(). + Uint64("event_da_height", event.DaHeight). + Uint64("expected_da_height", nextExpectedHeight). + Msg("gap detected in DA stream, switching to catchup") + return nil + } + + // If event is old/duplicate, log and ignore (or just update height if needed) + if event.DaHeight < nextExpectedHeight { + s.logger.Debug(). + Uint64("event_da_height", event.DaHeight). + Uint64("expected_da_height", nextExpectedHeight). + Msg("received old DA event, ignoring") + continue + } + + // Process event (same conceptual logic as catchup) + select { + case s.heightInCh <- event: + default: + // If channel is full, use cache as backup/buffer + s.cache.SetPendingEvent(event.Header.Height(), &event) + } + + // Update expected height + s.daRetrieverHeight.Store(event.DaHeight + 1) } } } diff --git a/block/internal/syncing/syncer_subscribe_test.go b/block/internal/syncing/syncer_subscribe_test.go new file mode 100644 index 000000000..1f8d09b08 --- /dev/null +++ b/block/internal/syncing/syncer_subscribe_test.go @@ -0,0 +1,220 @@ +package syncing + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" + datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/types" + "github.com/rs/zerolog" +) + +func TestSyncer_DAWorker_CatchupThenFollow(t *testing.T) { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockDARetriever := NewMockDARetriever(t) + + c, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + // Create Syncer (simplified) + syncer := &Syncer{ + logger: zerolog.Nop(), + daRetriever: mockDARetriever, + ctx: ctx, + daRetrieverHeight: &atomic.Uint64{}, // Starts at 0 + cache: c, + config: config.DefaultConfig(), + heightInCh: make(chan common.DAHeightEvent, 100), + wg: sync.WaitGroup{}, + } + syncer.daRetrieverHeight.Store(1) + + // Expectations: + // 1. fetchDAUntilCaughtUp called initially. + // It calls RetrieveFromDA. We Simulate 1 block retrieved, then caught up. + + // Event 1 (Catchup) + evt1 := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 1}}}, + DaHeight: 1, + } + // Event 2 (Follow via subscription) + evt2 := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 2}}}, + DaHeight: 2, + } + + // 1. Initial Catchup + // Retrieve height 1 -> succeed + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(1)).Return([]common.DAHeightEvent{evt1}, nil).Once() + // Retrieve height 2 -> Future/Caught up -> return HeightFromFuture error to signal caught up + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(2)).Return(nil, datypes.ErrHeightFromFuture).Once() + + // 2. Subscribe + subCh := make(chan common.DAHeightEvent, 10) + mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil) + + // Run daWorkerLoop in goroutine + syncer.wg.Add(1) + go syncer.daWorkerLoop() + + // Verify Catchup Event received + select { + case e := <-syncer.heightInCh: + assert.Equal(t, uint64(1), e.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for catchup event") + } + + // Now we should be in Follow mode (Subscribe called). + // Feed event 2 to subscription + subCh <- evt2 + + // Verify Follow Event received + select { + case e := <-syncer.heightInCh: + assert.Equal(t, uint64(2), e.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for follow event") + } + + // Cleanup + cancel() + syncer.wg.Wait() +} + +func TestSyncer_DAWorker_GapDetection(t *testing.T) { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockDARetriever := NewMockDARetriever(t) + + c, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + syncer := &Syncer{ + logger: zerolog.Nop(), + daRetriever: mockDARetriever, + ctx: ctx, + daRetrieverHeight: &atomic.Uint64{}, + cache: c, + config: config.DefaultConfig(), + heightInCh: make(chan common.DAHeightEvent, 100), + wg: sync.WaitGroup{}, + } + syncer.daRetrieverHeight.Store(10) // Assume we are at height 10 + + // Expectations: + // 1. Initial Catchup -> immediately caught up + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(10)).Return(nil, datypes.ErrHeightFromFuture).Once() + + // 2. Subscribe + subCh := make(chan common.DAHeightEvent, 10) + mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil) + + // 3. Gap handling + + evtGap := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 15}}}, + DaHeight: 15, + } + + evtFill := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 10}}}, + DaHeight: 10, + } + + // Catchup logic when gap is detected: + // 1. RetrieveFromDA(10) -> returns evtFill + // 2. RetrieveFromDA(11) -> returns HeightFromFuture (caught up again) + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(10)).Return([]common.DAHeightEvent{evtFill}, nil).Once() + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(11)).Return(nil, datypes.ErrHeightFromFuture).Once() + + // Run daWorkerLoop + syncer.wg.Add(1) + go syncer.daWorkerLoop() + + // Wait for initial catchup (nothing to receive) + + // Trigger Gap by sending event 15 + subCh <- evtGap + + // Verify that we received the "Fill" event (height 10) + select { + case e := <-syncer.heightInCh: + assert.Equal(t, uint64(10), e.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for fill event") + } + + // Cleanup + cancel() + syncer.wg.Wait() +} + +func TestSyncer_DAWorker_SplitNamespace_OutOfOrder(t *testing.T) { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockDARetriever := NewMockDARetriever(t) + + c, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + require.NoError(t, err) + + syncer := &Syncer{ + logger: zerolog.Nop(), + daRetriever: mockDARetriever, + ctx: ctx, + daRetrieverHeight: &atomic.Uint64{}, + cache: c, + config: config.DefaultConfig(), + heightInCh: make(chan common.DAHeightEvent, 100), + wg: sync.WaitGroup{}, + } + syncer.daRetrieverHeight.Store(10) + + // Expectations: + // 1. Initial Catchup + mockDARetriever.On("RetrieveFromDA", mock.Anything, uint64(10)).Return(nil, datypes.ErrHeightFromFuture).Once() + + // 2. Subscribe + subCh := make(chan common.DAHeightEvent, 10) + mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil) + + // Run daWorkerLoop + syncer.wg.Add(1) + go syncer.daWorkerLoop() + + // Send an event + evt := common.DAHeightEvent{ + Header: &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 10}}}, + DaHeight: 10, + } + subCh <- evt + + // Verify + select { + case e := <-syncer.heightInCh: + assert.Equal(t, uint64(10), e.DaHeight) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for event") + } + + cancel() + syncer.wg.Wait() +} diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49..4e8beeeb9 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -432,6 +432,74 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// Subscribe provides a mock function for the type MockClient +func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error) { + ret := _mock.Called(ctx, namespace) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan da.ResultRetrieve + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (<-chan da.ResultRetrieve, error)); ok { + return returnFunc(ctx, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) <-chan da.ResultRetrieve); ok { + r0 = returnFunc(ctx, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan da.ResultRetrieve) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' +type MockClient_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - namespace []byte +func (_e *MockClient_Expecter) Subscribe(ctx interface{}, namespace interface{}) *MockClient_Subscribe_Call { + return &MockClient_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, namespace)} +} + +func (_c *MockClient_Subscribe_Call) Run(run func(ctx context.Context, namespace []byte)) *MockClient_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_Subscribe_Call) Return(resultRetrieveCh <-chan da.ResultRetrieve, err error) *MockClient_Subscribe_Call { + _c.Call.Return(resultRetrieveCh, err) + return _c +} + +func (_c *MockClient_Subscribe_Call) RunAndReturn(run func(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error)) *MockClient_Subscribe_Call { + _c.Call.Return(run) + return _c +} + // NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockVerifier(t interface { diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 633bf1cf9..417166d66 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -40,6 +40,9 @@ type DummyDA struct { tickerMu sync.Mutex tickerStop chan struct{} + + subsMu sync.Mutex + subs []chan datypes.ResultRetrieve } // Option configures a DummyDA instance. @@ -113,6 +116,8 @@ func (d *DummyDA) Submit(_ context.Context, data [][]byte, _ float64, namespace } d.mu.Unlock() + d.notifySubscribers(height, now) + return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, @@ -240,6 +245,7 @@ func (d *DummyDA) StartHeightTicker(interval time.Duration) func() { } } d.mu.Unlock() + d.notifySubscribers(height, now) case <-stopCh: return } @@ -290,3 +296,47 @@ func (d *DummyDA) GetHeaderByHeight(_ context.Context, height uint64) (*Header, } return header, nil } + +// Subscribe simulates a subscription to the DA layer. +// It returns a channel that receives events when new blocks are produced or blobs are submitted. +func (d *DummyDA) Subscribe(ctx context.Context, _ []byte) (<-chan datypes.ResultRetrieve, error) { + d.subsMu.Lock() + defer d.subsMu.Unlock() + + ch := make(chan datypes.ResultRetrieve, 100) + d.subs = append(d.subs, ch) + + // cleanup on context done + go func() { + <-ctx.Done() + d.subsMu.Lock() + defer d.subsMu.Unlock() + for i, sub := range d.subs { + if sub == ch { + d.subs = append(d.subs[:i], d.subs[i+1:]...) + close(ch) + break + } + } + }() + + return ch, nil +} + +func (d *DummyDA) notifySubscribers(height uint64, timestamp time.Time) { + d.subsMu.Lock() + defer d.subsMu.Unlock() + + for _, sub := range d.subs { + select { + case sub <- datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusSuccess, + Height: height, + Timestamp: timestamp, + }, + }: + default: + } + } +}