diff --git a/go.mod b/go.mod index a400ba39dc..d18b558562 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 50114fe85a..0c213b59e5 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909 h1:r/llpKW6YNTWNjTDElkC1nGJUz/ryomPnRoNON8qO84= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260514160304-464c69224909/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod 
h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= diff --git a/pkg/beholder/batch_emitter_service.go b/pkg/beholder/batch_emitter_service.go new file mode 100644 index 0000000000..b23dcaebe8 --- /dev/null +++ b/pkg/beholder/batch_emitter_service.go @@ -0,0 +1,204 @@ +package beholder + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch. +// It implements the Emitter interface. +type ChipIngressBatchEmitterService struct { + services.Service + eng *services.Engine + + batchClient *batch.Client + + metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption + metrics batchEmitterMetrics +} + +type batchEmitterMetrics struct { + eventsSent otelmetric.Int64Counter + eventsDropped otelmetric.Int64Counter +} + +// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client. 
+func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) { + if client == nil { + return nil, fmt.Errorf("chip ingress client is nil") + } + + defaults := DefaultConfig() + bufferSize := int(cfg.ChipIngressBufferSize) + if bufferSize == 0 { + bufferSize = int(defaults.ChipIngressBufferSize) + } + maxBatchSize := int(cfg.ChipIngressMaxBatchSize) + if maxBatchSize == 0 { + maxBatchSize = int(defaults.ChipIngressMaxBatchSize) + } + maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends + if maxConcurrentSends == 0 { + maxConcurrentSends = defaults.ChipIngressMaxConcurrentSends + } + sendInterval := cfg.ChipIngressSendInterval + if sendInterval == 0 { + sendInterval = defaults.ChipIngressSendInterval + } + sendTimeout := cfg.ChipIngressSendTimeout + if sendTimeout == 0 { + sendTimeout = defaults.ChipIngressSendTimeout + } + drainTimeout := cfg.ChipIngressDrainTimeout + if drainTimeout == 0 { + drainTimeout = defaults.ChipIngressDrainTimeout + } + + meter := otel.Meter("beholder/chip_ingress_batch_emitter") + metrics, err := newBatchEmitterMetrics(meter) + if err != nil { + return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err) + } + + batchClient, err := batch.NewBatchClient(client, + batch.WithBatchSize(maxBatchSize), + batch.WithMessageBuffer(bufferSize), + batch.WithBatchInterval(sendInterval), + batch.WithMaxPublishTimeout(sendTimeout), + batch.WithShutdownTimeout(drainTimeout), + batch.WithMaxConcurrentSends(maxConcurrentSends), + batch.WithEventClone(false), + ) + if err != nil { + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + e := &ChipIngressBatchEmitterService{ + batchClient: batchClient, + metrics: metrics, + } + + e.Service, e.eng = services.Config{ + Name: "ChipIngressBatchEmitterService", + Start: e.start, + Close: e.stop, + }.NewServiceEngine(lggr) + + return e, nil +} + +func (e *ChipIngressBatchEmitterService) start(ctx 
context.Context) error { + e.batchClient.Start(ctx) + return nil +} + +func (e *ChipIngressBatchEmitterService) stop() error { + e.batchClient.Stop() + return nil +} + +// Emit queues an event for batched delivery without blocking. +// Returns an error if the emitter is stopped or the context is cancelled. +// If the buffer is full, the event is silently dropped. +func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return e.emitInternal(ctx, body, nil, attrKVs...) +} + +// EmitWithCallback works like Emit but invokes callback once the event's fate +// is determined (nil on success, non-nil on failure or buffer-full drop). +// +// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked. +// If it returns nil, the callback is guaranteed to fire exactly once. +func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.emitInternal(ctx, body, callback, attrKVs...) +} + +func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.eng.IfStarted(func() error { + domain, entity, err := ExtractSourceAndType(attrKVs...) + if err != nil { + return err + } + + attributes := newAttributes(attrKVs...) 
+ + event, err := chipingress.NewEvent(domain, entity, body, attributes) + if err != nil { + return fmt.Errorf("failed to create CloudEvent: %w", err) + } + eventPb, err := chipingress.EventToProto(event) + if err != nil { + return fmt.Errorf("failed to convert to proto: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + + metricAttrs := e.metricAttrsFor(domain, entity) + metricsCtx := context.Background() + + queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { + if sendErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + } else { + e.metrics.eventsSent.Add(metricsCtx, 1, metricAttrs) + } + if callback != nil { + callback(sendErr) + } + }) + if queueErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + if callback != nil { + callback(queueErr) + } + } + + return nil + }) +} + +func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption { + key := domain + "\x00" + entity + if v, ok := e.metricAttrsCache.Load(key); ok { + return v.(otelmetric.MeasurementOption) + } + attrs := otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("domain", domain), + attribute.String("entity", entity), + )) + v, _ := e.metricAttrsCache.LoadOrStore(key, attrs) + return v.(otelmetric.MeasurementOption) +} + +func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) { + eventsSent, err := meter.Int64Counter("chip_ingress.events_sent", + otelmetric.WithDescription("Total events successfully sent via PublishBatch"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped", + otelmetric.WithDescription("Total events dropped (buffer full or send failure)"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + return batchEmitterMetrics{ + eventsSent: eventsSent, + eventsDropped: 
eventsDropped, + }, nil +} diff --git a/pkg/beholder/batch_emitter_service_test.go b/pkg/beholder/batch_emitter_service_test.go new file mode 100644 index 0000000000..d685552f44 --- /dev/null +++ b/pkg/beholder/batch_emitter_service_test.go @@ -0,0 +1,611 @@ +package beholder_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func newTestConfig() beholder.Config { + return beholder.Config{ + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressMaxConcurrentSends: 3, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } +} + +func newTestLogger(t *testing.T) logger.Logger { + t.Helper() + lggr, err := logger.New() + require.NoError(t, err) + t.Cleanup(func() { _ = lggr.Sync() }) + return lggr +} + +func TestNewChipIngressBatchEmitterService(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + assert.NotNil(t, emitter) + }) + + t.Run("returns error when client is nil", func(t *testing.T) { + emitter, err := beholder.NewChipIngressBatchEmitterService(nil, newTestConfig(), newTestLogger(t)) + assert.Error(t, err) + assert.Nil(t, emitter) + }) +} + +func TestChipIngressBatchEmitterService_Emit(t 
*testing.T) { + t.Run("returns error when domain/entity missing", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("test"), "bad_key", "bad_value") + assert.Error(t, err) + }) + + t.Run("events are batched and sent via PublishBatch", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var receivedBatches []*chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + batch := args.Get(1).(*chipingress.CloudEventBatch) + receivedBatches = append(receivedBatches, batch) + }). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(receivedBatches) > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + + totalEvents := 0 + for _, batch := range receivedBatches { + totalEvents += len(batch.Events) + } + assert.Equal(t, 3, totalEvents) + }) +} + +func TestChipIngressBatchEmitterService_CloudEventFormat(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var 
receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("test-payload"), + beholder.AttrKeyDomain, "my-domain", + beholder.AttrKeyEntity, "my-entity", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) + + event := receivedBatch.Events[0] + assert.Equal(t, "my-domain", event.Source) + assert.Equal(t, "my-entity", event.Type) + assert.NotEmpty(t, event.Id) +} + +func TestChipIngressBatchEmitterService_PublishBatchError(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + callCount := 0 + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + mu.Lock() + defer mu.Unlock() + callCount++ + }). 
+ Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return callCount > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) +} + +func TestChipIngressBatchEmitterService_ContextCancellation(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 1 + cfg.ChipIngressSendInterval = 10 * time.Second + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err = emitter.Emit(ctx, []byte("should-fail"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestChipIngressBatchEmitterService_DefaultConfig(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). 
+ Return(nil, nil) + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, beholder.Config{}, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 3*time.Second, 50*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) +} + +func TestChipIngressBatchEmitterService_EmitAfterClose(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + require.NoError(t, emitter.Close()) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.Error(t, err) +} + +func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) { + t.Run("callback receives nil on success", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.NoError(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error on PublishBatch failure", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.Error(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error when buffer is full", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + + sendBlocked := make(chan struct{}) + firstCallSignal := make(chan struct{}, 1) + clientMock. 
+ On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + select { + case firstCallSignal <- struct{}{}: + default: + } + <-sendBlocked + }). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 2 + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressMaxConcurrentSends = 1 + cfg.ChipIngressSendInterval = 50 * time.Millisecond + cfg.ChipIngressDrainTimeout = 200 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer close(sendBlocked) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + <-firstCallSignal + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 10; i++ { + _ = emitter.Emit(t.Context(), []byte("filler"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + } + + dropped := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("overflow"), func(sendErr error) { + dropped <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + select { + case dropErr := <-dropped: + assert.Error(t, dropErr) + case <-time.After(time.Second): + t.Fatal("callback was not invoked for dropped event") + } + }) + + t.Run("nil callback behaves like Emit", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). 
+ Maybe() + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.EmitWithCallback(t.Context(), []byte("body"), nil, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) +} + +func TestChipIngressBatchEmitterService_Metrics(t *testing.T) { + t.Run("records events_sent on successful publish", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer restore() + + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) + + t.Run("records events_dropped on publish error", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer 
restore() + + clientMock := mocks.NewClient(t) + clientMock.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricDropEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricDropEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) +} + +func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) { + cfg := beholder.Config{ + ChipIngressBufferSize: uint(b.N + 10), + ChipIngressMaxBatchSize: uint(b.N + 1), + ChipIngressMaxConcurrentSends: 1, + ChipIngressSendInterval: time.Hour, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } + emitter, err := beholder.NewChipIngressBatchEmitterService(&chipingress.NoopClient{}, cfg, logger.Test(b)) + if err != nil { + b.Fatal(err) + } + if err := emitter.Start(context.Background()); err != nil { + b.Fatal(err) + } + defer func() { _ = emitter.Close() }() + + payload := []byte("benchmark-payload") + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := emitter.Emit(context.Background(), payload, + beholder.AttrKeyDomain, "bench", + 
beholder.AttrKeyEntity, "BenchmarkEvent", + ); err != nil { + b.Fatal(err) + } + } +} + +func useEmitterTestMeterProvider(t *testing.T) (*sdkmetric.ManualReader, func()) { + t.Helper() + prev := otel.GetMeterProvider() + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(provider) + return reader, func() { + require.NoError(t, provider.Shutdown(t.Context())) + otel.SetMeterProvider(prev) + } +} + +func collectEmitterMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + return rm +} + +func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics { + t.Helper() + for _, sm := range rm.ScopeMetrics { + for _, metric := range sm.Metrics { + if metric.Name == name { + return metric + } + } + } + t.Fatalf("metric %q not found", name) + return metricdata.Metrics{} +} + +func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] { + t.Helper() + for _, dp := range sum.DataPoints { + if hasEmitterStringAttr(dp.Attributes, k1, v1) && hasEmitterStringAttr(dp.Attributes, k2, v2) { + return dp + } + } + t.Fatalf("sum datapoint not found for attrs %s=%s,%s=%s", k1, v1, k2, v2) + return metricdata.DataPoint[int64]{} +} + +func hasEmitterStringAttr(set attribute.Set, key, want string) bool { + for _, kv := range set.ToSlice() { + if string(kv.Key) == key { + return kv.Value.AsString() == want + } + } + return false +} diff --git a/pkg/beholder/beholdertest/beholder.go b/pkg/beholder/beholdertest/beholder.go index a06acf325a..6b590db571 100644 --- a/pkg/beholder/beholdertest/beholder.go +++ b/pkg/beholder/beholdertest/beholder.go @@ -8,13 +8,11 @@ import ( "testing" "github.com/stretchr/testify/require" - otellognoop "go.opentelemetry.io/otel/log/noop" - otelmetricnoop 
"go.opentelemetry.io/otel/metric/noop" - oteltracenoop "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) const ( @@ -120,39 +118,16 @@ messageLoop: func NewObserver(t *testing.T) Observer { t.Helper() - cfg := beholder.DefaultConfig() - - // Logger - loggerProvider := otellognoop.NewLoggerProvider() - logger := loggerProvider.Logger(packageNameBeholder) - - // Tracer - tracerProvider := oteltracenoop.NewTracerProvider() - tracer := tracerProvider.Tracer(packageNameBeholder) - - // Meter - meterProvider := otelmetricnoop.NewMeterProvider() - meter := meterProvider.Meter(packageNameBeholder) - - // MessageEmitter messageEmitter := &assertMessageEmitter{t: t} - client := &beholder.Client{ - Config: cfg, - Logger: logger, - Tracer: tracer, - Meter: meter, - Emitter: messageEmitter, - LoggerProvider: loggerProvider, - TracerProvider: tracerProvider, - MeterProvider: meterProvider, - MessageLoggerProvider: loggerProvider, - OnClose: func() error { return nil }, - } + client := beholder.NewNoopClient(logger.Test(t)) + client.Emitter = messageEmitter + require.NoError(t, client.Start(t.Context())) //reset NewObserver state after the test prevClient := beholder.GetClient() t.Cleanup(func() { + require.NoError(t, client.Close()) beholder.SetClient(prevClient) t.Setenv(packageNameBeholder, packageNameBeholder) }) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 62ceae0186..d13bd8320f 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -5,26 +5,46 @@ import ( "errors" "fmt" "maps" + "sync/atomic" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) +// 
ChipIngressEmitter wraps a synchronous chipingress.Client.Publish call +// in a fire-and-forget goroutine so callers are never blocked. type ChipIngressEmitter struct { client chipingress.Client + log logger.Logger + stopCh services.StopChan + wg services.WaitGroup + closed atomic.Bool } -func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) { +func NewChipIngressEmitter(client chipingress.Client, lggr logger.Logger) (Emitter, error) { if client == nil { return nil, errors.New("chip ingress client is nil") } - return &ChipIngressEmitter{client: client}, nil + return &ChipIngressEmitter{ + client: client, + log: lggr, + stopCh: make(services.StopChan), + }, nil } func (c *ChipIngressEmitter) Close() error { + if wasClosed := c.closed.Swap(true); wasClosed { + return errors.New("already closed") + } + close(c.stopCh) + c.wg.Wait() return c.client.Close() } +// Emit fires a synchronous gRPC Publish call in a background goroutine +// so the caller is never blocked. func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...) if err != nil { @@ -41,10 +61,21 @@ func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...a return fmt.Errorf("failed to convert event to proto: %w", err) } - _, err = c.client.Publish(ctx, eventPb) - if err != nil { + if err := c.wg.TryAdd(1); err != nil { return err } + // Legacy ChipIngressEmitter.Emit is a synchronous gRPC call; + // fire-and-forget via goroutine to avoid blocking the caller. 
+ go func(ctx context.Context) { + defer c.wg.Done() + var cancel context.CancelFunc + ctx, cancel = c.stopCh.Ctx(ctx) + defer cancel() + + if _, err := c.client.Publish(ctx, eventPb); err != nil { + c.log.Infof("failed to emit to chip ingress: %v", err) + } + }(context.WithoutCancel(ctx)) return nil } diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index c619625960..1ad998adfc 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -10,18 +10,19 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNewChipIngressEmitter(t *testing.T) { t.Run("happy path", func(t *testing.T) { clientMock := mocks.NewClient(t) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) assert.NotNil(t, emitter) }) t.Run("returns error when client is nil", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(nil) + emitter, err := beholder.NewChipIngressEmitter(nil, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) }) @@ -44,37 +45,44 @@ func TestChipIngressEmit(t *testing.T) { clientMock. On("Publish", mock.Anything, mock.Anything). Return(nil, nil) + clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity, attributes) require.NoError(t, err) + // Close drains in-flight goroutines so mock expectations are met. 
+ require.NoError(t, emitter.Close()) clientMock.AssertExpectations(t) }) t.Run("returns error when ExtractSourceAndType fails", func(t *testing.T) { - emitter, err := beholder.NewChipIngressEmitter(mocks.NewClient(t)) + emitter, err := beholder.NewChipIngressEmitter(mocks.NewClient(t), logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), body, "bad_key", domain) assert.Error(t, err) }) - t.Run("returns error when Publish fails", func(t *testing.T) { + t.Run("logs error when Publish fails", func(t *testing.T) { clientMock := mocks.NewClient(t) clientMock. On("Publish", mock.Anything, mock.Anything). Return(nil, assert.AnError) + clientMock.On("Close").Return(nil) - emitter, err := beholder.NewChipIngressEmitter(clientMock) + emitter, err := beholder.NewChipIngressEmitter(clientMock, logger.Test(t)) require.NoError(t, err) + // Emit returns nil because the error is logged asynchronously. err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) - require.Error(t, err) + require.NoError(t, err) + // Close drains in-flight goroutines so mock expectations are met. + require.NoError(t, emitter.Close()) clientMock.AssertExpectations(t) }) } diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 12592b7c2a..dc40c9ed70 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) const defaultGRPCCompressor = "gzip" @@ -37,6 +39,9 @@ type Emitter interface { } type Client struct { + services.Service + eng *services.Engine + Config Config // Logger Logger otellog.Logger @@ -62,6 +67,21 @@ type Client struct { OnClose func() error } +// initService wires up the services.Service lifecycle for the Client. 
+// Must be called exactly once after populating the Client fields. +func (c *Client) initService(lggr pkglogger.Logger, batchSvc *ChipIngressBatchEmitterService) { + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Close: c.close, + NewSubServices: func(l pkglogger.Logger) []services.Service { + if batchSvc == nil { + return nil + } + return []services.Service{batchSvc} + }, + }.NewServiceEngine(lggr) +} + // NewClient creates a new Client with initialized OpenTelemetry components // To handle OpenTelemetry errors use [otel.SetErrorHandler](https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler) func NewClient(cfg Config) (*Client, error) { @@ -186,8 +206,12 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) - + var batchEmitterService *ChipIngressBatchEmitterService var chipIngressClient chipingress.Client = &chipingress.NoopClient{} + lggr := cfg.ChipIngressLogger + if lggr == nil { + lggr = pkglogger.Nop() + } // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" { @@ -221,40 +245,74 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro return nil, err } - chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) - if err != nil { - return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + var chipIngressEmitter Emitter + if cfg.ChipIngressBatchEmitterEnabled { + if cfg.ChipIngressLogger == nil { + return nil, fmt.Errorf("ChipIngressLogger is required when ChipIngressBatchEmitterEnabled is true") + } + batchEmitterService, err = 
NewChipIngressBatchEmitterService(chipIngressClient, cfg, cfg.ChipIngressLogger) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err) + } + // Batch emitter lifecycle is owned by Client's service-engine sub-service wiring. + // Wrap it with noCloseEmitter so DualSourceEmitter.Close() does not attempt to + // close it directly; the service engine closes batchEmitterService in sub-service + // teardown after parent close hook completes. + chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService} + } else { + chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, cfg.ChipIngressLogger) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + } } - emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter) + emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter, lggr) if err != nil { return nil, fmt.Errorf("failed to create dual source emitter: %w", err) } } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: chipIngressClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: signer, + OnClose: onClose, + } + c.initService(lggr, batchEmitterService) + return c, nil } -// Closes all providers, flushes all data and stops all background processes -func (c Client) Close() (err 
error) { - if c.Chip != nil { - err = errors.Join(err, c.Chip.Close()) - } +// close is the lifecycle Close hook: OTel/message shutdown and CHIP close. +func (c *Client) close() (err error) { if c.Emitter != nil { err = errors.Join(err, c.Emitter.Close()) } if c.OnClose != nil { err = errors.Join(err, c.OnClose()) } - return + return err } +// noCloseEmitter delegates Emit to the wrapped emitter but makes Close a no-op. +// Use this when lifecycle ownership is external (e.g. service engine sub-service). +type noCloseEmitter struct{ Emitter } + +func (n noCloseEmitter) Close() error { return nil } + // Returns a new Client with the same configuration but with a different package name // Deprecated: Use ForName func (c Client) ForPackage(name string) Client { @@ -614,6 +672,8 @@ func newLoggerProviderOpts(cfg Config, baseResource *sdkresource.Resource, share // newMessageLoggerProviderOpts creates logger provider options for custom message emitter func newMessageLoggerProviderOpts(cfg Config, baseResource *sdkresource.Resource, sharedLogExporter sdklog.Exporter) ([]sdklog.LoggerProviderOption, error) { var messageLogProcessor sdklog.Processor + // EmitterBatchProcessor=true uses async batching for custom-message logs; + // false uses a simple processor that exports each record immediately. 
if cfg.EmitterBatchProcessor { batchProcessorOpts := []sdklog.BatchProcessorOption{} if cfg.EmitterExportTimeout > 0 { diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index 74c75583ae..a64e4a3751 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" chipmocks "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) type MockExporter struct { @@ -191,6 +192,7 @@ func TestClient_Close(t *testing.T) { client, err := beholder.NewStdoutClient() require.NoError(t, err) + require.NoError(t, client.Start(t.Context())) err = client.Close() require.NoError(t, err) @@ -220,7 +222,8 @@ func TestClient_ForPackage(t *testing.T) { // Meter counter, _ := clientForTest.Meter.Int64Counter("testMetric") counter.Add(t.Context(), 1) - clientForTest.Close() + require.NoError(t, client.Start(t.Context())) + require.NoError(t, clientForTest.Close()) assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`) assert.Contains(t, b.String(), "testMetric") } @@ -926,3 +929,92 @@ func TestChipIngressClient(t *testing.T) { assert.NoError(t, err) }) } + +// TestClient_batchEmitterService groups lifecycle and construction tests for the +// ChipIngress batch emitter sub-service embedded in the beholder Client. +func TestClient_batchEmitterService(t *testing.T) { + newBatchClient := func(t *testing.T) *beholder.Client { + t.Helper() + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + // Use simple exporter in this lifecycle test to avoid batch flush/shutdown delays. 
+ EmitterBatchProcessor: false, + LogBatchProcessor: false, + LogRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + TraceRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + MetricRetryConfig: &beholder.RetryConfig{InitialInterval: time.Millisecond, MaxInterval: time.Millisecond, MaxElapsedTime: 0}, + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: newTestLogger(t), + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + return client + } + + // startsWithClient: batch emitter sub-service starts and stops with the Client lifecycle. + t.Run("starts with client", func(t *testing.T) { + client := newBatchClient(t) + + // Before Start: service is unready and incomplete emit fails validation. + assert.ErrorContains(t, client.Service.Ready(), "not started") + err := client.Emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + // AttrKeyDataSchema intentionally omitted — triggers required-field validation error. + ) + assert.ErrorContains(t, err, "BeholderDataSchema") + + require.NoError(t, client.Service.Start(t.Context())) + assert.NoError(t, client.Service.Ready()) + _ = client.Close() + }) + + // emitSucceedsBeforeStart: a fully-valid Emit returns no error before Start. + // The OTLP path is always active; the batch emitter's service-not-started error is + // swallowed by DualSourceEmitter and only logged. 
+ t.Run("emit succeeds before start", func(t *testing.T) { + client := newBatchClient(t) + + assert.ErrorContains(t, client.Service.Ready(), "not started") + + err := client.Emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + beholder.AttrKeyDataSchema, "test-schema", + ) + assert.NoError(t, err, "emit must not fail when service is not yet started") + + require.NoError(t, client.Service.Start(t.Context())) + assert.NoError(t, client.Service.Ready()) + _ = client.Close() + }) + + // closeWithoutStart: strict service semantics require Start before Close. + t.Run("close without start", func(t *testing.T) { + client := newBatchClient(t) + err := client.Close() + assert.Error(t, err) + assert.ErrorIs(t, err, services.ErrCannotStopUnstarted) + }) + + // requiresLogger: constructing with batch emitter enabled but no logger returns an error. + t.Run("requires logger", func(t *testing.T) { + _, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: nil, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ChipIngressLogger") + }) +} diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index f3e4561b68..5e5167f0f1 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -7,6 +7,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) type Config struct { @@ -22,7 +24,9 @@ type Config struct { EmitterExportInterval time.Duration EmitterExportMaxBatchSize int EmitterMaxQueueSize int - EmitterBatchProcessor bool // Enabled by default. Disable only for testing. 
+ // EmitterBatchProcessor controls custom-message export mode: + // true = batched async export; false = immediate per-record export. + EmitterBatchProcessor bool // OTel Trace TraceSampleRatio float64 @@ -44,6 +48,16 @@ type Config struct { ChipIngressEmitterGRPCEndpoint string ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled bool // When true, use batch emitter; when false (default), use legacy per-event emitter + ChipIngressBufferSize uint // Message buffer size (default 1000) + ChipIngressMaxBatchSize uint // Max events per PublishBatch call (default 500) + ChipIngressSendInterval time.Duration // Flush interval (default 100ms) + ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s) + ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s) + ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10) + ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true + // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration @@ -91,7 +105,8 @@ var defaultRetryConfig = RetryConfig{ } const ( - defaultPackageName = "beholder" + defaultPackageName = "beholder" + defaultMaxConcurrentSends = 10 ) var defaultOtelAttributes = []attribute.KeyValue{ @@ -110,6 +125,7 @@ func DefaultConfig() Config { EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, + // Keep batched export enabled by default for throughput. 
EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: defaultRetryConfig.Copy(), @@ -131,8 +147,16 @@ func DefaultConfig() Config { LogMaxQueueSize: 2048, LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default - LogLevel: zapcore.InfoLevel, - LogCompressor: "gzip", + LogLevel: zapcore.InfoLevel, + LogCompressor: "gzip", + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled: false, + ChipIngressBufferSize: 1000, + ChipIngressMaxBatchSize: 500, + ChipIngressSendInterval: 100 * time.Millisecond, + ChipIngressSendTimeout: 3 * time.Second, + ChipIngressDrainTimeout: 10 * time.Second, + ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends, // Auth (defaults to static auth mode with TTL=0) AuthHeadersTTL: 0, } @@ -141,6 +165,7 @@ func DefaultConfig() Config { func TestDefaultConfig() Config { config := DefaultConfig() // Should be only disabled for testing + // Use simple (non-batched) exporter in tests for faster, deterministic teardown. config.EmitterBatchProcessor = false config.LogBatchProcessor = false // Retries are disabled for testing @@ -155,6 +180,7 @@ func TestDefaultConfig() Config { func TestDefaultConfigHTTPClient() Config { config := DefaultConfig() // Should be only disabled for testing + // Use simple (non-batched) exporter in tests for faster, deterministic teardown. config.EmitterBatchProcessor = false config.LogBatchProcessor = false config.OtelExporterGRPCEndpoint = "" diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index ae156db9c9..318cc94a91 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -30,6 +30,7 @@ func ExampleConfig() { EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, + // true uses batched async export for custom messages. 
EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: nil, @@ -67,6 +68,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressBatchEmitterEnabled:false ChipIngressBufferSize:0 ChipIngressMaxBatchSize:0 ChipIngressSendInterval:0s ChipIngressSendTimeout:0s ChipIngressDrainTimeout:0s 
ChipIngressMaxConcurrentSends:0 ChipIngressLogger: LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index a299999714..4b2edfbc19 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -3,11 +3,8 @@ package beholder import ( "context" "errors" - "fmt" - "sync/atomic" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" ) // DualSourceEmitter emits both to chip ingress and to the otel collector @@ -18,12 +15,9 @@ type DualSourceEmitter struct { chipIngressEmitter Emitter otelCollectorEmitter Emitter log logger.Logger - stopCh services.StopChan - wg services.WaitGroup - closed atomic.Bool } -func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { +func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter, lggr logger.Logger) (Emitter, error) { if chipIngressEmitter == nil { return nil, errors.New("chip ingress emitter is nil") } @@ -32,25 +26,14 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt return nil, errors.New("otel collector emitter is nil") } - logger, err := logger.New() - if err != nil { - return nil, fmt.Errorf("failed to create logger: %w", err) - } - return &DualSourceEmitter{ chipIngressEmitter: chipIngressEmitter, otelCollectorEmitter: otelCollectorEmitter, - log: logger, - stopCh: make(services.StopChan), + log: lggr, }, nil } func (d *DualSourceEmitter) Close() error { - if wasClosed := d.closed.Swap(true); wasClosed { - return errors.New("already closed") - } - close(d.stopCh) - d.wg.Wait() 
return errors.Join(d.chipIngressEmitter.Close(), d.otelCollectorEmitter.Close()) } @@ -60,22 +43,9 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an return err } - // Emit via chip ingress async - if err := d.wg.TryAdd(1); err != nil { - return err + if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + d.log.Infof("failed to emit to chip ingress: %v", err) } - go func(ctx context.Context) { - defer d.wg.Done() - var cancel context.CancelFunc - ctx, cancel = d.stopCh.Ctx(ctx) - defer cancel() - - if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { - // If the chip ingress emitter fails, we ONLY log the error - // because we still want to send the data to the OTLP collector and not cause disruption - d.log.Infof("failed to emit to chip ingress: %v", err) - } - }(context.WithoutCancel(ctx)) return nil } diff --git a/pkg/beholder/dual_source_emitter_test.go b/pkg/beholder/dual_source_emitter_test.go index 5e99bedae1..a51e621de7 100644 --- a/pkg/beholder/dual_source_emitter_test.go +++ b/pkg/beholder/dual_source_emitter_test.go @@ -3,12 +3,15 @@ package beholder_test import ( "context" "errors" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNewDualSourceEmitter(t *testing.T) { @@ -17,7 +20,7 @@ func TestNewDualSourceEmitter(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) assert.NotNil(t, emitter) @@ -27,7 +30,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil chip ingress emitter t.Run("nil chip ingress emitter", func(t *testing.T) { otelEmitter := &mockEmitter{} - emitter, err := 
beholder.NewDualSourceEmitter(nil, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) @@ -36,7 +39,7 @@ func TestNewDualSourceEmitter(t *testing.T) { // Test nil otel collector emitter t.Run("nil otel collector emitter", func(t *testing.T) { chipEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil, logger.Test(t)) assert.Error(t, err) assert.Nil(t, emitter) @@ -47,7 +50,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message"), "key", "value") @@ -62,7 +65,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { }, } - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message")) @@ -71,6 +74,51 @@ func TestDualSourceEmitterEmit(t *testing.T) { }) } +func TestDualSourceEmitterBlockingBehavior(t *testing.T) { + t.Run("chip ingress emit does not block caller", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + // Simulate slow work; the emitter itself is non-blocking + // (fire-and-forget lives inside ChipIngressEmitter or batch service). 
+ time.Sleep(200 * time.Millisecond) + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) + + t.Run("chip ingress emit completes inline when emitter is synchronous", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, logger.Test(t)) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + assert.True(t, chipCalled.Load(), + "chip ingress emit should complete before Emit returns") + + require.NoError(t, emitter.Close()) + }) +} + // Mock emitter for testing type mockEmitter struct { emitFunc func(ctx context.Context, body []byte, attrKVs ...any) error diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go index 8de51370ba..69c8c11d15 100644 --- a/pkg/beholder/global_test.go +++ b/pkg/beholder/global_test.go @@ -20,17 +20,18 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestGlobal(t *testing.T) { // Get global logger, tracer, meter, messageEmitter // If not initialized with beholder.SetClient will return noop client logger, tracer, meter, messageEmitter := beholder.GetLogger(), beholder.GetTracer(), beholder.GetMeter(), beholder.GetEmitter() - noopClient := beholder.NewNoopClient() + noopClient := beholder.NewNoopClient(pkglogger.Test(t)) assert.IsType(t, otellognoop.Logger{}, logger) assert.IsType(t, 
oteltracenoop.Tracer{}, tracer) assert.IsType(t, otelmetricnoop.Meter{}, meter) - expectedMessageEmitter := beholder.NewNoopClient().Emitter + expectedMessageEmitter := beholder.NewNoopClient(pkglogger.Test(t)).Emitter assert.IsType(t, expectedMessageEmitter, messageEmitter) assert.IsType(t, noopClient, beholder.GetClient()) @@ -76,6 +77,10 @@ func TestClient_SetGlobalOtelProviders(t *testing.T) { var b strings.Builder client, err := beholder.NewWriterClient(&b) require.NoError(t, err) + require.NoError(t, client.Start(t.Context())) + defer func() { + require.NoError(t, client.Close()) + }() // Set global Otel Client beholder.SetClient(client) diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 693f581452..4bc3c78db6 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -15,6 +15,8 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) // Used for testing to override the default exporter @@ -137,6 +139,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro // Message Emitter var messageLogProcessor sdklog.Processor + // EmitterBatchProcessor=true uses async batching for custom-message logs; + // false uses a simple processor that exports each record immediately. 
if cfg.EmitterBatchProcessor { batchProcessorOpts := []sdklog.BatchProcessorOption{} if cfg.EmitterExportTimeout > 0 { @@ -181,13 +185,32 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - // HTTP client doesn't currently support rotating auth, so lazySigner is always nil - return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil + // HTTP client doesn't currently support rotating auth, so lazySigner is always nil. + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: nil, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: nil, + OnClose: onClose, + } + lggr := cfg.ChipIngressLogger + if lggr == nil { + lggr = pkglogger.Nop() + } + c.initService(lggr, nil) + return c, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 3a8378f918..69dcb3b72c 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -18,14 +18,17 @@ import ( oteltracenoop "go.opentelemetry.io/otel/trace/noop" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" ) -// Default client to fallback when is is not initialized properly -func NewNoopClient() *Client { +// Default client to fallback when it is not initialized properly. 
+// An optional logger may be passed to surface service-engine diagnostics; +// when omitted a no-op logger is used. +func NewNoopClient(optLogger ...pkglogger.Logger) *Client { cfg := DefaultConfig() // Logger loggerProvider := otellognoop.NewLoggerProvider() - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer tracerProvider := oteltracenoop.NewTracerProvider() tracer := tracerProvider.Tracer(defaultPackageName) @@ -40,7 +43,25 @@ func NewNoopClient() *Client { // ChipIngress chipClient := &chipingress.NoopClient{} - return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} + c := &Client{ + Config: cfg, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: messageEmitter, + Chip: chipClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + OnClose: noopOnClose, + } + lggr := pkglogger.Logger(pkglogger.Nop()) + if len(optLogger) > 0 && optLogger[0] != nil { + lggr = optLogger[0] + } + c.initService(lggr, nil) + return c } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output @@ -62,7 +83,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { return NewNoopClient(), err } loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter))) - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer traceExporter, err := stdouttrace.New(cfg.TraceOptions...) 
@@ -90,7 +111,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { meter := meterProvider.Meter(defaultPackageName) // MessageEmitter - emitter := messageEmitter{messageLogger: logger} + emitter := messageEmitter{messageLogger: otelLogger} onClose := func() (err error) { for _, provider := range []shutdowner{loggerProvider, tracerProvider, meterProvider} { @@ -99,7 +120,22 @@ func NewWriterClient(w io.Writer) (*Client, error) { return } - return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: &chipingress.NoopClient{}, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil + c := &Client{ + Config: cfg.Config, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: &chipingress.NoopClient{}, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + lazySigner: nil, + OnClose: onClose, + } + c.initService(pkglogger.Nop(), nil) + return c, nil } type noopMessageEmitter struct{} diff --git a/pkg/beholder/noop_test.go b/pkg/beholder/noop_test.go index 332c677f1d..5060b54484 100644 --- a/pkg/beholder/noop_test.go +++ b/pkg/beholder/noop_test.go @@ -15,10 +15,11 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) func TestNoopClient(t *testing.T) { - noopClient := beholder.NewNoopClient() + noopClient := beholder.NewNoopClient(logger.Test(t)) assert.NotNil(t, noopClient) // Message Emitter @@ -85,6 +86,7 @@ func TestNoopClient(t *testing.T) { err = noopClient.Chip.Close() assert.NoError(t, err) + require.NoError(t, noopClient.Start(t.Context())) err = noopClient.Close() 
assert.NoError(t, err) } diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 0335b8ce7b..966be68e66 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -84,8 +84,9 @@ const ( envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -98,6 +99,7 @@ type EnvConfig struct { ChipIngressEndpoint string ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -146,6 +148,8 @@ type EnvConfig struct { TelemetryAuthHeaders map[string]string TelemetryAuthPubKeyHex string TelemetryAuthHeadersTTL time.Duration + // TelemetryEmitterBatchProcessor maps to beholder Config.EmitterBatchProcessor + // (batched async custom-message export vs immediate per-record export). 
TelemetryEmitterBatchProcessor bool TelemetryEmitterExportTimeout time.Duration TelemetryEmitterExportInterval time.Duration @@ -255,6 +259,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) + add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -486,6 +491,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressInsecureConnection, err) } + e.ChipIngressBatchEmitterEnabled, err = getBool(envChipIngressBatchEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 60547f4803..024ff85622 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -84,8 +84,9 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryEmitterMaxQueueSize: "1000", envTelemetryLogStreamingEnabled: "false", - envChipIngressEndpoint: "chip-ingress.example.com:50051", - envChipIngressInsecureConnection: "true", + envChipIngressEndpoint: "chip-ingress.example.com:50051", + envChipIngressInsecureConnection: "true", + envChipIngressBatchEmitterEnabled: "false", envCRESettings: `{"global":{}}`, envCRESettingsDefault: `{"foo":"bar"}`, @@ -195,8 +196,9 @@ var envCfgFull = EnvConfig{ TelemetryEmitterMaxQueueSize: 1000, TelemetryLogStreamingEnabled: false, - ChipIngressEndpoint: "chip-ingress.example.com:50051", - ChipIngressInsecureConnection: true, + ChipIngressEndpoint: "chip-ingress.example.com:50051", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, CRESettings: `{"global":{}}`, CRESettingsDefault: `{"foo":"bar"}`, @@ -259,6 +261,7 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { 
// Assert ChipIngress environment variables assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint]) assert.Equal(t, "true", got[envChipIngressInsecureConnection]) + assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled]) assert.JSONEq(t, `{"global":{}}`, got[envCRESettings]) assert.JSONEq(t, `{"foo":"bar"}`, got[envCRESettingsDefault]) diff --git a/pkg/loop/plugin_relayer_emitter_test.go b/pkg/loop/plugin_relayer_emitter_test.go index 71219e37df..f08c971565 100644 --- a/pkg/loop/plugin_relayer_emitter_test.go +++ b/pkg/loop/plugin_relayer_emitter_test.go @@ -122,7 +122,7 @@ func TestParseOriginURL(t *testing.T) { func TestNewPluginRelayerConfigEmitterDefaults(t *testing.T) { prev := beholder.GetClient() - client := beholder.NewNoopClient() + client := beholder.NewNoopClient(logger.Test(t)) client.Config.AuthPublicKeyHex = "from-beholder" beholder.SetClient(client) t.Cleanup(func() { beholder.SetClient(prev) }) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index a7d92ee705..307603d502 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -101,6 +101,7 @@ type Server struct { checker *services.HealthChecker LimitsFactory limits.Factory profiler *pyroscope.Profiler + beholderClient *beholder.Client } func newServer(loggerName string) (*Server, error) { @@ -180,6 +181,8 @@ func (s *Server) start(opts ...ServerOpt) error { ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled, + ChipIngressLogger: s.Logger, MetricCompressor: s.EnvConfig.TelemetryMetricCompressor, } @@ -212,19 +215,8 @@ func (s *Server) start(opts ...ServerOpt) error { beholderCfg.TraceSpanExporter = exporter } - beholderClient, err := beholder.NewClient(beholderCfg) - if err != nil { - return fmt.Errorf("failed to create beholder 
client: %w", err) - } - beholder.SetClient(beholderClient) - beholder.SetGlobalOtelProviders() - - if beholderCfg.LogStreamingEnabled { - otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) - if err != nil { - return fmt.Errorf("failed to enable log streaming: %w", err) - } - s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) + if err := s.startBeholderClient(ctx, beholderCfg); err != nil { + return err } } @@ -349,8 +341,34 @@ func (s *Server) MustRegister(c services.HealthReporter) { func (s *Server) Register(c services.HealthReporter) error { return s.checker.Register(c) } +func (s *Server) startBeholderClient(ctx context.Context, beholderCfg beholder.Config) error { + beholderClient, err := beholder.NewClient(beholderCfg) + if err != nil { + return fmt.Errorf("failed to create beholder client: %w", err) + } + if err := beholderClient.Start(ctx); err != nil { + return fmt.Errorf("failed to start beholder client: %w", err) + } + s.beholderClient = beholderClient + beholder.SetClient(beholderClient) + beholder.SetGlobalOtelProviders() + + if beholderCfg.LogStreamingEnabled { + otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) + if err != nil { + return fmt.Errorf("failed to enable log streaming: %w", err) + } + s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) + } + + return nil +} + // Stop closes resources and flushes logs. 
func (s *Server) Stop() { + if s.beholderClient != nil { + s.Logger.ErrorIfFn(s.beholderClient.Close, "Failed to close beholder client") + } if s.dbStatsReporter != nil { s.dbStatsReporter.Stop() } diff --git a/pkg/services/service.go b/pkg/services/service.go index bf19f8d918..6257ff6b51 100644 --- a/pkg/services/service.go +++ b/pkg/services/service.go @@ -254,7 +254,7 @@ func (s *service) Name() string { return s.eng.Name() } func (s *service) Start(ctx context.Context) error { return s.StartOnce(s.cfg.Name, func() error { var span trace.Span - ctx, span = s.eng.tracer.Start(ctx, "Start") //nolint + ctx, span = s.eng.tracer.Start(ctx, "Start") defer span.End() s.eng.Info("Starting")