From 3e173056c4afc3e86cf2b6d39bb71e968cc52ce4 Mon Sep 17 00:00:00 2001 From: Niran Babalola Date: Sun, 11 Jan 2026 16:58:48 -0600 Subject: [PATCH] Use pending flashblocks state for bundle metering Add support for metering bundles against the current flashblocks pending state instead of just the canonical block state. This ensures that bundle simulations see the same state that will be used when the bundle is actually included. Key changes: - Add FlashblocksState struct containing Cache and BundleState - Update meter_bundle to accept optional FlashblocksState parameter - Implement three-layer database architecture in metering - Add integration tests for flashblock state visibility --- Cargo.lock | 63 +++++---- bin/node/src/main.rs | 14 +- crates/client/flashblocks/src/extension.rs | 2 +- crates/client/flashblocks/src/state.rs | 6 + crates/client/flashblocks/tests/state.rs | 130 ++++++++++++++++++ crates/client/metering/Cargo.toml | 1 + crates/client/metering/src/extension.rs | 48 ++++++- crates/client/metering/src/lib.rs | 4 +- crates/client/metering/src/meter.rs | 137 +++++++++++++++++-- crates/client/metering/src/rpc.rs | 152 ++++++++++++++------- 10 files changed, 458 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c52e0c3e..ff365f0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2075,6 +2075,7 @@ dependencies = [ "alloy-rpc-client", "base-bundles", "base-client-node", + "base-flashblocks", "eyre", "jsonrpsee", "op-alloy-consensus", @@ -2941,9 +2942,9 @@ dependencies = [ [[package]] name = "comfy-table" -version = "7.2.2" +version = "7.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958c5d6ecf1f214b4c2bbbbf6ab9523a864bd136dcf71a7e8904799acfe1ad47" +checksum = "b03b7db8e0b4b2fdad6c551e634134e99ec000e5c8c3b6856c65e8bbaded7a3b" dependencies = [ "crossterm 0.29.0", "unicode-segmentation", @@ -5024,9 +5025,9 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.1.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" dependencies = [ "displaydoc", "potential_utf", @@ -5043,6 +5044,7 @@ checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" dependencies = [ "displaydoc", "litemap", + "serde", "tinystr", "writeable", "zerovec", @@ -5050,10 +5052,11 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.1.1" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +checksum = "8b24a59706036ba941c9476a55cd57b82b77f38a3c667d637ee7cabbc85eaedc" dependencies = [ + "displaydoc", "icu_collections", "icu_normalizer_data", "icu_properties", @@ -5064,29 +5067,31 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.1.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "2.1.2" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +checksum = "f5a97b8ac6235e69506e8dacfb2adf38461d2ce6d3e9bd9c94c4cbc3cd4400a4" dependencies = [ + "displaydoc", "icu_collections", "icu_locale_core", "icu_properties_data", "icu_provider", + "potential_utf", "zerotrie", "zerovec", ] [[package]] name = "icu_properties_data" -version = "2.1.2" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" [[package]] name = "icu_provider" @@ -5096,6 +5101,8 @@ checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" dependencies = [ "displaydoc", "icu_locale_core", + "serde", + "stable_deref_trait", "writeable", "yoke", "zerofrom", @@ -8294,7 +8301,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ "rand_chacha 0.9.0", - "rand_core 0.9.5", + "rand_core 0.9.4", "serde", ] @@ -8315,7 +8322,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.5", + "rand_core 0.9.4", ] [[package]] @@ -8329,9 +8336,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.9.5" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +checksum = "4f1b3bc831f92381018fd9c6350b917c7b21f1eed35a65a51900e0e55a3d7afa" dependencies = [ "getrandom 0.3.4", "serde", @@ -8343,7 +8350,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a" dependencies = [ - "rand_core 0.9.5", + "rand_core 0.9.4", ] [[package]] @@ -8352,7 +8359,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" dependencies = [ - "rand_core 0.9.5", + "rand_core 0.9.4", ] [[package]] @@ -13418,9 +13425,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.45" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", "itoa", @@ -13428,22 +13435,22 @@ dependencies = [ "num-conv", "num_threads", "powerfmt", - "serde_core", + "serde", "time-core", "time-macros", ] [[package]] name = "time-core" -version = "0.1.7" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" [[package]] name = "time-macros" -version = "0.2.25" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" dependencies = [ "num-conv", "time-core", @@ -13465,6 +13472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" dependencies = [ "displaydoc", + "serde_core", "zerovec", ] @@ -15413,6 +15421,7 @@ version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" dependencies = [ + "serde", "yoke", "zerofrom", "zerovec-derive", @@ -15431,9 +15440,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.14" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd8f3f50b848df28f887acb68e41201b5aea6bc8a8dacc00fb40635ff9a72fea" +checksum = "ac93432f5b761b22864c774aac244fa5c0fd877678a4c37ebf6cf42208f9c9ec" [[package]] name = "zstd" diff --git a/bin/node/src/main.rs b/bin/node/src/main.rs index 10b591ee..3af028d5 100644 --- a/bin/node/src/main.rs +++ b/bin/node/src/main.rs @@ -6,8 +6,8 @@ pub mod cli; use base_client_node::BaseNodeRunner; -use base_flashblocks::FlashblocksExtension; -use base_metering::MeteringExtension; +use base_flashblocks::{FlashblocksConfig, FlashblocksExtension}; +use base_metering::{MeteringConfig, MeteringExtension}; use base_txpool::TxPoolExtension; #[global_allocator] @@ -26,10 +26,16 @@ fn main() { cli.run(|builder, args| async move { let mut runner = BaseNodeRunner::new(args.rollup_args.clone()); + // Create flashblocks config first so we can share its state with metering + let flashblocks_config: Option = args.clone().into(); + // Feature extensions (FlashblocksExtension must be last - uses replace_configured) runner.install_ext::(args.clone().into()); - runner.install_ext::(args.enable_metering); - runner.install_ext::(args.into()); + runner.install_ext::(MeteringConfig { + enabled: args.enable_metering, + flashblocks_config: flashblocks_config.clone(), + }); + runner.install_ext::(flashblocks_config); let handle = runner.run(builder); handle.await diff --git a/crates/client/flashblocks/src/extension.rs b/crates/client/flashblocks/src/extension.rs index 9365c47e..17f2b659 100644 --- a/crates/client/flashblocks/src/extension.rs +++ b/crates/client/flashblocks/src/extension.rs @@ -15,7 +15,7 @@ use crate::{ }; /// Flashblocks-specific configuration knobs. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FlashblocksConfig { /// The websocket endpoint that streams flashblock updates. pub websocket_url: Url, diff --git a/crates/client/flashblocks/src/state.rs b/crates/client/flashblocks/src/state.rs index 0db56e7c..3062a0d9 100644 --- a/crates/client/flashblocks/src/state.rs +++ b/crates/client/flashblocks/src/state.rs @@ -110,6 +110,12 @@ impl FlashblocksReceiver for FlashblocksState { } } +impl Default for FlashblocksState { + fn default() -> Self { + Self::new(10) + } +} + impl FlashblocksAPI for FlashblocksState { fn get_pending_blocks(&self) -> Guard>> { self.pending_blocks.load() diff --git a/crates/client/flashblocks/tests/state.rs b/crates/client/flashblocks/tests/state.rs index dbbe3c00..1af80bf6 100644 --- a/crates/client/flashblocks/tests/state.rs +++ b/crates/client/flashblocks/tests/state.rs @@ -797,6 +797,136 @@ async fn test_duplicate_flashblock_ignored() { assert_eq!(block, block_two); } +/// Verifies that eth_call targeting pending block sees flashblock state changes. +/// +/// This test catches database layering bugs where pending state from flashblocks +/// isn't visible to RPC callers. After a flashblock transfers ETH to Bob, an +/// eth_call simulating a transfer FROM Bob should succeed because Bob now has +/// more funds from the flashblock. +#[tokio::test] +async fn test_eth_call_sees_flashblock_state_changes() { + use alloy_eips::BlockNumberOrTag; + use alloy_provider::Provider; + use alloy_rpc_types_eth::TransactionInput; + use op_alloy_rpc_types::OpTransactionRequest; + + let test = TestHarness::new().await; + let provider = test.node.provider(); + + let bob_address = Account::Bob.address(); + let charlie_address = Account::Charlie.address(); + + // Get Bob's canonical balance to calculate a transfer amount that exceeds it + let canonical_balance = provider.get_balance(bob_address).await.unwrap(); + + // Send base flashblock + test.send_flashblock(FlashblockBuilder::new_base(&test).build()).await; + + // Flashblock 1: Alice sends a large amount to Bob + let transfer_to_bob = 1_000_000_000_000_000_000u128; // 1 ETH + let tx = test.build_transaction_to_send_eth_with_nonce( + Account::Alice, + Account::Bob, + transfer_to_bob, + 0, + ); + test.send_flashblock(FlashblockBuilder::new(&test, 1).with_transactions(vec![tx]).build()) + .await; + + // Verify via state overrides that Bob received the funds + let overrides = test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .expect("state overrides should exist after flashblock execution"); + let bob_override = overrides.get(&bob_address).expect("Bob should have a state override"); + let bob_pending_balance = bob_override.balance.expect("Bob's balance override should be set"); + assert_eq!( + bob_pending_balance, + canonical_balance + U256::from(transfer_to_bob), + "State override should show Bob's increased balance" + ); + + // Now the key test: eth_call from Bob should see this pending balance. + // Try to transfer more than Bob's canonical balance (but less than pending). + // This would fail if eth_call can't see the pending state. + let transfer_amount = canonical_balance + U256::from(100_000u64); + let call_request = OpTransactionRequest::default() + .from(bob_address) + .to(charlie_address) + .value(transfer_amount) + .gas_limit(21_000) + .input(TransactionInput::default()); + + let result = provider.call(call_request).block(BlockNumberOrTag::Pending.into()).await; + assert!( + result.is_ok(), + "eth_call from Bob should succeed because pending state shows increased balance. \ + If this fails, eth_call may not be seeing flashblock state changes. Error: {:?}", + result.err() + ); +} + +/// Verifies that transactions in flashblock N+1 can see state changes from flashblock N. +/// +/// This test catches database layering bugs where writes from earlier flashblocks +/// aren't visible to later flashblock execution. The key is that flashblock 2's +/// transaction uses nonce=1, which only succeeds if the execution layer sees +/// flashblock 1's transaction (which used nonce=0). +#[tokio::test] +async fn test_sequential_nonces_across_flashblocks() { + let test = TestHarness::new().await; + + // Send base flashblock + test.send_flashblock(FlashblockBuilder::new_base(&test).build()).await; + + // Flashblock 1: Alice sends to Bob with nonce 0 + let tx_nonce_0 = + test.build_transaction_to_send_eth_with_nonce(Account::Alice, Account::Bob, 1000, 0); + test.send_flashblock( + FlashblockBuilder::new(&test, 1).with_transactions(vec![tx_nonce_0]).build(), + ) + .await; + + // Verify flashblock 1 was processed - Alice's pending nonce should now be 1 + let alice_state = test.account_state(Account::Alice); + assert_eq!(alice_state.nonce, 1, "After flashblock 1, Alice's pending nonce should be 1"); + + // Flashblock 2: Alice sends to Charlie with nonce 1 + // This will FAIL if the execution layer can't see flashblock 1's state change + let tx_nonce_1 = + test.build_transaction_to_send_eth_with_nonce(Account::Alice, Account::Charlie, 2000, 1); + test.send_flashblock( + FlashblockBuilder::new(&test, 2).with_transactions(vec![tx_nonce_1]).build(), + ) + .await; + + // Verify flashblock 2 was processed - Alice's pending nonce should now be 2 + let alice_state_after = test.account_state(Account::Alice); + assert_eq!( + alice_state_after.nonce, 2, + "After flashblock 2, Alice's pending nonce should be 2. \ + If this fails, the database layering may be preventing flashblock 2 \ + from seeing flashblock 1's state changes." + ); + + // Also verify Bob and Charlie received their funds + let overrides = test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .expect("state overrides should exist"); + + assert!( + overrides.get(&Account::Bob.address()).is_some(), + "Bob should have received funds from flashblock 1" + ); + assert!( + overrides.get(&Account::Charlie.address()).is_some(), + "Charlie should have received funds from flashblock 2" + ); +} + #[tokio::test] async fn test_progress_canonical_blocks_without_flashblocks() { let mut test = TestHarness::new().await; diff --git a/crates/client/metering/Cargo.toml b/crates/client/metering/Cargo.toml index c361a470..c7147d0c 100644 --- a/crates/client/metering/Cargo.toml +++ b/crates/client/metering/Cargo.toml @@ -15,6 +15,7 @@ workspace = true # workspace base-bundles.workspace = true base-client-node.workspace = true +base-flashblocks.workspace = true # reth reth-provider.workspace = true diff --git a/crates/client/metering/src/extension.rs b/crates/client/metering/src/extension.rs index c089954c..767b3327 100644 --- a/crates/client/metering/src/extension.rs +++ b/crates/client/metering/src/extension.rs @@ -1,22 +1,27 @@ //! Contains the [`MeteringExtension`] which wires up the metering RPC surface //! on the Base node builder. +use std::sync::Arc; + use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder}; +use base_flashblocks::{FlashblocksConfig, FlashblocksState}; use tracing::info; use crate::{MeteringApiImpl, MeteringApiServer}; /// Helper struct that wires the metering RPC into the node builder. -#[derive(Debug, Clone, Copy)] +#[derive(Debug)] pub struct MeteringExtension { /// Whether metering is enabled. pub enabled: bool, + /// Optional Flashblocks configuration (includes state). + pub flashblocks_config: Option, } impl MeteringExtension { /// Creates a new metering extension. - pub const fn new(enabled: bool) -> Self { - Self { enabled } + pub const fn new(enabled: bool, flashblocks_config: Option) -> Self { + Self { enabled, flashblocks_config } } } @@ -27,19 +32,48 @@ impl BaseNodeExtension for MeteringExtension { return builder; } + let flashblocks_config = self.flashblocks_config; + builder.extend_rpc_modules(move |ctx| { info!(message = "Starting Metering RPC"); - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); + + // Get flashblocks state from config, or create a default one if not configured + let fb_state: Arc = + flashblocks_config.as_ref().map(|cfg| cfg.state.clone()).unwrap_or_default(); + + let metering_api = MeteringApiImpl::new(ctx.provider().clone(), fb_state); ctx.modules.merge_configured(metering_api.into_rpc())?; + Ok(()) }) } } +/// Configuration for building a [`MeteringExtension`]. +#[derive(Debug)] +pub struct MeteringConfig { + /// Whether metering is enabled. + pub enabled: bool, + /// Optional Flashblocks configuration (includes state). + pub flashblocks_config: Option, +} + +impl MeteringConfig { + /// Creates a configuration with metering enabled and no flashblocks integration. + pub const fn enabled() -> Self { + Self { enabled: true, flashblocks_config: None } + } + + /// Creates a configuration with metering enabled and flashblocks integration. + pub const fn with_flashblocks(flashblocks_config: FlashblocksConfig) -> Self { + Self { enabled: true, flashblocks_config: Some(flashblocks_config) } + } +} + impl FromExtensionConfig for MeteringExtension { - type Config = bool; + type Config = MeteringConfig; - fn from_config(enabled: Self::Config) -> Self { - Self::new(enabled) + fn from_config(config: Self::Config) -> Self { + Self::new(config.enabled, config.flashblocks_config) } } diff --git a/crates/client/metering/src/lib.rs b/crates/client/metering/src/lib.rs index 7f90aedb..5c2d89a9 100644 --- a/crates/client/metering/src/lib.rs +++ b/crates/client/metering/src/lib.rs @@ -7,10 +7,10 @@ mod block; pub use block::meter_block; mod extension; -pub use extension::MeteringExtension; +pub use extension::{MeteringConfig, MeteringExtension}; mod meter; -pub use meter::{MeterBundleOutput, meter_bundle}; +pub use meter::{FlashblocksState, MeterBundleOutput, meter_bundle}; mod rpc; pub use rpc::MeteringApiImpl; diff --git a/crates/client/metering/src/meter.rs b/crates/client/metering/src/meter.rs index 4c34956a..5fc38636 100644 --- a/crates/client/metering/src/meter.rs +++ b/crates/client/metering/src/meter.rs @@ -11,7 +11,14 @@ use reth_optimism_chainspec::OpChainSpec; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_primitives_traits::SealedHeader; use reth_revm::{database::StateProviderDatabase, db::State}; -use revm_database::states::bundle_state::BundleRetention; +use revm_database::states::{BundleState, bundle_state::BundleRetention}; + +/// State from pending flashblocks that is used as a base for metering +#[derive(Debug, Clone)] +pub struct FlashblocksState { + /// The accumulated bundle of state changes + pub bundle_state: BundleState, +} const BLOCK_TIME: u64 = 2; // 2 seconds per block @@ -34,16 +41,16 @@ pub struct MeterBundleOutput { /// Simulates and meters a bundle of transactions /// -/// Takes a state provider, chain spec, parsed bundle, and block header, +/// Takes a state provider, chain spec, parsed bundle, block header, and optional flashblocks state, /// then executes transactions in sequence to measure gas usage and execution time. /// -/// Returns [`MeterBundleOutput`] containing transaction results and aggregated metrics, -/// including separate timing for state root calculation. +/// Returns [`MeterBundleOutput`] containing transaction results and aggregated metrics. pub fn meter_bundle( state_provider: SP, chain_spec: Arc, bundle: ParsedBundle, header: &SealedHeader, + flashblocks_state: Option, ) -> EyreResult where SP: reth_provider::StateProvider, @@ -53,7 +60,16 @@ where // Create state database let state_db = StateProviderDatabase::new(state_provider); - let mut db = State::builder().with_database(state_db).with_bundle_update().build(); + + // Track bundle state changes. If metering using flashblocks state, include its bundle prestate. + let mut db = match flashblocks_state { + Some(ref flashblocks) => State::builder() + .with_database(state_db) + .with_bundle_update() + .with_bundle_prestate(flashblocks.bundle_state.clone()) + .build(), + None => State::builder().with_database(state_db).with_bundle_update().build(), + }; // Set up next block attributes // Use bundle.min_timestamp if provided, otherwise use header timestamp + BLOCK_TIME @@ -112,7 +128,8 @@ where } } - // Calculate state root and measure its calculation time + // Calculate state root and measure its calculation time. The bundle already includes + // flashblocks state if it was provided via with_bundle_prestate. db.merge_transitions(BundleRetention::Reverts); let bundle_update = db.take_bundle(); let state_provider = db.database.as_ref(); @@ -142,6 +159,7 @@ mod tests { use eyre::Context; use reth_optimism_primitives::OpTransactionSigned; use reth_provider::StateProviderFactory; + use reth_revm::{bytecode::Bytecode, primitives::KECCAK_EMPTY, state::AccountInfo}; use reth_transaction_pool::test_utils::TransactionBuilder; use super::*; @@ -177,7 +195,8 @@ mod tests { let parsed_bundle = create_parsed_bundle(Vec::new())?; - let output = meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header)?; + let output = + meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header, None)?; assert!(output.results.is_empty()); assert_eq!(output.total_gas_used, 0); @@ -220,7 +239,8 @@ mod tests { let parsed_bundle = create_parsed_bundle(vec![tx])?; - let output = meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header)?; + let output = + meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header, None)?; assert_eq!(output.results.len(), 1); let result = &output.results[0]; @@ -297,7 +317,8 @@ mod tests { let parsed_bundle = create_parsed_bundle(vec![tx_1, tx_2])?; - let output = meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header)?; + let output = + meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header, None)?; assert_eq!(output.results.len(), 2); assert!(output.total_time_us > 0); @@ -369,7 +390,8 @@ mod tests { let parsed_bundle = create_parsed_bundle(vec![tx])?; - let output = meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header)?; + let output = + meter_bundle(state_provider, harness.chain_spec(), parsed_bundle, &header, None)?; // Verify invariant: total time must include state root time assert!( @@ -384,4 +406,99 @@ mod tests { Ok(()) } + + /// Integration test: verifies meter_bundle uses flashblocks state correctly. + /// + /// A transaction using nonce=1 should fail without flashblocks state (since + /// canonical nonce is 0), but succeed when flashblocks state indicates nonce=1. + #[tokio::test] + async fn meter_bundle_requires_correct_layering_for_pending_nonce() -> eyre::Result<()> { + let harness = TestHarness::new().await?; + let latest = harness.latest_block(); + let header = latest.sealed_header().clone(); + + // Create a transaction that requires nonce=1 (assuming canonical nonce is 0) + let to = Address::random(); + let signed_tx = TransactionBuilder::default() + .signer(Account::Alice.signer_b256()) + .chain_id(harness.chain_id()) + .nonce(1) // Requires pending state to have nonce=1 + .to(to) + .value(100) + .gas_limit(21_000) + .max_fee_per_gas(10) + .max_priority_fee_per_gas(1) + .into_eip1559(); + + let tx = OpTransactionSigned::Eip1559( + signed_tx.as_eip1559().expect("eip1559 transaction").clone(), + ); + let parsed_bundle = create_parsed_bundle(vec![tx])?; + + // Without flashblocks state, transaction should fail (nonce mismatch) + let state_provider = harness + .blockchain_provider() + .state_by_block_hash(latest.hash()) + .context("getting state provider")?; + + let result_without_flashblocks = meter_bundle( + state_provider, + harness.chain_spec(), + parsed_bundle.clone(), + &header, + None, // No flashblocks state + ); + + assert!( + result_without_flashblocks.is_err(), + "Transaction with nonce=1 should fail without pending state (canonical nonce is 0)" + ); + + // Now create flashblocks state with nonce=1 for Alice + // Use BundleState::new() to properly calculate state_size + let bundle_state = BundleState::new( + [( + Account::Alice.address(), + Some(AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: 0, // original + code_hash: KECCAK_EMPTY, + code: None, + }), + Some(AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: 1, // pending (after first flashblock tx) + code_hash: KECCAK_EMPTY, + code: None, + }), + Default::default(), // no storage changes + )], + Vec::>, Vec<(U256, U256)>)>>::new(), + Vec::<(B256, Bytecode)>::new(), + ); + + let flashblocks_state = FlashblocksState { bundle_state }; + + // With correct flashblocks state, transaction should succeed + let state_provider2 = harness + .blockchain_provider() + .state_by_block_hash(latest.hash()) + .context("getting state provider")?; + + let result_with_flashblocks = meter_bundle( + state_provider2, + harness.chain_spec(), + parsed_bundle, + &header, + Some(flashblocks_state), + ); + + assert!( + result_with_flashblocks.is_ok(), + "Transaction with nonce=1 should succeed with pending state showing nonce=1: {:?}", + result_with_flashblocks.err() + ); + + Ok(()) + } } diff --git a/crates/client/metering/src/rpc.rs b/crates/client/metering/src/rpc.rs index c379cb64..d08fe5da 100644 --- a/crates/client/metering/src/rpc.rs +++ b/crates/client/metering/src/rpc.rs @@ -1,28 +1,34 @@ //! Implementation of the metering RPC API. -use alloy_consensus::Header; +use std::sync::Arc; + +use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::{B256, U256}; use base_bundles::{Bundle, MeterBundleResponse, ParsedBundle}; +use base_flashblocks::{FlashblocksAPI, PendingBlocksAPI}; use jsonrpsee::core::{RpcResult, async_trait}; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_primitives::OpBlock; +use reth_primitives_traits::SealedHeader; use reth_provider::{ BlockReader, BlockReaderIdExt, ChainSpecProvider, HeaderProvider, StateProviderFactory, }; use tracing::{error, info}; use crate::{ - MeterBlockResponse, block::meter_block, meter::meter_bundle, traits::MeteringApiServer, + FlashblocksState, MeterBlockResponse, block::meter_block, meter::meter_bundle, + traits::MeteringApiServer, }; -/// Implementation of the metering RPC API +/// Implementation of the metering RPC API. #[derive(Debug)] -pub struct MeteringApiImpl { +pub struct MeteringApiImpl { provider: Provider, + flashblocks_state: Arc, } -impl MeteringApiImpl +impl MeteringApiImpl where Provider: StateProviderFactory + ChainSpecProvider @@ -30,15 +36,16 @@ where + BlockReader + HeaderProvider
+ Clone, + FB: FlashblocksAPI, { /// Creates a new instance of MeteringApi - pub const fn new(provider: Provider) -> Self { - Self { provider } + pub const fn new(provider: Provider, flashblocks_state: Arc) -> Self { + Self { provider, flashblocks_state } } } #[async_trait] -impl MeteringApiServer for MeteringApiImpl +impl MeteringApiServer for MeteringApiImpl where Provider: StateProviderFactory + ChainSpecProvider @@ -49,6 +56,7 @@ where + Send + Sync + 'static, + FB: FlashblocksAPI + Send + Sync + 'static, { async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { info!( @@ -57,24 +65,56 @@ where "Starting bundle metering" ); - // Get the latest header - let header = self - .provider - .sealed_header_by_number_or_tag(BlockNumberOrTag::Latest) - .map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get latest header: {}", e), - None::<()>, - ) - })? - .ok_or_else(|| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - "Latest block not found".to_string(), - None::<()>, - ) - })?; + // Get pending flashblocks state + let pending_blocks = self.flashblocks_state.get_pending_blocks(); + + // Get header and flashblock index from pending blocks + // If no pending blocks exist, fall back to latest canonical block + let (header, flashblock_index, canonical_block_number) = + if let Some(pb) = pending_blocks.as_ref() { + let latest_header: Sealed
= pb.latest_header(); + let flashblock_index = pb.latest_flashblock_index(); + let canonical_block_number = pb.canonical_block_number(); + + info!( + latest_block = latest_header.number, + canonical_block = %canonical_block_number, + flashblock_index = flashblock_index, + "Using latest flashblock state for metering" + ); + + // Convert Sealed
to SealedHeader + let sealed_header = + SealedHeader::new(latest_header.inner().clone(), latest_header.hash()); + (sealed_header, flashblock_index, canonical_block_number) + } else { + // No pending blocks, use latest canonical block + let canonical_block_number = pending_blocks.get_canonical_block_number(); + let header = self + .provider + .sealed_header_by_number_or_tag(canonical_block_number) + .map_err(|e| { + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + format!("Failed to get canonical block header: {}", e), + None::<()>, + ) + })? + .ok_or_else(|| { + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + "Canonical block not found".to_string(), + None::<()>, + ) + })?; + + info!( + canonical_block = header.number, + "No flashblocks available, using canonical block state for metering" + ); + + (header, 0, canonical_block_number) + }; let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| { jsonrpsee::types::ErrorObjectOwned::owned( @@ -84,28 +124,39 @@ where ) })?; - // Get state provider for the block - let state_provider = self.provider.state_by_block_hash(header.hash()).map_err(|e| { - error!(error = %e, "Failed to get state provider"); + // Get state provider for the canonical block + let state_provider = + self.provider.state_by_block_number_or_tag(canonical_block_number).map_err(|e| { + error!(error = %e, "Failed to get state provider"); + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + format!("Failed to get state provider: {}", e), + None::<()>, + ) + })?; + + // If we have pending flashblocks, get the state to apply pending changes + let flashblocks_state = pending_blocks + .as_ref() + .map(|pb| FlashblocksState { bundle_state: pb.get_bundle_state() }); + + // Meter bundle using utility function + let output = meter_bundle( + state_provider, + self.provider.chain_spec(), + parsed_bundle, + &header, + flashblocks_state, + ) + .map_err(|e| { + error!(error = %e, "Bundle metering failed"); jsonrpsee::types::ErrorObjectOwned::owned( jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get state provider: {}", e), + format!("Bundle metering failed: {}", e), None::<()>, ) })?; - // Meter bundle using utility function - let output = - meter_bundle(state_provider, self.provider.chain_spec(), parsed_bundle, &header) - .map_err(|e| { - error!(error = %e, "Bundle metering failed"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Bundle metering failed: {}", e), - None::<()>, - ) - })?; - // Calculate average gas price let bundle_gas_price = if output.total_gas_used > 0 { output.total_gas_fees / U256::from(output.total_gas_used) @@ -118,7 +169,8 @@ where num_transactions = output.results.len(), total_gas_used = output.total_gas_used, total_time_us = output.total_time_us, - state_root_time_us = output.state_root_time_us, + state_block_number = header.number, + flashblock_index = flashblock_index, "Bundle metering completed successfully" ); @@ -126,11 +178,11 @@ where bundle_gas_price, bundle_hash: output.bundle_hash, coinbase_diff: output.total_gas_fees, - eth_sent_to_coinbase: U256::ZERO, + eth_sent_to_coinbase: U256::from(0), gas_fees: output.total_gas_fees, results: output.results, state_block_number: header.number, - state_flashblock_index: None, + state_flashblock_index: Some(flashblock_index), total_gas_used: output.total_gas_used, total_execution_time_us: output.total_time_us, state_root_time_us: output.state_root_time_us, @@ -214,7 +266,7 @@ where } } -impl MeteringApiImpl +impl MeteringApiImpl where Provider: StateProviderFactory + ChainSpecProvider @@ -225,6 +277,7 @@ where + Send + Sync + 'static, + FB: FlashblocksAPI + Send + Sync + 'static, { /// Internal helper to meter a block's execution fn meter_block_internal(&self, block: &OpBlock) -> RpcResult { @@ -251,7 +304,7 @@ mod tests { use reth_transaction_pool::test_utils::TransactionBuilder; use super::*; - use crate::MeteringExtension; + use crate::{MeteringConfig, MeteringExtension}; fn create_bundle(txs: Vec, block_number: u64, min_timestamp: Option) -> Bundle { Bundle { @@ -268,7 +321,10 @@ mod tests { } async fn setup() -> eyre::Result<(TestHarness, RpcClient)> { - let harness = TestHarness::builder().with_ext::(true).build().await?; + let harness = TestHarness::builder() + .with_ext::(MeteringConfig::enabled()) + .build() + .await?; let client = harness.rpc_client()?; Ok((harness, client)) }