Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 125 additions & 117 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-rpc-engine-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-trie = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-trie-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-trie-parallel = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-basic-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
Expand Down
4 changes: 4 additions & 0 deletions crates/client/metering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-optimism-evm.workspace = true
reth-optimism-chainspec.workspace = true
reth-optimism-primitives.workspace = true
reth-revm.workspace = true
reth-trie-common.workspace = true

# revm
revm-database.workspace = true
Expand All @@ -38,6 +39,9 @@ alloy-eips.workspace = true
jsonrpsee.workspace = true

# misc
arc-swap.workspace = true
metrics.workspace = true
metrics-derive.workspace = true
tracing.workspace = true
eyre.workspace = true
serde.workspace = true
Expand Down
7 changes: 6 additions & 1 deletion crates/client/metering/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ mod extension;
pub use extension::{MeteringConfig, MeteringExtension};

mod meter;
pub use meter::{FlashblocksState, MeterBundleOutput, meter_bundle};
pub use meter::{MeterBundleOutput, PendingState, PendingTrieInput, meter_bundle};

mod metrics;

mod rpc;
pub use rpc::MeteringApiImpl;

mod traits;
pub use traits::MeteringApiServer;

mod trie_cache;
pub use trie_cache::PendingTrieCache;

mod types;
pub use types::{MeterBlockResponse, MeterBlockTransactions};
117 changes: 95 additions & 22 deletions crates/client/metering/src/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,58 @@ use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_primitives_traits::SealedHeader;
use reth_revm::{database::StateProviderDatabase, db::State};
use reth_trie_common::TrieInput;
use revm_database::states::{BundleState, bundle_state::BundleRetention};

/// State from pending flashblocks that is used as a base for metering
use crate::metrics::Metrics;

/// Computes the pending trie input from the bundle state.
///
/// This function records metrics for cache misses and compute duration.
pub(crate) fn compute_pending_trie_input<SP>(
state_provider: &SP,
bundle_state: &BundleState,
metrics: &Metrics,
) -> EyreResult<PendingTrieInput>
where
SP: reth_provider::StateProvider + ?Sized,
{
metrics.pending_trie_cache_misses.increment(1);
let start = Instant::now();

let hashed = state_provider.hashed_post_state(bundle_state);
let (_state_root, trie_updates) = state_provider.state_root_with_updates(hashed.clone())?;

let elapsed = start.elapsed();
metrics.pending_trie_compute_duration.record(elapsed.as_secs_f64());

Ok(PendingTrieInput { trie_updates, hashed_state: hashed })
}

/// Pre-computed trie input from pending state for efficient state root calculation.
///
/// When metering bundles on top of pending flashblocks, we first compute the trie updates
/// and hashed state for the pending state. This can then be prepended to the bundle's
/// trie input, so state root calculation only performs I/O for the bundle's changes.
#[derive(Debug, Clone)]
pub struct PendingTrieInput {
/// Trie updates from computing pending state root.
pub trie_updates: reth_trie_common::updates::TrieUpdates,
/// Hashed state from pending flashblocks.
pub hashed_state: reth_trie_common::HashedPostState,
}

