From 67e8dcf0acab03282886aad5525fed5443bdbeae Mon Sep 17 00:00:00 2001 From: Kostis Karantias Date: Fri, 8 May 2026 22:31:44 +0300 Subject: [PATCH] Improvements: - OCR3.1: Expose metrics for ReportingPluginInfo & ReportingPluginLimits for better observability - OCR3.1 Blob exchange: Expose far more metrics for better observability and fix an issue in accounting logic to make quota calculations more precise - Improve documentation on quorumhelper package - Fix a benign race on singlewriter package - Improve ergonomics on ocrintegrationtesthelpers package Based on 03dfcfe3e36f89a0d50678860b4b35ddf45a1c7e Co-authored-by: kaleofduty <59616916+kaleofduty@users.noreply.github.com> Co-authored-by: stchrysa Co-authored-by: Philipp Schindler <4274886+PhilippSchindler@users.noreply.github.com> Co-authored-by: Manuel Vidigueira --- .../bigendianbytearray/bigendianbytearray.go | 59 +++ internal/jmt/digest.go | 29 +- internal/metricshelper/lazy_gauge.go | 73 +++ internal/singlewriter/conflict_tracker.go | 3 +- internal/util/generic.go | 22 +- .../internal/ocr3_1/blobtypes/types.go | 13 + .../internal/ocr3_1/protocol/blob_exchange.go | 302 ++++++++--- .../protocol/blob_populate_stale_index.go | 162 ++++++ .../internal/ocr3_1/protocol/blob_reap.go | 117 ++-- .../highest_committed_seq_nr_tracker.go | 69 +++ .../internal/ocr3_1/protocol/kvdb.go | 29 + .../internal/ocr3_1/protocol/metrics.go | 500 +++++++++++++++++- .../internal/ocr3_1/protocol/oracle.go | 40 +- .../internal/shim/ocr3_1_key_value_store.go | 93 +++- offchainreporting2plus/ocr3_1types/blob.go | 13 + .../in_memory_key_value_database.go | 8 +- .../mock_contract_config_tracker.go | 12 +- quorumhelper/quorumhelper.go | 7 + 18 files changed, 1373 insertions(+), 178 deletions(-) create mode 100644 internal/bigendianbytearray/bigendianbytearray.go create mode 100644 internal/metricshelper/lazy_gauge.go create mode 100644 offchainreporting2plus/internal/ocr3_1/protocol/blob_populate_stale_index.go create mode 100644 
offchainreporting2plus/internal/ocr3_1/protocol/highest_committed_seq_nr_tracker.go diff --git a/internal/bigendianbytearray/bigendianbytearray.go b/internal/bigendianbytearray/bigendianbytearray.go new file mode 100644 index 00000000..2fbde107 --- /dev/null +++ b/internal/bigendianbytearray/bigendianbytearray.go @@ -0,0 +1,59 @@ +// Package bigendianbytearray provides arithmetic operations on fixed-size byte +// arrays interpreted as unsigned big-endian integers. +package bigendianbytearray + +func Min32[T ~[32]byte]() T { + return T{} +} + +func Max32[T ~[32]byte]() T { + var t T + for i := range t { + t[i] = 0xff + } + return t +} + +func Decrement32[T ~[32]byte](t T) (T, bool) { + tdec := t + for i := len(tdec) - 1; i >= 0; i-- { + if tdec[i] == 0 { + tdec[i] = 0xff + } else { + tdec[i]-- + return tdec, true + } + } + return T{}, false +} + +func WrappingDecrement32[T ~[32]byte](t T) T { + decr, ok := Decrement32(t) + if ok { + return decr + } else { + return Max32[T]() + } +} + +func Increment32[T ~[32]byte](t T) (T, bool) { + tincr := t + for i := len(tincr) - 1; i >= 0; i-- { + if tincr[i] == 0xff { + tincr[i] = 0 + } else { + tincr[i]++ + return tincr, true + } + } + return T{}, false +} + +func WrappingIncrement32[T ~[32]byte](t T) T { + incr, ok := Increment32(t) + if ok { + return incr + } else { + return Min32[T]() + } +} diff --git a/internal/jmt/digest.go b/internal/jmt/digest.go index 60aaf50f..990142f7 100644 --- a/internal/jmt/digest.go +++ b/internal/jmt/digest.go @@ -1,39 +1,22 @@ package jmt import ( - "bytes" "crypto/sha256" + + "github.com/smartcontractkit/libocr/internal/bigendianbytearray" ) type Digest = [sha256.Size]byte var ( - MinDigest = Digest{} - MaxDigest = Digest(bytes.Repeat([]byte{0xff}, len(Digest{}))) + MinDigest = bigendianbytearray.Min32[Digest]() + MaxDigest = bigendianbytearray.Max32[Digest]() ) func DecrementDigest(digest Digest) (Digest, bool) { - decDigest := digest - for i := len(decDigest) - 1; i >= 0; i-- { - if 
decDigest[i] == 0 { - decDigest[i] = 0xff - } else { - decDigest[i]-- - return decDigest, true - } - } - return Digest{}, false + return bigendianbytearray.Decrement32(digest) } func IncrementDigest(digest Digest) (Digest, bool) { - incDigest := digest - for i := len(incDigest) - 1; i >= 0; i-- { - if incDigest[i] == 0xff { - incDigest[i] = 0 - } else { - incDigest[i]++ - return incDigest, true - } - } - return Digest{}, false + return bigendianbytearray.Increment32(digest) } diff --git a/internal/metricshelper/lazy_gauge.go b/internal/metricshelper/lazy_gauge.go new file mode 100644 index 00000000..dc984535 --- /dev/null +++ b/internal/metricshelper/lazy_gauge.go @@ -0,0 +1,73 @@ +package metricshelper + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/smartcontractkit/libocr/commontypes" +) + +// LazyGauge registers its underlying Gauge the first time Set is called. This +// avoids exposing a default zero value before we have an appropriate value to +// expose. +type LazyGauge struct { + // in + registerer prometheus.Registerer + logger commontypes.Logger + name string + + // local + gauge prometheus.Gauge + + mu sync.Mutex + shouldRegister bool +} + +func NewLazyGauge( + registerer prometheus.Registerer, + logger commontypes.Logger, + opts prometheus.GaugeOpts, +) *LazyGauge { + return &LazyGauge{ + registerer, + logger, + opts.Name, + prometheus.NewGauge(opts), + + sync.Mutex{}, + true, + } +} + +func (g *LazyGauge) Set(value float64) { + g.gauge.Set(value) + + var shouldRegister bool + g.mu.Lock() + if g.shouldRegister { + shouldRegister = true + g.shouldRegister = false + } + g.mu.Unlock() + + if shouldRegister { + RegisterOrLogError(g.logger, g.registerer, g.gauge, g.name) + g.gauge.Set(value) // To ensure the value is detected on collection, even if collection only sees values set after registration + } +} + +// Unregister must be used instead of [prometheus.Registerer.Unregister]. 
+// Otherwise, it would be possible for the LazyGauge to have never been Set and +// thus registered, and then after invocation of +// [prometheus.Registerer.Unregister], the next Set would cause the gauge to be +// registered, which is almost definitely unexpected behavior. In fact, +// LazyGauge intentionally does not implement prometheus.Collector to avoid +// confusion. The meaning of the return value matches that of +// [prometheus.Registerer.Unregister]. +func (g *LazyGauge) Unregister() bool { + g.mu.Lock() + g.shouldRegister = false + g.mu.Unlock() + + return g.registerer.Unregister(g.gauge) +} diff --git a/internal/singlewriter/conflict_tracker.go b/internal/singlewriter/conflict_tracker.go index 9a1dba5f..e4b9611a 100644 --- a/internal/singlewriter/conflict_tracker.go +++ b/internal/singlewriter/conflict_tracker.go @@ -29,8 +29,9 @@ func (ct *ConflictTracker) beginTransaction() (uint64, uint64) { func (ct *ConflictTracker) lockAndPrepareToCommit(maxCommittedTxTimestampAtCreation uint64) error { ct.mu.Lock() if maxCommittedTxTimestampAtCreation != ct.maxCommittedTxTimestamp { + maxCommittedTxTimestamp := ct.maxCommittedTxTimestamp ct.mu.Unlock() - return fmt.Errorf("concurrent conflict detected: expected maxCommittedTxTimestamp: %d, got: %d", maxCommittedTxTimestampAtCreation, ct.maxCommittedTxTimestamp) + return fmt.Errorf("concurrent conflict detected: expected maxCommittedTxTimestamp: %d, got: %d", maxCommittedTxTimestampAtCreation, maxCommittedTxTimestamp) } return nil } diff --git a/internal/util/generic.go b/internal/util/generic.go index c4711523..4dc3601a 100644 --- a/internal/util/generic.go +++ b/internal/util/generic.go @@ -1,6 +1,10 @@ package util -import "golang.org/x/exp/constraints" +import ( + "maps" + + "golang.org/x/exp/constraints" +) func PointerTo[T any](v T) *T { return &v @@ -14,6 +18,14 @@ func PointerIntegerCast[U constraints.Integer, T constraints.Integer](p *T) *U { return &v } +func SaturatingSub[T constraints.Unsigned](a T, b 
T) T { + if b > a { + var zero T + return zero + } + return a - b +} + func NilCoalesce[T any](maybe *T, default_ T) T { if maybe != nil { return *maybe @@ -29,3 +41,11 @@ func NilCoalesceSlice[T any](maybe []T) []T { return []T{} } } + +// b has priority in case of key conflict with a +func MapsUnion[K comparable, V any](a map[K]V, b map[K]V) map[K]V { + c := make(map[K]V) + maps.Copy(c, a) + maps.Copy(c, b) + return c +} diff --git a/offchainreporting2plus/internal/ocr3_1/blobtypes/types.go b/offchainreporting2plus/internal/ocr3_1/blobtypes/types.go index 0c808a0f..79a3d710 100644 --- a/offchainreporting2plus/internal/ocr3_1/blobtypes/types.go +++ b/offchainreporting2plus/internal/ocr3_1/blobtypes/types.go @@ -9,6 +9,7 @@ import ( "hash" "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/internal/bigendianbytearray" "github.com/smartcontractkit/libocr/internal/mt" "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/config" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -71,6 +72,18 @@ func (bd BlobDigest) String() string { return fmt.Sprintf("%x", bd[:]) } +func MinBlobDigest() BlobDigest { + return bigendianbytearray.Min32[BlobDigest]() +} + +func MaxBlobDigest() BlobDigest { + return bigendianbytearray.Max32[BlobDigest]() +} + +func WrappingIncrementBlobDigest(bd BlobDigest) BlobDigest { + return bigendianbytearray.WrappingIncrement32(bd) +} + func MakeBlobDigest( configDigest types.ConfigDigest, chunkDigestsRoot mt.Digest, diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/blob_exchange.go b/offchainreporting2plus/internal/ocr3_1/protocol/blob_exchange.go index b5827ac6..b02062ff 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/blob_exchange.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/blob_exchange.go @@ -25,7 +25,6 @@ func RunBlobExchange[RI any]( ctx context.Context, chNetToBlobExchange <-chan MessageToBlobExchangeWithSender[RI], - 
chOutcomeGenerationToBlobExchange <-chan EventToBlobExchange[RI], chBlobBroadcastRequest <-chan blobBroadcastRequest, chBlobFetchRequest <-chan blobFetchRequest, @@ -40,17 +39,19 @@ func RunBlobExchange[RI any]( netSender NetworkSender[RI], offchainKeyring types.OffchainKeyring, telemetrySender TelemetrySender, + + initialHighestCommittedSeqNr uint64, ) { broadcastGraceTimeoutScheduler := scheduler.NewScheduler[EventBlobBroadcastGraceTimeout[RI]]() defer broadcastGraceTimeoutScheduler.Close() bex := makeBlobExchangeState[RI]( ctx, chNetToBlobExchange, - chOutcomeGenerationToBlobExchange, chBlobBroadcastRequest, chBlobFetchRequest, config, kv, id, limits, localConfig, logger, metricsRegisterer, netSender, offchainKeyring, telemetrySender, + initialHighestCommittedSeqNr, broadcastGraceTimeoutScheduler, ) bex.run() @@ -60,7 +61,6 @@ func makeBlobExchangeState[RI any]( ctx context.Context, chNetToBlobExchange <-chan MessageToBlobExchangeWithSender[RI], - chOutcomeGenerationToBlobExchange <-chan EventToBlobExchange[RI], chBlobBroadcastRequest <-chan blobBroadcastRequest, chBlobFetchRequest <-chan blobFetchRequest, @@ -75,12 +75,27 @@ func makeBlobExchangeState[RI any]( netSender NetworkSender[RI], offchainKeyring types.OffchainKeyring, telemetrySender TelemetrySender, + initialHighestCommittedSeqNr uint64, broadcastGraceTimeoutScheduler *scheduler.Scheduler[EventBlobBroadcastGraceTimeout[RI]], ) *blobExchangeState[RI] { - offerLogTapers := make([]loghelper.LogarithmicTaper, config.N()) - - tStopExpiredBlobFetchOrBroadcast := time.After(DeltaStopExpiredBlobFetchOrBroadcast) + chHighestCommittedSeqNrTrackerToBlobExchange := make(chan uint64) + chBlobExchangeToBlobReap := make(chan uint64) + + var submitters []*blobSubmitter + var perOracleMetrics []*blobOracleMetrics + for i := range config.N() { + submitters = append(submitters, &blobSubmitter{ + loghelper.LogarithmicTaper{}, + loghelper.LogarithmicTaper{}, + }) + perOracleMetrics = append(perOracleMetrics, 
newBlobOracleMetrics( + metricsRegisterer, + logger, + commontypes.OracleID(i), + limits, + )) + } bex := &blobExchangeState[RI]{ ctx, @@ -88,7 +103,6 @@ func makeBlobExchangeState[RI any]( make(chan EventToBlobExchange[RI]), chNetToBlobExchange, - chOutcomeGenerationToBlobExchange, chBlobBroadcastRequest, chBlobFetchRequest, @@ -99,7 +113,6 @@ func makeBlobExchangeState[RI any]( limits, localConfig, logger.MakeUpdated(commontypes.LogFields{"proto": "bex"}), - offerLogTapers, newBlobExchangeMetrics(metricsRegisterer, logger), netSender, offchainKeyring, @@ -109,8 +122,13 @@ func makeBlobExchangeState[RI any]( nil, // must be filled right below nil, // must be filled right below - tStopExpiredBlobFetchOrBroadcast, + chHighestCommittedSeqNrTrackerToBlobExchange, + chBlobExchangeToBlobReap, + initialHighestCommittedSeqNr, + true, + submitters, + perOracleMetrics, make(map[BlobDigest]*blob), } @@ -290,11 +308,9 @@ const ( // we will reject the offer. maxOwedOfferResponsesPerOracle = 10 - // DeltaStopExpiredBlobFetchOrBroadcast denotes the interval with which we - // check for in-progress blob broadcasts and fetches for blobs that might - // have expired, and mark them as expired and/or send reject - // MessageBlobOfferResponse to the submitter if appropriate. - DeltaStopExpiredBlobFetchOrBroadcast = 5 * time.Second + // trackHighestCommittedSeqNrInterval denotes the rough interval with which blob + // exchange refreshes its view of the highest committed seq nr. 
+ trackHighestCommittedSeqNrInterval = 1 * time.Second ) type blobBroadcastRequest struct { @@ -340,9 +356,8 @@ type blobExchangeState[RI any] struct { ctx context.Context subs subprocesses.Subprocesses - chLocalEvent chan EventToBlobExchange[RI] - chNetToBlobExchange <-chan MessageToBlobExchangeWithSender[RI] - chOutcomeGenerationToBlobExchange <-chan EventToBlobExchange[RI] + chLocalEvent chan EventToBlobExchange[RI] + chNetToBlobExchange <-chan MessageToBlobExchangeWithSender[RI] chBlobBroadcastRequest <-chan blobBroadcastRequest chBlobFetchRequest <-chan blobFetchRequest @@ -353,7 +368,6 @@ type blobExchangeState[RI any] struct { limits ocr3_1types.ReportingPluginLimits localConfig types.LocalConfig logger loghelper.LoggerWithContext - offerLogTapers []loghelper.LogarithmicTaper metrics *blobExchangeMetrics netSender NetworkSender[RI] offchainKeyring types.OffchainKeyring @@ -366,9 +380,19 @@ type blobExchangeState[RI any] struct { // blob fetch chunkRequesterGadget *requestergadget.RequesterGadget[blobChunkId] - tStopExpiredBlobBroadcastOrFetch <-chan time.Time + chHighestCommittedSeqNrTrackerToBlobExchange chan uint64 + chBlobExchangeToBlobReap chan uint64 + highestCommittedSeqNr uint64 + notifyBlobReapOfHighestCommittedSeqNr bool - blobs map[BlobDigest]*blob + submitters []*blobSubmitter + perOracleMetrics []*blobOracleMetrics + blobs map[BlobDigest]*blob +} + +type blobSubmitter struct { + rejectedOfferLogTaper loghelper.LogarithmicTaper + statsOverflowLogTaper loghelper.LogarithmicTaper } type blobOfferItem struct { @@ -519,8 +543,28 @@ func (b *blob) prunable() bool { func (bex *blobExchangeState[RI]) run() { bex.logger.Info("BlobExchange: running", nil) + if err := bex.initializeStatsMetricsFromKV(); err != nil { + bex.logger.Warn("BlobExchange: failed to initialize stats metrics from kv, stats metrics will only be set as updated during operation", commontypes.LogFields{ + "error": err, + }) + } + + bex.subs.Go(func() { + RunHighestCommittedSeqNrTracker( 
+ bex.ctx, + bex.logger, + bex.kv, + trackHighestCommittedSeqNrInterval, + bex.chHighestCommittedSeqNrTrackerToBlobExchange, + ) + }) + + bex.subs.Go(func() { + RunBlobReap(bex.ctx, bex.logger, bex.kv, bex.chBlobExchangeToBlobReap, bex.perOracleMetrics) + }) + bex.subs.Go(func() { - RunBlobReap(bex.ctx, bex.logger, bex.kv) + RunBlobPopulateStaleIndex(bex.ctx, bex.logger, bex.kv) }) // Take a reference to the ctx.Done channel once, here, to avoid taking the @@ -529,14 +573,20 @@ func (bex *blobExchangeState[RI]) run() { // Event Loop for { + var nilOrChBlobExchangeToBlobReap chan<- uint64 + if bex.notifyBlobReapOfHighestCommittedSeqNr { + nilOrChBlobExchangeToBlobReap = bex.chBlobExchangeToBlobReap + } + select { + case nilOrChBlobExchangeToBlobReap <- bex.highestCommittedSeqNr: + bex.notifyBlobReapOfHighestCommittedSeqNr = false + case ev := <-bex.chLocalEvent: ev.processBlobExchange(bex) case msg := <-bex.chNetToBlobExchange: msg.msg.processBlobExchange(bex, msg.sender) - case ev := <-bex.chOutcomeGenerationToBlobExchange: - ev.processBlobExchange(bex) case req := <-bex.chBlobBroadcastRequest: bex.processBlobBroadcastRequest(req) @@ -550,8 +600,8 @@ func (bex *blobExchangeState[RI]) run() { case <-bex.chunkRequesterGadget.Ticker(): bex.chunkRequesterGadget.Tick() - case <-bex.tStopExpiredBlobBroadcastOrFetch: - bex.eventTStopExpiredBlobBroadcastOrFetch() + case highestCommittedSeqNr := <-bex.chHighestCommittedSeqNrTrackerToBlobExchange: + bex.eventHighestCommittedSeqNr(highestCommittedSeqNr) case <-chDone: } @@ -562,6 +612,9 @@ func (bex *blobExchangeState[RI]) run() { bex.logger.Info("BlobExchange: winding down", nil) bex.subs.Wait() bex.metrics.Close() + for _, m := range bex.perOracleMetrics { + m.Close() + } bex.logger.Info("BlobExchange: exiting", nil) return default: @@ -569,34 +622,22 @@ func (bex *blobExchangeState[RI]) run() { } } -func (bex *blobExchangeState[RI]) eventTStopExpiredBlobBroadcastOrFetch() { - defer func() { - 
bex.tStopExpiredBlobBroadcastOrFetch = time.After(DeltaStopExpiredBlobFetchOrBroadcast) - }() - - tx, err := bex.kv.NewReadTransactionUnchecked() - if err != nil { - bex.logger.Error("failed to create read transaction for eventTStopExpiredBlobBroadcastOrFetch", commontypes.LogFields{ - "error": err, - }) +func (bex *blobExchangeState[RI]) eventHighestCommittedSeqNr(highestCommittedSeqNr uint64) { + if highestCommittedSeqNr <= bex.highestCommittedSeqNr { return } - defer tx.Discard() + bex.highestCommittedSeqNr = highestCommittedSeqNr - highestCommittedSeqNr, err := tx.ReadHighestCommittedSeqNr() - if err != nil { - bex.logger.Error("failed to read highest committed seq nr for eventTStopExpiredBlobBroadcastOrFetch", commontypes.LogFields{ - "error": err, - }) - return - } + bex.logger.Debug("blob exchange learned of even higher highest committed seq nr", commontypes.LogFields{ + "highestCommittedSeqNr": bex.highestCommittedSeqNr, + }) for blobDigest, blob := range bex.blobs { - if !hasBlobExpired(blob.expirySeqNr, highestCommittedSeqNr) { + if !bex.hasBlobExpired(blob.expirySeqNr) { continue } - broadcastPending := blob.broadcast != nil && blob.broadcast.phase == blobBroadcastPhaseOffering + broadcastPending := blob.broadcast != nil && blob.broadcast.shouldOffer() fetchPending := blob.fetch != nil && !blob.fetch.expired && !blob.haveAllChunks() if !(broadcastPending || fetchPending) { @@ -616,12 +657,14 @@ func (bex *blobExchangeState[RI]) eventTStopExpiredBlobBroadcastOrFetch() { broadcast := blob.broadcast broadcast.phase = blobBroadcastPhaseExpired close(broadcast.chNotify) + bex.incMyBlobsUndeterminedTotalForNonResponders(broadcast) } if fetchPending { fetch := blob.fetch if fetch.exchange != nil { bex.sendBlobOfferResponseRejecting(blobDigest, blob.submitter, fetch.exchange.latestOfferRequestHandle) + bex.perOracleMetrics[blob.submitter].theirBlobsRejectedDueToExpirationTotal.Inc() fetch.exchange.weServiced() } @@ -634,6 +677,12 @@ func (bex 
*blobExchangeState[RI]) eventTStopExpiredBlobBroadcastOrFetch() { delete(bex.blobs, blobDigest) } } + + bex.notifyBlobReapOfHighestCommittedSeqNr = true +} + +func (bex *blobExchangeState[RI]) hasBlobExpired(expirySeqNr uint64) bool { + return expirySeqNr <= bex.highestCommittedSeqNr } func (bex *blobExchangeState[RI]) allowBlobOfferBasedOnOwedOfferResponsesBudget(sender commontypes.OracleID) error { @@ -672,9 +721,29 @@ func (bex *blobExchangeState[RI]) allowBlobOfferBasedOnQuotaStats(offer MessageB return fmt.Errorf("failed to read blob quota stats: %w", err) } + statsOverflowLogTaper := &bex.submitters[submitter].statsOverflowLogTaper + totalQuotaStats, ok := appendedQuotaStats.Sub(reapedQuotaStats) if !ok { - return fmt.Errorf("overflow when subtracting reaped quota stats from appended quota stats") + statsOverflowLogTaper.Trigger(func(consecutiveOverflows uint64) { + bex.logger.Debug("overflow when subtracting reaped quota stats from appended quota stats, "+ + "reaper is likely ahead of blob exchange, treating as zero", commontypes.LogFields{ + "consecutiveOverflows": consecutiveOverflows, + "appendedQuotaStats": appendedQuotaStats, + "reapedQuotaStats": reapedQuotaStats, + "submitter": submitter, + }) + }) + totalQuotaStats = BlobQuotaStats{0, 0} + } else { + statsOverflowLogTaper.Reset(func(previouslyConsecutiveOverflows uint64) { + bex.logger.Debug("stopped seeing overflow when subtracting reaped quota stats from appended quota stats", commontypes.LogFields{ + "previouslyConsecutiveOverflows": previouslyConsecutiveOverflows, + "appendedQuotaStats": appendedQuotaStats, + "reapedQuotaStats": reapedQuotaStats, + "submitter": submitter, + }) + }) } totalQuotaStatsIncludingOffer, ok := totalQuotaStats.Add(BlobQuotaStats{ @@ -690,12 +759,9 @@ func (bex *blobExchangeState[RI]) allowBlobOfferBasedOnQuotaStats(offer MessageB uint64(bex.limits.MaxPerOracleUnexpiredBlobCumulativePayloadBytes), } - if totalQuotaStatsIncludingOffer.Count > maxQuotaStats.Count { - return 
fmt.Errorf("accepting the offer would exceed our allowed per-oracle unexpired blob count, have %d, max %d", totalQuotaStats.Count, maxQuotaStats.Count) - } - - if totalQuotaStatsIncludingOffer.CumulativePayloadLength > maxQuotaStats.CumulativePayloadLength { - return fmt.Errorf("accepting the offer would exceed our allowed per-oracle unexpired blob payload length, have %d, offer is for %d, max is %d", totalQuotaStats.CumulativePayloadLength, offer.PayloadLength, maxQuotaStats.CumulativePayloadLength) + if totalQuotaStatsIncludingOffer.Exceeds(maxQuotaStats) { + return fmt.Errorf("accepting the offer would exceed our allowed per-oracle unexpired quota, "+ + "have %+v, would have %+v, max %+v", totalQuotaStats, totalQuotaStatsIncludingOffer, maxQuotaStats) } return nil @@ -703,10 +769,12 @@ func (bex *blobExchangeState[RI]) allowBlobOfferBasedOnQuotaStats(offer MessageB func (bex *blobExchangeState[RI]) allowBlobOffer(offer MessageBlobOffer[RI], submitter commontypes.OracleID) error { if err := bex.allowBlobOfferBasedOnOwedOfferResponsesBudget(submitter); err != nil { + bex.perOracleMetrics[submitter].theirBlobsRejectedDueToManyInflightTotal.Inc() return err } if err := bex.allowBlobOfferBasedOnQuotaStats(offer, submitter); err != nil { + bex.perOracleMetrics[submitter].theirBlobsRejectedDueToQuotaTotal.Inc() return err } @@ -716,6 +784,8 @@ func (bex *blobExchangeState[RI]) allowBlobOffer(offer MessageBlobOffer[RI], sub func (bex *blobExchangeState[RI]) messageBlobOffer(msg MessageBlobOffer[RI], sender commontypes.OracleID) { submitter := sender + bex.perOracleMetrics[submitter].theirBlobOffersTotal.Inc() + blobDigest := blobtypes.MakeBlobDigest( bex.config.ConfigDigest, msg.ChunkDigestsRoot, @@ -724,6 +794,24 @@ func (bex *blobExchangeState[RI]) messageBlobOffer(msg MessageBlobOffer[RI], sen submitter, ) + rejectedOfferLogTaper := &bex.submitters[sender].rejectedOfferLogTaper + + // Reject if payload length exceeds maximum allowed length + if msg.PayloadLength > 
uint64(bex.limits.MaxBlobPayloadBytes) { + rejectedOfferLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { + bex.logger.Warn("received MessageBlobOffer with payload length that exceeds maximum allowed length, rejecting", commontypes.LogFields{ + "blobDigest": blobDigest, + "submitter": submitter, + "payloadLength": msg.PayloadLength, + "maxPayloadLength": bex.limits.MaxBlobPayloadBytes, + "consecutiveRejectedOffers": consecutiveRejectedOffers, + }) + }) + bex.sendBlobOfferResponseRejecting(blobDigest, submitter, msg.RequestHandle) + bex.perOracleMetrics[submitter].theirBlobsRejectedDueToOversizePayloadBytesTotal.Inc() + return + } + chunkDigests, chunkHaves, err := bex.loadChunkDigestsAndHaves(blobDigest, msg.PayloadLength) if err != nil { bex.logger.Warn("dropping MessageBlobOffer, failed to check if we already know of it", commontypes.LogFields{ @@ -755,48 +843,25 @@ func (bex *blobExchangeState[RI]) messageBlobOffer(msg MessageBlobOffer[RI], sen return } - offerLogTaper := &bex.offerLogTapers[sender] - - // Reject if payload length exceeds maximum allowed length - if msg.PayloadLength > uint64(bex.limits.MaxBlobPayloadBytes) { - offerLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { - bex.logger.Warn("received MessageBlobOffer with payload length that exceeds maximum allowed length, rejecting", commontypes.LogFields{ - "blobDigest": blobDigest, - "submitter": submitter, - "payloadLength": msg.PayloadLength, - "maxPayloadLength": bex.limits.MaxBlobPayloadBytes, - "consecutiveRejectedOffers": consecutiveRejectedOffers, - }) - }) - bex.sendBlobOfferResponseRejecting(blobDigest, submitter, msg.RequestHandle) - return - } - // Reject if blob has already expired - committedSeqNr, err := bex.kv.HighestCommittedSeqNr() - if err != nil { - bex.logger.Error("failed to read highest committed seq nr for MessageBlobOffer", commontypes.LogFields{ - "error": err, - }) - return - } - if hasBlobExpired(msg.ExpirySeqNr, committedSeqNr) { - 
offerLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { + if bex.hasBlobExpired(msg.ExpirySeqNr) { + rejectedOfferLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { bex.logger.Warn("received MessageBlobOffer for already expired blob, rejecting", commontypes.LogFields{ "blobDigest": blobDigest, "submitter": submitter, "expirySeqNr": msg.ExpirySeqNr, - "committedSeqNr": committedSeqNr, + "highestCommittedSeqNr": bex.highestCommittedSeqNr, "consecutiveRejectedOffers": consecutiveRejectedOffers, }) }) bex.sendBlobOfferResponseRejecting(blobDigest, submitter, msg.RequestHandle) + bex.perOracleMetrics[submitter].theirBlobsRejectedDueToExpirationTotal.Inc() return } if err := bex.allowBlobOffer(msg, submitter); err != nil { - offerLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { + rejectedOfferLogTaper.Trigger(func(consecutiveRejectedOffers uint64) { bex.logger.Info("received MessageBlobOffer that goes over rate limits, rejecting", commontypes.LogFields{ "blobDigest": blobDigest, "submitter": submitter, @@ -805,10 +870,13 @@ func (bex *blobExchangeState[RI]) messageBlobOffer(msg MessageBlobOffer[RI], sen }) }) bex.sendBlobOfferResponseRejecting(blobDigest, sender, msg.RequestHandle) + // bex.perOracleMetrics[submitter].theirBlobsRejectedDueToXTotal + // incremented inside allowBlobOffer, which knows the exact reason. No + // need to do anything here. 
return } - offerLogTaper.Reset(func(previouslyConsecutiveRejectedOffers uint64) { + rejectedOfferLogTaper.Reset(func(previouslyConsecutiveRejectedOffers uint64) { bex.logger.Info("stopped receiving offers that we keep rejecting from submitter", commontypes.LogFields{ "submitter": submitter, "previouslyConsecutiveRejectedOffers": previouslyConsecutiveRejectedOffers, @@ -934,6 +1002,7 @@ func (bex *blobExchangeState[RI]) messageBlobOfferResponse(msg MessageBlobOfferR true, msg.Signature, } + bex.perOracleMetrics[sender].myBlobsAcceptedTotal.Inc() } else { // save rejection for oracle broadcast.oracles[sender] = blobBroadcastOracleMeta{ @@ -941,6 +1010,7 @@ func (bex *blobExchangeState[RI]) messageBlobOfferResponse(msg MessageBlobOfferR false, nil, } + bex.perOracleMetrics[sender].myBlobsRejectedTotal.Inc() } rejectThreshold := bex.minRejectors() @@ -993,11 +1063,21 @@ func (bex *blobExchangeState[RI]) messageBlobOfferResponse(msg MessageBlobOfferR }) broadcast.phase = blobBroadcastPhaseRejected close(broadcast.chNotify) + bex.incMyBlobsUndeterminedTotalForNonResponders(broadcast) return } } +// Must be called once when entering terminal phases of broadcast: accepted, rejected, expired +func (bex *blobExchangeState[RI]) incMyBlobsUndeterminedTotalForNonResponders(broadcast *blobBroadcastMeta) { + for oracleID, oracle := range broadcast.oracles { + if !oracle.weReceivedOfferResponse { + bex.perOracleMetrics[oracleID].myBlobsUndeterminedTotal.Inc() + } + } +} + func (bex *blobExchangeState[RI]) eventBlobBroadcastGraceTimeout(ev EventBlobBroadcastGraceTimeout[RI]) { blob, ok := bex.blobs[ev.BlobDigest] if !ok { @@ -1057,6 +1137,7 @@ func (bex *blobExchangeState[RI]) eventBlobBroadcastGraceTimeout(ev EventBlobBro broadcast.certOrNil = &lcb broadcast.phase = blobBroadcastPhaseAccepted close(broadcast.chNotify) + bex.incMyBlobsUndeterminedTotalForNonResponders(broadcast) } func (bex *blobExchangeState[RI]) sendBlobOfferResponseAccepting(blobDigest BlobDigest, submitter 
commontypes.OracleID, requestHandle types.RequestHandle) { @@ -1083,6 +1164,8 @@ func (bex *blobExchangeState[RI]) sendBlobOfferResponseAccepting(blobDigest Blob }, submitter, ) + + bex.perOracleMetrics[submitter].theirBlobAcceptsSentTotal.Inc() } func (bex *blobExchangeState[RI]) sendBlobOfferResponseRejecting(blobDigest BlobDigest, submitter commontypes.OracleID, requestHandle types.RequestHandle) { bex.netSender.SendTo( @@ -1094,6 +1177,8 @@ func (bex *blobExchangeState[RI]) sendBlobOfferResponseRejecting(blobDigest Blob }, submitter, ) + + bex.perOracleMetrics[submitter].theirBlobRejectsSentTotal.Inc() } func (bex *blobExchangeState[RI]) readBlobPayload(blobDigest BlobDigest) ([]byte, error) { @@ -1398,6 +1483,14 @@ func (bex *blobExchangeState[RI]) processBlobBroadcastRequest(req blobBroadcastR }) return } + expirySeqNr := req.expirySeqNr + if bex.hasBlobExpired(expirySeqNr) { + req.respond(bex.ctx, blobBroadcastResponse{ + LightCertifiedBlob{}, + fmt.Errorf("blob expiry seq nr %d is not above highest committed seq nr %d", expirySeqNr, bex.highestCommittedSeqNr), + }) + return + } payload := req.payload payloadLength := uint64(len(payload)) @@ -1415,7 +1508,6 @@ func (bex *blobExchangeState[RI]) processBlobBroadcastRequest(req blobBroadcastR chunkHaves = append(chunkHaves, true) } - expirySeqNr := req.expirySeqNr submitter := bex.id chunkDigestsRoot := blobtypes.MakeBlobChunkDigestsRoot(chunkDigests) @@ -1431,6 +1523,7 @@ func (bex *blobExchangeState[RI]) processBlobBroadcastRequest(req blobBroadcastR bex.logger.Debug("processing BlobBroadcastRequest", commontypes.LogFields{"blobDigest": blobDigest}) var chNotifyCertAvailable chan struct{} + startedNewBroadcast := false if existingBlob, ok := bex.blobs[blobDigest]; ok { if existingBlob.broadcast == nil { existingBlob.broadcast = &blobBroadcastMeta{ @@ -1440,6 +1533,7 @@ func (bex *blobExchangeState[RI]) processBlobBroadcastRequest(req blobBroadcastR nil, make([]blobBroadcastOracleMeta, bex.config.N()), } + 
startedNewBroadcast = true } else { existingBlob.broadcast.waiters++ } @@ -1477,6 +1571,11 @@ func (bex *blobExchangeState[RI]) processBlobBroadcastRequest(req blobBroadcastR expirySeqNr, submitter, } + startedNewBroadcast = true + } + + if startedNewBroadcast { + bex.metrics.myBroadcastPayloadBytes.Observe(float64(payloadLength)) } bex.offerRequesterGadget.PleaseRecheckPendingItems() @@ -1609,6 +1708,11 @@ func (bex *blobExchangeState[RI]) processBlobFetchRequest(req blobFetchRequest) return } + if bex.hasBlobExpired(cert.ExpirySeqNr) { + req.respond(bex.ctx, blobFetchResponse{nil, fmt.Errorf("blob expired")}) + return + } + blobDigest := blobtypes.MakeBlobDigest( bex.config.ConfigDigest, cert.ChunkDigestsRoot, @@ -1722,6 +1826,28 @@ func (bex *blobExchangeState[RI]) processBlobFetchRequest(req blobFetchRequest) }) } +func (bex *blobExchangeState[RI]) initializeStatsMetricsFromKV() error { + tx, err := bex.kv.NewReadTransactionUnchecked() + if err != nil { + return fmt.Errorf("failed to create read transaction: %w", err) + } + defer tx.Discard() + + for i, m := range bex.perOracleMetrics { + appended, err := tx.ReadBlobQuotaStats(BlobQuotaStatsTypeAppended, commontypes.OracleID(i)) + if err != nil { + return fmt.Errorf("failed to read appended blob quota stats for submitter %d: %w", i, err) + } + reaped, err := tx.ReadBlobQuotaStats(BlobQuotaStatsTypeReaped, commontypes.OracleID(i)) + if err != nil { + return fmt.Errorf("failed to read reaped blob quota stats for submitter %d: %w", i, err) + } + m.SetTheirAppendedStats(appended) + m.SetTheirReapedStats(reaped) + } + return nil +} + func (bex *blobExchangeState[RI]) createOrUpdateBlobMetaAndQuotaStats( blobDigest BlobDigest, payloadLength uint64, @@ -1776,12 +1902,18 @@ func (bex *blobExchangeState[RI]) createOrUpdateBlobMetaAndQuotaStats( if err != nil { return fmt.Errorf("failed to write blob meta: %w", err) } - + err = tx.WriteStaleBlobIndex(staleBlob(expirySeqNr, blobDigest)) + if err != nil { + return 
fmt.Errorf("failed to write stale blob index: %w", err) + } err = tx.Commit() if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } + bex.perOracleMetrics[submitter].SetTheirAppendedStats(updatedQuotaStats) + bex.perOracleMetrics[submitter].theirBlobAppendedPayloadBytes.Observe(float64(payloadLength)) + return nil } @@ -1856,7 +1988,3 @@ func (bex *blobExchangeState[RI]) verifyCert(cert *LightCertifiedBlob) error { func staleBlob(expirySeqNr uint64, blobDigest BlobDigest) StaleBlob { return StaleBlob{expirySeqNr, blobDigest} } - -func hasBlobExpired(expirySeqNr uint64, committedSeqNr uint64) bool { - return expirySeqNr <= committedSeqNr -} diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/blob_populate_stale_index.go b/offchainreporting2plus/internal/ocr3_1/protocol/blob_populate_stale_index.go new file mode 100644 index 00000000..c38beb1a --- /dev/null +++ b/offchainreporting2plus/internal/ocr3_1/protocol/blob_populate_stale_index.go @@ -0,0 +1,162 @@ +package protocol + +import ( + "context" + "fmt" + "time" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/internal/loghelper" + "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/ocr3_1/blobtypes" +) + +const ( + blobPopulateStaleIndexShortInterval = 10 * time.Second + blobPopulateStaleIndexLongInterval = 60 * time.Second + + maxBlobDigestExpirySeqNrsToReadInSingleTransaction = 100_000 + maxBlobsToPopulateStaleIndexForInSingleTransaction = 1_000 +) + +type populateStaleBlobIndexStats struct { + numScannedBlobs int + numPopulatedIndexEntries int +} + +func populateStaleBlobIndex( + ctx context.Context, + logger commontypes.Logger, + kvDb KeyValueDatabase, + minBlobDigest blobtypes.BlobDigest, +) (nextMinBlobDigest BlobDigest, _ populateStaleBlobIndexStats, done bool, err error) { + chDone := ctx.Done() + + tx, err := kvDb.NewUnserializedReadWriteTransactionUnchecked() + if err != nil { + return BlobDigest{}, 
populateStaleBlobIndexStats{}, false, fmt.Errorf("failed to create read/write transaction: %w", err) + } + defer tx.Discard() + + items, readMore, err := tx.ReadBlobDigestExpirySeqNrs(minBlobDigest, maxBlobDigestExpirySeqNrsToReadInSingleTransaction) + if err != nil { + return BlobDigest{}, populateStaleBlobIndexStats{}, false, fmt.Errorf("failed to read blob digest expiry seq nrs: %w", err) + } + + var highestProcessed *BlobDigest + stats := populateStaleBlobIndexStats{ + 0, + 0, + } + populateMore := false + + for _, item := range items { + select { + case <-chDone: + return BlobDigest{}, populateStaleBlobIndexStats{}, false, ctx.Err() + default: + } + + staleBlob := staleBlob(item.ExpirySeqNr, item.BlobDigest) + exists, err := tx.ExistsStaleBlobIndex(staleBlob) + if err != nil { + return BlobDigest{}, populateStaleBlobIndexStats{}, false, fmt.Errorf("failed to check existence of stale blob index for %v: %w", staleBlob, err) + } + + stats.numScannedBlobs++ + + if exists { + highestProcessed = &item.BlobDigest + continue + } + + if stats.numPopulatedIndexEntries >= maxBlobsToPopulateStaleIndexForInSingleTransaction { + populateMore = true + break + } + + logger.Trace("populateStaleBlobIndex: blob is missing from stale blob index, populating index entry", commontypes.LogFields{ + "expirySeqNr": item.ExpirySeqNr, + "blobDigest": item.BlobDigest, + }) + + err = tx.WriteStaleBlobIndex(staleBlob) + if err != nil { + return BlobDigest{}, populateStaleBlobIndexStats{}, false, fmt.Errorf("failed to write to stale blob index for %v: %w", staleBlob, err) + } + + highestProcessed = &item.BlobDigest + stats.numPopulatedIndexEntries++ + } + + if err := tx.Commit(); err != nil { + return BlobDigest{}, populateStaleBlobIndexStats{}, false, fmt.Errorf("failed to commit transaction: %w", err) + } + + if readMore || populateMore { + var nextMinBlobDigest BlobDigest + if highestProcessed != nil { + nextMinBlobDigest = blobtypes.WrappingIncrementBlobDigest(*highestProcessed) + } else 
{ + // If we never even tried to process anything, while knowing + // that there are more things in the range to process or populate, + // something's very wrong. Try skipping over the minBlobDigest, + // though it's not clear that this will help. + logger.Error("populateStaleBlobIndex: need to read more blob digest expiry seq nrs or populate more stale blob indices, "+ + "but did not read or populate anything in this iteration, so skipping over the minBlobDigest", commontypes.LogFields{ + "minBlobDigest": minBlobDigest, + }) + nextMinBlobDigest = blobtypes.WrappingIncrementBlobDigest(minBlobDigest) + } + return nextMinBlobDigest, stats, false, nil + } + return blobtypes.MinBlobDigest(), stats, true, nil +} + +func RunBlobPopulateStaleIndex( + ctx context.Context, + logger loghelper.LoggerWithContext, + kvDb KeyValueDatabase, +) { + chDone := ctx.Done() + chTick := time.After(0) + + minBlobDigest := blobtypes.MinBlobDigest() + interval := blobPopulateStaleIndexShortInterval + + for { + select { + case <-chTick: + case <-chDone: + return + } + + nextMinBlobDigest, stats, done, err := populateStaleBlobIndex(ctx, logger, kvDb, minBlobDigest) + if err != nil { + logger.Warn("BlobPopulateStaleIndex: failed to populate stale index. 
Will retry soon.", commontypes.LogFields{ + "minBlobDigest": minBlobDigest, + "error": err, + "waitBeforeRetry": interval.String(), + }) + } else { + if done { + interval = blobPopulateStaleIndexLongInterval + } + var logFn func(msg string, fields commontypes.LogFields) + if stats.numPopulatedIndexEntries > 0 { + logFn = logger.Info + } else { + logFn = logger.Debug + } + logFn("BlobPopulateStaleIndex: populated stale index", commontypes.LogFields{ + "minBlobDigest": minBlobDigest, + "nextMinBlobDigest": nextMinBlobDigest, + "done": done, + "stats": fmt.Sprintf("%+v", stats), + "waitBeforeNextIteration": interval.String(), + }) + minBlobDigest = nextMinBlobDigest + } + + chTick = time.After(interval) + } +} diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/blob_reap.go b/offchainreporting2plus/internal/ocr3_1/protocol/blob_reap.go index 586fbf5b..ddf63d38 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/blob_reap.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/blob_reap.go @@ -10,34 +10,35 @@ import ( ) const ( - blobReapInterval = 3 * time.Second - maxBlobsToReapInSingleTransaction = 100 + blobReapMinInterval = 3 * trackHighestCommittedSeqNrInterval + maxBlobsToReapInSingleTransaction = 1_000 ) -func reapBlobs(ctx context.Context, kvDb KeyValueDatabase) (done bool, err error) { +type blobReapStats struct { + numReaped int +} + +func reapBlobs(ctx context.Context, logger commontypes.Logger, kvDb KeyValueDatabase, maxStaleSinceSeqNr uint64, perOracleMetrics []*blobOracleMetrics) (done bool, _ blobReapStats, err error) { chDone := ctx.Done() tx, err := kvDb.NewUnserializedReadWriteTransactionUnchecked() if err != nil { - return false, fmt.Errorf("failed to create read/write transaction: %w", err) + return false, blobReapStats{}, fmt.Errorf("failed to create read/write transaction: %w", err) } defer tx.Discard() - committedSeqNr, err := tx.ReadHighestCommittedSeqNr() - if err != nil { - return false, fmt.Errorf("failed to read highest 
committed seq nr: %w", err) - } - - staleBlobs, err := tx.ReadStaleBlobIndex(committedSeqNr, maxBlobsToReapInSingleTransaction+1) + staleBlobs, err := tx.ReadStaleBlobIndex(maxStaleSinceSeqNr, maxBlobsToReapInSingleTransaction+1) if err != nil { - return false, fmt.Errorf("failed to read stale blob index: %w", err) + return false, blobReapStats{}, fmt.Errorf("failed to read stale blob index: %w", err) } if len(staleBlobs) == 0 { - return true, nil + return true, blobReapStats{}, nil } + stats := blobReapStats{} + updatedReapedStats := make(map[commontypes.OracleID]BlobQuotaStats) for i, staleBlob := range staleBlobs { if i >= maxBlobsToReapInSingleTransaction { break @@ -45,30 +46,44 @@ func reapBlobs(ctx context.Context, kvDb KeyValueDatabase) (done bool, err error select { case <-chDone: - return true, ctx.Err() + return true, blobReapStats{}, ctx.Err() default: } - if err := reapSingleBlob(tx, staleBlob); err != nil { - return false, fmt.Errorf("failed to reap single blob: %w", err) + if err := reapSingleBlob(tx, logger, staleBlob, updatedReapedStats); err != nil { + return false, blobReapStats{}, fmt.Errorf("failed to reap single blob: %w", err) } + stats.numReaped++ } if err := tx.Commit(); err != nil { - return false, fmt.Errorf("failed to commit transaction: %w", err) + return false, blobReapStats{}, fmt.Errorf("failed to commit transaction: %w", err) } - return len(staleBlobs) <= maxBlobsToReapInSingleTransaction, nil + for submitter, stats := range updatedReapedStats { + if int(submitter) < len(perOracleMetrics) { + perOracleMetrics[submitter].SetTheirReapedStats(stats) + } + } + + return len(staleBlobs) <= maxBlobsToReapInSingleTransaction, stats, nil } -func reapSingleBlob(tx KeyValueDatabaseReadWriteTransaction, staleBlob StaleBlob) error { +func reapSingleBlob(tx KeyValueDatabaseReadWriteTransaction, logger commontypes.Logger, staleBlob StaleBlob, updatedReapedStats map[commontypes.OracleID]BlobQuotaStats) error { + if err := 
tx.DeleteStaleBlobIndex(staleBlob); err != nil { + return fmt.Errorf("failed to delete stale blob index: %w", err) + } + meta, err := tx.ReadBlobMeta(staleBlob.BlobDigest) if err != nil { return fmt.Errorf("failed to read blob meta: %w", err) } - if meta == nil { - return fmt.Errorf("blob meta is nil") + logger.Warn("reapSingleBlob: orphan stale blob index entry (no blob meta), dropped index entry", commontypes.LogFields{ + "staleSinceSeqNr": staleBlob.StaleSinceSeqNr, + "blobDigest": staleBlob.BlobDigest, + }) + return nil } for chunkIndex, chunkHave := range meta.ChunkHaves { @@ -84,9 +99,6 @@ func reapSingleBlob(tx KeyValueDatabaseReadWriteTransaction, staleBlob StaleBlob if err := tx.DeleteBlobMeta(staleBlob.BlobDigest); err != nil { return fmt.Errorf("failed to delete blob meta: %w", err) } - if err := tx.DeleteStaleBlobIndex(staleBlob); err != nil { - return fmt.Errorf("failed to delete stale blob index: %w", err) - } // increase reaped quota stats @@ -94,10 +106,8 @@ func reapSingleBlob(tx KeyValueDatabaseReadWriteTransaction, staleBlob StaleBlob if err != nil { return fmt.Errorf("failed to read blob quota stats: %w", err) } - updatedQuotaStats, ok := existingQuotaStats.Add(BlobQuotaStats{ - 1, - meta.PayloadLength, - }) + thisBlob := BlobQuotaStats{1, meta.PayloadLength} + updatedQuotaStats, ok := existingQuotaStats.Add(thisBlob) if !ok { return fmt.Errorf("quotaStats overflow") } @@ -106,6 +116,8 @@ func reapSingleBlob(tx KeyValueDatabaseReadWriteTransaction, staleBlob StaleBlob return fmt.Errorf("failed to write blob quota stats: %w", err) } + updatedReapedStats[meta.Submitter] = updatedQuotaStats + return nil } @@ -113,27 +125,52 @@ func RunBlobReap( ctx context.Context, logger loghelper.LoggerWithContext, kvDb KeyValueDatabase, + chBlobExchangeToBlobReap <-chan uint64, + perOracleMetrics []*blobOracleMetrics, ) { chDone := ctx.Done() - chTick := time.After(0) + var chTick <-chan time.Time + var maxStaleSinceSeqNr uint64 + haveMaxStaleSinceSeqNr := false 
for { select { + case highestCommittedSeqNr := <-chBlobExchangeToBlobReap: + if !haveMaxStaleSinceSeqNr || maxStaleSinceSeqNr < highestCommittedSeqNr { + logger.Debug("BlobReap: received higher highest committed seq nr from blob exchange", commontypes.LogFields{ + "highestCommittedSeqNr": highestCommittedSeqNr, + }) + + maxStaleSinceSeqNr = highestCommittedSeqNr + haveMaxStaleSinceSeqNr = true + chTick = time.After(0) + } case <-chTick: + if !haveMaxStaleSinceSeqNr { + chTick = nil + continue + } + + done, stats, err := reapBlobs(ctx, logger, kvDb, maxStaleSinceSeqNr, perOracleMetrics) + if err != nil { + logger.Warn("BlobReap: failed to reap blobs", commontypes.LogFields{ + "maxStaleSinceSeqNr": maxStaleSinceSeqNr, + "error": err, + }) + } else { + logger.Info("BlobReap: finished reaping blobs", commontypes.LogFields{ + "maxStaleSinceSeqNr": maxStaleSinceSeqNr, + "done": done, + "stats": fmt.Sprintf("%+v", stats), + }) + } + if done { + chTick = time.After(blobReapMinInterval) + } else { + chTick = time.After(0) + } case <-chDone: return } - - done, err := reapBlobs(ctx, kvDb) - if err != nil { - logger.Warn("BlobReap: failed to reap blobs", commontypes.LogFields{ - "error": err, - }) - } - if done { - chTick = time.After(blobReapInterval) - } else { - chTick = time.After(0) - } } } diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/highest_committed_seq_nr_tracker.go b/offchainreporting2plus/internal/ocr3_1/protocol/highest_committed_seq_nr_tracker.go new file mode 100644 index 00000000..3f7c95f6 --- /dev/null +++ b/offchainreporting2plus/internal/ocr3_1/protocol/highest_committed_seq_nr_tracker.go @@ -0,0 +1,69 @@ +package protocol + +import ( + "context" + "time" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/internal/loghelper" +) + +func RunHighestCommittedSeqNrTracker( + ctx context.Context, + logger loghelper.LoggerWithContext, + kvDb KeyValueDatabase, + interval time.Duration, + chOut chan<- uint64, +) 
{ + chDone := ctx.Done() + chTick := time.After(0) + + highestCommittedSeqNr := uint64(0) + notifyOut := false + for { + var nilOrChOut chan<- uint64 + if notifyOut { + nilOrChOut = chOut + } else { + nilOrChOut = nil + } + + select { + case nilOrChOut <- highestCommittedSeqNr: + notifyOut = false + case <-chTick: + case <-chDone: + return + } + + tx, err := kvDb.NewReadTransactionUnchecked() + if err != nil { + logger.Warn("RunHighestCommittedSeqNrTracker: failed to create read transaction", commontypes.LogFields{ + "error": err, + }) + chTick = time.After(interval) + continue + } + + highestCommittedSeqNrNew, err := tx.ReadHighestCommittedSeqNr() + tx.Discard() + if err != nil { + logger.Warn("RunHighestCommittedSeqNrTracker: failed to read highest committed seq nr", commontypes.LogFields{ + "error": err, + }) + chTick = time.After(interval) + continue + } + + chTick = time.After(interval) + + if highestCommittedSeqNrNew > highestCommittedSeqNr { + logger.Trace("RunHighestCommittedSeqNrTracker: new highestCommittedSeqNr", commontypes.LogFields{ + "highestCommittedSeqNr": highestCommittedSeqNrNew, + "prevHighestCommittedSeqNr": highestCommittedSeqNr, + }) + notifyOut = true + highestCommittedSeqNr = highestCommittedSeqNrNew + } + } +} diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/kvdb.go b/offchainreporting2plus/internal/ocr3_1/protocol/kvdb.go index 354e3086..69148a6c 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/kvdb.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/kvdb.go @@ -3,6 +3,7 @@ package protocol import ( "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/internal/jmt" + "github.com/smartcontractkit/libocr/internal/util" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) @@ -53,9 +54,11 @@ type KeyValueDatabaseSemanticRead interface { // If only some chunks are present, it returns an error. 
ReadBlobPayload(BlobDigest) ([]byte, error) ReadBlobMeta(BlobDigest) (*BlobMeta, error) + ReadBlobDigestExpirySeqNrs(minBlobDigest BlobDigest, maxItems int) (_ []BlobDigestExpirySeqNr, more bool, _ error) ReadBlobQuotaStats(blobQuotaStatsType BlobQuotaStatsType, submitter commontypes.OracleID) (BlobQuotaStats, error) ReadBlobChunk(BlobDigest, uint64) ([]byte, error) ReadStaleBlobIndex(maxStaleSinceSeqNr uint64, limit int) ([]StaleBlob, error) + ExistsStaleBlobIndex(StaleBlob) (bool, error) ReadReportsPlusPrecursor(seqNr uint64, reportsPlusPrecursorDigest ReportsPlusPrecursorDigest) (*ocr3_1types.ReportsPlusPrecursor, error) @@ -189,6 +192,13 @@ func (b BlobQuotaStats) Add(other BlobQuotaStats) (BlobQuotaStats, bool) { }, true } +func (b BlobQuotaStats) SaturatingSub(other BlobQuotaStats) BlobQuotaStats { + return BlobQuotaStats{ + util.SaturatingSub(b.Count, other.Count), + util.SaturatingSub(b.CumulativePayloadLength, other.CumulativePayloadLength), + } +} + func (b BlobQuotaStats) Sub(other BlobQuotaStats) (BlobQuotaStats, bool) { diffCount := b.Count - other.Count if diffCount > b.Count { @@ -204,11 +214,30 @@ func (b BlobQuotaStats) Sub(other BlobQuotaStats) (BlobQuotaStats, bool) { }, true } +func (b BlobQuotaStats) Exceeds(limit BlobQuotaStats) bool { + if b.Count > limit.Count { + return true + } + if b.CumulativePayloadLength > limit.CumulativePayloadLength { + return true + } + return false +} + type StaleBlob struct { StaleSinceSeqNr uint64 BlobDigest BlobDigest } +// BlobDigestExpirySeqNr is a projection of a [BlobMeta] entry that pairs a +// blob's digest with its expiry sequence number. It is intentionally distinct +// from [StaleBlob], which is keyed on the sequence number after which a blob +// became stale. +type BlobDigestExpirySeqNr struct { + BlobDigest BlobDigest + ExpirySeqNr uint64 +} + type KeyValueDatabase interface { // Must error if the key value store is not ready to apply state transition // for the given sequence number. 
Must update the highest committed sequence diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/metrics.go b/offchainreporting2plus/internal/ocr3_1/protocol/metrics.go index 968075c7..286f8ac5 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/metrics.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/metrics.go @@ -1,9 +1,14 @@ package protocol import ( + "fmt" + "sync" + "github.com/prometheus/client_golang/prometheus" "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/internal/metricshelper" + "github.com/smartcontractkit/libocr/internal/util" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" ) type pacemakerMetrics struct { @@ -100,10 +105,14 @@ func (om *outcomeGenerationMetrics) Close() { } type blobExchangeMetrics struct { - registerer prometheus.Registerer - blobsInProgress prometheus.Gauge + registerer prometheus.Registerer + blobsInProgress prometheus.Gauge + myBroadcastPayloadBytes prometheus.Histogram } +// 256 bytes, 1KiB, ..., 256MiB, copied from ragep2p_experimental_peer_message_bytes +var blobPayloadBytesBuckets = []float64{1 << 8, 1 << 10, 1 << 12, 1 << 14, 1 << 16, 1 << 18, 1 << 20, 1 << 22, 1 << 24, 1 << 26, 1 << 28} + func newBlobExchangeMetrics(registerer prometheus.Registerer, logger commontypes.Logger) *blobExchangeMetrics { @@ -113,12 +122,499 @@ func newBlobExchangeMetrics(registerer prometheus.Registerer, }) metricshelper.RegisterOrLogError(logger, registerer, blobsInProgress, "ocr3_1_experimental_blobs_in_progress") + myBroadcastPayloadBytes := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "ocr3_1_blob_my_broadcast_payload_bytes", + Help: "The payload size in bytes of each outbound blob broadcast initiated by this oracle", + Buckets: blobPayloadBytesBuckets, + }) + metricshelper.RegisterOrLogError(logger, registerer, myBroadcastPayloadBytes, "ocr3_1_blob_my_broadcast_payload_bytes") + return &blobExchangeMetrics{ registerer, blobsInProgress, + 
myBroadcastPayloadBytes, } } func (bm *blobExchangeMetrics) Close() { bm.registerer.Unregister(bm.blobsInProgress) + bm.registerer.Unregister(bm.myBroadcastPayloadBytes) +} + +type blobOracleMetrics struct { + registerer prometheus.Registerer + + // Repeat broadcasts after the initial broadcast has been pruned from bex + // state will be treated as new broadcasts when it comes to metrics. + + // Semantics: Receiving a MessageBlobOffer with the same contents twice + // increments the counter twice. Broadcaster feels like there's some blob you + // should accept/reject still. + // EXPERIMENTAL + theirBlobOffersTotal prometheus.Counter + + // Semantics: Observe when we add to the append stats for the first time + // EXPERIMENTAL + theirBlobAppendedPayloadBytes prometheus.Histogram + + // Semantics: Because of offer resends these are not comparable with + // theirBlobsOfferedTotal. Counting the number of MesssageBlobOfferResponses + // with reject=true/false sent. In typical operation and honest broadcaster, this + // does not overaccount, but might overaccount otherwise. + // + // Usage: Should compare with historical trends, instant value is not very + // meaningful in isolation. e.g., rejectsSentTotal might reasonably be > 0 + // in steady state operation. + // + // EXPERIMENTAL + theirBlobAcceptsSentTotal prometheus.Counter + // EXPERIMENTAL + theirBlobRejectsSentTotal prometheus.Counter + + // Usage: Compare with historical trends, instant values not that meaningful. + // EXPERIMENTAL + myBlobsAcceptedTotal prometheus.Counter + // If you are debugging blob broadcast issues, this metric can tell you + // whether a given recipient oracle is rejecting blobs at an abnormally high + // rate, which could point to an issue with quotas or rate limits, or + // expiration being too tight. 
+ // EXPERIMENTAL + myBlobsRejectedTotal prometheus.Counter + // EXPERIMENTAL + myBlobsUndeterminedTotal prometheus.Counter + + // Semantics: If broadcaster crashes, is adversarial, or re-broadcasts the same + // blob and we re-reject, we will overaccount this metric. An honest oracle + // receiving a rejection for a single BroadcastBlob will not ask us again, + // thus in the typical case there is no overaccounting. + // + // Usage: Instant values are also meaningful, e.g., + // theirBlobsRejectedDueToOversizePayloadBytesTotal > 0 is already + // indicative of a bug. + theirBlobsRejectedDueToOversizePayloadBytesTotal prometheus.Counter + theirBlobsRejectedDueToQuotaTotal prometheus.Counter + theirBlobsRejectedDueToManyInflightTotal prometheus.Counter + theirBlobsRejectedDueToExpirationTotal prometheus.Counter + + // Stats metrics are only meant to be set through SetTheirAppendedStats and + // SetTheirReapedStats, hence their underscore prefix. + + _statsMetrics *blobOracleStatsMetrics +} + +type blobOracleStatsMetrics struct { + // EXPERIMENTAL + theirBlobQuotaAppendedCount *metricshelper.LazyGauge + // EXPERIMENTAL + theirBlobQuotaAppendedCumulativePayloadBytes *metricshelper.LazyGauge + // EXPERIMENTAL + theirBlobQuotaReapedCount *metricshelper.LazyGauge + // EXPERIMENTAL + theirBlobQuotaReapedCumulativePayloadBytes *metricshelper.LazyGauge + + theirBlobQuotaUsedCount *metricshelper.LazyGauge + theirBlobQuotaUsedCumulativePayloadBytes *metricshelper.LazyGauge + + theirBlobQuotaFreeCount *metricshelper.LazyGauge + theirBlobQuotaFreeCumulativePayloadBytes *metricshelper.LazyGauge + + // Need for computation of free quota + limitQuotaStats BlobQuotaStats + + mu sync.Mutex + appended *BlobQuotaStats + reaped *BlobQuotaStats +} + +func newBlobOracleMetrics( + registerer prometheus.Registerer, + logger commontypes.Logger, + remote commontypes.OracleID, + pluginLimits ocr3_1types.ReportingPluginLimits, +) *blobOracleMetrics { + broadcasterOracleIDLabels := 
prometheus.Labels{"broadcaster_oracle_id": fmt.Sprint(remote)} + fetcherOracleIDLabels := prometheus.Labels{"fetcher_oracle_id": fmt.Sprint(remote)} + + newLazyGauge := func(name string, help string, labels prometheus.Labels) *metricshelper.LazyGauge { + return metricshelper.NewLazyGauge(registerer, logger, prometheus.GaugeOpts{ + Name: name, + Help: help, + ConstLabels: labels, + }) + } + + newCounter := func(name string, help string, labels prometheus.Labels) prometheus.Counter { + c := prometheus.NewCounter(prometheus.CounterOpts{ + Name: name, + Help: help, + ConstLabels: labels, + }) + metricshelper.RegisterOrLogError(logger, registerer, c, name) + return c + } + + theirBlobOffersTotal := newCounter( + "ocr3_1_experimental_their_blob_offers_total", + "Number of blob offers received by the broadcaster oracle. Multiple offers count multiple times, "+ + "including in the case where the oracle is fetching chunks for a previous identical offer, but the same offer is resent.", + broadcasterOracleIDLabels, + ) + + theirBlobAppendedPayloadBytes := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "ocr3_1_experimental_their_blob_appended_payload_bytes", + Help: "The payload size in bytes of inbound blob broadcasts received from the broadcaster oracle", + ConstLabels: broadcasterOracleIDLabels, + Buckets: blobPayloadBytesBuckets, + }) + metricshelper.RegisterOrLogError(logger, registerer, theirBlobAppendedPayloadBytes, "ocr3_1_experimental_their_blob_appended_payload_bytes") + + type theirBroadcastResponse string + const ( + theirBroadcastResponseAccept theirBroadcastResponse = "accept" + theirBroadcastResponseReject theirBroadcastResponse = "reject" + ) + + newTheirBlobOfferResponsesTotal := func(t theirBroadcastResponse) prometheus.Counter { + return newCounter( + "ocr3_1_experimental_their_blob_offer_responses_total", + fmt.Sprintf("The total number of inbound blob broadcasts received from the broadcaster oracle, distinguished by response label to indicate 
what we responded. "+ + "Usage: Should compare with historical trends, instant value is "+ + "not very meaningful in isolation. e.g., %s might reasonably "+ + "be > 0 in steady state operation. "+ + "Valid responses are: %s (i.e., we sent an accepting offer response), "+ + "%s (i.e., we sent a rejecting offer response, either due to outright rejecting the offer or the blob expiring before we could accept)", + theirBroadcastResponseReject, + theirBroadcastResponseAccept, theirBroadcastResponseReject), + util.MapsUnion( + broadcasterOracleIDLabels, + prometheus.Labels{"response": string(t)}, + ), + ) + } + + theirBlobAcceptsSentTotal := newTheirBlobOfferResponsesTotal(theirBroadcastResponseAccept) + theirBlobRejectsSentTotal := newTheirBlobOfferResponsesTotal(theirBroadcastResponseReject) + + type myBroadcastResult string + const ( + myBroadcastResultAccepted myBroadcastResult = "accepted" + myBroadcastResultRejected myBroadcastResult = "rejected" + myBroadcastResultUndetermined myBroadcastResult = "undetermined" + ) + + newMyBlobsTotal := func(m myBroadcastResult) prometheus.Counter { + return newCounter( + "ocr3_1_experimental_my_blobs_total", + fmt.Sprintf("The total number of outbound blob broadcasts initiated by us to the fetcher oracle, distinguished by result label. "+ + "Usage: If you are debugging blob broadcast issues, this metric can tell you "+ + "whether a given recipient oracle is rejecting blobs at an abnormally high "+ + "rate, which could point to an issue with quotas or rate limits, or "+ + "expiration being too tight. "+ + "Valid results are: %s (i.e., the fetcher oracle sent us an accepting offer response), "+ + "%s (i.e., the fetcher oracle sent us a rejecting offer response), or "+ + "%s (i.e., the fetcher oracle did not send us an offer response before we gave up on the broadcast). 
"+ + "Note that once we receive enough accepting or rejecting responses we stop processing responses, "+ + "thus it is expected for a number of oracles that did not respond in time due to being slower to have undetermined result.", + myBroadcastResultAccepted, myBroadcastResultRejected, myBroadcastResultUndetermined), + util.MapsUnion( + fetcherOracleIDLabels, + prometheus.Labels{"result": string(m)}, + ), + ) + } + + myBlobsAcceptedTotal := newMyBlobsTotal(myBroadcastResultAccepted) + myBlobsRejectedTotal := newMyBlobsTotal(myBroadcastResultRejected) + myBlobsUndeterminedTotal := newMyBlobsTotal(myBroadcastResultUndetermined) + + type rejectionReason string + const ( + rejectionReasonOversizePayloadBytes rejectionReason = "oversize_payload_bytes" + rejectionReasonQuota rejectionReason = "quota" + rejectionReasonTooManyInflight rejectionReason = "too_many_inflight" + rejectionReasonExpiration rejectionReason = "expiration" + ) + + newTheirBlobsRejectedTotal := func(r rejectionReason) prometheus.Counter { + return newCounter( + "ocr3_1_their_blobs_rejected_total", + fmt.Sprintf("The total number of blobs that the broadcaster oracle offered to us, distinguished by the reason label for why we rejected them. "+ + "Usage: Instant values are also meaningful, e.g., "+ + "%s > 0 is already "+ + "indicative of a bug. 
"+ + "Valid reasons are: %s (i.e., the blob had oversize payload bytes compared to MaxBlobPayloadBytes), "+ + "%s (i.e., the blob went over the defined quota in MaxPerOracleUnexpiredBlobCumulativePayloadBytes, MaxPerOracleUnexpiredBlobCount, see also ocr3_1_blob_quota* metrics), "+ + "%s (i.e., too many blobs were inflight already from this broadcaster oracle), "+ + "%s (i.e., the blob was expired either immediately upon offer receipt or before the time we could fetch all chunks and accept it).", + rejectionReasonOversizePayloadBytes, + rejectionReasonOversizePayloadBytes, rejectionReasonQuota, rejectionReasonTooManyInflight, rejectionReasonExpiration, + ), + util.MapsUnion( + broadcasterOracleIDLabels, + prometheus.Labels{"reason": string(r)}, + ), + ) + } + + theirBlobsRejectedDueToOversizePayloadBytesTotal := newTheirBlobsRejectedTotal(rejectionReasonOversizePayloadBytes) + theirBlobsRejectedDueToQuotaTotal := newTheirBlobsRejectedTotal(rejectionReasonQuota) + theirBlobsRejectedDueToManyInflightTotal := newTheirBlobsRejectedTotal(rejectionReasonTooManyInflight) + theirBlobsRejectedDueToExpirationTotal := newTheirBlobsRejectedTotal(rejectionReasonExpiration) + + theirBlobQuotaAppendedCount := newLazyGauge( + "ocr3_1_experimental_blob_quota_appended_count", + "The number of blobs we ever started fetching from the broadcaster oracle. "+ + "This metric is not meaningful for broadcaster=self, as we do not count our own broadcasts against our quota.", + broadcasterOracleIDLabels, + ) + theirBlobQuotaAppendedCumulativePayloadBytes := newLazyGauge( + "ocr3_1_experimental_blob_quota_appended_cumulative_payload_bytes", + "The cumulative payload bytes of blobs we ever started fetching from the broadcaster oracle. 
"+ + "This metric is not meaningful for broadcaster=self, as we do not count our own broadcasts against our quota.", + broadcasterOracleIDLabels, + ) + theirBlobQuotaReapedCount := newLazyGauge( + "ocr3_1_experimental_blob_quota_reaped_count", + "The number of blobs that we ever started fetching from the broadcaster oracle, "+ + "that have since been reaped.", + broadcasterOracleIDLabels, + ) + theirBlobQuotaReapedCumulativePayloadBytes := newLazyGauge( + "ocr3_1_experimental_blob_quota_reaped_cumulative_payload_bytes", + "The cumulative payload bytes of blobs that we ever started fetching from the broadcaster oracle, "+ + "that have since been reaped.", + broadcasterOracleIDLabels, + ) + + theirBlobQuotaUsedCount := newLazyGauge( + "ocr3_1_blob_quota_used_count", + "The number of blobs that we ever started fetching from the broadcaster oracle, "+ + "that have not yet been reaped. Must be less than or equal to MaxPerOracleUnexpiredBlobCount.", + broadcasterOracleIDLabels, + ) + theirBlobQuotaUsedCumulativePayloadBytes := newLazyGauge( + "ocr3_1_blob_quota_used_cumulative_payload_bytes", + "The cumulative payload bytes of blobs that we ever started fetching from the broadcaster oracle, "+ + "that have not yet been reaped. Must be less than or equal to MaxPerOracleUnexpiredBlobCumulativePayloadBytes.", + broadcasterOracleIDLabels, + ) + + theirBlobQuotaFreeCount := newLazyGauge( + "ocr3_1_blob_quota_free_count", + "The free blob count in the broadcaster oracle's quota. The closer this value is to zero, the more likely it is "+ + "that the broadcaster oracle's offers might be rejected in the future due to insufficient free quota.", + broadcasterOracleIDLabels, + ) + theirBlobQuotaFreeCumulativePayloadBytes := newLazyGauge( + "ocr3_1_blob_quota_free_cumulative_payload_bytes", + "The free cumulative payload bytes in the broadcaster oracle's quota. 
The closer this value is to zero, the more likely it is "+ + "that the broadcaster oracle's offers might be rejected in the future due to insufficient free quota.", + broadcasterOracleIDLabels, + ) + + return &blobOracleMetrics{ + registerer, + theirBlobOffersTotal, + theirBlobAppendedPayloadBytes, + theirBlobAcceptsSentTotal, + theirBlobRejectsSentTotal, + myBlobsAcceptedTotal, + myBlobsRejectedTotal, + myBlobsUndeterminedTotal, + theirBlobsRejectedDueToOversizePayloadBytesTotal, + theirBlobsRejectedDueToQuotaTotal, + theirBlobsRejectedDueToManyInflightTotal, + theirBlobsRejectedDueToExpirationTotal, + &blobOracleStatsMetrics{ + theirBlobQuotaAppendedCount, + theirBlobQuotaAppendedCumulativePayloadBytes, + theirBlobQuotaReapedCount, + theirBlobQuotaReapedCumulativePayloadBytes, + theirBlobQuotaUsedCount, + theirBlobQuotaUsedCumulativePayloadBytes, + theirBlobQuotaFreeCount, + theirBlobQuotaFreeCumulativePayloadBytes, + + BlobQuotaStats{ + uint64(pluginLimits.MaxPerOracleUnexpiredBlobCount), + uint64(pluginLimits.MaxPerOracleUnexpiredBlobCumulativePayloadBytes), + }, + sync.Mutex{}, + nil, nil, + }, + } +} + +func (bosm *blobOracleStatsMetrics) SetTheirAppendedStats(stats BlobQuotaStats) { + bosm.theirBlobQuotaAppendedCount.Set(float64(stats.Count)) + bosm.theirBlobQuotaAppendedCumulativePayloadBytes.Set(float64(stats.CumulativePayloadLength)) + + bosm.mu.Lock() + defer bosm.mu.Unlock() + bosm.appended = &stats + bosm.updateUsedFreeGauges() +} + +func (bosm *blobOracleStatsMetrics) SetTheirReapedStats(stats BlobQuotaStats) { + bosm.theirBlobQuotaReapedCount.Set(float64(stats.Count)) + bosm.theirBlobQuotaReapedCumulativePayloadBytes.Set(float64(stats.CumulativePayloadLength)) + + bosm.mu.Lock() + defer bosm.mu.Unlock() + bosm.reaped = &stats + bosm.updateUsedFreeGauges() +} + +func (bosm *blobOracleStatsMetrics) updateUsedFreeGauges() { + if bosm.appended == nil || bosm.reaped == nil { + return + } + + used, ok := bosm.appended.Sub(*bosm.reaped) + if !ok { + + used 
= BlobQuotaStats{} + } + bosm.theirBlobQuotaUsedCount.Set(float64(used.Count)) + bosm.theirBlobQuotaUsedCumulativePayloadBytes.Set(float64(used.CumulativePayloadLength)) + + free := bosm.limitQuotaStats.SaturatingSub(used) + bosm.theirBlobQuotaFreeCount.Set(float64(free.Count)) + bosm.theirBlobQuotaFreeCumulativePayloadBytes.Set(float64(free.CumulativePayloadLength)) +} + +func (bosm *blobOracleStatsMetrics) Close() { + bosm.theirBlobQuotaAppendedCount.Unregister() + bosm.theirBlobQuotaAppendedCumulativePayloadBytes.Unregister() + bosm.theirBlobQuotaReapedCount.Unregister() + bosm.theirBlobQuotaReapedCumulativePayloadBytes.Unregister() + bosm.theirBlobQuotaUsedCount.Unregister() + bosm.theirBlobQuotaUsedCumulativePayloadBytes.Unregister() + bosm.theirBlobQuotaFreeCount.Unregister() + bosm.theirBlobQuotaFreeCumulativePayloadBytes.Unregister() +} + +func (bom *blobOracleMetrics) SetTheirAppendedStats(stats BlobQuotaStats) { + bom._statsMetrics.SetTheirAppendedStats(stats) +} + +func (bom *blobOracleMetrics) SetTheirReapedStats(stats BlobQuotaStats) { + bom._statsMetrics.SetTheirReapedStats(stats) +} + +func (bom *blobOracleMetrics) Close() { + bom.registerer.Unregister(bom.theirBlobOffersTotal) + bom.registerer.Unregister(bom.theirBlobAppendedPayloadBytes) + bom.registerer.Unregister(bom.theirBlobAcceptsSentTotal) + bom.registerer.Unregister(bom.theirBlobRejectsSentTotal) + bom.registerer.Unregister(bom.myBlobsAcceptedTotal) + bom.registerer.Unregister(bom.myBlobsRejectedTotal) + bom.registerer.Unregister(bom.myBlobsUndeterminedTotal) + bom.registerer.Unregister(bom.theirBlobsRejectedDueToOversizePayloadBytesTotal) + bom.registerer.Unregister(bom.theirBlobsRejectedDueToQuotaTotal) + bom.registerer.Unregister(bom.theirBlobsRejectedDueToManyInflightTotal) + bom.registerer.Unregister(bom.theirBlobsRejectedDueToExpirationTotal) + bom._statsMetrics.Close() +} + +type reportingPluginInfo1Metrics struct { + registerer prometheus.Registerer + info prometheus.Gauge + limits 
*reportingPluginLimitsMetrics +} + +func newReportingPluginInfo1Metrics( + registerer prometheus.Registerer, + logger commontypes.Logger, + reportingPluginInfo ocr3_1types.ReportingPluginInfo1, +) *reportingPluginInfo1Metrics { + info := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "ocr3_1_reporting_plugin_info", + Help: "Exposes the ReportingPluginInfo1.Name field via labels and has value of 1. See https://pkg.go.dev/github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types#ReportingPluginInfo1 for details", + ConstLabels: prometheus.Labels{ + "name": reportingPluginInfo.Name, + }, + }) + info.Set(1) + metricshelper.RegisterOrLogError(logger, registerer, info, "ocr3_1_reporting_plugin_info") + + return &reportingPluginInfo1Metrics{ + registerer, + info, + newReportingPluginLimitsMetrics(registerer, logger, reportingPluginInfo.Limits), + } +} + +func (m *reportingPluginInfo1Metrics) Close() { + m.registerer.Unregister(m.info) + m.limits.Close() +} + +type reportingPluginLimitsMetrics struct { + registerer prometheus.Registerer + + maxQueryBytes prometheus.Gauge + maxObservationBytes prometheus.Gauge + maxReportsPlusPrecursorBytes prometheus.Gauge + maxReportBytes prometheus.Gauge + maxReportCount prometheus.Gauge + maxKeyValueModifiedKeys prometheus.Gauge + maxKeyValueModifiedKeysPlusValuesBytes prometheus.Gauge + maxBlobPayloadBytes prometheus.Gauge + maxPerOracleUnexpiredBlobCumulativePayloadBytes prometheus.Gauge + maxPerOracleUnexpiredBlobCount prometheus.Gauge +} + +func newReportingPluginLimitsMetrics( + registerer prometheus.Registerer, + logger commontypes.Logger, + limits ocr3_1types.ReportingPluginLimits, +) *reportingPluginLimitsMetrics { + newLimitGauge := func(name string, value int) prometheus.Gauge { + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: name, + Help: "See https://pkg.go.dev/github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types#ReportingPluginLimits for details", + }) + gauge.Set(float64(value)) 
+ metricshelper.RegisterOrLogError(logger, registerer, gauge, name) + return gauge + } + + maxQueryBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_query_bytes", limits.MaxQueryBytes) + maxObservationBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_observation_bytes", limits.MaxObservationBytes) + maxReportsPlusPrecursorBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_reports_plus_precursor_bytes", limits.MaxReportsPlusPrecursorBytes) + maxReportBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_report_bytes", limits.MaxReportBytes) + maxReportCount := newLimitGauge("ocr3_1_reporting_plugin_limit_max_report_count", limits.MaxReportCount) + maxKeyValueModifiedKeys := newLimitGauge("ocr3_1_reporting_plugin_limit_max_key_value_modified_keys", limits.MaxKeyValueModifiedKeys) + maxKeyValueModifiedKeysPlusValuesBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_key_value_modified_keys_plus_values_bytes", limits.MaxKeyValueModifiedKeysPlusValuesBytes) + maxBlobPayloadBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_blob_payload_bytes", limits.MaxBlobPayloadBytes) + maxPerOracleUnexpiredBlobCumulativePayloadBytes := newLimitGauge("ocr3_1_reporting_plugin_limit_max_per_oracle_unexpired_blob_cumulative_payload_bytes", limits.MaxPerOracleUnexpiredBlobCumulativePayloadBytes) + maxPerOracleUnexpiredBlobCount := newLimitGauge("ocr3_1_reporting_plugin_limit_max_per_oracle_unexpired_blob_count", limits.MaxPerOracleUnexpiredBlobCount) + + return &reportingPluginLimitsMetrics{ + registerer, + maxQueryBytes, + maxObservationBytes, + maxReportsPlusPrecursorBytes, + maxReportBytes, + maxReportCount, + maxKeyValueModifiedKeys, + maxKeyValueModifiedKeysPlusValuesBytes, + maxBlobPayloadBytes, + maxPerOracleUnexpiredBlobCumulativePayloadBytes, + maxPerOracleUnexpiredBlobCount, + } +} + +func (m *reportingPluginLimitsMetrics) Close() { + m.registerer.Unregister(m.maxQueryBytes) + m.registerer.Unregister(m.maxObservationBytes) + 
m.registerer.Unregister(m.maxReportsPlusPrecursorBytes) + m.registerer.Unregister(m.maxReportBytes) + m.registerer.Unregister(m.maxReportCount) + m.registerer.Unregister(m.maxKeyValueModifiedKeys) + m.registerer.Unregister(m.maxKeyValueModifiedKeysPlusValuesBytes) + m.registerer.Unregister(m.maxBlobPayloadBytes) + m.registerer.Unregister(m.maxPerOracleUnexpiredBlobCumulativePayloadBytes) + m.registerer.Unregister(m.maxPerOracleUnexpiredBlobCount) } diff --git a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go index 2020d413..2909a912 100644 --- a/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go +++ b/offchainreporting2plus/internal/ocr3_1/protocol/oracle.go @@ -177,8 +177,6 @@ func (o *oracleState[RI]) run() { chNetToBlobExchange := make(chan MessageToBlobExchangeWithSender[RI]) o.chNetToBlobExchange = chNetToBlobExchange - chOutcomeGenerationToBlobExchange := make(chan EventToBlobExchange[RI]) - // communication between blob exchange and blob endpoint chBlobBroadcastRequest := make(chan blobBroadcastRequest) chBlobFetchRequest := make(chan blobFetchRequest) @@ -198,6 +196,14 @@ func (o *oracleState[RI]) run() { return } + initialHighestCommittedSeqNr, err := o.restoreHighestCommittedSeqNrFromKeyValueDatabase() + if err != nil { + o.logger.Error("restoreHighestCommittedSeqNrFromKeyValueDatabase returned an error, exiting oracle", commontypes.LogFields{ + "error": err, + }) + return + } + blobEndpoint := BlobEndpoint{ o.childCtx, @@ -307,7 +313,6 @@ func (o *oracleState[RI]) run() { o.childCtx, chNetToBlobExchange, - chOutcomeGenerationToBlobExchange, chBlobBroadcastRequest, chBlobFetchRequest, @@ -322,12 +327,17 @@ func (o *oracleState[RI]) run() { o.netEndpoint, o.offchainKeyring, o.telemetrySender, + + initialHighestCommittedSeqNr, ) }) publicConfigMetrics := ocr3_1config.NewPublicConfigMetrics(o.metricsRegisterer, o.logger, o.config.PublicConfig) defer publicConfigMetrics.Close() + 
reportingPluginInfo1Metrics := newReportingPluginInfo1Metrics(o.metricsRegisterer, o.logger, o.reportingPluginInfo) + defer reportingPluginInfo1Metrics.Close() + chNet := o.netEndpoint.Receive() chDone := o.ctx.Done() @@ -433,3 +443,27 @@ func (o *oracleState[RI]) restoreFromDatabase() (PacemakerState, CertifiedPrepar return paceState, cert, nil } + +func (o *oracleState[RI]) restoreHighestCommittedSeqNrFromKeyValueDatabase() (uint64, error) { + const retryPeriod = 5 * time.Second + + highestCommittedSeqNr, err := tryUntilSuccess[uint64]( + o.ctx, + o.logger, + retryPeriod, + o.localConfig.DatabaseTimeout, + "KeyValueDatabase.HighestCommittedSeqNr", + func(context.Context) (uint64, error) { + return o.kvDb.HighestCommittedSeqNr() + }, + ) + if err != nil { + return 0, err + } + + o.logger.Info("restoreHighestCommittedSeqNrFromKeyValueDatabase: successfully restored highest committed seq nr", commontypes.LogFields{ + "highestCommittedSeqNr": highestCommittedSeqNr, + }) + + return highestCommittedSeqNr, nil +} diff --git a/offchainreporting2plus/internal/shim/ocr3_1_key_value_store.go b/offchainreporting2plus/internal/shim/ocr3_1_key_value_store.go index b53091a5..8da76450 100644 --- a/offchainreporting2plus/internal/shim/ocr3_1_key_value_store.go +++ b/offchainreporting2plus/internal/shim/ocr3_1_key_value_store.go @@ -58,6 +58,7 @@ func (s *SemanticOCR3_1KeyValueDatabase) newReadWriteTransaction(tx ocr3_1types. 
SemanticOCR3_1KeyValueDatabaseReadTransaction{ tx, s.config, + s.logger, }, tx, s.metrics, @@ -167,7 +168,11 @@ func (s *SemanticOCR3_1KeyValueDatabase) NewReadTransactionUnchecked() (protocol if err != nil { return nil, fmt.Errorf("failed to create read transaction: %w", err) } - return &SemanticOCR3_1KeyValueDatabaseReadTransaction{tx, s.config}, nil + return &SemanticOCR3_1KeyValueDatabaseReadTransaction{ + tx, + s.config, + s.logger, + }, nil } type SemanticOCR3_1KeyValueDatabaseReadWriteTransaction struct { @@ -315,17 +320,19 @@ func partialExclusiveRangeKeys(readTransaction ocr3_1types.KeyValueDatabaseReadT return keys, more, nil } -func partialInclusiveRangeKeys(readTransaction ocr3_1types.KeyValueDatabaseReadTransaction, loKey []byte, hiKeyIncl []byte, maxItems int) (keys [][]byte, more bool, err error) { - hiKeyExcl := append(bytes.Clone(hiKeyIncl), 0) - return partialExclusiveRangeKeys(readTransaction, loKey, hiKeyExcl, maxItems) +// hiKeyInclToExcl converts a key intended as an inclusive upper bound into an +// exclusive upper bound suitable for [ocr3_1types.KeyValueDatabaseReadTransaction.Range], +// by appending a zero byte (the smallest byte) to a clone of the input. 
+func hiKeyInclToExcl(hiKeyIncl []byte) []byte { + return append(bytes.Clone(hiKeyIncl), 0) } -func (s *SemanticOCR3_1KeyValueDatabaseReadWriteTransaction) partialExclusiveRangeKeys(loKey []byte, hiKeyExcl []byte, maxItems int) (keys [][]byte, more bool, err error) { - return partialExclusiveRangeKeys(s.rawTransaction, loKey, hiKeyExcl, maxItems) +func partialInclusiveRangeKeys(readTransaction ocr3_1types.KeyValueDatabaseReadTransaction, loKey []byte, hiKeyIncl []byte, maxItems int) (keys [][]byte, more bool, err error) { + return partialExclusiveRangeKeys(readTransaction, loKey, hiKeyInclToExcl(hiKeyIncl), maxItems) } -func (s *SemanticOCR3_1KeyValueDatabaseReadWriteTransaction) partialInclusiveRangeKeys(loKey []byte, hiKeyIncl []byte, maxItems int) (keys [][]byte, more bool, err error) { - return partialInclusiveRangeKeys(s.rawTransaction, loKey, hiKeyIncl, maxItems) +func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) partialExclusiveRangeKeys(loKey []byte, hiKeyExcl []byte, maxItems int) (keys [][]byte, more bool, err error) { + return partialExclusiveRangeKeys(s.rawTransaction, loKey, hiKeyExcl, maxItems) } func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) partialInclusiveRangeKeys(loKey []byte, hiKeyIncl []byte, maxItems int) (keys [][]byte, more bool, err error) { @@ -476,6 +483,7 @@ func (s *SemanticOCR3_1KeyValueDatabaseReadWriteTransaction) Write(key []byte, v type SemanticOCR3_1KeyValueDatabaseReadTransaction struct { rawTransaction ocr3_1types.KeyValueDatabaseReadTransaction config ocr3_1config.PublicConfig + logger commontypes.Logger } var _ protocol.KeyValueDatabaseReadTransaction = &SemanticOCR3_1KeyValueDatabaseReadTransaction{} @@ -967,6 +975,50 @@ func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) ReadBlobMeta(blobDigest return &blobMeta, nil } +func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) ReadBlobDigestExpirySeqNrs(minBlobDigest blobtypes.BlobDigest, maxItems int) ([]protocol.BlobDigestExpirySeqNr, bool, error) { + loKey 
:= blobMetaPrefixKey(minBlobDigest) + hiKeyExcl := hiKeyInclToExcl(blobMetaPrefixKey(blobtypes.MaxBlobDigest())) + + it := s.rawTransaction.Range(loKey, hiKeyExcl) + defer it.Close() + + var result []protocol.BlobDigestExpirySeqNr + more := false + + for it.Next() { + if len(result) == maxItems { + more = true + break + } + key := it.Key() + blobDigest, err := deserializeBlobMetaPrefixKey(key) + if err != nil { + return nil, false, fmt.Errorf("error deserializing blob digest from blob meta key: %w", err) + } + blobMetaBytes, err := it.Value() + if err != nil { + return nil, false, fmt.Errorf("error reading blob meta value: %w", err) + } + if blobMetaBytes == nil { + return nil, false, fmt.Errorf("blob meta bytes are unexpectedly nil for blob digest %s", blobDigest) + } + blobMeta, err := serialization.DeserializeBlobMeta(blobMetaBytes) + if err != nil { + return nil, false, fmt.Errorf("error unmarshaling blob meta for blob digest %s: %w", blobDigest, err) + } + result = append(result, protocol.BlobDigestExpirySeqNr{ + blobDigest, + blobMeta.ExpirySeqNr, + }) + } + + if err := it.Err(); err != nil { + return nil, false, fmt.Errorf("error iterating over blob metas: %w", err) + } + + return result, more, nil +} + func (s *SemanticOCR3_1KeyValueDatabaseReadWriteTransaction) WriteBlobMeta(blobDigest protocol.BlobDigest, blobMeta protocol.BlobMeta) error { metaBytes, err := serialization.SerializeBlobMeta(blobMeta) if err != nil { @@ -1028,6 +1080,15 @@ func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) ReadStaleBlobIndex(maxSt return staleBlobs, nil } +func (s *SemanticOCR3_1KeyValueDatabaseReadTransaction) ExistsStaleBlobIndex(staleBlob protocol.StaleBlob) (bool, error) { + key := staleBlobIndexPrefixKey(staleBlob) + value, err := s.rawTransaction.Read(key) + if err != nil { + return false, fmt.Errorf("Read failed: %w", err) + } + return value != nil, nil +} + func (s *SemanticOCR3_1KeyValueDatabaseReadWriteTransaction) WriteStaleBlobIndex(staleBlob 
protocol.StaleBlob) error { return s.rawTransaction.Write(staleBlobIndexPrefixKey(staleBlob), []byte{}) } @@ -1275,6 +1336,22 @@ func blobMetaPrefixKey(blobDigest protocol.BlobDigest) []byte { return append([]byte(blobMetaPrefix), blobDigest[:]...) } +func deserializeBlobMetaPrefixKey(enc []byte) (protocol.BlobDigest, error) { + if len(enc) < len(blobMetaPrefix) { + return blobtypes.BlobDigest{}, fmt.Errorf("encoding too short") + } + enc = enc[len(blobMetaPrefix):] + if len(enc) < len(protocol.BlobDigest{}) { + return blobtypes.BlobDigest{}, fmt.Errorf("encoding too short to contain blob digest") + } + blobDigest := protocol.BlobDigest(enc[:len(protocol.BlobDigest{})]) + enc = enc[len(protocol.BlobDigest{}):] + if len(enc) != 0 { + return blobtypes.BlobDigest{}, fmt.Errorf("encoding too long") + } + return blobDigest, nil +} + func blobQuotaStatsPrefixKey(blobQuotaStatsType protocol.BlobQuotaStatsType, submitter commontypes.OracleID) []byte { base := append([]byte(blobQuotaStatsPrefix), []byte(blobQuotaStatsType)...) return append(base, []byte(fmt.Sprintf("%d", submitter))...) } diff --git a/offchainreporting2plus/ocr3_1types/blob.go b/offchainreporting2plus/ocr3_1types/blob.go index 7dd72c7a..50255100 100644 --- a/offchainreporting2plus/ocr3_1types/blob.go +++ b/offchainreporting2plus/ocr3_1types/blob.go @@ -21,10 +21,23 @@ type BlobExpirationHintSequenceNumber struct{ SeqNr uint64 } func (BlobExpirationHintSequenceNumber) isBlobExpirationHint() {} type BlobBroadcaster interface { + // BroadcastBlob broadcasts a blob with the given payload and expiration + // hint, returning a handle that can be used to refer to the blob in the + // future (e.g. in a query or observation). Note that BroadcastBlob may + // return an error, and callers must gracefully handle such an error. Errors + // might be transient, and related to resource exhaustion prevention, so + // callers are encouraged to employ a backoff when retrying.
An error in + // BroadcastBlob should not cause Observation to error also. BroadcastBlob + // is a blocking call, and callers must ensure that it fits into their + // runtime envelope. BroadcastBlob(ctx context.Context, payload []byte, expirationHint BlobExpirationHint) (BlobHandle, error) } type BlobFetcher interface { + // FetchBlob fetches the payload corresponding to the BlobHandle passed. + // Callers must gracefully handle errors returned by FetchBlob. FetchBlob is + // a blocking call, and callers must ensure that it fits into their runtime + // envelope. FetchBlob(ctx context.Context, handle BlobHandle) ([]byte, error) } diff --git a/offchainreporting2plus/ocrintegrationtesthelpers/in_memory_key_value_database.go b/offchainreporting2plus/ocrintegrationtesthelpers/in_memory_key_value_database.go index f69ec557..16b9ce38 100644 --- a/offchainreporting2plus/ocrintegrationtesthelpers/in_memory_key_value_database.go +++ b/offchainreporting2plus/ocrintegrationtesthelpers/in_memory_key_value_database.go @@ -58,7 +58,7 @@ func (StatelessInMemoryKeyValueDatabaseFactory) NewKeyValueDatabaseIfExists(type // NewKeyValueDatabaseIfExists call for the same configDigest returns a database // with the contents written before Close. Enforces exclusive access: // NewKeyValueDatabase* returns an error if a database for that configDigest has -// been opened but not yet closed. Use [ForgetKeyValueDatabaseForTests] to make +// been opened but not yet closed. Use [StatefulInMemoryKeyValueDatabaseFactory.ForgetKeyValueDatabaseForTests] to make // the factory forget about a configDigest (akin to deleting the database from // the filesystem for a disk-based implementation). type StatefulInMemoryKeyValueDatabaseFactory struct { @@ -88,7 +88,7 @@ func (f *StatefulInMemoryKeyValueDatabaseFactory) NewKeyValueDatabaseIfExists(co // database; NewKeyValueDatabaseIfExists will return ErrKeyValueDatabaseDoesNotExist. 
// Any currently open database for this configDigest is unaffected and continues // to work, but the exclusivity guarantee is lost: the database opened before -// ForgetKeyValueDatabaseForTests and the database opened after can be open +// [StatefulInMemoryKeyValueDatabaseFactory.ForgetKeyValueDatabaseForTests] and the database opened after can be open // simultaneously. func (f *StatefulInMemoryKeyValueDatabaseFactory) ForgetKeyValueDatabaseForTests(configDigest types.ConfigDigest) { f.mu.Lock() @@ -140,8 +140,8 @@ type InMemoryKeyValueDatabase struct { var _ ocr3_1types.KeyValueDatabase = &InMemoryKeyValueDatabase{} // NewInMemoryKeyValueDatabase creates a standalone in-memory database without -// factory tracking. Use NewInMemoryKeyValueDatabaseFactory for exclusive access -// and persistence semantics. +// factory tracking. Use [NewStatefulInMemoryKeyValueDatabaseFactory] for +// exclusive access and persistence semantics. func NewInMemoryKeyValueDatabase() *InMemoryKeyValueDatabase { return newInMemoryKeyValueDatabaseWithTree(btree.NewG(32, func(a, b item) bool { return a.Less(b) diff --git a/offchainreporting2plus/ocrintegrationtesthelpers/mock_contract_config_tracker.go b/offchainreporting2plus/ocrintegrationtesthelpers/mock_contract_config_tracker.go index 15c373f6..8855e92b 100644 --- a/offchainreporting2plus/ocrintegrationtesthelpers/mock_contract_config_tracker.go +++ b/offchainreporting2plus/ocrintegrationtesthelpers/mock_contract_config_tracker.go @@ -5,7 +5,6 @@ import ( "fmt" "sync" - "github.com/smartcontractkit/libocr/offchainreporting2plus/internal/config" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) @@ -35,14 +34,9 @@ func NewMockContractConfigTracker(offchainConfigDigester types.OffchainConfigDig return &MockContractConfigTracker{offchainConfigDigester, sync.RWMutex{}, configs} } -func (c *MockContractConfigTracker) SetConfig(identities []config.OracleIdentity, f uint8, onchainConfig []byte, offchainConfigVersion uint64, 
offchainConfig []byte) error { - signers := make([]types.OnchainPublicKey, len(identities)) - transmitters := make([]types.Account, len(identities)) - for i, id := range identities { - signers[i] = id.OnchainPublicKey - transmitters[i] = id.TransmitAccount - } - +// SetConfig does not perform *any* validity checks on the supplied config. You +// are responsible for ensuring that the config is valid for your use case. +func (c *MockContractConfigTracker) SetConfig(signers []types.OnchainPublicKey, transmitters []types.Account, f uint8, onchainConfig []byte, offchainConfigVersion uint64, offchainConfig []byte) error { c.mu.Lock() defer c.mu.Unlock() config := types.ContractConfig{types.ConfigDigest{}, uint64(len(c.configs)), signers, transmitters, f, onchainConfig, offchainConfigVersion, offchainConfig} diff --git a/quorumhelper/quorumhelper.go b/quorumhelper/quorumhelper.go index 68c1407a..00b1dc74 100644 --- a/quorumhelper/quorumhelper.go +++ b/quorumhelper/quorumhelper.go @@ -16,6 +16,13 @@ const ( // Guarantees that all sets of observations overlap in at least one honest oracle QuorumByzQuorum // Maximal number of observations we can rely on being available + // + // We discourage use of this quorum for OCR ReportingPlugins. Unlike the quorums + // used in OCR protocol logic, this quorum is not monotone in f (i.e. decreasing + // f increases the threshold n−f) and thus the common practice of + // setting a non-maximum f (i.e. choosing f s.t. 3f+1 < n) to improve + // availability in case of crash-faults while reducing byzantine-fault + // tolerance would actually *reduce* availability when this quorum is used. QuorumNMinusF )