diff --git a/README.md b/README.md index a6dbcd1e..169fca72 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ The Builder simulates bundles and transactions against the latest chain state to create valid Signet rollup blocks and submits them to the configured host chain as an [EIP-4844 transaction](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-4844.md). -Bundles are treated as Flashbots-style bundles, meaning that the Builder should respect transaction ordering, bundle atomicity, and the specified revertability. +Bundles are treated as MEV-style bundles, meaning that the Builder should respect transaction ordering, bundle atomicity, and the specified revertability. -------------------------------------------------------------------------------- @@ -16,7 +16,7 @@ The Builder orchestrates a series of asynchronous actors that work together to b 1. **Env** - watches the latest host and rollup blocks to monitor gas rates and block updates. 2. **Cache** - polls bundle and transaction caches and adds them to the cache. 3. **Simulator** - simulates transactions and bundles against rollup state and block environment to build them into a cohesive block. -4. **FlashbotsSubmit** - handles preparing and submitting the simulated block to a private Flashbots relay. +4. **Submit** - handles preparing and submitting the simulated block to all configured MEV relay/builder endpoints concurrently. 5. **Metrics** - records block and tx data over time. ```mermaid @@ -36,12 +36,12 @@ flowchart TD Cache["🪏 Cache Task"] Simulator["💾 Simulator Task"] Metrics["📏 Metrics Task"] - FlashbotsSubmit["📥 Flashbots Submit Task"] + SubmitTask["📥 Submit Task"] end %% Signing - FlashbotsSubmit --hash--> Quincey - Quincey -- signature --> FlashbotsSubmit + SubmitTask --hash--> Quincey + Quincey -- signature --> SubmitTask %% Config wiring Config -.rollup rpc.-> Env @@ -49,19 +49,19 @@ flowchart TD Config -.host rpc.-> Simulator Config -.rollup rpc.-> Simulator Config -.host rpc.-> Metrics - Config -.host rpc.-> FlashbotsSubmit + Config -.host rpc.-> SubmitTask %% Core flow Env ==block env==> Simulator Cache ==sim cache==> Simulator - Simulator ==built block==> FlashbotsSubmit + Simulator ==built block==> SubmitTask %% Network submission - FlashbotsSubmit ==>|"tx bundle"| FlashbotsRelay["🛡️ Flashbots Relay"] - FlashbotsRelay ==> Ethereum + SubmitTask ==>|"tx bundle"| Relays["🛡️ MEV Relays / Builders"] + Relays ==> Ethereum %% Metrics - FlashbotsSubmit ==rollup block tx hash==> Metrics + SubmitTask ==rollup block tx hash==> Metrics ``` ### 💾 Simulation Task @@ -76,11 +76,11 @@ When the deadline is reached, the simulator is stopped, and all open simulation ### ✨ Submit Task -The Flashbots submit task prepares a Flashbots bundle out of the Signet block and its host transactions and then submits that bundle to the Flashbots endpoint. It sends the hash of the rollup block transaction for to the Metrics task for further tracking. +The submit task prepares a MEV bundle from the Signet block and its host transactions, then fans it out to all configured relay/builder endpoints concurrently (`SUBMIT_ENDPOINTS`). At least one successful relay acceptance is required; individual relay failures are tolerated and logged. The blob sidecar is always forwarded to Pylon regardless of relay outcome. If the block received from simulation is empty, the submit task will ignore it. -Finally, if it's non-empty, the submit task attempts to get a signature for the block, and if it fails due to a 403 error, it will skip the current slot and begin waiting for the next block. +If it's non-empty, the submit task attempts to get a signature for the block, and if it fails due to a 403 error, it will skip the current slot and begin waiting for the next block. -------------------------------------------------------------------------------- @@ -97,7 +97,7 @@ The Builder is configured via environment variables. The following values are su | `QUINCEY_URL` | Yes | Remote sequencer signing endpoint | | `SEQUENCER_KEY` | No | AWS Key ID _OR_ local private key for the Sequencer; set IFF using local Sequencer signing instead of remote (via `QUINCEY_URL`) Quincey signing | | `TX_POOL_URL` | Yes | Transaction pool URL | -| `FLASHBOTS_ENDPOINT` | No | Flashbots API to submit blocks to | +| `SUBMIT_ENDPOINTS` | Yes | Comma-separated list of MEV relay/builder RPC endpoints for bundle submission (e.g. `https://rpc.flashbots.net,https://rpc.titanbuilder.xyz`) | | `ROLLUP_BLOCK_GAS_LIMIT` | No | Override for rollup block gas limit | | `MAX_HOST_GAS_COEFFICIENT` | No | Optional maximum host gas coefficient, as a percentage, to use when building blocks | | `BUILDER_KEY` | Yes | AWS KMS key ID _or_ local private key for builder signing | diff --git a/bin/builder.rs b/bin/builder.rs index 8eefa4c9..77573233 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -5,7 +5,7 @@ use builder::{ service::serve_builder, tasks::{ block::sim::SimulatorTask, cache::CacheTasks, env::EnvTask, metrics::MetricsTask, - submit::FlashbotsTask, + submit::SubmitTask, }, }; use eyre::bail; @@ -80,7 +80,7 @@ async fn main() -> eyre::Result<()> { // Set up the cache, submit, and simulator tasks let cache_tasks = CacheTasks::new(block_env.clone()); let (submit_task, simulator_task) = - tokio::try_join!(FlashbotsTask::new(tx_channel.clone()), SimulatorTask::new(block_env),)?; + tokio::try_join!(SubmitTask::new(tx_channel.clone()), SimulatorTask::new(block_env),)?; // Spawn the cache, submit, and simulator tasks let cache_system = cache_tasks.spawn(); diff --git a/src/config.rs b/src/config.rs index 5faf5b45..95276e9d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -175,7 +175,7 @@ pub struct BuilderConfig { )] pub block_query_cutoff_buffer: OptionalU64WithDefault<3000>, - /// Number of milliseconds before the end of the slot by which bundle submission to Flashbots must complete. + /// Number of milliseconds before the end of the slot by which bundle submission must complete. /// If submission completes after this deadline, a warning is logged. #[from_env( var = "SUBMIT_DEADLINE_BUFFER", diff --git a/src/tasks/submit/mod.rs b/src/tasks/submit/mod.rs index b9a7d4c5..158ea63c 100644 --- a/src/tasks/submit/mod.rs +++ b/src/tasks/submit/mod.rs @@ -1,6 +1,6 @@ -/// Submission logic for Flashbots -pub mod flashbots; -pub use flashbots::FlashbotsTask; +/// Bundle submission to MEV relay/builder endpoints. +pub mod task; +pub use task::SubmitTask; mod prep; pub use prep::{Bumpable, SubmitPrep}; diff --git a/src/tasks/submit/prep.rs b/src/tasks/submit/prep.rs index b7c6c7ed..7e97ecda 100644 --- a/src/tasks/submit/prep.rs +++ b/src/tasks/submit/prep.rs @@ -21,9 +21,9 @@ use signet_zenith::Zenith; use tokio::try_join; use tracing::{Instrument, debug, error, instrument, warn}; -/// Preparation logic for transactions issued to the host chain by the [`FlashbotsTask`]. +/// Preparation logic for transactions issued to the host chain by the [`SubmitTask`]. /// -/// [`FlashbotsTask`]: crate::tasks::submit::FlashbotsTask +/// [`SubmitTask`]: crate::tasks::submit::SubmitTask #[derive(Debug, Clone)] pub struct SubmitPrep<'a> { // The block we are preparing a transaction for diff --git a/src/tasks/submit/flashbots.rs b/src/tasks/submit/task.rs similarity index 58% rename from src/tasks/submit/flashbots.rs rename to src/tasks/submit/task.rs index de52af43..b698dc73 100644 --- a/src/tasks/submit/flashbots.rs +++ b/src/tasks/submit/task.rs @@ -8,13 +8,14 @@ //! | `signet.builder.submit.transactions_prepared` | counter | Signed rollup block transactions ready for submission | //! | `signet.builder.submit.empty_blocks` | counter | Empty blocks skipped | //! | `signet.builder.submit.bundle_prep_failures` | counter | Bundle preparation errors | -//! | `signet.builder.submit.relay_submissions` | counter | Per-relay submission attempts | +//! | `signet.builder.submit.relay_submissions` | counter | Relay submission attempts (incremented per relay up front) | //! | `signet.builder.submit.relay_successes` | counter | Per-relay successful submissions | -//! | `signet.builder.submit.relay_failures` | counter | Per-relay failed submissions | +//! | `signet.builder.submit.relay_failures` | counter | Per-relay error responses | +//! | `signet.builder.submit.relay_timeouts` | counter | Per-relay deadline timeouts | //! | `signet.builder.submit.all_relays_failed` | counter | No relay accepted the bundle | //! | `signet.builder.submit.bundles_submitted` | counter | At least one relay accepted | -//! | `signet.builder.submit.deadline_met` | counter | Bundle submitted within slot deadline | -//! | `signet.builder.submit.deadline_missed` | counter | Bundle submitted after slot deadline | +//! | `signet.builder.submit.deadline_met` | counter | Bundle accepted within slot deadline | +//! | `signet.builder.submit.deadline_missed` | counter | Bundle accepted but after slot deadline | //! | `signet.builder.pylon.submission_failures` | counter | Pylon sidecar submission errors | //! | `signet.builder.pylon.sidecars_submitted` | counter | Successful Pylon sidecar submissions | use crate::{ @@ -23,9 +24,13 @@ use crate::{ tasks::{block::sim::SimResult, submit::SubmitPrep}, }; use alloy::{ - consensus::TxEnvelope, eips::Encodable2718, primitives::TxHash, providers::ext::MevApi, + consensus::TxEnvelope, + eips::Encodable2718, + primitives::{Bytes, TxHash}, + providers::ext::MevApi, rpc::types::mev::EthSendBundle, }; +use futures_util::stream::{FuturesUnordered, StreamExt}; use init4_bin_base::{deps::metrics::counter, utils::signer::LocalOrAws}; use std::{ sync::Arc, @@ -40,7 +45,7 @@ use tracing::{Instrument, debug, debug_span, error, info, instrument, warn}; /// Fans out each prepared bundle to all relays concurrently and submits /// the blob sidecar to Pylon regardless of relay outcome. #[derive(Debug)] -pub struct FlashbotsTask { +pub struct SubmitTask { /// Builder configuration for the task. config: &'static BuilderConfig, /// Quincey instance for block signing. @@ -58,10 +63,10 @@ pub struct FlashbotsTask { pylon: PylonClient, } -impl FlashbotsTask { - /// Returns a new `FlashbotsTask` instance that receives `SimResult` types from the given +impl SubmitTask { + /// Returns a new `SubmitTask` instance that receives `SimResult` types from the given /// channel and handles their preparation and submission to MEV relay/builder endpoints. - pub async fn new(outbound: mpsc::UnboundedSender) -> eyre::Result { + pub async fn new(outbound: mpsc::UnboundedSender) -> eyre::Result { let config = crate::config(); let (quincey, host_provider, relays, builder_key) = tokio::try_join!( @@ -190,8 +195,7 @@ impl FlashbotsTask { } span_debug!(span, "submit task received block"); - let result = self.prepare(&sim_result).instrument(span.clone()).await; - let bundle = match result { + let bundle = match self.prepare(&sim_result).instrument(span.clone()).await { Ok(bundle) => bundle, Err(error) => { counter!("signet.builder.submit.bundle_prep_failures").increment(1); @@ -203,105 +207,18 @@ impl FlashbotsTask { // The block transaction is always last in the bundle. let block_tx = bundle.txs.last().unwrap().clone(); + let submission = Submission { + bundle, + block_tx, + relays: Arc::clone(&self.relays), + signer: self.signer.clone(), + pylon: self.pylon.clone(), + deadline, + }; + let _guard = span.enter(); - let submit_span = debug_span!("submit.fan_out",).or_current(); - - let relays = Arc::clone(&self.relays); - let signer = self.signer.clone(); - let pylon = self.pylon.clone(); - - tokio::spawn( - async move { - let n_relays = relays.len(); - - // Build one future per relay - let futs: Vec<_> = relays - .iter() - .map(|(url, provider)| { - let bundle = bundle.clone(); - let signer = signer.clone(); - let url = url.clone(); - async move { - let result = provider - .send_bundle(bundle) - .with_auth(signer) - .into_future() - .await; - (url, result) - } - }) - .collect(); - - // Apply deadline timeout to the fan-out - let deadline_dur = deadline - .saturating_duration_since(Instant::now()) - .max(Duration::from_secs(1)); - - let (mut successes, mut failures) = (0u32, 0u32); - - match tokio::time::timeout(deadline_dur, futures_util::future::join_all(futs)) - .await - { - Ok(relay_results) => { - for (url, result) in &relay_results { - let host = url.host_str().unwrap_or("unknown"); - counter!("signet.builder.submit.relay_submissions").increment(1); - match result { - Ok(_) => { - counter!("signet.builder.submit.relay_successes") - .increment(1); - debug!(relay = host, "bundle accepted"); - successes += 1; - } - Err(err) => { - counter!("signet.builder.submit.relay_failures") - .increment(1); - warn!(relay = host, %err, "bundle rejected"); - failures += 1; - } - } - } - } - Err(_) => { - counter!("signet.builder.submit.deadline_missed").increment(1); - warn!("relay fan-out timed out - some relays may not have responded"); - } - } - - if successes == 0 { - counter!("signet.builder.submit.all_relays_failed").increment(1); - error!( - failures, - n_relays, "all relay submissions failed - bundle may not land" - ); - } else { - counter!("signet.builder.submit.bundles_submitted").increment(1); - if Instant::now() > deadline { - counter!("signet.builder.submit.deadline_missed").increment(1); - warn!(successes, failures, "bundle submitted to relays AFTER deadline"); - } else { - counter!("signet.builder.submit.deadline_met").increment(1); - info!( - successes, - failures, n_relays, "bundle submitted to relays within deadline" - ); - } - } - - // Always submit sidecar to Pylon, regardless of relay - // outcome. The sidecar must be available on the host chain - // even if relay submission failed or timed out. - if let Err(err) = pylon.post_blob_tx(block_tx).await { - counter!("signet.builder.pylon.submission_failures").increment(1); - warn!(%err, "pylon submission failed"); - return; - } - - counter!("signet.builder.pylon.sidecars_submitted").increment(1); - debug!("posted sidecar to pylon"); - } - .instrument(submit_span.clone()), - ); + let submit_span = debug_span!("submit.fan_out").or_current(); + tokio::spawn(submission.run().instrument(submit_span)); } } @@ -338,3 +255,164 @@ impl FlashbotsTask { (sender, handle) } } + +/// Outcome of a single relay submission attempt. +enum RelayOutcome { + /// Relay accepted the bundle before the deadline. + Success, + /// Relay returned an error before the deadline. + Failure, + /// Relay did not respond before the per-relay deadline. + Timeout, +} + +/// State for a single bundle submission attempt across all relays and Pylon. +/// +/// Created per-block by [`SubmitTask::task_future`] and spawned as an +/// independent tokio task so the main loop can immediately begin +/// preparing the next block. +struct Submission { + /// The MEV bundle to fan out to relays. + bundle: EthSendBundle, + /// The encoded block transaction (last entry in the bundle), sent to + /// Pylon as a blob sidecar. + block_tx: Bytes, + /// Relay endpoints for concurrent submission. + relays: Arc>, + /// Signing key for relay authentication. + signer: LocalOrAws, + /// Pylon client for blob sidecar submission. + pylon: PylonClient, + /// Deadline by which relay responses should arrive. + deadline: Instant, +} + +impl Submission { + /// Run the full submission pipeline: relay fan-out then Pylon sidecar. + async fn run(self) { + let outcomes = self.submit_to_relays().await; + self.report_relay_metrics(&outcomes); + self.submit_to_pylon().await; + } + + /// Submit the bundle to a single relay with an individual deadline. + async fn submit_to_relay( + bundle: EthSendBundle, + signer: LocalOrAws, + relay_url: url::Url, + provider: &RelayProvider, + timeout_dur: Duration, + ) -> RelayOutcome { + let host: String = relay_url.host_str().unwrap_or("unknown").to_string(); + + match tokio::time::timeout( + timeout_dur, + provider.send_bundle(bundle).with_auth(signer).into_future(), + ) + .await + { + Ok(Ok(_)) => { + debug!(relay = %host, "bundle accepted"); + RelayOutcome::Success + } + Ok(Err(err)) => { + warn!(relay = %host, %err, "bundle rejected"); + RelayOutcome::Failure + } + Err(_) => { + warn!(relay = %host, "relay submission timed out"); + RelayOutcome::Timeout + } + } + } + + /// Fan out the bundle to all relays, processing results as they arrive. + /// + /// Each relay gets its own timeout so that a slow relay cannot suppress + /// results from faster relays. + async fn submit_to_relays(&self) -> Vec { + let n_relays = self.relays.len(); + counter!("signet.builder.submit.relay_submissions").increment(n_relays as u64); + + let timeout_dur = self.deadline.saturating_duration_since(Instant::now()); + + let mut futs: FuturesUnordered<_> = self + .relays + .iter() + .map(|(url, provider)| { + Self::submit_to_relay( + self.bundle.clone(), + self.signer.clone(), + url.clone(), + provider, + timeout_dur, + ) + }) + .collect(); + + let mut outcomes = Vec::with_capacity(n_relays); + while let Some(outcome) = futs.next().await { + outcomes.push(outcome); + } + + outcomes + } + + /// Report per-relay and aggregate metrics from the collected outcomes. + fn report_relay_metrics(&self, outcomes: &[RelayOutcome]) { + let n_relays = self.relays.len(); + let (mut successes, mut failures, mut timeouts) = (0u32, 0u32, 0u32); + + for outcome in outcomes { + match outcome { + RelayOutcome::Success => { + counter!("signet.builder.submit.relay_successes").increment(1); + successes += 1; + } + RelayOutcome::Failure => { + counter!("signet.builder.submit.relay_failures").increment(1); + failures += 1; + } + RelayOutcome::Timeout => { + counter!("signet.builder.submit.relay_timeouts").increment(1); + timeouts += 1; + } + } + } + + if successes == 0 { + counter!("signet.builder.submit.all_relays_failed").increment(1); + error!( + failures, + timeouts, n_relays, "all relay submissions failed - bundle may not land" + ); + } else { + counter!("signet.builder.submit.bundles_submitted").increment(1); + if Instant::now() > self.deadline { + counter!("signet.builder.submit.deadline_missed").increment(1); + warn!(successes, failures, timeouts, "bundle accepted by relays AFTER deadline"); + } else { + counter!("signet.builder.submit.deadline_met").increment(1); + info!( + successes, + failures, timeouts, n_relays, "bundle submitted to relays within deadline" + ); + } + } + } + + /// Submit the blob sidecar to Pylon unconditionally. + /// + /// The sidecar must be available on the host chain even if relay + /// submission failed or timed out. + async fn submit_to_pylon(&self) { + if let Err(err) = self.pylon.post_blob_tx(self.block_tx.clone()).await { + counter!("signet.builder.pylon.submission_failures").increment(1); + warn!(%err, "pylon submission failed"); + return; + } + + counter!("signet.builder.pylon.sidecars_submitted").increment(1); + debug!("posted sidecar to pylon"); + } +}