/// Pending state from flashblocks used as the base for bundle metering.
///
/// This contains the accumulated state changes from pending flashblocks,
/// allowing bundle simulation to build on top of not-yet-canonical state.
#[derive(Debug, Clone)]
pub struct FlashblocksState {
/// The accumulated bundle of state changes
pub struct PendingState {
/// The accumulated bundle of state changes from pending flashblocks.
pub bundle_state: BundleState,
/// Optional pre-computed trie input for faster state root calculation.
/// If provided, state root calculation skips recomputing the pending state's trie.
pub trie_input: Option<PendingTrieInput>,
}

const BLOCK_TIME: u64 = 2; // 2 seconds per block
Expand All @@ -39,9 +84,9 @@ pub struct MeterBundleOutput {
pub state_root_time_us: u128,
}

/// Simulates and meters a bundle of transactions
/// Simulates and meters a bundle of transactions.
///
/// Takes a state provider, chain spec, parsed bundle, block header, and optional flashblocks state,
/// Takes a state provider, chain spec, parsed bundle, block header, and optional pending state,
/// then executes transactions in sequence to measure gas usage and execution time.
///
/// Returns [`MeterBundleOutput`] containing transaction results and aggregated metrics.
Expand All @@ -50,25 +95,42 @@ pub fn meter_bundle<SP>(
chain_spec: Arc<OpChainSpec>,
bundle: ParsedBundle,
header: &SealedHeader,
flashblocks_state: Option<FlashblocksState>,
pending_state: Option<PendingState>,
) -> EyreResult<MeterBundleOutput>
where
SP: reth_provider::StateProvider,
{
// Get bundle hash
let bundle_hash = bundle.bundle_hash();

// Get pending trie input before starting timers. This ensures we only measure
// the bundle's incremental I/O cost, not I/O from pending flashblocks.
let metrics = Metrics::default();
let pending_trie = pending_state
.as_ref()
.map(|ps| -> EyreResult<PendingTrieInput> {
// Use cached trie input if available, otherwise compute it
if let Some(ref cached) = ps.trie_input {
metrics.pending_trie_cache_hits.increment(1);
Ok(cached.clone())
} else {
compute_pending_trie_input(&state_provider, &ps.bundle_state, &metrics)
}
})
.transpose()?;

// Create state database
let state_db = StateProviderDatabase::new(state_provider);

// Track bundle state changes. If metering using flashblocks state, include its bundle prestate.
let mut db = match flashblocks_state {
Some(ref flashblocks) => State::builder()
// Track bundle state changes. If metering with pending state, include it as bundle prestate.
let mut db = if let Some(ref ps) = pending_state {
State::builder()
.with_database(state_db)
.with_bundle_update()
.with_bundle_prestate(flashblocks.bundle_state.clone())
.build(),
None => State::builder().with_database(state_db).with_bundle_update().build(),
.with_bundle_prestate(ps.bundle_state.clone())
.build()
} else {
State::builder().with_database(state_db).with_bundle_update().build()
};

// Set up next block attributes
Expand Down Expand Up @@ -129,14 +191,25 @@ where
}

// Calculate state root and measure its calculation time. The bundle already includes
// flashblocks state if it was provided via with_bundle_prestate.
// pending state if it was provided via with_bundle_prestate.
db.merge_transitions(BundleRetention::Reverts);
let bundle_update = db.take_bundle();
let state_provider = db.database.as_ref();

let state_root_start = Instant::now();
let hashed_state = state_provider.hashed_post_state(&bundle_update);
let _ = state_provider.state_root_with_updates(hashed_state)?;

if let Some(cached_trie) = pending_trie {
// Prepend cached pending trie so state root calculation only performs I/O
// for this bundle's changes, not for pending flashblocks.
let mut trie_input = TrieInput::from_state(hashed_state);
trie_input.prepend_cached(cached_trie.trie_updates, cached_trie.hashed_state);
let _ = state_provider.state_root_from_nodes_with_updates(trie_input)?;
} else {
// No pending state, just calculate bundle state root
let _ = state_provider.state_root_with_updates(hashed_state)?;
}

let state_root_time_us = state_root_start.elapsed().as_micros();
let total_time_us = total_start.elapsed().as_micros();

Expand Down Expand Up @@ -446,15 +519,15 @@ mod tests {
harness.chain_spec(),
parsed_bundle.clone(),
&header,
None, // No flashblocks state
None, // No pending state
);

assert!(
result_without_flashblocks.is_err(),
"Transaction with nonce=1 should fail without pending state (canonical nonce is 0)"
);

// Now create flashblocks state with nonce=1 for Alice
// Now create pending state with nonce=1 for Alice
// Use BundleState::new() to properly calculate state_size
let bundle_state = BundleState::new(
[(
Expand All @@ -477,26 +550,26 @@ mod tests {
Vec::<(B256, Bytecode)>::new(),
);

let flashblocks_state = FlashblocksState { bundle_state };
let pending_state = PendingState { bundle_state, trie_input: None };

// With correct flashblocks state, transaction should succeed
// With correct pending state, transaction should succeed
let state_provider2 = harness
.blockchain_provider()
.state_by_block_hash(latest.hash())
.context("getting state provider")?;

let result_with_flashblocks = meter_bundle(
let result_with_pending = meter_bundle(
state_provider2,
harness.chain_spec(),
parsed_bundle,
&header,
Some(flashblocks_state),
Some(pending_state),
);

assert!(
result_with_flashblocks.is_ok(),
result_with_pending.is_ok(),
"Transaction with nonce=1 should succeed with pending state showing nonce=1: {:?}",
result_with_flashblocks.err()
result_with_pending.err()
);

Ok(())
Expand Down
24 changes: 24 additions & 0 deletions crates/client/metering/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! Metrics for bundle metering.

use metrics::{Counter, Histogram};
use metrics_derive::Metrics;

/// Metrics for the `reth_metering` component.
/// Conventions:
/// - Durations are recorded in seconds (histograms).
/// - Counters are monotonic event counts.
#[derive(Metrics, Clone)]
#[metrics(scope = "reth_metering")]
pub(crate) struct Metrics {
/// Count of pending trie cache hits.
#[metric(describe = "Count of pending trie cache hits")]
pub pending_trie_cache_hits: Counter,

/// Count of pending trie cache misses (trie computation required).
#[metric(describe = "Count of pending trie cache misses")]
pub pending_trie_cache_misses: Counter,

/// Time taken to compute pending trie (cache miss).
#[metric(describe = "Time taken to compute pending trie on cache miss")]
pub pending_trie_compute_duration: Histogram,
}
53 changes: 40 additions & 13 deletions crates/client/metering/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ use reth_provider::{
use tracing::{error, info};

use crate::{
FlashblocksState, MeterBlockResponse, block::meter_block, meter::meter_bundle,
MeterBlockResponse, PendingState, PendingTrieCache, block::meter_block, meter::meter_bundle,
traits::MeteringApiServer,
};

/// Implementation of the metering RPC API.
#[derive(Debug)]
pub struct MeteringApiImpl<Provider, FB> {
provider: Provider,
flashblocks_state: Arc<FB>,
flashblocks_api: Arc<FB>,
/// Cache for pending trie input, ensuring each bundle's state root
/// calculation only measures the bundle's incremental I/O.
pending_trie_cache: PendingTrieCache,
}

impl<Provider, FB> MeteringApiImpl<Provider, FB>
Expand All @@ -38,9 +41,9 @@ where
+ Clone,
FB: FlashblocksAPI,
{
/// Creates a new instance of MeteringApi
pub const fn new(provider: Provider, flashblocks_state: Arc<FB>) -> Self {
Self { provider, flashblocks_state }
/// Creates a new instance of MeteringApi.
pub fn new(provider: Provider, flashblocks_api: Arc<FB>) -> Self {
Self { provider, flashblocks_api, pending_trie_cache: PendingTrieCache::new() }
}
}

Expand All @@ -65,8 +68,8 @@ where
"Starting bundle metering"
);

// Get pending flashblocks state
let pending_blocks = self.flashblocks_state.get_pending_blocks();
// Get pending blocks from flashblocks API
let pending_blocks = self.flashblocks_api.get_pending_blocks();

// Get header and flashblock index from pending blocks
// If no pending blocks exist, fall back to latest canonical block
Expand Down Expand Up @@ -135,18 +138,42 @@ where
)
})?;

// If we have pending flashblocks, get the state to apply pending changes
let flashblocks_state = pending_blocks
.as_ref()
.map(|pb| FlashblocksState { bundle_state: pb.get_bundle_state() });
// Get the flashblock index if we have pending blocks
let state_flashblock_index = pending_blocks.as_ref().map(|pb| pb.latest_flashblock_index());

// If we have pending blocks, extract the pending state for metering
let pending_state = if let Some(pb) = pending_blocks.as_ref() {
let bundle_state = pb.get_bundle_state();

// Build a temporary PendingState without trie_input to get the cached trie
let temp_state = PendingState { bundle_state: bundle_state.clone(), trie_input: None };

// Ensure the pending trie input is cached for reuse across bundle simulations
let fb_index = state_flashblock_index.unwrap();
let trie_input = self
.pending_trie_cache
.ensure_cached(header.hash(), fb_index, &temp_state, &*state_provider)
.map_err(|e| {
error!(error = %e, "Failed to cache pending trie input");
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::ErrorCode::InternalError.code(),
format!("Failed to cache pending trie input: {}", e),
None::<()>,
)
})?;

Some(PendingState { bundle_state, trie_input: Some(trie_input) })
} else {
None
};

// Meter bundle using utility function
let output = meter_bundle(
state_provider,
self.provider.chain_spec(),
parsed_bundle,
&header,
flashblocks_state,
pending_state,
)
.map_err(|e| {
error!(error = %e, "Bundle metering failed");
Expand Down Expand Up @@ -182,7 +209,7 @@ where
gas_fees: output.total_gas_fees,
results: output.results,
state_block_number: header.number,
state_flashblock_index: Some(flashblock_index),
state_flashblock_index,
total_gas_used: output.total_gas_used,
total_execution_time_us: output.total_time_us,
state_root_time_us: output.state_root_time_us,
Expand Down
Loading