Skip to content

Commit bb05308

Browse files
fix(p2p): add exponential backoff for reacher (#5371)
1 parent 1fd7740 commit bb05308

File tree

4 files changed

+420
-15
lines changed

4 files changed

+420
-15
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2026 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package reacher
6+
7+
import (
8+
m "github.com/ethersphere/bee/v2/pkg/metrics"
9+
"github.com/prometheus/client_golang/prometheus"
10+
)
11+
12+
// metrics groups the reacher-related prometheus collectors: one gauge, two
// counters and one histogram.
13+
type metrics struct {
14+
// Peers is incremented when Connected adds a new peer and decremented when
// Disconnected removes one.
Peers prometheus.Gauge
15+
// PingAttemptCount is incremented once per ping, before the ping is sent.
PingAttemptCount prometheus.Counter
16+
// PingErrorCount is incremented when a ping returns an error.
PingErrorCount prometheus.Counter
17+
// PingDuration observes the reported RTT of a successful ping, or the
// elapsed wall-clock time of a failed one, in seconds.
PingDuration prometheus.Histogram
18+
}
19+
20+
// newMetrics is a convenient constructor for creating new metrics.
21+
func newMetrics() metrics {
22+
const subsystem = "reacher"
23+
24+
return metrics{
25+
Peers: prometheus.NewGauge(prometheus.GaugeOpts{
26+
Namespace: m.Namespace,
27+
Subsystem: subsystem,
28+
Name: "peers",
29+
Help: "Number of peers currently in the reacher queue.",
30+
}),
31+
PingAttemptCount: prometheus.NewCounter(prometheus.CounterOpts{
32+
Namespace: m.Namespace,
33+
Subsystem: subsystem,
34+
Name: "ping_attempt_count",
35+
Help: "Number of ping attempts.",
36+
}),
37+
PingErrorCount: prometheus.NewCounter(prometheus.CounterOpts{
38+
Namespace: m.Namespace,
39+
Subsystem: subsystem,
40+
Name: "ping_error_count",
41+
Help: "Number of failed ping attempts.",
42+
}),
43+
PingDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
44+
Namespace: m.Namespace,
45+
Subsystem: subsystem,
46+
Name: "ping_duration_seconds",
47+
Help: "Ping latency distribution in seconds.",
48+
Buckets: []float64{.1, .25, .5, 1, 2, 5, 10, 15},
49+
}),
50+
}
51+
}
52+
53+
// Metrics returns set of prometheus collectors.
54+
func (r *reacher) Metrics() []prometheus.Collector {
55+
return m.PrometheusCollectorsFromFields(r.metrics)
56+
}

pkg/p2p/libp2p/internal/reacher/reacher.go

Lines changed: 85 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package reacher
99
import (
1010
"container/heap"
1111
"context"
12+
"math/rand/v2"
1213
"sync"
1314
"time"
1415

@@ -19,16 +20,22 @@ import (
1920
)
2021

2122
const (
22-
pingTimeout = time.Second * 15
23-
workers = 16
24-
retryAfterDuration = time.Minute * 5
23+
pingTimeout = time.Second * 15
24+
workers = 8
25+
retryAfterDuration = time.Minute * 5
26+
maxFailBackoffExponent = 4 // caps failure backoff at retryAfterDuration * 2^4 = 80 min
27+
maxSuccessBackoffExponent = 2 // caps success backoff at retryAfterDuration * 2^2 = 20 min
28+
jitterFactor = 0.2 // ±20% randomization on retry intervals
2529
)
2630

2731
type peer struct {
28-
overlay swarm.Address
29-
addr ma.Multiaddr
30-
retryAfter time.Time
31-
index int // index in the heap
32+
overlay swarm.Address
33+
addr ma.Multiaddr
34+
retryAfter time.Time
35+
failCount int // consecutive ping failures for exponential backoff
36+
successCount int // consecutive ping successes for exponential backoff
37+
generation int // incremented on reconnect; guards against stale notifyResult
38+
index int // index in the heap
3239
}
3340

3441
type reacher struct {
@@ -44,6 +51,7 @@ type reacher struct {
4451

4552
wg sync.WaitGroup
4653

54+
metrics metrics
4755
options *Options
4856
logger log.Logger
4957
}
@@ -52,6 +60,7 @@ type Options struct {
5260
PingTimeout time.Duration
5361
Workers int
5462
RetryAfterDuration time.Duration
63+
JitterFactor float64 // ±N% randomization on retry intervals; 0 disables jitter
5564
}
5665

5766
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log log.Logger) *reacher {
@@ -62,6 +71,7 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log lo
6271
peerHeap: make(peerHeap, 0),
6372
peerIndex: make(map[string]*peer),
6473
notifier: notifier,
74+
metrics: newMetrics(),
6575
logger: log.WithName("reacher").Register(),
6676
}
6777

@@ -70,6 +80,7 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log lo
7080
PingTimeout: pingTimeout,
7181
Workers: workers,
7282
RetryAfterDuration: retryAfterDuration,
83+
JitterFactor: jitterFactor,
7384
}
7485
}
7586
r.options = o
@@ -95,7 +106,6 @@ func (r *reacher) manage() {
95106
}
96107

