Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions bin/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ impl Args {

impl From<Args> for Option<FlashblocksConfig> {
fn from(args: Args) -> Self {
args.websocket_url.map(|url| FlashblocksConfig {
websocket_url: url,
max_pending_blocks_depth: args.max_pending_blocks_depth,
})
args.websocket_url.map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth))
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/client/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ serde.workspace = true
revm-database.workspace = true

# misc
once_cell.workspace = true
url.workspace = true
thiserror.workspace = true
tracing.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions crates/client/flashblocks/benches/pending_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ fn pending_state_benches(c: &mut Criterion) {
}

async fn build_pending_state(input: BenchInput) {
let state = FlashblocksState::new(input.provider, 5);
state.start();
let state = FlashblocksState::new(5);
state.start(input.provider);
state.on_canonical_block_received(input.canonical_block);

for flashblock in input.flashblocks {
Expand Down Expand Up @@ -148,7 +148,7 @@ fn init_bench_tracing() {
}

async fn wait_for_pending_state(
state: &FlashblocksState<LocalNodeProvider>,
state: &FlashblocksState,
target_block: BlockNumber,
expected_index: u64,
) {
Expand Down
78 changes: 35 additions & 43 deletions crates/client/flashblocks/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

use std::sync::Arc;

use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder, OpProvider};
use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
use futures_util::TryStreamExt;
use once_cell::sync::OnceCell;
use reth_exex::ExExEvent;
use tracing::info;
use url::Url;
Expand All @@ -15,34 +14,37 @@ use crate::{
FlashblocksSubscriber,
};

/// The flashblocks cell holds a shared state reference.
pub type FlashblocksCell<T> = Arc<OnceCell<Arc<T>>>;

/// Flashblocks-specific configuration knobs.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct FlashblocksConfig {
/// The websocket endpoint that streams flashblock updates.
pub websocket_url: String,
pub websocket_url: Url,
/// Maximum number of pending flashblocks to retain in memory.
pub max_pending_blocks_depth: u64,
/// Shared Flashblocks state.
pub state: Arc<FlashblocksState>,
}

impl FlashblocksConfig {
/// Create a new Flashblocks configuration.
pub fn new(websocket_url: String, max_pending_blocks_depth: u64) -> Self {
let state = Arc::new(FlashblocksState::new(max_pending_blocks_depth));
let ws_url = Url::parse(&websocket_url).expect("valid websocket URL");
Self { websocket_url: ws_url, max_pending_blocks_depth, state }
}
}

/// Helper struct that wires the Flashblocks feature (canon ExEx and RPC) into the node builder.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct FlashblocksExtension {
/// Shared Flashblocks state cache.
cell: FlashblocksCell<FlashblocksState<OpProvider>>,
/// Optional Flashblocks configuration.
/// Optional Flashblocks configuration (includes state).
config: Option<FlashblocksConfig>,
}

impl FlashblocksExtension {
/// Create a new Flashblocks extension helper.
pub const fn new(
cell: FlashblocksCell<FlashblocksState<OpProvider>>,
config: Option<FlashblocksConfig>,
) -> Self {
Self { cell, config }
pub const fn new(config: Option<FlashblocksConfig>) -> Self {
Self { config }
}
}

Expand All @@ -54,23 +56,17 @@ impl BaseNodeExtension for FlashblocksExtension {
return builder;
};

let flashblocks_cell = self.cell;
let cfg_for_rpc = cfg.clone();
let flashblocks_cell_for_rpc = flashblocks_cell.clone();
let state = cfg.state;
let mut subscriber = FlashblocksSubscriber::new(state.clone(), cfg.websocket_url);

let state_for_exex = state.clone();
let state_for_rpc = state.clone();
let state_for_start = state;

// Install the canon ExEx
let builder = builder.install_exex("flashblocks-canon", move |mut ctx| {
let flashblocks_cell = flashblocks_cell.clone();
let fb = state_for_exex;
async move {
let fb = flashblocks_cell
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
cfg.max_pending_blocks_depth,
))
})
.clone();

Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
Expand All @@ -87,23 +83,19 @@ impl BaseNodeExtension for FlashblocksExtension {
}
});

// Start state processor and subscriber after node is started
let builder = builder.on_node_started(move |ctx| {
info!(message = "Starting Flashblocks state processor");
state_for_start.start(ctx.provider().clone());
subscriber.start();
Ok(())
});

