Skip to content

Commit 1aa7cdc

Browse files
committed
refactor: use FuturesUnordered and reuse validity checks
- Refactored bundle processing to use FuturesUnordered for concurrent execution - Added cancellation on first failure - Reused validity checks from crates/sim/src/cache/item.rs Addresses PR review feedback from prestwich
1 parent ece4913 commit 1aa7cdc

File tree

1 file changed

+60
-62
lines changed

1 file changed

+60
-62
lines changed

src/tasks/cache/bundle.rs

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
//! Bundler service responsible for fetching bundles and sending them to the simulator.
22
use crate::config::BuilderConfig;
3-
use alloy::{
4-
consensus::{Transaction, transaction::SignerRecoverable},
5-
eips::Decodable2718,
6-
primitives::Bytes,
7-
providers::Provider,
8-
rlp::Buf,
9-
};
3+
use alloy::providers::Provider;
4+
use futures_util::{TryStreamExt, stream};
105
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
116
use signet_tx_cache::{TxCacheError, types::CachedBundle};
127
use tokio::{
@@ -81,14 +76,28 @@ impl BundlePoller {
8176
/// Spawns a tokio task to check the nonces of all host transactions in a bundle
8277
/// before sending it to the cache task via the outbound channel.
8378
///
84-
/// Bundles with stale host transaction nonces are dropped to prevent them from
85-
/// entering the SimCache, failing simulation, and being re-ingested on the next poll.
79+
/// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements
80+
/// (reusing the existing validity check pattern from `signet-sim`), then checks
81+
/// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early
82+
/// on the first stale or failed nonce.
83+
///
84+
/// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered
8685
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
8786
tokio::spawn(async move {
8887
let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
8988

89+
// Recover the bundle to get typed host tx requirements instead of
90+
// manually decoding and recovering signers.
91+
let recovered = match bundle.bundle.try_to_recovered() {
92+
Ok(r) => r,
93+
Err(e) => {
94+
span_debug!(span, ?e, "Failed to recover bundle, dropping");
95+
return;
96+
}
97+
};
98+
9099
// If no host transactions, forward directly
91-
if bundle.bundle.host_txs.is_empty() {
100+
if recovered.host_txs().is_empty() {
92101
if outbound.send(bundle).is_err() {
93102
span_debug!(span, "Outbound channel closed, stopping nonce check task");
94103
}
@@ -102,56 +111,50 @@ impl BundlePoller {
102111
return;
103112
};
104113

105-
// Check each host transaction's nonce
106-
for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() {
107-
let host_tx = match decode_tx(host_tx_bytes) {
108-
Some(tx) => tx,
109-
None => {
110-
span_debug!(
111-
span,
112-
idx,
113-
"Failed to decode host transaction, dropping bundle"
114-
);
115-
return;
116-
}
117-
};
118-
119-
let sender = match host_tx.recover_signer() {
120-
Ok(s) => s,
121-
Err(_) => {
122-
span_debug!(
123-
span,
124-
idx,
125-
"Failed to recover sender from host tx, dropping bundle"
126-
);
127-
return;
114+
// Collect host tx requirements (signer + nonce) from the recovered bundle
115+
let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect();
116+
117+
// Check all host tx nonces concurrently, cancelling on first failure.
118+
let result = stream::iter(reqs)
119+
.map(Ok)
120+
.try_for_each_concurrent(None, |(idx, req)| {
121+
let host_provider = &host_provider;
122+
let span = &span;
123+
async move {
124+
let tx_count = host_provider
125+
.get_transaction_count(req.signer)
126+
.await
127+
.map_err(|_| {
128+
span_debug!(
129+
span,
130+
idx,
131+
sender = %req.signer,
132+
"Failed to fetch nonce for sender, dropping bundle"
133+
);
134+
})?;
135+
136+
if req.nonce < tx_count {
137+
debug!(
138+
parent: span,
139+
sender = %req.signer,
140+
tx_nonce = %req.nonce,
141+
host_nonce = %tx_count,
142+
idx,
143+
"Dropping bundle with stale host tx nonce"
144+
);
145+
return Err(());
146+
}
147+
148+
Ok(())
128149
}
129-
};
130-
131-
let tx_count = match host_provider.get_transaction_count(sender).await {
132-
Ok(count) => count,
133-
Err(_) => {
134-
span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle");
135-
return;
136-
}
137-
};
138-
139-
if host_tx.nonce() < tx_count {
140-
debug!(
141-
parent: &span,
142-
%sender,
143-
tx_nonce = %host_tx.nonce(),
144-
host_nonce = %tx_count,
145-
idx,
146-
"Dropping bundle with stale host tx nonce"
147-
);
148-
return;
149-
}
150-
}
150+
})
151+
.await;
151152

152153
// All host txs have valid nonces, forward the bundle
153-
if outbound.send(bundle).is_err() {
154-
span_debug!(span, "Outbound channel closed, stopping nonce check task");
154+
if result.is_ok() {
155+
if outbound.send(bundle).is_err() {
156+
span_debug!(span, "Outbound channel closed, stopping nonce check task");
157+
}
155158
}
156159
});
157160
}
@@ -191,8 +194,3 @@ impl BundlePoller {
191194
(inbound, jh)
192195
}
193196
}
194-
195-
/// Decodes a transaction from RLP-encoded bytes.
196-
fn decode_tx(bytes: &Bytes) -> Option<alloy::consensus::TxEnvelope> {
197-
alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok()
198-
}

0 commit comments

Comments
 (0)