diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go
index 7b6b97e9e1..cd28bca41e 100644
--- a/pkg/beholder/durable_emitter.go
+++ b/pkg/beholder/durable_emitter.go
@@ -135,8 +135,11 @@ type insertResult struct {
 type DurableEmitter struct {
 	store  DurableEventStore
 	client chipingress.Client
-	cfg    DurableEmitterConfig
-	log    logger.Logger
+	// isHostProcess determines whether the emitter runs the retransmit and
+	// cleanup loops. Set to false when initialized inside LOOP plugins.
+	isHostProcess bool
+	cfg           DurableEmitterConfig
+	log           logger.Logger
 
 	metrics *durableEmitterMetrics
 
@@ -222,6 +225,7 @@ var _ Emitter = (*DurableEmitter)(nil)
 func NewDurableEmitter(
 	store DurableEventStore,
 	client chipingress.Client,
+	isHostProcess bool,
 	cfg DurableEmitterConfig,
 	log logger.Logger,
 ) (*DurableEmitter, error) {
@@ -247,12 +251,13 @@ func NewDurableEmitter(
 		store = newMetricsInstrumentedStore(store, m)
 	}
 	d := &DurableEmitter{
-		store:   store,
-		client:  client,
-		cfg:     cfg,
-		log:     log,
-		metrics: m,
-		stopCh:  make(chan struct{}),
+		store:         store,
+		client:        client,
+		isHostProcess: isHostProcess,
+		cfg:           cfg,
+		log:           log,
+		metrics:       m,
+		stopCh:        make(chan struct{}),
 	}
 	if cp, ok := client.(grpcConnProvider); ok {
 		d.rawConn = cp.Conn()
@@ -285,7 +290,7 @@ func NewDurableEmitter(
 
-// Start launches the retransmit, expiry, purge, and (optionally) batch publish
-// background loops. Cancel the supplied context or call Close to stop them.
-func (d *DurableEmitter) Start(ctx context.Context) {
+// Start launches the batch publish and insert workers, plus the retransmit,
+// expiry, and purge loops when isHostProcess is true. Call Close to stop them.
+func (d *DurableEmitter) Start(_ context.Context) {
 	batchWorkers := d.cfg.PublishBatchWorkers
 	if batchWorkers <= 0 {
 		d.log.Warnw("configured batchWorkers <=0; defaulting to 1")
@@ -297,10 +302,12 @@ func (d *DurableEmitter) Start(ctx context.Context) {
 		insertWorkers = 4
 	}
 
-	d.wg.Go(d.retransmitLoop)
-	if !d.cfg.DisablePruning {
-		d.wg.Go(func() { d.expiryLoop(ctx) })
-		d.wg.Go(func() { d.purgeLoop(ctx) })
+	if d.isHostProcess {
+		d.wg.Go(d.retransmitLoop)
+		if !d.cfg.DisablePruning {
+			d.wg.Go(d.expiryLoop)
+			d.wg.Go(d.purgeLoop)
+		}
 	}
 	if d.insertCh != nil {
 		for i := 0; i < insertWorkers; i++ {
@@ -845,7 +852,7 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) {
 	)
 }
 
-func (d *DurableEmitter) purgeLoop(ctx context.Context) {
+func (d *DurableEmitter) purgeLoop() {
 	interval := d.cfg.PurgeInterval
 	if interval <= 0 {
 		interval = 250 * time.Millisecond
@@ -854,12 +861,13 @@ func (d *DurableEmitter) purgeLoop(ctx context.Context) {
 	if batch <= 0 {
 		batch = 500
 	}
+
+	ctx, cancel := d.stopCh.NewCtx()
+	defer cancel()
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 	for {
 		select {
-		case <-ctx.Done():
-			return
 		case <-d.stopCh:
 			return
 		case <-ticker.C:
@@ -877,7 +885,7 @@ func (d *DurableEmitter) purgeLoop(ctx context.Context) {
 	}
 }
 
-func (d *DurableEmitter) expiryLoop(ctx context.Context) {
+func (d *DurableEmitter) expiryLoop() {
 	ticker := time.NewTicker(d.cfg.ExpiryInterval)
 	defer ticker.Stop()
 
@@ -885,8 +893,6 @@
 	defer cancel()
 	for {
 		select {
-		case <-ctx.Done():
-			return
 		case <-d.stopCh:
 			return
 		case <-ticker.C:
diff --git a/pkg/beholder/durable_emitter_test.go b/pkg/beholder/durable_emitter_test.go
index 0ab1e8841f..ad96b53071 100644
--- a/pkg/beholder/durable_emitter_test.go
+++ b/pkg/beholder/durable_emitter_test.go
@@ -111,7 +111,7 @@ func newTestDurableEmitter(t *testing.T, store DurableEventStore, client chiping
 	if cfgOverride != nil {
 		cfg = *cfgOverride
 	}
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 	return em
 }
@@ -187,7 +187,7 @@ func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) {
 		OnBatchPublish:       func(time.Duration, int, error) { pubCalls.Add(1) },
 		OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) },
 	}
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -211,7 +211,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
 		OnBatchPublish:       func(time.Duration, int, error) { pubCalls.Add(1) },
 		OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) },
 	}
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -224,6 +224,57 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
 	assert.Equal(t, int32(0), markCalls.Load())
 }
 
+func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
+	store := NewMemDurableableEventStore()
+	client := &testChipClient{}
+	client.setPublishErr(errors.New("chip unavailable"))
+
+	cfg := DefaultDurableEmitterConfig()
+	cfg.RetransmitInterval = 40 * time.Millisecond
+	cfg.RetransmitAfter = 15 * time.Millisecond
+	cfg.ExpiryInterval = 40 * time.Millisecond
+	cfg.EventTTL = 25 * time.Millisecond
+
+	em, err := NewDurableEmitter(store, client, false, cfg, logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer func() { require.NoError(t, em.Close()) }()
+
+	require.NoError(t, em.Emit(ctx, []byte("plugin-row"), testEmitAttrs()...))
+
+	require.Eventually(t, func() bool {
+		return client.batchCount.Load() >= 1 && store.Len() == 1
+	}, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row")
+
+	// Several host-only ticks would have cleared or retried by now.
+	time.Sleep(250 * time.Millisecond)
+
+	assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops")
+	assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit")
+}
+
+func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) {
+	store := NewMemDurableEventStore()
+	client := &testChipClient{}
+
+	em, err := NewDurableEmitter(store, client, false, DefaultDurableEmitterConfig(), logger.Test(t))
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer func() { require.NoError(t, em.Close()) }()
+
+	require.NoError(t, em.Emit(ctx, []byte("loop-plugin"), testEmitAttrs()...))
+
+	require.Eventually(t, func() bool {
+		return store.Len() == 0 && client.batchCount.Load() >= 1
+	}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false")
+}
+
 func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) {
 	store := NewMemDurableEventStore()
 	client := &testChipClient{}
@@ -437,13 +488,13 @@ func TestNewDurableEmitter_ValidationErrors(t *testing.T) {
 	log := logger.Test(t)
 	cfg := DefaultDurableEmitterConfig()
 
-	_, err := NewDurableEmitter(nil, &testChipClient{}, cfg, log)
+	_, err := NewDurableEmitter(nil, &testChipClient{}, true, cfg, log)
 	assert.ErrorContains(t, err, "store")
 
-	_, err = NewDurableEmitter(NewMemDurableEventStore(), nil, cfg, log)
+	_, err = NewDurableEmitter(NewMemDurableEventStore(), nil, true, cfg, log)
 	assert.ErrorContains(t, err, "client")
 
-	_, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, cfg, nil)
+	_, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, true, cfg, nil)
 	assert.ErrorContains(t, err, "logger")
 }
 
@@ -456,7 +507,7 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) {
 	cfg.RetransmitInterval = time.Hour
 	cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond}
 
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -603,7 +654,7 @@ func TestIntegration_HappyPath(t *testing.T) {
 	client := newChipClient(t, addr)
 	store := NewMemDurableEventStore()
 
-	em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -631,7 +682,7 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) {
 	client := newChipClient(t, addr)
 	store := NewMemDurableEventStore()
 
-	em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -666,7 +717,7 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) {
 	cfg := fastCfg()
 	cfg.PublishTimeout = 500 * time.Millisecond
 
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -699,7 +750,7 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) {
 	require.NoError(t, err)
 	t.Cleanup(func() { _ = client2.Close() })
-	em2, err := NewDurableEmitter(store, client2, cfg, logger.Test(t))
+	em2, err := NewDurableEmitter(store, client2, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 	em2.Start(ctx)
 	defer em2.Close()
@@ -721,7 +772,7 @@ func TestIntegration_HighThroughput(t *testing.T) {
 	cfg := fastCfg()
 	cfg.RetransmitBatchSize = 200
 
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -754,7 +805,7 @@ func TestIntegration_EventExpiry(t *testing.T) {
 	cfg := fastCfg()
 	cfg.EventTTL = 100 * time.Millisecond
 	cfg.ExpiryInterval = 100 * time.Millisecond
-	em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -779,7 +830,7 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) {
 	client := newChipClient(t, addr)
 	store := NewMemDurableEventStore()
 
-	em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -830,7 +881,7 @@ func TestIntegration_GRPCConnection(t *testing.T) {
 	client := newChipClient(t, addr)
 	store := NewMemDurableEventStore()
 
-	em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
+	em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
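
Reviewer note: a minimal sketch of the intended wiring for the new isHostProcess
flag, assuming the host process and a LOOP plugin share one DurableEventStore.
The helper name startEmitters, the two separate clients, and lggr are
illustrative placeholders (imports elided); they are not identifiers from this
patch.

func startEmitters(ctx context.Context, store DurableEventStore, hostClient, pluginClient chipingress.Client, lggr logger.Logger) (*DurableEmitter, *DurableEmitter, error) {
	// Host process: the only place the retransmit, expiry, and purge loops
	// run, so rows in the shared store are not double-processed.
	host, err := NewDurableEmitter(store, hostClient, true, DefaultDurableEmitterConfig(), lggr)
	if err != nil {
		return nil, nil, err
	}
	host.Start(ctx) // context is ignored as of this patch; Close stops the loops

	// LOOP plugin: still persists and publishes via the insert and batch
	// publish workers, but leaves pruning and retransmission to the host.
	plugin, err := NewDurableEmitter(store, pluginClient, false, DefaultDurableEmitterConfig(), lggr)
	if err != nil {
		_ = host.Close()
		return nil, nil, err
	}
	plugin.Start(ctx)
	return host, plugin, nil
}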