From 6dc65e5e03cfc097a318df51386339af9ee0e524 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Wed, 29 Apr 2026 22:04:46 +0100 Subject: [PATCH] fix: replace stale LOOP relayer resources --- pkg/loop/internal/relayer/relayer.go | 20 +++++++++++- .../relayer/relayer_lifecycle_test.go | 32 +++++++++++++++++++ pkg/loop/internal/types/types.go | 3 ++ 3 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 pkg/loop/internal/relayer/relayer_lifecycle_test.go diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 26220c9b5b..913ae9ebd9 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "github.com/google/uuid" "google.golang.org/grpc" @@ -102,6 +103,9 @@ type pluginRelayerServer struct { *net.BrokerExt impl looptypes.PluginRelayer + + activeRelayerMu sync.Mutex + activeRelayer net.Resource } func RegisterPluginRelayerServer(server *grpc.Server, broker net.Broker, brokerCfg net.BrokerConfig, impl looptypes.PluginRelayer) error { @@ -150,12 +154,13 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel err = r.Start(ctx) if err != nil { p.CloseAll(ksRes, ksCSARes, crRes) + err = errors.Join(err, r.Close()) return nil, err } const name = "Relayer" rRes := net.Resource{Closer: r, Name: name} - id, _, err := p.ServeNew(name, func(s *grpc.Server) { + id, relayerResource, err := p.ServeNew(name, func(s *grpc.Server) { pb.RegisterServiceServer(s, &goplugin.ServiceServer{Srv: r}) pb.RegisterRelayerServer(s, newChainRelayerServer(r, p.BrokerExt)) if evmService, ok := r.(types.EVMService); ok { @@ -175,9 +180,22 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel return nil, err } + p.replaceActiveRelayer(relayerResource) + return &pb.NewRelayerReply{RelayerID: id}, nil } +func (p *pluginRelayerServer) replaceActiveRelayer(next net.Resource) { + p.activeRelayerMu.Lock() + prev := p.activeRelayer + p.activeRelayer = next + p.activeRelayerMu.Unlock() + + if prev.Closer != nil { + p.CloseAll(prev) + } +} + // relayerClient adapts a GRPC [pb.RelayerClient] to implement [Relayer]. type relayerClient struct { *net.BrokerExt diff --git a/pkg/loop/internal/relayer/relayer_lifecycle_test.go b/pkg/loop/internal/relayer/relayer_lifecycle_test.go new file mode 100644 index 0000000000..b79e44fb1f --- /dev/null +++ b/pkg/loop/internal/relayer/relayer_lifecycle_test.go @@ -0,0 +1,32 @@ +package relayer + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + loopnet "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/net" +) + +func TestReplaceActiveRelayerClosesPreviousResource(t *testing.T) { + server := newPluginRelayerServer(nil, loopnet.BrokerConfig{Logger: logger.Test(t)}, nil) + first := &countingCloser{} + second := &countingCloser{} + + server.replaceActiveRelayer(loopnet.Resource{Closer: first, Name: "first"}) + require.Equal(t, 0, first.closed) + + server.replaceActiveRelayer(loopnet.Resource{Closer: second, Name: "second"}) + require.Equal(t, 1, first.closed) + require.Equal(t, 0, second.closed) +} + +type countingCloser struct { + closed int +} + +func (c *countingCloser) Close() error { + c.closed++ + return nil +} diff --git a/pkg/loop/internal/types/types.go b/pkg/loop/internal/types/types.go index 0bfa6e9b0f..2ce1df89eb 100644 --- a/pkg/loop/internal/types/types.go +++ b/pkg/loop/internal/types/types.go @@ -11,6 +11,9 @@ import ( type PluginRelayer interface { services.Service + // NewRelayer returns the active relayer for the plugin. Re-invoking NewRelayer + // replaces the previously served relayer; implementations must not retain + // background resources from old relayers outside the returned service. NewRelayer(ctx context.Context, config string, keystore, csaKeystore core.Keystore, capabilityRegistry core.CapabilitiesRegistry) (Relayer, error) }