diff --git a/Cargo.lock b/Cargo.lock index de0ca2c9..e33869fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,8 +2158,8 @@ dependencies = [ "jsonrpsee", "lru 0.16.3", "metrics", - "reth-exex", "metrics-derive", + "reth-exex", "reth-node-api", "reth-primitives-traits", "reth-provider", diff --git a/crates/client/flashblocks/src/extension.rs b/crates/client/flashblocks/src/extension.rs index 17f2b659..ce97118b 100644 --- a/crates/client/flashblocks/src/extension.rs +++ b/crates/client/flashblocks/src/extension.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; +use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig}; use futures_util::TryStreamExt; use reth_exex::ExExEvent; use tracing::info; @@ -50,7 +50,7 @@ impl FlashblocksExtension { impl BaseNodeExtension for FlashblocksExtension { /// Applies the extension to the supplied builder. - fn apply(self: Box, builder: OpBuilder) -> OpBuilder { + fn apply(self: Box, mut builder: BaseBuilder) -> BaseBuilder { let Some(cfg) = self.config else { info!(message = "flashblocks integration is disabled"); return builder; @@ -63,8 +63,8 @@ impl BaseNodeExtension for FlashblocksExtension { let state_for_rpc = state.clone(); let state_for_start = state; - // Install the canon ExEx - let builder = builder.install_exex("flashblocks-canon", move |mut ctx| { + // Install the canon ExEx (directly on inner builder - no accumulation needed) + builder.builder = builder.builder.install_exex("flashblocks-canon", move |mut ctx| { let fb = state_for_exex; async move { Ok(async move { @@ -84,7 +84,7 @@ impl BaseNodeExtension for FlashblocksExtension { }); // Start state processor and subscriber after node is started - let builder = builder.on_node_started(move |ctx| { + let builder = builder.add_node_started_hook(move |ctx| { info!(message = "Starting Flashblocks state processor"); state_for_start.start(ctx.provider().clone()); subscriber.start(); @@ -92,7 +92,7 @@ impl BaseNodeExtension for FlashblocksExtension { }); // Extend with RPC modules - builder.extend_rpc_modules(move |ctx| { + builder.add_rpc_module(move |ctx| { info!(message = "Starting Flashblocks RPC"); let fb = state_for_rpc; diff --git a/crates/client/flashblocks/src/test_harness.rs b/crates/client/flashblocks/src/test_harness.rs index 654b2968..8400b5b4 100644 --- a/crates/client/flashblocks/src/test_harness.rs +++ b/crates/client/flashblocks/src/test_harness.rs @@ -13,7 +13,7 @@ use std::{ }; use base_client_node::{ - BaseNodeExtension, + BaseBuilder, BaseNodeExtension, test_utils::{ LocalNode, NODE_STARTUP_DELAY_MS, TestHarness, build_test_genesis, init_silenced_tracing, }, @@ -21,10 +21,11 @@ use base_client_node::{ use base_flashtypes::Flashblock; use derive_more::Deref; use eyre::Result; +use futures_util::TryStreamExt as _; +use reth_exex::ExExEvent; use reth_optimism_chainspec::OpChainSpec; use reth_provider::CanonStateSubscriptions; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; use crate::{ EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver, @@ -107,7 +108,7 @@ impl FlashblocksTestExtension { } impl BaseNodeExtension for FlashblocksTestExtension { - fn apply(self: Box, builder: base_client_node::OpBuilder) -> base_client_node::OpBuilder { + fn apply(self: Box, mut builder: BaseBuilder) -> BaseBuilder { let state = self.inner.state.clone(); let receiver = self.inner.receiver.clone(); let process_canonical = self.inner.process_canonical; @@ -115,74 +116,74 @@ impl BaseNodeExtension for FlashblocksTestExtension { let state_for_exex = state.clone(); let state_for_rpc = state.clone(); - builder - .install_exex("flashblocks-canon", move |mut ctx| { - let fb = state_for_exex.clone(); - async move { - Ok(async move { - use reth_exex::ExExEvent; - while let Some(note) = ctx.notifications.try_next().await? { - if let Some(committed) = note.committed_chain() { - let hash = committed.tip().num_hash(); - if process_canonical { - // Many suites drive canonical updates manually to reproduce race conditions, so - // allowing this to be disabled keeps canonical replay deterministic. - let chain = Arc::unwrap_or_clone(committed); - for (_, block) in chain.into_blocks() { - fb.on_canonical_block_received(block); - } + // Install the canon ExEx (directly on inner builder - no accumulation needed) + builder.builder = builder.builder.install_exex("flashblocks-canon", move |mut ctx| { + let fb = state_for_exex.clone(); + async move { + Ok(async move { + while let Some(note) = ctx.notifications.try_next().await? { + if let Some(committed) = note.committed_chain() { + let hash = committed.tip().num_hash(); + if process_canonical { + // Many suites drive canonical updates manually to reproduce race conditions, so + // allowing this to be disabled keeps canonical replay deterministic. + let chain = Arc::unwrap_or_clone(committed); + for (_, block) in chain.into_blocks() { + fb.on_canonical_block_received(block); } - let _ = ctx.events.send(ExExEvent::FinishedHeight(hash)); } + let _ = ctx.events.send(ExExEvent::FinishedHeight(hash)); } - Ok(()) - }) - } - }) - .extend_rpc_modules(move |ctx| { - let fb = state_for_rpc; - let provider = ctx.provider().clone(); - - // Start the state processor with the provider - fb.start(provider.clone()); - - let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( - ctx.provider().subscribe_to_canonical_state(), - ); - tokio::spawn(async move { - use tokio_stream::StreamExt; - while let Some(Ok(notification)) = canon_stream.next().await { - provider.canonical_in_memory_state().notify_canon_state(notification); - } - }); - let api_ext = EthApiExt::new( - ctx.registry.eth_api().clone(), - ctx.registry.eth_handlers().filter.clone(), - fb.clone(), - ); - ctx.modules.replace_configured(api_ext.into_rpc())?; - - // Register eth_subscribe subscription endpoint for flashblocks - // Uses replace_configured since eth_subscribe already exists from reth's standard module - // Pass eth_api to enable proxying standard subscription types to reth's implementation - let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); - ctx.modules.replace_configured(eth_pubsub.into_rpc())?; - - let fb_for_task = fb.clone(); - let mut receiver = receiver - .lock() - .expect("flashblock receiver mutex poisoned") - .take() - .expect("flashblock receiver should only be initialized once"); - tokio::spawn(async move { - while let Some((payload, tx)) = receiver.recv().await { - fb_for_task.on_flashblock_received(payload); - let _ = tx.send(()); } - }); + Ok(()) + }) + } + }); + + builder.add_rpc_module(move |ctx| { + let fb = state_for_rpc; + let provider = ctx.provider().clone(); + + // Start the state processor with the provider + fb.start(provider.clone()); + + let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( + ctx.provider().subscribe_to_canonical_state(), + ); + tokio::spawn(async move { + use tokio_stream::StreamExt; + while let Some(Ok(notification)) = canon_stream.next().await { + provider.canonical_in_memory_state().notify_canon_state(notification); + } + }); + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + fb.clone(), + ); + ctx.modules.replace_configured(api_ext.into_rpc())?; + + // Register eth_subscribe subscription endpoint for flashblocks + // Uses replace_configured since eth_subscribe already exists from reth's standard module + // Pass eth_api to enable proxying standard subscription types to reth's implementation + let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); + ctx.modules.replace_configured(eth_pubsub.into_rpc())?; + + let fb_for_task = fb.clone(); + let mut receiver = receiver + .lock() + .expect("flashblock receiver mutex poisoned") + .take() + .expect("flashblock receiver should only be initialized once"); + tokio::spawn(async move { + while let Some((payload, tx)) = receiver.recv().await { + fb_for_task.on_flashblock_received(payload); + let _ = tx.send(()); + } + }); - Ok(()) - }) + Ok(()) + }) } } diff --git a/crates/client/metering/src/extension.rs b/crates/client/metering/src/extension.rs index 767b3327..0a76ea28 100644 --- a/crates/client/metering/src/extension.rs +++ b/crates/client/metering/src/extension.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; +use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig}; use base_flashblocks::{FlashblocksConfig, FlashblocksState}; use tracing::info; @@ -27,14 +27,14 @@ impl MeteringExtension { impl BaseNodeExtension for MeteringExtension { /// Applies the extension to the supplied builder. - fn apply(self: Box, builder: OpBuilder) -> OpBuilder { + fn apply(self: Box, builder: BaseBuilder) -> BaseBuilder { if !self.enabled { return builder; } let flashblocks_config = self.flashblocks_config; - builder.extend_rpc_modules(move |ctx| { + builder.add_rpc_module(move |ctx| { info!(message = "Starting Metering RPC"); // Get flashblocks state from config, or create a default one if not configured diff --git a/crates/client/node/src/builder.rs b/crates/client/node/src/builder.rs new file mode 100644 index 00000000..ac44b17b --- /dev/null +++ b/crates/client/node/src/builder.rs @@ -0,0 +1,125 @@ +//! Wrapper around the OP node builder that accumulates hooks instead of replacing them. + +use std::fmt; + +use eyre::Result; +use reth_node_builder::{ + NodeAdapter, NodeComponentsBuilder, + node::FullNode, + rpc::{RethRpcAddOns, RpcContext}, +}; + +use crate::{ + OpBuilder, + types::{OpAddOns, OpComponentsBuilder, OpNodeTypes}, +}; + +/// Convenience alias for the OP node adapter type used by the reth builder. +pub(crate) type OpNodeAdapter = NodeAdapter< + OpNodeTypes, + >::Components, +>; + +/// Convenience alias for the OP Eth API type exposed by the reth RPC add-ons. +type OpEthApi = >::EthApi; + +/// Convenience alias for the full OP node handle produced after launch. +type OpFullNode = FullNode; + +/// Alias for the RPC context used by Base extensions. +pub type BaseRpcContext<'a> = RpcContext<'a, OpNodeAdapter, OpEthApi>; + +/// Hook type for extending RPC modules. +type RpcModuleHook = Box) -> Result<()> + Send + 'static>; + +/// Hook type for node-started callbacks. +type NodeStartedHook = Box Result<()> + Send + 'static>; + +/// A thin wrapper over [`OpBuilder`] that accumulates RPC and node-start hooks. +pub struct BaseBuilder { + /// The underlying OP node builder. + /// + /// Exposed publicly so extensions can call methods that don't need accumulation + /// (e.g., `install_exex`) directly on the inner builder. + pub builder: OpBuilder, + rpc_hooks: Vec, + node_started_hooks: Vec, +} + +impl BaseBuilder { + /// Create a new BaseBuilder wrapping the provided OP builder. + pub const fn new(builder: OpBuilder) -> Self { + Self { builder, rpc_hooks: Vec::new(), node_started_hooks: Vec::new() } + } + + /// Consumes the wrapper and returns the inner builder after installing the accumulated hooks. + pub fn build(self) -> OpBuilder { + let Self { mut builder, mut rpc_hooks, node_started_hooks } = self; + + if !rpc_hooks.is_empty() { + builder = builder.extend_rpc_modules(move |mut ctx: BaseRpcContext<'_>| { + for hook in rpc_hooks.iter_mut() { + hook(&mut ctx)?; + } + + Ok(()) + }); + } + + if !node_started_hooks.is_empty() { + builder = builder.on_node_started(move |full_node: OpFullNode| { + let mut hooks = node_started_hooks; + for hook in hooks.iter_mut() { + hook(full_node.clone())?; + } + Ok(()) + }); + } + + builder + } + + /// Adds an RPC hook that will run when RPC modules are configured. + pub fn add_rpc_module(mut self, hook: F) -> Self + where + F: FnOnce(&mut BaseRpcContext<'_>) -> Result<()> + Send + 'static, + { + let mut hook = Some(hook); + self.rpc_hooks.push(Box::new(move |ctx| { + if let Some(hook) = hook.take() { + hook(ctx)?; + } + Ok(()) + })); + self + } + + /// Adds a node-started hook that will run after the node has started. + pub fn add_node_started_hook(mut self, hook: F) -> Self + where + F: FnOnce(OpFullNode) -> Result<()> + Send + 'static, + { + let mut hook = Some(hook); + self.node_started_hooks.push(Box::new(move |node| { + if let Some(hook) = hook.take() { + hook(node)?; + } + Ok(()) + })); + self + } + + /// Launches the node after applying accumulated hooks, delegating to the provided closure. + pub fn launch_with_fn(self, launcher: L) -> R + where + L: FnOnce(OpBuilder) -> R, + { + launcher(self.build()) + } +} + +impl fmt::Debug for BaseBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BaseBuilder").finish_non_exhaustive() + } +} diff --git a/crates/client/node/src/extension.rs b/crates/client/node/src/extension.rs index beec0801..fe871240 100644 --- a/crates/client/node/src/extension.rs +++ b/crates/client/node/src/extension.rs @@ -2,14 +2,14 @@ use std::fmt::Debug; -use crate::OpBuilder; +use crate::BaseBuilder; /// Customizes the node builder before launch. /// /// Register extensions via [`BaseNodeRunner::install_ext`]. pub trait BaseNodeExtension: Send + Sync + Debug { /// Applies the extension to the supplied builder. - fn apply(self: Box, builder: OpBuilder) -> OpBuilder; + fn apply(self: Box, builder: BaseBuilder) -> BaseBuilder; } /// An extension that can be built from a config. diff --git a/crates/client/node/src/lib.rs b/crates/client/node/src/lib.rs index eeeb27f4..f2fa0480 100644 --- a/crates/client/node/src/lib.rs +++ b/crates/client/node/src/lib.rs @@ -3,6 +3,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +mod builder; +pub use builder::{BaseBuilder, BaseRpcContext}; + mod extension; pub use extension::{BaseNodeExtension, FromExtensionConfig}; diff --git a/crates/client/node/src/runner.rs b/crates/client/node/src/runner.rs index 1456b12a..816b38fe 100644 --- a/crates/client/node/src/runner.rs +++ b/crates/client/node/src/runner.rs @@ -6,7 +6,7 @@ use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_provider::providers::BlockchainProvider; use tracing::info; -use crate::{BaseNodeBuilder, BaseNodeExtension, BaseNodeHandle, FromExtensionConfig}; +use crate::{BaseBuilder, BaseNodeBuilder, BaseNodeExtension, BaseNodeHandle, FromExtensionConfig}; /// Wraps the Base node configuration and orchestrates builder wiring. #[derive(Debug)] @@ -50,8 +50,9 @@ impl BaseNodeRunner { .with_add_ons(op_node.add_ons()) .on_component_initialized(move |_ctx| Ok(())); - let builder = - extensions.into_iter().fold(builder, |builder, extension| extension.apply(builder)); + let builder = extensions + .into_iter() + .fold(BaseBuilder::new(builder), |builder, extension| extension.apply(builder)); builder .launch_with_fn(|builder| { diff --git a/crates/client/node/src/test_utils/node.rs b/crates/client/node/src/test_utils/node.rs index 8d3c7df4..7086be52 100644 --- a/crates/client/node/src/test_utils/node.rs +++ b/crates/client/node/src/test_utils/node.rs @@ -22,7 +22,7 @@ use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_provider::providers::BlockchainProvider; use reth_tasks::TaskManager; -use crate::{BaseNodeExtension, OpProvider, test_utils::engine::EngineApi}; +use crate::{BaseBuilder, BaseNodeExtension, OpProvider, test_utils::engine::EngineApi}; /// Convenience alias for the local blockchain provider type. pub type LocalNodeProvider = OpProvider; @@ -103,8 +103,9 @@ impl LocalNode { .on_component_initialized(move |_ctx| Ok(())); // Apply all extensions - let builder = - extensions.into_iter().fold(builder, |builder, extension| extension.apply(builder)); + let builder = extensions + .into_iter() + .fold(BaseBuilder::new(builder), |builder, extension| extension.apply(builder)); // Launch with EngineNodeLauncher let NodeHandle { node: node_handle, node_exit_future } = builder @@ -166,6 +167,11 @@ impl LocalNode { Ok(RootProvider::::new(client)) } + /// HTTP RPC address for the local node. + pub const fn http_addr(&self) -> SocketAddr { + self.http_api_addr + } + /// Build an Engine API client that talks to the node's IPC endpoint. pub fn engine_api(&self) -> Result> { EngineApi::::new(self.engine_ipc_path.clone()) diff --git a/crates/client/node/src/types.rs b/crates/client/node/src/types.rs index ea7d2750..7974fff9 100644 --- a/crates/client/node/src/types.rs +++ b/crates/client/node/src/types.rs @@ -11,9 +11,12 @@ use reth_optimism_chainspec::OpChainSpec; use reth_optimism_node::OpNode; use reth_provider::providers::BlockchainProvider; -type OpNodeTypes = FullNodeTypesAdapter, OpProvider>; -type OpComponentsBuilder = >::ComponentsBuilder; -type OpAddOns = >::AddOns; +/// Internal alias for the OP node type adapter. +pub(crate) type OpNodeTypes = FullNodeTypesAdapter, OpProvider>; +/// Internal alias for the OP node components builder. +pub(crate) type OpComponentsBuilder = >::ComponentsBuilder; +/// Internal alias for the OP node add-ons. +pub(crate) type OpAddOns = >::AddOns; /// A [`BlockchainProvider`] instance. pub type OpProvider = BlockchainProvider>>; diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index 3800db71..4c855cfe 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -1,7 +1,7 @@ //! Contains the [`TxPoolExtension`] which wires up the transaction pool features //! (tracing ExEx and status RPC) on the Base node builder. -use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; +use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig}; use tracing::info; use crate::{TransactionStatusApiImpl, TransactionStatusApiServer, tracex_exex}; @@ -33,19 +33,20 @@ impl TxPoolExtension { impl BaseNodeExtension for TxPoolExtension { /// Applies the extension to the supplied builder. - fn apply(self: Box, builder: OpBuilder) -> OpBuilder { + fn apply(self: Box, mut builder: BaseBuilder) -> BaseBuilder { let config = self.config; - // Install the tracing ExEx if enabled - let logs_enabled = config.tracing_logs_enabled; - let builder = - builder.install_exex_if(config.tracing_enabled, "tracex", move |ctx| async move { + // Install the tracing ExEx if enabled (directly on inner builder - no accumulation needed) + if config.tracing_enabled { + let logs_enabled = config.tracing_logs_enabled; + builder.builder = builder.builder.install_exex("tracex", move |ctx| async move { Ok(tracex_exex(ctx, logs_enabled)) }); + } // Extend with RPC modules let sequencer_rpc = config.sequencer_rpc; - builder.extend_rpc_modules(move |ctx| { + builder.add_rpc_module(move |ctx| { info!(message = "Starting Transaction Status RPC"); let proxy_api = TransactionStatusApiImpl::new(sequencer_rpc, ctx.pool().clone()) .expect("Failed to create transaction status proxy");