Skip to content

Commit 5085e70

Browse files
committed
fix(syncer): include in-flight and pending-cache work in PendingCount
1 parent 312f1e6 commit 5085e70

3 files changed

Lines changed: 14 additions & 1 deletion

File tree

block/internal/cache/generic_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ func (c *Cache[T]) getNextItem(height uint64) *T {
113113
return item
114114
}
115115

116+
// itemCount returns the number of items currently stored by height.
117+
func (c *Cache[T]) itemCount() int {
118+
return c.itemsByHeight.Len()
119+
}
120+
116121
// isSeen returns true if the hash has been seen.
117122
func (c *Cache[T]) isSeen(hash string) bool {
118123
seen, ok := c.hashes.Get(hash)

block/internal/cache/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type CacheManager interface {
5858
// Pending events syncing coordination
5959
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
6060
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
61+
PendingEventsCount() int
6162

6263
// Store operations
6364
SaveToStore() error
@@ -321,6 +322,10 @@ func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEv
321322
m.pendingEventsCache.setItem(height, event)
322323
}
323324

325+
func (m *implementation) PendingEventsCount() int {
326+
return m.pendingEventsCache.itemCount()
327+
}
328+
324329
// GetNextPendingEvent efficiently retrieves and removes the event at the specified height.
325330
// Returns nil if no event exists at that height.
326331
func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent {

block/internal/syncing/syncer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type Syncer struct {
7272
// Channels for coordination
7373
heightInCh chan common.DAHeightEvent
7474
errorCh chan<- error // Channel to report critical execution client failures
75+
inFlight atomic.Int64
7576

7677
// Handlers
7778
daRetriever DARetriever
@@ -379,7 +380,9 @@ func (s *Syncer) processLoop(ctx context.Context) {
379380
return
380381
case heightEvent, ok := <-s.heightInCh:
381382
if ok {
383+
s.inFlight.Add(1)
382384
s.processHeightEvent(ctx, &heightEvent)
385+
s.inFlight.Add(-1)
383386
}
384387
}
385388
}
@@ -403,7 +406,7 @@ func (s *Syncer) HasReachedDAHead() bool {
403406

404407
// PendingCount returns the number of unprocessed height events in the pipeline.
405408
func (s *Syncer) PendingCount() int {
406-
return len(s.heightInCh)
409+
return len(s.heightInCh) + int(s.inFlight.Load()) + s.cache.PendingEventsCount()
407410
}
408411

409412
func (s *Syncer) pendingWorkerLoop(ctx context.Context) {

0 commit comments

Comments
 (0)