@@ -16,7 +16,7 @@ use async_trait::async_trait;
1616use futures03:: StreamExt ;
1717use http0:: uri:: { Scheme , Uri } ;
1818use itertools:: Itertools ;
19- use slog:: { error, info, Logger } ;
19+ use slog:: { error, info, trace , Logger } ;
2020use std:: { collections:: HashMap , fmt:: Display , ops:: ControlFlow , sync:: Arc , time:: Duration } ;
2121use tokio:: sync:: OnceCell ;
2222use tonic:: codegen:: InterceptedService ;
@@ -33,6 +33,7 @@ use crate::components::network_provider::NetworkDetails;
3333use crate :: components:: network_provider:: ProviderCheckStrategy ;
3434use crate :: components:: network_provider:: ProviderManager ;
3535use crate :: components:: network_provider:: ProviderName ;
36+ use crate :: prelude:: retry;
3637
3738/// This is constant because we found this magic number of connections after
3839/// which the grpc connections start to hang.
@@ -425,7 +426,7 @@ impl FirehoseEndpoint {
425426 }
426427
427428 pub async fn load_blocks_by_numbers < M > (
428- & self ,
429+ self : Arc < Self > ,
429430 numbers : Vec < u64 > ,
430431 logger : & Logger ,
431432 ) -> Result < Vec < M > , anyhow:: Error >
@@ -435,21 +436,39 @@ impl FirehoseEndpoint {
435436 let mut blocks = Vec :: with_capacity ( numbers. len ( ) ) ;
436437
437438 for number in numbers {
438- debug ! (
439+ let provider_name = self . provider . as_str ( ) ;
440+
441+ trace ! (
439442 logger,
440443 "Loading block for block number {}" , number;
441- "provider" => self . provider . as_str ( ) ,
444+ "provider" => provider_name ,
442445 ) ;
443446
444- match self . get_block_by_number :: < M > ( number, logger) . await {
447+ let retry_log_message = format ! ( "get_block_by_number for block {}" , number) ;
448+ let endpoint_for_retry = self . cheap_clone ( ) ;
449+
450+ let logger_for_retry = logger. clone ( ) ;
451+ let logger_for_error = logger. clone ( ) ;
452+
453+ let block = retry ( retry_log_message, & logger_for_retry)
454+ . limit ( ENV_VARS . firehose_block_fetch_retry_limit )
455+ . timeout_secs ( ENV_VARS . firehose_block_fetch_timeout )
456+ . run ( move || {
457+ let e = endpoint_for_retry. cheap_clone ( ) ;
458+ let l = logger_for_retry. clone ( ) ;
459+ async move { e. get_block_by_number :: < M > ( number, & l) . await }
460+ } )
461+ . await ;
462+
463+ match block {
445464 Ok ( block) => {
446465 blocks. push ( block) ;
447466 }
448467 Err ( e) => {
449468 error ! (
450- logger ,
469+ logger_for_error ,
451470 "Failed to load block number {}: {}" , number, e;
452- "provider" => self . provider . as_str ( ) ,
471+ "provider" => provider_name ,
453472 ) ;
454473 return Err ( anyhow:: format_err!(
455474 "failed to load block number {}: {}" ,
0 commit comments