From bb5dc317f3ee5399e3425817a2908fc67075a08d Mon Sep 17 00:00:00 2001 From: De Clercq Wentzel <10665586+wentzeld@users.noreply.github.com> Date: Mon, 27 Apr 2026 10:56:33 -0700 Subject: [PATCH] feat(consensus/ocr3): add OCR3_1 plugin scaffold + libocr bump Adds parallel OCR3_1 implementation alongside the existing OCR3 plugin so the CRE consensus DON can move bulk observation payloads onto blobs (>512 KiB customer payloads no longer fit under OCR3_1's halved cap). - New files: factory_ocr3_1.go, ocr3_1.go, reporting_plugin_ocr3_1.go (+ test). End-to-end blob path wired (Observation broadcasts, ValidateObservation parses handles, StateTransition fetches with materializeBlobs). - Versioned KV via OutcomeEnvelope{Version:1}; reads reject unknown versions. - Deterministic encoder tiebreak fix (count desc, sha asc) for the OCR3 map-iteration bug. - Proto extended: BlobbedObservation, OutcomeEnvelope, plus 7 OCR3_1-only ReportingPluginConfig fields including the three mandatory blob/KV rate-limits. - libocr bumped to v0.0.0-20260403184524-b6409238958d to align with chainlink. Canonical names: ReportingPluginInfo1, KeyValueStateReader/ReadWriter, *Bytes limit fields. - Verified: go build/vet/test ./... clean (294 packages). 
--- go.mod | 6 +- go.sum | 14 +- .../consensus/ocr3/factory_ocr3_1.go | 167 ++++ pkg/capabilities/consensus/ocr3/ocr3_1.go | 114 +++ .../consensus/ocr3/reporting_plugin_ocr3_1.go | 841 ++++++++++++++++++ .../ocr3/reporting_plugin_ocr3_1_test.go | 88 ++ .../ocr3/types/ocr3_config_types.pb.go | 92 +- .../ocr3/types/ocr3_config_types.proto | 39 + .../consensus/ocr3/types/ocr3_types.pb.go | 194 +++- .../consensus/ocr3/types/ocr3_types.proto | 35 + 10 files changed, 1563 insertions(+), 27 deletions(-) create mode 100644 pkg/capabilities/consensus/ocr3/factory_ocr3_1.go create mode 100644 pkg/capabilities/consensus/ocr3/ocr3_1.go create mode 100644 pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1.go create mode 100644 pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1_test.go diff --git a/go.mod b/go.mod index a289bf0fbe..40d7abd9f0 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 - github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d + github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 go.opentelemetry.io/otel v1.43.0 @@ -84,6 +84,7 @@ require ( ) require ( + github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 // indirect github.com/apache/arrow-go/v18 v18.3.1 // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect @@ -94,6 +95,8 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1 // indirect github.com/cloudevents/sdk-go/v2 v2.16.1 // indirect + 
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/ethereum/go-ethereum v1.17.0 // indirect github.com/fatih/color v1.18.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -109,6 +112,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026 // indirect github.com/hashicorp/yamux v0.1.2 // indirect + github.com/holiman/uint256 v1.3.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index ce466a4d01..2eae1831ad 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 h1:1zYrtlhrZ6/b6SAjLSfKzWtdgqK0U+HtH/VcBWh1BaU= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= github.com/XSAM/otelsql v0.37.0 h1:ya5RNw028JW0eJW8Ma4AmoKxAYsJSGuNVbC7F1J457A= github.com/XSAM/otelsql v0.37.0/go.mod h1:LHbCu49iU8p255nCn1oi04oX2UjSoRcUMiKEHo2a5qM= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= @@ -46,12 +48,18 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ethereum/go-ethereum v1.17.0 h1:2D+1Fe23CwZ5tQoAS5DfwKFNI1HGcTwi65/kRlAVxes= +github.com/ethereum/go-ethereum v1.17.0/go.mod h1:2W3msvdosS/MCWytpqTcqgFiRYbTH59FxDJzqah120o= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= @@ -142,6 +150,8 @@ github.com/hashicorp/go-plugin v1.7.0 h1:YghfQH/0QmPNc/AZMTFE3ac8fipZyZECHdDPshf github.com/hashicorp/go-plugin v1.7.0/go.mod h1:BExt6KEaIYx804z8k4gRzRLEvxKVb+kn0NMcihqOqb8= github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= +github.com/holiman/uint256 
v1.3.2 h1:a9EgMPSC1AAaj1SZL5zIQD3WbwTuHrMGOerLjGmM/TA= +github.com/holiman/uint256 v1.3.2/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= @@ -276,8 +286,8 @@ github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9 github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA= -github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= -github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d/go.mod h1:Acy3BTBxou83ooMESLO90s8PKSu7RvLCzwSTbxxfOK0= +github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d h1:PvXor5Fjer7FIONSqYXbpd1LkA14hWrlAyxXzOrC9t8= +github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d/go.mod h1:PLdNK6GlqfxIWXzziPkU7dCAVlVFeYkyyW7AQY0R+4Q= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/pkg/capabilities/consensus/ocr3/factory_ocr3_1.go b/pkg/capabilities/consensus/ocr3/factory_ocr3_1.go new file mode 100644 index 0000000000..95b828138c --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/factory_ocr3_1.go @@ -0,0 +1,167 @@ +package ocr3 + +import ( + "context" + "time" + + "google.golang.org/protobuf/proto" + 
"google.golang.org/protobuf/types/known/durationpb" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/requests" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// OCR3_1 observation/report bytes defaults. Held below the libocr hard caps: +// MaxMaxObservationBytes = 512 KiB (halved vs OCR3) +// MaxMaxQueryBytes = 512 KiB +// MaxMaxReportBytes = 5 MiB +// Any DON offchain config that exceeds these will fail ReportingPluginInfo +// validation at factory time — preflight rotation (plan §3.7) is mandatory. +const ( + defaultMaxObservationBytesOCR3_1 = 400 * 1024 // 400 KiB (~80% of the 512 KiB cap) + defaultMaxQueryBytesOCR3_1 = 400 * 1024 + defaultMaxReportsPlusPrecursorBytesOCR3_1 = 1 * 1024 * 1024 // 1 MiB — small, precursor only + defaultMaxReportBytesOCR3_1 = 1 * 1024 * 1024 + defaultMaxReportCountOCR3_1 = 20 + + // KV write budget. Bounded by batch size × AggregationOutcome size. + // Well below the libocr caps (10_000 keys / 10 MiB). + defaultMaxKeyValueModifiedKeysOCR3_1 = 1024 + defaultMaxKeyValueModifiedKeysPlusValuesBytesOCR3_1 = 4 * 1024 * 1024 + + // Blob limits. v1 uses blobs for observation payloads only. 
+ defaultMaxBlobPayloadBytesOCR3_1 = 1 * 1024 * 1024 // 1 MiB per blob + defaultMaxPerOracleUnexpiredBlobCountOCR3_1 = 500 + defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytesOCR3_1 = 500 * 1024 * 1024 +) + +type factoryOCR3_1 struct { + store *requests.Store[*ReportRequest] + capability *capability + lggr logger.Logger + + services.StateMachine +} + +func newFactoryOCR3_1( + s *requests.Store[*ReportRequest], + c *capability, + lggr logger.Logger, +) (*factoryOCR3_1, error) { + return &factoryOCR3_1{ + store: s, + capability: c, + lggr: logger.Named(lggr, "OCR3_1ReportingPluginFactory"), + }, nil +} + +// NewReportingPlugin implements ocr3_1types.ReportingPluginFactory[[]byte]. +// The BlobBroadcastFetcher must not be captured long-term; libocr only +// guarantees it within method scopes (see ocr3_1types/plugin.go doc). We +// deliberately do not stash it on the factory — each method on the plugin +// receives it fresh. +func (o *factoryOCR3_1) NewReportingPlugin( + _ context.Context, + config ocr3types.ReportingPluginConfig, + _ ocr3_1types.BlobBroadcastFetcher, +) (ocr3_1types.ReportingPlugin[[]byte], ocr3_1types.ReportingPluginInfo, error) { + var configProto types.ReportingPluginConfig + if err := proto.Unmarshal(config.OffchainConfig, &configProto); err != nil { + return nil, ocr3_1types.ReportingPluginInfo1{}, err + } + + // Defaults: OCR3_1 caps are tighter than OCR3, so we cannot inherit the + // OCR3 1 MiB defaults. Any value the operator supplied is kept; zero + // values are filled with OCR3_1-safe defaults. 
+ if configProto.MaxQueryLengthBytes <= 0 { + configProto.MaxQueryLengthBytes = defaultMaxQueryBytesOCR3_1 + } + if configProto.MaxObservationLengthBytes <= 0 { + configProto.MaxObservationLengthBytes = defaultMaxObservationBytesOCR3_1 + } + if configProto.MaxOutcomeLengthBytes <= 0 { + configProto.MaxOutcomeLengthBytes = defaultMaxReportsPlusPrecursorBytesOCR3_1 + } + if configProto.MaxReportLengthBytes <= 0 { + configProto.MaxReportLengthBytes = defaultMaxReportBytesOCR3_1 + } + if configProto.MaxReportCount <= 0 { + configProto.MaxReportCount = defaultMaxReportCountOCR3_1 + } + if configProto.OutcomePruningThreshold <= 0 { + configProto.OutcomePruningThreshold = defaultOutcomePruningThreshold + } + if configProto.RequestTimeout == nil { + configProto.RequestTimeout = durationpb.New(defaultRequestExpiry) + } + // OCR3_1-only fields: honor operator-supplied values; fall back to + // defaults when unset. Keeps OCR3 offchain configs forward-compatible. + if configProto.MaxReportsPlusPrecursorBytes == 0 { + configProto.MaxReportsPlusPrecursorBytes = defaultMaxReportsPlusPrecursorBytesOCR3_1 + } + if configProto.MaxKeyValueModifiedKeysPlusValuesBytes == 0 { + configProto.MaxKeyValueModifiedKeysPlusValuesBytes = defaultMaxKeyValueModifiedKeysPlusValuesBytesOCR3_1 + } + if configProto.MaxBlobPayloadBytes == 0 { + configProto.MaxBlobPayloadBytes = defaultMaxBlobPayloadBytesOCR3_1 + } + if configProto.BlobExpirationK == 0 { + configProto.BlobExpirationK = defaultBlobExpirationK + } + if configProto.MaxKeyValueModifiedKeys == 0 { + configProto.MaxKeyValueModifiedKeys = defaultMaxKeyValueModifiedKeysOCR3_1 + } + if configProto.MaxPerOracleUnexpiredBlobCount == 0 { + configProto.MaxPerOracleUnexpiredBlobCount = defaultMaxPerOracleUnexpiredBlobCountOCR3_1 + } + if configProto.MaxPerOracleUnexpiredBlobCumulativePayloadBytes == 0 { + configProto.MaxPerOracleUnexpiredBlobCumulativePayloadBytes = defaultMaxPerOracleUnexpiredBlobCumulativePayloadBytesOCR3_1 + } + 
o.capability.setRequestTimeout(configProto.RequestTimeout.AsDuration()) + + rp, err := newReportingPluginOCR3_1(o.store, o.capability, config, &configProto, o.lggr) + if err != nil { + return nil, ocr3_1types.ReportingPluginInfo1{}, err + } + + info := ocr3_1types.ReportingPluginInfo1{ + Name: "OCR3_1 CRE Consensus Plugin", + Limits: ocr3_1types.ReportingPluginLimits{ + MaxQueryBytes: int(configProto.MaxQueryLengthBytes), + MaxObservationBytes: int(configProto.MaxObservationLengthBytes), + MaxReportsPlusPrecursorBytes: int(configProto.MaxReportsPlusPrecursorBytes), + MaxReportBytes: int(configProto.MaxReportLengthBytes), + MaxReportCount: int(configProto.MaxReportCount), + + MaxKeyValueModifiedKeys: int(configProto.MaxKeyValueModifiedKeys), + MaxKeyValueModifiedKeysPlusValuesBytes: int(configProto.MaxKeyValueModifiedKeysPlusValuesBytes), + + MaxBlobPayloadBytes: int(configProto.MaxBlobPayloadBytes), + MaxPerOracleUnexpiredBlobCount: int(configProto.MaxPerOracleUnexpiredBlobCount), + MaxPerOracleUnexpiredBlobCumulativePayloadBytes: int(configProto.MaxPerOracleUnexpiredBlobCumulativePayloadBytes), + }, + } + return rp, info, nil +} + +func (o *factoryOCR3_1) Start(ctx context.Context) error { + return o.StartOnce("OCR3_1ReportingPlugin", func() error { return nil }) +} + +func (o *factoryOCR3_1) Close() error { + return o.StopOnce("OCR3_1ReportingPlugin", func() error { return nil }) +} + +func (o *factoryOCR3_1) Name() string { return o.lggr.Name() } +func (o *factoryOCR3_1) HealthReport() map[string]error { return map[string]error{o.Name(): o.Healthy()} } + +// Ensure factoryOCR3_1 satisfies the libocr interface. 
+var _ ocr3_1types.ReportingPluginFactory[[]byte] = (*factoryOCR3_1)(nil) + +// ensure time import is retained regardless of future edits +var _ = time.Second diff --git a/pkg/capabilities/consensus/ocr3/ocr3_1.go b/pkg/capabilities/consensus/ocr3/ocr3_1.go new file mode 100644 index 0000000000..ac46a8e78e --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/ocr3_1.go @@ -0,0 +1,114 @@ +package ocr3 + +import ( + "context" + "errors" + "time" + + "github.com/jonboulle/clockwork" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/requests" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +// CapabilityOCR3_1 is the OCR3_1 entry point, parallel to Capability in +// ocr3.go. It is intentionally its own type so the OCR3 path remains +// untouched during the staged rollout (plan §3.8). +// +// Unlike the OCR3 Capability, this one does not implement the LOOP +// ProviderServer interface in v1 — following the Vault precedent where the +// OCR3_1 plugin is instantiated directly in-process rather than over LOOP's +// gRPC boundary. Adding a LOOP sibling is a separate follow-up (plan §3.12). +type CapabilityOCR3_1 struct { + loop.Plugin + reportingplugins.PluginProviderServer + config Config + capabilityRegistry core.CapabilitiesRegistry +} + +// NewOCR3_1 constructs the OCR3_1 capability using the same Config shape as +// NewOCR3. Defaults mirror the OCR3 path so migration does not require +// reconfiguring the caller-supplied fields. 
+func NewOCR3_1(config Config) *CapabilityOCR3_1 { + if config.RequestTimeout == nil { + dre := defaultRequestExpiry + config.RequestTimeout = &dre + } + if config.SendBufferSize == 0 { + config.SendBufferSize = defaultSendBufferSize + } + if config.clock == nil { + config.clock = clockwork.NewRealClock() + } + if config.store == nil { + config.store = requests.NewStore[*ReportRequest]() + } + if config.capability == nil { + ci := NewCapability( + config.store, + config.clock, + *config.RequestTimeout, + config.AggregatorFactory, + config.EncoderFactory, + config.Logger, + config.SendBufferSize, + ) + config.capability = ci + } + cp := &CapabilityOCR3_1{ + Plugin: loop.Plugin{Logger: config.Logger}, + PluginProviderServer: reportingplugins.PluginProviderServer{}, + config: config, + } + cp.SubService(config.capability) + return cp +} + +// NewReportingPluginFactoryOCR3_1 returns the OCR3_1 factory directly +// (*factoryOCR3_1 implements ocr3_1types.ReportingPluginFactory[[]byte]). +// +// Callers that drive libocr's OCR3_1 oracle harness should use this entry +// point. The integration-test framework in chainlink wires through here. +func (o *CapabilityOCR3_1) NewReportingPluginFactoryOCR3_1( + ctx context.Context, + _ core.ReportingPluginServiceConfig, + capabilityRegistry core.CapabilitiesRegistry, +) (*factoryOCR3_1, error) { + f, err := newFactoryOCR3_1(o.config.store, o.config.capability, o.config.Logger) + if err != nil { + return nil, err + } + if err := capabilityRegistry.Add(ctx, o.config.capability); err != nil { + return nil, err + } + o.capabilityRegistry = capabilityRegistry + return f, nil +} + +// NewValidationServiceOCR3_1 mirrors the OCR3 validation-service entry. +// No behavioral difference — validation is over offchain config bytes which +// share a schema across OCR3 / OCR3_1 (with the new OCR3_1 fields additive). 
+func (o *CapabilityOCR3_1) NewValidationServiceOCR3_1(ctx context.Context) (core.ValidationService, error) { + s := &validationService{lggr: o.Logger} + o.SubService(s) + return s, nil +} + +func (o *CapabilityOCR3_1) Close() error { + err := o.Plugin.Close() + if o.capabilityRegistry != nil { + err = errors.Join(err, o.capabilityRegistry.Remove(context.TODO(), o.config.capability.ID)) + } + return err +} + +// ensure unused imports are retained against future additions +var ( + _ = time.Second + _ *types.ReportingPluginConfig + _ = logger.Nop +) diff --git a/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1.go b/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1.go new file mode 100644 index 0000000000..a4a5ef93bc --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1.go @@ -0,0 +1,841 @@ +package ocr3 + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "slices" + "sort" + "time" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" + + ocrcommon "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3_1types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/libocr/quorumhelper" + + "github.com/smartcontractkit/chainlink-protos/cre/go/values" + "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/requests" + pbtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// KV key layout (v1): +// outcomes/ -> marshalled pbtypes.AggregationOutcome +// +// Future versions must introduce a prefix bump (e.g. "v2/outcomes/") so we can +// detect at read time and migrate. 
Do not reuse the v1 prefix with a new +// value schema. +const ( + kvPrefixOutcomesV1 = "outcomes/" + + // outcomeEnvelopeVersionV1 stamps the currently-written KV envelope + // schema. Bump together with kvPrefixOutcomesV1 when the AggregationOutcome + // payload shape changes incompatibly. A reader must check the version + // field and refuse to deserialize unknown values rather than blindly + // proto-unmarshal (proto unmarshal of a future schema into the v1 struct + // silently drops added fields, which breaks determinism). + outcomeEnvelopeVersionV1 uint32 = 1 + + // defaultBlobExpirationK bounds how many seqNrs a blob is guaranteed to + // survive past broadcast. Must cover the full round lifetime + any epoch + // retry window. See plan §3.4. Per-DON override via offchain config. + defaultBlobExpirationK uint64 = 60 +) + +var _ ocr3_1types.ReportingPlugin[[]byte] = (*reportingPluginOCR3_1)(nil) + +type reportingPluginOCR3_1 struct { + s *requests.Store[*ReportRequest] + r CapabilityIface + config ocr3types.ReportingPluginConfig + limits *pbtypes.ReportingPluginConfig + // blobBroadcastFetcher is captured at factory time but used only from + // Query/Observation. Per libocr rules, it must not escape the scope of + // the method that receives it. + blobExpirationK uint64 + lggr logger.Logger +} + +func newReportingPluginOCR3_1( + s *requests.Store[*ReportRequest], + r CapabilityIface, + config ocr3types.ReportingPluginConfig, + limits *pbtypes.ReportingPluginConfig, + lggr logger.Logger, +) (*reportingPluginOCR3_1, error) { + // BlobExpirationK is populated by the factory from the (now-extended) + // ReportingPluginConfig proto. A zero value here would indicate the + // factory failed to default it — fall back defensively. 
+ k := limits.BlobExpirationK + if k == 0 { + k = defaultBlobExpirationK + } + return &reportingPluginOCR3_1{ + s: s, + r: r, + config: config, + limits: limits, + blobExpirationK: k, + lggr: logger.Named(lggr, "OCR3_1ConsensusReportingPlugin"), + }, nil +} + +// Query selects a batch of pending requests to seek consensus on this round. +// Under OCR3_1 the per-request values.List payload moves to blobs; the Query +// still carries the lightweight Id list since it is sent by the leader once. +func (r *reportingPluginOCR3_1) Query( + ctx context.Context, + seqNr uint64, + kvReader ocr3_1types.KeyValueStateReader, + blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher, +) (types.Query, error) { + // Batching is bounded by MaxQueryLengthBytes; MaxBatchSize is deprecated + // (see chainlink-deployments zone-b TOML comment). + batch, err := r.s.FirstN(defaultBatchSize) + if err != nil { + r.lggr.Errorw("could not retrieve batch", "error", err) + return nil, err + } + + ids := make([]*pbtypes.Id, 0, len(batch)) + allExecutionIDs := make([]string, 0, len(batch)) + seenIds := make(map[idKey]bool) + cachedQuerySize := 0 + + for _, rq := range batch { + key := GetIDKey(rq) + if seenIds[key] { + continue + } + newId := &pbtypes.Id{ + WorkflowExecutionId: rq.WorkflowExecutionID, + WorkflowId: rq.WorkflowID, + WorkflowOwner: rq.WorkflowOwner, + WorkflowName: rq.WorkflowName, + WorkflowDonId: rq.WorkflowDonID, + WorkflowDonConfigVersion: rq.WorkflowDonConfigVersion, + ReportId: rq.ReportID, + KeyId: rq.KeyID, + } + ok, newSize := QueryBatchHasCapacity(cachedQuerySize, newId, int(r.limits.MaxQueryLengthBytes)) + if !ok { + break + } + seenIds[key] = true + ids = append(ids, newId) + allExecutionIDs = append(allExecutionIDs, rq.WorkflowExecutionID) + cachedQuerySize = newSize + } + + r.lggr.Debugw("Query complete", "seqNr", seqNr, "len", len(ids), "allExecutionIDs", allExecutionIDs) + return proto.MarshalOptions{Deterministic: true}.Marshal(&pbtypes.Query{Ids: ids}) +} + +// 
Observation gathers local data for the Query ids, serializes the bulk +// payload (the existing Observations proto, unchanged), broadcasts it as a +// blob, and returns a small on-wire BlobbedObservation carrying the blob +// handle and lightweight metadata. This keeps the on-wire observation well +// under the 512 KiB OCR3_1 cap regardless of per-request payload size. +func (r *reportingPluginOCR3_1) Observation( + ctx context.Context, + seqNr uint64, + aq types.AttributedQuery, + kvReader ocr3_1types.KeyValueStateReader, + blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher, +) (types.Observation, error) { + queryReq := &pbtypes.Query{} + if err := proto.Unmarshal(aq.Query, queryReq); err != nil { + return nil, err + } + + weids := make([]string, 0, len(queryReq.Ids)) + for _, q := range queryReq.Ids { + if q == nil { + continue + } + weids = append(weids, q.WorkflowExecutionId) + } + + reqs := r.s.GetByIDs(weids) + reqMap := make(map[string]*ReportRequest, len(reqs)) + for _, req := range reqs { + reqMap[req.WorkflowExecutionID] = req + } + + nowTs := timestamppb.New(time.Now()) + regIDs := r.r.GetRegisteredWorkflowsIDs() + + // Blob payload = the full Observations proto. Same shape as OCR3, now + // moved off the consensus channel. + payload := &pbtypes.Observations{ + RegisteredWorkflowIds: regIDs, + Timestamp: nowTs, + } + // On-wire ids mirror the payload's Observation.Id list. Built in lockstep. 
+ onWireIds := make([]*pbtypes.Id, 0, len(weids)) + allExecutionIDs := make([]string, 0, len(weids)) + + for _, weid := range weids { + rq, ok := reqMap[weid] + if !ok { + continue + } + listProto := values.Proto(rq.Observations).GetListValue() + if listProto == nil { + r.lggr.Errorw("observations are not a list", "executionID", rq.WorkflowExecutionID) + continue + } + var cfgProto *pb.Map + if rq.OverriddenEncoderConfig != nil { + cfgProto = values.Proto(rq.OverriddenEncoderConfig).GetMapValue() + } + id := &pbtypes.Id{ + WorkflowExecutionId: rq.WorkflowExecutionID, + WorkflowId: rq.WorkflowID, + WorkflowOwner: rq.WorkflowOwner, + WorkflowName: rq.WorkflowName, + WorkflowDonId: rq.WorkflowDonID, + WorkflowDonConfigVersion: rq.WorkflowDonConfigVersion, + ReportId: rq.ReportID, + KeyId: rq.KeyID, + } + payload.Observations = append(payload.Observations, &pbtypes.Observation{ + Id: id, + Observations: listProto, + OverriddenEncoderName: rq.OverriddenEncoderName, + OverriddenEncoderConfig: cfgProto, + }) + onWireIds = append(onWireIds, id) + allExecutionIDs = append(allExecutionIDs, rq.WorkflowExecutionID) + } + + payloadBytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("marshal blob payload: %w", err) + } + + // BroadcastBlob MUST complete within MaxDurationObservation minus + // marshalling slack. The outer libocr context carries the deadline; + // honor it rather than imposing our own timeout. + hint := ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + r.blobExpirationK} + handle, err := blobBroadcastFetcher.BroadcastBlob(ctx, payloadBytes, hint) + if err != nil { + // A broadcast failure here means this node's observation will be + // missing from the round. Log loudly; quorum assessment is on the + // libocr side. 
+ r.lggr.Errorw("blob broadcast failed — observation will be dropped from round", + "seqNr", seqNr, "payloadBytes", len(payloadBytes), "error", err) + return nil, err + } + // BlobHandle is not a proto message — it uses encoding.BinaryMarshaler. + handleBytes, err := handle.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("marshal blob handle: %w", err) + } + + wire := &pbtypes.BlobbedObservation{ + BlobHandle: handleBytes, + Ids: onWireIds, + Timestamp: nowTs, + RegisteredWorkflowIds: regIDs, + } + wireBytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(wire) + if err != nil { + return nil, err + } + r.lggr.Debugw("Observation complete", + "seqNr", seqNr, + "len", len(payload.Observations), + "payloadBytes", len(payloadBytes), + "wireBytes", len(wireBytes), + "expirationSeqNr", seqNr+r.blobExpirationK, + "allExecutionIDs", allExecutionIDs) + return wireBytes, nil +} + +// ValidateObservation rejects observations that are malformed, have an +// unparseable blob handle, or reference missing blobs. A non-nil return here +// drops this observation from the round's quorum assessment. +// +// Fetching the blob here is expensive; we only do the parse checks +// (BlobbedObservation unmarshals, handle unmarshals, ids non-empty). The +// actual blob fetch happens in StateTransition where cost is already paid. +// Trade-off: a byzantine node could broadcast garbage blob bytes under a +// valid-looking handle and waste fetch cycles in StateTransition. The cost +// is bounded by MaxMaxBlobPayloadLength × quorum, well within budget. 
+func (r *reportingPluginOCR3_1) ValidateObservation(
+	ctx context.Context,
+	seqNr uint64,
+	aq types.AttributedQuery,
+	ao types.AttributedObservation,
+	kvReader ocr3_1types.KeyValueStateReader,
+	blobFetcher ocr3_1types.BlobFetcher,
+) error {
+	// The on-wire observation is a small BlobbedObservation envelope; the
+	// bulk Observations payload lives out-of-band in a blob referenced by
+	// BlobHandle. Validation checks only envelope shape, never blob content.
+	wire := &pbtypes.BlobbedObservation{}
+	if err := proto.Unmarshal(ao.Observation, wire); err != nil {
+		return fmt.Errorf("unmarshal BlobbedObservation: %w", err)
+	}
+	if len(wire.BlobHandle) == 0 {
+		return fmt.Errorf("empty blob handle in observation from oracle %d", ao.Observer)
+	}
+	// Parse the handle only to prove it is well-formed; the payload itself
+	// is fetched later (StateTransition via materializeBlobs), not here.
+	handle := ocr3_1types.BlobHandle{}
+	if err := handle.UnmarshalBinary(wire.BlobHandle); err != nil {
+		return fmt.Errorf("unmarshal blob handle: %w", err)
+	}
+	// Ids may be empty (a node with no registered workflows broadcasts an
+	// empty observation on purpose); do not reject on that.
+	return nil
+}
+
+// ObservationQuorum reports whether enough attributed observations (2F+1)
+// have arrived for this round to proceed to a state transition.
+func (r *reportingPluginOCR3_1) ObservationQuorum(
+	ctx context.Context,
+	seqNr uint64,
+	aq types.AttributedQuery,
+	aos []types.AttributedObservation,
+	kvReader ocr3_1types.KeyValueStateReader,
+	blobFetcher ocr3_1types.BlobFetcher,
+) (bool, error) {
+	return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, r.config.N, r.config.F, aos), nil
+}
+
+// StateTransition replaces OCR3's Outcome method. It:
+// - reads every existing outcome key from KV (what OCR3 carried forward in
+//   PreviousOutcome.Outcomes)
+// - aggregates this round's observations per workflow
+// - writes the updated AggregationOutcome back to KV
+// - prunes workflows not seen for OutcomePruningThreshold rounds (via
+//   Delete, collected post-iteration per kvdb.go:34)
+// - emits a ReportsPlusPrecursor carrying the per-workflow reports Reports()
+//   needs, since Reports() has no KV access
+func (r *reportingPluginOCR3_1) StateTransition(
+	ctx context.Context,
+	seqNr uint64,
+	aq types.AttributedQuery,
+	aos []types.AttributedObservation,
+	kvReadWriter ocr3_1types.KeyValueStateReadWriter,
+	blobFetcher ocr3_1types.BlobFetcher,
+) (ocr3_1types.ReportsPlusPrecursor, error) {
+	// Materialize blobs: replace each AO's on-wire BlobbedObservation bytes
+	// with the fetched Observations payload bytes so the existing grouping
+	// logic can run unchanged.
+	materializedAos, fetchDropped := r.materializeBlobs(ctx, aos, blobFetcher)
+	if fetchDropped > 0 {
+		r.lggr.Warnw("dropped observations due to blob fetch failure",
+			"seqNr", seqNr, "dropped", fetchDropped, "remaining", len(materializedAos))
+	}
+
+	execIDToOracleObservations, seenWorkflowIDs, execIDToEncoderShaToCount, shaToEncoder, finalTimestamp, err :=
+		r.groupObservations(materializedAos)
+	if err != nil {
+		return nil, err
+	}
+
+	q := &pbtypes.Query{}
+	if err := proto.Unmarshal(aq.Query, q); err != nil {
+		return nil, err
+	}
+
+	// Load previous AggregationOutcomes for every workflow referenced this
+	// round, plus any existing keys we need to consider for pruning.
+	previousOutcomes, err := r.loadAllOutcomes(kvReadWriter)
+	if err != nil {
+		return nil, fmt.Errorf("load outcomes from KV: %w", err)
+	}
+
+	currentReports := make([]*pbtypes.Report, 0, len(q.Ids))
+	allExecutionIDs := make([]string, 0, len(q.Ids))
+	cachedReportSize := 0
+
+	for _, weid := range q.Ids {
+		if weid == nil {
+			continue
+		}
+		lggr := logger.With(r.lggr, "executionID", weid.WorkflowExecutionId, "workflowID", weid.WorkflowId)
+
+		obs, ok := execIDToOracleObservations[weid.WorkflowExecutionId]
+		if !ok {
+			continue
+		}
+		// Require honest-majority support (2F+1 distinct observers) before
+		// aggregating this execution's observations.
+		if len(obs) < (2*r.config.F + 1) {
+			continue
+		}
+
+		agg, err := r.r.GetAggregator(weid.WorkflowId)
+		if err != nil {
+			lggr.Errorw("could not retrieve aggregator for workflow", "error", err)
+			continue
+		}
+
+		prev := previousOutcomes[weid.WorkflowId]
+		outcome, err := agg.Aggregate(lggr, prev, obs, r.config.F)
+		if err != nil {
+			lggr.Errorw("error aggregating outcome", "error", err)
+			continue
+		}
+
+		// Carry LastSeenAt forward; the pruning pass below refreshes it to
+		// seqNr for workflows seen by F+1 nodes this round.
+		if prev != nil {
+			outcome.LastSeenAt = prev.LastSeenAt
+		}
+		outcome.Timestamp = finalTimestamp
+
+		// Deterministic encoder-override tiebreak (fix for the OCR3 map-iteration
+		// bug at reporting_plugin.go:396-407). Sort by (count desc, sha asc)
+		// before picking the first entry that reaches 2F+1.
+		if enc := pickEncoderDeterministic(
+			execIDToEncoderShaToCount[weid.WorkflowExecutionId],
+			shaToEncoder,
+			2*r.config.F+1,
+		); enc != nil {
+			outcome.EncoderName = enc.name
+			outcome.EncoderConfig = enc.config
+		}
+
+		report := &pbtypes.Report{Outcome: outcome, Id: weid}
+		// Stop once the batch is full; ids later in q.Ids wait for a future
+		// round. NOTE(review): q.Ids order decides who gets deferred first —
+		// confirm fairness is handled by the query builder upstream.
+		ok, newSize := ReportBatchHasCapacity(cachedReportSize, report, int(r.limits.MaxOutcomeLengthBytes))
+		if !ok {
+			break
+		}
+		currentReports = append(currentReports, report)
+		allExecutionIDs = append(allExecutionIDs, weid.WorkflowExecutionId)
+		cachedReportSize = newSize
+
+		previousOutcomes[weid.WorkflowId] = outcome
+	}
+
+	// Pruning: collect delete-targets first (kvdb.go:34 forbids mutation
+	// during iteration, so we already drained the Range in loadAllOutcomes).
+	// Then apply Write/Delete.
+	toDelete := make([]string, 0)
+	for workflowID, outcome := range previousOutcomes {
+		if seenWorkflowIDs[workflowID] >= (r.config.F + 1) {
+			outcome.LastSeenAt = seqNr
+			continue
+		}
+		if seqNr-outcome.LastSeenAt > r.limits.OutcomePruningThreshold {
+			toDelete = append(toDelete, workflowID)
+		}
+	}
+	// Deterministic order for writes (maps iterate randomly; under OCR3_1
+	// divergent KV mutation order across nodes would cascade).
+	writeIDs := make([]string, 0, len(previousOutcomes))
+	for workflowID := range previousOutcomes {
+		writeIDs = append(writeIDs, workflowID)
+	}
+	sort.Strings(writeIDs)
+	for _, workflowID := range writeIDs {
+		if containsString(toDelete, workflowID) {
+			continue
+		}
+		envelope := &pbtypes.OutcomeEnvelope{
+			Version: outcomeEnvelopeVersionV1,
+			Outcome: previousOutcomes[workflowID],
+		}
+		// Deterministic marshal: KV values must be byte-identical across
+		// nodes, not just semantically equal.
+		val, err := proto.MarshalOptions{Deterministic: true}.Marshal(envelope)
+		if err != nil {
+			return nil, fmt.Errorf("marshal outcome envelope for %s: %w", workflowID, err)
+		}
+		if err := kvReadWriter.Write([]byte(kvPrefixOutcomesV1+workflowID), val); err != nil {
+			return nil, fmt.Errorf("kv write %s: %w", workflowID, err)
+		}
+	}
+	sort.Strings(toDelete)
+	for _, workflowID := range toDelete {
+		if err := kvReadWriter.Delete([]byte(kvPrefixOutcomesV1 + workflowID)); err != nil {
+			return nil, fmt.Errorf("kv delete %s: %w", workflowID, err)
+		}
+		r.r.UnregisterWorkflowID(workflowID)
+	}
+
+	// Precursor must be self-contained: Reports() has no KV access.
+	precursor := &pbtypes.Outcome{
+		CurrentReports: currentReports,
+	}
+	raw, err := proto.MarshalOptions{Deterministic: true}.Marshal(precursor)
+	if err != nil {
+		return nil, err
+	}
+
+	// Hash is logged for cross-node divergence debugging only; it is not
+	// part of the protocol.
+	h := sha256.New()
+	h.Write(raw)
+	r.lggr.Debugw("StateTransition complete",
+		"seqNr", seqNr,
+		"reports", len(currentReports),
+		"prunedWorkflows", len(toDelete),
+		"allExecutionIDs", allExecutionIDs,
+		"precursorHash", hex.EncodeToString(h.Sum(nil)))
+	return raw, nil
+}
+
+// Committed is best-effort notification; returning an error does NOT abort.
+// Use for metrics/logging only. NEVER put load-bearing persistence here.
+func (r *reportingPluginOCR3_1) Committed(
+	ctx context.Context,
+	seqNr uint64,
+	kvReader ocr3_1types.KeyValueStateReader,
+) error {
+	r.lggr.Debugw("Committed", "seqNr", seqNr)
+	return nil
+}
+
+// Reports consumes only the precursor. No KV access here.
+func (r *reportingPluginOCR3_1) Reports( + ctx context.Context, + seqNr uint64, + precursor ocr3_1types.ReportsPlusPrecursor, +) ([]ocr3types.ReportPlus[[]byte], error) { + o := &pbtypes.Outcome{} + if err := proto.Unmarshal(precursor, o); err != nil { + return nil, err + } + + reports := make([]ocr3types.ReportPlus[[]byte], 0, len(o.CurrentReports)) + for _, report := range o.CurrentReports { + if report == nil || report.Id == nil || report.Outcome == nil { + continue + } + lggr := logger.With(r.lggr, + "workflowID", report.Id.WorkflowId, + "executionID", report.Id.WorkflowExecutionId, + "shouldReport", report.Outcome.ShouldReport) + + outcome, id := report.Outcome, report.Id + info := &pbtypes.ReportInfo{Id: id, ShouldReport: outcome.ShouldReport} + + var rawReport []byte + if info.ShouldReport { + meta := &pbtypes.Metadata{ + Version: 1, + ExecutionID: id.WorkflowExecutionId, + Timestamp: uint32(outcome.Timestamp.AsTime().Unix()), + DONID: id.WorkflowDonId, + DONConfigVersion: id.WorkflowDonConfigVersion, + WorkflowID: id.WorkflowId, + WorkflowName: id.WorkflowName, + WorkflowOwner: id.WorkflowOwner, + ReportID: id.ReportId, + } + newOutcome, err := pbtypes.AppendMetadata(outcome, meta) + if err != nil { + lggr.Errorw("could not append IDs") + continue + } + + var encoder pbtypes.Encoder + if newOutcome.EncoderName != "" { + encoderConfig, err := values.FromMapValueProto(newOutcome.EncoderConfig) + if err != nil { + lggr.Errorw("could not convert encoder config", "error", err) + } else { + encoder, err = r.r.GetEncoderByName(newOutcome.EncoderName, encoderConfig) + if err != nil { + lggr.Errorw("could not retrieve encoder, falling back to default", "error", err) + } + } + } + if encoder == nil { + var err error + encoder, err = r.r.GetEncoderByWorkflowID(id.WorkflowId) + if err != nil { + lggr.Errorw("could not retrieve encoder for workflow", "error", err) + continue + } + } + + mv, err := values.FromMapValueProto(newOutcome.EncodableOutcome) + if err != nil { 
+ lggr.Errorw("could not decode map from proto", "error", err) + continue + } + rawReport, err = encoder.Encode(ctx, *mv) + if err != nil { + if cerr := ctx.Err(); cerr != nil { + return nil, cerr + } + lggr.Errorw("could not encode report", "error", err) + continue + } + } + + infob, err := marshalReportInfo(info, id.KeyId) + if err != nil { + lggr.Errorw("could not marshal ReportWithInfo", "error", err) + continue + } + reports = append(reports, ocr3types.ReportPlus[[]byte]{ + ReportWithInfo: ocr3types.ReportWithInfo[[]byte]{ + Report: rawReport, + Info: infob, + }, + }) + } + + r.lggr.Debugw("Reports complete", "seqNr", seqNr, "len", len(reports)) + return reports, nil +} + +func (r *reportingPluginOCR3_1) ShouldAcceptAttestedReport( + ctx context.Context, + seqNr uint64, + rwi ocr3types.ReportWithInfo[[]byte], +) (bool, error) { + return true, nil +} + +func (r *reportingPluginOCR3_1) ShouldTransmitAcceptedReport( + ctx context.Context, + seqNr uint64, + rwi ocr3types.ReportWithInfo[[]byte], +) (bool, error) { + return true, nil +} + +func (r *reportingPluginOCR3_1) Close() error { return nil } + +// ---- helpers (private to the ocr3_1 path) ---- + +// materializeBlobs fetches each oracle's blob payload and replaces the AO's +// Observation bytes with the payload bytes. Failures are logged and the AO +// is dropped from the returned slice. +// +// Fetches are done sequentially in v1. A parallel fetch is a post-soak +// optimization; up to 2F+1 fetches per round is tolerable sequentially +// inside StateTransition for current CRE DONs (N≤17). +// +// Determinism: AOs are processed in the order libocr supplied; blob bytes +// are deterministic given the handle, so the resulting group across honest +// nodes is identical. Parallel fetching must also preserve this invariant. 
+func (r *reportingPluginOCR3_1) materializeBlobs( + ctx context.Context, + aos []types.AttributedObservation, + blobFetcher ocr3_1types.BlobFetcher, +) ([]types.AttributedObservation, int) { + out := make([]types.AttributedObservation, 0, len(aos)) + dropped := 0 + for _, ao := range aos { + wire := &pbtypes.BlobbedObservation{} + if err := proto.Unmarshal(ao.Observation, wire); err != nil { + r.lggr.Warnw("drop observation: unmarshal BlobbedObservation", "oracleID", ao.Observer, "error", err) + dropped++ + continue + } + if len(wire.BlobHandle) == 0 { + r.lggr.Warnw("drop observation: empty blob handle", "oracleID", ao.Observer) + dropped++ + continue + } + handle := ocr3_1types.BlobHandle{} + if err := handle.UnmarshalBinary(wire.BlobHandle); err != nil { + r.lggr.Warnw("drop observation: unmarshal blob handle", "oracleID", ao.Observer, "error", err) + dropped++ + continue + } + payload, err := blobFetcher.FetchBlob(ctx, handle) + if err != nil { + r.lggr.Warnw("drop observation: blob fetch failed", "oracleID", ao.Observer, "error", err) + dropped++ + continue + } + // Sanity-check that the payload unmarshals to an Observations. If + // not, something is wrong with the broadcasting node — drop it. + if err := proto.Unmarshal(payload, &pbtypes.Observations{}); err != nil { + r.lggr.Warnw("drop observation: blob payload not a valid Observations", "oracleID", ao.Observer, "error", err) + dropped++ + continue + } + out = append(out, types.AttributedObservation{ + Observer: ao.Observer, + Observation: payload, + }) + } + return out, dropped +} + + +// loadAllOutcomes drains the Range iterator fully before returning. +// kvdb.go:34 forbids any writes/deletes while the iterator is open, so we +// must materialize first and apply mutations later. 
+func (r *reportingPluginOCR3_1) loadAllOutcomes( + kvReader ocr3_1types.KeyValueStateReader, +) (map[string]*pbtypes.AggregationOutcome, error) { + // NOTE: KeyValueStateReader does not expose Range directly + // (ocr3_1types.KeyValueStateReader has only Read). The Range iterator + // lives on ocr3_1types.KeyValueDatabaseReadTransaction, not on the + // per-call reader. For v1 we therefore track workflow IDs in the + // capability layer and look each up by Read. Ranging over KV from within + // the plugin is a candidate for a future libocr extension. + // + // TODO(OCRBump): confirm with libocr maintainers whether iteration over + // KeyValueStateReader is planned; if so, move to Range-based loading. + result := make(map[string]*pbtypes.AggregationOutcome) + for _, workflowID := range r.r.GetRegisteredWorkflowsIDs() { + val, err := kvReader.Read([]byte(kvPrefixOutcomesV1 + workflowID)) + if err != nil { + return nil, fmt.Errorf("kv read %s: %w", workflowID, err) + } + if val == nil { + continue + } + envelope := &pbtypes.OutcomeEnvelope{} + if err := proto.Unmarshal(val, envelope); err != nil { + return nil, fmt.Errorf("unmarshal outcome envelope %s: %w", workflowID, err) + } + if envelope.Version != outcomeEnvelopeVersionV1 { + return nil, fmt.Errorf( + "outcome envelope for %s has unknown version %d (expected %d) — "+ + "refusing to deserialize; a KV schema migration is required", + workflowID, envelope.Version, outcomeEnvelopeVersionV1) + } + if envelope.Outcome == nil { + // A v1 envelope with a nil outcome is malformed; treat as absent. + continue + } + result[workflowID] = envelope.Outcome + } + return result, nil +} + +// groupObservations replicates the OCR3 Outcome() preamble but returns its +// intermediate state so StateTransition can consume it cleanly. 
+func (r *reportingPluginOCR3_1) groupObservations(
+	aos []types.AttributedObservation,
+) (
+	map[string]map[ocrcommon.OracleID][]values.Value,
+	map[string]int,
+	map[string]map[string]int,
+	map[string]encoderConfig,
+	*timestamppb.Timestamp,
+	error,
+) {
+	// Returns, in order: per-execution observations keyed by oracle,
+	// per-workflow observer counts (for pruning), per-execution encoder-sha
+	// counts (for the override tiebreak), sha→encoder lookup, and the
+	// median node timestamp for the round.
+	execIDToOracleObservations := map[string]map[ocrcommon.OracleID][]values.Value{}
+	seenWorkflowIDs := map[string]int{}
+	execIDToEncoderShaToCount := map[string]map[string]int{}
+	shaToEncoder := map[string]encoderConfig{}
+	var sortedTimestamps []*timestamppb.Timestamp
+
+	for _, attributedObservation := range aos {
+		obs := &pbtypes.Observations{}
+		if err := proto.Unmarshal(attributedObservation.Observation, obs); err != nil {
+			r.lggr.Errorw("could not unmarshal observation", "error", err)
+			continue
+		}
+
+		// Dedupe per node so a single oracle cannot inflate a workflow's
+		// seen-count by listing the same ID twice.
+		countedWorkflowIDs := map[string]bool{}
+		for _, id := range obs.RegisteredWorkflowIds {
+			if _, ok := countedWorkflowIDs[id]; ok {
+				continue
+			}
+			seenWorkflowIDs[id]++
+			countedWorkflowIDs[id] = true
+		}
+		// A nil obs.Timestamp is tolerated here; proto3 getters and AsTime
+		// are nil-safe and yield the zero time — presumably acceptable as a
+		// median input, but confirm against broadcaster guarantees.
+		sortedTimestamps = append(sortedTimestamps, obs.Timestamp)
+
+		for _, request := range obs.Observations {
+			if request == nil || request.Id == nil {
+				continue
+			}
+			weid := request.Id.WorkflowExecutionId
+			obsList, innerErr := values.FromListValueProto(request.Observations)
+			if obsList == nil || innerErr != nil {
+				r.lggr.Errorw("observations are not a list", "weID", weid, "oracleID", attributedObservation.Observer, "err", innerErr)
+				continue
+			}
+			if _, ok := execIDToOracleObservations[weid]; !ok {
+				execIDToOracleObservations[weid] = make(map[ocrcommon.OracleID][]values.Value)
+			}
+			execIDToOracleObservations[weid][attributedObservation.Observer] = obsList.Underlying
+
+			sha, err := shaForOverriddenEncoder(request)
+			if err != nil {
+				r.lggr.Errorw("could not calculate sha for overridden encoder", "error", err)
+				continue
+			}
+			shaToEncoder[sha] = encoderConfig{name: request.OverriddenEncoderName, config: request.OverriddenEncoderConfig}
+			if _, ok := execIDToEncoderShaToCount[weid]; !ok {
+				execIDToEncoderShaToCount[weid] = map[string]int{}
+			}
+			execIDToEncoderShaToCount[weid][sha]++
+		}
+	}
+
+	slices.SortFunc(sortedTimestamps, func(a, b *timestamppb.Timestamp) int {
+		if a.AsTime().Before(b.AsTime()) {
+			return -1
+		}
+		if a.AsTime().After(b.AsTime()) {
+			return 1
+		}
+		return 0
+	})
+	// Median timestamp: middle element for odd counts; for even counts the
+	// midpoint of the two middle values, at whole-second granularity (nanos
+	// are intentionally discarded).
+	var finalTimestamp *timestamppb.Timestamp
+	tc := len(sortedTimestamps)
+	if tc > 0 {
+		mid := tc / 2
+		if tc%2 == 1 {
+			finalTimestamp = sortedTimestamps[mid]
+		} else {
+			a := sortedTimestamps[mid-1].AsTime().Unix()
+			b := sortedTimestamps[mid].AsTime().Unix()
+			finalTimestamp = timestamppb.New(time.Unix(a+(b-a)/2, 0))
+		}
+	}
+
+	return execIDToOracleObservations, seenWorkflowIDs, execIDToEncoderShaToCount, shaToEncoder, finalTimestamp, nil
+}
+
+// pickEncoderDeterministic replaces the OCR3 code's map-iteration tiebreak
+// (reporting_plugin.go:396-407). Go maps iterate in randomized order; under
+// OCR3_1's per-node KV writes, non-deterministic picks cause state divergence
+// that never self-heals. Sort by (count desc, sha asc) and take the first
+// that reaches the quorum threshold.
+func pickEncoderDeterministic( + shaToCount map[string]int, + shaToEncoder map[string]encoderConfig, + threshold int, +) *encoderConfig { + if len(shaToCount) == 0 { + return nil + } + shas := make([]string, 0, len(shaToCount)) + for sha := range shaToCount { + shas = append(shas, sha) + } + sort.Slice(shas, func(i, j int) bool { + if shaToCount[shas[i]] != shaToCount[shas[j]] { + return shaToCount[shas[i]] > shaToCount[shas[j]] + } + return shas[i] < shas[j] + }) + for _, sha := range shas { + if shaToCount[sha] < threshold { + continue + } + enc, ok := shaToEncoder[sha] + if !ok { + continue + } + return &enc + } + return nil +} + +func containsString(xs []string, s string) bool { + for _, x := range xs { + if x == s { + return true + } + } + return false +} + +// marshalReportInfoOCR3_1 is a placeholder — the existing marshalReportInfo +// in reporting_plugin.go is package-scoped and reused here. Declared to make +// import expectations explicit; no alternate implementation. +var _ = structpb.NewStruct diff --git a/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1_test.go b/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1_test.go new file mode 100644 index 0000000000..2f4746d76f --- /dev/null +++ b/pkg/capabilities/consensus/ocr3/reporting_plugin_ocr3_1_test.go @@ -0,0 +1,88 @@ +package ocr3 + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" +) + +// TestPickEncoderDeterministic_BugFix pins the fix for the OCR3 map-iteration +// tiebreak bug at reporting_plugin.go:396-407. Under OCR3 this only caused +// intermittent outcome divergence that the outer consensus masked; under +// OCR3_1 each node writes the result to its local KV, so divergence never +// self-heals. This test runs the tiebreak many times with distinct map +// insertion orders to ensure the result is byte-identical. 
+func TestPickEncoderDeterministic_BugFix(t *testing.T) { + cfgA := encoderConfig{name: "enc-a", config: &pb.Map{}} + cfgB := encoderConfig{name: "enc-b", config: &pb.Map{}} + cfgC := encoderConfig{name: "enc-c", config: &pb.Map{}} + + shaToEncoder := map[string]encoderConfig{ + "sha-a": cfgA, + "sha-b": cfgB, + "sha-c": cfgC, + } + + t.Run("picks winner by count desc", func(t *testing.T) { + counts := map[string]int{ + "sha-a": 4, + "sha-b": 7, // winner + "sha-c": 5, + } + got := pickEncoderDeterministic(counts, shaToEncoder, 5) + require.NotNil(t, got) + assert.Equal(t, "enc-b", got.name) + }) + + t.Run("ties broken by sha asc (deterministic across runs)", func(t *testing.T) { + counts := map[string]int{ + "sha-a": 7, + "sha-b": 7, + "sha-c": 7, + } + // Run many times; Go map iteration order is randomized, but our + // sort must make the outcome identical every time. + for i := 0; i < 50; i++ { + got := pickEncoderDeterministic(counts, shaToEncoder, 5) + require.NotNil(t, got) + assert.Equal(t, "enc-a", got.name, "iteration %d", i) + } + }) + + t.Run("nil when nothing reaches threshold", func(t *testing.T) { + counts := map[string]int{ + "sha-a": 3, + "sha-b": 4, + "sha-c": 2, + } + got := pickEncoderDeterministic(counts, shaToEncoder, 5) + assert.Nil(t, got) + }) + + t.Run("skips sha with no matching encoder", func(t *testing.T) { + counts := map[string]int{ + "sha-ghost": 10, // top count but no entry in shaToEncoder + "sha-a": 5, + } + got := pickEncoderDeterministic(counts, shaToEncoder, 5) + require.NotNil(t, got) + assert.Equal(t, "enc-a", got.name) + }) + + t.Run("empty input returns nil", func(t *testing.T) { + assert.Nil(t, pickEncoderDeterministic(nil, shaToEncoder, 5)) + assert.Nil(t, pickEncoderDeterministic(map[string]int{}, shaToEncoder, 5)) + }) +} + +// TestContainsString is trivial but pinned because containsString is on the +// hot prune path — a silent rewrite of it breaks KV pruning correctness. 
+func TestContainsString(t *testing.T) { + assert.True(t, containsString([]string{"a", "b", "c"}, "b")) + assert.False(t, containsString([]string{"a", "b", "c"}, "d")) + assert.False(t, containsString(nil, "a")) + assert.False(t, containsString([]string{}, "a")) +} diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.pb.go b/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.pb.go index cdcddeeb39..487a1918b9 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.pb.go +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.pb.go @@ -35,8 +35,35 @@ type ReportingPluginConfig struct { OutcomePruningThreshold uint64 `protobuf:"varint,7,opt,name=outcomePruningThreshold,proto3" json:"outcomePruningThreshold,omitempty"` RequestTimeout *durationpb.Duration `protobuf:"bytes,8,opt,name=requestTimeout,proto3" json:"requestTimeout,omitempty"` HistoricalOutcomeExpirySeqNrSpan uint64 `protobuf:"varint,9,opt,name=historical_outcome_expiry_seq_nr_span,json=historicalOutcomeExpirySeqNrSpan,proto3" json:"historical_outcome_expiry_seq_nr_span,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // maxReportsPlusPrecursorBytes caps the precursor emitted by + // StateTransition. OCR3_1's MaxMaxReportsPlusPrecursorLength is 5 MiB. + MaxReportsPlusPrecursorBytes uint32 `protobuf:"varint,10,opt,name=maxReportsPlusPrecursorBytes,proto3" json:"maxReportsPlusPrecursorBytes,omitempty"` + // maxKeyValueModifiedKeysPlusValuesBytes caps the KV write budget of a + // single StateTransition. OCR3_1's cap is 10 MiB. + MaxKeyValueModifiedKeysPlusValuesBytes uint32 `protobuf:"varint,11,opt,name=maxKeyValueModifiedKeysPlusValuesBytes,proto3" json:"maxKeyValueModifiedKeysPlusValuesBytes,omitempty"` + // maxBlobPayloadBytes caps a single blob's payload size. OCR3_1's cap + // is 5 MiB. For CRE consensus, set to the largest expected workflow + // input. 
+ MaxBlobPayloadBytes uint32 `protobuf:"varint,12,opt,name=maxBlobPayloadBytes,proto3" json:"maxBlobPayloadBytes,omitempty"` + // blobExpirationK is the number of seqNrs a broadcast blob must survive + // past its broadcast. Must cover the full round lifetime plus any epoch + // retry window (DeltaStage). Default 60. Zone-a (DeltaRound=0) may need + // 150. See plan §3.4. + BlobExpirationK uint64 `protobuf:"varint,13,opt,name=blobExpirationK,proto3" json:"blobExpirationK,omitempty"` + // maxKeyValueModifiedKeys caps the number of distinct keys a single + // StateTransition may modify (write or delete). OCR3_1's cap is 10_000. + MaxKeyValueModifiedKeys uint32 `protobuf:"varint,14,opt,name=maxKeyValueModifiedKeys,proto3" json:"maxKeyValueModifiedKeys,omitempty"` + // maxPerOracleUnexpiredBlobCount caps the number of unreaped blobs a + // single oracle may have outstanding. Must account for blob reaping + // intervals "in the tens of seconds" per libocr docs. Plan §3.7 step 8 + // sizes this at 500. + MaxPerOracleUnexpiredBlobCount uint32 `protobuf:"varint,15,opt,name=maxPerOracleUnexpiredBlobCount,proto3" json:"maxPerOracleUnexpiredBlobCount,omitempty"` + // maxPerOracleUnexpiredBlobCumulativePayloadBytes caps the cumulative + // payload bytes for all unreaped blobs from a single oracle. Plan §3.7 + // step 8 sizes this at MaxPerOracleUnexpiredBlobCount × MaxBlobPayloadBytes. 
+ MaxPerOracleUnexpiredBlobCumulativePayloadBytes uint64 `protobuf:"varint,16,opt,name=maxPerOracleUnexpiredBlobCumulativePayloadBytes,proto3" json:"maxPerOracleUnexpiredBlobCumulativePayloadBytes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ReportingPluginConfig) Reset() { @@ -132,11 +159,60 @@ func (x *ReportingPluginConfig) GetHistoricalOutcomeExpirySeqNrSpan() uint64 { return 0 } +func (x *ReportingPluginConfig) GetMaxReportsPlusPrecursorBytes() uint32 { + if x != nil { + return x.MaxReportsPlusPrecursorBytes + } + return 0 +} + +func (x *ReportingPluginConfig) GetMaxKeyValueModifiedKeysPlusValuesBytes() uint32 { + if x != nil { + return x.MaxKeyValueModifiedKeysPlusValuesBytes + } + return 0 +} + +func (x *ReportingPluginConfig) GetMaxBlobPayloadBytes() uint32 { + if x != nil { + return x.MaxBlobPayloadBytes + } + return 0 +} + +func (x *ReportingPluginConfig) GetBlobExpirationK() uint64 { + if x != nil { + return x.BlobExpirationK + } + return 0 +} + +func (x *ReportingPluginConfig) GetMaxKeyValueModifiedKeys() uint32 { + if x != nil { + return x.MaxKeyValueModifiedKeys + } + return 0 +} + +func (x *ReportingPluginConfig) GetMaxPerOracleUnexpiredBlobCount() uint32 { + if x != nil { + return x.MaxPerOracleUnexpiredBlobCount + } + return 0 +} + +func (x *ReportingPluginConfig) GetMaxPerOracleUnexpiredBlobCumulativePayloadBytes() uint64 { + if x != nil { + return x.MaxPerOracleUnexpiredBlobCumulativePayloadBytes + } + return 0 +} + var File_ocr3_config_types_proto protoreflect.FileDescriptor const file_ocr3_config_types_proto_rawDesc = "" + "\n" + - "\x17ocr3_config_types.proto\x12\x11ocr3_config_types\x1a\x1egoogle/protobuf/duration.proto\"\x8b\x04\n" + + "\x17ocr3_config_types.proto\x12\x11ocr3_config_types\x1a\x1egoogle/protobuf/duration.proto\"\xef\a\n" + "\x15ReportingPluginConfig\x120\n" + "\x13maxQueryLengthBytes\x18\x01 \x01(\rR\x13maxQueryLengthBytes\x12<\n" + "\x19maxObservationLengthBytes\x18\x02 
\x01(\rR\x19maxObservationLengthBytes\x124\n" + @@ -146,7 +222,15 @@ const file_ocr3_config_types_proto_rawDesc = "" + "\fmaxBatchSize\x18\x06 \x01(\rR\fmaxBatchSize\x128\n" + "\x17outcomePruningThreshold\x18\a \x01(\x04R\x17outcomePruningThreshold\x12A\n" + "\x0erequestTimeout\x18\b \x01(\v2\x19.google.protobuf.DurationR\x0erequestTimeout\x12O\n" + - "%historical_outcome_expiry_seq_nr_span\x18\t \x01(\x04R historicalOutcomeExpirySeqNrSpanB#Z!capabilities/consensus/ocr3/typesb\x06proto3" + "%historical_outcome_expiry_seq_nr_span\x18\t \x01(\x04R historicalOutcomeExpirySeqNrSpan\x12B\n" + + "\x1cmaxReportsPlusPrecursorBytes\x18\n" + + " \x01(\rR\x1cmaxReportsPlusPrecursorBytes\x12V\n" + + "&maxKeyValueModifiedKeysPlusValuesBytes\x18\v \x01(\rR&maxKeyValueModifiedKeysPlusValuesBytes\x120\n" + + "\x13maxBlobPayloadBytes\x18\f \x01(\rR\x13maxBlobPayloadBytes\x12(\n" + + "\x0fblobExpirationK\x18\r \x01(\x04R\x0fblobExpirationK\x128\n" + + "\x17maxKeyValueModifiedKeys\x18\x0e \x01(\rR\x17maxKeyValueModifiedKeys\x12F\n" + + "\x1emaxPerOracleUnexpiredBlobCount\x18\x0f \x01(\rR\x1emaxPerOracleUnexpiredBlobCount\x12h\n" + + "/maxPerOracleUnexpiredBlobCumulativePayloadBytes\x18\x10 \x01(\x04R/maxPerOracleUnexpiredBlobCumulativePayloadBytesB#Z!capabilities/consensus/ocr3/typesb\x06proto3" var ( file_ocr3_config_types_proto_rawDescOnce sync.Once diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.proto b/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.proto index e3954a3b7c..575390f845 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.proto +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_config_types.proto @@ -19,4 +19,43 @@ message ReportingPluginConfig { uint64 outcomePruningThreshold = 7; google.protobuf.Duration requestTimeout = 8; uint64 historical_outcome_expiry_seq_nr_span = 9; + + // --- OCR3_1-only fields (added under OCRBump) ------------------------ + // Consumed only by the OCR3_1 factory; ignored by the OCR3 factory. 
+ // Operators may leave them unset under OCR3; zero values fall back to + // plugin defaults when the OCR3_1 factory is used. + + // maxReportsPlusPrecursorBytes caps the precursor emitted by + // StateTransition. OCR3_1's MaxMaxReportsPlusPrecursorLength is 5 MiB. + uint32 maxReportsPlusPrecursorBytes = 10; + + // maxKeyValueModifiedKeysPlusValuesBytes caps the KV write budget of a + // single StateTransition. OCR3_1's cap is 10 MiB. + uint32 maxKeyValueModifiedKeysPlusValuesBytes = 11; + + // maxBlobPayloadBytes caps a single blob's payload size. OCR3_1's cap + // is 5 MiB. For CRE consensus, set to the largest expected workflow + // input. + uint32 maxBlobPayloadBytes = 12; + + // blobExpirationK is the number of seqNrs a broadcast blob must survive + // past its broadcast. Must cover the full round lifetime plus any epoch + // retry window (DeltaStage). Default 60. Zone-a (DeltaRound=0) may need + // 150. See plan §3.4. + uint64 blobExpirationK = 13; + + // maxKeyValueModifiedKeys caps the number of distinct keys a single + // StateTransition may modify (write or delete). OCR3_1's cap is 10_000. + uint32 maxKeyValueModifiedKeys = 14; + + // maxPerOracleUnexpiredBlobCount caps the number of unreaped blobs a + // single oracle may have outstanding. Must account for blob reaping + // intervals "in the tens of seconds" per libocr docs. Plan §3.7 step 8 + // sizes this at 500. + uint32 maxPerOracleUnexpiredBlobCount = 15; + + // maxPerOracleUnexpiredBlobCumulativePayloadBytes caps the cumulative + // payload bytes for all unreaped blobs from a single oracle. Plan §3.7 + // step 8 sizes this at MaxPerOracleUnexpiredBlobCount × MaxBlobPayloadBytes. 
+ uint64 maxPerOracleUnexpiredBlobCumulativePayloadBytes = 16; } \ No newline at end of file diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go b/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go index 86d363a5eb..cebcc7a4b6 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_types.pb.go @@ -549,6 +549,146 @@ func (x *Outcome) GetCurrentReports() []*Report { return nil } +// --- OCR3_1-only messages (added under OCRBump) --------------------------- +// +// Under OCR3_1 the bulk per-request values.List payload moves to blobs, keeping +// the on-wire observation well under the 512 KiB OCR3_1 cap. The existing +// Observations message is broadcast as the blob contents; BlobbedObservation +// is what travels the consensus channel and carries only the blob handle plus +// small metadata. +type BlobbedObservation struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Handle returned by BlobBroadcastFetcher.BroadcastBlob. The fetching node + // calls FetchBlob(handle) to retrieve the Observations payload. + BlobHandle []byte `protobuf:"bytes,1,opt,name=blob_handle,json=blobHandle,proto3" json:"blob_handle,omitempty"` + // IDs this observation has data for. Mirrors the Observations.observations + // list' Id fields so ValidateObservation can reject mismatches without + // dereferencing the blob. + Ids []*Id `protobuf:"bytes,2,rep,name=ids,proto3" json:"ids,omitempty"` + // Node timestamp, carried out-of-blob so StateTransition can compute the + // median timestamp without fetching every blob first. + Timestamp *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Registered workflow IDs on this node; carried out-of-blob for the same + // reason as `timestamp`. 
+ RegisteredWorkflowIds []string `protobuf:"bytes,4,rep,name=registered_workflow_ids,json=registeredWorkflowIds,proto3" json:"registered_workflow_ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BlobbedObservation) Reset() { + *x = BlobbedObservation{} + mi := &file_ocr3_types_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BlobbedObservation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobbedObservation) ProtoMessage() {} + +func (x *BlobbedObservation) ProtoReflect() protoreflect.Message { + mi := &file_ocr3_types_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobbedObservation.ProtoReflect.Descriptor instead. +func (*BlobbedObservation) Descriptor() ([]byte, []int) { + return file_ocr3_types_proto_rawDescGZIP(), []int{8} +} + +func (x *BlobbedObservation) GetBlobHandle() []byte { + if x != nil { + return x.BlobHandle + } + return nil +} + +func (x *BlobbedObservation) GetIds() []*Id { + if x != nil { + return x.Ids + } + return nil +} + +func (x *BlobbedObservation) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + +func (x *BlobbedObservation) GetRegisteredWorkflowIds() []string { + if x != nil { + return x.RegisteredWorkflowIds + } + return nil +} + +// OutcomeEnvelope wraps a per-workflow AggregationOutcome stored in the +// OCR3_1 replicated KV. The explicit version field lets us detect stored +// values from older schemas and migrate. Never reuse a version number with +// an incompatible payload — bump the number and add a new key prefix. 
+type OutcomeEnvelope struct { + state protoimpl.MessageState `protogen:"open.v1"` + Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` // currently 1 + Outcome *AggregationOutcome `protobuf:"bytes,2,opt,name=outcome,proto3" json:"outcome,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *OutcomeEnvelope) Reset() { + *x = OutcomeEnvelope{} + mi := &file_ocr3_types_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *OutcomeEnvelope) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OutcomeEnvelope) ProtoMessage() {} + +func (x *OutcomeEnvelope) ProtoReflect() protoreflect.Message { + mi := &file_ocr3_types_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OutcomeEnvelope.ProtoReflect.Descriptor instead. 
+func (*OutcomeEnvelope) Descriptor() ([]byte, []int) { + return file_ocr3_types_proto_rawDescGZIP(), []int{9} +} + +func (x *OutcomeEnvelope) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + +func (x *OutcomeEnvelope) GetOutcome() *AggregationOutcome { + if x != nil { + return x.Outcome + } + return nil +} + var File_ocr3_types_proto protoreflect.FileDescriptor const file_ocr3_types_proto_rawDesc = "" + @@ -599,7 +739,16 @@ const file_ocr3_types_proto_rawDesc = "" + "\x0fcurrent_reports\x18\x02 \x03(\v2\x12.ocr3_types.ReportR\x0ecurrentReports\x1a[\n" + "\rOutcomesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x124\n" + - "\x05value\x18\x02 \x01(\v2\x1e.ocr3_types.AggregationOutcomeR\x05value:\x028\x01B#Z!capabilities/consensus/ocr3/typesb\x06proto3" + "\x05value\x18\x02 \x01(\v2\x1e.ocr3_types.AggregationOutcomeR\x05value:\x028\x01\"\xc9\x01\n" + + "\x12BlobbedObservation\x12\x1f\n" + + "\vblob_handle\x18\x01 \x01(\fR\n" + + "blobHandle\x12 \n" + + "\x03ids\x18\x02 \x03(\v2\x0e.ocr3_types.IdR\x03ids\x128\n" + + "\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x126\n" + + "\x17registered_workflow_ids\x18\x04 \x03(\tR\x15registeredWorkflowIds\"e\n" + + "\x0fOutcomeEnvelope\x12\x18\n" + + "\aversion\x18\x01 \x01(\rR\aversion\x128\n" + + "\aoutcome\x18\x02 \x01(\v2\x1e.ocr3_types.AggregationOutcomeR\aoutcomeB#Z!capabilities/consensus/ocr3/typesb\x06proto3" var ( file_ocr3_types_proto_rawDescOnce sync.Once @@ -613,7 +762,7 @@ func file_ocr3_types_proto_rawDescGZIP() []byte { return file_ocr3_types_proto_rawDescData } -var file_ocr3_types_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_ocr3_types_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_ocr3_types_proto_goTypes = []any{ (*AggregationOutcome)(nil), // 0: ocr3_types.AggregationOutcome (*Query)(nil), // 1: ocr3_types.Query @@ -623,32 +772,37 @@ var file_ocr3_types_proto_goTypes = []any{ (*Report)(nil), // 5: ocr3_types.Report 
(*ReportInfo)(nil), // 6: ocr3_types.ReportInfo (*Outcome)(nil), // 7: ocr3_types.Outcome - nil, // 8: ocr3_types.Outcome.OutcomesEntry - (*pb.Map)(nil), // 9: values.v1.Map - (*timestamppb.Timestamp)(nil), // 10: google.protobuf.Timestamp - (*pb.List)(nil), // 11: values.v1.List + (*BlobbedObservation)(nil), // 8: ocr3_types.BlobbedObservation + (*OutcomeEnvelope)(nil), // 9: ocr3_types.OutcomeEnvelope + nil, // 10: ocr3_types.Outcome.OutcomesEntry + (*pb.Map)(nil), // 11: values.v1.Map + (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp + (*pb.List)(nil), // 13: values.v1.List } var file_ocr3_types_proto_depIdxs = []int32{ - 9, // 0: ocr3_types.AggregationOutcome.encodableOutcome:type_name -> values.v1.Map - 10, // 1: ocr3_types.AggregationOutcome.timestamp:type_name -> google.protobuf.Timestamp - 9, // 2: ocr3_types.AggregationOutcome.encoderConfig:type_name -> values.v1.Map + 11, // 0: ocr3_types.AggregationOutcome.encodableOutcome:type_name -> values.v1.Map + 12, // 1: ocr3_types.AggregationOutcome.timestamp:type_name -> google.protobuf.Timestamp + 11, // 2: ocr3_types.AggregationOutcome.encoderConfig:type_name -> values.v1.Map 2, // 3: ocr3_types.Query.ids:type_name -> ocr3_types.Id 2, // 4: ocr3_types.Observation.id:type_name -> ocr3_types.Id - 11, // 5: ocr3_types.Observation.observations:type_name -> values.v1.List - 9, // 6: ocr3_types.Observation.overriddenEncoderConfig:type_name -> values.v1.Map + 13, // 5: ocr3_types.Observation.observations:type_name -> values.v1.List + 11, // 6: ocr3_types.Observation.overriddenEncoderConfig:type_name -> values.v1.Map 3, // 7: ocr3_types.Observations.observations:type_name -> ocr3_types.Observation - 10, // 8: ocr3_types.Observations.timestamp:type_name -> google.protobuf.Timestamp + 12, // 8: ocr3_types.Observations.timestamp:type_name -> google.protobuf.Timestamp 2, // 9: ocr3_types.Report.id:type_name -> ocr3_types.Id 0, // 10: ocr3_types.Report.outcome:type_name -> ocr3_types.AggregationOutcome 2, 
// 11: ocr3_types.ReportInfo.id:type_name -> ocr3_types.Id - 8, // 12: ocr3_types.Outcome.outcomes:type_name -> ocr3_types.Outcome.OutcomesEntry + 10, // 12: ocr3_types.Outcome.outcomes:type_name -> ocr3_types.Outcome.OutcomesEntry 5, // 13: ocr3_types.Outcome.current_reports:type_name -> ocr3_types.Report - 0, // 14: ocr3_types.Outcome.OutcomesEntry.value:type_name -> ocr3_types.AggregationOutcome - 15, // [15:15] is the sub-list for method output_type - 15, // [15:15] is the sub-list for method input_type - 15, // [15:15] is the sub-list for extension type_name - 15, // [15:15] is the sub-list for extension extendee - 0, // [0:15] is the sub-list for field type_name + 2, // 14: ocr3_types.BlobbedObservation.ids:type_name -> ocr3_types.Id + 12, // 15: ocr3_types.BlobbedObservation.timestamp:type_name -> google.protobuf.Timestamp + 0, // 16: ocr3_types.OutcomeEnvelope.outcome:type_name -> ocr3_types.AggregationOutcome + 0, // 17: ocr3_types.Outcome.OutcomesEntry.value:type_name -> ocr3_types.AggregationOutcome + 18, // [18:18] is the sub-list for method output_type + 18, // [18:18] is the sub-list for method input_type + 18, // [18:18] is the sub-list for extension type_name + 18, // [18:18] is the sub-list for extension extendee + 0, // [0:18] is the sub-list for field type_name } func init() { file_ocr3_types_proto_init() } @@ -662,7 +816,7 @@ func file_ocr3_types_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ocr3_types_proto_rawDesc), len(file_ocr3_types_proto_rawDesc)), NumEnums: 0, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto b/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto index a3cb2a9ffe..4c906bb17d 100644 --- a/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto +++ b/pkg/capabilities/consensus/ocr3/types/ocr3_types.proto @@ -66,3 +66,38 @@ message Outcome { map outcomes = 1; repeated 
Report current_reports = 2; } + +// --- OCR3_1-only messages (added under OCRBump) --------------------------- +// +// Under OCR3_1 the bulk per-request values.List payload moves to blobs, keeping +// the on-wire observation well under the 512 KiB OCR3_1 cap. The existing +// Observations message is broadcast as the blob contents; BlobbedObservation +// is what travels the consensus channel and carries only the blob handle plus +// small metadata. +message BlobbedObservation { + // Handle returned by BlobBroadcastFetcher.BroadcastBlob. The fetching node + // calls FetchBlob(handle) to retrieve the Observations payload. + bytes blob_handle = 1; + + // IDs this observation has data for. Mirrors the Observations.observations + // list' Id fields so ValidateObservation can reject mismatches without + // dereferencing the blob. + repeated Id ids = 2; + + // Node timestamp, carried out-of-blob so StateTransition can compute the + // median timestamp without fetching every blob first. + google.protobuf.Timestamp timestamp = 3; + + // Registered workflow IDs on this node; carried out-of-blob for the same + // reason as `timestamp`. + repeated string registered_workflow_ids = 4; +} + +// OutcomeEnvelope wraps a per-workflow AggregationOutcome stored in the +// OCR3_1 replicated KV. The explicit version field lets us detect stored +// values from older schemas and migrate. Never reuse a version number with +// an incompatible payload — bump the number and add a new key prefix. +message OutcomeEnvelope { + uint32 version = 1; // currently 1 + AggregationOutcome outcome = 2; +}