diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index 36aaf70c..ea0d1b0d 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -50,10 +50,7 @@ impl Args { impl From for Option { fn from(args: Args) -> Self { - args.websocket_url.map(|url| FlashblocksConfig { - websocket_url: url, - max_pending_blocks_depth: args.max_pending_blocks_depth, - }) + args.websocket_url.map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)) } } diff --git a/crates/client/flashblocks/Cargo.toml b/crates/client/flashblocks/Cargo.toml index b2a92a8f..832e4d95 100644 --- a/crates/client/flashblocks/Cargo.toml +++ b/crates/client/flashblocks/Cargo.toml @@ -79,7 +79,6 @@ serde.workspace = true revm-database.workspace = true # misc -once_cell.workspace = true url.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/client/flashblocks/benches/pending_state.rs b/crates/client/flashblocks/benches/pending_state.rs index 58609ea2..a8ec6d91 100644 --- a/crates/client/flashblocks/benches/pending_state.rs +++ b/crates/client/flashblocks/benches/pending_state.rs @@ -114,8 +114,8 @@ fn pending_state_benches(c: &mut Criterion) { } async fn build_pending_state(input: BenchInput) { - let state = FlashblocksState::new(input.provider, 5); - state.start(); + let state = FlashblocksState::new(5); + state.start(input.provider); state.on_canonical_block_received(input.canonical_block); for flashblock in input.flashblocks { @@ -148,7 +148,7 @@ fn init_bench_tracing() { } async fn wait_for_pending_state( - state: &FlashblocksState, + state: &FlashblocksState, target_block: BlockNumber, expected_index: u64, ) { diff --git a/crates/client/flashblocks/src/extension.rs b/crates/client/flashblocks/src/extension.rs index a9f68d40..9365c47e 100644 --- a/crates/client/flashblocks/src/extension.rs +++ b/crates/client/flashblocks/src/extension.rs @@ -3,9 +3,8 @@ use std::sync::Arc; -use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder, OpProvider}; +use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; use futures_util::TryStreamExt; -use once_cell::sync::OnceCell; use reth_exex::ExExEvent; use tracing::info; use url::Url; @@ -15,34 +14,37 @@ use crate::{ FlashblocksSubscriber, }; -/// The flashblocks cell holds a shared state reference. -pub type FlashblocksCell = Arc>>; - /// Flashblocks-specific configuration knobs. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct FlashblocksConfig { /// The websocket endpoint that streams flashblock updates. - pub websocket_url: String, + pub websocket_url: Url, /// Maximum number of pending flashblocks to retain in memory. pub max_pending_blocks_depth: u64, + /// Shared Flashblocks state. + pub state: Arc, +} + +impl FlashblocksConfig { + /// Create a new Flashblocks configuration. + pub fn new(websocket_url: String, max_pending_blocks_depth: u64) -> Self { + let state = Arc::new(FlashblocksState::new(max_pending_blocks_depth)); + let ws_url = Url::parse(&websocket_url).expect("valid websocket URL"); + Self { websocket_url: ws_url, max_pending_blocks_depth, state } + } } /// Helper struct that wires the Flashblocks feature (canon ExEx and RPC) into the node builder. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct FlashblocksExtension { - /// Shared Flashblocks state cache. - cell: FlashblocksCell>, - /// Optional Flashblocks configuration. + /// Optional Flashblocks configuration (includes state). config: Option, } impl FlashblocksExtension { /// Create a new Flashblocks extension helper. - pub const fn new( - cell: FlashblocksCell>, - config: Option, - ) -> Self { - Self { cell, config } + pub const fn new(config: Option) -> Self { + Self { config } } } @@ -54,23 +56,17 @@ impl BaseNodeExtension for FlashblocksExtension { return builder; }; - let flashblocks_cell = self.cell; - let cfg_for_rpc = cfg.clone(); - let flashblocks_cell_for_rpc = flashblocks_cell.clone(); + let state = cfg.state; + let mut subscriber = FlashblocksSubscriber::new(state.clone(), cfg.websocket_url); + + let state_for_exex = state.clone(); + let state_for_rpc = state.clone(); + let state_for_start = state; // Install the canon ExEx let builder = builder.install_exex("flashblocks-canon", move |mut ctx| { - let flashblocks_cell = flashblocks_cell.clone(); + let fb = state_for_exex; async move { - let fb = flashblocks_cell - .get_or_init(|| { - Arc::new(FlashblocksState::new( - ctx.provider().clone(), - cfg.max_pending_blocks_depth, - )) - }) - .clone(); - Ok(async move { while let Some(note) = ctx.notifications.try_next().await? { if let Some(committed) = note.committed_chain() { @@ -87,23 +83,19 @@ impl BaseNodeExtension for FlashblocksExtension { } }); + // Start state processor and subscriber after node is started + let builder = builder.on_node_started(move |ctx| { + info!(message = "Starting Flashblocks state processor"); + state_for_start.start(ctx.provider().clone()); + subscriber.start(); + Ok(()) + }); + // Extend with RPC modules builder.extend_rpc_modules(move |ctx| { info!(message = "Starting Flashblocks RPC"); - let ws_url = Url::parse(cfg_for_rpc.websocket_url.as_str())?; - let fb = flashblocks_cell_for_rpc - .get_or_init(|| { - Arc::new(FlashblocksState::new( - ctx.provider().clone(), - cfg_for_rpc.max_pending_blocks_depth, - )) - }) - .clone(); - fb.start(); - - let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); - flashblocks_client.start(); + let fb = state_for_rpc; let api_ext = EthApiExt::new( ctx.registry.eth_api().clone(), @@ -127,6 +119,6 @@ impl FromExtensionConfig for FlashblocksExtension { type Config = Option; fn from_config(config: Self::Config) -> Self { - Self::new(Arc::new(OnceCell::new()), config) + Self::new(config) } } diff --git a/crates/client/flashblocks/src/lib.rs b/crates/client/flashblocks/src/lib.rs index 06f1f2b0..ad3a288f 100644 --- a/crates/client/flashblocks/src/lib.rs +++ b/crates/client/flashblocks/src/lib.rs @@ -45,7 +45,7 @@ pub use rpc::{ }; mod extension; -pub use extension::{FlashblocksCell, FlashblocksConfig, FlashblocksExtension}; +pub use extension::{FlashblocksConfig, FlashblocksExtension}; #[cfg(any(test, feature = "test-utils"))] pub mod test_harness; diff --git a/crates/client/flashblocks/src/state.rs b/crates/client/flashblocks/src/state.rs index 8c2632fc..0db56e7c 100644 --- a/crates/client/flashblocks/src/state.rs +++ b/crates/client/flashblocks/src/state.rs @@ -25,43 +25,56 @@ use crate::{ const BUFFER_SIZE: usize = 20; /// Manages the pending flashblock state and processes incoming updates. -#[derive(Debug, Clone)] -pub struct FlashblocksState { +#[derive(Debug)] +pub struct FlashblocksState { pending_blocks: Arc>, queue: mpsc::UnboundedSender, + rx: Arc>>, flashblock_sender: Sender>, - state_processor: StateProcessor, + max_pending_blocks_depth: u64, } -impl FlashblocksState -where - Client: StateProviderFactory - + ChainSpecProvider + OpHardforks> - + BlockReaderIdExt
- + Clone - + 'static, -{ +impl FlashblocksState { /// Creates a new flashblocks state manager. - pub fn new(client: Client, max_pending_blocks_depth: u64) -> Self { + /// + /// The state is created without a client. Call [`start`](Self::start) with a client + /// to spawn the state processor after the node is launched. + pub fn new(max_pending_blocks_depth: u64) -> Self { let (tx, rx) = mpsc::unbounded_channel::(); let pending_blocks: Arc> = Arc::new(ArcSwapOption::new(None)); let (flashblock_sender, _) = broadcast::channel(BUFFER_SIZE); + + Self { + pending_blocks, + queue: tx, + rx: Arc::new(Mutex::new(rx)), + flashblock_sender, + max_pending_blocks_depth, + } + } + + /// Starts the flashblocks state processor with the given client. + /// + /// This spawns a background task that processes canonical blocks and flashblocks. + /// Should be called after the node is launched and the provider is available. + pub fn start(&self, client: Client) + where + Client: StateProviderFactory + + ChainSpecProvider + OpHardforks> + + BlockReaderIdExt
+ + Clone + + 'static, + { let state_processor = StateProcessor::new( client, - pending_blocks.clone(), - max_pending_blocks_depth, - Arc::new(Mutex::new(rx)), - flashblock_sender.clone(), + self.pending_blocks.clone(), + self.max_pending_blocks_depth, + self.rx.clone(), + self.flashblock_sender.clone(), ); - Self { pending_blocks, queue: tx, flashblock_sender, state_processor } - } - - /// Starts the flashblocks state processor. - pub fn start(&self) { - let sp = self.state_processor.clone(); tokio::spawn(async move { - sp.start().await; + state_processor.start().await; }); } @@ -79,7 +92,7 @@ where } } -impl FlashblocksReceiver for FlashblocksState { +impl FlashblocksReceiver for FlashblocksState { fn on_flashblock_received(&self, flashblock: Flashblock) { let flashblock_index = flashblock.index; let block_number = flashblock.metadata.block_number; @@ -97,7 +110,7 @@ impl FlashblocksReceiver for FlashblocksState { } } -impl FlashblocksAPI for FlashblocksState { +impl FlashblocksAPI for FlashblocksState { fn get_pending_blocks(&self) -> Guard>> { self.pending_blocks.load() } diff --git a/crates/client/flashblocks/src/test_harness.rs b/crates/client/flashblocks/src/test_harness.rs index 561dba72..654b2968 100644 --- a/crates/client/flashblocks/src/test_harness.rs +++ b/crates/client/flashblocks/src/test_harness.rs @@ -15,14 +15,12 @@ use std::{ use base_client_node::{ BaseNodeExtension, test_utils::{ - LocalNode, LocalNodeProvider, NODE_STARTUP_DELAY_MS, TestHarness, build_test_genesis, - init_silenced_tracing, + LocalNode, NODE_STARTUP_DELAY_MS, TestHarness, build_test_genesis, init_silenced_tracing, }, }; use base_flashtypes::Flashblock; use derive_more::Deref; use eyre::Result; -use once_cell::sync::OnceCell; use reth_optimism_chainspec::OpChainSpec; use reth_provider::CanonStateSubscriptions; use tokio::sync::{mpsc, oneshot}; @@ -33,14 +31,11 @@ use crate::{ FlashblocksState, }; -/// Convenience alias for the Flashblocks state backing the local node. -pub type LocalFlashblocksState = FlashblocksState; - /// Components that allow tests to interact with the Flashblocks worker tasks. #[derive(Clone)] pub struct FlashblocksParts { sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, - state: Arc, + state: Arc, } impl fmt::Debug for FlashblocksParts { @@ -51,7 +46,7 @@ impl fmt::Debug for FlashblocksParts { impl FlashblocksParts { /// Clone the shared [`FlashblocksState`] handle. - pub fn state(&self) -> Arc { + pub fn state(&self) -> Arc { self.state.clone() } @@ -77,7 +72,7 @@ struct FlashblocksTestExtensionInner { sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, #[allow(clippy::type_complexity)] receiver: Arc)>>>>, - fb_cell: Arc>>, + state: Arc, process_canonical: bool, } @@ -99,7 +94,7 @@ impl FlashblocksTestExtension { let inner = FlashblocksTestExtensionInner { sender, receiver: Arc::new(Mutex::new(Some(receiver))), - fb_cell: Arc::new(OnceCell::new()), + state: Arc::new(FlashblocksState::new(5)), process_canonical, }; Self { inner: Arc::new(inner) } @@ -107,27 +102,23 @@ impl FlashblocksTestExtension { /// Get the flashblocks parts after the node has been launched. pub fn parts(&self) -> Result { - let state = self.inner.fb_cell.get().ok_or_else(|| { - eyre::eyre!("FlashblocksState should be initialized during node launch") - })?; - Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: state.clone() }) + Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: self.inner.state.clone() }) } } impl BaseNodeExtension for FlashblocksTestExtension { fn apply(self: Box, builder: base_client_node::OpBuilder) -> base_client_node::OpBuilder { - let fb_cell = self.inner.fb_cell.clone(); + let state = self.inner.state.clone(); let receiver = self.inner.receiver.clone(); let process_canonical = self.inner.process_canonical; - let fb_cell_for_exex = fb_cell.clone(); + let state_for_exex = state.clone(); + let state_for_rpc = state.clone(); builder .install_exex("flashblocks-canon", move |mut ctx| { - let fb_cell = fb_cell_for_exex.clone(); + let fb = state_for_exex.clone(); async move { - let provider = ctx.provider().clone(); - let fb = init_flashblocks_state(&fb_cell, &provider); Ok(async move { use reth_exex::ExExEvent; while let Some(note) = ctx.notifications.try_next().await? { @@ -149,9 +140,11 @@ impl BaseNodeExtension for FlashblocksTestExtension { } }) .extend_rpc_modules(move |ctx| { - let fb_cell = fb_cell.clone(); + let fb = state_for_rpc; let provider = ctx.provider().clone(); - let fb = init_flashblocks_state(&fb_cell, &provider); + + // Start the state processor with the provider + fb.start(provider.clone()); let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( ctx.provider().subscribe_to_canonical_state(), @@ -193,18 +186,6 @@ impl BaseNodeExtension for FlashblocksTestExtension { } } -fn init_flashblocks_state( - cell: &Arc>>, - provider: &LocalNodeProvider, -) -> Arc { - cell.get_or_init(|| { - let fb = Arc::new(FlashblocksState::new(provider.clone(), 5)); - fb.start(); - fb - }) - .clone() -} - /// Local node wrapper that exposes helpers specific to Flashblocks tests. pub struct FlashblocksLocalNode { node: LocalNode, @@ -245,7 +226,7 @@ impl FlashblocksLocalNode { } /// Access the shared Flashblocks state for assertions or manual driving. - pub fn flashblocks_state(&self) -> Arc { + pub fn flashblocks_state(&self) -> Arc { self.parts.state() } @@ -280,7 +261,7 @@ impl FlashblocksHarness { } /// Get a handle to the in-memory Flashblocks state backing the harness. - pub fn flashblocks_state(&self) -> Arc { + pub fn flashblocks_state(&self) -> Arc { self.parts.state() } diff --git a/crates/client/flashblocks/tests/state.rs b/crates/client/flashblocks/tests/state.rs index a1d47947..dbbe3c00 100644 --- a/crates/client/flashblocks/tests/state.rs +++ b/crates/client/flashblocks/tests/state.rs @@ -31,7 +31,7 @@ const SLEEP_TIME: u64 = 10; struct TestHarness { node: FlashblocksHarness, - flashblocks: Arc>, + flashblocks: Arc, provider: LocalNodeProvider, }