diff --git a/api.go b/api.go index 50fa88e3..2f8fa4e2 100644 --- a/api.go +++ b/api.go @@ -56,6 +56,18 @@ type Storage interface { Index(ctx context.Context, block VerifiedBlock, certificate Finalization) error } +type FullStorage interface { + FullBlockRetriever + NumBlocks() uint64 + Index(ctx context.Context, block FullBlock, certificate Finalization) error +} + +type FullBlockRetriever interface { + // Retrieve returns the block and finalization at [seq]. + // If [seq] the block cannot be found, returns ErrBlockNotFound. + Retrieve(seq uint64) (FullBlock, Finalization, error) +} + type Communication interface { // Nodes returns all nodes that participate in the epoch. Nodes() []NodeID @@ -102,6 +114,12 @@ type VerifiedBlock interface { Bytes() ([]byte, error) } +// Contains all functions on the block +type FullBlock interface { + VerifiedBlock + Block +} + // BlockDeserializer deserializes blocks according to formatting // enforced by the application. type BlockDeserializer interface { diff --git a/epoch.go b/epoch.go index ca827367..64c2885d 100644 --- a/epoch.go +++ b/epoch.go @@ -189,7 +189,7 @@ func (e *Epoch) init() error { return err } e.initOldestNotFinalizedNotarization() - e.oneTimeVerifier = newOneTimeVerifier(e.Logger) + e.oneTimeVerifier = NewOneTimeVerifier(e.Logger) scheduler := NewScheduler(e.Logger, DefaultProcessingBlocks) e.blockVerificationScheduler = NewBlockVerificationScheduler(e.Logger, DefaultProcessingBlocks, scheduler) e.buildBlockScheduler = NewScheduler(e.Logger, 1) diff --git a/nonvalidator/non_validator.go b/nonvalidator/non_validator.go new file mode 100644 index 00000000..4c791f56 --- /dev/null +++ b/nonvalidator/non_validator.go @@ -0,0 +1,252 @@ +package nonvalidator + +import ( + "bytes" + "context" + + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +type epochMetadata struct { + validators []simplex.NodeID // the validators of this epoch + epoch uint64 +} + +type Config struct { + Logger simplex.Logger + Storage simplex.FullStorage + GenesisValidators []simplex.NodeID +} + +type NonValidator struct { + Config + + ctx context.Context + cancelCtx context.CancelFunc + verifier *Verifier + + // incompleteSequences stores sequences that we have not collected + // both a block and finalization for. Once both have been received, they are indexed & verified. + // TODO: garbage collect old sequences + incompleteSequences map[uint64]*finalizedSeq + + // the most recent epoch we have verified + latestVerifiedEpoch *epochMetadata +} + +// NewNonValidator creates a NonValidator with the given `config` and `lastVerifiedBlock`. +func NewNonValidator(config Config, lastVerifiedBlock simplex.Block) *NonValidator { + ctx, cancelFunc := context.WithCancel(context.Background()) + verifier := NewVerifier(ctx, config.Logger, lastVerifiedBlock, config.Storage) + + latestEpoch := &epochMetadata{ + epoch: 0, + validators: config.GenesisValidators, + } + + return &NonValidator{ + Config: config, + incompleteSequences: make(map[uint64]*finalizedSeq), + ctx: ctx, + cancelCtx: cancelFunc, + verifier: verifier, + latestVerifiedEpoch: latestEpoch, + } +} + +func (n *NonValidator) Start() { + n.broadcastLatestEpoch() +} + +func (n *NonValidator) Stop() { + n.cancelCtx() +} + +// TODO: Broadcast the last known epoch to bootstrap the node. Collect responses marking the latest sealing block. +// Keep rebroadcasting requests for that sealing block until we have enough responses. +func (n *NonValidator) broadcastLatestEpoch() {} + +// skipMessage returns whether `msg` should be processed by a non-validator. +func skipMessage(msg *simplex.Message) bool { + switch { + case msg == nil: + return true + case msg.EmptyNotarization != nil: + return true + case msg.VoteMessage != nil: + return true + case msg.EmptyVoteMessage != nil: + return true + case msg.Notarization != nil: + return true + // TODO: handle replication messages. + // We can reuse these message types from Simplex and only care about QuorumRounds for finalized sequences. + case msg.ReplicationResponse != nil: + return true + case msg.ReplicationRequest != nil: + return true + case msg.FinalizeVote != nil: + return true + default: + return false + } +} + +func (n *NonValidator) HandleMessage(msg *simplex.Message, from simplex.NodeID) error { + select { + case <-n.ctx.Done(): + return nil + default: + } + + switch { + case skipMessage(msg): + return nil + case msg.BlockDigestRequest != nil: + // TODO: it seems reasonable for our non-validator to be able to process these messages and send out responses. + return nil + case msg.BlockMessage != nil: + // convert to full block + block, ok := msg.BlockMessage.Block.(simplex.FullBlock) + if !ok { + n.Logger.Debug("Unable to convert block message to full block", zap.Stringer("Digest", msg.BlockMessage.Block.BlockHeader().Digest), zap.Stringer("from", from)) + return nil + } + return n.handleBlock(block, from) + case msg.Finalization != nil: + return n.handleFinalization(msg.Finalization, from) + default: + n.Logger.Debug("Received unexpected message", zap.Any("Message", msg), zap.Stringer("from", from)) + return nil + } +} + +// handleBlock handles a block message. +func (n *NonValidator) handleBlock(block simplex.FullBlock, from simplex.NodeID) error { + bh := block.BlockHeader() + + if n.latestVerifiedEpoch.epoch != bh.Epoch { + n.Logger.Debug("Received a block from a different epoch", zap.Uint64("Current epoch", n.latestVerifiedEpoch.epoch), zap.Uint64("Block Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + if !bytes.Equal(simplex.LeaderForRound(n.latestVerifiedEpoch.validators, bh.Round), from) { + n.Logger.Debug("Received a block not from the leader", zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + // If we have already verified the block discard it + if n.verifier.alreadyVerifiedSeq(bh.Seq) { + n.Logger.Debug("Received a block from an older round") + return nil + } + + incomplete, ok := n.incompleteSequences[bh.Seq] + // we have not received any blocks or finalizations for this sequence + if !ok { + incompleteSeq := &finalizedSeq{ + block: block, + } + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) + return nil + } + + // Duplicate block, or finalization not yet received. + if incomplete.block != nil || incomplete.finalization == nil { + return nil + } + + if !bytes.Equal(incomplete.finalization.Finalization.Digest[:], bh.Digest[:]) { + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", incomplete.finalization.Finalization.Digest), + zap.Stringer("Block digest", bh.Digest), + zap.Stringer("From", from), + ) + return nil + } + + // index and verify block + return n.indexBlock(block, incomplete.finalization) +} + +// indexBlock indexes the block into storage and attempts to trigger verification after. +func (n *NonValidator) indexBlock(block simplex.FullBlock, finalization *simplex.Finalization) error { + if block == nil || finalization == nil { + return nil + } + + if err := n.Storage.Index(n.ctx, block, *finalization); err != nil { + return err + } + + return n.verifier.triggerVerify(block) +} + +// handleFinalization process a finalization message. If its for a future epoch, it will forward the finalization +// to the replication handler. +func (n *NonValidator) handleFinalization(finalization *simplex.Finalization, from simplex.NodeID) error { + bh := finalization.Finalization.BlockHeader + + if bh.Epoch < n.latestVerifiedEpoch.epoch { + n.Logger.Debug("Received a finalization from an earlier epoch", zap.Uint64("Current epoch", n.latestVerifiedEpoch.epoch), zap.Uint64("Epoch", bh.Epoch), zap.Stringer("From", from)) + return nil + } + + // verify the finalization + if err := finalization.Verify(); err != nil { + n.Logger.Debug("Received a finalization that failed verification", zap.Stringer("From", from), zap.Error(err)) + return nil + } + + // TODO: forward to replication. + if bh.Epoch > n.latestVerifiedEpoch.epoch { + // we may need to begin replication on a future epoch, or notify the replicator a future finalization has arrived. + } + + incomplete, ok := n.incompleteSequences[bh.Seq] + if !ok { + // we have not received anything for this sequence + incompleteSeq := &finalizedSeq{ + finalization: finalization, + } + n.incompleteSequences[bh.Seq] = incompleteSeq + n.Logger.Debug("Stored incomplete sequence", zap.Stringer("Sequence", incompleteSeq)) + return nil + } + + // Duplicate finalization received. + if incomplete.finalization != nil { + // sanity check: should never happen. + if !bytes.Equal(incomplete.finalization.Finalization.Bytes(), finalization.Finalization.Bytes()) { + n.Logger.Warn( + "Mismatching finalizations", + zap.Uint64("Incoming Sequence", finalization.Finalization.Seq), + zap.Uint64("Stored sequence", incomplete.finalization.Finalization.Seq), + ) + } + return nil + } + + // No block received yet for this sequence. + if incomplete.block == nil { + return nil + } + + digest := incomplete.block.BlockHeader().Digest + if !bytes.Equal(bh.Digest[:], digest[:]) { + // TODO: this means the leader has equivocated and sent us a wrong block while another has been finalized. + // We should probably handle replication for this block? + n.Logger.Info( + "Received a block from the leader of a round whose digest mismatches the finalization", + zap.Stringer("Finalization Digest", bh.Digest), + zap.Stringer("Block digest", digest), + zap.Stringer("From", from), + ) + return nil + } + + return n.indexBlock(incomplete.block, finalization) +} diff --git a/nonvalidator/non_validator_test.go b/nonvalidator/non_validator_test.go new file mode 100644 index 00000000..6497189f --- /dev/null +++ b/nonvalidator/non_validator_test.go @@ -0,0 +1,362 @@ +package nonvalidator + +import ( + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +type errQC struct{} + +func (errQC) Signers() []simplex.NodeID { return nil } +func (errQC) Verify([]byte) error { return errors.New("verification failed") } +func (errQC) Bytes() []byte { return nil } + +func TestSkipMessage(t *testing.T) { + tests := []struct { + name string + msg *simplex.Message + expected bool + }{ + { + name: "EmptyNotarization", + msg: &simplex.Message{EmptyNotarization: &simplex.EmptyNotarization{}}, + expected: true, + }, + { + name: "VoteMessage", + msg: &simplex.Message{VoteMessage: &simplex.Vote{}}, + expected: true, + }, + { + name: "EmptyVoteMessage", + msg: &simplex.Message{EmptyVoteMessage: &simplex.EmptyVote{}}, + expected: true, + }, + { + name: "Notarization", + msg: &simplex.Message{Notarization: &simplex.Notarization{}}, + expected: true, + }, + { + name: "ReplicationResponse", + msg: &simplex.Message{ReplicationResponse: &simplex.ReplicationResponse{}}, + expected: true, + }, + { + name: "ReplicationRequest", + msg: &simplex.Message{ReplicationRequest: &simplex.ReplicationRequest{}}, + expected: true, + }, + { + name: "FinalizeVote", + msg: &simplex.Message{FinalizeVote: &simplex.FinalizeVote{}}, + expected: true, + }, + { + name: "BlockDigestRequest", + msg: &simplex.Message{BlockDigestRequest: &simplex.BlockDigestRequest{}}, + expected: false, + }, + { + name: "BlockMessage", + msg: &simplex.Message{BlockMessage: &simplex.BlockMessage{}}, + expected: false, + }, + { + name: "Finalization", + msg: &simplex.Message{Finalization: &simplex.Finalization{}}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, skipMessage(tt.msg)) + }) + } +} + +func newTestNonValidator(t *testing.T, nodes []simplex.NodeID, lastVerifiedBlock simplex.Block) *NonValidator { + config := Config{ + Storage: testutil.NewNonValidatorInMemoryStorage(), + Logger: testutil.MakeLogger(t, 1), + GenesisValidators: nodes, + } + + return NewNonValidator(config, lastVerifiedBlock) +} + +func blockMessage(t *testing.T, block simplex.Block, from simplex.NodeID) *simplex.Message { + vote, err := testutil.NewTestVote(block, from) + require.NoError(t, err) + return &simplex.Message{ + BlockMessage: &simplex.BlockMessage{ + Block: block, + Vote: *vote, + }, + } +} + +func TestHandleFinalizationMessage(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + + tests := []struct { + name string + lastVerifiedBlock *testutil.TestBlock + // sendBlock controls whether a block message is sent before the finalization. + sendBlock bool + blockSender simplex.NodeID + expectVerified bool + expectedNumBlocks uint64 + }{ + { + name: "Finalization Only No Block", + }, + { + name: "Block From Non-Leader Then Finalization", + sendBlock: true, + blockSender: nodes[1], + }, + { + name: "Block From Leader Then Finalization", + sendBlock: true, + blockSender: nodes[0], + expectVerified: true, + expectedNumBlocks: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var lastVerified simplex.Block + if tt.lastVerifiedBlock != nil { + lastVerified = tt.lastVerifiedBlock + } + v := newTestNonValidator(t, nodes, lastVerified) + + var verified atomic.Bool + blockToSend := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}) + blockToSend.OnVerify = func() { + verified.Store(true) + } + + if tt.sendBlock { + err := v.HandleMessage(blockMessage(t, blockToSend, nodes[0]), tt.blockSender) + require.NoError(t, err) + } + + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockToSend, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + if tt.expectVerified { + require.Eventually(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } else { + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } + require.Equal(t, tt.expectedNumBlocks, v.Storage.NumBlocks()) + }) + } +} + +func TestHandleBlockDigestMismatch(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + metadata := simplex.ProtocolMetadata{Seq: 0, Epoch: 0, Round: 0} + blockA := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB.Data = []byte("different") + blockB.ComputeDigest() + + // send finalization for blockB + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockB, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + // send block message for blockA — digest differs from stored finalization + err = v.HandleMessage(blockMessage(t, blockA, nodes[0]), nodes[0]) + require.NoError(t, err) + + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestHandleFinalizationDigestMismatch(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + metadata := simplex.ProtocolMetadata{Seq: 0, Epoch: 0, Round: 0} + blockA := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB := testutil.NewTestBlock(metadata, simplex.Blacklist{}) + blockB.Data = []byte("different") + blockB.ComputeDigest() + + // send block message for blockA from leader + err := v.HandleMessage(blockMessage(t, blockA, nodes[0]), nodes[0]) + require.NoError(t, err) + + // send finalization for blockB — digest differs from stored block + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, blockB, nodes) + err = v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestHandleFinalizationFailsVerification(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + var verified atomic.Bool + block := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}) + block.OnVerify = func() { + verified.Store(true) + } + + // send block from leader + err := v.HandleMessage(blockMessage(t, block, nodes[0]), nodes[0]) + require.NoError(t, err) + + // send a finalization that fails verification + finalization := &simplex.Finalization{ + QC: errQC{}, + } + err = v.HandleMessage(&simplex.Message{Finalization: finalization}, nodes[0]) + require.NoError(t, err) + + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + require.Equal(t, uint64(0), v.Storage.NumBlocks()) +} + +func TestBlockVerifyCalledOnce(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + v := newTestNonValidator(t, nodes, nil) + + verificationDelay := make(chan struct{}) + var verifyCount atomic.Int32 + + block := testutil.NewTestBlock(simplex.ProtocolMetadata{Seq: 0, Round: 0, Epoch: 0}, simplex.Blacklist{}) + block.VerificationDelay = verificationDelay + block.OnVerify = func() { + verifyCount.Add(1) + // Schedule a second verification while task 1 is still inside Verify. + // The OneTimeVerifier should return the cached result without calling Verify again. + _ = v.verifier.triggerVerify(block) + } + + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, block, nodes) + err := v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + require.NoError(t, err) + + err = v.HandleMessage(blockMessage(t, block, nodes[0]), nodes[0]) + require.NoError(t, err) + + // Unblock the in-progress verification. + close(verificationDelay) + + require.Eventually(t, func() bool { return verifyCount.Load() == 1 }, 2*time.Second, 20*time.Millisecond) + require.Never(t, func() bool { return verifyCount.Load() > 1 }, 200*time.Millisecond, 20*time.Millisecond) +} + +func TestHandleBlockMessage(t *testing.T) { + nodes := []simplex.NodeID{{1}, {2}, {3}, {4}} + + tests := []struct { + name string + lastVerifiedBlock *testutil.TestBlock + // finalizationBlock returns the block to finalize; nil means no finalization is sent. + // lastVerified may be nil when lastVerifiedBlock is not set for the test case. + finalizationBlock func(lastVerified, blockToSend *testutil.TestBlock) *testutil.TestBlock + blockSender simplex.NodeID + blockSeq uint64 + expectVerified bool + expectedNumBlocks uint64 + }{ + { + name: "Next to Verify But No Finalization", + blockSender: nodes[0], + }, + { + name: "BlockMessage not sent from leader", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[1], + }, + { + name: "Already Verified", + lastVerifiedBlock: testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: 0, + Epoch: 0, + }, simplex.Blacklist{}), + finalizationBlock: func(lastVerified, _ *testutil.TestBlock) *testutil.TestBlock { return lastVerified }, + blockSender: nodes[1], + }, + { + name: "Finalization Received", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[0], + expectVerified: true, + expectedNumBlocks: 1, + }, + { + // seq 1 arrives with a finalization but seq 0 has not been verified yet, + // so the block is indexed but verification is deferred. + name: "Finalization Received But Not Next To Verify", + finalizationBlock: func(_, blockToSend *testutil.TestBlock) *testutil.TestBlock { return blockToSend }, + blockSender: nodes[0], + blockSeq: 1, + expectVerified: false, + expectedNumBlocks: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var lastVerified simplex.Block + if tt.lastVerifiedBlock != nil { + lastVerified = tt.lastVerifiedBlock + } + v := newTestNonValidator(t, nodes, lastVerified) + + var verified atomic.Bool + blockToSend := testutil.NewTestBlock(simplex.ProtocolMetadata{ + Round: 0, + Seq: tt.blockSeq, + Epoch: 0, + }, simplex.Blacklist{}) + blockToSend.OnVerify = func() { + verified.Store(true) + } + + if tt.finalizationBlock != nil { + finalizeBlock := tt.finalizationBlock(tt.lastVerifiedBlock, blockToSend) + finalization, _ := testutil.NewFinalizationRecord(t, v.Logger, &testutil.TestSignatureAggregator{}, finalizeBlock, nodes) + v.HandleMessage(&simplex.Message{Finalization: &finalization}, nodes[0]) + } + + err := v.HandleMessage(blockMessage(t, blockToSend, nodes[0]), tt.blockSender) + require.NoError(t, err) + + if tt.expectVerified { + require.Eventually(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } else { + require.Never(t, verified.Load, 2*time.Second, 20*time.Millisecond) + } + require.Equal(t, tt.expectedNumBlocks, v.Storage.NumBlocks()) + }) + } +} diff --git a/nonvalidator/util.go b/nonvalidator/util.go new file mode 100644 index 00000000..ddbd73b8 --- /dev/null +++ b/nonvalidator/util.go @@ -0,0 +1,27 @@ +package nonvalidator + +import ( + "fmt" + + "github.com/ava-labs/simplex" +) + +type finalizedSeq struct { + block simplex.FullBlock + finalization *simplex.Finalization +} + +func (f *finalizedSeq) String() string { + seq := uint64(0) + digest := simplex.Digest{} + if f.block != nil { + seq = f.block.BlockHeader().Seq + digest = f.block.BlockHeader().Digest + } + if f.finalization != nil { + seq = f.finalization.Finalization.Seq + digest = f.finalization.Finalization.Digest + } + + return fmt.Sprintf("FinalizedSeq {BlockDigest: %s, Seq: %d, BlockExists %t, FinalizationExists %t}", digest, seq, f.block != nil, f.finalization != nil) +} diff --git a/nonvalidator/verifier.go b/nonvalidator/verifier.go new file mode 100644 index 00000000..469f1c40 --- /dev/null +++ b/nonvalidator/verifier.go @@ -0,0 +1,96 @@ +package nonvalidator + +import ( + "context" + "errors" + "sync" + + "github.com/ava-labs/simplex" + "go.uber.org/zap" +) + +// Verifier verifies blocks in order, one at a time. +type Verifier struct { + ctx context.Context + + lock sync.Mutex + logger simplex.Logger + latestVerifiedBlock simplex.Block // last block we have verified + storage simplex.FullBlockRetriever // access to blocks + scheduler *simplex.BasicScheduler + oneTimeVerifier *simplex.OneTimeVerifier +} + +func NewVerifier(ctx context.Context, logger simplex.Logger, lastVerifiedBlock simplex.Block, storage simplex.FullBlockRetriever) *Verifier { + return &Verifier{ + ctx: ctx, + logger: logger, + latestVerifiedBlock: lastVerifiedBlock, + storage: storage, + scheduler: simplex.NewScheduler(logger, 1), + oneTimeVerifier: simplex.NewOneTimeVerifier(logger), + } +} + +// latestVerified returns the most recently verified block. +func (v *Verifier) latestVerified() simplex.Block { + v.lock.Lock() + defer v.lock.Unlock() + return v.latestVerifiedBlock +} + +func (v *Verifier) nextSeqToVerify() uint64 { + if v.latestVerifiedBlock == nil { + return 0 + } + return v.latestVerifiedBlock.BlockHeader().Seq + 1 +} + +// alreadyVerifiedSeq checks if `seq` has already been verified. +func (v *Verifier) alreadyVerifiedSeq(seq uint64) bool { + return seq < v.nextSeqToVerify() +} + +// triggerVerify wakes up the verifier by attempting to verify the next seq to be verified. +// We verify `block` if it is the next sequence to be verified, otherwise we try to retrieve the next +// block to be verified from storage. +func (v *Verifier) triggerVerify(block simplex.Block) error { + v.lock.Lock() + defer v.lock.Unlock() + + nextSeq := v.nextSeqToVerify() + for { + if block.BlockHeader().Seq == nextSeq { + v.scheduler.Schedule(v.createBlockVerificationTask(v.oneTimeVerifier.Wrap(block))) + return nil + } + retrieved, _, err := v.storage.Retrieve(nextSeq) + if errors.Is(err, simplex.ErrBlockNotFound) { + return nil + } else if err != nil { + return err + } + block = retrieved + } +} + +func (v *Verifier) createBlockVerificationTask(block simplex.Block) func() simplex.Digest { + return func() simplex.Digest { + _, err := block.Verify(v.ctx) + if err != nil { + v.logger.Error("Error verifying block with a finalization", zap.Uint64("Seq", block.BlockHeader().Seq), zap.Error(err)) + return simplex.Digest{} + } + + v.lock.Lock() + v.latestVerifiedBlock = block + v.lock.Unlock() + + if err := v.triggerVerify(block); err != nil { + v.logger.Warn("Error while calling triggerVerify", zap.Error(err)) + // TODO: stop & halt the verifier + notify parent components + } + + return simplex.Digest{} + } +} diff --git a/nonvalidator/verifier_test.go b/nonvalidator/verifier_test.go new file mode 100644 index 00000000..fef2d228 --- /dev/null +++ b/nonvalidator/verifier_test.go @@ -0,0 +1,155 @@ +package nonvalidator + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ava-labs/simplex" + "github.com/ava-labs/simplex/testutil" + "github.com/stretchr/testify/require" +) + +func blockWithSeq(seq uint64) *testutil.TestBlock { + return testutil.NewTestBlock(simplex.ProtocolMetadata{Seq: seq}, simplex.Blacklist{}) +} + +func TestAlreadyVerifiedSeq(t *testing.T) { + ctx := context.Background() + logger := testutil.MakeLogger(t, 0) + + tests := []struct { + name string + verifier *Verifier + seq uint64 + expected bool + }{ + { + name: "No Verified Blocks", + verifier: func() *Verifier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(ctx, logger, nil, storage) + }(), + seq: 5, + expected: false, + }, + { + name: "Already verified", + verifier: func() *Verifier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(ctx, logger, blockWithSeq(5), storage) + }(), + seq: 3, + expected: true, + }, + { + name: "Not verified", + verifier: func() *Verifier { + storage := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(ctx, logger, blockWithSeq(5), storage) + }(), + seq: 6, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.verifier.alreadyVerifiedSeq(tt.seq) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestTriggerVerify(t *testing.T) { + ctx := context.Background() + logger := testutil.MakeLogger(t, 0) + + tests := []struct { + name string + verifier *Verifier + block simplex.Block + expectedErr error + expectedLatestVerifiedSeq uint64 + }{ + { + name: "nothing to verify", + verifier: func() *Verifier { + s := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(ctx, logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(9), + expectedLatestVerifiedSeq: 5, + }, + { + name: "block is next to verify", + verifier: func() *Verifier { + s := testutil.NewNonValidatorInMemoryStorage() + return NewVerifier(ctx, logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(6), + expectedLatestVerifiedSeq: 6, + }, + { + name: "other block can be verified", + verifier: func() *Verifier { + s := testutil.NewNonValidatorInMemoryStorage() + require.NoError(t, s.Index(context.Background(), blockWithSeq(6), simplex.Finalization{})) + return NewVerifier(ctx, logger, blockWithSeq(5), s) + }(), + block: blockWithSeq(9), + expectedLatestVerifiedSeq: 6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.verifier.triggerVerify(tt.block) + require.ErrorIs(t, err, tt.expectedErr) + + require.Eventually(t, func() bool { + lv := tt.verifier.latestVerified() + if lv == nil { + return false + } + return lv.BlockHeader().Seq == tt.expectedLatestVerifiedSeq + }, 5*time.Second, 10*time.Millisecond) + }) + } +} + +func TestTriggerVerifyWhileVerifying(t *testing.T) { + ctx := context.Background() + logger := testutil.MakeLogger(t, 0) + + storage := testutil.NewNonValidatorInMemoryStorage() + block7 := blockWithSeq(7) + block6 := blockWithSeq(6) + block6.OnVerify = func() { + storage.Index(context.Background(), block7, simplex.Finalization{}) + } + + v := NewVerifier(ctx, logger, blockWithSeq(5), storage) + require.NoError(t, v.triggerVerify(block6)) + + require.Eventually(t, func() bool { + lv := v.latestVerified() + return lv != nil && lv.BlockHeader().Seq == 7 + }, 5*time.Second, 10*time.Millisecond) +} + +func TestTriggerVerifyDBError(t *testing.T) { + ctx := context.Background() + logger := testutil.MakeLogger(t, 0) + + dbErr := errors.New("db error") + storage := testutil.NewNonValidatorInMemoryStorage() + storage.RetrieveF = func(_ uint64) (simplex.FullBlock, simplex.Finalization, error) { + return nil, simplex.Finalization{}, dbErr + } + + v := NewVerifier(ctx, logger, blockWithSeq(5), storage) + require.ErrorIs(t, v.triggerVerify(blockWithSeq(9)), dbErr) + require.Equal(t, uint64(5), v.latestVerified().BlockHeader().Seq) +} diff --git a/testutil/storage.go b/testutil/storage.go index fdd362b8..a4ce22f6 100644 --- a/testutil/storage.go +++ b/testutil/storage.go @@ -153,3 +153,42 @@ func (mem *InMemStorage) Compare(other *InMemStorage) error { return nil } + +var _ simplex.FullStorage = (*NonValidatorInMemoryStorage)(nil) + +type NonValidatorInMemoryStorage struct { + RetrieveF func(uint64) (simplex.FullBlock, simplex.Finalization, error) + *InMemStorage +} + +func NewNonValidatorInMemoryStorage() *NonValidatorInMemoryStorage { + s := &InMemStorage{ + data: make(map[uint64]struct { + simplex.VerifiedBlock + simplex.Finalization + }), + } + + s.signal = *sync.NewCond(&s.lock) + + return &NonValidatorInMemoryStorage{ + InMemStorage: s, + } +} + +func (mem *NonValidatorInMemoryStorage) Retrieve(seq uint64) (simplex.FullBlock, simplex.Finalization, error) { + if mem.RetrieveF != nil { + return mem.RetrieveF(seq) + } + + block, finalization, err := mem.InMemStorage.Retrieve(seq) + var fullBlock simplex.FullBlock + if block != nil { + fullBlock = block.(simplex.FullBlock) + } + return fullBlock, finalization, err +} + +func (mem *NonValidatorInMemoryStorage) Index(ctx context.Context, block simplex.FullBlock, certificate simplex.Finalization) error { + return mem.InMemStorage.Index(ctx, block, certificate) +} diff --git a/util.go b/util.go index 4152f6b7..45bdf2da 100644 --- a/util.go +++ b/util.go @@ -135,7 +135,7 @@ type OneTimeVerifier struct { logger Logger } -func newOneTimeVerifier(logger Logger) *OneTimeVerifier { +func NewOneTimeVerifier(logger Logger) *OneTimeVerifier { return &OneTimeVerifier{ digests: make(map[Digest]verifiedResult), logger: logger,