Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-plugin v1.7.0
github.com/hashicorp/go-plugin v1.8.0
github.com/iancoleman/strcase v0.3.0
github.com/invopop/jsonschema v0.13.0
github.com/jackc/pgx/v4 v4.18.3
Expand Down Expand Up @@ -67,6 +67,7 @@ require (
go.opentelemetry.io/otel/sdk/log v0.15.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.1
golang.org/x/crypto v0.47.0
golang.org/x/exp v0.0.0-20260112195511-716be5621a96
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/loop/internal/core/services/oraclefactory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type client struct {
serviceClient *goplugin.ServiceClient
}

func NewClient(log logger.Logger, b *net.BrokerExt, conn grpc.ClientConnInterface) *client {
func NewClient(log logger.Logger, b *net.BrokerExt, conn net.ClientConnInterface) *client {
b = b.WithName("OracleFactoryClient")
return &client{
log: log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ReportingPluginFactoryClient struct {
grpc pb.ReportingPluginFactoryClient
}

func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ReportingPluginFactoryClient {
func NewReportingPluginFactoryClient(b *net.BrokerExt, cc net.ClientConnInterface) *ReportingPluginFactoryClient {
b = b.WithName("ReportingPluginProviderClient")
return &ReportingPluginFactoryClient{
BrokerExt: b,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory(
pb.RegisterPipelineRunnerServiceServer(s, pipeline.NewRunnerServer(pipelineRunner))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(pipelineRunnerRes)

telemetryID, telemetryRes, err := m.ServeNew("Telemetry", func(s *grpc.Server) {
pb.RegisterTelemetryServer(s, telemetry.NewTelemetryServer(telemetryService))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(telemetryRes)

errorLogID, errorLogRes, err := m.ServeNew("ErrorLog", func(s *grpc.Server) {
pb.RegisterErrorLogServer(s, errorlog.NewServer(errorLog))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(errorLogRes)

Expand All @@ -84,22 +84,22 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory(
})

if err != nil {
return 0, nil, fmt.Errorf("failed to serve new key value store: %w", err)
return 0, deps, fmt.Errorf("failed to serve new key value store: %w", err)
}

deps.Add(keyValueStoreRes)

relayerSetServer, relayerSetServerRes := relayerset.NewRelayerSetServer(m.Logger, relayerSet, m.BrokerExt)
if err != nil {
return 0, nil, fmt.Errorf("failed to create new relayer set: %w", err)
return 0, deps, fmt.Errorf("failed to create new relayer set: %w", err)
}

relayerSetID, relayerSetRes, err := m.ServeNew("RelayerSet", func(s *grpc.Server) {
relayersetpb.RegisterRelayerSetServerWithDependants(s, relayerSetServer)
})

if err != nil {
return 0, nil, fmt.Errorf("failed to serve new relayer set: %w", err)
return 0, deps, fmt.Errorf("failed to serve new relayer set: %w", err)
}

deps.Add(relayerSetRes)
Expand All @@ -121,9 +121,9 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory(
RelayerSetID: relayerSetID,
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
return reply.ID, nil, nil
return reply.ID, deps, nil
})
return NewReportingPluginFactoryClient(m.PluginClient.BrokerExt, cc), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ocr3

import (
"context"
"errors"
"io"
"math"
"time"

Expand All @@ -24,7 +26,7 @@ type reportingPluginFactoryClient struct {
grpc ocr3.ReportingPluginFactoryClient
}

func NewReportingPluginFactoryClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *reportingPluginFactoryClient {
func NewReportingPluginFactoryClient(b *net.BrokerExt, cc net.ClientConnInterface) *reportingPluginFactoryClient {
b = b.WithName("OCR3ReportingPluginProviderClient")
return &reportingPluginFactoryClient{b, goplugin.NewServiceClient(b, cc), ocr3.NewReportingPluginFactoryClient(cc)}
}
Expand Down Expand Up @@ -126,6 +128,7 @@ var _ ocr3types.ReportingPlugin[[]byte] = (*reportingPluginClient)(nil)

type reportingPluginClient struct {
*net.BrokerExt
cc io.Closer // backing client connection
grpc ocr3.ReportingPluginClient
}

Expand Down Expand Up @@ -221,11 +224,11 @@ func (o *reportingPluginClient) Close() error {
defer cancel()

_, err := o.grpc.Close(ctx, &emptypb.Empty{})
return err
return errors.Join(err, o.cc.Close())
}

func newReportingPluginClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *reportingPluginClient {
return &reportingPluginClient{b.WithName("OCR3ReportingPluginClient"), ocr3.NewReportingPluginClient(cc)}
func newReportingPluginClient(b *net.BrokerExt, cc net.ClientConnInterface) *reportingPluginClient {
return &reportingPluginClient{b.WithName("OCR3ReportingPluginClient"), cc, ocr3.NewReportingPluginClient(cc)}
}

var _ ocr3.ReportingPluginServer = (*reportingPluginServer)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,39 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory(
pb.RegisterPipelineRunnerServiceServer(s, pipeline.NewRunnerServer(pipelineRunner))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(pipelineRunnerRes)

telemetryID, telemetryRes, err := o.ServeNew("Telemetry", func(s *grpc.Server) {
pb.RegisterTelemetryServer(s, telemetry.NewTelemetryServer(telemetryService))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(telemetryRes)

errorLogID, errorLogRes, err := o.ServeNew("ErrorLog", func(s *grpc.Server) {
pb.RegisterErrorLogServer(s, errorlog.NewServer(errorLog))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(errorLogRes)

capRegistryID, capRegistryRes, err := o.ServeNew("CapRegistry", func(s *grpc.Server) {
pb.RegisterCapabilitiesRegistryServer(s, capability.NewCapabilitiesRegistryServer(o.BrokerExt, capRegistry))
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
deps.Add(capRegistryRes)

keyValueStoreID, keyValueStoreRes, err := o.ServeNew("KeyValueStore", func(s *grpc.Server) {
pb.RegisterKeyValueStoreServer(s, keyvalue.NewServer(keyValueStore))
})
if err != nil {
return 0, nil, fmt.Errorf("failed to serve KeyValueStore: %w", err)
return 0, deps, fmt.Errorf("failed to serve KeyValueStore: %w", err)
}
deps.Add(keyValueStoreRes)

Expand All @@ -102,7 +102,7 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory(
})

if err != nil {
return 0, nil, fmt.Errorf("failed to serve new relayer set: %w", err)
return 0, deps, fmt.Errorf("failed to serve new relayer set: %w", err)
}

deps.Add(relayerSetRes)
Expand All @@ -125,9 +125,9 @@ func (o *ReportingPluginServiceClient) NewReportingPluginFactory(
RelayerSetID: relayerSetID,
})
if err != nil {
return 0, nil, err
return 0, deps, err
}
return reply.ID, nil, nil
return reply.ID, deps, nil
})
return NewReportingPluginFactoryClient(o.PluginClient.BrokerExt, cc), nil
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/loop/internal/core/services/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package validation
import (
"context"

"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/structpb"

"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
Expand All @@ -29,7 +28,7 @@ func (v *validationServiceClient) ValidateConfig(ctx context.Context, config map
return err
}

func NewValidationServiceClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *validationServiceClient {
func NewValidationServiceClient(b *net.BrokerExt, cc net.ClientConnInterface) *validationServiceClient {
b = b.WithName("ReportingPluginProviderClient")
return &validationServiceClient{b, goplugin.NewServiceClient(b, cc), pb.NewValidationServiceClient(cc)}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/loop/internal/goplugin/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goplugin

import (
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net"
Expand All @@ -19,7 +20,7 @@ func NewPluginClient(brokerCfg net.BrokerConfig) *PluginClient {
return &pc
}

func (p *PluginClient) Refresh(broker net.Broker, conn *grpc.ClientConn) {
func (p *PluginClient) Refresh(broker *plugin.GRPCBroker, conn *grpc.ClientConn) {
p.AtomicBroker.Store(broker)
p.AtomicClient.Store(conn)
p.Logger.Debugw("Refreshed PluginClient connection", "state", conn.GetState())
Expand Down
6 changes: 3 additions & 3 deletions pkg/loop/internal/goplugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ var (
// to another loop that is proxied through the core node.
type ServiceClient struct {
b *net.BrokerExt
cc grpc.ClientConnInterface
cc net.ClientConnInterface
grpc pb.ServiceClient
}

func NewServiceClient(b *net.BrokerExt, cc grpc.ClientConnInterface) *ServiceClient {
func NewServiceClient(b *net.BrokerExt, cc net.ClientConnInterface) *ServiceClient {
return &ServiceClient{b, cc, pb.NewServiceClient(cc)}
}

Expand All @@ -41,7 +41,7 @@ func (s *ServiceClient) Close() error {
defer cancel()

_, err := s.grpc.Close(ctx, &emptypb.Empty{})
return err
return errors.Join(err, s.cc.Close())
}

func (s *ServiceClient) Ready() error {
Expand Down
51 changes: 46 additions & 5 deletions pkg/loop/internal/net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,42 @@ var _ ClientConnInterface = (*grpc.ClientConn)(nil)
type ClientConnInterface interface {
grpc.ClientConnInterface
GetState() connectivity.State
Close() error
}

func ClientConnInterfaceFromGRPC(conn grpc.ClientConnInterface) ClientConnInterface {
connCloser, ok := conn.(ClientConnInterface)
if !ok {
connCloser = &noopClientConnInterface{conn}
}
return connCloser
}

// noopClientConnInterface adapts ClientConnInterface to implement net.ClientConnInterface with no-ops.
type noopClientConnInterface struct {
grpc.ClientConnInterface
}

func (c *noopClientConnInterface) GetState() connectivity.State {
return connectivity.State(-1)
}

func (*noopClientConnInterface) Close() error { return nil }

var _ ClientConnInterface = (*AtomicClient)(nil)

// An AtomicClient implements [grpc.ClientConnInterface] and is backed by a swappable [*grpc.ClientConn].
type AtomicClient struct {
cc atomic.Pointer[grpc.ClientConn]
}

func (a *AtomicClient) Close() error {
if v := a.cc.Swap(nil); v != nil {
return (*v).Close()
}
return nil
}

func (a *AtomicClient) GetState() connectivity.State {
return a.cc.Load().GetState()
}
Expand Down Expand Up @@ -61,6 +88,23 @@ type clientConn struct {
cc *grpc.ClientConn
}

func (c *clientConn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.close()
}

func (c *clientConn) close() error {
if c.cc != nil {
err := c.cc.Close()
c.CloseAll(c.deps...)
c.cc = nil
c.deps = nil
return err
}
return nil
}

func (c *clientConn) GetState() connectivity.State {
c.mu.RLock()
cc := c.cc
Expand Down Expand Up @@ -127,11 +171,8 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) (*grpc.
if c.cc != orig {
return c.cc, nil
}
if c.cc != nil {
if err := c.cc.Close(); err != nil {
c.Logger.Errorw("Client close failed", "err", err)
}
c.CloseAll(c.deps...)
if err := c.close(); err != nil {
c.Logger.Errorw("Client close failed", "err", err)
}

try := func() error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/loop/internal/net/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func (e ErrConnDial) Unwrap() error {
// must be refreshed.
func isErrTerminal(err error) bool {
switch status.Code(err) {
case codes.Unavailable, codes.Canceled:
case codes.Unavailable:
return true
case codes.OK, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists,
codes.PermissionDenied, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange,
codes.Unimplemented, codes.Internal, codes.DataLoss, codes.Unauthenticated:
case codes.OK, codes.Unknown, codes.InvalidArgument, codes.Canceled, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted,
codes.OutOfRange, codes.Unimplemented, codes.Internal, codes.DataLoss, codes.Unauthenticated:
return false
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Client struct {
encodeWith codecpb.EncodingVersion
}

func NewClient(b *net.BrokerExt, cc grpc.ClientConnInterface, opts ...ClientOpt) *Client {
func NewClient(b *net.BrokerExt, cc net.ClientConnInterface, opts ...ClientOpt) *Client {
client := &Client{
ServiceClient: goplugin.NewServiceClient(b, cc),
grpc: pb.NewContractWriterClient(cc),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CommitProviderClient struct {
grpcClient ccippb.CommitCustomHandlersClient
}

func NewCommitProviderClient(b *net.BrokerExt, conn grpc.ClientConnInterface) *CommitProviderClient {
func NewCommitProviderClient(b *net.BrokerExt, conn net.ClientConnInterface) *CommitProviderClient {
pluginProviderClient := ocr2.NewPluginProviderClient(b, conn)
client := ccippb.NewCommitCustomHandlersClient(conn)
return &CommitProviderClient{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CommitStoreGRPCClient struct {
conn grpc.ClientConnInterface
}

func NewCommitStoreReaderGRPCClient(brokerExt *net.BrokerExt, cc grpc.ClientConnInterface) *CommitStoreGRPCClient {
func NewCommitStoreReaderGRPCClient(brokerExt *net.BrokerExt, cc net.ClientConnInterface) *CommitStoreGRPCClient {
return &CommitStoreGRPCClient{client: ccippb.NewCommitStoreReaderClient(cc), b: brokerExt, conn: cc}
}

Expand Down
Loading
Loading