Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
89a39ed
Initial PoC
DylanTinianov Mar 19, 2026
cc10b6d
Create durable_emitter_integration_test.go
DylanTinianov Mar 23, 2026
6da9ad1
Update DurableEmitterDesign.md
DylanTinianov Mar 23, 2026
a268cb0
Update doc with tests
DylanTinianov Mar 23, 2026
ae0b37b
Add hooks
DylanTinianov Mar 25, 2026
f588de9
Update durable_emitter.go
DylanTinianov Mar 25, 2026
d922048
Add metrics
DylanTinianov Mar 25, 2026
6f9c5dd
Single publish
DylanTinianov Mar 25, 2026
1fd13ea
Add beholder schema?
DylanTinianov Mar 25, 2026
48810bc
Log publish details
DylanTinianov Mar 25, 2026
b787de2
Add logs
DylanTinianov Mar 25, 2026
df410b6
Add Persist Sources
DylanTinianov Mar 25, 2026
ac29275
Update durable_emitter.go
DylanTinianov Mar 25, 2026
61a31c1
mute logging
DylanTinianov Mar 31, 2026
657342c
Update durable_emitter.go
DylanTinianov Mar 31, 2026
d18962e
Update durable_emitter.go
DylanTinianov Mar 31, 2026
7678416
Background delete from event store
DylanTinianov Apr 1, 2026
734fe94
Add metrics
DylanTinianov Apr 2, 2026
5b9f938
Add publish workers
DylanTinianov Apr 9, 2026
9d87637
Revert
DylanTinianov Apr 9, 2026
e5477bc
Add counter
DylanTinianov Apr 9, 2026
ed355a2
Match Metrics + Production Rate
DylanTinianov Apr 10, 2026
f569c89
Async mark delivered
DylanTinianov Apr 10, 2026
27cac71
Update
DylanTinianov Apr 13, 2026
983fc87
Batching
DylanTinianov Apr 13, 2026
1be34c0
Use publish workers for re-transmit
DylanTinianov Apr 15, 2026
c3a4edc
Payload optimization
DylanTinianov Apr 15, 2026
e27e4b6
Batch insert
DylanTinianov Apr 20, 2026
ae84c71
Clean up files
DylanTinianov Apr 30, 2026
351a495
Clean up
DylanTinianov Apr 30, 2026
81c8d1e
Clean up
DylanTinianov Apr 30, 2026
f501ebc
Remove process stats (only used in local testing)
DylanTinianov Apr 30, 2026
332a4bb
Merge branch 'main' into durable-emitter-poc
DylanTinianov Apr 30, 2026
f175829
Update durable_emitter.go
DylanTinianov Apr 30, 2026
aa11bcd
Merge branch 'durable-emitter-poc' of github.com:smartcontractkit/cha…
DylanTinianov Apr 30, 2026
3056d79
tidy
DylanTinianov Apr 30, 2026
7f08d3d
Fix race in tests
DylanTinianov Apr 30, 2026
5d4bf35
Trigger CI
DylanTinianov Apr 30, 2026
9e61f29
Fix context
DylanTinianov May 5, 2026
2288940
Drain insert on shutdown
DylanTinianov May 11, 2026
741c217
Merge branch 'main' into durable-emitter-poc
DylanTinianov May 11, 2026
e357765
Trigger CI
DylanTinianov May 11, 2026
141e128
Merge branch 'durable-emitter-poc' of github.com:smartcontractkit/cha…
DylanTinianov May 11, 2026
6041cf0
Fix ctx handling
DylanTinianov May 11, 2026
70a63f8
Add isHostProcess flag
DylanTinianov May 11, 2026
750c475
Fix wg handling
DylanTinianov May 11, 2026
22e2847
Merge branch 'durable-emitter-poc' of github.com:smartcontractkit/cha…
DylanTinianov May 11, 2026
79640c4
Update durable_emitter.go
DylanTinianov May 11, 2026
99c377e
Merge branch 'main' of github.com:smartcontractkit/chainlink-common i…
DylanTinianov May 11, 2026
82ac630
Fix host process flag + tests
DylanTinianov May 11, 2026
8a894b3
Remove client changes
DylanTinianov May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions pkg/beholder/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ type insertResult struct {
type DurableEmitter struct {
store DurableEventStore
client chipingress.Client
cfg DurableEmitterConfig
log logger.Logger
// isHostProcess determines if the emitter runs retransmit and cleanup loops.
// Should be set to false when initialized inside LOOP plugins.
isHostProcess bool
cfg DurableEmitterConfig
log logger.Logger

metrics *durableEmitterMetrics

Expand Down Expand Up @@ -222,6 +225,7 @@ var _ Emitter = (*DurableEmitter)(nil)
func NewDurableEmitter(
store DurableEventStore,
client chipingress.Client,
isHostProcess bool,
cfg DurableEmitterConfig,
log logger.Logger,
) (*DurableEmitter, error) {
Expand All @@ -247,12 +251,13 @@ func NewDurableEmitter(
store = newMetricsInstrumentedStore(store, m)
}
d := &DurableEmitter{
store: store,
client: client,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
store: store,
client: client,
isHostProcess: isHostProcess,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
}
if cp, ok := client.(grpcConnProvider); ok {
d.rawConn = cp.Conn()
Expand Down Expand Up @@ -285,7 +290,7 @@ func NewDurableEmitter(

// Start launches the retransmit, expiry, purge, and (optionally) batch publish
// background loops. Cancel the supplied context or call Close to stop them.
func (d *DurableEmitter) Start(ctx context.Context) {
func (d *DurableEmitter) Start(_ context.Context) {
batchWorkers := d.cfg.PublishBatchWorkers
if batchWorkers <= 0 {
d.log.Warnw("configured batchWorkers <=0; defaulting to 1")
Expand All @@ -297,10 +302,12 @@ func (d *DurableEmitter) Start(ctx context.Context) {
insertWorkers = 4
}

d.wg.Go(d.retransmitLoop)
if !d.cfg.DisablePruning {
d.wg.Go(func() { d.expiryLoop(ctx) })
d.wg.Go(func() { d.purgeLoop(ctx) })
if d.isHostProcess {
d.wg.Go(d.retransmitLoop)
if !d.cfg.DisablePruning {
d.wg.Go(d.expiryLoop)
d.wg.Go(d.purgeLoop)
}
}
if d.insertCh != nil {
for i := 0; i < insertWorkers; i++ {
Expand Down Expand Up @@ -845,7 +852,7 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) {
)
}

func (d *DurableEmitter) purgeLoop(ctx context.Context) {
func (d *DurableEmitter) purgeLoop() {
interval := d.cfg.PurgeInterval
if interval <= 0 {
interval = 250 * time.Millisecond
Expand All @@ -854,12 +861,13 @@ func (d *DurableEmitter) purgeLoop(ctx context.Context) {
if batch <= 0 {
batch = 500
}

ctx, cancel := d.stopCh.NewCtx()
defer cancel()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-d.stopCh:
return
case <-ticker.C:
Expand All @@ -877,16 +885,14 @@ func (d *DurableEmitter) purgeLoop(ctx context.Context) {
}
}

func (d *DurableEmitter) expiryLoop(ctx context.Context) {
func (d *DurableEmitter) expiryLoop() {
ticker := time.NewTicker(d.cfg.ExpiryInterval)
defer ticker.Stop()

ctx, cancel := d.stopCh.NewCtx()
defer cancel()
for {
select {
case <-ctx.Done():
return
case <-d.stopCh:
return
case <-ticker.C:
Expand Down
81 changes: 66 additions & 15 deletions pkg/beholder/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func newTestDurableEmitter(t *testing.T, store DurableEventStore, client chiping
if cfgOverride != nil {
cfg = *cfgOverride
}
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)
return em
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) {
OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) },
OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) },
}
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -211,7 +211,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) },
OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) },
}
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -224,6 +224,57 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
assert.Equal(t, int32(0), markCalls.Load())
}

// TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry verifies that an
// emitter created with isHostProcess=false never runs the retransmit or expiry
// background loops: a row whose first publish fails must remain in the store
// untouched, with no additional PublishBatch attempts scheduled.
func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
	eventStore := NewMemDurableEventStore()
	chip := &testChipClient{}
	chip.setPublishErr(errors.New("chip unavailable"))

	// Aggressive host-loop timings: if the loops were (incorrectly) running,
	// several retransmit/expiry ticks would fire well within the sleep below.
	conf := DefaultDurableEmitterConfig()
	conf.RetransmitInterval = 40 * time.Millisecond
	conf.RetransmitAfter = 15 * time.Millisecond
	conf.ExpiryInterval = 40 * time.Millisecond
	conf.EventTTL = 25 * time.Millisecond

	emitter, err := NewDurableEmitter(eventStore, chip, false, conf, logger.Test(t))
	require.NoError(t, err)

	testCtx, stop := context.WithCancel(context.Background())
	defer stop()
	emitter.Start(testCtx)
	defer func() { require.NoError(t, emitter.Close()) }()

	require.NoError(t, emitter.Emit(testCtx, []byte("plugin-row"), testEmitAttrs()...))

	// Wait for exactly one failed publish attempt that leaves the row behind.
	require.Eventually(t, func() bool {
		return chip.batchCount.Load() >= 1 && eventStore.Len() == 1
	}, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row")

	// Several host-only ticks would have cleared or retried by now.
	time.Sleep(250 * time.Millisecond)

	assert.Equal(t, 1, eventStore.Len(), "non-host must not run retransmit or expiry loops")
	assert.Equal(t, int64(1), chip.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit")
}

// TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers verifies that
// disabling the host-only loops (isHostProcess=false) does not disable the
// batch publish workers: an emitted event must still be published and then
// removed from the store.
func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) {
	eventStore := NewMemDurableEventStore()
	chip := &testChipClient{}

	emitter, err := NewDurableEmitter(eventStore, chip, false, DefaultDurableEmitterConfig(), logger.Test(t))
	require.NoError(t, err)

	testCtx, stop := context.WithCancel(context.Background())
	defer stop()
	emitter.Start(testCtx)
	defer func() { require.NoError(t, emitter.Close()) }()

	require.NoError(t, emitter.Emit(testCtx, []byte("loop-plugin"), testEmitAttrs()...))

	// Success path: the row is published (batchCount advances) and the store drains.
	require.Eventually(t, func() bool {
		return eventStore.Len() == 0 && chip.batchCount.Load() >= 1
	}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false")
}

func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) {
store := NewMemDurableEventStore()
client := &testChipClient{}
Expand Down Expand Up @@ -437,13 +488,13 @@ func TestNewDurableEmitter_ValidationErrors(t *testing.T) {
log := logger.Test(t)
cfg := DefaultDurableEmitterConfig()

_, err := NewDurableEmitter(nil, &testChipClient{}, cfg, log)
_, err := NewDurableEmitter(nil, &testChipClient{}, true, cfg, log)
assert.ErrorContains(t, err, "store")

_, err = NewDurableEmitter(NewMemDurableEventStore(), nil, cfg, log)
_, err = NewDurableEmitter(NewMemDurableEventStore(), nil, true, cfg, log)
assert.ErrorContains(t, err, "client")

_, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, cfg, nil)
_, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, true, cfg, nil)
assert.ErrorContains(t, err, "logger")
}

Expand All @@ -456,7 +507,7 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) {
cfg.RetransmitInterval = time.Hour
cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond}

em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -603,7 +654,7 @@ func TestIntegration_HappyPath(t *testing.T) {
client := newChipClient(t, addr)
store := NewMemDurableEventStore()

em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -631,7 +682,7 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) {
client := newChipClient(t, addr)
store := NewMemDurableEventStore()

em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -666,7 +717,7 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) {

cfg := fastCfg()
cfg.PublishTimeout = 500 * time.Millisecond
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -699,7 +750,7 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { _ = client2.Close() })

em2, err := NewDurableEmitter(store, client2, cfg, logger.Test(t))
em2, err := NewDurableEmitter(store, client2, true, cfg, logger.Test(t))
require.NoError(t, err)
em2.Start(ctx)
defer em2.Close()
Expand All @@ -721,7 +772,7 @@ func TestIntegration_HighThroughput(t *testing.T) {

cfg := fastCfg()
cfg.RetransmitBatchSize = 200
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -754,7 +805,7 @@ func TestIntegration_EventExpiry(t *testing.T) {
cfg := fastCfg()
cfg.EventTTL = 100 * time.Millisecond
cfg.ExpiryInterval = 100 * time.Millisecond
em, err := NewDurableEmitter(store, client, cfg, logger.Test(t))
em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -779,7 +830,7 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) {
client := newChipClient(t, addr)
store := NewMemDurableEventStore()

em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -830,7 +881,7 @@ func TestIntegration_GRPCConnection(t *testing.T) {
client := newChipClient(t, addr)
store := NewMemDurableEventStore()

em, err := NewDurableEmitter(store, client, fastCfg(), logger.Test(t))
em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading