From 8a465b9935e28337d6aa82b28edc8551cb2c8145 Mon Sep 17 00:00:00 2001 From: samliok Date: Fri, 13 Mar 2026 16:39:11 -0400 Subject: [PATCH 1/4] non validtors --- api.go | 20 +++ nonvalidator/non_validator.go | 241 ++++++++++++++++++++++++++++ nonvalidator/non_validator_test.go | 248 +++++++++++++++++++++++++++++ nonvalidator/verifier.go | 82 ++++++++++ nonvalidator/verifier_test.go | 151 ++++++++++++++++++ testutil/storage.go | 39 +++++ 6 files changed, 781 insertions(+) create mode 100644 nonvalidator/non_validator.go create mode 100644 nonvalidator/non_validator_test.go create mode 100644 nonvalidator/verifier.go create mode 100644 nonvalidator/verifier_test.go diff --git a/api.go b/api.go index 50fa88e3..622c20c0 100644 --- a/api.go +++ b/api.go @@ -56,6 +56,20 @@ type Storage interface { Index(ctx context.Context, block VerifiedBlock, certificate Finalization) error } +type FullStorage interface { + NumBlocks() uint64 + // Retrieve returns the block and finalization at [seq]. + // If [seq] the block cannot be found, returns ErrBlockNotFound. + Retrieve(seq uint64) (FullBlock, Finalization, error) + Index(ctx context.Context, block FullBlock, certificate Finalization) error +} + +type Retriever interface { + // Retrieve returns the block and finalization at [seq]. + // If [seq] the block cannot be found, returns ErrBlockNotFound. + Retrieve(seq uint64) (FullBlock, Finalization, error) +} + type Communication interface { // Nodes returns all nodes that participate in the epoch. Nodes() []NodeID @@ -102,6 +116,12 @@ type VerifiedBlock interface { Bytes() ([]byte, error) } +// Contains all functions on the block +type FullBlock interface { + VerifiedBlock + Block +} + // BlockDeserializer deserializes blocks according to formatting // enforced by the application. type BlockDeserializer interface { diff --git a/nonvalidator/non_validator.go b/nonvalidator/non_validator.go new file mode 100644 index 00000000..a4b58155 --- /dev/null +++ b/nonvalidator/non_validator.go @@ -0,0 +1,241 @@ +package nonvalidator + +import ( + "bytes" + "context" + "fmt" + + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +type finalizedSeq struct { + block simplex.FullBlock + finalization *simplex.Finalization +} + +func (f *finalizedSeq) String() string { + seq := uint64(0) + digest := simplex.Digest{} + if f.block != nil { + seq = f.block.BlockHeader().Seq + digest = f.block.BlockHeader().Digest + } + if f.finalization != nil { + seq = f.finalization.Finalization.Seq + digest = f.finalization.Finalization.Digest + } + + return fmt.Sprintf("FinalizedSeq {BlockDigest: %s, Seq: %d, BlockExists %t, FinalizationExists %t}", digest, seq, f.block != nil, f.finalization != nil) +} + +type epochInfo struct { + validators []simplex.NodeID + epoch uint64 +} + +type Config struct { + Logger simplex.Logger + Storage simplex.FullStorage + GenesisValidators []simplex.NodeID +} + +type NonValidator struct { + Config + ctx context.Context + cancelCtx context.CancelFunc + verifier Verifiier + + incompleteSeqs map[simplex.Digest]*finalizedSeq + + // the most recent epoch that we have + latestEpoch *epochInfo +} + +func NewNonValidator(config Config, lastVerifiedBlock simplex.Block) *NonValidator { + ctx, cancelFunc := context.WithCancel(context.Background()) + verifier := NewVerifier(config.Logger, lastVerifiedBlock, config.Storage) + + latestEpoch := &epochInfo{ + epoch: 0, + validators: config.GenesisValidators, + } + + return &NonValidator{ + Config: config, + incompleteSeqs: make(map[simplex.Digest]*finalizedSeq, 0), + ctx: ctx, + cancelCtx: cancelFunc, + verifier: *verifier, + latestEpoch: latestEpoch, + } +} + +func (n *NonValidator) Start() { + n.broadcastLatestEpoch() +} + +// TODO: Broadcast the last known epoch to bootstrap the node. Collect responses marking the latest seal block +// Keep rebroadcasting requests for that sealing block until we have enough responses to verify. +func (n *NonValidator) broadcastLatestEpoch() { + +} + +func skipMessage(msg *simplex.Message) bool { + switch { + case msg == nil: + return true + case msg.EmptyNotarization != nil: + return true + case msg.VoteMessage != nil: + return true + case msg.EmptyVoteMessage != nil: + return true + case msg.Notarization != nil: + return true + // TODO: I think we want to handle replication messages. + // We can just reuse the message types from simplex and only care about sequences not rounds. + case msg.ReplicationResponse != nil: + return true + case msg.ReplicationRequest != nil: + return true + case msg.FinalizeVote != nil: + return true + default: + return false + } +} + +func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) error { + switch { + case skipMessage(msg): + return nil + case msg.BlockDigestRequest != nil: + // TODO: send out request to help the network + return nil + case msg.BlockMessage != nil: + // convert to full block + block, ok := msg.BlockMessage.Block.(simplex.FullBlock) + if !ok { + n.Logger.Debug("Unable to convert block message to full block", zap.Stringer("Digest", msg.BlockMessage.Block.BlockHeader().Digest), zap.Stringer("from", from)) + return nil + } + return n.handleBlock(block, from) + case msg.Finalization != nil: + return n.handleFinalization(msg.Finalization, from) + default: + n.Logger.Debug("Received unexpected message", zap.Any("Message", msg), zap.Stringer("from", from)) + } + + return nil +} + +// handleBlock handles a block message. +func (n *NonValidator) handleBlock(block simplex.FullBlock, from simplex.NodeID) error { + bh := block.BlockHeader() + + if n.latestEpoch.epoch != bh.Epoch { + n.Logger.Debug("Received a block not from the current epoch", zap.Uint64("Current epoch", n.latestEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + } + + // TODO: do we need this check for non-validators? I added because otherwise we would need to store all blocks received until we receive a finalization + // but this could be a dos since a malicious node can spam us with bad blocks. + if !bytes.Equal(simplex.LeaderForRound(n.latestEpoch.validators, bh.Round), from) { + n.Logger.Debug("Received a block not from the leader", zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + // If we have already verified the block discard it + if n.verifier.alreadyVerifiedSeq(bh.Seq) { + n.Logger.Debug("Received a block from an older round") + return nil + } + + // If we have already received this block + incomplete, ok := n.incompleteSeqs[bh.Digest] + if !ok { + // we have not received anything for this digest + incompleteSeq := &finalizedSeq{ + block: block, + } + n.incompleteSeqs[bh.Digest] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Stored", incompleteSeq)) + return nil + } + + // Already received this block + if incomplete.block != nil || incomplete.finalization == nil { + return nil + } + + if !bytes.Equal(incomplete.finalization.Finalization.Digest[:], bh.Digest[:]) { + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", incomplete.finalization.Finalization.Digest), + zap.Stringer("Block digest", bh.Digest), + zap.Stringer("From", from), + ) + return nil + } + + // index and verify block + return n.indexBlock(block, incomplete.finalization) +} + +func (n *NonValidator) indexBlock(block simplex.FullBlock, finalization *simplex.Finalization) error { + if block == nil || finalization == nil { + return nil + } + + if err := n.Storage.Index(n.ctx, block, *finalization); err != nil { + return err + } + + return n.verifier.triggerVerify(block) +} + +func (n *NonValidator) handleFinalization(finalization *simplex.Finalization, from simplex.NodeID) error { + bh := finalization.Finalization.BlockHeader + + if bh.Epoch < n.latestEpoch.epoch { + n.Logger.Debug("Received a finalization from an earlier epoch", zap.Uint64("Current epoch", n.latestEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + // TODO: we can still probably process it? + } + + if bh.Epoch > n.latestEpoch.epoch { + // TODO: begin replication + // if message is from n.checkpointEpoch or less, call index block and mark the message as received to the replicator + // otherwise notify replicator we may need to replicate a future epoch. + } + + // If we have already received this block + incomplete, ok := n.incompleteSeqs[bh.Digest] + if !ok { + // we have not received anything for this digest + incompleteSeq := &finalizedSeq{ + finalization: finalization, + } + n.incompleteSeqs[bh.Digest] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Stored", incompleteSeq)) + return nil + } + + // Already received this block + if incomplete.block == nil || incomplete.finalization != nil { + return nil + } + + digest := incomplete.block.BlockHeader().Digest + if !bytes.Equal(bh.Digest[:], digest[:]) { + // TODO We probably need to start replication, for this block? + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", incomplete.finalization.Finalization.Digest), + zap.Stringer("Block digest", bh.Digest), + zap.Stringer("From", from), + ) + return nil + } + + return n.indexBlock(incomplete.block, finalization) +} diff --git a/nonvalidator/non_validator_test.go b/nonvalidator/non_validator_test.go new file mode 100644 index 00000000..15e2ef6d --- /dev/null +++ b/nonvalidator/non_validator_test.go @@ -0,0 +1,248 @@ +package nonvalidator + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +func TestSkipMessage(t *testing.T) { + tests := []struct { + name string + msg *simplex.Message + expected bool + }{ + { + name: "EmptyNotarization", + msg: &simplex.Message{EmptyNotarization: &simplex.EmptyNotarization{}}, + expected: true, + }, + { + name: "VoteMessage", + msg: &simplex.Message{VoteMessage: &simplex.Vote{}}, + expected: true, + }, + { + name: "EmptyVoteMessage", + msg: &simplex.Message{EmptyVoteMessage: &simplex.EmptyVote{}}, + expected: true, + }, + { + name: "Notarization", + msg: &simplex.Message{Notarization: &simplex.Notarization{}}, + expected: true, + }, + { + name: "ReplicationResponse", + msg: &simplex.Message{ReplicationResponse: &simplex.ReplicationResponse{}}, + expected: true, + }, + { + name: "ReplicationRequest", + msg: &simplex.Message{ReplicationRequest: &simplex.ReplicationRequest{}}, + expected: true, + }, + { + name: "FinalizeVote", + msg: &simplex.Message{FinalizeVote: &simplex.FinalizeVote{}}, + expected: true, + }, + { + name: "BlockDigestRequest", + msg: &simplex.Message{BlockDigestRequest: &simplex.BlockDigestRequest{}}, + expected: false, + }, + { + name: "BlockMessage", + msg: &simplex.Message{BlockMessage: &simplex.BlockMessage{}}, + expected: false, + }, + { + name: "Finalization", + msg: &simplex.Message{Finalization: &simplex.Finalization{}}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, skipMessage(tt.msg)) + }) + } +} + +func newTestNonValidator(t *testing.T, nodes []simplex.NodeID, lastVerifiedBlock simplex.Block) *NonValidator { + config := Config{ + Storage: testutil.NewNonValidatorInMemoryStorage(), + Logger: testutil.MakeLogger(t, 1), + GenesisValidators: nodes, + } + + return NewNonValidator(config, lastVerifiedBlock) +} + +func blockMessage(t *testing.T, block simplex.Block, from simplex.NodeID) *simplex.Message { + vote, err := testutil.NewTestVote(block, from) + require.NoError(t, err) + return &simplex.Message{ + BlockMessage: &simplex.BlockMessage{ + Block: block, + Vote: *vote, + }, + } +} + +func TestHandleFinalizationMessage(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + + tests := []struct { + name string + lastVerifiedBlock *testutil.TestBlock + // sendBlock controls whether a block message is sent before the finalization. + sendBlock bool + blockSender simplex.NodeID + expectVerified bool + expectedNumBlocks uint64 + }{ + { + name: "Finalization Only No Block", + }, + { + name: "Block From Non-Leader Then Finalization", + sendBlock: true, + blockSender: nodes[1], + }, + { + name: "Block From Leader Then Finalization", + sendBlock: true, + blockSender: nodes[0], + expectVerified: true, + expectedNumBlocks: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var lastVerified simplex.Block + if tt.lastVerifiedBlock != nil { + lastVerified = tt.lastVerifiedBlock + } + v := newTestNonValidator(t, nodes, lastVerified) + + var verified atomic.Bool + blockToSend := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}) + blockToSend.OnVerify = func() { + verified.Store(true) + } + + if tt.sendBlock { + err := v.HandleMessage(blockMessage(t, blockToSend, nodes[0]), tt.blockSender) + require.NoError(t, err) + } + + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockToSend, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + if tt.expectVerified { + require.Eventually(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } else { + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } + require.Equal(t, tt.expectedNumBlocks, v.Storage.NumBlocks()) + }) + } +} + +func TestHandleBlockMessage(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + + tests := []struct { + name string + lastVerifiedBlock *testutil.TestBlock + // finalizationBlock returns the block to finalize; nil means no finalization is sent. + // lastVerified may be nil when lastVerifiedBlock is not set for the test case. + finalizationBlock func(lastVerified, blockToSend *testutil.TestBlock) *testutil.TestBlock + blockSender simplex.NodeID + expectVerified bool + expectedNumBlocks uint64 + }{ + { + name: "Next to Verify But No Finalization", + blockSender: nodes[0], + }, + { + name: "BlockMessage not sent from leader", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[1], + }, + { + name: "Already Verified", + lastVerifiedBlock: testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}), + finalizationBlock: func(lastVerified, _ *testutil.TestBlock) *testutil.TestBlock { return lastVerified }, + blockSender: nodes[1], + }, + { + name: "Finalization Received", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[0], + expectVerified: true, + expectedNumBlocks: 1, + }, + { + name: "Finalization Received But Not Next To Verify", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[0], + expectVerified: true, + expectedNumBlocks: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var lastVerified simplex.Block + if tt.lastVerifiedBlock != nil { + lastVerified = tt.lastVerifiedBlock + } + v := newTestNonValidator(t, nodes, lastVerified) + + var verified atomic.Bool + blockToSend := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}) + blockToSend.OnVerify = func() { + verified.Store(true) + } + + if tt.finalizationBlock != nil { + finalizeBlock := tt.finalizationBlock(tt.lastVerifiedBlock, blockToSend) + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, finalizeBlock, nodes) + v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + } + + err := v.HandleMessage(blockMessage(t, blockToSend, nodes[0]), tt.blockSender) + require.NoError(t, err) + + if tt.expectVerified { + require.Eventually(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } else { + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } + require.Equal(t, tt.expectedNumBlocks, v.Storage.NumBlocks()) + }) + } +} diff --git a/nonvalidator/verifier.go b/nonvalidator/verifier.go new file mode 100644 index 00000000..aef12532 --- /dev/null +++ b/nonvalidator/verifier.go @@ -0,0 +1,82 @@ +package nonvalidator + +import ( + "context" + + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +// Verifier verifies blocks in order, one at a time. +type Verifiier struct { + ctx context.Context + cancelCtx context.CancelFunc + + logger simplex.Logger + latestVerifiedBlock simplex.Block // last block we have verified + storage simplex.Retriever // access to blocks + scheduler *simplex.BasicScheduler +} + +func NewVerifier(logger simplex.Logger, lastVerifiedBlock simplex.Block, storgae simplex.Retriever) *Verifiier { + return &Verifiier{ + logger: logger, + latestVerifiedBlock: lastVerifiedBlock, + storage: storgae, + scheduler: simplex.NewScheduler(logger, 1), + } +} + +func (v *Verifiier) nextSeqToVerify() uint64 { + if v.latestVerifiedBlock == nil { + return 0 + } + + return v.latestVerifiedBlock.BlockHeader().Seq + 1 +} + +// alreadyVerifiedSeq checks if `seq` has already been verified. +func (v *Verifiier) alreadyVerifiedSeq(seq uint64) bool { + return seq < v.nextSeqToVerify() +} + +// triggerVerify wakes up the verifier by attempting to verify the next seq to be verified. +// We verify `block` if it is the next sequence to be verified, otherwise we try to retrieve the next +// block to be verified from storage. +func (v *Verifiier) triggerVerify(block simplex.Block) error { + nextSeqToVerify := v.nextSeqToVerify() + if block.BlockHeader().Seq != nextSeqToVerify { + block, _, err := v.storage.Retrieve(nextSeqToVerify) + if err == simplex.ErrBlockNotFound { + return nil + } else if err != nil { + return err + } + + // Re-call trigger verify and schedule the block verification task + return v.triggerVerify(block) + } + + task := v.createBlockVerificationTask(block) + v.scheduler.Schedule(task) + return nil +} + +func (v *Verifiier) createBlockVerificationTask(block simplex.Block) func() simplex.Digest { + return func() simplex.Digest { + _, err := block.Verify(v.ctx) + if err != nil { + v.logger.Error("Error verifying block with a finalization", zap.Uint64("Seq", block.BlockHeader().Seq), zap.Error(err)) + return simplex.Digest{} + } + + v.latestVerifiedBlock = block + + if err := v.triggerVerify(block); err != nil { + v.logger.Warn("Error while calling triggerVerify", zap.Error(err)) + // TODO: stop & halt the verifier + notify parent components + } + + return simplex.Digest{} + } +} diff --git a/nonvalidator/verifier_test.go b/nonvalidator/verifier_test.go new file mode 100644 index 00000000..522d385e --- /dev/null +++ b/nonvalidator/verifier_test.go @@ -0,0 +1,151 @@ +package nonvalidator + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +func blockWithSeq(seq uint64) *testutil.TestBlock { + return testutil.NewTestBlock(simplex.ProtocolMetadata{Seq: seq}, simplex.Blacklist{}) +} + +func TestAlreadyVerifiedSeq(t *testing.T) { + logger := testutil.MakeLogger(t, 0) + + tests := []struct { + name string + verifier *Verifiier + seq uint64 + expected bool + }{ + { + name: "No Verified Blocks", + verifier: func() *Verifiier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(logger, nil, storage) + }(), + seq: 5, + expected: false, + }, + { + name: "Already verified", + verifier: func() *Verifiier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(logger, blockWithSeq(5), storage) + }(), + seq: 3, + expected: true, + }, + { + name: "Not verified", + verifier: func() *Verifiier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(logger, blockWithSeq(5), storage) + }(), + seq: 6, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.verifier.alreadyVerifiedSeq(tt.seq) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestTriggerVerify(t *testing.T) { + logger := testutil.MakeLogger(t, 0) + + tests := []struct { + name string + verifier *Verifiier + block simplex.Block + expectedErr error + expectedLatestVerifiedSeq uint64 + }{ + { + name: "nothing to verify", + verifier: func() *Verifiier { + s := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(9), + expectedLatestVerifiedSeq: 5, + }, + { + name: "block is next to verify", + verifier: func() *Verifiier { + s := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(6), + expectedLatestVerifiedSeq: 6, + }, + { + name: "other block can be verified", + verifier: func() *Verifiier { + s := testutil.NewNonValidatorInMemoryStorage() + require.NoError(t, s.Index(context.Background(), blockWithSeq(6), simplex.Finalization{})) + return NewVerifier(logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(9), + expectedLatestVerifiedSeq: 6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.verifier.triggerVerify(tt.block) + require.ErrorIs(t, err, tt.expectedErr) + + require.Eventually(t, func() bool { + lv := tt.verifier.latestVerifiedBlock + if lv == nil { + return false + } + return lv.BlockHeader().Seq == tt.expectedLatestVerifiedSeq + }, 5*time.Second, 10*time.Millisecond) + }) + } +} + +func TestTriggerVerifyWhileVerifying(t *testing.T) { + logger := testutil.MakeLogger(t, 0) + + storage := testutil.NewNonValidatorInMemoryStorage() + block7 := blockWithSeq(7) + block6 := blockWithSeq(6) + block6.OnVerify = func() { + storage.Index(context.Background(), block7, simplex.Finalization{}) + } + + v := NewVerifier(logger, blockWithSeq(5), storage) + require.NoError(t, v.triggerVerify(block6)) + + require.Eventually(t, func() bool { + lv := v.latestVerifiedBlock + return lv != nil && lv.BlockHeader().Seq == 7 + }, 5*time.Second, 10*time.Millisecond) +} + +func TestTriggerVerifyDBError(t *testing.T) { + logger := testutil.MakeLogger(t, 0) + + dbErr := errors.New("db error") + storage := testutil.NewNonValidatorInMemoryStorage() + storage.RetrieveF = func(_ uint64) (simplex.FullBlock, simplex.Finalization, error) { + return nil, simplex.Finalization{}, dbErr + } + + v := NewVerifier(logger, blockWithSeq(5), storage) + require.ErrorIs(t, v.triggerVerify(blockWithSeq(9)), dbErr) + require.Equal(t, uint64(5), v.latestVerifiedBlock.BlockHeader().Seq) +} diff --git a/testutil/storage.go b/testutil/storage.go index fdd362b8..a4ce22f6 100644 --- a/testutil/storage.go +++ b/testutil/storage.go @@ -153,3 +153,42 @@ func (mem *InMemStorage) Compare(other *InMemStorage) error { return nil } + +var _ simplex.FullStorage = (*NonValidatorInMemoryStorage)(nil) + +type NonValidatorInMemoryStorage struct { + RetrieveF func(uint64) (simplex.FullBlock, simplex.Finalization, error) + *InMemStorage +} + +func NewNonValidatorInMemoryStorage() *NonValidatorInMemoryStorage { + s := &InMemStorage{ + data: make(map[uint64]struct { + simplex.VerifiedBlock + simplex.Finalization + }), + } + + s.signal = *sync.NewCond(&s.lock) + + return &NonValidatorInMemoryStorage{ + InMemStorage: s, + } +} + +func (mem *NonValidatorInMemoryStorage) Retrieve(seq uint64) (simplex.FullBlock, simplex.Finalization, error) { + if mem.RetrieveF != nil { + return mem.RetrieveF(seq) + } + + block, finalization, err := mem.InMemStorage.Retrieve(seq) + var fullBlock simplex.FullBlock + if block != nil { + fullBlock = block.(simplex.FullBlock) + } + return fullBlock, finalization, err +} + +func (mem *NonValidatorInMemoryStorage) Index(ctx context.Context, block simplex.FullBlock, certificate simplex.Finalization) error { + return mem.InMemStorage.Index(ctx, block, certificate) +} From b1bd17b6654a796b82a64a13c9062023a1cd6393 Mon Sep 17 00:00:00 2001 From: samliok Date: Tue, 17 Mar 2026 13:52:39 -0500 Subject: [PATCH 2/4] add more tests, fix bugs, nit, and styling --- epoch.go | 4 +- nonvalidator/non_validator.go | 147 +++++++++++++++-------------- nonvalidator/non_validator_test.go | 118 ++++++++++++++++++++++- nonvalidator/util.go | 27 ++++++ nonvalidator/verifier.go | 58 +++++++----- nonvalidator/verifier_test.go | 42 +++++---- util.go | 10 +- 7 files changed, 283 insertions(+), 123 deletions(-) create mode 100644 nonvalidator/util.go diff --git a/epoch.go b/epoch.go index 1e5ec722..64c2885d 100644 --- a/epoch.go +++ b/epoch.go @@ -83,7 +83,7 @@ type EpochConfig struct { type Epoch struct { EpochConfig // Runtime - oneTimeVerifier *oneTimeVerifier + oneTimeVerifier *OneTimeVerifier buildBlockScheduler *BasicScheduler blockVerificationScheduler *BlockDependencyManager lock sync.Mutex @@ -189,7 +189,7 @@ func (e *Epoch) init() error { return err } e.initOldestNotFinalizedNotarization() - e.oneTimeVerifier = newOneTimeVerifier(e.Logger) + e.oneTimeVerifier = NewOneTimeVerifier(e.Logger) scheduler := NewScheduler(e.Logger, DefaultProcessingBlocks) e.blockVerificationScheduler = NewBlockVerificationScheduler(e.Logger, DefaultProcessingBlocks, scheduler) e.buildBlockScheduler = NewScheduler(e.Logger, 1) diff --git a/nonvalidator/non_validator.go b/nonvalidator/non_validator.go index a4b58155..220357af 100644 --- a/nonvalidator/non_validator.go +++ b/nonvalidator/non_validator.go @@ -3,34 +3,13 @@ package nonvalidator import ( "bytes" "context" - "fmt" "github.com/ava-labs/simplex" "go.uber.org/zap" ) -type finalizedSeq struct { - block simplex.FullBlock - finalization *simplex.Finalization -} - -func (f *finalizedSeq) String() string { - seq := uint64(0) - digest := simplex.Digest{} - if f.block != nil { - seq = f.block.BlockHeader().Seq - digest = f.block.BlockHeader().Digest - } - if f.finalization != nil { - seq = f.finalization.Finalization.Seq - digest = f.finalization.Finalization.Digest - } - - return fmt.Sprintf("FinalizedSeq {BlockDigest: %s, Seq: %d, BlockExists %t, FinalizationExists %t}", digest, seq, f.block != nil, f.finalization != nil) -} - -type epochInfo struct { - validators []simplex.NodeID +type epochMetadata struct { + validators []simplex.NodeID // the validators of this epoch epoch uint64 } @@ -42,32 +21,37 @@ type Config struct { type NonValidator struct { Config + ctx context.Context cancelCtx context.CancelFunc - verifier Verifiier + verifier *Verifier - incompleteSeqs map[simplex.Digest]*finalizedSeq + // incompleteSequences stores sequences that we have not collected + // both a block and finalization for. Once both have been received, they are indexed & verified. + // TODO: garbage collect old sequences + incompleteSequences map[uint64]*finalizedSeq - // the most recent epoch that we have - latestEpoch *epochInfo + // the most recent epoch we have verified + latestVerifiedEpoch *epochMetadata } +// NewNonValidator creates a NonValidator with the given `config` and `lastVerifiedBlock`. func NewNonValidator(config Config, lastVerifiedBlock simplex.Block) *NonValidator { ctx, cancelFunc := context.WithCancel(context.Background()) - verifier := NewVerifier(config.Logger, lastVerifiedBlock, config.Storage) + verifier := NewVerifier(ctx, config.Logger, lastVerifiedBlock, config.Storage) - latestEpoch := &epochInfo{ + latestEpoch := &epochMetadata{ epoch: 0, validators: config.GenesisValidators, } return &NonValidator{ - Config: config, - incompleteSeqs: make(map[simplex.Digest]*finalizedSeq, 0), - ctx: ctx, - cancelCtx: cancelFunc, - verifier: *verifier, - latestEpoch: latestEpoch, + Config: config, + incompleteSequences: make(map[uint64]*finalizedSeq), + ctx: ctx, + cancelCtx: cancelFunc, + verifier: verifier, + latestVerifiedEpoch: latestEpoch, } } @@ -75,12 +59,11 @@ func (n *NonValidator) Start() { n.broadcastLatestEpoch() } -// TODO: Broadcast the last known epoch to bootstrap the node. Collect responses marking the latest seal block -// Keep rebroadcasting requests for that sealing block until we have enough responses to verify. -func (n *NonValidator) broadcastLatestEpoch() { - -} +// TODO: Broadcast the last known epoch to bootstrap the node. Collect responses marking the latest sealing block. +// Keep rebroadcasting requests for that sealing block until we have enough responses. +func (n *NonValidator) broadcastLatestEpoch() {} +// skipMessage returns whether `msg` should be processed by a non-validator. func skipMessage(msg *simplex.Message) bool { switch { case msg == nil: @@ -93,8 +76,8 @@ func skipMessage(msg *simplex.Message) bool { return true case msg.Notarization != nil: return true - // TODO: I think we want to handle replication messages. - // We can just reuse the message types from simplex and only care about sequences not rounds. + // TODO: handle replication messages. + // We can reuse these message types from Simplex and only care about QuorumRounds for finalized sequences. case msg.ReplicationResponse != nil: return true case msg.ReplicationRequest != nil: @@ -111,7 +94,7 @@ func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) case skipMessage(msg): return nil case msg.BlockDigestRequest != nil: - // TODO: send out request to help the network + // TODO: it seems reasonable for our non-validator to be able to process these messages and send out responses. return nil case msg.BlockMessage != nil: // convert to full block @@ -125,22 +108,20 @@ func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) return n.handleFinalization(msg.Finalization, from) default: n.Logger.Debug("Received unexpected message", zap.Any("Message", msg), zap.Stringer("from", from)) + return nil } - - return nil } // handleBlock handles a block message. func (n *NonValidator) handleBlock(block simplex.FullBlock, from simplex.NodeID) error { bh := block.BlockHeader() - if n.latestEpoch.epoch != bh.Epoch { - n.Logger.Debug("Received a block not from the current epoch", zap.Uint64("Current epoch", n.latestEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + if n.latestVerifiedEpoch.epoch != bh.Epoch { + n.Logger.Debug("Received a block from a different epoch", zap.Uint64("Current epoch", n.latestVerifiedEpoch.epoch), zap.Uint64("Block Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil } - // TODO: do we need this check for non-validators? I added because otherwise we would need to store all blocks received until we receive a finalization - // but this could be a dos since a malicious node can spam us with bad blocks. - if !bytes.Equal(simplex.LeaderForRound(n.latestEpoch.validators, bh.Round), from) { + if !bytes.Equal(simplex.LeaderForRound(n.latestVerifiedEpoch.validators, bh.Round), from) { n.Logger.Debug("Received a block not from the leader", zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) return nil } @@ -151,19 +132,18 @@ func (n *NonValidator) handleBlock(block simplex.FullBlock, from simplex.NodeID) return nil } - // If we have already received this block - incomplete, ok := n.incompleteSeqs[bh.Digest] + incomplete, ok := n.incompleteSequences[bh.Seq] + // we have not received any blocks or finalizations for this sequence if !ok { - // we have not received anything for this digest incompleteSeq := &finalizedSeq{ block: block, } - n.incompleteSeqs[bh.Digest] = incompleteSeq - n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Stored", incompleteSeq)) + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) return nil } - // Already received this block + // Duplicate block, or finalization not yet received. if incomplete.block != nil || incomplete.finalization == nil { return nil } @@ -182,6 +162,7 @@ func (n *NonValidator) handleBlock(block simplex.FullBlock, from simplex.NodeID) return n.indexBlock(block, incomplete.finalization) } +// indexBlock indexes the block into storage and attempts to trigger verification after. func (n *NonValidator) indexBlock(block simplex.FullBlock, finalization *simplex.Finalization) error { if block == nil || finalization == nil { return nil @@ -194,44 +175,64 @@ func (n *NonValidator) indexBlock(block simplex.FullBlock, finalization *simplex return n.verifier.triggerVerify(block) } +// handleFinalization process a finalization message. If its for a future epoch, it will forward the finalization +// to the replication handler. func (n *NonValidator) handleFinalization(finalization *simplex.Finalization, from simplex.NodeID) error { bh := finalization.Finalization.BlockHeader - if bh.Epoch < n.latestEpoch.epoch { - n.Logger.Debug("Received a finalization from an earlier epoch", zap.Uint64("Current epoch", n.latestEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) - // TODO: we can still probably process it? + if bh.Epoch < n.latestVerifiedEpoch.epoch { + n.Logger.Debug("Received a finalization from an earlier epoch", zap.Uint64("Current epoch", n.latestVerifiedEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + // verify the finalization + if err := finalization.Verify(); err != nil { + n.Logger.Debug("Received a finalization that failed verification", zap.Stringer("From", from), zap.Error(err)) + return nil } - if bh.Epoch > n.latestEpoch.epoch { - // TODO: begin replication - // if message is from n.checkpointEpoch or less, call index block and mark the message as received to the replicator - // otherwise notify replicator we may need to replicate a future epoch. + // TODO: forward to replication. + if bh.Epoch > n.latestVerifiedEpoch.epoch { + // we may need to begin replication on a future epoch, or notify the replicator a future finalization has arrived. } - // If we have already received this block - incomplete, ok := n.incompleteSeqs[bh.Digest] + incomplete, ok := n.incompleteSequences[bh.Seq] if !ok { - // we have not received anything for this digest + // we have not received anything for this sequence incompleteSeq := &finalizedSeq{ finalization: finalization, } - n.incompleteSeqs[bh.Digest] = incompleteSeq - n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Stored", incompleteSeq)) + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) return nil } - // Already received this block - if incomplete.block == nil || incomplete.finalization != nil { + // Duplicate finalization received. + if incomplete.finalization != nil { + // sanity check: should never happen. + if !bytes.Equal(incomplete.finalization.Finalization.Bytes(), finalization.Finalization.Bytes()) { + n.Logger.Warn( + "Mismatching finalizations", + zap.Uint64("Incoming Sequence", finalization.Finalization.Seq), + zap.Uint64("Stored sequence", incomplete.finalization.Finalization.Seq), + ) + } + return nil + } + + // No block received yet for this sequence. + if incomplete.block == nil { return nil } digest := incomplete.block.BlockHeader().Digest if !bytes.Equal(bh.Digest[:], digest[:]) { - // TODO We probably need to start replication, for this block? + // TODO: this means the leader has equivocated and sent us a wrong block while another has been finalized. + // We should probably handle replication for this block? n.Logger.Info( "Received a block from the leader of a round whose digest mismatches the finalization", - zap.Stringer("Finalization Digest", incomplete.finalization.Finalization.Digest), - zap.Stringer("Block digest", bh.Digest), + zap.Stringer("Finalization Digest", bh.Digest), + zap.Stringer("Block digest", digest), zap.Stringer("From", from), ) return nil diff --git a/nonvalidator/non_validator_test.go b/nonvalidator/non_validator_test.go index 15e2ef6d..6497189f 100644 --- a/nonvalidator/non_validator_test.go +++ b/nonvalidator/non_validator_test.go @@ -1,6 +1,7 @@ package nonvalidator import ( + "errors" "sync/atomic" "testing" "time" @@ -10,6 +11,12 @@ import ( "github.com/stretchr/testify/require" ) +type errQC struct{} + +func (errQC) Signers() []simplex.NodeID { return nil } +func (errQC) Verify([]byte) error { return errors.New("verification failed") } +func (errQC) Bytes() []byte { return nil } + func TestSkipMessage(t *testing.T) { tests := []struct { name string @@ -162,6 +169,109 @@ func TestHandleFinalizationMessage(t *testing.T) { } } +func TestHandleBlockDigestMismatch(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + metadata := simplex.ProtocolMetadata{Seq: 0, Epoch: 0, Round: 0} + blockA := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB.Data = []byte("different") + blockB.ComputeDigest() + + // send finalization for blockB + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockB, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + // send block message for blockA — digest differs from stored finalization + err = v.HandleMessage(blockMessage(t, blockA, nodes[0]), nodes[0]) + require.NoError(t, err) + + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestHandleFinalizationDigestMismatch(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + metadata := simplex.ProtocolMetadata{Seq: 0, Epoch: 0, Round: 0} + blockA := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB.Data = []byte("different") + blockB.ComputeDigest() + + // send block message for blockA from leader + err := v.HandleMessage(blockMessage(t, blockA, nodes[0]), nodes[0]) + require.NoError(t, err) + + // send finalization for blockB — digest differs from stored block + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockB, nodes) + err = v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestHandleFinalizationFailsVerification(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + var verified atomic.Bool + block := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}) + block.OnVerify = func() { + verified.Store(true) + } + + // send block from leader + err := v.HandleMessage(blockMessage(t, block, nodes[0]), nodes[0]) + require.NoError(t, err) + + // send a finalization that fails verification + finalization := &simplex.Finalization{ + QC: errQC{}, + } + err = v.HandleMessage(&simplex.Message{Finalization: finalization}, nodes[0]) + require.NoError(t, err) + + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestBlockVerifyCalledOnce(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + verificationDelay := make(chan struct{}) + var verifyCount atomic.Int32 + + block := testutil.NewTestBlock(simplex.ProtocolMetadata{Seq: 0, Round: 0, Epoch: 0}, simplex.Blacklist{}) + block.VerificationDelay = verificationDelay + block.OnVerify = func() { + verifyCount.Add(1) + // Schedule a second verification while task 1 is still inside Verify. + // The OneTimeVerifier should return the cached result without calling Verify again. + _ = v.verifier.triggerVerify(block) + } + + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, block, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + err = v.HandleMessage(blockMessage(t, block, nodes[0]), nodes[0]) + require.NoError(t, err) + + // Unblock the in-progress verification. + close(verificationDelay) + + require.Eventually(t, func() bool { return verifyCount.Load() == 1 }, 2*time.Second, 20*time.Millisecond) + require.Never(t, func() bool { return verifyCount.Load() > 1 }, 200*time.Millisecond, 20*time.Millisecond) +} + func TestHandleBlockMessage(t *testing.T) { nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} @@ -172,6 +282,7 @@ func TestHandleBlockMessage(t *testing.T) { // lastVerified may be nil when lastVerifiedBlock is not set for the test case. finalizationBlock func(lastVerified, blockToSend *testutil.TestBlock) *testutil.TestBlock blockSender simplex.NodeID + blockSeq uint64 expectVerified bool expectedNumBlocks uint64 }{ @@ -202,10 +313,13 @@ func TestHandleBlockMessage(t *testing.T) { expectedNumBlocks: 1, }, { + // seq 1 arrives with a finalization but seq 0 has not been verified yet, + // so the block is indexed but verification is deferred. name: "Finalization Received But Not Next To Verify", finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, blockSender: nodes[0], - expectVerified: true, + blockSeq: 1, + expectVerified: false, expectedNumBlocks: 1, }, } @@ -221,7 +335,7 @@ func TestHandleBlockMessage(t *testing.T) { var verified atomic.Bool blockToSend := testutil.NewTestBlock(simplex.ProtocolMetadata{ Round: 0, - Seq: 0, + Seq: tt.blockSeq, Epoch: 0, }, simplex.Blacklist{}) blockToSend.OnVerify = func() { diff --git a/nonvalidator/util.go b/nonvalidator/util.go new file mode 100644 index 00000000..ddbd73b8 --- /dev/null +++ b/nonvalidator/util.go @@ -0,0 +1,27 @@ +package nonvalidator + +import ( + "fmt" + + "github.com/ava-labs/simplex" +) + +type finalizedSeq struct { + block simplex.FullBlock + finalization *simplex.Finalization +} + +func (f *finalizedSeq) String() string { + seq := uint64(0) + digest := simplex.Digest{} + if f.block != nil { + seq = f.block.BlockHeader().Seq + digest = f.block.BlockHeader().Digest + } + if f.finalization != nil { + seq = f.finalization.Finalization.Seq + digest = f.finalization.Finalization.Digest + } + + return fmt.Sprintf("FinalizedSeq {BlockDigest: %s, Seq: %d, BlockExists %t, FinalizationExists %t}", digest, seq, f.block != nil, f.finalization != nil) +} diff --git a/nonvalidator/verifier.go b/nonvalidator/verifier.go index aef12532..1b5a49ed 100644 --- a/nonvalidator/verifier.go +++ b/nonvalidator/verifier.go @@ -2,67 +2,79 @@ package nonvalidator import ( "context" + "errors" + "sync" "github.com/ava-labs/simplex" "go.uber.org/zap" ) // Verifier verifies blocks in order, one at a time. -type Verifiier struct { - ctx context.Context - cancelCtx context.CancelFunc +type Verifier struct { + ctx context.Context + lock sync.Mutex logger simplex.Logger latestVerifiedBlock simplex.Block // last block we have verified storage simplex.Retriever // access to blocks scheduler *simplex.BasicScheduler + oneTimeVerifier *simplex.OneTimeVerifier } -func NewVerifier(logger simplex.Logger, lastVerifiedBlock simplex.Block, storgae simplex.Retriever) *Verifiier { - return &Verifiier{ +func NewVerifier(ctx context.Context, logger simplex.Logger, lastVerifiedBlock simplex.Block, storage simplex.Retriever) *Verifier { + return &Verifier{ + ctx: ctx, logger: logger, latestVerifiedBlock: lastVerifiedBlock, - storage: storgae, + storage: storage, scheduler: simplex.NewScheduler(logger, 1), + oneTimeVerifier: simplex.NewOneTimeVerifier(logger), } } -func (v *Verifiier) nextSeqToVerify() uint64 { +// latestVerified returns the most recently verified block. +func (v *Verifier) latestVerified() simplex.Block { + v.lock.Lock() + defer v.lock.Unlock() + return v.latestVerifiedBlock +} + +func (v *Verifier) nextSeqToVerify() uint64 { if v.latestVerifiedBlock == nil { return 0 } - return v.latestVerifiedBlock.BlockHeader().Seq + 1 } // alreadyVerifiedSeq checks if `seq` has already been verified. -func (v *Verifiier) alreadyVerifiedSeq(seq uint64) bool { +func (v *Verifier) alreadyVerifiedSeq(seq uint64) bool { return seq < v.nextSeqToVerify() } // triggerVerify wakes up the verifier by attempting to verify the next seq to be verified. // We verify `block` if it is the next sequence to be verified, otherwise we try to retrieve the next // block to be verified from storage. -func (v *Verifiier) triggerVerify(block simplex.Block) error { - nextSeqToVerify := v.nextSeqToVerify() - if block.BlockHeader().Seq != nextSeqToVerify { - block, _, err := v.storage.Retrieve(nextSeqToVerify) - if err == simplex.ErrBlockNotFound { +func (v *Verifier) triggerVerify(block simplex.Block) error { + v.lock.Lock() + defer v.lock.Unlock() + + nextSeq := v.nextSeqToVerify() + for { + if block.BlockHeader().Seq == nextSeq { + v.scheduler.Schedule(v.createBlockVerificationTask(v.oneTimeVerifier.Wrap(block))) + return nil + } + retrieved, _, err := v.storage.Retrieve(nextSeq) + if errors.Is(err, simplex.ErrBlockNotFound) { return nil } else if err != nil { return err } - - // Re-call trigger verify and schedule the block verification task - return v.triggerVerify(block) + block = retrieved } - - task := v.createBlockVerificationTask(block) - v.scheduler.Schedule(task) - return nil } -func (v *Verifiier) createBlockVerificationTask(block simplex.Block) func() simplex.Digest { +func (v *Verifier) createBlockVerificationTask(block simplex.Block) func() simplex.Digest { return func() simplex.Digest { _, err := block.Verify(v.ctx) if err != nil { @@ -70,7 +82,9 @@ func (v *Verifiier) createBlockVerificationTask(block simplex.Block) func() simp return simplex.Digest{} } + v.lock.Lock() v.latestVerifiedBlock = block + v.lock.Unlock() if err := v.triggerVerify(block); err != nil { v.logger.Warn("Error while calling triggerVerify", zap.Error(err)) diff --git a/nonvalidator/verifier_test.go b/nonvalidator/verifier_test.go index 522d385e..fef2d228 100644 --- a/nonvalidator/verifier_test.go +++ b/nonvalidator/verifier_test.go @@ -16,37 +16,38 @@ func blockWithSeq(seq uint64) *testutil.TestBlock { } func TestAlreadyVerifiedSeq(t *testing.T) { + ctx := context.Background() logger := testutil.MakeLogger(t, 0) tests := []struct { name string - verifier *Verifiier + verifier *Verifier seq uint64 expected bool }{ { name: "No Verified Blocks", - verifier: func() *Verifiier { + verifier: func() *Verifier { storage := testutil.NewNonValidatorInMemoryStorage() - return NewVerifier(logger, nil, storage) + return NewVerifier(ctx, logger, nil, storage) }(), seq: 5, expected: false, }, { name: "Already verified", - verifier: func() *Verifiier { + verifier: func() *Verifier { storage := testutil.NewNonValidatorInMemoryStorage() - return NewVerifier(logger, blockWithSeq(5), storage) + return NewVerifier(ctx, logger, blockWithSeq(5), storage) }(), seq: 3, expected: true, }, { name: "Not verified", - verifier: func() *Verifiier { + verifier: func() *Verifier { storage := testutil.NewNonValidatorInMemoryStorage() - return NewVerifier(logger, blockWithSeq(5), storage) + return NewVerifier(ctx, logger, blockWithSeq(5), storage) }(), seq: 6, expected: false, @@ -62,39 +63,40 @@ func TestAlreadyVerifiedSeq(t *testing.T) { } func TestTriggerVerify(t *testing.T) { + ctx := context.Background() logger := testutil.MakeLogger(t, 0) tests := []struct { name string - verifier *Verifiier + verifier *Verifier block simplex.Block expectedErr error expectedLatestVerifiedSeq uint64 }{ { name: "nothing to verify", - verifier: func() *Verifiier { + verifier: func() *Verifier { s := testutil.NewNonValidatorInMemoryStorage() - return NewVerifier(logger, blockWithSeq(5), s) + return NewVerifier(ctx, logger, blockWithSeq(5), s) }(), block: blockWithSeq(9), expectedLatestVerifiedSeq: 5, }, { name: "block is next to verify", - verifier: func() *Verifiier { + verifier: func() *Verifier { s := testutil.NewNonValidatorInMemoryStorage() - return NewVerifier(logger, blockWithSeq(5), s) + return NewVerifier(ctx, logger, blockWithSeq(5), s) }(), block: blockWithSeq(6), expectedLatestVerifiedSeq: 6, }, { name: "other block can be verified", - verifier: func() *Verifiier { + verifier: func() *Verifier { s := testutil.NewNonValidatorInMemoryStorage() require.NoError(t, s.Index(context.Background(), blockWithSeq(6), simplex.Finalization{})) - return NewVerifier(logger, blockWithSeq(5), s) + return NewVerifier(ctx, logger, blockWithSeq(5), s) }(), block: blockWithSeq(9), expectedLatestVerifiedSeq: 6, @@ -107,7 +109,7 @@ func TestTriggerVerify(t *testing.T) { require.ErrorIs(t, err, tt.expectedErr) require.Eventually(t, func() bool { - lv := tt.verifier.latestVerifiedBlock + lv := tt.verifier.latestVerified() if lv == nil { return false } @@ -118,6 +120,7 @@ func TestTriggerVerify(t *testing.T) { } func TestTriggerVerifyWhileVerifying(t *testing.T) { + ctx := context.Background() logger := testutil.MakeLogger(t, 0) storage := testutil.NewNonValidatorInMemoryStorage() @@ -127,16 +130,17 @@ func TestTriggerVerifyWhileVerifying(t *testing.T) { storage.Index(context.Background(), block7, simplex.Finalization{}) } - v := NewVerifier(logger, blockWithSeq(5), storage) + v := NewVerifier(ctx, logger, blockWithSeq(5), storage) require.NoError(t, v.triggerVerify(block6)) require.Eventually(t, func() bool { - lv := v.latestVerifiedBlock + lv := v.latestVerified() return lv != nil && lv.BlockHeader().Seq == 7 }, 5*time.Second, 10*time.Millisecond) } func TestTriggerVerifyDBError(t *testing.T) { + ctx := context.Background() logger := testutil.MakeLogger(t, 0) dbErr := errors.New("db error") @@ -145,7 +149,7 @@ func TestTriggerVerifyDBError(t *testing.T) { return nil, simplex.Finalization{}, dbErr } - v := NewVerifier(logger, blockWithSeq(5), storage) + v := NewVerifier(ctx, logger, blockWithSeq(5), storage) require.ErrorIs(t, v.triggerVerify(blockWithSeq(9)), dbErr) - require.Equal(t, uint64(5), v.latestVerifiedBlock.BlockHeader().Seq) + require.Equal(t, uint64(5), v.latestVerified().BlockHeader().Seq) } diff --git a/util.go b/util.go index 988cb871..45bdf2da 100644 --- a/util.go +++ b/util.go @@ -129,20 +129,20 @@ func SetRound(block VerifiedBlock, notarization *Notarization, finalization *Fin return round } -type oneTimeVerifier struct { +type OneTimeVerifier struct { lock sync.Mutex digests map[Digest]verifiedResult logger Logger } -func newOneTimeVerifier(logger Logger) *oneTimeVerifier { - return &oneTimeVerifier{ +func NewOneTimeVerifier(logger Logger) *OneTimeVerifier { + return &OneTimeVerifier{ digests: make(map[Digest]verifiedResult), logger: logger, } } -func (otv *oneTimeVerifier) Wrap(block Block) Block { +func (otv *OneTimeVerifier) Wrap(block Block) Block { return &oneTimeVerifiedBlock{ otv: otv, Block: block, @@ -156,7 +156,7 @@ type verifiedResult struct { } type oneTimeVerifiedBlock struct { - otv *oneTimeVerifier + otv *OneTimeVerifier Block } From 9da245b38294622ef4c0195040cd402a9361caa4 Mon Sep 17 00:00:00 2001 From: samliok Date: Tue, 17 Mar 2026 14:26:41 -0500 Subject: [PATCH 3/4] add stop method --- nonvalidator/non_validator.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/nonvalidator/non_validator.go b/nonvalidator/non_validator.go index 220357af..4c791f56 100644 --- a/nonvalidator/non_validator.go +++ b/nonvalidator/non_validator.go @@ -59,6 +59,10 @@ func (n *NonValidator) Start() { n.broadcastLatestEpoch() } +func (n *NonValidator) Stop() { + n.cancelCtx() +} + // TODO: Broadcast the last known epoch to bootstrap the node. Collect responses marking the latest sealing block. // Keep rebroadcasting requests for that sealing block until we have enough responses. func (n *NonValidator) broadcastLatestEpoch() {} @@ -90,6 +94,12 @@ func skipMessage(msg *simplex.Message) bool { } func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) error { + select { + case <-n.ctx.Done(): + return nil + default: + } + switch { case skipMessage(msg): return nil From 7602d641109509328ff9e25c50024e65641edf7a Mon Sep 17 00:00:00 2001 From: samliok Date: Tue, 17 Mar 2026 14:36:53 -0500 Subject: [PATCH 4/4] rename --- api.go | 6 ++---- nonvalidator/verifier.go | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/api.go b/api.go index 622c20c0..2f8fa4e2 100644 --- a/api.go +++ b/api.go @@ -57,14 +57,12 @@ type Storage interface { } type FullStorage interface { + FullBlockRetriever NumBlocks() uint64 - // Retrieve returns the block and finalization at [seq]. - // If [seq] the block cannot be found, returns ErrBlockNotFound. - Retrieve(seq uint64) (FullBlock, Finalization, error) Index(ctx context.Context, block FullBlock, certificate Finalization) error } -type Retriever interface { +type FullBlockRetriever interface { // Retrieve returns the block and finalization at [seq]. // If [seq] the block cannot be found, returns ErrBlockNotFound. Retrieve(seq uint64) (FullBlock, Finalization, error) diff --git a/nonvalidator/verifier.go b/nonvalidator/verifier.go index 1b5a49ed..469f1c40 100644 --- a/nonvalidator/verifier.go +++ b/nonvalidator/verifier.go @@ -15,13 +15,13 @@ type Verifier struct { lock sync.Mutex logger simplex.Logger - latestVerifiedBlock simplex.Block // last block we have verified - storage simplex.Retriever // access to blocks + latestVerifiedBlock simplex.Block // last block we have verified + storage simplex.FullBlockRetriever // access to blocks scheduler *simplex.BasicScheduler oneTimeVerifier *simplex.OneTimeVerifier } -func NewVerifier(ctx context.Context, logger simplex.Logger, lastVerifiedBlock simplex.Block, storage simplex.Retriever) *Verifier { +func NewVerifier(ctx context.Context, logger simplex.Logger, lastVerifiedBlock simplex.Block, storage simplex.FullBlockRetriever) *Verifier { return &Verifier{ ctx: ctx, logger: logger,