
Core P2P contiguous data retrieval: hedged requests, concurrency limiting, consistent hash routing #646

@djwhitt

Description

Core P2P Contiguous Data Retrieval Improvements

Implements the three highest-impact improvements from #612 for P2P retrieval: hedged requests, per-peer concurrency limiting, and consistent hash routing. Hash ring routing covers both contiguous data and chunks.

Problem

The current ArIODataSource.getData() uses a sequential retry loop that tries peers one at a time. This creates three compounding problems:

  1. Tail latency from sequential attempts — A slow-but-not-failed peer burns the full 10s connection timeout before the next peer is tried. Worst case: retryCount * 10s = 30s.
  2. No load spreading — Nothing caps how many concurrent requests hit a single peer. High-weight peers attract the most traffic under load, the opposite of healthy network behavior.
  3. No cache locality — Peer selection is purely weight-based random. Two requests for the same data ID may route to completely different peers, wasting cache potential across the network.

Requirements

1. Per-Peer Concurrency Limiter

Cap concurrent outbound requests to any single peer. When a peer's slots are full, skip it immediately and try the next peer.

  • New shared limiter module (pattern: p-limit per peer, lazy initialization)
  • Configurable via PEER_MAX_CONCURRENT_OUTBOUND (default: 10)
  • Single shared instance across ArIODataSource and GatewaysDataSource
  • Fail-fast: don't queue, skip saturated peers

Why this matters: Without concurrency limits, hedged requests would amplify the overload problem — more fan-out to already-saturated peers. This is a prerequisite for hedging to be safe.
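
A minimal sketch of the fail-fast limiter, assuming a plain in-flight counter per peer rather than p-limit internals (the `PeerRequestLimiter` class name and `tryAcquire` API are illustrative, not the actual module):

```typescript
// Hypothetical sketch of src/data/peer-request-limiter.ts (names illustrative).
// Tracks in-flight requests per peer; callers check tryAcquire() and skip
// saturated peers instead of queueing.
export class PeerRequestLimiter {
  private inFlight = new Map<string, number>();

  constructor(private readonly maxConcurrent = 10) {} // PEER_MAX_CONCURRENT_OUTBOUND

  // Returns a release function if a slot is free, or undefined if the peer
  // is saturated (fail-fast: the caller should move on to the next peer).
  tryAcquire(peerId: string): (() => void) | undefined {
    const current = this.inFlight.get(peerId) ?? 0;
    if (current >= this.maxConcurrent) return undefined;
    this.inFlight.set(peerId, current + 1);
    let released = false;
    return () => {
      if (released) return; // idempotent release
      released = true;
      this.inFlight.set(peerId, (this.inFlight.get(peerId) ?? 1) - 1);
    };
  }
}
```

A single shared instance would be created in `src/system.ts` and injected into both `ArIODataSource` and `GatewaysDataSource`.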

2. Hedged Requests

Replace the sequential for-loop in ArIODataSource.getData() with a hedged request pattern that fires parallel requests after a configurable delay.

Request timeline (hedge delay = 500ms):

```
t=0ms    → Request to Peer A
t=500ms  → Peer A hasn't responded → fire request to Peer B
t=650ms  → Peer B responds → abort Peer A, return result
```
  • Configurable via PEER_HEDGE_DELAY_MS (default: 500ms, 0 = disabled/sequential fallback)
  • Configurable via PEER_MAX_HEDGED_REQUESTS (default: 3, caps concurrent fan-out)
  • First success aborts all other in-flight requests via AbortController
  • Hard failures immediately trigger the next peer (no wait for hedge timer)
  • Requires decoupling candidate pool from retry count: PEER_CANDIDATE_COUNT (default: 5) selects from a wider pool while PEER_MAX_HEDGED_REQUESTS caps actual attempts

3. Consistent Hash Routing

Use a consistent hash ring to map content keys to a stable "home set" of peers, creating cache locality without coordination. Applied to both contiguous data (by data ID) and chunks (by absolute offset).

  • Hash each peer to positions on a ring (virtual nodes for even distribution)
  • For a given key, select the N closest peers as its "home set"
  • Try home set first (ranked by weight within the set), fall back to general weighted selection
  • Rebuild ring on updatePeerList() (peers join/leave)
  • Composes with weight system and concurrency limits:
```
Peer selection for key X:
  1. homeSet   = hashRing.getHomeSet(X, homeSetSize)
  2. ranked    = rankByWeight(homeSet)
  3. filtered  = filterByConcurrencySlots(ranked)   // skip saturated
  4. fallback  = generalWeightedSelection(exclude=homeSet)
  5. candidates = [...filtered, ...fallback]
```

Why this matters: Creates emergent cache specialization — each peer naturally warms up for "its" slice of the key space. Same content always tries the same peers, strictly better for cache hit rates than random selection. Data IDs are SHA-256 hashes and absolute offsets are uniformly distributed, so distribution across the ring is inherently uniform.
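
A minimal sketch of the ring itself, using virtual nodes and the first 8 bytes of SHA-256 as ring positions (the `PeerHashRing` class and its methods are illustrative; the proposed `src/data/peer-hash-ring.ts` may differ):

```typescript
import { createHash } from 'node:crypto';

// Sketch of a consistent hash ring: peers are hashed to many virtual-node
// positions; a key's "home set" is the first N distinct peers clockwise
// from the key's own position.
export class PeerHashRing {
  private ring: { position: bigint; peer: string }[] = [];

  constructor(peers: string[], private readonly virtualNodes = 64) {
    this.rebuild(peers);
  }

  // Called on updatePeerList(): rehash all peers when peers join/leave.
  rebuild(peers: string[]): void {
    this.ring = [];
    for (const peer of peers) {
      for (let v = 0; v < this.virtualNodes; v++) {
        this.ring.push({ position: this.hash(`${peer}:${v}`), peer });
      }
    }
    this.ring.sort((a, b) => (a.position < b.position ? -1 : 1));
  }

  // Walk clockwise from the key, collecting the first `size` distinct peers.
  getHomeSet(key: string, size: number): string[] {
    if (this.ring.length === 0) return [];
    const target = this.hash(key);
    let i = this.ring.findIndex((n) => n.position >= target);
    if (i === -1) i = 0; // wrap around the ring
    const home: string[] = [];
    for (let step = 0; step < this.ring.length && home.length < size; step++) {
      const peer = this.ring[(i + step) % this.ring.length].peer;
      if (!home.includes(peer)) home.push(peer);
    }
    return home;
  }

  private hash(s: string): bigint {
    return BigInt('0x' + createHash('sha256').update(s).digest('hex').slice(0, 16));
  }
}
```

Because only virtual nodes belonging to a joining or leaving peer move, most keys keep their existing home set across `rebuild()` calls, which is what preserves cache locality through churn.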

Implementation Notes

Existing Patterns to Follow

  • ArIOChunkSource.fetchChunkFromPeers() already uses Promise.any() with parallel peer requests and AbortController cancellation — proves the pattern works in this codebase
  • p-limit (v6.2.0) is already a dependency, used in CompositeChunkDataSource and ArweaveCompositeClient
  • AbortSignal.any() and AbortSignal.timeout() used throughout for combined signal handling
  • Two-phase timeouts (connection vs. stream stall) already implemented in ArIODataSource

Key Files

| File | Change |
| --- | --- |
| `src/data/peer-request-limiter.ts` | NEW — Per-peer concurrency limiter (`Map<string, pLimit>`) |
| `src/data/peer-hash-ring.ts` | NEW — Consistent hash ring for content key → peer mapping |
| `src/data/ar-io-data-source.ts` | Integrate limiter, hedged requests, hash ring selection |
| `src/data/ar-io-chunk-source.ts` | Integrate hash ring selection (by absolute offset) |
| `src/data/gateways-data-source.ts` | Integrate limiter in gateway tier loop |
| `src/peers/ar-io-peer-manager.ts` | Expose peer list for hash ring; decouple candidate pool from retry count |
| `src/system.ts` | Create and inject shared limiter and hash ring |
| `src/config.ts` | New env vars: `PEER_MAX_CONCURRENT_OUTBOUND`, `PEER_HEDGE_DELAY_MS`, `PEER_MAX_HEDGED_REQUESTS`, `PEER_CANDIDATE_COUNT` |

Suggested Implementation Order

  1. Per-peer concurrency limiter — standalone module, integrate into both data sources
  2. Decouple candidate pool from retry count — small change to peer selection, enables hedging to draw from deeper bench
  3. Hedged requests — replace sequential loop in ArIODataSource.getData()
  4. Consistent hash ring — new module, integrate into peer selection pipeline for both contiguous data and chunks

New Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| `PEER_MAX_CONCURRENT_OUTBOUND` | 10 | Max concurrent outbound requests to a single peer |
| `PEER_HEDGE_DELAY_MS` | 500 | Delay before firing next hedged request (0 = disabled) |
| `PEER_MAX_HEDGED_REQUESTS` | 3 | Max concurrent hedged requests per `getData()` call |
| `PEER_CANDIDATE_COUNT` | 5 | Number of candidate peers to select (independent of max attempts) |
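
The defaults above could be wired up roughly as follows; this is a sketch only, as `src/config.ts` has its own parsing conventions (the `envInt` helper is hypothetical):

```typescript
// Hypothetical helper: read an integer env var, falling back to a default
// when the variable is unset or not a finite number.
const envInt = (name: string, fallback: number): number => {
  const raw = process.env[name];
  const parsed = raw === undefined ? NaN : Number(raw);
  return Number.isFinite(parsed) ? parsed : fallback;
};

export const PEER_MAX_CONCURRENT_OUTBOUND = envInt('PEER_MAX_CONCURRENT_OUTBOUND', 10);
export const PEER_HEDGE_DELAY_MS = envInt('PEER_HEDGE_DELAY_MS', 500); // 0 disables hedging
export const PEER_MAX_HEDGED_REQUESTS = envInt('PEER_MAX_HEDGED_REQUESTS', 3);
export const PEER_CANDIDATE_COUNT = envInt('PEER_CANDIDATE_COUNT', 5);
```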
