Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
type BatchWriteCommitVerifierNodeResultHandler struct {
handler *WriteCommitVerifierNodeResultHandler
maxCommitVerifierNodeResultRequestsPerBatch int
wg sync.WaitGroup
}

func (h *BatchWriteCommitVerifierNodeResultHandler) Wait() {
h.wg.Wait()
}

func (h *BatchWriteCommitVerifierNodeResultHandler) logger(ctx context.Context) logger.SugaredLogger {
Expand All @@ -40,12 +45,12 @@ func (h *BatchWriteCommitVerifierNodeResultHandler) Handle(ctx context.Context,
responses := make([]*committeepb.WriteCommitteeVerifierNodeResultResponse, len(requests))
errors := NewBatchErrorArray(len(requests))

wg := sync.WaitGroup{}
h.wg = sync.WaitGroup{}

for i, r := range requests {
wg.Add(1)
h.wg.Add(1)
go func(i int, r *committeepb.WriteCommitteeVerifierNodeResultRequest) {
defer wg.Done()
defer h.wg.Done()
resp, err := h.handler.Handle(ctx, r)
if err != nil {
statusErr, ok := grpcstatus.FromError(err)
Expand All @@ -65,7 +70,7 @@ func (h *BatchWriteCommitVerifierNodeResultHandler) Handle(ctx context.Context,

done := make(chan struct{})
go func() {
wg.Wait()
h.wg.Wait()
close(done)
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func TestBatchWriteCommitCCVNodeDataHandler_CancelledContextReturnsImmediately(t

writeHandler := NewWriteCommitCCVNodeDataHandler(store, agg, mon, lggr, sig, blockDuration)
batchHandler := NewBatchWriteCommitVerifierNodeResultHandler(writeHandler, 10)
defer batchHandler.Wait() // ensure all goroutines finish before the test exits

ctx, cancel := context.WithCancel(auth.ToContext(context.Background(), auth.CreateCallerIdentity(testCallerID, false)))
defer cancel()
Expand Down
16 changes: 14 additions & 2 deletions indexer/pkg/worker/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Scheduler struct {
delayHeap *DelayHeap
ready chan *Task
dlq chan *Task
wg sync.WaitGroup
startOnce sync.Once
stopOnce sync.Once
}

func NewScheduler(lggr logger.Logger, config config.SchedulerConfig) (*Scheduler, error) {
Expand All @@ -43,12 +46,21 @@ func NewScheduler(lggr logger.Logger, config config.SchedulerConfig) (*Scheduler
}, nil
}

// Start begins the scheduler's main loop in a separate goroutine. The service may only be started once, subsequent calls to Start will be no-ops.
func (s *Scheduler) Start(ctx context.Context) {
go s.run(ctx)
s.startOnce.Do(func() {
s.wg.Go(func() {
s.run(ctx)
})
})
}

// Stop the scheduler's main loop and wait for it to exit. The service may only be stopped once, subsequent calls to Stop will be no-ops.
func (s *Scheduler) Stop() {
s.stopCh <- struct{}{}
s.stopOnce.Do(func() {
close(s.stopCh)
})
s.wg.Wait()
}

func (s *Scheduler) run(ctx context.Context) {
Expand Down
13 changes: 13 additions & 0 deletions indexer/pkg/worker/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func TestScheduler_RunMovesDelayedToReady(t *testing.T) {

// start scheduler run loop to process delayed heap
s.Start(ctx)
defer s.Stop()

select {
case got := <-s.Ready():
Expand Down Expand Up @@ -208,6 +209,7 @@ func TestScheduler_RunDoesNotLeakGoroutinesUnderBurst(t *testing.T) {
defer cancel()

s.Start(ctx)
defer s.Stop()

// Do NOT consume from Ready — simulate backpressure from a saturated worker pool.
time.Sleep(100 * time.Millisecond)
Expand All @@ -219,3 +221,14 @@ func TestScheduler_RunDoesNotLeakGoroutinesUnderBurst(t *testing.T) {
"goroutine count grew by %d (from %d to %d); expected bounded growth under backpressure",
peakGoroutines-baselineGoroutines, baselineGoroutines, peakGoroutines)
}

func TestScheduler_DoubleStop(t *testing.T) {
lggr := logger.Test(t)
scfg := config.SchedulerConfig{TickerInterval: 10, BaseDelay: 10, MaxDelay: 1000, VerificationVisibilityWindow: 60}
s, err := NewScheduler(lggr, scfg)
require.NoError(t, err)

s.Start(t.Context())
s.Stop()
s.Stop()
}
Loading