3131from scalecodec .types import (
3232 GenericCall ,
3333 GenericExtrinsic ,
34- GenericRuntimeCallDefinition ,
3534 ss58_encode ,
3635 MultiAccountId ,
3736)
7473 _bt_decode_to_dict_or_list ,
7574 legacy_scale_decode ,
7675 convert_account_ids ,
76+ decode_query_map_async ,
7777)
7878from async_substrate_interface .utils .storage import StorageKey
7979from async_substrate_interface .type_registry import _TYPE_REGISTRY
80- from async_substrate_interface .utils .decoding import (
81- decode_query_map ,
82- )
8380
8481ResultHandler = Callable [[dict , Any ], Awaitable [tuple [dict , bool ]]]
8582
@@ -526,6 +523,18 @@ async def retrieve_next_page(self, start_key) -> list:
526523 self .last_key = result .last_key
527524 return result .records
528525
526+ async def retrieve_all_records (self ) -> list [Any ]:
527+ """
528+ Retrieves all records from all subsequent pages for the AsyncQueryMapResult,
529+ returning them as a list.
530+
531+ Side effect:
532+ The self.records list will be populated fully after running this method.
533+ """
534+ async for _ in self :
535+ pass
536+ return self .records
537+
529538 def __aiter__ (self ):
530539 return self
531540
@@ -558,6 +567,7 @@ async def __anext__(self):
558567 self .loading_complete = True
559568 raise StopAsyncIteration
560569
570+ self .records .extend (next_page )
561571 # Update the buffer with the newly fetched records
562572 self ._buffer = iter (next_page )
563573 return next (self ._buffer )
@@ -1408,7 +1418,9 @@ async def decode_scale(
14081418 if runtime is None :
14091419 runtime = await self .init_runtime (block_hash = block_hash )
14101420 if runtime .metadata_v15 is not None and force_legacy is False :
1411- obj = decode_by_type_string (type_string , runtime .registry , scale_bytes )
1421+ obj = await asyncio .to_thread (
1422+ decode_by_type_string , type_string , runtime .registry , scale_bytes
1423+ )
14121424 if self .decode_ss58 :
14131425 try :
14141426 type_str_int = int (type_string .split ("::" )[1 ])
@@ -2762,19 +2774,34 @@ async def rpc_request(
27622774 logger .error (f"Substrate Request Exception: { result [payload_id ]} " )
27632775 raise SubstrateRequestException (result [payload_id ][0 ])
27642776
2765- @cached_fetcher (max_size = SUBSTRATE_CACHE_METHOD_SIZE )
2766- async def get_block_hash (self , block_id : int ) -> str :
2777+ async def get_block_hash (self , block_id : Optional [int ]) -> str :
27672778 """
2768- Retrieves the hash of the specified block number
2779+ Retrieves the hash of the specified block number, or the chaintip if None
27692780 Args:
27702781 block_id: block number
27712782
27722783 Returns:
27732784 Hash of the block
27742785 """
2786+ if block_id is None :
2787+ return await self .get_chain_head ()
2788+ else :
2789+ if (block_hash := self .runtime_cache .blocks .get (block_id )) is not None :
2790+ return block_hash
2791+
2792+ block_hash = await self ._cached_get_block_hash (block_id )
2793+ self .runtime_cache .add_item (block_hash = block_hash , block = block_id )
2794+ return block_hash
2795+
2796+ @cached_fetcher (max_size = SUBSTRATE_CACHE_METHOD_SIZE )
2797+ async def _cached_get_block_hash (self , block_id : int ) -> str :
2798+ """
2799+ The design of this method is as such, because it allows for an easy drop-in for a different cache, such
2800+ as is the case with DiskCachedAsyncSubstrateInterface._cached_get_block_hash
2801+ """
27752802 return await self ._get_block_hash (block_id )
27762803
2777- async def _get_block_hash (self , block_id : int ) -> str :
2804+ async def _get_block_hash (self , block_id : Optional [ int ] ) -> str :
27782805 return (await self .rpc_request ("chain_getBlockHash" , [block_id ]))["result" ]
27792806
27802807 async def get_chain_head (self ) -> str :
@@ -3852,18 +3879,20 @@ async def query_map(
38523879 params = [result_keys , block_hash ],
38533880 runtime = runtime ,
38543881 )
3882+ changes = []
38553883 for result_group in response ["result" ]:
3856- result = decode_query_map (
3857- result_group ["changes" ],
3858- prefix ,
3859- runtime ,
3860- param_types ,
3861- params ,
3862- value_type ,
3863- key_hashers ,
3864- ignore_decoding_errors ,
3865- self .decode_ss58 ,
3866- )
3884+ changes .extend (result_group ["changes" ])
3885+ result = await decode_query_map_async (
3886+ changes ,
3887+ prefix ,
3888+ runtime ,
3889+ param_types ,
3890+ params ,
3891+ value_type ,
3892+ key_hashers ,
3893+ ignore_decoding_errors ,
3894+ self .decode_ss58 ,
3895+ )
38673896 else :
38683897 # storage item and value scale type are not included here because this is batch-decoded in rust
38693898 page_batches = [
@@ -3881,8 +3910,8 @@ async def query_map(
38813910 results : RequestResults = await self ._make_rpc_request (
38823911 payloads , runtime = runtime
38833912 )
3884- for result in results .values ():
3885- res = result [0 ]
3913+ for result_ in results .values ():
3914+ res = result_ [0 ]
38863915 if "error" in res :
38873916 err_msg = res ["error" ]["message" ]
38883917 if (
@@ -3900,7 +3929,7 @@ async def query_map(
39003929 else :
39013930 for result_group in res ["result" ]:
39023931 changes .extend (result_group ["changes" ])
3903- result = decode_query_map (
3932+ result = await decode_query_map_async (
39043933 changes ,
39053934 prefix ,
39063935 runtime ,
@@ -4113,6 +4142,14 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]:
41134142 "extrinsic_hash" : "0x{}" .format (extrinsic .extrinsic_hash .hex ()),
41144143 "finalized" : False ,
41154144 }, True
4145+
4146+ elif "params" in message and message ["params" ].get ("result" ) == "invalid" :
4147+ failure_message = f"Subscription { subscription_id } invalid: { message } "
4148+ async with self .ws as ws :
4149+ await ws .unsubscribe (subscription_id )
4150+ logger .error (failure_message )
4151+ raise SubstrateRequestException (failure_message )
4152+
41164153 return message , False
41174154
41184155 if wait_for_inclusion or wait_for_finalization :
@@ -4250,13 +4287,25 @@ async def get_metadata_event(
42504287
42514288 async def get_block_number (self , block_hash : Optional [str ] = None ) -> int :
42524289 """Async version of `substrateinterface.base.get_block_number` method."""
4253- response = await self .rpc_request ("chain_getHeader" , [block_hash ])
4290+ if block_hash is None :
4291+ return await self ._get_block_number (None )
4292+ if (block := self .runtime_cache .blocks_reverse .get (block_hash )) is not None :
4293+ return block
4294+ block = await self ._cached_get_block_number (block_hash )
4295+ self .runtime_cache .add_item (block_hash = block_hash , block = block )
4296+ return block
42544297
4255- if response ["result" ]:
4256- return int (response ["result" ]["number" ], 16 )
4257- raise SubstrateRequestException (
4258- f"Unable to retrieve block number for { block_hash } "
4259- )
4298+ @cached_fetcher (max_size = SUBSTRATE_CACHE_METHOD_SIZE )
4299+ async def _cached_get_block_number (self , block_hash : str ) -> int :
4300+ """
4301+ The design of this method is as such, because it allows for an easy drop-in for a different cache, such
4302+ as is the case with DiskCachedAsyncSubstrateInterface._cached_get_block_number
4303+ """
4304+ return await self ._get_block_number (block_hash = block_hash )
4305+
4306+ async def _get_block_number (self , block_hash : Optional [str ]) -> int :
4307+ response = await self .rpc_request ("chain_getHeader" , [block_hash ])
4308+ return int (response ["result" ]["number" ], 16 )
42604309
42614310 async def close (self ):
42624311 """
@@ -4351,9 +4400,13 @@ async def get_block_runtime_version_for(self, block_hash: str):
43514400 return await self ._get_block_runtime_version_for (block_hash )
43524401
43534402 @async_sql_lru_cache (maxsize = SUBSTRATE_CACHE_METHOD_SIZE )
4354- async def get_block_hash (self , block_id : int ) -> str :
4403+ async def _cached_get_block_hash (self , block_id : int ) -> str :
43554404 return await self ._get_block_hash (block_id )
43564405
4406+ @async_sql_lru_cache (maxsize = SUBSTRATE_CACHE_METHOD_SIZE )
4407+ async def _cached_get_block_number (self , block_hash : str ) -> int :
4408+ return await self ._get_block_number (block_hash = block_hash )
4409+
43574410
43584411async def get_async_substrate_interface (
43594412 url : str ,
0 commit comments