diff --git a/Cargo.lock b/Cargo.lock index 31d5857b..75c3941e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1773,6 +1773,36 @@ dependencies = [ "url", ] +[[package]] +name = "base-testing-e2e" +version = "0.2.1" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-network", + "alloy-primitives", + "alloy-provider", + "alloy-rpc-client", + "alloy-rpc-types-eth", + "alloy-signer", + "alloy-signer-local", + "alloy-sol-macro", + "alloy-sol-types", + "brotli", + "colored", + "eyre", + "futures-util", + "hex", + "op-alloy-network", + "op-alloy-rpc-types", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.28.0", + "tracing", + "url", +] + [[package]] name = "base-txpool" version = "0.2.1" @@ -2519,6 +2549,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "colored" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" +dependencies = [ + "lazy_static", + "windows-sys 0.59.0", +] + [[package]] name = "combine" version = "4.6.7" @@ -3327,6 +3367,12 @@ dependencies = [ "litrs", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dunce" version = "1.0.5" @@ -3709,6 +3755,20 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "flashblocks-e2e" +version = "0.2.1" +dependencies = [ + "alloy-primitives", + "base-testing-e2e", + "clap", + "dotenvy", + "eyre", + "tokio", + "tracing", + "tracing-subscriber 0.3.22", +] + [[package]] name = "flate2" version = "1.1.5" diff --git a/Cargo.toml b/Cargo.toml index 71d3c104..c005ffc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/base/node-reth" [workspace] resolver = "2" -members = ["bin/*", "crates/client/*", "crates/shared/*"] +members = ["bin/*", "crates/client/*", "crates/shared/*", "testing/*"] default-members = ["bin/node"] [workspace.metadata.cargo-udeps.ignore] @@ -63,6 +63,8 @@ base-txpool = { path = "crates/client/txpool" } base-reth-runner = { path = "crates/client/runner" } base-reth-test-utils = { path = "crates/client/test-utils" } base-reth-flashblocks = { path = "crates/client/flashblocks" } +# Testing +base-testing-e2e = { path = "testing/e2e" } # reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } @@ -170,3 +172,14 @@ serde_json = "1.0.145" metrics-derive = "0.1.0" tracing-subscriber = "0.3.22" thiserror = "2.0" + +# Additional alloy crates +alloy-transport = "1.0.41" +alloy-transport-http = "1.0.41" +alloy-signer = "1.0.41" +alloy-network = "1.0.41" + +# Testing misc +colored = "2.1" +dotenvy = "0.15" +hex = "0.4" diff --git a/bin/flashblocks-e2e/Cargo.toml b/bin/flashblocks-e2e/Cargo.toml new file mode 100644 index 00000000..2fee72b4 --- /dev/null +++ b/bin/flashblocks-e2e/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "flashblocks-e2e" +description = "End-to-end regression testing tool for node-reth flashblocks RPC" + +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# internal +base-testing-e2e.workspace = true + +# alloy +alloy-primitives.workspace = true + +# misc +clap = { workspace = true, features = ["derive"] } +dotenvy.workspace = true +eyre.workspace = true +tokio = { workspace = true, features = ["full"] } +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/bin/flashblocks-e2e/README.md b/bin/flashblocks-e2e/README.md new file mode 100644 index 00000000..ad3a4c6e --- /dev/null +++ b/bin/flashblocks-e2e/README.md @@ -0,0 +1,50 @@ +# flashblocks-e2e + +End-to-end regression testing tool for node-reth flashblocks RPC. + +## Overview + +This tool runs a comprehensive suite of tests against a live node to validate +the flashblocks RPC implementation including state visibility, eth_call, +transaction receipts, WebSocket subscriptions, and metering endpoints. + +## Usage + +```bash +# Run all tests against a local node +flashblocks-e2e --rpc-url http://localhost:8545 --flashblocks-ws-url wss://localhost:8546/ws + +# Run with a filter +flashblocks-e2e --filter "flashblock*" + +# List available tests +flashblocks-e2e --list + +# Continue running after failures +flashblocks-e2e --keep-going + +# Verbose output +flashblocks-e2e -v + +# JSON output for CI +flashblocks-e2e --format json +``` + +## Environment Variables + +- `PRIVATE_KEY`: Hex-encoded private key for transaction-sending tests (optional) + +## Test Categories + +- **blocks**: Block retrieval and pending state visibility +- **call**: `eth_call` and `eth_estimateGas` tests +- **receipts**: Transaction receipt retrieval +- **logs**: `eth_getLogs` including pending logs +- **sanity**: Flashblocks WebSocket endpoint validation +- **metering**: `base_meterBundle` and `base_meteredPriorityFeePerGas` endpoints +- **contracts**: Contract deployment and interaction tests + +## Exit Codes + +- `0`: All tests passed +- `1`: One or more tests failed diff --git a/bin/flashblocks-e2e/src/cli.rs b/bin/flashblocks-e2e/src/cli.rs new file mode 100644 index 00000000..ea505f17 --- /dev/null +++ b/bin/flashblocks-e2e/src/cli.rs @@ -0,0 +1,66 @@ +//! CLI argument parsing and configuration. + +use clap::Parser; +use tracing_subscriber::{EnvFilter, fmt, prelude::*}; + +/// End-to-end regression testing for node-reth flashblocks RPC. +#[derive(Parser, Debug)] +#[command(name = "flashblocks-e2e")] +#[command(about = "End-to-end regression testing for node-reth flashblocks RPC")] +pub(crate) struct Args { + /// HTTP RPC endpoint URL for the node being tested. + #[arg(long, default_value = "http://localhost:8545")] + pub rpc_url: String, + + /// Flashblocks WebSocket URL (sequencer/builder endpoint). + #[arg(long, default_value = "wss://mainnet.flashblocks.base.org/ws")] + pub flashblocks_ws_url: String, + + /// Recipient address for ETH transfers in tests. + /// Required when PRIVATE_KEY is set. + #[arg(long)] + pub recipient: Option, + + /// Run only tests matching this filter (supports glob patterns). + #[arg(long, short)] + pub filter: Option, + + /// List available tests without running them. + #[arg(long)] + pub list: bool, + + /// Continue running tests even after failures. + #[arg(long, default_value = "false")] + pub keep_going: bool, + + /// Verbose output (can be repeated for more verbosity). + #[arg(long, short, action = clap::ArgAction::Count)] + pub verbose: u8, + + /// Output format: text, json. + #[arg(long, default_value = "text")] + pub format: OutputFormat, +} + +/// Output format for test results. +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +pub(crate) enum OutputFormat { + /// Human-readable text output with colors. + Text, + /// JSON output for CI integration. + Json, +} + +/// Initialize tracing with the specified verbosity level. +pub(crate) fn init_tracing(verbose: u8) { + let filter = match verbose { + 0 => "flashblocks_e2e=info,base_testing_e2e=info", + 1 => "flashblocks_e2e=debug,base_testing_e2e=debug", + _ => "flashblocks_e2e=trace,base_testing_e2e=trace", + }; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(filter))) + .init(); +} diff --git a/bin/flashblocks-e2e/src/main.rs b/bin/flashblocks-e2e/src/main.rs new file mode 100644 index 00000000..c176ca49 --- /dev/null +++ b/bin/flashblocks-e2e/src/main.rs @@ -0,0 +1,84 @@ +#![doc = include_str!("../README.md")] +#![doc(issue_tracker_base_url = "https://github.com/base/node-reth/issues/")] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] + +//! Flashblocks E2E - End-to-end regression testing for node-reth flashblocks RPC. + +mod cli; + +use alloy_primitives::Address; +use base_testing_e2e::{ + TestClient, build_test_suite, list_tests, print_results_json, print_results_text, run_tests, +}; +use clap::Parser; +use cli::{Args, OutputFormat}; +use eyre::{Result, WrapErr}; + +#[tokio::main] +async fn main() -> Result<()> { + // Load .env file if present (ignores errors if file doesn't exist) + let _ = dotenvy::dotenv(); + + let args = Args::parse(); + + // Initialize tracing + cli::init_tracing(args.verbose); + + // Get private key from environment (optional) + let private_key = std::env::var("PRIVATE_KEY").ok(); + + if private_key.is_some() { + tracing::info!("Private key loaded from environment"); + } else { + tracing::warn!("No PRIVATE_KEY set - tests requiring transaction signing will be skipped"); + } + + // Parse recipient address if provided + let recipient: Option
= args + .recipient + .as_ref() + .map(|s| s.parse()) + .transpose() + .wrap_err("Invalid recipient address")?; + + if recipient.is_none() { + tracing::warn!("No --recipient set - tests requiring ETH transfers will be skipped"); + } + + // Create test client (chain ID is fetched from RPC) + let client = + TestClient::new(&args.rpc_url, &args.flashblocks_ws_url, private_key.as_deref(), recipient) + .await?; + + if let Some(addr) = client.signer_address() { + tracing::info!(address = ?addr, "Signer configured"); + } + if let Some(addr) = client.recipient() { + tracing::info!(address = ?addr, "Recipient configured"); + } + + // Build test suite + let suite = build_test_suite(); + + if args.list { + list_tests(&suite); + return Ok(()); + } + + // Run tests + let results = run_tests(&client, &suite, args.filter.as_deref(), args.keep_going).await; + + // Output results + match args.format { + OutputFormat::Text => print_results_text(&results), + OutputFormat::Json => print_results_json(&results)?, + } + + // Exit with error code if any tests failed + if results.iter().any(|r| !r.passed) { + std::process::exit(1); + } + + Ok(()) +} diff --git a/testing/e2e/Cargo.toml b/testing/e2e/Cargo.toml new file mode 100644 index 00000000..f465aba7 --- /dev/null +++ b/testing/e2e/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "base-testing-e2e" +description = "End-to-end testing library for node-reth flashblocks RPC" + +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# Async runtime +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite.workspace = true +futures-util.workspace = true + +# RPC client +alloy-provider.workspace = true +alloy-rpc-client.workspace = true +alloy-rpc-types-eth.workspace = true + +# Ethereum types +alloy-primitives.workspace = true +alloy-consensus.workspace = true +alloy-eips.workspace = true +alloy-sol-types.workspace = true +alloy-sol-macro = { workspace = true, features = ["json"] } + +# Signing +alloy-signer.workspace = true +alloy-signer-local.workspace = true +alloy-network.workspace = true + +# OP Stack +op-alloy-network.workspace = true +op-alloy-rpc-types.workspace = true + +# Serialization +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true + +# Error handling +eyre.workspace = true + +# Logging +tracing.workspace = true + +# Compression +brotli.workspace = true + +# Misc +url.workspace = true +colored.workspace = true +hex.workspace = true diff --git a/testing/e2e/README.md b/testing/e2e/README.md new file mode 100644 index 00000000..20e049d8 --- /dev/null +++ b/testing/e2e/README.md @@ -0,0 +1,40 @@ +# base-e2e + +End-to-end testing library for node-reth flashblocks RPC. + +## Overview + +This library provides a reusable test framework for validating flashblocks RPC +implementations. It includes: + +- **TestClient**: RPC client for connecting to node-reth instances +- **FlashblockHarness**: Test helper for running tests within flashblock windows +- **FlashblocksStream**: Direct WebSocket streaming from flashblocks endpoints +- **Test Suite**: Comprehensive test cases for flashblocks functionality + +## Test Categories + +- **blocks**: Block retrieval and pending state visibility +- **call**: `eth_call` and `eth_estimateGas` tests +- **receipts**: Transaction receipt retrieval +- **logs**: `eth_getLogs` including pending logs +- **sanity**: Flashblocks WebSocket endpoint validation +- **metering**: `base_meterBundle` and `base_meteredPriorityFeePerGas` endpoints +- **contracts**: Contract deployment and interaction tests + +## Usage + +This library is used by the `flashblocks-e2e` binary. See `bin/flashblocks-e2e/` +for the CLI tool. + +```rust +use base_e2e::{TestClient, build_test_suite, run_tests}; + +let client = TestClient::new(rpc_url, flashblocks_ws_url, private_key).await?; +let suite = build_test_suite(); +let results = run_tests(&client, &suite, None, false).await; +``` + +## Environment Variables + +- `PRIVATE_KEY`: Hex-encoded private key for transaction-sending tests (optional) diff --git a/testing/e2e/src/client.rs b/testing/e2e/src/client.rs new file mode 100644 index 00000000..3f7a72be --- /dev/null +++ b/testing/e2e/src/client.rs @@ -0,0 +1,361 @@ +//! Test client for connecting to a live node-reth instance. + +use alloy_consensus::SignableTransaction; +use alloy_eips::{BlockNumberOrTag, eip2718::Encodable2718}; +use alloy_network::TransactionBuilder; +use alloy_primitives::{Address, B256, Bytes, FixedBytes, U256}; +use alloy_provider::{Provider, RootProvider}; +use alloy_rpc_client::RpcClient; +use alloy_rpc_types_eth::{Filter, Log}; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; +use eyre::{Result, WrapErr}; +use op_alloy_network::Optimism; +use op_alloy_rpc_types::{OpTransactionReceipt, OpTransactionRequest}; +use tokio_tungstenite::tungstenite::Message; +use url::Url; + +use crate::{ + harness::{WebSocketSubscription, decode_ws_message}, + types::{MeterBundleRequest, MeterBundleResponse, MeteredPriorityFeeResponse, OpBlock}, +}; + +/// Client for interacting with a live node-reth instance. +#[derive(Debug)] +pub struct TestClient { + /// HTTP provider for standard RPC calls. + provider: RootProvider, + /// Raw RPC client for custom methods. + rpc_client: RpcClient, + /// HTTP RPC URL for the node being tested. + pub rpc_url: String, + /// Flashblocks WebSocket URL (sequencer/builder endpoint). + pub flashblocks_ws_url: String, + /// Transaction signer (optional - only if private key provided). + signer: Option, + /// Recipient address for ETH transfers in tests. + recipient: Option
, + /// Chain ID for signing transactions. + chain_id: u64, +} + +impl TestClient { + /// Create a new test client connected to the given endpoints. + /// + /// # Arguments + /// * `rpc_url` - HTTP RPC endpoint URL + /// * `flashblocks_ws_url` - Flashblocks WebSocket URL + /// * `private_key` - Optional hex-encoded private key for signing transactions + /// * `recipient` - Optional recipient address for ETH transfers in tests + /// + /// # Errors + /// Returns an error if recipient equals the signer address. + pub async fn new( + rpc_url: &str, + flashblocks_ws_url: &str, + private_key: Option<&str>, + recipient: Option
, + ) -> Result { + let url: Url = rpc_url.parse().wrap_err("Invalid RPC URL")?; + + // Build provider using RpcClient directly (similar to node-reth's test harness) + let http_client = RpcClient::builder().http(url.clone()); + let provider = RootProvider::::new(http_client); + + let rpc_client = RpcClient::new_http(url); + + // Fetch chain ID from the node + let chain_id = provider.get_chain_id().await.wrap_err("Failed to get chain ID from RPC")?; + tracing::info!(chain_id, "Connected to chain"); + + // Parse private key if provided + let signer = if let Some(key) = private_key { Some(parse_private_key(key)?) } else { None }; + + // Validate recipient != sender + if let (Some(s), Some(r)) = (&signer, recipient) + && s.address() == r + { + return Err(eyre::eyre!( + "Recipient address cannot be the same as the signer address ({})", + r + )); + } + + Ok(Self { + provider, + rpc_client, + rpc_url: rpc_url.to_string(), + flashblocks_ws_url: flashblocks_ws_url.to_string(), + signer, + recipient, + chain_id, + }) + } + + /// Check if we have a signer configured. + pub const fn has_signer(&self) -> bool { + self.signer.is_some() + } + + /// Get the signer's address, if configured. + pub fn signer_address(&self) -> Option
{ + self.signer.as_ref().map(|s| s.address()) + } + + /// Get the recipient address for ETH transfers, if configured. + pub const fn recipient(&self) -> Option
{ + self.recipient + } + + /// Get the chain ID. + pub const fn chain_id(&self) -> u64 { + self.chain_id + } + + /// Get the underlying provider. + pub const fn provider(&self) -> &RootProvider { + &self.provider + } + + /// Get block by number. + pub async fn get_block_by_number(&self, block: BlockNumberOrTag) -> Result> { + self.provider.get_block_by_number(block).await.wrap_err("Failed to get block by number") + } + + /// Get balance at address. + pub async fn get_balance(&self, address: Address, block: BlockNumberOrTag) -> Result { + self.provider + .get_balance(address) + .block_id(block.into()) + .await + .wrap_err("Failed to get balance") + } + + /// Get transaction count (nonce) for address. + pub async fn get_transaction_count( + &self, + address: Address, + block: BlockNumberOrTag, + ) -> Result { + self.provider + .get_transaction_count(address) + .block_id(block.into()) + .await + .wrap_err("Failed to get transaction count") + } + + /// Execute eth_call. + pub async fn eth_call( + &self, + tx: &OpTransactionRequest, + block: BlockNumberOrTag, + ) -> Result { + self.provider.call(tx.clone()).block(block.into()).await.wrap_err("eth_call failed") + } + + /// Estimate gas for transaction. + pub async fn estimate_gas( + &self, + tx: &OpTransactionRequest, + block: BlockNumberOrTag, + ) -> Result { + self.provider + .estimate_gas(tx.clone()) + .block(block.into()) + .await + .wrap_err("Failed to estimate gas") + } + + /// Get transaction by hash. + pub async fn get_transaction_by_hash( + &self, + hash: B256, + ) -> Result> { + self.provider + .get_transaction_by_hash(hash) + .await + .wrap_err("Failed to get transaction by hash") + } + + /// Get transaction receipt. + pub async fn get_transaction_receipt( + &self, + hash: B256, + ) -> Result> { + self.provider + .get_transaction_receipt(hash) + .await + .wrap_err("Failed to get transaction receipt") + } + + /// Get logs matching filter. + pub async fn get_logs(&self, filter: &Filter) -> Result> { + self.provider.get_logs(filter).await.wrap_err("Failed to get logs") + } + + /// Sign a transaction request and return the raw transaction bytes and hash. + pub fn sign_transaction(&self, tx_request: OpTransactionRequest) -> Result<(Bytes, B256)> { + let signer = self + .signer + .as_ref() + .ok_or_else(|| eyre::eyre!("No signer configured - set PRIVATE_KEY"))?; + + // Build the typed transaction + let tx = tx_request + .build_typed_tx() + .map_err(|e| eyre::eyre!("Failed to build typed tx: {:?}", e))?; + + // Sign it + let signature = signer.sign_hash_sync(&tx.signature_hash())?; + let signed_tx = tx.into_signed(signature); + + // Encode and return + let tx_bytes = Bytes::from(signed_tx.encoded_2718()); + let tx_hash = *signed_tx.hash(); + + Ok((tx_bytes, tx_hash)) + } + + /// Create and sign a simple ETH transfer transaction. + pub async fn create_transfer( + &self, + to: Address, + value: U256, + nonce: Option, + ) -> Result<(Bytes, B256)> { + let signer = self + .signer + .as_ref() + .ok_or_else(|| eyre::eyre!("No signer configured - set PRIVATE_KEY"))?; + + let from = signer.address(); + + // Get nonce if not provided + let nonce = match nonce { + Some(n) => n, + None => self.get_transaction_count(from, BlockNumberOrTag::Pending).await?, + }; + + let mut tx_request = OpTransactionRequest::default() + .from(from) + .to(to) + .value(value) + .nonce(nonce) + .gas_limit(21_000) + .max_fee_per_gas(1_000_000_000) // 1 gwei + .max_priority_fee_per_gas(1_000_000); + tx_request.set_chain_id(self.chain_id); + + self.sign_transaction(tx_request) + } + + /// Send a signed raw transaction. + pub async fn send_raw_transaction(&self, tx: Bytes) -> Result { + self.provider + .send_raw_transaction(&tx) + .await + .wrap_err("Failed to send raw transaction") + .map(|pending| *pending.tx_hash()) + } + + /// Send raw transaction and wait for receipt (sync mode). + pub async fn send_raw_transaction_sync( + &self, + tx: Bytes, + timeout_ms: Option, + ) -> Result { + self.rpc_client + .request::<_, OpTransactionReceipt>("eth_sendRawTransactionSync", (tx, timeout_ms)) + .await + .wrap_err("eth_sendRawTransactionSync failed") + } + + /// Send ETH and wait for confirmation. + pub async fn send_eth_and_wait( + &self, + to: Address, + value: U256, + ) -> Result { + let (tx_bytes, tx_hash) = self.create_transfer(to, value, None).await?; + tracing::debug!(?tx_hash, "Sending transaction"); + + // Use sync mode to wait for receipt + self.send_raw_transaction_sync(tx_bytes, Some(6_000)).await + } + + /// Call `base_meterBundle` RPC method. + pub async fn meter_bundle(&self, bundle: MeterBundleRequest) -> Result { + self.rpc_client + .request::<_, MeterBundleResponse>("base_meterBundle", (bundle,)) + .await + .wrap_err("base_meterBundle failed") + } + + /// Call `base_meteredPriorityFeePerGas` RPC method. + pub async fn metered_priority_fee( + &self, + bundle: MeterBundleRequest, + ) -> Result { + self.rpc_client + .request::<_, MeteredPriorityFeeResponse>("base_meteredPriorityFeePerGas", (bundle,)) + .await + .wrap_err("base_meteredPriorityFeePerGas failed") + } + + /// Connect to WebSocket and subscribe to a topic. + pub async fn ws_subscribe( + &self, + method: &str, + params: serde_json::Value, + ) -> Result { + use futures_util::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + + let (mut ws_stream, _) = connect_async(&self.flashblocks_ws_url) + .await + .wrap_err("Failed to connect to WebSocket")?; + + let subscribe_msg = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribe", + "params": [method, params] + }); + + ws_stream + .send(Message::Text(subscribe_msg.to_string().into())) + .await + .wrap_err("Failed to send subscribe message")?; + + // Wait for subscription confirmation + let response = ws_stream + .next() + .await + .ok_or_else(|| eyre::eyre!("WebSocket closed unexpectedly"))? + .wrap_err("Failed to receive subscription response")?; + + let json_str = decode_ws_message(&response)?; + let sub_response: serde_json::Value = + serde_json::from_str(&json_str).wrap_err("Failed to parse subscription response")?; + + let subscription_id = sub_response["result"] + .as_str() + .ok_or_else(|| eyre::eyre!("No subscription ID in response"))? + .to_string(); + + Ok(WebSocketSubscription { stream: ws_stream, subscription_id }) + } +} + +/// Parse a private key from hex string (with or without 0x prefix). +fn parse_private_key(key: &str) -> Result { + let key = key.strip_prefix("0x").unwrap_or(key); + let key_bytes = hex::decode(key).wrap_err("Invalid hex in private key")?; + + if key_bytes.len() != 32 { + return Err(eyre::eyre!("Private key must be 32 bytes, got {}", key_bytes.len())); + } + + let key_fixed: FixedBytes<32> = FixedBytes::from_slice(&key_bytes); + PrivateKeySigner::from_bytes(&key_fixed).wrap_err("Invalid private key") +} diff --git a/testing/e2e/src/harness.rs b/testing/e2e/src/harness.rs new file mode 100644 index 00000000..6816cf19 --- /dev/null +++ b/testing/e2e/src/harness.rs @@ -0,0 +1,372 @@ +//! Flashblocks streaming and test harness. + +use std::{collections::HashSet, io::Read as _, time::Duration}; + +use alloy_primitives::B256; +use eyre::{Result, WrapErr}; +use tokio_tungstenite::tungstenite::Message; + +use crate::{TestClient, types::Flashblock}; + +/// Decode a WebSocket message, handling both text and brotli-compressed binary. +/// +/// The flashblocks endpoint sends brotli-compressed binary messages for efficiency. +pub fn decode_ws_message(msg: &Message) -> Result { + match msg { + Message::Text(text) => Ok(text.to_string()), + Message::Binary(data) => { + // Try brotli decompression + let mut decompressor = brotli::Decompressor::new(&data[..], 4096); + let mut decompressed = Vec::new(); + decompressor + .read_to_end(&mut decompressed) + .wrap_err("Failed to decompress brotli data")?; + + String::from_utf8(decompressed).wrap_err("Decompressed data is not valid UTF-8") + } + Message::Ping(_) | Message::Pong(_) => Err(eyre::eyre!("Unexpected ping/pong message")), + Message::Close(_) => Err(eyre::eyre!("WebSocket closed")), + Message::Frame(_) => Err(eyre::eyre!("Unexpected raw frame")), + } +} + +/// Active WebSocket subscription. +pub struct WebSocketSubscription { + /// The underlying WebSocket stream. + pub stream: tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + /// The subscription ID. + pub subscription_id: String, +} + +impl std::fmt::Debug for WebSocketSubscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebSocketSubscription") + .field("subscription_id", &self.subscription_id) + .finish_non_exhaustive() + } +} + +impl WebSocketSubscription { + /// Receive the next notification. + /// + /// Handles both text messages and brotli-compressed binary messages + /// (the flashblocks endpoint uses brotli compression). + pub async fn next_notification(&mut self) -> Result { + use futures_util::StreamExt; + + let msg = self + .stream + .next() + .await + .ok_or_else(|| eyre::eyre!("WebSocket closed"))? + .wrap_err("Failed to receive message")?; + + let json_str = decode_ws_message(&msg)?; + + let notification: serde_json::Value = + serde_json::from_str(&json_str).wrap_err("Failed to parse notification")?; + + Ok(notification) + } + + /// Unsubscribe and close the connection. + pub async fn unsubscribe(mut self) -> Result<()> { + use futures_util::SinkExt; + + let unsubscribe_msg = serde_json::json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "eth_unsubscribe", + "params": [self.subscription_id] + }); + + self.stream + .send(Message::Text(unsubscribe_msg.to_string().into())) + .await + .wrap_err("Failed to send unsubscribe")?; + + Ok(()) + } +} + +/// Direct streaming connection to the flashblocks WebSocket endpoint. +/// +/// Unlike `WebSocketSubscription`, this doesn't use eth_subscribe - it just +/// connects and immediately starts receiving flashblock messages. +pub struct FlashblocksStream { + stream: tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, +} + +impl std::fmt::Debug for FlashblocksStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlashblocksStream").finish_non_exhaustive() + } +} + +impl FlashblocksStream { + /// Connect to the flashblocks WebSocket endpoint. + pub async fn connect(url: &str) -> Result { + use tokio_tungstenite::connect_async; + + let (ws_stream, _) = + connect_async(url).await.wrap_err("Failed to connect to flashblocks WebSocket")?; + + Ok(Self { stream: ws_stream }) + } + + /// Receive the next flashblock message. + pub async fn next_flashblock(&mut self) -> Result { + use futures_util::StreamExt; + + loop { + let msg = self + .stream + .next() + .await + .ok_or_else(|| eyre::eyre!("Flashblocks WebSocket closed"))? + .wrap_err("Failed to receive flashblock message")?; + + // Handle ping/pong internally + if msg.is_ping() || msg.is_pong() { + continue; + } + + let json_str = decode_ws_message(&msg)?; + + let flashblock: Flashblock = + serde_json::from_str(&json_str).wrap_err("Failed to parse flashblock message")?; + + return Ok(flashblock); + } + } + + /// Close the connection. + pub async fn close(self) -> Result<()> { + // Just drop the stream - it will close gracefully + drop(self.stream); + Ok(()) + } +} + +/// Harness for running tests within a flashblock window. +/// +/// This ensures tests can: +/// 1. Wait for flashblock 0/1 of a new block (fresh start) +/// 2. Query pre-state +/// 3. Send transactions +/// 4. See them appear in flashblocks (same block) +/// 5. Query post-state +/// 6. Confirm no flashblocks for the next block were received +pub struct FlashblockHarness { + stream: FlashblocksStream, + /// The block number we're operating in. + block_number: u64, + /// Count of flashblocks received for current block. + flashblock_count: u64, + /// Transaction hashes seen in the current block (from all flashblocks). + seen_tx_hashes: HashSet, + /// The most recent flashblock received. + current_flashblock: Option, +} + +impl std::fmt::Debug for FlashblockHarness { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlashblockHarness") + .field("block_number", &self.block_number) + .field("flashblock_count", &self.flashblock_count) + .field("seen_tx_hashes", &self.seen_tx_hashes.len()) + .finish_non_exhaustive() + } +} + +impl FlashblockHarness { + /// Create a new harness and wait for the start of a fresh block. + /// + /// Connects to the flashblocks WebSocket and waits until we see flashblock + /// index 0 or 1 of a new block, ensuring we have a full block window to work with. + /// This is critical because tests need to send transactions and see them in + /// pending state BEFORE the block is committed. + pub async fn new(client: &TestClient) -> Result { + let stream = FlashblocksStream::connect(&client.flashblocks_ws_url).await?; + + tracing::debug!("Connected to flashblocks stream"); + + let mut harness = Self { + stream, + block_number: 0, + flashblock_count: 0, + seen_tx_hashes: HashSet::new(), + current_flashblock: None, + }; + + // Wait for the start of a fresh block (index 0 or 1) + // This ensures we have most of the block window available for our test + loop { + let flashblock = harness.wait_for_next_flashblock().await?; + let fb_index = flashblock.index; + let fb_block = flashblock.metadata.block_number; + + if fb_index <= 1 { + tracing::info!( + block_number = fb_block, + flashblock_index = fb_index, + "Harness ready at start of block" + ); + break; + } + tracing::debug!( + block_number = fb_block, + flashblock_index = fb_index, + "Waiting for fresh block start (index 0 or 1)..." + ); + } + + Ok(harness) + } + + /// Wait for the next flashblock. + /// Updates internal state and returns the new flashblock. + pub async fn wait_for_next_flashblock(&mut self) -> Result<&Flashblock> { + let flashblock = self.stream.next_flashblock().await?; + + let new_block_number = flashblock.metadata.block_number; + + if new_block_number != self.block_number { + // New block started - reset tracking + self.block_number = new_block_number; + self.flashblock_count = 1; + self.seen_tx_hashes.clear(); + tracing::debug!(block_number = new_block_number, "New block started"); + } else { + self.flashblock_count += 1; + } + + // Track all transaction hashes from this flashblock's receipts + for tx_hash in flashblock.metadata.receipts.keys() { + self.seen_tx_hashes.insert(tx_hash.clone()); + } + + tracing::trace!( + block_number = self.block_number, + flashblock_index = flashblock.index, + flashblock_count = self.flashblock_count, + tx_count = flashblock.diff.transactions.len(), + "Received flashblock" + ); + + self.current_flashblock = Some(flashblock); + Ok(self.current_flashblock.as_ref().unwrap()) + } + + /// Wait for a transaction to appear in a flashblock within the current block. + /// + /// This waits for the transaction to appear in a flashblock. If a new block + /// starts before the transaction is seen, this fails - the test must complete + /// within a single block window to properly test pending state visibility. + pub async fn wait_for_tx(&mut self, tx_hash: B256, timeout: Duration) -> Result<()> { + let deadline = tokio::time::Instant::now() + timeout; + let starting_block = self.block_number; + let tx_hash_str = format!("{:#x}", tx_hash); + + // Check if we already have the tx + if self.seen_tx_hashes.contains(&tx_hash_str) { + tracing::debug!( + ?tx_hash, + block = self.block_number, + "Transaction already seen in flashblock" + ); + return Ok(()); + } + + loop { + // Wait for next flashblock with timeout + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Err(eyre::eyre!( + "Timeout waiting for tx {:?} in flashblock (block {})", + tx_hash, + starting_block + )); + } + + match tokio::time::timeout(remaining, self.stream.next_flashblock()).await { + Ok(Ok(flashblock)) => { + let new_block_number = flashblock.metadata.block_number; + + if new_block_number != self.block_number { + // Block boundary crossed - fail the test + return Err(eyre::eyre!( + "Block {} ended before tx {:?} appeared in pending state. \ + New block {} started. This can happen if the RPC node forwards \ + transactions to a remote sequencer with latency.", + starting_block, + tx_hash, + new_block_number + )); + } + + self.flashblock_count += 1; + + // Track all transaction hashes from this flashblock + for hash in flashblock.metadata.receipts.keys() { + self.seen_tx_hashes.insert(hash.clone()); + } + + if self.seen_tx_hashes.contains(&tx_hash_str) { + tracing::debug!( + ?tx_hash, + block = self.block_number, + "Transaction found in flashblock" + ); + self.current_flashblock = Some(flashblock); + return Ok(()); + } + + self.current_flashblock = Some(flashblock); + } + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(eyre::eyre!( + "Timeout waiting for tx {:?} in flashblock (block {})", + tx_hash, + starting_block + )); + } + } + } + } + + /// Assert that we're still in the same block (no next block flashblocks received). + /// + /// This is called after test operations to confirm everything happened + /// within a single block's flashblock window. + pub fn assert_same_block(&self, expected_block: u64) -> Result<()> { + if self.block_number != expected_block { + return Err(eyre::eyre!( + "Block changed during test: expected {}, now at {}", + expected_block, + self.block_number + )); + } + Ok(()) + } + + /// Get the current block number we're operating in. + pub const fn block_number(&self) -> u64 { + self.block_number + } + + /// Get the count of flashblocks received for the current block. + pub const fn flashblock_count(&self) -> u64 { + self.flashblock_count + } + + /// Close the stream. + pub async fn close(self) -> Result<()> { + self.stream.close().await + } +} diff --git a/testing/e2e/src/lib.rs b/testing/e2e/src/lib.rs new file mode 100644 index 00000000..570b1d6c --- /dev/null +++ b/testing/e2e/src/lib.rs @@ -0,0 +1,26 @@ +#![doc = include_str!("../README.md")] +#![doc(issue_tracker_base_url = "https://github.com/base/node-reth/issues/")] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] + +//! End-to-end testing library for node-reth flashblocks RPC. + +mod client; +pub use client::TestClient; + +pub mod harness; +pub use harness::{FlashblockHarness, FlashblocksStream, WebSocketSubscription}; + +mod runner; +pub use runner::{ + TestResult, TestSummary, list_tests, print_results_json, print_results_text, run_tests, +}; + +pub mod types; +pub use types::{ + Flashblock, FlashblockBase, FlashblockDiff, FlashblockMetadata, MeterBundleRequest, + MeterBundleResponse, MeterTxResult, MeteredPriorityFeeResponse, OpBlock, ResourceEstimate, +}; + +pub mod tests; +pub use tests::{SkipFn, Test, TestCategory, TestFn, TestSuite, build_test_suite}; diff --git a/testing/e2e/src/runner.rs b/testing/e2e/src/runner.rs new file mode 100644 index 00000000..8b24f148 --- /dev/null +++ b/testing/e2e/src/runner.rs @@ -0,0 +1,311 @@ +//! Test runner and result reporting. + +use std::time::{Duration, Instant}; + +use alloy_eips::BlockNumberOrTag; +use colored::Colorize; +use eyre::Result; +use serde::Serialize; + +use crate::{ + TestClient, + tests::{Test, TestSuite}, +}; + +/// Result of running a single test. +#[derive(Debug, Clone, Serialize)] +pub struct TestResult { + /// Name of the test. + pub name: String, + /// Category the test belongs to. + pub category: String, + /// Whether the test passed. + pub passed: bool, + /// Duration in milliseconds. + pub duration_ms: u64, + /// Error message if the test failed. + pub error: Option, + /// Whether the test was skipped. + pub skipped: bool, + /// Reason for skipping if applicable. + pub skip_reason: Option, +} + +/// Summary of test run. +#[derive(Debug, Serialize)] +pub struct TestSummary { + /// Total number of tests. + pub total: usize, + /// Number of passed tests. + pub passed: usize, + /// Number of failed tests. + pub failed: usize, + /// Number of skipped tests. + pub skipped: usize, + /// Total duration in milliseconds. + pub duration_ms: u64, +} + +/// List all tests in the suite. +pub fn list_tests(suite: &TestSuite) { + println!("{}", "Available tests:".bold()); + println!(); + + for category in &suite.categories { + println!(" {} {}", "Category:".cyan(), category.name.bold()); + if let Some(desc) = &category.description { + println!(" {}", desc.dimmed()); + } + for test in &category.tests { + println!(" - {}", test.name); + if let Some(desc) = &test.description { + println!(" {}", desc.dimmed()); + } + } + println!(); + } +} + +/// Run tests matching the optional filter. +pub async fn run_tests( + client: &TestClient, + suite: &TestSuite, + filter: Option<&str>, + keep_going: bool, +) -> Vec { + let mut results = Vec::new(); + let start = Instant::now(); + + // Check connection first + println!("{}", "Connecting to node...".dimmed()); + match client.get_block_by_number(BlockNumberOrTag::Latest).await { + Ok(Some(block)) => { + println!("{} Connected to node at block #{}", "OK".green(), block.header.number); + } + Ok(None) => { + println!("{} Connected but no blocks found", "WARN".yellow()); + } + Err(e) => { + println!("{} Failed to connect: {}", "ERROR".red(), e); + return vec![TestResult { + name: "connection".to_string(), + category: "setup".to_string(), + passed: false, + duration_ms: 0, + error: Some(e.to_string()), + skipped: false, + skip_reason: None, + }]; + } + } + + println!(); + println!("{}", "Running tests...".bold()); + println!(); + + for category in &suite.categories { + let category_tests: Vec<&Test> = + category.tests.iter().filter(|t| matches_filter(&t.name, filter)).collect(); + + if category_tests.is_empty() { + continue; + } + + println!(" {} {}", "Category:".cyan(), category.name.bold()); + + for test in category_tests { + let result = run_single_test(client, &category.name, test).await; + + // Print result + let status = if result.skipped { + "SKIP".yellow() + } else if result.passed { + "PASS".green() + } else { + "FAIL".red() + }; + + println!(" {} {} ({}ms)", status, test.name, result.duration_ms); + + if let Some(ref err) = result.error { + println!(" {}", err.red()); + } + + if let Some(ref reason) = result.skip_reason { + println!(" {}", reason.dimmed()); + } + + let failed = !result.passed && !result.skipped; + results.push(result); + + // Stop early if not keep_going and test failed + if failed && !keep_going { + println!(); + println!( + "{}", + "Stopping due to test failure (use --keep-going to continue)".yellow() + ); + return results; + } + } + + println!(); + } + + let elapsed = start.elapsed(); + println!("Completed {} tests in {:.2}s", results.len(), elapsed.as_secs_f64()); + + results +} + +/// Run a single test. +async fn run_single_test(client: &TestClient, category: &str, test: &Test) -> TestResult { + let start = Instant::now(); + + // Check skip condition + if let Some(ref skip_fn) = test.skip_if + && let Some(reason) = skip_fn(client).await + { + return TestResult { + name: test.name.clone(), + category: category.to_string(), + passed: true, + duration_ms: start.elapsed().as_millis() as u64, + error: None, + skipped: true, + skip_reason: Some(reason), + }; + } + + // Run the test + let result = tokio::time::timeout(Duration::from_secs(30), (test.run)(client)).await; + + let duration_ms = start.elapsed().as_millis() as u64; + + match result { + Ok(Ok(())) => TestResult { + name: test.name.clone(), + category: category.to_string(), + passed: true, + duration_ms, + error: None, + skipped: false, + skip_reason: None, + }, + Ok(Err(e)) => TestResult { + name: test.name.clone(), + category: category.to_string(), + passed: false, + duration_ms, + error: Some(format!("{:#}", e)), + skipped: false, + skip_reason: None, + }, + Err(_) => TestResult { + name: test.name.clone(), + category: category.to_string(), + passed: false, + duration_ms, + error: Some("Test timed out after 30s".to_string()), + skipped: false, + skip_reason: None, + }, + } +} + +/// Check if test name matches filter. +fn matches_filter(name: &str, filter: Option<&str>) -> bool { + match filter { + None => true, + Some(f) => { + // Simple glob matching + if f.contains('*') { + let parts: Vec<&str> = f.split('*').collect(); + let mut remaining = name; + for (i, part) in parts.iter().enumerate() { + if part.is_empty() { + continue; + } + if i == 0 { + // Must start with this part + if !remaining.starts_with(part) { + return false; + } + remaining = &remaining[part.len()..]; + } else if i == parts.len() - 1 { + // Must end with this part + if !remaining.ends_with(part) { + return false; + } + } else { + // Must contain this part + if let Some(pos) = remaining.find(part) { + remaining = &remaining[pos + part.len()..]; + } else { + return false; + } + } + } + true + } else { + name.contains(f) + } + } + } +} + +/// Print results in text format. +pub fn print_results_text(results: &[TestResult]) { + let summary = compute_summary(results); + + println!(); + println!("{}", "=".repeat(60)); + println!("{}", "Test Summary".bold()); + println!("{}", "=".repeat(60)); + println!(); + + println!(" Total: {}", summary.total); + println!(" Passed: {}", format!("{}", summary.passed).green()); + println!(" Failed: {}", format!("{}", summary.failed).red()); + println!(" Skipped: {}", format!("{}", summary.skipped).yellow()); + println!(" Duration: {:.2}s", summary.duration_ms as f64 / 1000.0); + println!(); + + if summary.failed > 0 { + println!("{}", "Failed tests:".red().bold()); + for result in results.iter().filter(|r| !r.passed && !r.skipped) { + println!(" - {} ({})", result.name, result.category); + if let Some(ref err) = result.error { + println!(" {}", err.dimmed()); + } + } + println!(); + } + + if summary.failed == 0 { + println!("{}", "All tests passed!".green().bold()); + } else { + println!("{}", format!("{} test(s) failed", summary.failed).red().bold()); + } +} + +/// Print results in JSON format. +pub fn print_results_json(results: &[TestResult]) -> Result<()> { + let output = serde_json::json!({ + "results": results, + "summary": compute_summary(results), + }); + + println!("{}", serde_json::to_string_pretty(&output)?); + Ok(()) +} + +fn compute_summary(results: &[TestResult]) -> TestSummary { + let total = results.len(); + let passed = results.iter().filter(|r| r.passed && !r.skipped).count(); + let skipped = results.iter().filter(|r| r.skipped).count(); + let failed = results.iter().filter(|r| !r.passed && !r.skipped).count(); + let duration_ms: u64 = results.iter().map(|r| r.duration_ms).sum(); + + TestSummary { total, passed, failed, skipped, duration_ms } +} diff --git a/testing/e2e/src/tests/blocks.rs b/testing/e2e/src/tests/blocks.rs new file mode 100644 index 00000000..20da08f5 --- /dev/null +++ b/testing/e2e/src/tests/blocks.rs @@ -0,0 +1,227 @@ +//! Tests for block retrieval and state visibility. + +use std::time::Duration; + +use alloy_eips::BlockNumberOrTag; +use alloy_primitives::U256; +use eyre::{Result, ensure}; + +use crate::{ + TestClient, + harness::FlashblockHarness, + tests::{Test, TestCategory}, +}; + +/// Build the blocks test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "blocks".to_string(), + description: Some("Block retrieval and pending state tests".to_string()), + tests: vec![ + Test { + name: "get_latest_block".to_string(), + description: Some("Verify we can retrieve the latest block".to_string()), + run: Box::new(|client| Box::pin(test_get_latest_block(client))), + skip_if: None, + }, + Test { + name: "get_pending_block".to_string(), + description: Some("Verify we can retrieve the pending block".to_string()), + run: Box::new(|client| Box::pin(test_get_pending_block(client))), + skip_if: None, + }, + Test { + name: "pending_block_number_gt_latest".to_string(), + description: Some("Pending block number should be >= latest".to_string()), + run: Box::new(|client| Box::pin(test_pending_block_number(client))), + skip_if: None, + }, + Test { + name: "flashblock_balance_change".to_string(), + description: Some( + "Send tx and verify balance change visible in pending state within same block" + .to_string(), + ), + run: Box::new(|client| Box::pin(test_flashblock_balance_change(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_signer_or_recipient(client) }) + })), + }, + Test { + name: "flashblock_nonce_change".to_string(), + description: Some( + "Send tx and verify nonce change visible in pending state within same block" + .to_string(), + ), + run: Box::new(|client| Box::pin(test_flashblock_nonce_change(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_signer_or_recipient(client) }) + })), + }, + ], + } +} + +/// Returns a skip reason if signer or recipient is not configured. +fn skip_if_no_signer_or_recipient(client: &TestClient) -> Option { + if !client.has_signer() { + Some("No PRIVATE_KEY configured".to_string()) + } else if client.recipient().is_none() { + Some("No --recipient configured".to_string()) + } else { + None + } +} + +async fn test_get_latest_block(client: &TestClient) -> Result<()> { + let block = client.get_block_by_number(BlockNumberOrTag::Latest).await?; + ensure!(block.is_some(), "Latest block should exist"); + + let block = block.unwrap(); + tracing::debug!(number = block.header.number, "Got latest block"); + + Ok(()) +} + +async fn test_get_pending_block(client: &TestClient) -> Result<()> { + let block = client.get_block_by_number(BlockNumberOrTag::Pending).await?; + ensure!(block.is_some(), "Pending block should exist"); + + let block = block.unwrap(); + tracing::debug!(number = block.header.number, "Got pending block"); + + Ok(()) +} + +async fn test_pending_block_number(client: &TestClient) -> Result<()> { + let latest = client + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_else(|| eyre::eyre!("No latest block"))?; + + let pending = client + .get_block_by_number(BlockNumberOrTag::Pending) + .await? + .ok_or_else(|| eyre::eyre!("No pending block"))?; + + ensure!( + pending.header.number >= latest.header.number, + "Pending block number ({}) should be >= latest ({})", + pending.header.number, + latest.header.number + ); + + Ok(()) +} + +/// Test that balance changes are visible in pending state via flashblocks. +/// +/// This test: +/// 1. Waits for the start of a fresh block (flashblock index 0 or 1) +/// 2. Queries pre-state balance +/// 3. Sends a transaction +/// 4. Waits for it to appear in a flashblock (must be same block) +/// 5. Queries post-state balance +/// 6. Verifies the pending state shows the balance change +async fn test_flashblock_balance_change(client: &TestClient) -> Result<()> { + let from = client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + // Start the flashblock harness (waits for start of a fresh block) + let mut harness = FlashblockHarness::new(client).await?; + let block_number = harness.block_number(); + + // Query pre-state + let balance_before = client.get_balance(from, BlockNumberOrTag::Pending).await?; + tracing::debug!(?balance_before, block = block_number, "Balance before tx"); + + // Send transaction + let value = U256::from(1u64); // 1 wei - minimum to detect state change + let recipient = client.recipient().ok_or_else(|| eyre::eyre!("No recipient configured"))?; + let (tx_bytes, tx_hash) = client.create_transfer(recipient, value, None).await?; + + tracing::info!(?tx_hash, "Sending transaction"); + client.send_raw_transaction(tx_bytes).await?; + + // Wait for tx to appear in flashblock (fails if block boundary crossed) + harness.wait_for_tx(tx_hash, Duration::from_secs(10)).await?; + + // Query post-state - this tests that pending state reflects the flashblock + let balance_after = client.get_balance(from, BlockNumberOrTag::Pending).await?; + tracing::debug!(?balance_after, "Balance after flashblock tx"); + + // Verify balance decreased (by value + gas) + ensure!( + balance_after < balance_before, + "Pending balance should decrease after flashblock tx: before={}, after={}", + balance_before, + balance_after + ); + + // Verify we're still in the same block (pending state test is valid) + harness.assert_same_block(block_number)?; + + tracing::info!( + block = block_number, + flashblocks = harness.flashblock_count(), + "Balance change verified in pending state within flashblock window" + ); + + harness.close().await?; + Ok(()) +} + +/// Test that nonce changes are visible in pending state via flashblocks. +/// +/// This test: +/// 1. Waits for the start of a fresh block (flashblock index 0 or 1) +/// 2. Queries pre-state nonce +/// 3. Sends a transaction +/// 4. Waits for it to appear in a flashblock (must be same block) +/// 5. Queries post-state nonce +/// 6. Verifies the pending state shows the nonce change +async fn test_flashblock_nonce_change(client: &TestClient) -> Result<()> { + let from = client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + // Start the flashblock harness (waits for start of a fresh block) + let mut harness = FlashblockHarness::new(client).await?; + let block_number = harness.block_number(); + + // Query pre-state + let nonce_before = client.get_transaction_count(from, BlockNumberOrTag::Pending).await?; + tracing::debug!(nonce_before, block = block_number, "Nonce before tx"); + + // Send transaction + let value = U256::from(1u64); // 1 wei - minimum to detect state change + let recipient = client.recipient().ok_or_else(|| eyre::eyre!("No recipient configured"))?; + let (tx_bytes, tx_hash) = client.create_transfer(recipient, value, Some(nonce_before)).await?; + + tracing::info!(?tx_hash, "Sending transaction"); + client.send_raw_transaction(tx_bytes).await?; + + // Wait for tx to appear in flashblock (fails if block boundary crossed) + harness.wait_for_tx(tx_hash, Duration::from_secs(10)).await?; + + // Query post-state - this tests that pending state reflects the flashblock + let nonce_after = client.get_transaction_count(from, BlockNumberOrTag::Pending).await?; + tracing::debug!(nonce_after, "Nonce after flashblock tx"); + + // Verify nonce incremented + ensure!( + nonce_after == nonce_before + 1, + "Pending nonce should increment after flashblock tx: before={}, after={}", + nonce_before, + nonce_after + ); + + // Verify we're still in the same block (pending state test is valid) + harness.assert_same_block(block_number)?; + + tracing::info!( + block = block_number, + flashblocks = harness.flashblock_count(), + "Nonce change verified in pending state within flashblock window" + ); + + harness.close().await?; + Ok(()) +} diff --git a/testing/e2e/src/tests/call.rs b/testing/e2e/src/tests/call.rs new file mode 100644 index 00000000..6051ad59 --- /dev/null +++ b/testing/e2e/src/tests/call.rs @@ -0,0 +1,212 @@ +//! Tests for eth_call and eth_estimateGas. + +use std::time::Duration; + +use alloy_eips::BlockNumberOrTag; +use alloy_primitives::{Address, U256}; +use eyre::{Result, ensure}; +use op_alloy_rpc_types::OpTransactionRequest; + +use crate::{ + TestClient, + harness::FlashblockHarness, + tests::{Test, TestCategory}, +}; + +/// Build the call test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "call".to_string(), + description: Some("eth_call and eth_estimateGas tests".to_string()), + tests: vec![ + Test { + name: "eth_call_latest".to_string(), + description: Some("eth_call against latest block".to_string()), + run: Box::new(|client| Box::pin(test_eth_call_latest(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_addresses(client) }) + })), + }, + Test { + name: "eth_call_pending".to_string(), + description: Some("eth_call against pending block".to_string()), + run: Box::new(|client| Box::pin(test_eth_call_pending(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_addresses(client) }) + })), + }, + Test { + name: "eth_estimate_gas_latest".to_string(), + description: Some("eth_estimateGas against latest block".to_string()), + run: Box::new(|client| Box::pin(test_estimate_gas_latest(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_addresses(client) }) + })), + }, + Test { + name: "eth_estimate_gas_pending".to_string(), + description: Some("eth_estimateGas against pending block".to_string()), + run: Box::new(|client| Box::pin(test_estimate_gas_pending(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_addresses(client) }) + })), + }, + Test { + name: "flashblock_eth_call_sees_state".to_string(), + description: Some( + "eth_call against pending sees state changes from flashblock tx".to_string(), + ), + run: Box::new(|client| Box::pin(test_flashblock_eth_call_sees_state(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_signer_or_recipient(client) }) + })), + }, + ], + } +} + +/// Returns a skip reason if neither signer nor recipient is configured. +/// These tests need at least one address to use as from/to. +fn skip_if_no_addresses(client: &TestClient) -> Option { + if client.signer_address().is_none() && client.recipient().is_none() { + Some("No PRIVATE_KEY or --recipient configured".to_string()) + } else { + None + } +} + +/// Returns a skip reason if signer or recipient is not configured. +fn skip_if_no_signer_or_recipient(client: &TestClient) -> Option { + if !client.has_signer() { + Some("No PRIVATE_KEY configured".to_string()) + } else if client.recipient().is_none() { + Some("No --recipient configured".to_string()) + } else { + None + } +} + +/// Get a pair of addresses for from/to in tests. +/// Uses signer as from (if available) and recipient as to (if available). +fn get_test_addresses(client: &TestClient) -> (Address, Address) { + let from = client + .signer_address() + .or(client.recipient()) + .expect("at least one address should be configured"); + let to = client + .recipient() + .or(client.signer_address()) + .expect("at least one address should be configured"); + (from, to) +} + +async fn test_eth_call_latest(client: &TestClient) -> Result<()> { + let (from, to) = get_test_addresses(client); + + // Simple call that should succeed - just calling with no data + let tx = OpTransactionRequest::default().from(from).to(to).value(U256::ZERO); + + let result = client.eth_call(&tx, BlockNumberOrTag::Latest).await?; + tracing::debug!(?result, "eth_call result at latest"); + + Ok(()) +} + +async fn test_eth_call_pending(client: &TestClient) -> Result<()> { + let (from, to) = get_test_addresses(client); + + let tx = OpTransactionRequest::default().from(from).to(to).value(U256::ZERO); + + let result = client.eth_call(&tx, BlockNumberOrTag::Pending).await?; + tracing::debug!(?result, "eth_call result at pending"); + + Ok(()) +} + +async fn test_estimate_gas_latest(client: &TestClient) -> Result<()> { + let (from, to) = get_test_addresses(client); + + let tx = OpTransactionRequest::default().from(from).to(to).value(U256::from(1000)); + + let gas = client.estimate_gas(&tx, BlockNumberOrTag::Latest).await?; + tracing::debug!(gas, "Estimated gas at latest"); + + ensure!(gas >= 21000, "Gas estimate should be at least 21000 for transfer"); + Ok(()) +} + +async fn test_estimate_gas_pending(client: &TestClient) -> Result<()> { + let (from, to) = get_test_addresses(client); + + let tx = OpTransactionRequest::default().from(from).to(to).value(U256::from(1000)); + + let gas = client.estimate_gas(&tx, BlockNumberOrTag::Pending).await?; + tracing::debug!(gas, "Estimated gas at pending"); + + ensure!(gas >= 21000, "Gas estimate should be at least 21000 for transfer"); + Ok(()) +} + +/// Test that eth_call against pending block sees state changes from flashblock transactions. +/// +/// This test: +/// 1. Waits for the start of a fresh block (flashblock index 0 or 1) +/// 2. Gets the recipient balance +/// 3. Sends ETH to the recipient +/// 4. Waits for tx in flashblock (must be same block) +/// 5. Verifies eth_call can now see the updated balance in pending state +async fn test_flashblock_eth_call_sees_state(client: &TestClient) -> Result<()> { + // Verify we have a signer (required for sending transactions) + client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + let recipient = client.recipient().ok_or_else(|| eyre::eyre!("No recipient configured"))?; + + // Start the flashblock harness (waits for start of a fresh block) + let mut harness = FlashblockHarness::new(client).await?; + let block_number = harness.block_number(); + + // Get recipient balance before + let balance_before = client.get_balance(recipient, BlockNumberOrTag::Pending).await?; + tracing::debug!(?balance_before, "Recipient balance before"); + + // Send ETH to recipient + let transfer_amount = U256::from(1u64); // 1 wei - minimum to detect state change + let (tx_bytes, tx_hash) = client.create_transfer(recipient, transfer_amount, None).await?; + + tracing::info!(?tx_hash, block = block_number, "Sending ETH to recipient"); + client.send_raw_transaction(tx_bytes).await?; + + // Wait for tx in flashblock (fails if block boundary crossed) + harness.wait_for_tx(tx_hash, Duration::from_secs(10)).await?; + + // Now verify eth_call sees the updated state in pending + let balance_after = client.get_balance(recipient, BlockNumberOrTag::Pending).await?; + tracing::debug!(?balance_after, "Recipient balance after flashblock tx"); + + ensure!( + balance_after > balance_before, + "Recipient balance should increase: before={}, after={}", + balance_before, + balance_after + ); + + ensure!( + balance_after >= balance_before + transfer_amount, + "Recipient should have at least transfer amount more: before={}, after={}, transfer={}", + balance_before, + balance_after, + transfer_amount + ); + + // Verify we're still in the same block (pending state test is valid) + harness.assert_same_block(block_number)?; + + tracing::info!( + block = block_number, + flashblocks = harness.flashblock_count(), + "eth_call state visibility verified in pending state within flashblock window" + ); + + harness.close().await?; + Ok(()) +} diff --git a/testing/e2e/src/tests/contracts.rs b/testing/e2e/src/tests/contracts.rs new file mode 100644 index 00000000..e55c9948 --- /dev/null +++ b/testing/e2e/src/tests/contracts.rs @@ -0,0 +1,193 @@ +//! Tests for contract interactions via flashblocks. + +use std::time::Duration; + +use alloy_eips::BlockNumberOrTag; +use alloy_network::TransactionBuilder; +use alloy_primitives::{Address, B256, Bytes, U256}; +use alloy_sol_types::SolCall; +use eyre::{Result, ensure}; +use op_alloy_rpc_types::OpTransactionRequest; + +use crate::{ + TestClient, + harness::FlashblockHarness, + tests::{Test, TestCategory}, +}; + +// Define the Counter contract interface using sol! macro for ABI encoding. +// The bytecode is compiled separately with forge (solc 0.8.30). +// +// Source (src/Counter.sol): +// ```solidity +// // SPDX-License-Identifier: UNLICENSED +// pragma solidity ^0.8.20; +// contract Counter { +// uint256 public count; +// function increment() external { count++; } +// function getCount() external view returns (uint256) { return count; } +// } +// ``` +alloy_sol_macro::sol! { + /// Increment the counter. + function increment() external; + /// Get the current count. + function getCount() external view returns (uint256); +} + +// Counter contract bytecode compiled with forge (solc 0.8.30) +// count is stored in slot 0, anyone can increment +const COUNTER_BYTECODE: &str = "6080604052348015600e575f5ffd5b506101898061001c5f395ff3fe608060405234801561000f575f5ffd5b506004361061003f575f3560e01c806306661abd14610043578063a87d942c14610061578063d09de08a1461007f575b5f5ffd5b61004b610089565b60405161005891906100c6565b60405180910390f35b61006961008e565b60405161007691906100c6565b60405180910390f35b610087610096565b005b5f5481565b5f5f54905090565b5f5f8154809291906100a79061010c565b9190505550565b5f819050919050565b6100c0816100ae565b82525050565b5f6020820190506100d95f8301846100b7565b92915050565b7f4e487b71000000000000000000000000000000000000000000000000000000005f52601160045260245ffd5b5f610116826100ae565b91507fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff8203610148576101476100df565b5b60018201905091905056fea2646970667358221220f20d10175682bbbd1b6bb8f4176629c4124ad6a6532bcaf2cfaa2ed6771b941a64736f6c634300081e0033"; + +/// Returns a skip reason if signer is not configured. +fn skip_if_no_signer(client: &TestClient) -> Option { + if !client.has_signer() { Some("No PRIVATE_KEY configured".to_string()) } else { None } +} + +/// Build the contracts test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "contracts".to_string(), + description: Some("Contract deployment and interaction tests".to_string()), + tests: vec![Test { + name: "flashblock_counter_increment".to_string(), + description: Some( + "Deploy Counter, increment, and verify state change in flashblock".to_string(), + ), + run: Box::new(|client| Box::pin(test_flashblock_counter_increment(client))), + skip_if: Some(Box::new(|client| Box::pin(async move { skip_if_no_signer(client) }))), + }], + } +} + +/// Test that deploys a Counter contract and verifies increment is visible via flashblocks. +/// +/// This test: +/// 1. Deploys a Counter contract (only owner can increment) +/// 2. Waits for deployment in flashblock +/// 3. Starts fresh flashblock window +/// 4. Queries pre-state (count = 0) +/// 5. Sends increment transaction +/// 6. Waits for tx in flashblock +/// 7. Queries post-state (count = 1) +/// 8. Verifies still same block +async fn test_flashblock_counter_increment(client: &TestClient) -> Result<()> { + // Verify we have a signer + client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + // Deploy the Counter contract + let contract_address = deploy_counter(client).await?; + tracing::info!(?contract_address, "Counter contract deployed"); + + // Start a fresh flashblock window for the increment test + let mut harness = FlashblockHarness::new(client).await?; + let block_number = harness.block_number(); + + // Query pre-state: count should be 0 + let count_before = call_get_count(client, contract_address).await?; + tracing::debug!(count = %count_before, block = block_number, "Count before increment"); + + ensure!(count_before == U256::ZERO, "Initial count should be 0, got {}", count_before); + + // Send increment transaction + let (tx_bytes, tx_hash) = create_increment_tx(client, contract_address).await?; + + tracing::info!(?tx_hash, block = block_number, "Sending increment transaction"); + client.send_raw_transaction(tx_bytes).await?; + + // Wait for tx in flashblock (fails if block boundary crossed) + harness.wait_for_tx(tx_hash, Duration::from_secs(10)).await?; + + // Query post-state: count should be 1 in pending state + let count_after = call_get_count(client, contract_address).await?; + tracing::debug!(count = %count_after, "Count after flashblock tx"); + + ensure!(count_after == U256::from(1), "Count should be 1 after increment, got {}", count_after); + + // Verify we're still in the same block (pending state test is valid) + harness.assert_same_block(block_number)?; + + tracing::info!( + block = block_number, + flashblocks = harness.flashblock_count(), + "Counter increment verified in pending state within flashblock window" + ); + + harness.close().await?; + Ok(()) +} + +/// Deploy the Counter contract and return its address. +async fn deploy_counter(client: &TestClient) -> Result
{ + let owner = client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + // Get the contract bytecode + let bytecode = hex::decode(COUNTER_BYTECODE) + .map_err(|e| eyre::eyre!("Failed to decode bytecode: {}", e))?; + + // Get nonce + let nonce = client.get_transaction_count(owner, BlockNumberOrTag::Pending).await?; + + // Build deployment transaction using with_deploy_code for contract creation + let mut tx_request = OpTransactionRequest::default() + .from(owner) + .with_deploy_code(Bytes::from(bytecode)) + .nonce(nonce) + .gas_limit(500_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000); + tx_request.set_chain_id(client.chain_id()); + + // Sign and send + let (tx_bytes, tx_hash) = client.sign_transaction(tx_request)?; + tracing::debug!(?tx_hash, "Deploying Counter contract"); + + // Use sync mode to wait for deployment + let receipt = client.send_raw_transaction_sync(tx_bytes, Some(6_000)).await?; + + ensure!(receipt.inner.inner.status(), "Contract deployment failed"); + + let contract_address = receipt + .inner + .contract_address + .ok_or_else(|| eyre::eyre!("No contract address in receipt"))?; + + Ok(contract_address) +} + +/// Call getCount() on the Counter contract. +async fn call_get_count(client: &TestClient, contract: Address) -> Result { + let call_data = getCountCall {}.abi_encode(); + + let tx = OpTransactionRequest::default().to(contract).input(Bytes::from(call_data).into()); + + let result = client.eth_call(&tx, BlockNumberOrTag::Pending).await?; + + // Decode the result - returns U256 directly + let count = getCountCall::abi_decode_returns(&result) + .map_err(|e| eyre::eyre!("Failed to decode getCount result: {}", e))?; + + Ok(count) +} + +/// Create an increment() transaction. +async fn create_increment_tx(client: &TestClient, contract: Address) -> Result<(Bytes, B256)> { + let owner = client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + let call_data = incrementCall {}.abi_encode(); + + // Get nonce + let nonce = client.get_transaction_count(owner, BlockNumberOrTag::Pending).await?; + + let mut tx_request = OpTransactionRequest::default() + .from(owner) + .to(contract) + .input(Bytes::from(call_data).into()) + .nonce(nonce) + .gas_limit(100_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000); + tx_request.set_chain_id(client.chain_id()); + + client.sign_transaction(tx_request) +} diff --git a/testing/e2e/src/tests/logs.rs b/testing/e2e/src/tests/logs.rs new file mode 100644 index 00000000..cd08ee6a --- /dev/null +++ b/testing/e2e/src/tests/logs.rs @@ -0,0 +1,104 @@ +//! Tests for eth_getLogs. + +use alloy_eips::BlockNumberOrTag; +use alloy_rpc_types_eth::Filter; +use eyre::Result; + +use crate::{ + TestClient, + tests::{Test, TestCategory}, +}; + +/// Build the logs test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "logs".to_string(), + description: Some("eth_getLogs tests including pending logs".to_string()), + tests: vec![ + Test { + name: "get_logs_latest".to_string(), + description: Some("Get logs from latest block".to_string()), + run: Box::new(|client| Box::pin(test_get_logs_latest(client))), + skip_if: None, + }, + Test { + name: "get_logs_pending".to_string(), + description: Some("Get logs including pending block".to_string()), + run: Box::new(|client| Box::pin(test_get_logs_pending(client))), + skip_if: None, + }, + Test { + name: "get_logs_range".to_string(), + description: Some("Get logs from a block range".to_string()), + run: Box::new(|client| Box::pin(test_get_logs_range(client))), + skip_if: None, + }, + Test { + name: "get_logs_mixed_range".to_string(), + description: Some("Get logs from fromBlock: 0, toBlock: pending".to_string()), + run: Box::new(|client| Box::pin(test_get_logs_mixed_range(client))), + skip_if: None, + }, + ], + } +} + +async fn test_get_logs_latest(client: &TestClient) -> Result<()> { + let filter = Filter::new().select(BlockNumberOrTag::Latest); + + let logs = client.get_logs(&filter).await?; + tracing::debug!(count = logs.len(), "Got logs at latest block"); + + Ok(()) +} + +async fn test_get_logs_pending(client: &TestClient) -> Result<()> { + // Query logs from pending to pending (flashblocks state only) + let filter = + Filter::new().from_block(BlockNumberOrTag::Pending).to_block(BlockNumberOrTag::Pending); + + let logs = client.get_logs(&filter).await?; + tracing::debug!(count = logs.len(), "Got pending logs"); + + Ok(()) +} + +async fn test_get_logs_range(client: &TestClient) -> Result<()> { + // Get the latest block number + let latest_block = client + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_else(|| eyre::eyre!("No latest block"))?; + + let from_block = latest_block.header.number.saturating_sub(10); + + let filter = Filter::new().from_block(from_block).to_block(BlockNumberOrTag::Latest); + + let logs = client.get_logs(&filter).await?; + tracing::debug!( + count = logs.len(), + from = from_block, + to = latest_block.header.number, + "Got logs in range" + ); + + Ok(()) +} + +async fn test_get_logs_mixed_range(client: &TestClient) -> Result<()> { + // Get the latest block number + let latest_block = client + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_else(|| eyre::eyre!("No latest block"))?; + + let from_block = latest_block.header.number.saturating_sub(10); + + // Query from historical to pending (should include both canonical and flashblocks logs) + let filter = Filter::new().from_block(from_block).to_block(BlockNumberOrTag::Pending); + + let logs = client.get_logs(&filter).await?; + tracing::debug!(count = logs.len(), from = from_block, "Got logs from historical to pending"); + + Ok(()) +} diff --git a/testing/e2e/src/tests/metering.rs b/testing/e2e/src/tests/metering.rs new file mode 100644 index 00000000..d4c6e497 --- /dev/null +++ b/testing/e2e/src/tests/metering.rs @@ -0,0 +1,119 @@ +//! Tests for metering RPC endpoints. +//! +//! These tests require the node to support the `base_meterBundle` and +//! `base_meteredPriorityFeePerGas` RPC methods. Not all nodes have these. + +use alloy_eips::BlockNumberOrTag; +use eyre::{Result, ensure}; + +use crate::{ + TestClient, + tests::{Test, TestCategory}, + types::MeterBundleRequest, +}; + +/// Check if the node supports metering RPC methods. +async fn check_metering_support(client: &TestClient) -> Option { + // Try a simple metering call - if it returns "Method not found", skip + let bundle = MeterBundleRequest::new(vec![], 0); + match client.meter_bundle(bundle).await { + Ok(_) => None, + Err(e) => { + let err_str = format!("{:?}", e); + if err_str.contains("-32601") || err_str.contains("Method not found") { + Some("Node does not support base_meterBundle RPC method".to_string()) + } else { + // Some other error - let the test run and fail with details + None + } + } + } +} + +/// Build the metering test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "metering".to_string(), + description: Some( + "Bundle metering and priority fee estimation tests (requires base_meterBundle support)" + .to_string(), + ), + tests: vec![ + Test { + name: "meter_bundle_empty".to_string(), + description: Some("Meter an empty bundle".to_string()), + run: Box::new(|client| Box::pin(test_meter_bundle_empty(client))), + skip_if: Some(Box::new(|client| Box::pin(check_metering_support(client)))), + }, + Test { + name: "meter_bundle_state_block".to_string(), + description: Some("Verify metering returns valid state block".to_string()), + run: Box::new(|client| Box::pin(test_meter_bundle_state_block(client))), + skip_if: Some(Box::new(|client| Box::pin(check_metering_support(client)))), + }, + Test { + name: "metered_priority_fee".to_string(), + description: Some("Test base_meteredPriorityFeePerGas endpoint".to_string()), + run: Box::new(|client| Box::pin(test_metered_priority_fee(client))), + skip_if: Some(Box::new(|client| Box::pin(check_metering_support(client)))), + }, + ], + } +} + +async fn test_meter_bundle_empty(client: &TestClient) -> Result<()> { + let bundle = MeterBundleRequest::new(vec![], 0); + + let response = client.meter_bundle(bundle).await?; + + ensure!(response.results.is_empty(), "Empty bundle should have no results"); + ensure!(response.total_gas_used == 0, "Empty bundle should use 0 gas"); + + tracing::debug!(state_block = response.state_block_number, "Metered empty bundle"); + + Ok(()) +} + +async fn test_meter_bundle_state_block(client: &TestClient) -> Result<()> { + let bundle = MeterBundleRequest::new(vec![], 0); + + let response = client.meter_bundle(bundle).await?; + + // state_block_number should be a valid block number + tracing::debug!(state_block = response.state_block_number, "State block from metering"); + + // Verify the state block exists + let block = + client.get_block_by_number(BlockNumberOrTag::Number(response.state_block_number)).await?; + + ensure!(block.is_some(), "State block should exist"); + + Ok(()) +} + +async fn test_metered_priority_fee(client: &TestClient) -> Result<()> { + let bundle = MeterBundleRequest::new(vec![], 0); + + // This might fail if there's no metering cache data - that's expected + match client.metered_priority_fee(bundle).await { + Ok(response) => { + tracing::debug!( + blocks_sampled = response.blocks_sampled, + priority_fee = ?response.priority_fee, + "Got metered priority fee" + ); + Ok(()) + } + Err(e) => { + // Check if this is an expected error (no cache data) + let err_str = format!("{:?}", e); + if err_str.contains("cache") || err_str.contains("empty") || err_str.contains("no data") + { + tracing::warn!("Metered priority fee not available (no cache data): {}", e); + Ok(()) + } else { + Err(e) + } + } + } +} diff --git a/testing/e2e/src/tests/mod.rs b/testing/e2e/src/tests/mod.rs new file mode 100644 index 00000000..0aa6f202 --- /dev/null +++ b/testing/e2e/src/tests/mod.rs @@ -0,0 +1,98 @@ +//! Test definitions for flashblocks e2e testing. + +mod blocks; +mod call; +mod contracts; +mod logs; +mod metering; +mod receipts; +mod subscriptions; + +use std::{future::Future, pin::Pin}; + +use eyre::Result; + +use crate::TestClient; + +/// A test function that takes a client and returns a result. +pub type TestFn = + Box Pin> + Send + '_>> + Send + Sync>; + +/// A skip condition function that returns Some(reason) if the test should be skipped. +pub type SkipFn = Box< + dyn Fn(&TestClient) -> Pin> + Send + '_>> + Send + Sync, +>; + +/// A single test case. +pub struct Test { + /// Test name (used for filtering). + pub name: String, + /// Optional description. + pub description: Option, + /// The test function to run. + pub run: TestFn, + /// Optional skip condition. + pub skip_if: Option, +} + +impl std::fmt::Debug for Test { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Test") + .field("name", &self.name) + .field("description", &self.description) + .finish_non_exhaustive() + } +} + +/// A category of related tests. +#[derive(Debug)] +pub struct TestCategory { + /// Category name. + pub name: String, + /// Optional description. + pub description: Option, + /// Tests in this category. + pub tests: Vec, +} + +/// The complete test suite. +#[derive(Debug)] +pub struct TestSuite { + /// Categories in this test suite. + pub categories: Vec, +} + +/// Helper macro for creating test functions. +#[macro_export] +macro_rules! test_fn { + ($f:expr) => { + Box::new(|client: &$crate::TestClient| Box::pin($f(client))) + }; +} + +/// Helper macro for creating skip functions. +#[macro_export] +macro_rules! skip_fn { + ($f:expr) => { + Some(Box::new( + |client: &$crate::TestClient| -> ::std::pin::Pin< + Box> + Send + '_>, + > { Box::pin($f(client)) }, + ) as $crate::tests::SkipFn) + }; +} + +/// Build the complete test suite. +pub fn build_test_suite() -> TestSuite { + TestSuite { + categories: vec![ + blocks::category(), + call::category(), + receipts::category(), + logs::category(), + subscriptions::category(), + metering::category(), + contracts::category(), + ], + } +} diff --git a/testing/e2e/src/tests/receipts.rs b/testing/e2e/src/tests/receipts.rs new file mode 100644 index 00000000..b03a57bd --- /dev/null +++ b/testing/e2e/src/tests/receipts.rs @@ -0,0 +1,127 @@ +//! Tests for transaction receipts. + +use std::time::Duration; + +use alloy_primitives::{U256, b256}; +use eyre::{Result, ensure}; + +use crate::{ + TestClient, + harness::FlashblockHarness, + tests::{Test, TestCategory}, +}; + +/// Build the receipts test category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "receipts".to_string(), + description: Some("Transaction receipt retrieval tests".to_string()), + tests: vec![ + Test { + name: "get_receipt_nonexistent".to_string(), + description: Some("Get receipt for non-existent tx returns None".to_string()), + run: Box::new(|client| Box::pin(test_receipt_nonexistent(client))), + skip_if: None, + }, + Test { + name: "flashblock_receipt".to_string(), + description: Some( + "Send tx and verify receipt available in pending state within same block" + .to_string(), + ), + run: Box::new(|client| Box::pin(test_flashblock_receipt(client))), + skip_if: Some(Box::new(|client| { + Box::pin(async move { skip_if_no_signer_or_recipient(client) }) + })), + }, + ], + } +} + +/// Returns a skip reason if signer or recipient is not configured. +fn skip_if_no_signer_or_recipient(client: &TestClient) -> Option { + if !client.has_signer() { + Some("No PRIVATE_KEY configured".to_string()) + } else if client.recipient().is_none() { + Some("No --recipient configured".to_string()) + } else { + None + } +} + +async fn test_receipt_nonexistent(client: &TestClient) -> Result<()> { + // Use a random hash that shouldn't exist + let fake_hash = b256!("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"); + + let receipt = client.get_transaction_receipt(fake_hash).await?; + ensure!(receipt.is_none(), "Receipt for non-existent tx should be None"); + + Ok(()) +} + +/// Test that transaction receipts are available in pending state via flashblocks. +/// +/// This test: +/// 1. Waits for the start of a fresh block (flashblock index 0 or 1) +/// 2. Sends a transaction +/// 3. Waits for it to appear in a flashblock (must be same block) +/// 4. Queries the receipt from pending state +/// 5. Verifies receipt structure +async fn test_flashblock_receipt(client: &TestClient) -> Result<()> { + client.signer_address().ok_or_else(|| eyre::eyre!("No signer"))?; + + let recipient = client.recipient().ok_or_else(|| eyre::eyre!("No recipient configured"))?; + + // Start the flashblock harness (waits for start of a fresh block) + let mut harness = FlashblockHarness::new(client).await?; + let block_number = harness.block_number(); + + // Send transaction + let value = U256::from(1u64); // 1 wei - minimum to detect state change + let (tx_bytes, tx_hash) = client.create_transfer(recipient, value, None).await?; + + tracing::info!(?tx_hash, block = block_number, "Sending transaction"); + client.send_raw_transaction(tx_bytes).await?; + + // Wait for tx to appear in flashblock (fails if block boundary crossed) + harness.wait_for_tx(tx_hash, Duration::from_secs(10)).await?; + + // Query the receipt - should be available in pending state via flashblocks + let receipt = client.get_transaction_receipt(tx_hash).await?; + + ensure!( + receipt.is_some(), + "Receipt should be available in pending state after tx appears in flashblock" + ); + + let receipt = receipt.unwrap(); + + // Verify receipt structure + ensure!( + receipt.inner.transaction_hash == tx_hash, + "Receipt tx hash mismatch: expected {}, got {}", + tx_hash, + receipt.inner.transaction_hash + ); + + ensure!(receipt.inner.inner.status(), "Transaction should have succeeded"); + + tracing::debug!( + tx_hash = ?receipt.inner.transaction_hash, + block_number = ?receipt.inner.block_number, + gas_used = ?receipt.inner.gas_used, + "Got flashblock receipt" + ); + + // Verify we're still in the same block (pending state test is valid) + harness.assert_same_block(block_number)?; + + tracing::info!( + block = block_number, + flashblocks = harness.flashblock_count(), + "Receipt verified in pending state within flashblock window" + ); + + harness.close().await?; + Ok(()) +} diff --git a/testing/e2e/src/tests/subscriptions.rs b/testing/e2e/src/tests/subscriptions.rs new file mode 100644 index 00000000..abdda56e --- /dev/null +++ b/testing/e2e/src/tests/subscriptions.rs @@ -0,0 +1,75 @@ +//! Sanity checks for the flashblocks WebSocket endpoint. +//! +//! These tests verify the flashblocks WebSocket is reachable and streaming correctly. +//! They do NOT test the RPC node - they test the external flashblocks endpoint +//! (--flashblocks-ws-url) which is a prerequisite for the flashblock-aware tests. + +use std::time::Duration; + +use eyre::{Result, ensure}; + +use crate::{ + TestClient, + harness::FlashblocksStream, + tests::{Test, TestCategory}, +}; + +/// Build the sanity checks category. +pub(crate) fn category() -> TestCategory { + TestCategory { + name: "sanity".to_string(), + description: Some( + "Sanity checks for flashblocks WebSocket (tests the endpoint, not the RPC node)" + .to_string(), + ), + tests: vec![ + Test { + name: "flashblocks_ws_connect".to_string(), + description: Some("Verify flashblocks WebSocket is reachable".to_string()), + run: Box::new(|client| Box::pin(test_flashblocks_stream_connect(client))), + skip_if: None, + }, + Test { + name: "flashblocks_ws_receive".to_string(), + description: Some("Verify flashblocks are being streamed".to_string()), + run: Box::new(|client| Box::pin(test_flashblocks_stream_receive(client))), + skip_if: None, + }, + ], + } +} + +/// Test that we can connect to the flashblocks WebSocket stream. +async fn test_flashblocks_stream_connect(client: &TestClient) -> Result<()> { + let stream = FlashblocksStream::connect(&client.flashblocks_ws_url).await?; + + tracing::debug!("Connected to flashblocks stream"); + + stream.close().await?; + + Ok(()) +} + +/// Test that we can receive a flashblock message from the stream. +async fn test_flashblocks_stream_receive(client: &TestClient) -> Result<()> { + let mut stream = FlashblocksStream::connect(&client.flashblocks_ws_url).await?; + + // Wait for a flashblock with timeout + let flashblock = tokio::time::timeout(Duration::from_secs(5), stream.next_flashblock()) + .await + .map_err(|_| eyre::eyre!("Timeout waiting for flashblock"))??; + + tracing::debug!( + block_number = flashblock.metadata.block_number, + index = flashblock.index, + tx_count = flashblock.diff.transactions.len(), + "Received flashblock" + ); + + // Verify basic structure + ensure!(flashblock.metadata.block_number > 0, "Block number should be positive"); + + stream.close().await?; + + Ok(()) +} diff --git a/testing/e2e/src/types.rs b/testing/e2e/src/types.rs new file mode 100644 index 00000000..a0a08018 --- /dev/null +++ b/testing/e2e/src/types.rs @@ -0,0 +1,198 @@ +//! RPC types for flashblocks testing. + +use std::collections::HashMap; + +use alloy_primitives::{Address, B256, Bytes, U256}; +use serde::{Deserialize, Serialize}; + +/// Block type for Optimism network. +pub type OpBlock = alloy_rpc_types_eth::Block; + +/// Request for `base_meterBundle`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MeterBundleRequest { + /// Transactions to meter. + pub txs: Vec, + /// Block number for state context. + pub block_number: u64, + /// Optional minimum timestamp. + #[serde(skip_serializing_if = "Option::is_none")] + pub min_timestamp: Option, + /// Optional maximum timestamp. + #[serde(skip_serializing_if = "Option::is_none")] + pub max_timestamp: Option, + /// Transaction hashes allowed to revert. + #[serde(default)] + pub reverting_tx_hashes: Vec, + /// Optional replacement UUID. + #[serde(skip_serializing_if = "Option::is_none")] + pub replacement_uuid: Option, + /// Transaction hashes to drop. + #[serde(default)] + pub dropping_tx_hashes: Vec, +} + +impl MeterBundleRequest { + /// Create a new meter bundle request. + pub const fn new(txs: Vec, block_number: u64) -> Self { + Self { + txs, + block_number, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + } + } +} + +/// Response from `base_meterBundle`. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MeterBundleResponse { + /// Per-transaction results. + pub results: Vec, + /// Total gas used by all transactions. + pub total_gas_used: u64, + /// Total execution time in microseconds. + pub total_execution_time_us: u64, + /// State root computation time in microseconds. + pub state_root_time_us: Option, + /// Total time in microseconds. + pub total_time_us: Option, + /// Total gas fees paid. + pub gas_fees: U256, + /// Coinbase balance difference. + pub coinbase_diff: U256, + /// Effective gas price for the bundle. + pub bundle_gas_price: U256, + /// Block number used for state. + pub state_block_number: u64, +} + +/// Per-transaction metering result. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MeterTxResult { + /// Transaction hash. + pub tx_hash: B256, + /// Sender address. + pub from_address: Address, + /// Recipient address (None for contract creation). + pub to_address: Option
, + /// Gas used by this transaction. + pub gas_used: u64, + /// Effective gas price. + pub gas_price: U256, + /// Gas fees paid. + pub gas_fees: U256, + /// Coinbase balance difference. + pub coinbase_diff: U256, + /// Execution time in microseconds. + pub execution_time_us: u64, + /// Error message if transaction failed. + pub error: Option, +} + +/// Response from `base_meteredPriorityFeePerGas`. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MeteredPriorityFeeResponse { + /// Estimated priority fee. + pub priority_fee: Option, + /// Resource-based estimates. + pub resource_estimates: Vec, + /// Number of blocks sampled. + pub blocks_sampled: u64, + /// Metering response for the bundle. + pub metering: MeterBundleResponse, +} + +/// Resource-based fee estimate. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceEstimate { + /// Type of resource. + pub resource_type: String, + /// Demand for this resource. + pub demand: u64, + /// Estimated fee for this resource. + pub estimated_fee: Option, +} + +// ============================================================================ +// Flashblocks streaming types +// ============================================================================ + +/// Flashblock message from the public flashblocks WebSocket endpoint. +/// +/// The endpoint sends brotli-compressed JSON messages directly (no eth_subscribe). +/// Index 0 flashblocks have a "base" object; subsequent ones only have "diff". +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Flashblock { + /// Payload ID for this flashblock. + pub payload_id: String, + /// Index within the current block (0, 1, 2, ...). + pub index: u64, + /// Base block info (only present on index=0 flashblocks). + #[serde(default)] + pub base: Option, + /// Diff/delta for this flashblock. + pub diff: FlashblockDiff, + /// Metadata about the flashblock. + pub metadata: FlashblockMetadata, +} + +/// Base block info (only present on index=0 flashblocks). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlashblockBase { + /// Base fee per gas. + pub base_fee_per_gas: String, + /// Block number. + pub block_number: String, + /// Block timestamp. + pub timestamp: String, + /// Fee recipient address. + pub fee_recipient: String, + /// Gas limit. + pub gas_limit: String, + /// Parent block hash. + pub parent_hash: String, +} + +/// Diff/delta for this flashblock. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlashblockDiff { + /// State root after this flashblock. + pub state_root: String, + /// Receipts root after this flashblock. + pub receipts_root: String, + /// Logs bloom filter. + pub logs_bloom: String, + /// Cumulative gas used. + pub gas_used: String, + /// Block hash (changes with each flashblock). + pub block_hash: String, + /// RLP-encoded transaction hex strings. + pub transactions: Vec, + /// Withdrawals in this flashblock. + #[serde(default)] + pub withdrawals: Vec, + /// Withdrawals root. + #[serde(default)] + pub withdrawals_root: Option, +} + +/// Metadata about the flashblock. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlashblockMetadata { + /// Block number this flashblock belongs to. + pub block_number: u64, + /// New account balances from this flashblock. + #[serde(default)] + pub new_account_balances: HashMap, + /// Receipts indexed by transaction hash. + #[serde(default)] + pub receipts: HashMap, +}