From ff66aade9f57c558e4e559addddd7825c7046b71 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Mon, 6 Apr 2026 01:28:20 -0700 Subject: [PATCH 1/4] [DONTime] Rollout flag for pruning fix (#1963) --- pkg/workflows/dontime/pb/dontime.pb.go | 25 ++++-- pkg/workflows/dontime/pb/dontime.proto | 3 + pkg/workflows/dontime/plugin.go | 104 ++++++++++++++++++++++--- pkg/workflows/dontime/plugin_test.go | 94 ++++++++++++++-------- 4 files changed, 174 insertions(+), 52 deletions(-) diff --git a/pkg/workflows/dontime/pb/dontime.pb.go b/pkg/workflows/dontime/pb/dontime.pb.go index 01a5f4b84a..18aa3f3eec 100644 --- a/pkg/workflows/dontime/pb/dontime.pb.go +++ b/pkg/workflows/dontime/pb/dontime.pb.go @@ -22,11 +22,14 @@ const ( ) type Observation struct { - state protoimpl.MessageState `protogen:"open.v1"` - Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Requests map[string]int64 `protobuf:"bytes,2,rep,name=requests,proto3" json:"requests,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Requests map[string]int64 `protobuf:"bytes,2,rep,name=requests,proto3" json:"requests,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` + // Flag to roll out execution pruning fix. + // TODO(CRE-2497): Remove after rollout. + PruneExecutions bool `protobuf:"varint,3,opt,name=prune_executions,json=pruneExecutions,proto3" json:"prune_executions,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Observation) Reset() { @@ -73,6 +76,13 @@ func (x *Observation) GetRequests() map[string]int64 { return nil } +func (x *Observation) GetPruneExecutions() bool { + if x != nil { + return x.PruneExecutions + } + return false +} + type Observations struct { state protoimpl.MessageState `protogen:"open.v1"` // batched observations for multiple workflow execution IDs @@ -218,10 +228,11 @@ var File_dontime_proto protoreflect.FileDescriptor const file_dontime_proto_rawDesc = "" + "\n" + - "\rdontime.proto\"\xa0\x01\n" + + "\rdontime.proto\"\xcb\x01\n" + "\vObservation\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x126\n" + - "\brequests\x18\x02 \x03(\v2\x1a.Observation.RequestsEntryR\brequests\x1a;\n" + + "\brequests\x18\x02 \x03(\v2\x1a.Observation.RequestsEntryR\brequests\x12)\n" + + "\x10prune_executions\x18\x03 \x01(\bR\x0fpruneExecutions\x1a;\n" + "\rRequestsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\x03R\x05value:\x028\x01\"@\n" + diff --git a/pkg/workflows/dontime/pb/dontime.proto b/pkg/workflows/dontime/pb/dontime.proto index 9e68be2fbc..9dc77edd2e 100644 --- a/pkg/workflows/dontime/pb/dontime.proto +++ b/pkg/workflows/dontime/pb/dontime.proto @@ -5,6 +5,9 @@ option go_package = "github.com/smartcontractkit/chainlink-common/pkg/workflows/ message Observation { int64 timestamp = 1; map requests = 2; + // Flag to roll out execution pruning fix. + // TODO(CRE-2497): Remove after rollout. + bool prune_executions = 3; } message Observations { diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index 4cd521c630..2476814337 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/metric" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" @@ -16,10 +17,51 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/libocr/quorumhelper" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime/pb" ) +type pluginMetrics struct { + donTime metric.Int64Gauge + donTimeEntries metric.Int64Gauge + outcomeSize metric.Int64Gauge +} + +func newPluginMetrics() (pluginMetrics, error) { + meter := beholder.GetMeter() + + donTime, err := meter.Int64Gauge("platform_dontime_outcome_don_time_ms", + metric.WithDescription("DON consensus timestamp included in the latest outcome, in milliseconds"), + metric.WithUnit("ms"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create don_time gauge: %w", err) + } + + donTimeEntries, err := meter.Int64Gauge("platform_dontime_outcome_entries", + metric.WithDescription("Number of workflow execution entries tracked in the latest outcome"), + metric.WithUnit("{entry}"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create don_time_entries gauge: %w", err) + } + + outcomeSize, err := meter.Int64Gauge("platform_dontime_outcome_size_bytes", + metric.WithDescription("Serialised size of the latest outcome in bytes"), + metric.WithUnit("By"), + ) + if err != nil { + return pluginMetrics{}, fmt.Errorf("failed to create outcome_size gauge: %w", err) + } + + return pluginMetrics{ + donTime: donTime, + donTimeEntries: donTimeEntries, + outcomeSize: outcomeSize, + }, nil +} + type Plugin struct { mu sync.RWMutex @@ -30,6 +72,8 @@ type Plugin struct { batchSize int minTimeIncrease int64 + + metrics pluginMetrics } var _ ocr3types.ReportingPlugin[[]byte] = (*Plugin)(nil) @@ -45,6 +89,11 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg return nil, errors.New("execution removal time must be positive") } + metrics, err := newPluginMetrics() + if err != nil { + return nil, err + } + return &Plugin{ store: store, config: config, @@ -52,6 +101,7 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg lggr: logger.Named(lggr, "DONTimePlugin"), batchSize: int(offchainCfg.MaxBatchSize), minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond), + metrics: metrics, }, nil } @@ -100,8 +150,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, } observation := &pb.Observation{ - Timestamp: time.Now().UTC().UnixMilli(), - Requests: requests, + Timestamp: time.Now().UTC().UnixMilli(), + Requests: requests, + PruneExecutions: true, } return proto.MarshalOptions{Deterministic: true}.Marshal(observation) @@ -115,7 +166,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil } -func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { +func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed type timestampNodePair struct { Timestamp int64 @@ -132,14 +183,33 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t prevOutcome.ObservedDonTimes = make(map[string]*pb.ObservedDonTimes) } + // Unmarshal all observations once and compute pruneExecutions. + // Only prune when all nodes are updated. Even if this rolls back, the logic is still correct. + parsedAOs := make([]*pb.Observation, len(aos)) + pruneExecutions := true for idx, ao := range aos { observation := &pb.Observation{} if err := proto.Unmarshal(ao.Observation, observation); err != nil { p.lggr.Errorf("failed to unmarshal observation in Outcome phase") continue } + parsedAOs[idx] = observation + if !observation.PruneExecutions { + pruneExecutions = false // need all nodes to agree + } + } + + for idx, observation := range parsedAOs { + if observation == nil { + continue + } for id, requestSeqNum := range observation.Requests { + if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout + if _, ok := prevOutcome.ObservedDonTimes[id]; !ok { + prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{} + } + } var currSeqNum int64 if times, ok := prevOutcome.ObservedDonTimes[id]; ok { currSeqNum = int64(len(times.Timestamps)) @@ -199,14 +269,23 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t // Remove expired and empty workflow executions for id, observedTimes := range outcome.ObservedDonTimes { - if observedTimes == nil || len(observedTimes.Timestamps) == 0 { - delete(outcome.ObservedDonTimes, id) - p.store.deleteExecutionID(id) - continue - } - if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { - delete(outcome.ObservedDonTimes, id) - p.store.deleteExecutionID(id) + if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout + if observedTimes != nil && len(observedTimes.Timestamps) > 0 { + if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + } + } + } else { + if observedTimes == nil || len(observedTimes.Timestamps) == 0 { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + continue + } + if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() { + delete(outcome.ObservedDonTimes, id) + p.store.deleteExecutionID(id) + } } } @@ -215,6 +294,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t "observedDonTimesEntries", len(outcome.ObservedDonTimes), "outcomeSizeBytes", len(outcomeBytes), ) + p.metrics.donTime.Record(ctx, outcome.Timestamp) + p.metrics.donTimeEntries.Record(ctx, int64(len(outcome.ObservedDonTimes))) + p.metrics.outcomeSize.Record(ctx, int64(len(outcomeBytes))) return outcomeBytes, err } diff --git a/pkg/workflows/dontime/plugin_test.go b/pkg/workflows/dontime/plugin_test.go index 89c33fdcde..e3b4f73436 100644 --- a/pkg/workflows/dontime/plugin_test.go +++ b/pkg/workflows/dontime/plugin_test.go @@ -158,28 +158,24 @@ func TestPlugin_Outcome(t *testing.T) { timestamp := time.Now().UnixMilli() observations := []*pb.Observation{ { - Timestamp: timestamp, - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp, + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp - int64(time.Second), - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp - int64(time.Second), + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp + int64(time.Second), - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp + int64(time.Second), + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, { - Timestamp: timestamp, - Requests: map[string]int64{ - executionID: 0, - }, + Timestamp: timestamp, + Requests: map[string]int64{executionID: 0}, + PruneExecutions: true, }, } @@ -223,8 +219,9 @@ func TestPlugin_Outcome_SequenceNumberHandling(t *testing.T) { aos := make([]types.AttributedObservation, numNodes) for i := 0; i < numNodes; i++ { obs := &pb.Observation{ - Timestamp: timestamp + int64(i), - Requests: requests, + Timestamp: timestamp + int64(i), + Requests: requests, + PruneExecutions: true, } rawObs, err := proto.Marshal(obs) require.NoError(t, err) @@ -433,22 +430,10 @@ func TestPlugin_FinishedExecutions(t *testing.T) { t.Run("Outcome: remove expired workflow executions", func(t *testing.T) { timestamp := time.Now().UnixMilli() observations := []*pb.Observation{ - { - Timestamp: timestamp, - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp - int64(time.Second), - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp + int64(time.Second), - Requests: map[string]int64{}, - }, - { - Timestamp: timestamp, - Requests: map[string]int64{}, - }, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, } aos := make([]types.AttributedObservation, 4) @@ -483,6 +468,47 @@ func TestPlugin_FinishedExecutions(t *testing.T) { require.NotContains(t, outcomeProto.ObservedDonTimes, "workflow-123") }) + t.Run("Outcome: legacy path when only half nodes have PruneExecutions set", func(t *testing.T) { + timestamp := time.Now().UnixMilli() + emptyID := "empty-workflow" + + // Only 2 of 4 nodes have PruneExecutions=true → pruneExecutions stays false → legacy path. + observations := []*pb.Observation{ + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true}, + {Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: false}, + {Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: false}, + } + + aos := make([]types.AttributedObservation, len(observations)) + for i, obs := range observations { + rawObs, err := proto.Marshal(obs) + require.NoError(t, err) + aos[i] = types.AttributedObservation{Observation: rawObs, Observer: commontypes.OracleID(i)} + } + + // prevOutcome contains an entry for emptyID with no timestamps. + prevOutcome := &pb.Outcome{ + Timestamp: timestamp - 1000, + ObservedDonTimes: map[string]*pb.ObservedDonTimes{ + emptyID: {Timestamps: []int64{}}, + }, + } + prevOutcomeBytes, err := proto.Marshal(prevOutcome) + require.NoError(t, err) + + outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos) + require.NoError(t, err) + + legacyOutcomeProto := &pb.Outcome{} + err = proto.Unmarshal(outcome, legacyOutcomeProto) + require.NoError(t, err) + + // Legacy behavior: empty-timestamps entry is NOT pruned. + require.Contains(t, legacyOutcomeProto.ObservedDonTimes, emptyID) + require.Empty(t, legacyOutcomeProto.ObservedDonTimes[emptyID].Timestamps) + }) + t.Run("Transmit: delete removed executionIDs", func(t *testing.T) { store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()}) From 86eeeebd94fb1562cbb6ae124bb1670ddf5e168c Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Tue, 28 Apr 2026 09:43:42 -0700 Subject: [PATCH 2/4] Revert "Clean up dropped resources in relayer construction (#1927)" This reverts commit c8e0d77df4210cea915ed317ad835cf7fec8bd54. --- pkg/loop/internal/relayer/relayer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 26220c9b5b..e9f6c68294 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -61,7 +61,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterKeystoreServer(s, ks.NewServer(keystore)) }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve keystore: %w", err) } deps.Add(ksRes) @@ -70,7 +70,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterKeystoreServer(s, ks.NewServer(csaKeystore)) }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed to serve CSA keystore: %w", err) } deps.Add(ksCSARes) @@ -78,7 +78,7 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key pb.RegisterCapabilitiesRegistryServer(s, capability.NewCapabilitiesRegistryServer(p.BrokerExt, capabilityRegistry)) }) if err != nil { - return 0, deps, fmt.Errorf("failed to serve new capability registry: %w", err) + return 0, nil, fmt.Errorf("failed to serve new capability registry: %w", err) } deps.Add(capabilityRegistryResource) @@ -89,9 +89,9 @@ func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, key CapabilityRegistryID: capabilityRegistryID, }) if err != nil { - return 0, deps, fmt.Errorf("Failed to create relayer client: failed request: %w", err) + return 0, nil, fmt.Errorf("Failed to create relayer client: failed request: %w", err) } - return reply.RelayerID, deps, nil + return reply.RelayerID, nil, nil }) return newRelayerClient(p.BrokerExt, cc), nil } @@ -127,7 +127,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel p.CloseAll(ksRes) return nil, net.ErrConnDial{Name: "CSAKeystore", ID: request.KeystoreCSAID, Err: err} } - ksCSARes := net.Resource{Closer: ksCSAConn, Name: "CSAKeystore"} + ksCSARes := net.Resource{Closer: ksConn, Name: "CSAKeystore"} capRegistryConn, err := p.Dial(request.CapabilityRegistryID) if err != nil { @@ -324,7 +324,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro ccipocr3pb.RegisterExtraDataCodecBundleServer(s, ccipocr3loop.NewExtraDataCodecBundleServer(cargs.ExtraDataCodecBundle)) }) if err != nil { - return 0, deps, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err) + return 0, nil, fmt.Errorf("failed to serve ExtraDataCodecBundle: %w", err) } deps.Add(edcRes) extraDataCodecBundleID = edcID @@ -344,7 +344,7 @@ func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPPro }, }) if err != nil { - return 0, deps, err + return 0, nil, err } return reply.CcipProviderID, deps, nil }) From f4b1c4aa03efd2eeef5b8ba1629c94ff84e19927 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 4 May 2026 11:58:27 -0500 Subject: [PATCH 3/4] pkg/loop/internal: cleanup resources; fail TestRelayerService if goroutines leak (#2027) --- go.mod | 3 +- go.sum | 4 +- .../core/services/oraclefactory/client.go | 2 +- .../reportingplugin/ocr2/reporting.go | 2 +- .../ocr2/reporting_plugin_service.go | 16 +++--- .../reportingplugin/ocr3/reporting.go | 11 ++-- .../ocr3/reporting_plugin_service.go | 16 +++--- .../core/services/validation/validation.go | 3 +- pkg/loop/internal/goplugin/plugin.go | 3 +- pkg/loop/internal/goplugin/service.go | 6 +-- pkg/loop/internal/net/client.go | 51 +++++++++++++++++-- .../contractwriter/contract_writer.go | 2 +- .../ext/ccip/commit_provider.go | 2 +- .../pluginprovider/ext/ccip/commit_store.go | 2 +- .../ext/ccip/execution_provider.go | 2 +- .../pluginprovider/ext/ccip/offramp.go | 2 +- .../ccip/test/commit_gas_estimator_test.go | 2 +- .../ext/ccip/test/exec_gas_estimator_test.go | 2 +- .../ext/ccip/test/onramp_test.go | 2 +- .../ext/ccip/test/price_registry_test.go | 2 +- .../ext/ccip/test/pricegetter_test.go | 2 +- .../ext/ccip/test/token_data_test.go | 2 +- .../ext/ccip/test/token_pool_test.go | 2 +- .../ext/ccipocr3/ccip_provider.go | 4 +- .../pluginprovider/ext/median/median.go | 4 +- .../pluginprovider/ext/mercury/mercury.go | 2 +- .../ext/ocr3capability/capability.go | 4 +- .../relayer/pluginprovider/ocr2/config.go | 2 +- .../pluginprovider/ocr2/plugin_provider.go | 7 ++- pkg/loop/internal/relayer/relayer.go | 4 +- pkg/loop/internal/relayerset/client.go | 4 +- .../internal/reportingplugin/ccip/commit.go | 2 +- .../reportingplugin/ccip/execution.go | 4 +- .../internal/reportingplugin/median/median.go | 14 ++--- .../reportingplugin/mercury/mercury.go | 16 +++--- .../mercury/mercury_reporting.go | 2 +- pkg/loop/internal/test/grpc_scaffold.go | 2 +- pkg/loop/relayer_service_test.go | 7 +-- pkg/loop/reportingplugins/grpc.go | 6 +-- pkg/loop/reportingplugins/ocr3/grpc.go | 6 +-- pkg/utils/tests/leak.go | 25 +++++++++ 41 files changed, 162 insertions(+), 94 deletions(-) create mode 100644 pkg/utils/tests/leak.go diff --git a/go.mod b/go.mod index addfd096ae..58b823ffdc 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/hashicorp/go-hclog v1.6.3 - github.com/hashicorp/go-plugin v1.7.0 + github.com/hashicorp/go-plugin v1.8.0 github.com/iancoleman/strcase v0.3.0 github.com/invopop/jsonschema v0.13.0 github.com/jackc/pgx/v4 v4.18.3 @@ -67,6 +67,7 @@ require ( go.opentelemetry.io/otel/sdk/log v0.15.0 go.opentelemetry.io/otel/sdk/metric v1.39.0 go.opentelemetry.io/otel/trace v1.39.0 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.47.0 golang.org/x/exp v0.0.0-20260112195511-716be5621a96 diff --git a/go.sum b/go.sum index b90e6b6184..fe8b15a167 100644 --- a/go.sum +++ b/go.sum @@ -143,8 +143,8 @@ github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026 h1:BpJ2o0OR5FV7vrkDYf github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= -github.com/hashicorp/go-plugin v1.7.0 h1:YghfQH/0QmPNc/AZMTFE3ac8fipZyZECHdDPshfk+mA= -github.com/hashicorp/go-plugin v1.7.0/go.mod h1:BExt6KEaIYx804z8k4gRzRLEvxKVb+kn0NMcihqOqb8= +github.com/hashicorp/go-plugin v1.8.0 h1:ie8S6RRY8RvB2usYZv+AAZ/wBvx2AU5p5QeP5j/FORs= +github.com/hashicorp/go-plugin v1.8.0/go.mod h1:BExt6KEaIYx804z8k4gRzRLEvxKVb+kn0NMcihqOqb8= github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= diff --git a/pkg/loop/internal/core/services/oraclefactory/client.go b/pkg/loop/internal/core/services/oraclefactory/client.go index 25ebd860da..1a2b32adba 100644 --- a/pkg/loop/internal/core/services/oraclefactory/client.go +++ b/pkg/loop/internal/core/services/oraclefactory/client.go @@ -28,7 +28,7 @@ type client struct { serviceClient *goplugin.ServiceClient } -func NewClient(log logger.Logger, b *net.BrokerExt, conn grpc.ClientConnInterface) *client { +func NewClient(log logger.Logger, b *net.BrokerExt, conn net.ClientConnInterface) *client { b = b.WithName("OracleFactoryClient") return &client{ log: log, diff --git a/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting.go b/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting.go index f3969d7640..107753ee23 100644 --- a/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting.go +++ b/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting.go @@ -21,7 +21,7 @@ type ReportingPluginFactoryClient struct { grpc pb.ReportingPluginFactoryClient } -func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ReportingPluginFactoryClient { +func NewReportingPluginFactoryClient(b *net.BrokerExt, cc net.ClientConnInterface) *ReportingPluginFactoryClient { b = b.WithName("ReportingPluginProviderClient") return &ReportingPluginFactoryClient{ BrokerExt: b, diff --git a/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting_plugin_service.go b/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting_plugin_service.go index 2621ba778c..a8c4869e43 100644 --- a/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting_plugin_service.go +++ b/pkg/loop/internal/core/services/reportingplugin/ocr2/reporting_plugin_service.go @@ -59,7 +59,7 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterPipelineRunnerServiceServer(s, pipeline.NewRunnerServer(pipelineRunner)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(pipelineRunnerRes) @@ -67,7 +67,7 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterTelemetryServer(s, telemetry.NewTelemetryServer(telemetryService)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(telemetryRes) @@ -75,7 +75,7 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterErrorLogServer(s, errorlog.NewServer(errorLog)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(errorLogRes) @@ -84,14 +84,14 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( }) if err != nil { - return 0, nil, fmt.Errorf("failed to serve new key value store: %w", err) + return 0, deps, fmt.Errorf("failed to serve new key value store: %w", err) } deps.Add(keyValueStoreRes) relayerSetServer, relayerSetServerRes := relayerset.NewRelayerSetServer(m.Logger, relayerSet, m.BrokerExt) if err != nil { - return 0, nil, fmt.Errorf("failed to create new relayer set: %w", err) + return 0, deps, fmt.Errorf("failed to create new relayer set: %w", err) } relayerSetID, relayerSetRes, err := m.ServeNew("RelayerSet", func(s *grpc.Server) { @@ -99,7 +99,7 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( }) if err != nil { - return 0, nil, fmt.Errorf("failed to serve new relayer set: %w", err) + return 0, deps, fmt.Errorf("failed to serve new relayer set: %w", err) } deps.Add(relayerSetRes) @@ -121,9 +121,9 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory( RelayerSetID: relayerSetID, }) if err != nil { - return 0, nil, err + return 0, deps, err } - return reply.ID, nil, nil + return reply.ID, deps, nil }) return NewReportingPluginFactoryClient(m.PluginClient.BrokerExt, cc), nil } diff --git a/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting.go b/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting.go index 702c069620..10f2a26579 100644 --- a/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting.go +++ b/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting.go @@ -2,6 +2,8 @@ package ocr3 import ( "context" + "errors" + "io" "math" "time" @@ -24,7 +26,7 @@ type reportingPluginFactoryClient struct { grpc ocr3.ReportingPluginFactoryClient } -func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *reportingPluginFactoryClient { +func NewReportingPluginFactoryClient(b *net.BrokerExt, cc net.ClientConnInterface) *reportingPluginFactoryClient { b = b.WithName("OCR3ReportingPluginProviderClient") return &reportingPluginFactoryClient{b, goplugin.NewServiceClient(b, cc), ocr3.NewReportingPluginFactoryClient(cc)} } @@ -126,6 +128,7 @@ var _ ocr3types.ReportingPlugin[[]byte] = (*reportingPluginClient)(nil) type reportingPluginClient struct { *net.BrokerExt + cc io.Closer // backing client connection grpc ocr3.ReportingPluginClient } @@ -221,11 +224,11 @@ func (o *reportingPluginClient) Close() error { defer cancel() _, err := o.grpc.Close(ctx, &emptypb.Empty{}) - return err + return errors.Join(err, o.cc.Close()) } -func newReportingPluginClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *reportingPluginClient { - return &reportingPluginClient{b.WithName("OCR3ReportingPluginClient"), ocr3.NewReportingPluginClient(cc)} +func newReportingPluginClient(b *net.BrokerExt, cc net.ClientConnInterface) *reportingPluginClient { + return &reportingPluginClient{b.WithName("OCR3ReportingPluginClient"), cc, ocr3.NewReportingPluginClient(cc)} } var _ ocr3.ReportingPluginServer = (*reportingPluginServer)(nil) diff --git a/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting_plugin_service.go b/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting_plugin_service.go index 185e1a0681..72f49b5d5f 100644 --- a/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting_plugin_service.go +++ b/pkg/loop/internal/core/services/reportingplugin/ocr3/reporting_plugin_service.go @@ -59,7 +59,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterPipelineRunnerServiceServer(s, pipeline.NewRunnerServer(pipelineRunner)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(pipelineRunnerRes) @@ -67,7 +67,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterTelemetryServer(s, telemetry.NewTelemetryServer(telemetryService)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(telemetryRes) @@ -75,7 +75,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterErrorLogServer(s, errorlog.NewServer(errorLog)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(errorLogRes) @@ -83,7 +83,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterCapabilitiesRegistryServer(s, capability.NewCapabilitiesRegistryServer(o.BrokerExt, capRegistry)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(capRegistryRes) @@ -91,7 +91,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( pb.RegisterKeyValueStoreServer(s, keyvalue.NewServer(keyValueStore)) }) if err != nil { - return 0, nil, fmt.Errorf("failed to serve KeyValueStore: %w", err) + return 0, deps, fmt.Errorf("failed to serve KeyValueStore: %w", err) } deps.Add(keyValueStoreRes) @@ -102,7 +102,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( }) if err != nil { - return 0, nil, fmt.Errorf("failed to serve new relayer set: %w", err) + return 0, deps, fmt.Errorf("failed to serve new relayer set: %w", err) } deps.Add(relayerSetRes) @@ -125,9 +125,9 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory( RelayerSetID: relayerSetID, }) if err != nil { - return 0, nil, err + return 0, deps, err } - return reply.ID, nil, nil + return reply.ID, deps, nil }) return NewReportingPluginFactoryClient(o.PluginClient.BrokerExt, cc), nil } diff --git a/pkg/loop/internal/core/services/validation/validation.go b/pkg/loop/internal/core/services/validation/validation.go index e06ad7f7ca..c2f6789a90 100644 --- a/pkg/loop/internal/core/services/validation/validation.go +++ b/pkg/loop/internal/core/services/validation/validation.go @@ -3,7 +3,6 @@ package validation import ( "context" - "google.golang.org/grpc" "google.golang.org/protobuf/types/known/structpb" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin" @@ -29,7 +28,7 @@ func (v *validationServiceClient) ValidateConfig(ctx context.Context, config map return err } -func NewValidationServiceClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *validationServiceClient { +func NewValidationServiceClient(b *net.BrokerExt, cc net.ClientConnInterface) *validationServiceClient { b = b.WithName("ReportingPluginProviderClient") return &validationServiceClient{b, goplugin.NewServiceClient(b, cc), pb.NewValidationServiceClient(cc)} } diff --git a/pkg/loop/internal/goplugin/plugin.go b/pkg/loop/internal/goplugin/plugin.go index 013e822fb8..6f5635f2ed 100644 --- a/pkg/loop/internal/goplugin/plugin.go +++ b/pkg/loop/internal/goplugin/plugin.go @@ -1,6 +1,7 @@ package goplugin import ( + "github.com/hashicorp/go-plugin" "google.golang.org/grpc" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net" @@ -19,7 +20,7 @@ func NewPluginClient(brokerCfg net.BrokerConfig) *PluginClient { return &pc } -func (p *PluginClient) Refresh(broker net.Broker, conn *grpc.ClientConn) { +func (p *PluginClient) Refresh(broker *plugin.GRPCBroker, conn *grpc.ClientConn) { p.AtomicBroker.Store(broker) p.AtomicClient.Store(conn) p.Logger.Debugw("Refreshed PluginClient connection", "state", conn.GetState()) diff --git a/pkg/loop/internal/goplugin/service.go b/pkg/loop/internal/goplugin/service.go index f8f1557513..b4bc00b0ce 100644 --- a/pkg/loop/internal/goplugin/service.go +++ b/pkg/loop/internal/goplugin/service.go @@ -24,11 +24,11 @@ var ( // to another loop that is proxied through the core node. type ServiceClient struct { b *net.BrokerExt - cc grpc.ClientConnInterface + cc net.ClientConnInterface grpc pb.ServiceClient } -func NewServiceClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ServiceClient { +func NewServiceClient(b *net.BrokerExt, cc net.ClientConnInterface) *ServiceClient { return &ServiceClient{b, cc, pb.NewServiceClient(cc)} } @@ -41,7 +41,7 @@ func (s *ServiceClient) Close() error { defer cancel() _, err := s.grpc.Close(ctx, &emptypb.Empty{}) - return err + return errors.Join(err, s.cc.Close()) } func (s *ServiceClient) Ready() error { diff --git a/pkg/loop/internal/net/client.go b/pkg/loop/internal/net/client.go index 3807584082..b8d956873b 100644 --- a/pkg/loop/internal/net/client.go +++ b/pkg/loop/internal/net/client.go @@ -20,8 +20,28 @@ var _ ClientConnInterface = (*grpc.ClientConn)(nil) type ClientConnInterface interface { grpc.ClientConnInterface GetState() connectivity.State + Close() error } +func ClientConnInterfaceFromGRPC(conn grpc.ClientConnInterface) ClientConnInterface { + connCloser, ok := conn.(ClientConnInterface) + if !ok { + connCloser = &noopClientConnInterface{conn} + } + return connCloser +} + +// noopClientConnInterface adapts ClientConnInterface to implement net.ClientConnInterface with no-ops. +type noopClientConnInterface struct { + grpc.ClientConnInterface +} + +func (c *noopClientConnInterface) GetState() connectivity.State { + return connectivity.State(-1) +} + +func (*noopClientConnInterface) Close() error { return nil } + var _ ClientConnInterface = (*AtomicClient)(nil) // An AtomicClient implements [grpc.ClientConnInterface] and is backed by a swappable [*grpc.ClientConn]. @@ -29,6 +49,13 @@ type AtomicClient struct { cc atomic.Pointer[grpc.ClientConn] } +func (a *AtomicClient) Close() error { + if v := a.cc.Swap(nil); v != nil { + return (*v).Close() + } + return nil +} + func (a *AtomicClient) GetState() connectivity.State { return a.cc.Load().GetState() } @@ -61,6 +88,23 @@ type clientConn struct { cc *grpc.ClientConn } +func (c *clientConn) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.close() +} + +func (c *clientConn) close() error { + if c.cc != nil { + err := c.cc.Close() + c.CloseAll(c.deps...) + c.cc = nil + c.deps = nil + return err + } + return nil +} + func (c *clientConn) GetState() connectivity.State { c.mu.RLock() cc := c.cc @@ -127,11 +171,8 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) (*grpc. if c.cc != orig { return c.cc, nil } - if c.cc != nil { - if err := c.cc.Close(); err != nil { - c.Logger.Errorw("Client close failed", "err", err) - } - c.CloseAll(c.deps...) + if err := c.close(); err != nil { + c.Logger.Errorw("Client close failed", "err", err) } try := func() error { diff --git a/pkg/loop/internal/relayer/pluginprovider/contractwriter/contract_writer.go b/pkg/loop/internal/relayer/pluginprovider/contractwriter/contract_writer.go index eca3a69555..5f09e5b4e3 100644 --- a/pkg/loop/internal/relayer/pluginprovider/contractwriter/contract_writer.go +++ b/pkg/loop/internal/relayer/pluginprovider/contractwriter/contract_writer.go @@ -25,7 +25,7 @@ type Client struct { encodeWith codecpb.EncodingVersion } -func NewClient(b *net.BrokerExt, cc grpc.ClientConnInterface, opts ...ClientOpt) *Client { +func NewClient(b *net.BrokerExt, cc net.ClientConnInterface, opts ...ClientOpt) *Client { client := &Client{ ServiceClient: goplugin.NewServiceClient(b, cc), grpc: pb.NewContractWriterClient(cc), diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_provider.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_provider.go index 4f9cd5294a..5eb11414e7 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_provider.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_provider.go @@ -35,7 +35,7 @@ type CommitProviderClient struct { grpcClient ccippb.CommitCustomHandlersClient } -func NewCommitProviderClient(b *net.BrokerExt, conn grpc.ClientConnInterface) *CommitProviderClient { +func NewCommitProviderClient(b *net.BrokerExt, conn net.ClientConnInterface) *CommitProviderClient { pluginProviderClient := ocr2.NewPluginProviderClient(b, conn) client := ccippb.NewCommitCustomHandlersClient(conn) return &CommitProviderClient{ diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_store.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_store.go index 4b821417b5..6a30079c78 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_store.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/commit_store.go @@ -35,7 +35,7 @@ type CommitStoreGRPCClient struct { conn grpc.ClientConnInterface } -func NewCommitStoreReaderGRPCClient(brokerExt *net.BrokerExt, cc grpc.ClientConnInterface) *CommitStoreGRPCClient { +func NewCommitStoreReaderGRPCClient(brokerExt *net.BrokerExt, cc net.ClientConnInterface) *CommitStoreGRPCClient { return &CommitStoreGRPCClient{client: ccippb.NewCommitStoreReaderClient(cc), b: brokerExt, conn: cc} } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/execution_provider.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/execution_provider.go index 7e862c99c4..d0c71f0129 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/execution_provider.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/execution_provider.go @@ -35,7 +35,7 @@ type ExecProviderClient struct { grpcClient ccippb.ExecutionCustomHandlersClient } -func NewExecProviderClient(b *net.BrokerExt, conn grpc.ClientConnInterface) *ExecProviderClient { +func NewExecProviderClient(b *net.BrokerExt, conn net.ClientConnInterface) *ExecProviderClient { pluginProviderClient := ocr2.NewPluginProviderClient(b, conn) grpc := ccippb.NewExecutionCustomHandlersClient(conn) return &ExecProviderClient{ diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/offramp.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/offramp.go index e6f834bef8..2628641bbd 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/offramp.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/offramp.go @@ -36,7 +36,7 @@ type OffRampReaderGRPCClient struct { // NewOffRampReaderGRPCClient creates a new OffRampReaderGRPCClient. It is used by the reporting plugin to call the offramp reader service. // The client is created by wrapping a grpc client connection. It requires a brokerExt to allocate and serve the gas estimator server. // *must* be the same broker used by the server BCF-3061 -func NewOffRampReaderGRPCClient(brokerExt *net.BrokerExt, cc grpc.ClientConnInterface) *OffRampReaderGRPCClient { +func NewOffRampReaderGRPCClient(brokerExt *net.BrokerExt, cc net.ClientConnInterface) *OffRampReaderGRPCClient { return &OffRampReaderGRPCClient{client: ccippb.NewOffRampReaderClient(cc), b: brokerExt, conn: cc} } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/commit_gas_estimator_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/commit_gas_estimator_test.go index 9b15f4676e..2223aa7af6 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/commit_gas_estimator_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/commit_gas_estimator_test.go @@ -74,6 +74,6 @@ func setupCommitGasEstimatorServer(t *testing.T, s *grpc.Server, b *loopnet.Brok } // adapt the client constructor so we can use it with the grpc scaffold -func setupCommitGasEstimatorClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.CommitGasEstimatorGRPCClient { +func setupCommitGasEstimatorClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.CommitGasEstimatorGRPCClient { return ccip.NewCommitGasEstimatorGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/exec_gas_estimator_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/exec_gas_estimator_test.go index 13b5175eb1..3fc2bcf2cd 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/exec_gas_estimator_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/exec_gas_estimator_test.go @@ -74,7 +74,7 @@ func setupExecGasEstimatorServer(t *testing.T, s *grpc.Server, b *loopnet.Broker } // adapt the client constructor so we can use it with the grpc scaffold -func setupExecGasEstimatorClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.ExecGasEstimatorGRPCClient { +func setupExecGasEstimatorClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.ExecGasEstimatorGRPCClient { return ccip.NewExecGasEstimatorGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/onramp_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/onramp_test.go index ff1dac78bb..a8dd768612 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/onramp_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/onramp_test.go @@ -98,6 +98,6 @@ func setupOnRampServer(t *testing.T, server *grpc.Server, b *loopnet.BrokerExt) return onRamp } -func setupOnRampClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.OnRampReaderGRPCClient { +func setupOnRampClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.OnRampReaderGRPCClient { return ccip.NewOnRampReaderGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/price_registry_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/price_registry_test.go index 9918f07a62..849bab1a28 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/price_registry_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/price_registry_test.go @@ -107,6 +107,6 @@ func setupPriceRegistryServer(t *testing.T, server *grpc.Server, b *loopnet.Brok } // wrapper to enable use of the grpc scaffold -func setupPriceRegistryClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.PriceRegistryGRPCClient { +func setupPriceRegistryClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.PriceRegistryGRPCClient { return ccip.NewPriceRegistryGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/pricegetter_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/pricegetter_test.go index aa37b8a4dc..d6dad02d12 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/pricegetter_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/pricegetter_test.go @@ -70,7 +70,7 @@ func setupPriceGetterServer(t *testing.T, s *grpc.Server, b *loopnet.BrokerExt) return priceGetter } -func setupPriceGetterClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.PriceGetterGRPCClient { +func setupPriceGetterClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.PriceGetterGRPCClient { return ccip.NewPriceGetterGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_data_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_data_test.go index e3c7c0583b..28ae126743 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_data_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_data_test.go @@ -58,7 +58,7 @@ func setupTokenDataServer(t *testing.T, s *grpc.Server, b *loopnet.BrokerExt) *c return tokenData } -func setupTokenDataClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.TokenDataReaderGRPCClient { +func setupTokenDataClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.TokenDataReaderGRPCClient { return ccip.NewTokenDataReaderGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_pool_test.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_pool_test.go index 443bf95b1b..58875f5cfa 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_pool_test.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccip/test/token_pool_test.go @@ -58,7 +58,7 @@ func setupTokenPoolServer(t *testing.T, s *grpc.Server, b *loopnet.BrokerExt) *c return tokenPool } -func setupTokenPoolClient(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) *ccip.TokenPoolBatchedReaderGRPCClient { +func setupTokenPoolClient(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) *ccip.TokenPoolBatchedReaderGRPCClient { return ccip.NewTokenPoolBatchedReaderGRPCClient(conn) } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go index cbb21d2ee5..f27c427115 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go @@ -32,7 +32,7 @@ type CCIPProviderClient struct { messageHasher ccipocr3.MessageHasher } -func NewCCIPProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *CCIPProviderClient { +func NewCCIPProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *CCIPProviderClient { c := &CCIPProviderClient{ ServiceClient: goplugin.NewServiceClient(b.WithName("CCIPProviderClient"), cc), } @@ -87,7 +87,7 @@ type CCIPProviderServer struct{} func (s CCIPProviderServer) ConnToProvider(conn grpc.ClientConnInterface, broker net.Broker, brokerCfg net.BrokerConfig) types.CCIPProvider { be := &net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg} - return NewCCIPProviderClient(be, conn) + return NewCCIPProviderClient(be, net.ClientConnInterfaceFromGRPC(conn)) } func RegisterProviderServices(s *grpc.Server, provider types.CCIPProvider) { diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/median/median.go b/pkg/loop/internal/relayer/pluginprovider/ext/median/median.go index 24cbbcff71..a935533f8d 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/median/median.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/median/median.go @@ -39,7 +39,7 @@ type ProviderClient struct { codec types.Codec } -func NewProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ProviderClient { +func NewProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *ProviderClient { m := &ProviderClient{PluginProviderClient: ocr2.NewPluginProviderClient(b.WithName("MedianProviderClient"), cc)} m.reportCodec = &reportCodecClient{b, pb.NewReportCodecClient(cc)} m.medianContract = &medianContractClient{pb.NewMedianContractClient(cc)} @@ -313,7 +313,7 @@ type ProviderServer struct{} func (m ProviderServer) ConnToProvider(conn grpc.ClientConnInterface, broker net.Broker, brokerCfg net.BrokerConfig) types.MedianProvider { be := &net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg} - pc := NewProviderClient(be, conn) + pc := NewProviderClient(be, net.ClientConnInterfaceFromGRPC(conn)) pc.RmUnimplemented(context.Background()) return pc } diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/mercury/mercury.go b/pkg/loop/internal/relayer/pluginprovider/ext/mercury/mercury.go index 6c7a7b14bb..455ea65b21 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/mercury/mercury.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/mercury/mercury.go @@ -40,7 +40,7 @@ type ProviderClient struct { mercuryChainReader mercury.ChainReader } -func NewProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ProviderClient { +func NewProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *ProviderClient { m := &ProviderClient{PluginProviderClient: ocr2.NewPluginProviderClient(b.WithName("MercuryProviderClient"), cc)} m.reportCodecV1 = newReportCodecV1Client(mercury_v1_internal.NewReportCodecClient(cc)) diff --git a/pkg/loop/internal/relayer/pluginprovider/ext/ocr3capability/capability.go b/pkg/loop/internal/relayer/pluginprovider/ext/ocr3capability/capability.go index 9dc9f4b841..8adf90ffef 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ext/ocr3capability/capability.go +++ b/pkg/loop/internal/relayer/pluginprovider/ext/ocr3capability/capability.go @@ -29,7 +29,7 @@ func (p *ProviderClient) OCR3ContractTransmitter() ocr3types.ContractTransmitter return p.ocr3ContractTransmitter } -func NewProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ProviderClient { +func NewProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *ProviderClient { m := &ProviderClient{ PluginProviderClient: ocr2.NewPluginProviderClient(b.WithName("OCR3CapabilityProviderClient"), cc), ocr3ContractTransmitter: ocr3.NewContractTransmitterClient(b.WithName("OCR3ContractTransmitter"), cc), @@ -42,7 +42,7 @@ type ProviderServer struct{} func (m ProviderServer) ConnToProvider(conn grpc.ClientConnInterface, broker net.Broker, brokerCfg net.BrokerConfig) types.OCR3CapabilityProvider { be := &net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg} - return NewProviderClient(be, conn) + return NewProviderClient(be, net.ClientConnInterfaceFromGRPC(conn)) } func RegisterProviderServices(s *grpc.Server, provider types.OCR3CapabilityProvider) { diff --git a/pkg/loop/internal/relayer/pluginprovider/ocr2/config.go b/pkg/loop/internal/relayer/pluginprovider/ocr2/config.go index 783dd81641..99cfd3ce60 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ocr2/config.go +++ b/pkg/loop/internal/relayer/pluginprovider/ocr2/config.go @@ -26,7 +26,7 @@ type ConfigProviderClient struct { contractTracker libocr.ContractConfigTracker } -func NewConfigProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ConfigProviderClient { +func NewConfigProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *ConfigProviderClient { c := &ConfigProviderClient{ServiceClient: goplugin.NewServiceClient(b, cc)} c.offchainDigester = &offchainConfigDigesterClient{b, pb.NewOffchainConfigDigesterClient(cc)} c.contractTracker = &contractConfigTrackerClient{pb.NewContractConfigTrackerClient(cc)} diff --git a/pkg/loop/internal/relayer/pluginprovider/ocr2/plugin_provider.go b/pkg/loop/internal/relayer/pluginprovider/ocr2/plugin_provider.go index ab777e4ba4..828b9d084d 100644 --- a/pkg/loop/internal/relayer/pluginprovider/ocr2/plugin_provider.go +++ b/pkg/loop/internal/relayer/pluginprovider/ocr2/plugin_provider.go @@ -1,9 +1,8 @@ package ocr2 import ( - "google.golang.org/grpc" - libocr "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "google.golang.org/grpc" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin" "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net" @@ -24,7 +23,7 @@ var _ types.PluginProvider = (*PluginProviderClient)(nil) // in practice, inherited from configProviderClient. var _ goplugin.GRPCClientConn = (*PluginProviderClient)(nil) -func NewPluginProviderClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *PluginProviderClient { +func NewPluginProviderClient(b *net.BrokerExt, cc net.ClientConnInterface) *PluginProviderClient { p := &PluginProviderClient{ConfigProviderClient: NewConfigProviderClient(b.WithName("PluginProviderClient"), cc)} p.contractTransmitter = &contractTransmitterClient{b, pb.NewContractTransmitterClient(cc)} p.contractReader = contractreader.NewClient(goplugin.NewServiceClient(b, cc), pb.NewContractReaderClient(cc)) @@ -48,5 +47,5 @@ type PluginProviderServer struct{} func (p PluginProviderServer) ConnToProvider(conn grpc.ClientConnInterface, broker net.Broker, brokerCfg net.BrokerConfig) types.PluginProvider { be := &net.BrokerExt{Broker: broker, BrokerConfig: brokerCfg} - return NewPluginProviderClient(be, conn) + return NewPluginProviderClient(be, net.ClientConnInterfaceFromGRPC(conn)) } diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index e9f6c68294..47464aecfa 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -190,7 +190,7 @@ type relayerClient struct { aptosClient aptospb.AptosClient } -func newRelayerClient(b *net.BrokerExt, conn grpc.ClientConnInterface) *relayerClient { +func newRelayerClient(b *net.BrokerExt, conn net.ClientConnInterface) *relayerClient { b = b.WithName("RelayerClient") return &relayerClient{ b, goplugin.NewServiceClient(b, conn), @@ -276,7 +276,7 @@ type PluginProviderClient interface { goplugin.GRPCClientConn } -func WrapProviderClientConnection(ctx context.Context, providerType string, cc grpc.ClientConnInterface, broker *net.BrokerExt) (PluginProviderClient, error) { +func WrapProviderClientConnection(ctx context.Context, providerType string, cc net.ClientConnInterface, broker *net.BrokerExt) (PluginProviderClient, error) { // TODO: Remove this when we have fully transitioned all relayers to running in LOOPPs. // This allows callers to type assert a PluginProvider into a product provider type (eg. MedianProvider) // for interoperability with legacy code. diff --git a/pkg/loop/internal/relayerset/client.go b/pkg/loop/internal/relayerset/client.go index 3c858080c8..f2678586ee 100644 --- a/pkg/loop/internal/relayerset/client.go +++ b/pkg/loop/internal/relayerset/client.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" - "google.golang.org/grpc" - "github.com/smartcontractkit/chainlink-common/pkg/chains/aptos" "github.com/smartcontractkit/chainlink-common/pkg/chains/evm" "github.com/smartcontractkit/chainlink-common/pkg/chains/solana" @@ -36,7 +34,7 @@ type Client struct { aptosRelayerSetClient aptos.AptosClient } -func NewRelayerSetClient(log logger.Logger, b *net.BrokerExt, conn grpc.ClientConnInterface) *Client { +func NewRelayerSetClient(log logger.Logger, b *net.BrokerExt, conn net.ClientConnInterface) *Client { b = b.WithName("ChainRelayerClient") return &Client{ log: log, diff --git a/pkg/loop/internal/reportingplugin/ccip/commit.go b/pkg/loop/internal/reportingplugin/ccip/commit.go index 515de52bfb..ecbd0b4d90 100644 --- a/pkg/loop/internal/reportingplugin/ccip/commit.go +++ b/pkg/loop/internal/reportingplugin/ccip/commit.go @@ -73,7 +73,7 @@ func (c *CommitLOOPClient) NewCommitFactory(ctx context.Context, provider types. ProviderServiceId: providerID, }) if err != nil { - return 0, nil, err + return 0, deps, err } return resp.CommitFactoryServiceId, deps, nil } diff --git a/pkg/loop/internal/reportingplugin/ccip/execution.go b/pkg/loop/internal/reportingplugin/ccip/execution.go index 5024a58773..d588db3ac5 100644 --- a/pkg/loop/internal/reportingplugin/ccip/execution.go +++ b/pkg/loop/internal/reportingplugin/ccip/execution.go @@ -83,7 +83,7 @@ func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, srcProvid }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(dstProviderResource) @@ -95,7 +95,7 @@ func (c *ExecutionLOOPClient) NewExecutionFactory(ctx context.Context, srcProvid SrcTokenAddress: srcTokenAddress, }) if err != nil { - return 0, nil, err + return 0, deps, err } return resp.ExecutionFactoryServiceId, deps, nil } diff --git a/pkg/loop/internal/reportingplugin/median/median.go b/pkg/loop/internal/reportingplugin/median/median.go index 5fc7820fae..5cc49df628 100644 --- a/pkg/loop/internal/reportingplugin/median/median.go +++ b/pkg/loop/internal/reportingplugin/median/median.go @@ -50,7 +50,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type pb.RegisterDataSourceServer(s, newDataSourceServer(juelsPerFeeCoin)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(juelsPerFeeCoinDataSourceRes) @@ -58,7 +58,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type pb.RegisterDataSourceServer(s, newDataSourceServer(gasPriceSubunits)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(gasPriceSubunitsDataSourceRes) @@ -74,7 +74,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(providerRes) @@ -82,7 +82,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type pb.RegisterErrorLogServer(s, errorlog.NewServer(errorLog)) }) if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(errorLogRes) @@ -90,7 +90,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type if deviationFuncDefinition != nil { deviationFuncDefinitionJSON, err = json.Marshal(deviationFuncDefinition) if err != nil { - return 0, nil, fmt.Errorf("failed to marshal deviationFuncDefinition: %w", err) + return 0, deps, fmt.Errorf("failed to marshal deviationFuncDefinition: %w", err) } } @@ -104,9 +104,9 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type DeviationFuncDefinition: deviationFuncDefinitionJSON, }) if err != nil { - return 0, nil, err + return 0, deps, err } - return reply.ReportingPluginFactoryID, nil, nil + return reply.ReportingPluginFactoryID, deps, nil }) return ocr2.NewReportingPluginFactoryClient(m.PluginClient.BrokerExt, cc), nil } diff --git a/pkg/loop/internal/reportingplugin/mercury/mercury.go b/pkg/loop/internal/reportingplugin/mercury/mercury.go index 07cde5774d..69963da8f7 100644 --- a/pkg/loop/internal/reportingplugin/mercury/mercury.go +++ b/pkg/loop/internal/reportingplugin/mercury/mercury.go @@ -76,7 +76,7 @@ func (c *AdapterClient) NewMercuryV1Factory(ctx context.Context, }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(providerRes) @@ -85,7 +85,7 @@ func (c *AdapterClient) NewMercuryV1Factory(ctx context.Context, DataSourceV1ID: dataSourceID, }) if err != nil { - return 0, nil, err + return 0, deps, err } return reply.MercuryV1FactoryID, deps, nil } @@ -124,7 +124,7 @@ func (c *AdapterClient) NewMercuryV2Factory(ctx context.Context, }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(providerRes) @@ -133,7 +133,7 @@ func (c *AdapterClient) NewMercuryV2Factory(ctx context.Context, DataSourceV2ID: dataSourceID, }) if err != nil { - return 0, nil, err + return 0, deps, err } return reply.MercuryV2FactoryID, deps, nil } @@ -174,7 +174,7 @@ func (c *AdapterClient) NewMercuryV3Factory(ctx context.Context, }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(providerRes) @@ -183,7 +183,7 @@ func (c *AdapterClient) NewMercuryV3Factory(ctx context.Context, DataSourceV3ID: dataSourceID, }) if err != nil { - return 0, nil, err + return 0, deps, err } return reply.MercuryV3FactoryID, deps, nil } @@ -224,7 +224,7 @@ func (c *AdapterClient) NewMercuryV4Factory(ctx context.Context, }) } if err != nil { - return 0, nil, err + return 0, deps, err } deps.Add(providerRes) @@ -233,7 +233,7 @@ func (c *AdapterClient) NewMercuryV4Factory(ctx context.Context, DataSourceV4ID: dataSourceID, }) if err != nil { - return 0, nil, err + return 0, deps, err } return reply.MercuryV4FactoryID, deps, nil } diff --git a/pkg/loop/internal/reportingplugin/mercury/mercury_reporting.go b/pkg/loop/internal/reportingplugin/mercury/mercury_reporting.go index fd8bab4226..87d9fea1e2 100644 --- a/pkg/loop/internal/reportingplugin/mercury/mercury_reporting.go +++ b/pkg/loop/internal/reportingplugin/mercury/mercury_reporting.go @@ -22,7 +22,7 @@ type PluginFactoryClient struct { client mercurypb.MercuryPluginFactoryClient } -func NewPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *PluginFactoryClient { +func NewPluginFactoryClient(b *net.BrokerExt, cc net.ClientConnInterface) *PluginFactoryClient { b = b.WithName("MercuryPluginProviderClient") return &PluginFactoryClient{b, goplugin.NewServiceClient(b, cc), mercurypb.NewMercuryPluginFactoryClient(cc)} } diff --git a/pkg/loop/internal/test/grpc_scaffold.go b/pkg/loop/internal/test/grpc_scaffold.go index 06422254bc..69c31fde97 100644 --- a/pkg/loop/internal/test/grpc_scaffold.go +++ b/pkg/loop/internal/test/grpc_scaffold.go @@ -101,7 +101,7 @@ type SetupGRPCServer[S any] func(t *testing.T, s *grpc.Server, b *loopnet.Broker // SetupGRPCClient is a function that sets up a grpc client with a given broker and connection // analogous to SetupGRPCServer. Typically it is implemented as a light wrapper around the grpc client constructor -type SetupGRPCClient[T Client] func(b *loopnet.BrokerExt, conn grpc.ClientConnInterface) T +type SetupGRPCClient[T Client] func(b *loopnet.BrokerExt, conn loopnet.ClientConnInterface) T // MockDep is a mock dependency that can be used to test that a grpc client closes its dependencies // to be used in tests that require a grpc client to close its dependencies diff --git a/pkg/loop/relayer_service_test.go b/pkg/loop/relayer_service_test.go index 394b87d29b..878d171447 100644 --- a/pkg/loop/relayer_service_test.go +++ b/pkg/loop/relayer_service_test.go @@ -39,7 +39,8 @@ var relayerServiceNames = []string{ } func TestRelayerService(t *testing.T) { - t.Parallel() + tests.VerifyNoLeaks(t) + capRegistry := mocks.NewCapabilitiesRegistry(t) relayer := loop.NewRelayerService(logger.Test(t), loop.GRPCOpts{}, func() *exec.Cmd { return NewHelperProcessCommand(loop.PluginRelayerName, false, 0) @@ -56,7 +57,7 @@ func TestRelayerService(t *testing.T) { hook.Kill() // wait for relaunch - time.Sleep(2 * goplugin.KeepAliveTickDuration) + time.Sleep(goplugin.KeepAliveTickDuration) relayertest.Run(t, relayer) servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames...) @@ -67,7 +68,7 @@ func TestRelayerService(t *testing.T) { hook.Reset() // wait for relaunch - time.Sleep(2 * goplugin.KeepAliveTickDuration) + time.Sleep(goplugin.KeepAliveTickDuration) relayertest.Run(t, relayer) servicetest.AssertHealthReportNames(t, relayer.HealthReport(), relayerServiceNames...) diff --git a/pkg/loop/reportingplugins/grpc.go b/pkg/loop/reportingplugins/grpc.go index 172ee12d26..c16cf61ad6 100644 --- a/pkg/loop/reportingplugins/grpc.go +++ b/pkg/loop/reportingplugins/grpc.go @@ -50,7 +50,7 @@ type serverAdapter struct { NewReportingPluginFactoryFn func( ctx context.Context, config core.ReportingPluginServiceConfig, - conn grpc.ClientConnInterface, + conn net.ClientConnInterface, pr core.PipelineRunnerService, ts core.TelemetryService, errorLog core.ErrorLog, @@ -79,14 +79,14 @@ func (s serverAdapter) NewReportingPluginFactory( kv core.KeyValueStore, rs core.RelayerSet, ) (types.ReportingPluginFactory, error) { - return s.NewReportingPluginFactoryFn(ctx, config, conn, pr, ts, errorLog, kv, rs) + return s.NewReportingPluginFactoryFn(ctx, config, net.ClientConnInterfaceFromGRPC(conn), pr, ts, errorLog, kv, rs) } func (g *GRPCService[T]) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { newReportingPluginFactoryFn := func( ctx context.Context, cfg core.ReportingPluginServiceConfig, - conn grpc.ClientConnInterface, + conn net.ClientConnInterface, pr core.PipelineRunnerService, ts core.TelemetryService, el core.ErrorLog, diff --git a/pkg/loop/reportingplugins/ocr3/grpc.go b/pkg/loop/reportingplugins/ocr3/grpc.go index 8bb18dd6bf..c56f558ad1 100644 --- a/pkg/loop/reportingplugins/ocr3/grpc.go +++ b/pkg/loop/reportingplugins/ocr3/grpc.go @@ -37,7 +37,7 @@ type serverAdapter struct { NewReportingPluginFactoryFn func( context.Context, core.ReportingPluginServiceConfig, - grpc.ClientConnInterface, + net.ClientConnInterface, core.PipelineRunnerService, core.TelemetryService, core.ErrorLog, @@ -69,14 +69,14 @@ func (s serverAdapter) NewReportingPluginFactory( kv core.KeyValueStore, rs core.RelayerSet, ) (core.OCR3ReportingPluginFactory, error) { - return s.NewReportingPluginFactoryFn(ctx, config, conn, pr, ts, errorLog, capRegistry, kv, rs) + return s.NewReportingPluginFactoryFn(ctx, config, net.ClientConnInterfaceFromGRPC(conn), pr, ts, errorLog, capRegistry, kv, rs) } func (g *GRPCService[T]) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error { newReportingPluginFactoryFn := func( ctx context.Context, cfg core.ReportingPluginServiceConfig, - conn grpc.ClientConnInterface, + conn net.ClientConnInterface, pr core.PipelineRunnerService, ts core.TelemetryService, el core.ErrorLog, diff --git a/pkg/utils/tests/leak.go b/pkg/utils/tests/leak.go new file mode 100644 index 0000000000..b4b68bdd4b --- /dev/null +++ b/pkg/utils/tests/leak.go @@ -0,0 +1,25 @@ +package tests + +import ( + "math/rand/v2" + "strconv" + "testing" + + "go.uber.org/goleak" +) + +// VerifyNoLeaks verifies that the test does not leak any goroutines. +// Must be called at the start of the test, before any goroutines have spawned. +// Cannot be used from parallel tests. +func VerifyNoLeaks(t testing.TB) { + // Set a random environment variable to trigger testing.checkParallel() + t.Setenv(strconv.Itoa(rand.Int()), strconv.Itoa(rand.Int())) + current := goleak.IgnoreCurrent() + t.Cleanup(func() { + if t.Failed() { + t.Log("Test failed - skipping goroutine leak check") + return + } + goleak.VerifyNone(t, current) + }) +} From 16734b25488d63fe735edb899b35fb22c52f112e Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 28 Apr 2026 14:25:32 -0500 Subject: [PATCH 4/4] pkg/loop/internal/net: classify Cenceled as non terminal (#2009) --- pkg/loop/internal/net/errors.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/loop/internal/net/errors.go b/pkg/loop/internal/net/errors.go index 47bb892e94..b295e5a4d9 100644 --- a/pkg/loop/internal/net/errors.go +++ b/pkg/loop/internal/net/errors.go @@ -41,11 +41,11 @@ func (e ErrConnDial) Unwrap() error { // must be refreshed. func isErrTerminal(err error) bool { switch status.Code(err) { - case codes.Unavailable, codes.Canceled: + case codes.Unavailable: return true - case codes.OK, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, - codes.PermissionDenied, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, - codes.Unimplemented, codes.Internal, codes.DataLoss, codes.Unauthenticated: + case codes.OK, codes.Unknown, codes.InvalidArgument, codes.Canceled, codes.DeadlineExceeded, codes.NotFound, + codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, + codes.OutOfRange, codes.Unimplemented, codes.Internal, codes.DataLoss, codes.Unauthenticated: return false } return false