diff --git a/Cargo.lock b/Cargo.lock index aa1b04e7..bf991516 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -184,7 +184,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -739,7 +739,7 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -1971,7 +1971,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2041,6 +2041,7 @@ version = "0.1.0" dependencies = [ "clap", "ethlambda-blockchain", + "ethlambda-ethrex-client", "ethlambda-network-api", "ethlambda-p2p", "ethlambda-rpc", @@ -2068,6 +2069,7 @@ version = "0.1.0" dependencies = [ "datatest-stable 0.3.3", "ethlambda-crypto", + "ethlambda-ethrex-client", "ethlambda-fork-choice", "ethlambda-metrics", "ethlambda-network-api", @@ -2101,6 +2103,21 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "ethlambda-ethrex-client" +version = "0.1.0" +dependencies = [ + "ethlambda-types", + "hex", + "jsonwebtoken", + "reqwest", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "ethlambda-fork-choice" version = "0.1.0" @@ -3632,6 +3649,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "jubjub" version = "0.9.0" @@ -4900,7 +4932,7 @@ source = "git+https://github.com/leanEthereum/leanMultisig.git?rev=2eb4b9d#2eb4b dependencies = [ "itertools 0.14.0", "mt-utils", - "num-bigint 0.3.3", + "num-bigint 0.4.6", "paste", "rand 0.10.0", "rayon", @@ -4916,7 +4948,7 @@ dependencies = [ "itertools 0.14.0", "mt-field", "mt-utils", - "num-bigint 0.3.3", + "num-bigint 0.4.6", "paste", "rand 0.10.0", "rayon", @@ -5164,7 +5196,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -6114,7 +6146,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -6787,7 +6819,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7152,6 +7184,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint 0.4.6", + "num-traits", + "thiserror 2.0.18", + "time", +] + [[package]] name = "slab" version = "0.4.12" @@ -7286,7 +7330,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -7615,7 +7659,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -8436,7 +8480,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c407f5e1..9b023fac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/common/test-fixtures", "crates/common/types", "crates/net/api", + "crates/net/ethrex-client", "crates/net/p2p", "crates/net/rpc", "crates/storage", @@ -35,6 +36,7 @@ ethlambda-metrics = { path = "crates/common/metrics" } ethlambda-test-fixtures = { path = "crates/common/test-fixtures" } ethlambda-types = { path = "crates/common/types" } ethlambda-network-api = { path = "crates/net/api" } +ethlambda-ethrex-client = { path = "crates/net/ethrex-client" } ethlambda-p2p = { path = "crates/net/p2p" } ethlambda-rpc = { path = "crates/net/rpc" } ethlambda-storage = { path = "crates/storage" } diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index e5e22ee9..9ac780eb 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] ethlambda-blockchain.workspace = true +ethlambda-ethrex-client.workspace = true ethlambda-network-api.workspace = true ethlambda-p2p.workspace = true ethlambda-types.workspace = true diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index f6d3ca3e..b75e7f0f 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -34,6 +34,7 @@ use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; +use ethlambda_ethrex_client::{ETHLAMBDA_ENGINE_CAPABILITIES, EngineClient, JwtSecret}; use ethlambda_rpc::RpcConfig; use ethlambda_storage::{StorageBackend, Store, backend::RocksDBBackend}; @@ -109,6 +110,17 @@ struct CliOptions { /// Directory for RocksDB storage #[arg(long, default_value = "./data")] data_dir: PathBuf, + /// URL of the ethrex (or other EL) Engine API auth endpoint, e.g. `http://127.0.0.1:8551`. + /// + /// When unset, Engine API integration is disabled and ethlambda runs as + /// a consensus-only node. When set, `--execution-jwt-secret` is required. + #[arg(long, requires = "execution_jwt_secret")] + execution_endpoint: Option, + /// Path to a file containing the 32-byte JWT secret shared with the EL, + /// as a single line of hex (optionally `0x`-prefixed). Same format used + /// by Lighthouse/Teku/Prysm/ethrex. + #[arg(long, requires = "execution_endpoint")] + execution_jwt_secret: Option, } #[tokio::main] @@ -221,7 +233,18 @@ async fn main() -> eyre::Result<()> { // and the API server (which exposes GET/POST admin endpoints). let aggregator = AggregatorController::new(options.is_aggregator); - let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone()); + let execution_client = build_execution_client( + options.execution_endpoint.as_deref(), + options.execution_jwt_secret.as_deref(), + ) + .await; + + let blockchain = BlockChain::spawn( + store.clone(), + validator_keys, + aggregator.clone(), + execution_client, + ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the // AggregatorController — subnet subscriptions are decided once here and @@ -542,6 +565,57 @@ fn read_validator_keys( Ok(validator_keys) } +/// Build the optional Engine API client and run the capability handshake. +/// +/// Returns `None` when integration is disabled (neither flag provided). +/// Returns `None` and logs an error when construction or the handshake +/// fails — consensus must keep running regardless of EL state. +async fn build_execution_client( + endpoint: Option<&str>, + jwt_path: Option<&Path>, +) -> Option { + // CLI requires both-or-neither; defensive recheck for clarity. + let (endpoint, jwt_path) = match (endpoint, jwt_path) { + (Some(e), Some(p)) => (e, p), + (None, None) => return None, + _ => { + error!("Both --execution-endpoint and --execution-jwt-secret are required together"); + return None; + } + }; + + let secret = match JwtSecret::from_file(jwt_path) { + Ok(s) => s, + Err(err) => { + error!(path = %jwt_path.display(), %err, "Failed to load JWT secret"); + return None; + } + }; + + let client = match EngineClient::new(endpoint, secret) { + Ok(c) => c, + Err(err) => { + error!(%err, "Failed to construct EngineClient"); + return None; + } + }; + + info!(endpoint, "Engine API integration enabled"); + + match client + .exchange_capabilities(ETHLAMBDA_ENGINE_CAPABILITIES) + .await + { + Ok(caps) => info!(count = caps.len(), "EL capability handshake succeeded"), + Err(err) => warn!( + %err, + "EL capability handshake failed; per-slot FCU calls will still be attempted" + ), + } + + Some(client) +} + fn read_hex_file_bytes(path: impl AsRef) -> Vec { let path = path.as_ref(); let Ok(file_content) = std::fs::read_to_string(path) diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index 65c6ecf2..4ba0a88d 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true autotests = false [dependencies] +ethlambda-ethrex-client.workspace = true ethlambda-network-api.workspace = true ethlambda-storage.workspace = true ethlambda-state-transition.workspace = true diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 28390c3f..cdabb3f4 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant, SystemTime}; +use ethlambda_ethrex_client::{EngineClient, ForkChoiceState}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; use ethlambda_state_transition::is_proposer; use ethlambda_storage::{ALL_TABLES, Store}; @@ -60,6 +61,7 @@ impl BlockChain { store: Store, validator_keys: HashMap, aggregator: AggregatorController, + execution_client: Option, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -74,6 +76,7 @@ impl BlockChain { pending_block_parents: HashMap::new(), current_aggregation: None, last_tick_instant: None, + execution_client, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -127,6 +130,17 @@ pub struct BlockChainServer { /// Last tick instant for measuring interval duration. last_tick_instant: Option, + + /// Optional Engine API client to the execution layer (e.g. ethrex). + /// + /// Present only when ethlambda was started with `--execution-endpoint` + /// and `--execution-jwt-secret`. When set, the actor fires + /// `engine_forkchoiceUpdatedV3` at the start of each slot to keep the EL + /// informed of our head/justified/finalized. The schema is currently + /// scaffolding only — Lean blocks do not yet carry execution payloads, + /// so the EL responds `SYNCING` against zeros until a real payload + /// pipeline is wired (see docs/plans/engine-api-integration.md). + execution_client: Option, } impl BlockChainServer { @@ -195,6 +209,45 @@ impl BlockChainServer { metrics::update_safe_target_slot(self.store.safe_target_slot()); // Update head slot metric (head may change when attestations are promoted at intervals 0/4) metrics::update_head_slot(self.store.head_slot()); + + // Notify the execution layer once per slot (interval 0). Fire and + // forget: the EL is informational here, never on the consensus + // critical path. Until Lean blocks carry execution payloads, we + // send all-zero hashes — beacon roots are not EL block hashes, and + // passing them confuses the EL into attempting to sync to garbage. + // Zero is the spec-friendly "unknown head" sentinel; the EL reliably + // replies `SYNCING`, which is the expected scaffold response. + if interval == 0 && self.execution_client.is_some() { + self.notify_execution_layer(); + } + } + + /// Send a zero-valued forkchoice update to the execution layer via + /// `engine_forkchoiceUpdatedV3`. Errors are logged but never propagated — + /// the consensus loop must continue regardless of EL state. + /// + /// Once Lean blocks carry an `executionPayload`, swap `H256::ZERO` for + /// the corresponding EL block hashes derived from the latest known + /// head / safe / finalized blocks. + fn notify_execution_layer(&self) { + let Some(client) = self.execution_client.as_ref() else { + return; + }; + let state = ForkChoiceState { + head_block_hash: H256::ZERO, + safe_block_hash: H256::ZERO, + finalized_block_hash: H256::ZERO, + }; + let client = client.clone(); + tokio::spawn(async move { + match client.forkchoice_updated_v3(state, None).await { + Ok(resp) => trace!( + status = ?resp.payload_status.status, + "engine_forkchoiceUpdatedV3 ok" + ), + Err(err) => warn!(%err, "engine_forkchoiceUpdatedV3 failed"), + } + }); } /// Kick off a committee-signature aggregation session: diff --git a/crates/net/ethrex-client/Cargo.toml b/crates/net/ethrex-client/Cargo.toml new file mode 100644 index 00000000..98b853ae --- /dev/null +++ b/crates/net/ethrex-client/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "ethlambda-ethrex-client" +authors.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +ethlambda-types.workspace = true +reqwest.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +hex.workspace = true +jsonwebtoken = "9.3" + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/net/ethrex-client/examples/smoke.rs b/crates/net/ethrex-client/examples/smoke.rs new file mode 100644 index 00000000..b9f61ebc --- /dev/null +++ b/crates/net/ethrex-client/examples/smoke.rs @@ -0,0 +1,112 @@ +//! Live smoke test against a running EL (e.g. ethrex). +//! +//! Two modes: +//! +//! # one-shot +//! cargo run -p ethlambda-ethrex-client --example smoke -- \ +//! +//! +//! # slot-cadence loop (4s/slot, matches ethlambda's tick interval) +//! cargo run -p ethlambda-ethrex-client --example smoke -- \ +//! --loop +//! +//! The loop mode mirrors exactly what `BlockChainServer::on_tick` does at +//! interval 0 of every slot: build a `ForkChoiceState` and call +//! `engine_forkchoiceUpdatedV3`. Useful for end-to-end demos when a full +//! consensus run is overkill. + +use std::time::Duration; + +use ethlambda_ethrex_client::{ + ETHLAMBDA_ENGINE_CAPABILITIES, EngineClient, ForkChoiceState, JwtSecret, +}; +use ethlambda_types::primitives::H256; + +const SLOT_DURATION: Duration = Duration::from_secs(4); + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut args = std::env::args().skip(1); + let url = args + .next() + .expect("usage: smoke [--loop ]"); + let jwt_path = args + .next() + .expect("usage: smoke [--loop ]"); + let slot_count: Option = match (args.next(), args.next()) { + (Some(ref flag), Some(n)) if flag == "--loop" => Some(n.parse()?), + (None, None) => None, + _ => { + eprintln!("usage: smoke [--loop ]"); + std::process::exit(2); + } + }; + + let secret = JwtSecret::from_file(&jwt_path)?; + let client = EngineClient::new(url, secret)?; + + println!("--- engine_exchangeCapabilities"); + let caps = client + .exchange_capabilities(ETHLAMBDA_ENGINE_CAPABILITIES) + .await?; + println!( + "EL advertises {} capabilities (showing first 6):", + caps.len() + ); + for c in caps.iter().take(6) { + println!(" {c}"); + } + + let Some(slots) = slot_count else { + println!("\n--- engine_forkchoiceUpdatedV3 (one-shot, zeros)"); + let resp = client.forkchoice_updated_v3(zero_state(), None).await?; + println!("status = {:?}", resp.payload_status.status); + println!("payloadId = {:?}", resp.payload_id); + return Ok(()); + }; + + println!("\n--- engine_forkchoiceUpdatedV3 loop ({slots} slots @ 4s/slot)"); + for slot in 0..slots { + let started = std::time::Instant::now(); + // Distinct head per slot so each call carries new data, exactly as + // a real consensus run would (head_root changes on block import). + let state = ForkChoiceState { + head_block_hash: derive_root(b"head", slot), + safe_block_hash: derive_root(b"safe", slot), + finalized_block_hash: derive_root(b"final", slot), + }; + let label = format!("slot={slot:>3}"); + match client.forkchoice_updated_v3(state, None).await { + Ok(resp) => println!( + "{label} engine_forkchoiceUpdatedV3 -> {:?} (latency {:?})", + resp.payload_status.status, + started.elapsed() + ), + Err(err) => println!("{label} engine_forkchoiceUpdatedV3 FAILED: {err}"), + } + if slot + 1 < slots { + tokio::time::sleep(SLOT_DURATION.saturating_sub(started.elapsed())).await; + } + } + + Ok(()) +} + +fn zero_state() -> ForkChoiceState { + ForkChoiceState { + head_block_hash: H256::ZERO, + safe_block_hash: H256::ZERO, + finalized_block_hash: H256::ZERO, + } +} + +/// Hash-free pseudo-root derivation: just splat the slot number into the +/// 32-byte buffer prefixed by a domain tag. Real consensus uses +/// `hash_tree_root(Block)` — here we just want distinct values per slot. +fn derive_root(tag: &[u8], slot: u32) -> H256 { + let mut out = [0u8; 32]; + let tag = &tag[..tag.len().min(8)]; + out[..tag.len()].copy_from_slice(tag); + out[28..].copy_from_slice(&slot.to_be_bytes()); + H256(out) +} diff --git a/crates/net/ethrex-client/src/auth.rs b/crates/net/ethrex-client/src/auth.rs new file mode 100644 index 00000000..0fa29a9c --- /dev/null +++ b/crates/net/ethrex-client/src/auth.rs @@ -0,0 +1,140 @@ +//! Engine API JWT authentication. +//! +//! Per the execution-apis spec, every request to the auth RPC endpoint +//! must carry a fresh `Authorization: Bearer ` header. The token is +//! a JWT signed with HS256 using a 32-byte secret shared out of band +//! between CL and EL. +//! +//! Token claims: +//! - `iat` (issued-at, seconds since Unix epoch). EL accepts a window of +//! ±60s around its own clock. +//! +//! Secret format follows the convention shared by Lighthouse/Teku/Prysm/ +//! ethrex: a single-line hex string (optionally `0x`-prefixed) in a file. + +use std::path::Path; + +use jsonwebtoken::{EncodingKey, Header, encode}; +use serde::{Deserialize, Serialize}; + +/// A 32-byte shared secret used for HS256 token signing. +#[derive(Debug, Clone)] +pub struct JwtSecret { + bytes: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum JwtSecretError { + #[error("failed to read JWT secret from {path}: {source}")] + Io { + path: String, + #[source] + source: std::io::Error, + }, + #[error("JWT secret hex decode failed: {0}")] + Hex(#[from] hex::FromHexError), + #[error("JWT secret must decode to 32 bytes (got {0})")] + WrongLength(usize), + #[error("failed to encode JWT: {0}")] + Jwt(#[from] jsonwebtoken::errors::Error), + #[error("system clock is before Unix epoch")] + ClockBeforeEpoch, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Claims { + /// Issued-at (Unix seconds). + iat: u64, +} + +impl JwtSecret { + /// Construct from raw bytes; must be exactly 32 bytes. + pub fn from_bytes(bytes: Vec) -> Result { + if bytes.len() != 32 { + return Err(JwtSecretError::WrongLength(bytes.len())); + } + Ok(Self { bytes }) + } + + /// Parse from a hex string (with or without `0x` prefix). + pub fn from_hex(hex_str: &str) -> Result { + let trimmed = hex_str.trim(); + let stripped = trimmed.strip_prefix("0x").unwrap_or(trimmed); + let bytes = hex::decode(stripped)?; + Self::from_bytes(bytes) + } + + /// Read a hex-encoded secret from a file path. + pub fn from_file(path: impl AsRef) -> Result { + let path_ref = path.as_ref(); + let contents = std::fs::read_to_string(path_ref).map_err(|source| JwtSecretError::Io { + path: path_ref.display().to_string(), + source, + })?; + Self::from_hex(&contents) + } + + /// Generate a fresh bearer token signed with this secret and the given + /// issued-at time (seconds since the Unix epoch). Token is valid for + /// ~60s on the EL side. + pub fn sign(&self, iat_secs: u64) -> Result { + let claims = Claims { iat: iat_secs }; + let token = encode( + &Header::default(), + &claims, + &EncodingKey::from_secret(&self.bytes), + )?; + Ok(token) + } + + /// Generate a bearer token using the current system clock. + pub fn sign_now(&self) -> Result { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map_err(|_| JwtSecretError::ClockBeforeEpoch)? + .as_secs(); + self.sign(now) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const SAMPLE_HEX: &str = "0x0102030405060708091011121314151617181920212223242526272829303132"; + + #[test] + fn parses_hex_with_and_without_prefix() { + let with = JwtSecret::from_hex(SAMPLE_HEX).unwrap(); + let without = JwtSecret::from_hex(SAMPLE_HEX.strip_prefix("0x").unwrap()).unwrap(); + assert_eq!(with.bytes, without.bytes); + assert_eq!(with.bytes.len(), 32); + } + + #[test] + fn rejects_wrong_length() { + let short = "0x010203"; + assert!(matches!( + JwtSecret::from_hex(short), + Err(JwtSecretError::WrongLength(_)) + )); + } + + #[test] + fn sign_is_deterministic_for_fixed_iat() { + let secret = JwtSecret::from_hex(SAMPLE_HEX).unwrap(); + let a = secret.sign(1_700_000_000).unwrap(); + let b = secret.sign(1_700_000_000).unwrap(); + assert_eq!(a, b); + // Header.Payload.Signature + assert_eq!(a.matches('.').count(), 2); + } + + #[test] + fn sign_differs_for_different_iat() { + let secret = JwtSecret::from_hex(SAMPLE_HEX).unwrap(); + let a = secret.sign(1_700_000_000).unwrap(); + let b = secret.sign(1_700_000_001).unwrap(); + assert_ne!(a, b); + } +} diff --git a/crates/net/ethrex-client/src/client.rs b/crates/net/ethrex-client/src/client.rs new file mode 100644 index 00000000..16867693 --- /dev/null +++ b/crates/net/ethrex-client/src/client.rs @@ -0,0 +1,216 @@ +//! `EngineClient` — typed wrapper around the engine_* JSON-RPC methods. +//! +//! Single `reqwest::Client` instance per `EngineClient`, mints a fresh JWT +//! per request (cheap — HMAC-SHA256 over ~70 bytes). + +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use tracing::{debug, trace}; + +use crate::{ + auth::JwtSecret, + error::EngineClientError, + types::{ + ExecutionPayloadV3, ForkChoiceState, ForkChoiceUpdatedResponse, PayloadAttributesV3, + PayloadId, PayloadStatus, + }, +}; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(8); + +#[derive(Debug, Clone)] +pub struct EngineClient { + http: reqwest::Client, + url: String, + secret: JwtSecret, +} + +impl EngineClient { + /// Build a client targeting `url` (e.g. `http://127.0.0.1:8551`) with + /// the given shared secret. + pub fn new(url: impl Into, secret: JwtSecret) -> Result { + let http = reqwest::Client::builder() + .timeout(DEFAULT_TIMEOUT) + .build()?; + Ok(Self { + http, + url: url.into(), + secret, + }) + } + + /// Build a client with a caller-supplied `reqwest::Client` (lets the + /// caller plug in a custom timeout / connector). Useful for tests. + pub fn with_http_client( + url: impl Into, + secret: JwtSecret, + http: reqwest::Client, + ) -> Self { + Self { + http, + url: url.into(), + secret, + } + } + + /// Endpoint URL this client targets. + pub fn endpoint(&self) -> &str { + &self.url + } + + async fn rpc_call( + &self, + method: &str, + params: Value, + ) -> Result { + let token = self.secret.sign_now()?; + let body = JsonRpcRequest { + jsonrpc: "2.0", + id: 1, + method, + params, + }; + let body_str = serde_json::to_string(&body).map_err(EngineClientError::SerializeRequest)?; + trace!(method, body = %body_str, "engine RPC request"); + + let raw = self + .http + .post(&self.url) + .bearer_auth(&token) + .header("content-type", "application/json") + .body(body_str) + .send() + .await? + .error_for_status()? + .text() + .await?; + trace!(method, response = %raw, "engine RPC response"); + + let envelope: JsonRpcEnvelope = + serde_json::from_str(&raw).map_err(EngineClientError::DeserializeResponse)?; + if let Some(err) = envelope.error { + return Err(EngineClientError::Rpc { + code: err.code, + message: err.message, + data: err.data, + }); + } + let result = envelope.result.ok_or(EngineClientError::EmptyResponse)?; + serde_json::from_value(result).map_err(EngineClientError::DeserializeResponse) + } + + /// `engine_exchangeCapabilities` — sent at startup. Returns the + /// intersection of what we advertise and what the EL supports. + pub async fn exchange_capabilities( + &self, + our_capabilities: &[&str], + ) -> Result, EngineClientError> { + let params = json!([our_capabilities]); + let caps: Vec = self.rpc_call("engine_exchangeCapabilities", params).await?; + debug!(count = caps.len(), "received EL capabilities"); + Ok(caps) + } + + /// `engine_forkchoiceUpdatedV3` — head/safe/finalized update, with + /// optional payload attributes to request a build. + pub async fn forkchoice_updated_v3( + &self, + state: ForkChoiceState, + payload_attributes: Option, + ) -> Result { + let params = json!([state, payload_attributes]); + self.rpc_call("engine_forkchoiceUpdatedV3", params).await + } + + /// `engine_newPayloadV3` — submit a Cancun-era payload to the EL. + pub async fn new_payload_v3( + &self, + payload: ExecutionPayloadV3, + expected_blob_versioned_hashes: Vec, + parent_beacon_block_root: ethlambda_types::primitives::H256, + ) -> Result { + let params = json!([ + payload, + expected_blob_versioned_hashes, + parent_beacon_block_root + ]); + self.rpc_call("engine_newPayloadV3", params).await + } + + /// `engine_getPayloadV3` — fetch a payload built under a previously + /// returned `payload_id`. + pub async fn get_payload_v3(&self, payload_id: PayloadId) -> Result { + // Returns a tagged blob containing `executionPayload`, `blockValue`, + // `blobsBundle`, `shouldOverrideBuilder`. We surface the raw JSON + // until block-import path needs to consume it. + let params = json!([payload_id.to_hex()]); + self.rpc_call("engine_getPayloadV3", params).await + } + + /// `engine_getClientVersionV1` — used for diagnostics in startup logs. + pub async fn get_client_version_v1(&self) -> Result { + let our = json!({ + "code": "EL", + "name": "ethlambda", + "version": "0", + "commit": "0x00000000", + }); + self.rpc_call("engine_getClientVersionV1", json!([our])) + .await + } +} + +// ---------- JSON-RPC envelope ---------- + +#[derive(Serialize)] +struct JsonRpcRequest<'a> { + jsonrpc: &'static str, + id: u64, + method: &'a str, + params: Value, +} + +#[derive(Deserialize)] +struct JsonRpcEnvelope { + #[serde(default)] + result: Option, + #[serde(default)] + error: Option, +} + +#[derive(Deserialize)] +struct JsonRpcError { + code: i64, + message: String, + #[serde(default)] + data: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::auth::JwtSecret; + + fn fake_secret() -> JwtSecret { + JwtSecret::from_bytes(vec![7u8; 32]).unwrap() + } + + #[test] + fn client_builds_with_url() { + let c = EngineClient::new("http://127.0.0.1:8551", fake_secret()).unwrap(); + assert_eq!(c.endpoint(), "http://127.0.0.1:8551"); + } + + #[tokio::test] + async fn transport_error_surfaced_when_no_server() { + // Unbound localhost port — connection should fail. + let c = EngineClient::new("http://127.0.0.1:1", fake_secret()).unwrap(); + let err = c + .exchange_capabilities(crate::ETHLAMBDA_ENGINE_CAPABILITIES) + .await + .unwrap_err(); + assert!(matches!(err, EngineClientError::Transport(_))); + } +} diff --git a/crates/net/ethrex-client/src/error.rs b/crates/net/ethrex-client/src/error.rs new file mode 100644 index 00000000..95240930 --- /dev/null +++ b/crates/net/ethrex-client/src/error.rs @@ -0,0 +1,26 @@ +use crate::auth::JwtSecretError; + +#[derive(Debug, thiserror::Error)] +pub enum EngineClientError { + #[error("JWT auth error: {0}")] + Auth(#[from] JwtSecretError), + + #[error("HTTP transport error: {0}")] + Transport(#[from] reqwest::Error), + + #[error("failed to serialize request: {0}")] + SerializeRequest(serde_json::Error), + + #[error("failed to deserialize response: {0}")] + DeserializeResponse(serde_json::Error), + + #[error("EL returned RPC error {code} ({message})")] + Rpc { + code: i64, + message: String, + data: Option, + }, + + #[error("EL response missing both `result` and `error` fields")] + EmptyResponse, +} diff --git a/crates/net/ethrex-client/src/lib.rs b/crates/net/ethrex-client/src/lib.rs new file mode 100644 index 00000000..b7d908b0 --- /dev/null +++ b/crates/net/ethrex-client/src/lib.rs @@ -0,0 +1,40 @@ +//! JSON-RPC client for the Ethereum Engine API, scoped to ethlambda's +//! integration with the ethrex execution client. +//! +//! Speaks HS256-JWT-authenticated JSON-RPC against an ethrex auth port +//! (default `:8551`). Exposes typed wrappers for the four engine methods +//! ethlambda currently uses: +//! +//! - `engine_exchangeCapabilities` (startup handshake) +//! - `engine_forkchoiceUpdatedV3` (per-tick head/safe/finalized update) +//! - `engine_newPayloadV3` (block import — not wired in the M4 milestone) +//! - `engine_getPayloadV3` (block proposal — not wired in the M4 milestone) +//! +//! The schema mirrors the mainline execution-apis spec; we re-derive it +//! locally instead of depending on ethrex's RPC crate because ethrex is a +//! sibling project, not an upstream library. + +pub mod auth; +pub mod client; +pub mod error; +pub mod types; + +pub use auth::{JwtSecret, JwtSecretError}; +pub use client::EngineClient; +pub use error::EngineClientError; +pub use types::{ + ExecutionPayloadV3, ForkChoiceState, ForkChoiceUpdatedResponse, PayloadAttributesV3, PayloadId, + PayloadStatus, PayloadStatusKind, +}; + +/// Capabilities ethlambda advertises in `engine_exchangeCapabilities`. +/// +/// We list everything we *might* call; the EL's response is the source of +/// truth for what we can actually invoke. Today only V3 is exercised. +pub const ETHLAMBDA_ENGINE_CAPABILITIES: &[&str] = &[ + "engine_exchangeCapabilities", + "engine_forkchoiceUpdatedV3", + "engine_newPayloadV3", + "engine_getPayloadV3", + "engine_getClientVersionV1", +]; diff --git a/crates/net/ethrex-client/src/types.rs b/crates/net/ethrex-client/src/types.rs new file mode 100644 index 00000000..7854c988 --- /dev/null +++ b/crates/net/ethrex-client/src/types.rs @@ -0,0 +1,383 @@ +//! Engine API V3 wire types. +//! +//! Field names + hex encodings match the canonical execution-apis schema +//! so JSON wire format is identical to lighthouse/teku/prysm/ethrex. +//! +//! Only the V3 (Cancun) subset is defined here. V1/V2 are unused by Lean; +//! V4/V5 (Prague+) will be added when needed. + +use ethlambda_types::primitives::H256; +use serde::{Deserialize, Serialize}; + +/// `engine_forkchoiceUpdated` head/safe/finalized triplet. +/// +/// All hashes are *execution-layer* block hashes. For ethlambda's M4 +/// scaffold, we pass zeros for all three; the EL responds `SYNCING`. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ForkChoiceState { + pub head_block_hash: H256, + pub safe_block_hash: H256, + pub finalized_block_hash: H256, +} + +/// Optional attributes that tell the EL to start building a payload. +/// +/// V3 = Cancun (introduces blob-related fields on the resulting payload but +/// the attributes themselves keep the V2 shape plus `parent_beacon_block_root`). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PayloadAttributesV3 { + /// Unix seconds the EL should stamp on the produced block. + #[serde(with = "hex_u64")] + pub timestamp: u64, + pub prev_randao: H256, + #[serde(with = "hex_address")] + pub suggested_fee_recipient: [u8; 20], + pub withdrawals: Vec, + pub parent_beacon_block_root: H256, +} + +/// EIP-4895 withdrawal record carried in payload attributes. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Withdrawal { + #[serde(with = "hex_u64")] + pub index: u64, + #[serde(with = "hex_u64")] + pub validator_index: u64, + #[serde(with = "hex_address")] + pub address: [u8; 20], + #[serde(with = "hex_u64")] + pub amount: u64, +} + +/// Opaque identifier returned by FCU when payload building was requested. +/// +/// 8 bytes on the wire as a hex `DATA` string (`0x` + 16 hex digits), per +/// the execution-apis spec. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PayloadId(pub [u8; 8]); + +impl PayloadId { + pub fn to_hex(&self) -> String { + format!("0x{}", hex::encode(self.0)) + } +} + +impl Serialize for PayloadId { + fn serialize(&self, ser: S) -> Result { + ser.serialize_str(&self.to_hex()) + } +} + +impl<'de> Deserialize<'de> for PayloadId { + fn deserialize>(de: D) -> Result { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + let bytes = hex::decode(stripped).map_err(serde::de::Error::custom)?; + if bytes.len() != 8 { + return Err(serde::de::Error::custom(format!( + "PayloadId expected 8 bytes, got {}", + bytes.len() + ))); + } + let mut out = [0u8; 8]; + out.copy_from_slice(&bytes); + Ok(Self(out)) + } +} + +/// EL's verdict on a payload or forkchoice update. +/// +/// `SCREAMING_SNAKE_CASE` matches the canonical spec values +/// (`VALID`, `INVALID`, `SYNCING`, `ACCEPTED`, `INVALID_BLOCK_HASH`). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum PayloadStatusKind { + Valid, + Invalid, + Syncing, + Accepted, + InvalidBlockHash, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PayloadStatus { + pub status: PayloadStatusKind, + pub latest_valid_hash: Option, + pub validation_error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ForkChoiceUpdatedResponse { + pub payload_status: PayloadStatus, + pub payload_id: Option, +} + +/// `ExecutionPayloadV3` — Cancun-era payload shape. +/// +/// Not consumed by M4 (the FCU-on-tick scaffold) but defined so that the +/// `engine_newPayloadV3` / `engine_getPayloadV3` wrappers compile against +/// the right schema for later milestones. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExecutionPayloadV3 { + pub parent_hash: H256, + #[serde(with = "hex_address")] + pub fee_recipient: [u8; 20], + pub state_root: H256, + pub receipts_root: H256, + #[serde(with = "hex_bytes")] + pub logs_bloom: Vec, + pub prev_randao: H256, + #[serde(with = "hex_u64")] + pub block_number: u64, + #[serde(with = "hex_u64")] + pub gas_limit: u64, + #[serde(with = "hex_u64")] + pub gas_used: u64, + #[serde(with = "hex_u64")] + pub timestamp: u64, + #[serde(with = "hex_bytes")] + pub extra_data: Vec, + #[serde(with = "hex_u256")] + pub base_fee_per_gas: [u8; 32], + pub block_hash: H256, + pub transactions: Vec, + pub withdrawals: Vec, + #[serde(with = "hex_u64")] + pub blob_gas_used: u64, + #[serde(with = "hex_u64")] + pub excess_blob_gas: u64, +} + +/// Hex-encoded byte string wrapper for typed `Vec` fields +/// (the spec encodes each transaction as a `DATA` string). +#[derive(Debug, Clone)] +pub struct HexBytes(pub Vec); + +impl Serialize for HexBytes { + fn serialize(&self, ser: S) -> Result { + ser.serialize_str(&format!("0x{}", hex::encode(&self.0))) + } +} + +impl<'de> Deserialize<'de> for HexBytes { + fn deserialize>(de: D) -> Result { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + hex::decode(stripped) + .map(HexBytes) + .map_err(serde::de::Error::custom) + } +} + +// ---------- Hex serde helpers ---------- + +mod hex_u64 { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(v: &u64, ser: S) -> Result { + ser.serialize_str(&format!("0x{v:x}")) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + u64::from_str_radix(stripped, 16).map_err(serde::de::Error::custom) + } +} + +mod hex_bytes { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(v: &Vec, ser: S) -> Result { + ser.serialize_str(&format!("0x{}", hex::encode(v))) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result, D::Error> { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + hex::decode(stripped).map_err(serde::de::Error::custom) + } +} + +mod hex_u256 { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(v: &[u8; 32], ser: S) -> Result { + // Trim leading zero bytes for the canonical `QUANTITY` form. + let first_nonzero = v.iter().position(|b| *b != 0).unwrap_or(31); + let stripped = &v[first_nonzero..]; + let hex_str = hex::encode(stripped); + // Remove leading zero nibble (canonical form has no leading zero in odd-length). + let trimmed = hex_str.trim_start_matches('0'); + let out = if trimmed.is_empty() { "0" } else { trimmed }; + ser.serialize_str(&format!("0x{out}")) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<[u8; 32], D::Error> { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + // Left-pad to 64 hex chars (32 bytes); reject overflow. + if stripped.len() > 64 { + return Err(serde::de::Error::custom(format!( + "u256 hex too long: {} chars (max 64)", + stripped.len() + ))); + } + let padded = format!("{stripped:0>64}"); + let bytes = hex::decode(&padded).map_err(serde::de::Error::custom)?; + let mut out = [0u8; 32]; + out.copy_from_slice(&bytes); + Ok(out) + } +} + +/// 20-byte Ethereum address as a `0x`-prefixed hex `DATA` string. +mod hex_address { + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(v: &[u8; 20], ser: S) -> Result { + ser.serialize_str(&format!("0x{}", hex::encode(v))) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<[u8; 20], D::Error> { + let s = String::deserialize(de)?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + let bytes = hex::decode(stripped).map_err(serde::de::Error::custom)?; + if bytes.len() != 20 { + return Err(serde::de::Error::custom(format!( + "address expected 20 bytes, got {}", + bytes.len() + ))); + } + let mut out = [0u8; 20]; + out.copy_from_slice(&bytes); + Ok(out) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn forkchoice_state_roundtrip() { + let original = ForkChoiceState { + head_block_hash: H256([1; 32]), + safe_block_hash: H256([2; 32]), + finalized_block_hash: H256([3; 32]), + }; + let json = serde_json::to_string(&original).unwrap(); + // camelCase + 0x-prefixed hex + assert!(json.contains("headBlockHash")); + assert!(json.contains("finalizedBlockHash")); + let round: ForkChoiceState = serde_json::from_str(&json).unwrap(); + assert_eq!(round.head_block_hash.0, original.head_block_hash.0); + assert_eq!( + round.finalized_block_hash.0, + original.finalized_block_hash.0 + ); + } + + #[test] + fn payload_status_parses_syncing() { + let json = r#"{"status":"SYNCING","latestValidHash":null,"validationError":null}"#; + let parsed: PayloadStatus = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.status, PayloadStatusKind::Syncing); + } + + #[test] + fn fcu_response_with_no_payload_id() { + let json = r#"{"payloadStatus":{"status":"VALID","latestValidHash":"0x0000000000000000000000000000000000000000000000000000000000000000","validationError":null},"payloadId":null}"#; + let parsed: ForkChoiceUpdatedResponse = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.payload_status.status, PayloadStatusKind::Valid); + assert!(parsed.payload_id.is_none()); + } + + #[test] + fn hex_u64_roundtrip() { + #[derive(Serialize, Deserialize)] + struct Wrap { + #[serde(with = "hex_u64")] + n: u64, + } + let s = serde_json::to_string(&Wrap { n: 0xdead_beef }).unwrap(); + assert_eq!(s, r#"{"n":"0xdeadbeef"}"#); + let back: Wrap = serde_json::from_str(&s).unwrap(); + assert_eq!(back.n, 0xdead_beef); + } + + #[test] + fn payload_status_invalid_block_hash_uses_screaming_snake() { + let json = r#"{"status":"INVALID_BLOCK_HASH","latestValidHash":null,"validationError":"bad hash"}"#; + let parsed: PayloadStatus = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.status, PayloadStatusKind::InvalidBlockHash); + let back = serde_json::to_string(&parsed).unwrap(); + assert!( + back.contains(r#""status":"INVALID_BLOCK_HASH""#), + "got: {back}" + ); + } + + #[test] + fn payload_id_is_hex_string_on_wire() { + let id = PayloadId([0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef]); + let json = serde_json::to_string(&id).unwrap(); + assert_eq!(json, r#""0x0123456789abcdef""#); + let back: PayloadId = serde_json::from_str(&json).unwrap(); + assert_eq!(back, id); + } + + #[test] + fn payload_id_rejects_wrong_length() { + // 6 bytes instead of 8. + let err = serde_json::from_str::(r#""0x010203040506""#).unwrap_err(); + assert!(err.to_string().contains("expected 8 bytes")); + } + + #[test] + fn address_serializes_as_hex_data_string() { + #[derive(Serialize, Deserialize)] + struct Wrap { + #[serde(with = "hex_address")] + addr: [u8; 20], + } + let w = Wrap { addr: [0xab; 20] }; + let json = serde_json::to_string(&w).unwrap(); + let expected = format!(r#"{{"addr":"0x{}"}}"#, "ab".repeat(20)); + assert_eq!(json, expected); + let back: Wrap = serde_json::from_str(&json).unwrap(); + assert_eq!(back.addr, w.addr); + } + + #[test] + fn address_rejects_wrong_length() { + #[derive(Debug, Deserialize)] + struct Wrap { + #[serde(with = "hex_address")] + #[allow(dead_code)] + addr: [u8; 20], + } + let err = serde_json::from_str::(r#"{"addr":"0xabcd"}"#).unwrap_err(); + assert!(err.to_string().contains("expected 20 bytes")); + } + + #[test] + fn hex_u256_rejects_overflow_instead_of_panicking() { + #[derive(Debug, Deserialize)] + struct Wrap { + #[serde(with = "hex_u256")] + #[allow(dead_code)] + n: [u8; 32], + } + // 65 hex chars = 33 bytes > 32; must error, not panic. + let too_long = format!(r#"{{"n":"0x{}"}}"#, "a".repeat(65)); + let err = serde_json::from_str::(&too_long).unwrap_err(); + assert!(err.to_string().contains("too long")); + } +} diff --git a/crates/net/ethrex-client/tests/wire_smoke.rs b/crates/net/ethrex-client/tests/wire_smoke.rs new file mode 100644 index 00000000..d3b561d8 --- /dev/null +++ b/crates/net/ethrex-client/tests/wire_smoke.rs @@ -0,0 +1,115 @@ +//! End-to-end wire smoke test. +//! +//! Spawns a minimal HTTP/1.1 server on a random localhost port, has the +//! `EngineClient` call `engine_forkchoiceUpdatedV3` against it, and +//! verifies: +//! - the request body shape (jsonrpc envelope + camelCase params), +//! - the `Authorization: Bearer ` header is present, +//! - the typed `ForkChoiceUpdatedResponse` parses correctly from the +//! `SYNCING` canned reply. +//! +//! No external mock server crate; just `tokio::net::TcpListener` and a +//! hand-rolled HTTP/1.1 response. + +use std::sync::Arc; +use std::sync::Mutex; + +use ethlambda_ethrex_client::{EngineClient, ForkChoiceState, JwtSecret, PayloadStatusKind}; +use ethlambda_types::primitives::H256; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +const JWT_HEX: &str = "0x0102030405060708091011121314151617181920212223242526272829303132"; + +#[tokio::test] +async fn forkchoice_updated_v3_round_trip() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{addr}"); + + let captured: Arc>> = Arc::new(Mutex::new(None)); + let captured_for_server = captured.clone(); + + tokio::spawn(async move { + let (mut sock, _) = listener.accept().await.unwrap(); + // Read until we have headers + body (request is small). + let mut buf = vec![0u8; 8192]; + let n = sock.read(&mut buf).await.unwrap(); + let raw = String::from_utf8_lossy(&buf[..n]).into_owned(); + *captured_for_server.lock().unwrap() = Some(raw); + + let body = r#"{"jsonrpc":"2.0","id":1,"result":{"payloadStatus":{"status":"SYNCING","latestValidHash":null,"validationError":null},"payloadId":null}}"#; + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + sock.write_all(resp.as_bytes()).await.unwrap(); + sock.shutdown().await.unwrap(); + }); + + let secret = JwtSecret::from_hex(JWT_HEX).unwrap(); + let client = EngineClient::new(&url, secret).unwrap(); + + let state = ForkChoiceState { + head_block_hash: H256([0xaa; 32]), + safe_block_hash: H256([0xbb; 32]), + finalized_block_hash: H256([0xcc; 32]), + }; + let resp = client + .forkchoice_updated_v3(state, None) + .await + .expect("FCU should succeed against mock"); + assert_eq!(resp.payload_status.status, PayloadStatusKind::Syncing); + assert!(resp.payload_id.is_none()); + + let raw_req = captured.lock().unwrap().clone().expect("request captured"); + let lower = raw_req.to_lowercase(); + assert!( + lower.contains("authorization: bearer "), + "missing JWT header in:\n{raw_req}" + ); + assert!( + raw_req.contains(r#""method":"engine_forkchoiceUpdatedV3""#), + "wrong method name in body: {raw_req}" + ); + assert!(raw_req.contains("headBlockHash"), "params not camelCase"); + assert!( + raw_req.contains("0xaaaaaa"), + "head hash not encoded in body" + ); +} + +#[tokio::test] +async fn rpc_error_surfaces_typed() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{addr}"); + + tokio::spawn(async move { + let (mut sock, _) = listener.accept().await.unwrap(); + let mut buf = vec![0u8; 8192]; + let _ = sock.read(&mut buf).await.unwrap(); + let body = r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32700,"message":"parse error"}}"#; + let resp = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + sock.write_all(resp.as_bytes()).await.unwrap(); + sock.shutdown().await.unwrap(); + }); + + let secret = JwtSecret::from_hex(JWT_HEX).unwrap(); + let client = EngineClient::new(&url, secret).unwrap(); + let err = client + .exchange_capabilities(&["engine_forkchoiceUpdatedV3"]) + .await + .unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("-32700"), "expected RPC code in error: {msg}"); + assert!( + msg.contains("parse error"), + "expected RPC message in error: {msg}" + ); +} diff --git a/docs/plans/engine-api-integration.md b/docs/plans/engine-api-integration.md new file mode 100644 index 00000000..70e37160 --- /dev/null +++ b/docs/plans/engine-api-integration.md @@ -0,0 +1,172 @@ +# Engine API integration: ethlambda ↔ ethrex + +> Plan owner: pablo +> Created: 2026-05-13 +> Status: draft, awaiting scope confirmation + +## Goal + +Integrate ethlambda (Lean consensus client) with ethrex (Ethereum execution +client) over the standard Engine API: JWT-authenticated JSON-RPC on a separate +"auth" port, with `engine_*` methods driving execution-layer fork choice, +payload validation, and payload building. + +## Starting state + +**ethlambda** (this repo): +- Pure consensus, no execution layer awareness. +- `BlockBody` carries `attestations` only — no `execution_payload` field + (`crates/common/types/src/block.rs`). +- `State` carries justification/finalization data but no + `latest_execution_payload_header`. +- No JWT / JSON-RPC client crate. +- Slot duration: 4s, tick intervals 0-4 per slot. + +**ethrex** ([lambdaclass/ethrex](https://github.com/lambdaclass/ethrex)): +- Full mainline Engine API on an auth RPC port: V1-V5 of `engine_newPayload`, + V1-V4 of `engine_forkchoiceUpdated`, V1-V5 of `engine_getPayload`, plus + `engine_exchangeCapabilities` and `engine_getClientVersionV1`. +- JWT HS256 bearer auth (`crates/networking/rpc/authentication.rs`). +- Reference Engine *client* (used when ethrex acts as a rollup sequencer) + in `crates/networking/rpc/clients/auth/mod.rs` — direct template for our + new client crate. +- `PayloadAttributesV4` already includes `slot_number: u64`, friendly to + Lean's slot-driven model. + +**leanSpec**: no execution payload definition. Lean Ethereum consensus does +not currently mandate an EL. This means integration is *additive* — we choose +when to carry/validate payloads. + +## Scope options (the question that needs answering) + +Three plausible interpretations of "integrate": + +| Option | What it means | Effort | Spec dependency | +|---|---|---|---| +| **A. Spike** | ethlambda speaks JWT+JSON-RPC to ethrex. On each tick, fires `engine_forkchoiceUpdated` with the current head/finality hashes (initially dummy `H256::zero()`). Validates JWT plumbing end-to-end. No block-schema changes. | ~1 day | none | +| **B. Scaffold** | Spike + typed Rust wrappers for all four engine methods, CLI flags, capability handshake on startup, observability. Block schema unchanged. Still no real payload flow because blocks have no payload. | ~3-5 days | none | +| **C. Full merge** | Add `execution_payload(_header)` to Lean `BlockBody` + `State`, propagate through STF (call `engine_newPayload` on import, `engine_getPayload` on proposal), require ethrex for consensus validity. | weeks | requires leanSpec proposal — not yet drafted | + +**Recommendation**: do **A → B → wait for spec**. Option C should not be +attempted ahead of a leanSpec change; doing so forks ethlambda from the other +six Lean clients. + +## Architecture (B target) + +### New crate: `crates/net/ethrex-client` + +``` +crates/net/ethrex-client/ +├── Cargo.toml # reqwest (rustls-tls), serde, jsonwebtoken, bytes, eyre/thiserror +└── src/ + ├── lib.rs # public EngineClient API + ├── auth.rs # JWT HS256 generation (iat-based, 60s expiry per spec) + ├── transport.rs # reqwest + bearer + JSON-RPC envelope + ├── methods.rs # engine_exchangeCapabilities / fcu / newPayload / getPayload wrappers + └── types/ # PayloadStatus, ForkChoiceState, ExecutionPayload, PayloadAttributes(V3,V4) + └── ... # ported from ethrex's rpc/types/ — minimal subset, ours own +``` + +Why a separate crate (not in `crates/net/rpc`): rpc crate today serves the +*beacon* HTTP API and the metrics server. Engine API is conceptually a +*client* to a different process, so it belongs in its own crate to keep +dependencies clean (rpc doesn't need `jsonwebtoken`; ethrex-client doesn't +need axum). + +### Types + +We re-derive the mainline Engine API types locally (not depend on +`ethrex_rpc`) — ethrex is a sibling project, not an upstream library. We mirror +field names exactly so JSON wire format is identical. + +Minimal V1 subset to start: +- `ForkChoiceState { head_block_hash, safe_block_hash, finalized_block_hash }` +- `PayloadAttributesV3` (Cancun) and `PayloadAttributesV4` (Prague, with + `slot_number`) — both supported, picked per ethrex's capabilities. +- `ExecutionPayload` (with optional V3/V4 fields) +- `PayloadStatus { status, latest_valid_hash, validation_error }` + +### CLI flags (`bin/ethlambda`) + +| Flag | Default | Purpose | +|---|---|---| +| `--execution-endpoint` | (unset; integration disabled if missing) | URL of ethrex auth RPC, e.g. `http://127.0.0.1:8551` | +| `--execution-jwt-secret` | (unset) | Path to JWT hex secret file (same format ethrex/lighthouse/etc. use) | +| `--execution-fee-recipient` | (unset) | 20-byte hex; required only when proposing | + +Behavior: +- Both unset → integration **disabled**, ethlambda runs as before. +- Both set → instantiate `EngineClient`, run capability handshake on startup + (log mismatches as warnings, not errors), pass client to `BlockChain` actor. +- Capability handshake also fetches `engine_getClientVersionV1` and logs + ethrex name/version for support diagnostics. + +### Blockchain actor hookup (Option B level) + +In `crates/blockchain/src/lib.rs`: +- On each `Tick`, if integration is enabled and tick interval is 0 (block + proposal time): call `engine_forkchoiceUpdated` with our current + `(head, safe, finalized)` hashes mapped onto dummy execution-block hashes + (e.g., `H256::zero()` or `keccak256(beacon_root)` — TBD). +- On block import: log only, no payload flow. + +This is deliberately a no-op for ethrex (the FCU it receives points at hashes +it doesn't know about → it returns `SYNCING`). The point is to exercise the +*wire* end-to-end so the real schema work (Option C) can land without surprises. + +### Observability + +Three new metrics (`ethrex_engine_*` to disambiguate from internal ethlambda +metrics; falls under "Custom Metrics" in `docs/metrics.md`): + +- `lean_ethrex_engine_request_duration_seconds{method}` — histogram +- `lean_ethrex_engine_request_total{method, status}` — counter (`status` ∈ `ok`, `rpc_error`, `transport_error`) +- `lean_ethrex_engine_last_payload_status{}` — int gauge (0=unknown, 1=valid, 2=invalid, 3=syncing, 4=accepted) + +## Milestones + +### M1 — Plan locked + scope decided (TODAY) +Resolve A/B/C with user. Plan stays in `docs/plans/`. + +### M2 — `ethrex-client` crate skeleton (1-2 days, parallelizable) +- New crate compiles in workspace, exports `EngineClient` with all four + methods returning `eyre::Result<_>`, JWT auth implemented and unit-tested + (fixed `iat`, deterministic token). +- Stub integration test against `mockito` (no real ethrex). + +### M3 — Wire into `bin/ethlambda` (1 day) +- CLI flags added, client constructed at startup, capability handshake logged. +- Disabled by default; `make test` unchanged. + +### M4 — FCU on tick (½ day) +- Blockchain actor fires `engine_forkchoiceUpdated` on interval 0 of every + slot when client is configured. Use dummy hashes initially. +- Add metrics. Document expected `SYNCING` response. + +### M5 — End-to-end test against real ethrex (1 day) +- Devnet config wiring ethlambda → local ethrex; verify ethrex logs receive + the FCU and respond. No consensus block changes yet. + +### M6 — *(blocked on leanSpec)* — Real payload flow (Option C) +Out of scope for this plan unless C is selected up front. + +## Open questions + +1. **Genesis EL hash mapping**: when Lean genesis is created, what + execution-block hash do we pin? `H256::zero()` is the simplest convention + but means ethrex must accept ethlambda's FCU pointing at zero. +2. **Multi-EL support** (Lighthouse/Lodestar style): not in M2-M5. Single EL + endpoint only. +3. **JWT secret format**: file vs. inline hex. ethrex/lighthouse/teku all + accept a file containing `0x`-prefixed hex; we follow the same convention. +4. **Slot → timestamp mapping**: ethlambda has `GENESIS_TIME` + slot duration + = 4s. Lean slot 0 timestamp = `GENESIS_TIME`. ethrex `PayloadAttributesV4` + wants Unix `timestamp` + `slot_number`. Both available. + +## References + +- ethrex Engine API: +- ethrex auth client (template): +- ethrex JWT auth: +- Engine API spec: +- Capability list (mainline): `engine_*V1..V5` — see `engine/mod.rs:CAPABILITIES`