97108
for {
98-
99109
p, ok, tryAfter := r.tryAcquirePeer()
100110

101111
// if no peer is returned,
@@ -136,15 +146,22 @@ func (r *reacher) ping(c chan peer, ctx context.Context) {
136146
defer r.wg.Done()
137147
for p := range c {
138148
func() {
149+
r.metrics.PingAttemptCount.Inc()
139150
ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout)
140151
defer cancel()
152+
start := time.Now()
141153
rtt, err := r.pinger.Ping(ctxt, p.addr)
142154
if err != nil {
155+
r.metrics.PingDuration.Observe(time.Since(start).Seconds())
156+
r.metrics.PingErrorCount.Inc()
143157
r.logger.Debug("ping failed", "peer", p.overlay.String(), "addr", p.addr.String(), "error", err)
144158
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
159+
r.notifyResult(p.overlay, false, p.generation)
145160
} else {
161+
r.metrics.PingDuration.Observe(rtt.Seconds())
146162
r.logger.Debug("ping succeeded", "peer", p.overlay.String(), "addr", p.addr.String(), "rtt", rtt)
147163
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic)
164+
r.notifyResult(p.overlay, true, p.generation)
148165
}
149166
}()
150167
}
@@ -168,8 +185,10 @@ func (r *reacher) tryAcquirePeer() (peer, bool, time.Duration) {
168185
return peer{}, false, time.Until(p.retryAfter)
169186
}
170187

171-
// Update retryAfter and fix heap position
172-
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
188+
// Set a temporary far-future retryAfter to prevent the manage loop from
189+
// re-dispatching this peer while the ping is in flight. The actual
190+
// retryAfter will be set by notifyResult after the ping completes.
191+
p.retryAfter = now.Add(time.Hour)
173192
heap.Fix(&r.peerHeap, p.index)
174193

175194
// Return a copy so callers can read fields without holding the lock.
@@ -190,13 +209,57 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
190209
if existing, ok := r.peerIndex[key]; ok {
191210
existing.addr = addr // Update address for reconnecting peer
192211
existing.retryAfter = time.Time{} // Reset to trigger immediate re-ping
212+
existing.failCount = 0 // Fresh start on reconnect
213+
existing.successCount = 0 // Fresh start on reconnect
214+
existing.generation++ // invalidate any in-flight notifyResult
193215
heap.Fix(&r.peerHeap, existing.index)
194216
} else {
195217
p := &peer{overlay: overlay, addr: addr}
196218
r.peerIndex[key] = p
197219
heap.Push(&r.peerHeap, p)
220+
r.metrics.Peers.Inc()
221+
}
222+
223+
select {
224+
case r.newPeer <- struct{}{}:
225+
default:
226+
}
227+
}
228+
229+
// notifyResult updates the peer's retry schedule based on the ping outcome.
230+
// Both success and failure use exponential backoff with different caps:
231+
// - Success: 10m → 20m with the default 5m base (exponent is the consecutive success count, capped at 2^2), resets failCount
232+
// - Failure: 10m → 20m → 40m → 80m with the default 5m base (exponent is the consecutive failure count, capped at 2^4), resets successCount
233+
//
234+
// The gen parameter is the generation captured when the ping was dispatched.
235+
// If the peer was reconnected (generation incremented) while the ping was
236+
// in flight, the stale result is discarded.
237+
func (r *reacher) notifyResult(overlay swarm.Address, success bool, gen int) {
238+
r.mu.Lock()
239+
defer r.mu.Unlock()
240+
241+
p, ok := r.peerIndex[overlay.ByteString()]
242+
if !ok {
243+
return // peer was disconnected while ping was in flight
244+
}
245+
if p.generation != gen {
246+
return // peer was reconnected; discard stale result
247+
}
248+
249+
if success {
250+
p.failCount = 0
251+
p.successCount++
252+
backoff := min(p.successCount, maxSuccessBackoffExponent)
253+
p.retryAfter = time.Now().Add(r.jitter(r.options.RetryAfterDuration * time.Duration(1<<backoff)))
254+
} else {
255+
p.successCount = 0
256+
p.failCount++
257+
backoff := min(p.failCount, maxFailBackoffExponent)
258+
p.retryAfter = time.Now().Add(r.jitter(r.options.RetryAfterDuration * time.Duration(1<<backoff)))
198259
}
260+
heap.Fix(&r.peerHeap, p.index)
199261

262+
// Wake the manage loop so it recalculates the next retry time.
200263
select {
201264
case r.newPeer <- struct{}{}:
202265
default:
@@ -212,7 +275,19 @@ func (r *reacher) Disconnected(overlay swarm.Address) {
212275
if p, ok := r.peerIndex[key]; ok {
213276
heap.Remove(&r.peerHeap, p.index)
214277
delete(r.peerIndex, key)
278+
r.metrics.Peers.Dec()
279+
}
280+
}
281+
282+
// jitter adds ±JitterFactor randomization to a duration to prevent peers from
283+
// synchronizing their retry times and causing burst traffic.
284+
func (r *reacher) jitter(d time.Duration) time.Duration {
285+
if r.options.JitterFactor == 0 {
286+
return d
215287
}
288+
// rand.Float64() returns [0.0, 1.0), scale to [-JitterFactor, +JitterFactor)
289+
j := 1.0 + r.options.JitterFactor*(2*rand.Float64()-1)
290+
return time.Duration(float64(d) * j)
216291
}
217292

218293
// Close stops the worker. Must be called once.

0 commit comments

Comments
 (0)