Skip to content

Commit 8b2a3fc

Browse files
MoonBoi9001 and claude committed
perf(index-node): batch block hash lookups for POI queries
Pre-fetch all block hashes in a single batch query before parallel POI processing, reducing database round-trips from 10+ to 1-2 per batch.

- Add block_hashes_by_block_numbers batch method to ChainStore trait
- Add get_public_proof_of_indexing_with_block_hash to StatusStore trait
- Modify resolver to group requests by network and batch-fetch hashes
- Pass pre-fetched hashes to avoid redundant lookups during parallel POI

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 888a12e commit 8b2a3fc

File tree

5 files changed

+307
-25
lines changed

5 files changed

+307
-25
lines changed

graph/src/components/store/traits.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,12 @@ pub trait ChainStore: ChainHeadStore {
602602
number: BlockNumber,
603603
) -> Result<Vec<BlockHash>, Error>;
604604

605+
/// Return the hashes of all blocks with the given numbers (batch version)
606+
async fn block_hashes_by_block_numbers(
607+
&self,
608+
numbers: &[BlockNumber],
609+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error>;
610+
605611
/// Confirm that block number `number` has hash `hash` and that the store
606612
/// may purge any other blocks with that number
607613
async fn confirm_block_hash(
@@ -790,6 +796,22 @@ pub trait StatusStore: Send + Sync + 'static {
790796
block_number: BlockNumber,
791797
fetch_block_ptr: &dyn BlockPtrForNumber,
792798
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
799+
800+
/// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes
801+
/// to avoid redundant database lookups when processing batches of POI requests.
802+
async fn get_public_proof_of_indexing_with_block_hash(
803+
&self,
804+
subgraph_id: &DeploymentHash,
805+
block_number: BlockNumber,
806+
prefetched_hashes: Option<&Vec<BlockHash>>,
807+
fetch_block_ptr: &dyn BlockPtrForNumber,
808+
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
809+
810+
/// Get the network for a deployment
811+
async fn network_for_deployment(
812+
&self,
813+
id: &DeploymentHash,
814+
) -> Result<String, StoreError>;
793815
}
794816

795817
#[async_trait]

server/index-node/src/resolver.rs

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::BTreeMap;
1+
use std::collections::{BTreeMap, HashMap};
22

33
use async_trait::async_trait;
44
use graph::data::query::Trace;
@@ -417,38 +417,101 @@ where
417417
return Err(QueryExecutionError::TooExpensive);
418418
}
419419

420-
// Process all POI requests in parallel for better throughput
421-
let poi_futures: Vec<_> = requests
422-
.into_iter()
423-
.map(|request| async move {
424-
let poi_result = match self
425-
.store
426-
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
420+
// Step 1: Group requests by network and collect block numbers for batch lookup
421+
let mut requests_by_network: HashMap<String, Vec<(usize, BlockNumber)>> = HashMap::new();
422+
let mut request_networks: Vec<Option<String>> = Vec::with_capacity(requests.len());
423+
424+
for (idx, request) in requests.iter().enumerate() {
425+
match self.store.network_for_deployment(&request.deployment).await {
426+
Ok(network) => {
427+
requests_by_network
428+
.entry(network.clone())
429+
.or_default()
430+
.push((idx, request.block_number));
431+
request_networks.push(Some(network));
432+
}
433+
Err(_) => {
434+
request_networks.push(None);
435+
}
436+
}
437+
}
438+
439+
// Step 2: Pre-fetch all block hashes per network in batch
440+
let mut block_hash_cache: HashMap<(String, BlockNumber), Vec<graph::blockchain::BlockHash>> =
441+
HashMap::new();
442+
443+
for (network, network_requests) in &requests_by_network {
444+
let block_numbers: Vec<BlockNumber> =
445+
network_requests.iter().map(|(_, num)| *num).collect();
446+
447+
if let Some(chain_store) = self.store.block_store().chain_store(network).await {
448+
match chain_store
449+
.block_hashes_by_block_numbers(&block_numbers)
427450
.await
428451
{
429-
Ok(Some(poi)) => Some(poi),
430-
Ok(None) => None,
452+
Ok(hashes) => {
453+
for (num, hash_vec) in hashes {
454+
block_hash_cache.insert((network.clone(), num), hash_vec);
455+
}
456+
}
431457
Err(e) => {
432-
error!(
458+
debug!(
433459
self.logger,
434-
"Failed to query public proof of indexing";
435-
"subgraph" => &request.deployment,
436-
"block" => format!("{}", request.block_number),
460+
"Failed to batch fetch block hashes for network";
461+
"network" => network,
437462
"error" => format!("{:?}", e)
438463
);
439-
None
464+
// Continue without pre-fetched hashes - will fall back to individual lookups
465+
}
466+
}
467+
}
468+
}
469+
470+
// Step 3: Process all POI requests in parallel, using cached block hashes
471+
let poi_futures: Vec<_> = requests
472+
.into_iter()
473+
.zip(request_networks.into_iter())
474+
.map(|(request, network_opt)| {
475+
let cache = &block_hash_cache;
476+
async move {
477+
let prefetched_hashes = network_opt
478+
.as_ref()
479+
.and_then(|network| cache.get(&(network.clone(), request.block_number)));
480+
481+
let poi_result = match self
482+
.store
483+
.get_public_proof_of_indexing_with_block_hash(
484+
&request.deployment,
485+
request.block_number,
486+
prefetched_hashes,
487+
self,
488+
)
489+
.await
490+
{
491+
Ok(Some(poi)) => Some(poi),
492+
Ok(None) => None,
493+
Err(e) => {
494+
error!(
495+
self.logger,
496+
"Failed to query public proof of indexing";
497+
"subgraph" => &request.deployment,
498+
"block" => format!("{}", request.block_number),
499+
"error" => format!("{:?}", e)
500+
);
501+
None
502+
}
503+
};
504+
505+
PublicProofOfIndexingResult {
506+
deployment: request.deployment,
507+
block: match poi_result {
508+
Some((ref block, _)) => block.clone(),
509+
None => PartialBlockPtr::from(request.block_number),
510+
},
511+
proof_of_indexing: poi_result.map(|(_, poi)| poi),
440512
}
441-
};
442-
443-
PublicProofOfIndexingResult {
444-
deployment: request.deployment,
445-
block: match poi_result {
446-
Some((ref block, _)) => block.clone(),
447-
None => PartialBlockPtr::from(request.block_number),
448-
},
449-
proof_of_indexing: poi_result.map(|(_, poi)| poi),
513+
.into_value()
450514
}
451-
.into_value()
452515
})
453516
.collect();
454517

store/postgres/src/chain_store.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,60 @@ mod data {
739739
}
740740
}
741741

742+
/// Return the hashes of all blocks with the given block numbers (batch version)
743+
pub(super) async fn block_hashes_by_block_numbers(
744+
&self,
745+
conn: &mut AsyncPgConnection,
746+
chain: &str,
747+
numbers: &[BlockNumber],
748+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> {
749+
if numbers.is_empty() {
750+
return Ok(HashMap::new());
751+
}
752+
753+
match self {
754+
Storage::Shared => {
755+
use public::ethereum_blocks as b;
756+
757+
let results = b::table
758+
.select((b::number, b::hash))
759+
.filter(b::network_name.eq(chain))
760+
.filter(
761+
b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))),
762+
)
763+
.load::<(i64, String)>(conn)
764+
.await?;
765+
766+
let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new();
767+
for (num, hash) in results {
768+
let block_hash = hash.parse()?;
769+
map.entry(num as BlockNumber).or_default().push(block_hash);
770+
}
771+
Ok(map)
772+
}
773+
Storage::Private(Schema { blocks, .. }) => {
774+
let results = blocks
775+
.table()
776+
.select((blocks.number(), blocks.hash()))
777+
.filter(
778+
blocks
779+
.number()
780+
.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))),
781+
)
782+
.load::<(i64, Vec<u8>)>(conn)
783+
.await?;
784+
785+
let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new();
786+
for (num, hash) in results {
787+
map.entry(num as BlockNumber)
788+
.or_default()
789+
.push(BlockHash::from(hash));
790+
}
791+
Ok(map)
792+
}
793+
}
794+
}
795+
742796
pub(super) async fn confirm_block_hash(
743797
&self,
744798
conn: &mut AsyncPgConnection,
@@ -2971,6 +3025,16 @@ impl ChainStoreTrait for ChainStore {
29713025
.await
29723026
}
29733027

3028+
async fn block_hashes_by_block_numbers(
3029+
&self,
3030+
numbers: &[BlockNumber],
3031+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> {
3032+
let mut conn = self.pool.get_permitted().await?;
3033+
self.storage
3034+
.block_hashes_by_block_numbers(&mut conn, &self.chain, numbers)
3035+
.await
3036+
}
3037+
29743038
async fn confirm_block_hash(
29753039
&self,
29763040
number: BlockNumber,

store/postgres/src/store.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,31 @@ impl StatusStore for Store {
171171
.await
172172
}
173173

174+
async fn get_public_proof_of_indexing_with_block_hash(
175+
&self,
176+
subgraph_id: &DeploymentHash,
177+
block_number: BlockNumber,
178+
prefetched_hashes: Option<&Vec<graph::blockchain::BlockHash>>,
179+
fetch_block_ptr: &dyn BlockPtrForNumber,
180+
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
181+
self.subgraph_store
182+
.get_public_proof_of_indexing_with_block_hash(
183+
subgraph_id,
184+
block_number,
185+
prefetched_hashes,
186+
self.block_store().clone(),
187+
fetch_block_ptr,
188+
)
189+
.await
190+
}
191+
192+
async fn network_for_deployment(
193+
&self,
194+
id: &DeploymentHash,
195+
) -> Result<String, StoreError> {
196+
self.subgraph_store.network_for_deployment(id).await
197+
}
198+
174199
async fn query_permit(&self) -> QueryPermit {
175200
// Status queries go to the primary shard.
176201
self.block_store.query_permit_primary().await

0 commit comments

Comments (0)