From ff66aade9f57c558e4e559addddd7825c7046b71 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Mon, 6 Apr 2026 01:28:20 -0700 Subject: [PATCH 1/2] [DONTime] Rollout flag for pruning fix (#1963) --- pkg/workflows/dontime/pb/dontime.pb.go | 25 ++++-- pkg/workflows/dontime/pb/dontime.proto | 3 + pkg/workflows/dontime/plugin.go | 104 ++++++++++++++++++++++--- pkg/workflows/dontime/plugin_test.go | 94 ++++++++++++++-------- 4 files changed, 174 insertions(+), 52 deletions(-) diff --git a/pkg/workflows/dontime/pb/dontime.pb.go b/pkg/workflows/dontime/pb/dontime.pb.go index 01a5f4b84a..18aa3f3eec 100644 --- a/pkg/workflows/dontime/pb/dontime.pb.go +++ b/pkg/workflows/dontime/pb/dontime.pb.go @@ -22,11 +22,14 @@ const ( ) type Observation struct { - state protoimpl.MessageState `protogen:"open.v1"` - Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Requests map[string]int64 `protobuf:"bytes,2,rep,name=requests,proto3" json:"requests,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Requests map[string]int64 `protobuf:"bytes,2,rep,name=requests,proto3" json:"requests,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` + // Flag to roll out execution pruning fix. + // TODO(CRE-2497): Remove after rollout. + PruneExecutions bool `protobuf:"varint,3,opt,name=prune_executions,json=pruneExecutions,proto3" json:"prune_executions,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Observation) Reset() { @@ -73,6 +76,13 @@ func (x *Observation) GetRequests() map[string]int64 { return nil } +func (x *Observation) GetPruneExecutions() bool { + if x != nil { + return x.PruneExecutions + } + return false +} + type Observations struct { state protoimpl.MessageState `protogen:"open.v1"` // batched observations for multiple workflow execution IDs @@ -218,10 +228,11 @@ var File_dontime_proto protoreflect.FileDescriptor const file_dontime_proto_rawDesc = "" + "\n" + - "\rdontime.proto\"\xa0\x01\n" + + "\rdontime.proto\"\xcb\x01\n" + "\vObservation\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x126\n" + - "\brequests\x18\x02 \x03(\v2\x1a.Observation.RequestsEntryR\brequests\x1a;\n" + + "\brequests\x18\x02 \x03(\v2\x1a.Observation.RequestsEntryR\brequests\x12)\n" + + "\x10prune_executions\x18\x03 \x01(\bR\x0fpruneExecutions\x1a;\n" + "\rRequestsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\x03R\x05value:\x028\x01\"@\n" + diff --git a/pkg/workflows/dontime/pb/dontime.proto b/pkg/workflows/dontime/pb/dontime.proto index 9e68be2fbc..9dc77edd2e 100644 --- a/pkg/workflows/dontime/pb/dontime.proto +++ b/pkg/workflows/dontime/pb/dontime.proto @@ -5,6 +5,9 @@ option go_package = "github.com/smartcontractkit/chainlink-common/pkg/workflows/ message Observation { int64 timestamp = 1; map requests = 2; + // Flag to roll out execution pruning fix. + // TODO(CRE-2497): Remove after rollout. + bool prune_executions = 3; } message Observations { diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index 4cd521c630..2476814337 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/metric" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" @@ -16,10 +17,51 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/libocr/quorumhelper" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime/pb" ) +type pluginMetrics struct { + donTime metric.Int64Gauge + donTimeEntries metric.Int64Gauge + outcomeSize metric.Int64Gauge +} + +func newPluginMetrics() (pluginMetrics, error) { + meter := beholder.GetMeter() + + donTime, err := meter.Int64Gauge("platform_dontime_outcome_don_time_ms", + metric.WithDescription("DON consensus timestamp included in the latest outcome, in milliseconds"), + metric.WithUnit("ms"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create don_time gauge: %w", err) + } + + donTimeEntries, err := meter.Int64Gauge("platform_dontime_outcome_entries", + metric.WithDescription("Number of workflow execution entries tracked in the latest outcome"), + metric.WithUnit("{entry}"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create don_time_entries gauge: %w", err) + } + + outcomeSize, err := meter.Int64Gauge("platform_dontime_outcome_size_bytes", + metric.WithDescription("Serialised size of the latest outcome in bytes"), + metric.WithUnit("By"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create outcome_size gauge: %w", err) + } + + return pluginMetrics{ + donTime: donTime, + donTimeEntries: donTimeEntries, + outcomeSize: outcomeSize, + }, nil +} + type Plugin struct { mu sync.RWMutex @@ -30,6 +72,8 @@ type Plugin struct { batchSize int minTimeIncrease int64 + + metrics pluginMetrics } var _ ocr3types.ReportingPlugin[[]byte] = (*Plugin)(nil) @@ -45,6 +89,11 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg return nil, errors.New("execution removal time must be positive") } + metrics, err := newPluginMetrics() + if err != nil { + return nil, err + } + return &Plugin{ store: store, config: config, @@ -52,6 +101,7 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg lggr: logger.Named(lggr, "DONTimePlugin"), batchSize: int(offchainCfg.MaxBatchSize), minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond), + metrics: metrics, }, nil } @@ -100,8 +150,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, } observation := &pb.Observation{ - Timestamp: time.Now().UTC().UnixMilli(), - Requests: requests, + Timestamp: time.Now().UTC().UnixMilli(), + Requests: requests, + PruneExecutions: true, } return proto.MarshalOptions{Deterministic: true}.Marshal(observation) @@ -115,7 +166,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil } -func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { +func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed type timestampNodePair struct { Timestamp int64 @@ -132,14 +183,33 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t prevOutcome.ObservedDonTimes = make(map[string]*pb.ObservedDonTimes) } + // Unmarshal all observations once and compute pruneExecutions. + // Only prune when all nodes are updated. Even if this rolls back, the logic is still correct. + parsedAOs := make([]*pb.Observation, len(aos)) + pruneExecutions := true for idx, ao := range aos { observation := &pb.Observation{} if err := proto.Unmarshal(ao.Observation, observation); err != nil { p.lggr.Errorf("failed to unmarshal observation in Outcome phase") continue } + parsedAOs[idx] = observation + if !observation.PruneExecutions { + pruneExecutions = false // need all nodes to agree + } + } + + for idx, observation := range parsedAOs { + if observation == nil { + continue + } for id, requestSeqNum := range observation.Requests { + if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout + if _, ok := prevOutcome.ObservedDonTimes[id]; !ok { + prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{} + } + } var currSeqNum int64 if times, ok := prevOutcome.ObservedDonTimes[id]; ok { currSeqNum = int64(len(times.Timestamps)) @@ -199,14 +269,23 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t // Remove expired and empty workflow executions for id, observedTimes := range outcome.ObservedDonTimes { - if observedTimes == nil || len(observedTimes.Timestamps) == 0 { - delete(outcome.ObservedDonTimes, id) - p.store.deleteExecutionID(id) - continue - } - if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { - delete(outcome.ObservedDonTimes, id) - p.store.deleteExecutionID(id) + if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout + if observedTimes != nil && len(observedTimes.Timestamps) > 0 { + if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + } + } + } else { + if observedTimes == nil || len(observedTimes.Timestamps) == 0 { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + continue + } + if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + } } } @@ -215,6 +294,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t "observedDonTimesEntries", len(outcome.ObservedDonTimes), "outcomeSizeBytes", len(outcomeBytes), ) + p.metrics.donTime.Record(ctx, outcome.Timestamp) + p.metrics.donTimeEntries.Record(ctx, int64(len(outcome.ObservedDonTimes))) + p.metrics.outcomeSize.Record(ctx, int64(len(outcomeBytes))) return outcomeBytes, err } diff --git a/pkg/workflows/dontime/plugin_test.go b/pkg/workflows/dontime/plugin_test.go index 89c33fdcde..e3b4f73436 100644 --- a/pkg/workflows/dontime/plugin_test.go +++ b/pkg/workflows/dontime/plugin_test.go @@ -158,28 +158,24 @@ func TestPlugin_Outcome(t *testing.T) { timestamp := time.Now().UnixMilli() observations := []*pb.Observation{ { - Timestamp: timestamp, - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp, + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp - int64(time.Second), - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp - int64(time.Second), + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp + int64(time.Second), - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp + int64(time.Second), + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp, - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp, + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, } @@ -223,8 +219,9 @@ func TestPlugin_Outcome_SequenceNumberHandling(t *testing.T) { aos := make([]types.AttributedObservation, numNodes) for i := 0; i < numNodes; i++ { obs := &pb.Observation{ - Timestamp: timestamp + int64(i), - Requests: requests, + Timestamp: timestamp + int64(i), + Requests: requests, + PruneExecutions: true, } rawObs, err := proto.Marshal(obs) require.NoError(t, err) @@ -433,22 +430,10 @@ func TestPlugin_FinishedExecutions(t *testing.T) { t.Run("Outcome: remove expired workflow executions", func(t *testing.T) { timestamp := time.Now().UnixMilli() observations := []*pb.Observation{ - { - Timestamp: timestamp, - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp - int64(time.Second), - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp + int64(time.Second), - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp, - Requests: map[string]int64{}, - }, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, } aos := make([]types.AttributedObservation, 4) @@ -483,6 +468,47 @@ func TestPlugin_FinishedExecutions(t *testing.T) { require.NotContains(t, outcomeProto.ObservedDonTimes, "workflow-123") }) + t.Run("Outcome: legacy path when only half nodes have PruneExecutions set", func(t *testing.T) { + timestamp := time.Now().UnixMilli() + emptyID := "empty-workflow" + + // Only 2 of 4 nodes have PruneExecutions=true → pruneExecutions stays false → legacy path. + observations := []*pb.Observation{ + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: false}, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: false}, + } + + aos := make([]types.AttributedObservation, len(observations)) + for i, obs := range observations { + rawObs, err := proto.Marshal(obs) + require.NoError(t, err) + aos[i] = types.AttributedObservation{Observation: rawObs, Observer: commontypes.OracleID(i)} + } + + // prevOutcome contains an entry for emptyID with no timestamps. + prevOutcome := &pb.Outcome{ + Timestamp: timestamp - 1000, + ObservedDonTimes: map[string]*pb.ObservedDonTimes{ + emptyID: {Timestamps: []int64{}}, + }, + } + prevOutcomeBytes, err := proto.Marshal(prevOutcome) + require.NoError(t, err) + + outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos) + require.NoError(t, err) + + legacyOutcomeProto := &pb.Outcome{} + err = proto.Unmarshal(outcome, legacyOutcomeProto) + require.NoError(t, err) + + // Legacy behavior: empty-timestamps entry is NOT pruned. + require.Contains(t, legacyOutcomeProto.ObservedDonTimes, emptyID) + require.Empty(t, legacyOutcomeProto.ObservedDonTimes[emptyID].Timestamps) + }) + t.Run("Transmit: delete removed executionIDs", func(t *testing.T) { store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()}) From 86eeeebd94fb1562cbb6ae124bb1670ddf5e168c Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Tue, 28 Apr 2026 09:43:42 -0700 Subject: [PATCH 2/2] Revert "Clean up dropped resources in relayer construction (#1927)" This reverts commit c8e0d77df4210cea915ed317ad835cf7fec8bd54. --- pkg/loop/internal/relayer/relayer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 26220c9b5b..e9f6c68294 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -61,7 +61,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterKeystoreServer(s, ks.NewServer(keystore)) }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err) } deps.Add(ksRes) @@ -70,7 +70,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterKeystoreServer(s, ks.NewServer(csaKeystore)) }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err) } deps.Add(ksCSARes) @@ -78,7 +78,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterCapabilitiesRegistryServer(s, capability.NewCapabilitiesRegistryServer(p.BrokerExt, capabilityRegistry)) }) if err != nil { - return 0, deps, fmt.Errorf("failed to serve new capability registry: %w", err) + return 0, nil, fmt.Errorf("failed to serve new capability registry: %w", err) } deps.Add(capabilityRegistryResource) @@ -89,9 +89,9 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key CapabilityRegistryID: capabilityRegistryID, }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed request: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed request: %w", err) } - return reply.RelayerID, deps, nil + return reply.RelayerID, nil, nil }) return newRelayerClient(p.BrokerExt, cc), nil } @@ -127,7 +127,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel p.CloseAll(ksRes) return nil, net.ErrConnDial{Name: "CSAKeystore", ID: request.KeystoreCSAID, Err: err} } - ksCSARes := net.Resource{Closer: ksCSAConn, Name: "CSAKeystore"} + ksCSARes := net.Resource{Closer: ksConn, Name: "CSAKeystore"} capRegistryConn, err := p.Dial(request.CapabilityRegistryID) if err != nil { @@ -324,7 +324,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro ccipocr3pb.RegisterExtraDataCodecBundleServer(s, ccipocr3loop.NewExtraDataCodecBundleServer(cargs.ExtraDataCodecBundle)) }) if err != nil { - return 0, deps, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err) + return 0, nil, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err) } deps.Add(edcRes) extraDataCodecBundleID = edcID @@ -344,7 +344,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro }, }) if err != nil { - return 0, deps, err + return 0, nil, err } return reply.CcipProviderID, deps, nil })