// Extend with RPC modules
builder.extend_rpc_modules(move |ctx| {
info!(message = "Starting Flashblocks RPC");

let ws_url = Url::parse(cfg_for_rpc.websocket_url.as_str())?;
let fb = flashblocks_cell_for_rpc
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
cfg_for_rpc.max_pending_blocks_depth,
))
})
.clone();
fb.start();

let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url);
flashblocks_client.start();
let fb = state_for_rpc;

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
Expand All @@ -127,6 +119,6 @@ impl FromExtensionConfig for FlashblocksExtension {
type Config = Option<FlashblocksConfig>;

fn from_config(config: Self::Config) -> Self {
Self::new(Arc::new(OnceCell::new()), config)
Self::new(config)
}
}
2 changes: 1 addition & 1 deletion crates/client/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use rpc::{
};

mod extension;
pub use extension::{FlashblocksCell, FlashblocksConfig, FlashblocksExtension};
pub use extension::{FlashblocksConfig, FlashblocksExtension};

#[cfg(any(test, feature = "test-utils"))]
pub mod test_harness;
63 changes: 38 additions & 25 deletions crates/client/flashblocks/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,56 @@ use crate::{
const BUFFER_SIZE: usize = 20;

/// Manages the pending flashblock state and processes incoming updates.
#[derive(Debug, Clone)]
pub struct FlashblocksState<Client> {
#[derive(Debug)]
pub struct FlashblocksState {
pending_blocks: Arc<ArcSwapOption<PendingBlocks>>,
queue: mpsc::UnboundedSender<StateUpdate>,
rx: Arc<Mutex<mpsc::UnboundedReceiver<StateUpdate>>>,
flashblock_sender: Sender<Arc<PendingBlocks>>,
state_processor: StateProcessor<Client>,
max_pending_blocks_depth: u64,
}

impl<Client> FlashblocksState<Client>
where
Client: StateProviderFactory
+ ChainSpecProvider<ChainSpec: EthChainSpec<Header = Header> + OpHardforks>
+ BlockReaderIdExt<Header = Header>
+ Clone
+ 'static,
{
impl FlashblocksState {
/// Creates a new flashblocks state manager.
pub fn new(client: Client, max_pending_blocks_depth: u64) -> Self {
///
/// The state is created without a client. Call [`start`](Self::start) with a client
/// to spawn the state processor after the node is launched.
pub fn new(max_pending_blocks_depth: u64) -> Self {
let (tx, rx) = mpsc::unbounded_channel::<StateUpdate>();
let pending_blocks: Arc<ArcSwapOption<PendingBlocks>> = Arc::new(ArcSwapOption::new(None));
let (flashblock_sender, _) = broadcast::channel(BUFFER_SIZE);

Self {
pending_blocks,
queue: tx,
rx: Arc::new(Mutex::new(rx)),
flashblock_sender,
max_pending_blocks_depth,
}
}

/// Starts the flashblocks state processor with the given client.
///
/// This spawns a background task that processes canonical blocks and flashblocks.
/// Should be called after the node is launched and the provider is available.
pub fn start<Client>(&self, client: Client)
where
Client: StateProviderFactory
+ ChainSpecProvider<ChainSpec: EthChainSpec<Header = Header> + OpHardforks>
+ BlockReaderIdExt<Header = Header>
+ Clone
+ 'static,
{
let state_processor = StateProcessor::new(
client,
pending_blocks.clone(),
max_pending_blocks_depth,
Arc::new(Mutex::new(rx)),
flashblock_sender.clone(),
self.pending_blocks.clone(),
self.max_pending_blocks_depth,
self.rx.clone(),
self.flashblock_sender.clone(),
);

Self { pending_blocks, queue: tx, flashblock_sender, state_processor }
}

/// Starts the flashblocks state processor.
pub fn start(&self) {
let sp = self.state_processor.clone();
tokio::spawn(async move {
sp.start().await;
state_processor.start().await;
});
}

Expand All @@ -79,7 +92,7 @@ where
}
}

impl<Client> FlashblocksReceiver for FlashblocksState<Client> {
impl FlashblocksReceiver for FlashblocksState {
fn on_flashblock_received(&self, flashblock: Flashblock) {
let flashblock_index = flashblock.index;
let block_number = flashblock.metadata.block_number;
Expand All @@ -97,7 +110,7 @@ impl<Client> FlashblocksReceiver for FlashblocksState<Client> {
}
}

impl<Client> FlashblocksAPI for FlashblocksState<Client> {
impl FlashblocksAPI for FlashblocksState {
fn get_pending_blocks(&self) -> Guard<Option<Arc<PendingBlocks>>> {
self.pending_blocks.load()
}
Expand Down
Loading
Loading