diff --git a/bin/debug-trace-server/src/data_provider.rs b/bin/debug-trace-server/src/data_provider.rs index 1ac6d58..9f03450 100644 --- a/bin/debug-trace-server/src/data_provider.rs +++ b/bin/debug-trace-server/src/data_provider.rs @@ -29,7 +29,9 @@ use tokio::sync::broadcast; use tracing::{debug, instrument, trace, warn}; use validator_core::{withdrawals::MptWitness, LightWitness, RpcClient, ValidatorDB}; -use crate::metrics::{ChainSyncMetrics, UpstreamMetrics}; +use crate::metrics::{ + ChainSyncMetrics, DataSourceMetrics, SingleFlightMetrics, UpstreamMetrics, WitnessSourceMetrics, +}; /// Block data bundle containing all information needed for stateless execution. /// @@ -154,6 +156,8 @@ impl DataProvider { elapsed_ms = start.elapsed().as_millis() as u64, "Block data retrieved from local DB" ); + DataSourceMetrics::new_for_source("db").record(); + SingleFlightMetrics::new_for_type("bypassed").record(); self.record_block_distance(data.block.header.number); return Ok(data); } @@ -318,6 +322,7 @@ impl DataProvider { // Subscribe to the existing request let mut receiver = sender.subscribe(); drop(sender); // Release the lock + SingleFlightMetrics::new_for_type("coalesced").record(); trace!( block_hash = %block_hash, "Joining existing in-flight request" @@ -332,6 +337,7 @@ impl DataProvider { // Create a new broadcast channel for this request let (tx, _) = broadcast::channel(1); self.in_flight.insert(block_hash, tx.clone()); + SingleFlightMetrics::new_for_type("new").record(); trace!( block_hash = %block_hash, @@ -374,22 +380,30 @@ impl DataProvider { let block_number = header.number; let fetch_header_ms = start.elapsed().as_millis(); - // Step 2: Fetch witness and full block in parallel - let witness_start = std::time::Instant::now(); - let full_block_start = std::time::Instant::now(); - - let (witness_result, full_block_result) = tokio::join!( - self.fetch_witness_with_fallback(block_number, header.hash), - 
self.rpc_client.get_block(BlockId::Hash(block_hash.into()), true), + // Step 2: Fetch witness and full block in parallel, timing each independently + let (witness_timed, block_timed) = tokio::join!( + async { + let start = std::time::Instant::now(); + let result = self.fetch_witness_with_fallback(block_number, header.hash).await; + (result, start.elapsed()) + }, + async { + let start = std::time::Instant::now(); + let result = + self.rpc_client.get_block(BlockId::Hash(block_hash.into()), true).await; + (result, start.elapsed()) + }, ); - let fetch_witness_ms = witness_start.elapsed().as_millis(); - upstream_witness.record_request(witness_result.is_ok(), fetch_witness_ms as f64 / 1000.0); + let (witness_result, witness_elapsed) = witness_timed; + let (full_block_result, block_elapsed) = block_timed; + + let fetch_witness_ms = witness_elapsed.as_millis(); + upstream_witness.record_request(witness_result.is_ok(), witness_elapsed.as_secs_f64()); let (salt_witness, _mpt_witness) = witness_result?; - let fetch_full_block_ms = full_block_start.elapsed().as_millis(); - upstream_block - .record_request(full_block_result.is_ok(), fetch_full_block_ms as f64 / 1000.0); + let fetch_full_block_ms = block_elapsed.as_millis(); + upstream_block.record_request(full_block_result.is_ok(), block_elapsed.as_secs_f64()); let block = full_block_result?; // Step 3: Convert SaltWitness to LightWitness @@ -438,8 +452,12 @@ impl DataProvider { block_number: u64, block_hash: B256, ) -> Result<(SaltWitness, MptWitness)> { + let cf_metrics = WitnessSourceMetrics::new_for_source("cloudflare"); + let start = std::time::Instant::now(); match self.rpc_client.get_witness_from_cloudflare(block_number, block_hash).await { Ok(result) => { + cf_metrics.record_request(true, start.elapsed().as_secs_f64()); + cf_metrics.record_size(estimate_witness_size(&result.0, &result.1)); trace!( block_number, block_hash = %block_hash, @@ -448,6 +466,7 @@ impl DataProvider { Ok(result) } Err(e) => { + 
cf_metrics.record_request(false, start.elapsed().as_secs_f64()); warn!( block_number, block_hash = %block_hash, @@ -482,6 +501,8 @@ impl DataProvider { None => true, // No DB means treat all blocks as "new" (try witness endpoint with retry) }; + let wg_metrics = WitnessSourceMetrics::new_for_source("witness_generator"); + if is_new_block { // Block is newer than DB max: retry witness endpoint until timeout trace!( @@ -490,9 +511,16 @@ impl DataProvider { "Block is new, using retry loop for witness endpoint" ); + let start = std::time::Instant::now(); match self.fetch_witness_with_retry(block_number, block_hash).await { - Ok(result) => Ok(result), + Ok(result) => { + wg_metrics.record_request(true, start.elapsed().as_secs_f64()); + wg_metrics.record_size(estimate_witness_size(&result.0, &result.1)); + DataSourceMetrics::new_for_source("witness_generator").record(); + Ok(result) + } Err(e) => { + wg_metrics.record_request(false, start.elapsed().as_secs_f64()); // Timeout reached, try Cloudflare fallback if self.rpc_client.has_cloudflare_provider() { trace!( @@ -500,7 +528,10 @@ impl DataProvider { %e, "Witness endpoint timeout, falling back to Cloudflare" ); - self.get_witness_from_cloudflare(block_number, block_hash).await + let result = + self.get_witness_from_cloudflare(block_number, block_hash).await?; + DataSourceMetrics::new_for_source("cloudflare").record(); + Ok(result) } else { Err(e) } @@ -514,9 +545,16 @@ impl DataProvider { "Block is old/pruned, single attempt then Cloudflare fallback" ); + let start = std::time::Instant::now(); match self.rpc_client.get_witness(block_number, block_hash).await { - Ok(result) => Ok(result), + Ok(result) => { + wg_metrics.record_request(true, start.elapsed().as_secs_f64()); + wg_metrics.record_size(estimate_witness_size(&result.0, &result.1)); + DataSourceMetrics::new_for_source("witness_generator").record(); + Ok(result) + } Err(e) => { + wg_metrics.record_request(false, start.elapsed().as_secs_f64()); // Single attempt 
failed, try Cloudflare fallback if self.rpc_client.has_cloudflare_provider() { trace!( @@ -524,7 +562,10 @@ %e, "Witness endpoint failed, falling back to Cloudflare" ); - self.get_witness_from_cloudflare(block_number, block_hash).await + let result = + self.get_witness_from_cloudflare(block_number, block_hash).await?; + DataSourceMetrics::new_for_source("cloudflare").record(); + Ok(result) } else { // No Cloudflare configured and witness fetch failed Err(eyre::eyre!("Block witness not found for block {}", block_number)) @@ -663,6 +704,17 @@ impl DataProvider { } } +/// Estimates the total size of witness data in bytes (approximate, avoids serialization). +fn estimate_witness_size(salt: &SaltWitness, mpt: &MptWitness) -> usize { + // SaltKey (8 bytes) + Option (~95 bytes) ≈ 103 bytes per entry + let salt_kvs_size = salt.kvs.len() * 103; + // Proof: commitments (64 bytes each) + IPA proof (~576 bytes) + levels (5 bytes each) + let proof_size = salt.proof.parents_commitments.len() * 64 + 576 + salt.proof.levels.len() * 5; + // MptWitness: storage_root (32 bytes) + sum of state bytes + let mpt_size = 32 + mpt.state.iter().map(|b| b.len()).sum::<usize>(); + salt_kvs_size + proof_size + mpt_size +} + #[cfg(test)] mod tests { use super::*; diff --git a/bin/debug-trace-server/src/main.rs b/bin/debug-trace-server/src/main.rs index 5ade8be..52eb6c1 100644 --- a/bin/debug-trace-server/src/main.rs +++ b/bin/debug-trace-server/src/main.rs @@ -540,6 +540,11 @@ async fn history_pruner( let earliest = validator_db.get_earliest_local_block().ok().flatten().map(|(n, _)| n).unwrap_or(0); chain_sync_metrics.set_db_block_range(earliest, current_tip); + + // Update DB file size metric + if let Ok(m) = std::fs::metadata(&db_path) { + chain_sync_metrics.set_db_size(m.len()); + } } tokio::time::sleep(interval).await; diff --git a/bin/debug-trace-server/src/metrics.rs b/bin/debug-trace-server/src/metrics.rs index 90d1f55..db8d1bc 100644 --- 
a/bin/debug-trace-server/src/metrics.rs +++ b/bin/debug-trace-server/src/metrics.rs @@ -68,7 +68,24 @@ pub const CACHE_TYPE_DEBUG_TRACE: &str = "debug_trace_block"; pub const CACHE_TYPE_TRACE: &str = "trace_block"; // --------------------------------------------------------------------------- -// Metric Structs (using metrics-derive) +// All known RPC methods (for resolving &str → &'static str) +// --------------------------------------------------------------------------- + +const ALL_METHODS: &[&str] = &[ + METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, + METHOD_DEBUG_TRACE_BLOCK_BY_HASH, + METHOD_DEBUG_TRACE_TRANSACTION, + METHOD_DEBUG_GET_CACHE_STATUS, + METHOD_TRACE_BLOCK, + METHOD_TRACE_TRANSACTION, +]; + +fn resolve_method(method: &str) -> &'static str { + ALL_METHODS.iter().find(|&&m| m == method).copied().unwrap_or("unknown") +} + +// --------------------------------------------------------------------------- +// ── Request Layer ────────────────────────────── // --------------------------------------------------------------------------- /// RPC method metrics with method label. @@ -101,7 +118,7 @@ impl RpcMethodMetrics { } } -/// Global RPC metrics (uses "global" label to distinguish from per-method). +/// Global RPC metrics (singleton). #[derive(Clone, Metrics)] #[metrics(scope = "debug_trace")] pub struct RpcGlobalMetrics { @@ -126,6 +143,30 @@ impl RpcGlobalMetrics { } } +/// Response size metrics with method label. +#[derive(Clone, Metrics)] +#[metrics(scope = "debug_trace")] +pub struct ResponseSizeMetrics { + /// Response size in bytes + response_size_bytes: Histogram, +} + +impl ResponseSizeMetrics { + /// Creates metrics for a specific method. + pub fn new_for_method(method: &'static str) -> Self { + Self::new_with_labels(&[("method", method)]) + } + + /// Records a response size. 
+ pub fn record(&self, size: usize) { + self.response_size_bytes.record(size as f64); + } +} + +// --------------------------------------------------------------------------- +// ── Cache Layer ──────────────────────────────── +// --------------------------------------------------------------------------- + /// Response cache metrics with cache type label. #[derive(Clone, Metrics)] #[metrics(scope = "debug_trace")] @@ -141,7 +182,7 @@ pub struct CacheMetrics { } impl CacheMetrics { - /// Creates metrics for a specific cache type (e.g., "debug_trace_block", "trace_block"). + /// Creates metrics for a specific cache type. pub fn new_for_cache(cache_type: &'static str) -> Self { Self::new_with_labels(&[("type", cache_type)]) } @@ -163,6 +204,50 @@ impl CacheMetrics { } } +// --------------------------------------------------------------------------- +// ── Data Fetch Layer ─────────────────────────── +// --------------------------------------------------------------------------- + +/// Tracks which source provided block data (cache/db/witness_generator/cloudflare). +#[derive(Clone, Metrics)] +#[metrics(scope = "debug_trace")] +pub struct DataSourceMetrics { + /// Total block data fetches by source + block_data_fetches_total: Counter, +} + +impl DataSourceMetrics { + /// Creates metrics for a specific data source. + pub fn new_for_source(source: &'static str) -> Self { + Self::new_with_labels(&[("source", source)]) + } + + /// Records a block data fetch from this source. + pub fn record(&self) { + self.block_data_fetches_total.increment(1); + } +} + +/// Single-flight coalescing metrics (new/coalesced/bypassed). +#[derive(Clone, Metrics)] +#[metrics(scope = "debug_trace")] +pub struct SingleFlightMetrics { + /// Total single-flight events by type + single_flight_total: Counter, +} + +impl SingleFlightMetrics { + /// Creates metrics for a specific single-flight event type. 
+ pub fn new_for_type(event_type: &'static str) -> Self { + Self::new_with_labels(&[("type", event_type)]) + } + + /// Records a single-flight event. + pub fn record(&self) { + self.single_flight_total.increment(1); + } +} + /// Upstream RPC metrics with method label. #[derive(Clone, Metrics)] #[metrics(scope = "debug_trace")] @@ -191,39 +276,77 @@ impl UpstreamMetrics { } } -/// Tracing execution metrics with tracer type label. +// --------------------------------------------------------------------------- +// ── Witness Layer ────────────────────────────── +// --------------------------------------------------------------------------- + +/// Witness fetch metrics by source (witness_generator / cloudflare). #[derive(Clone, Metrics)] #[metrics(scope = "debug_trace")] -pub struct TracingMetrics { - /// Total transactions traced - transactions_traced_total: Counter, - /// Total blocks traced - blocks_traced_total: Counter, - /// Duration of tracing execution in seconds - tracing_duration_seconds: Histogram, +pub struct WitnessSourceMetrics { + /// Total witness fetch requests + witness_requests_total: Counter, + /// Total witness fetch errors + witness_errors_total: Counter, + /// Duration of witness fetch in seconds + witness_duration_seconds: Histogram, + /// Witness response size in bytes + witness_bytes: Histogram, } -impl TracingMetrics { - /// Creates metrics for a specific tracer type (e.g., "geth", "parity"). - pub fn new_for_tracer(tracer: &'static str) -> Self { - Self::new_with_labels(&[("tracer", tracer)]) +impl WitnessSourceMetrics { + /// Creates metrics for a specific witness source. + pub fn new_for_source(source: &'static str) -> Self { + Self::new_with_labels(&[("source", source)]) + } + + /// Records a witness fetch request. 
+ pub fn record_request(&self, success: bool, duration_secs: f64) { + self.witness_requests_total.increment(1); + if !success { + self.witness_errors_total.increment(1); + } + self.witness_duration_seconds.record(duration_secs); } - /// Records a block trace completion. - pub fn record_block(&self, tx_count: usize, duration_secs: f64) { - self.blocks_traced_total.increment(1); - self.transactions_traced_total.increment(tx_count as u64); - self.tracing_duration_seconds.record(duration_secs); + /// Records witness response size. + pub fn record_size(&self, bytes: usize) { + self.witness_bytes.record(bytes as f64); } +} + +// --------------------------------------------------------------------------- +// ── Execution Layer ──────────────────────────── +// --------------------------------------------------------------------------- + +/// EVM execution metrics with method label. +#[derive(Clone, Metrics)] +#[metrics(scope = "debug_trace")] +pub struct EvmExecutionMetrics { + /// EVM execution duration in seconds + evm_execution_seconds: Histogram, + /// Number of transactions per traced block + evm_block_tx_count: Histogram, +} - /// Records a single transaction trace. - pub fn record_transaction(&self, duration_secs: f64) { - self.transactions_traced_total.increment(1); - self.tracing_duration_seconds.record(duration_secs); +impl EvmExecutionMetrics { + /// Creates metrics for a specific method. + pub fn new_for_method(method: &'static str) -> Self { + Self::new_with_labels(&[("method", method)]) + } + + /// Records an EVM execution. + pub fn record(&self, duration_secs: f64, tx_count: usize) { + self.evm_execution_seconds.record(duration_secs); + self.evm_block_tx_count.record(tx_count as f64); } } -/// Chain sync metrics. 
+// --------------------------------------------------------------------------- +// ── Infrastructure ───────────────────────────── +// --------------------------------------------------------------------------- + +/// Chain sync metrics (singleton). #[derive(Clone, Metrics)] #[metrics(scope = "debug_trace")] pub struct ChainSyncMetrics { @@ -239,6 +362,8 @@ pub struct ChainSyncMetrics { db_earliest_block: Gauge, /// Latest block number in validator DB db_latest_block: Gauge, + /// Database file size in bytes + db_size_bytes: Gauge, } impl ChainSyncMetrics { @@ -272,55 +397,69 @@ impl ChainSyncMetrics { self.db_earliest_block.set(earliest as f64); self.db_latest_block.set(latest as f64); } + + /// Sets the database file size in bytes. + pub fn set_db_size(&self, bytes: u64) { + self.db_size_bytes.set(bytes as f64); + } } // --------------------------------------------------------------------------- // Pre-initialized Metric Instances // --------------------------------------------------------------------------- -/// Collection of all debug-trace-server metrics. -/// Used for pre-registration to ensure metrics are visible before first use. 
-#[derive(Clone)] -#[allow(dead_code)] -pub struct DebugTraceMetrics { - // RPC method metrics - pub debug_trace_block_by_number: RpcMethodMetrics, - pub debug_trace_block_by_hash: RpcMethodMetrics, - pub debug_trace_transaction: RpcMethodMetrics, - pub trace_block: RpcMethodMetrics, - pub trace_transaction: RpcMethodMetrics, - - // Global RPC metrics - pub rpc_global: RpcGlobalMetrics, - - // Cache metrics by type - pub cache_debug_trace: CacheMetrics, - pub cache_trace: CacheMetrics, - - // Chain sync metrics - pub chain_sync: ChainSyncMetrics, -} - -impl Default for DebugTraceMetrics { - fn default() -> Self { - Self { - debug_trace_block_by_number: RpcMethodMetrics::new_for_method( - METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, - ), - debug_trace_block_by_hash: RpcMethodMetrics::new_for_method( - METHOD_DEBUG_TRACE_BLOCK_BY_HASH, - ), - debug_trace_transaction: RpcMethodMetrics::new_for_method( - METHOD_DEBUG_TRACE_TRANSACTION, - ), - trace_block: RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK), - trace_transaction: RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION), - rpc_global: RpcGlobalMetrics::create(), - cache_debug_trace: CacheMetrics::new_for_cache(CACHE_TYPE_DEBUG_TRACE), - cache_trace: CacheMetrics::new_for_cache(CACHE_TYPE_TRACE), - chain_sync: ChainSyncMetrics::create(), - } - } +/// Pre-registers all metrics so they appear in Prometheus from startup (with zero values). 
+fn pre_register_all_metrics() { + // Request Layer: RPC method metrics + let _ = RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + let _ = RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + let _ = RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION); + let _ = RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK); + let _ = RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION); + + // Request Layer: global + let _ = RpcGlobalMetrics::create(); + + // Request Layer: response size (per method) + let _ = ResponseSizeMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + let _ = ResponseSizeMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + let _ = ResponseSizeMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION); + let _ = ResponseSizeMetrics::new_for_method(METHOD_TRACE_BLOCK); + let _ = ResponseSizeMetrics::new_for_method(METHOD_TRACE_TRANSACTION); + + // Cache Layer + let _ = CacheMetrics::new_for_cache(CACHE_TYPE_DEBUG_TRACE); + let _ = CacheMetrics::new_for_cache(CACHE_TYPE_TRACE); + + // Data Fetch Layer: data source + let _ = DataSourceMetrics::new_for_source("cache"); + let _ = DataSourceMetrics::new_for_source("db"); + let _ = DataSourceMetrics::new_for_source("witness_generator"); + let _ = DataSourceMetrics::new_for_source("cloudflare"); + + // Data Fetch Layer: single-flight + let _ = SingleFlightMetrics::new_for_type("new"); + let _ = SingleFlightMetrics::new_for_type("coalesced"); + let _ = SingleFlightMetrics::new_for_type("bypassed"); + + // Data Fetch Layer: upstream RPC + let _ = UpstreamMetrics::new_for_method("eth_getHeaderByHash"); + let _ = UpstreamMetrics::new_for_method("eth_getBlockByHash"); + let _ = UpstreamMetrics::new_for_method("mega_getWitness"); + + // Witness Layer + let _ = WitnessSourceMetrics::new_for_source("witness_generator"); + let _ = WitnessSourceMetrics::new_for_source("cloudflare"); + + // Execution Layer (per method) + let _ = 
EvmExecutionMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + let _ = EvmExecutionMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + let _ = EvmExecutionMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION); + let _ = EvmExecutionMetrics::new_for_method(METHOD_TRACE_BLOCK); + let _ = EvmExecutionMetrics::new_for_method(METHOD_TRACE_TRANSACTION); + + // Infrastructure + let _ = ChainSyncMetrics::create(); } // --------------------------------------------------------------------------- @@ -328,18 +467,14 @@ impl Default for DebugTraceMetrics { // --------------------------------------------------------------------------- /// Initializes the Prometheus metrics exporter. -/// -/// Starts an HTTP server on the specified address that exposes metrics -/// in Prometheus text format at the `/metrics` endpoint. pub fn init_metrics(addr: SocketAddr) -> Result<()> { PrometheusBuilder::new() .with_http_listener(addr) .install() .map_err(|e| eyre::eyre!("Failed to install metrics exporter: {}", e))?; - // Pre-register all metrics by creating default instances - // This ensures metrics are visible even before first use - let _ = DebugTraceMetrics::default(); + // Pre-register all metrics + pre_register_all_metrics(); Ok(()) } @@ -349,63 +484,20 @@ pub fn init_metrics(addr: SocketAddr) -> Result<()> { // --------------------------------------------------------------------------- /// Strips the `timed_` prefix from a method name if present. -/// Returns the original method name (without prefix) for metrics recording. pub fn strip_timed_prefix(method: &str) -> &str { method.strip_prefix(TIMED_PREFIX).unwrap_or(method) } -/// Records a successful RPC request (backward-compatible helper). -/// -/// Handles both original and `timed_`-prefixed method names by stripping the -/// prefix before recording metrics, so timed variants share the same counters. +/// Records a successful RPC request. 
pub fn record_rpc_request(method: &str, duration_secs: f64) { - let method = strip_timed_prefix(method); - match method { - METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER) - .record_request(duration_secs) - } - METHOD_DEBUG_TRACE_BLOCK_BY_HASH => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH) - .record_request(duration_secs) - } - METHOD_DEBUG_TRACE_TRANSACTION => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION) - .record_request(duration_secs) - } - METHOD_TRACE_BLOCK => { - RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK).record_request(duration_secs) - } - METHOD_TRACE_TRANSACTION => { - RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION).record_request(duration_secs) - } - _ => { - tracing::warn!(method = method, "Unknown RPC method in metrics"); - } - } + let method = resolve_method(strip_timed_prefix(method)); + RpcMethodMetrics::new_for_method(method).record_request(duration_secs); } -/// Records an RPC error for a specific method (backward-compatible helper). -/// -/// Handles both original and `timed_`-prefixed method names. +/// Records an RPC error for a specific method. 
pub fn record_rpc_error(method: &str) { - let method = strip_timed_prefix(method); - match method { - METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER).record_error() - } - METHOD_DEBUG_TRACE_BLOCK_BY_HASH => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_BLOCK_BY_HASH).record_error() - } - METHOD_DEBUG_TRACE_TRANSACTION => { - RpcMethodMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION).record_error() - } - METHOD_TRACE_BLOCK => RpcMethodMetrics::new_for_method(METHOD_TRACE_BLOCK).record_error(), - METHOD_TRACE_TRANSACTION => { - RpcMethodMetrics::new_for_method(METHOD_TRACE_TRANSACTION).record_error() - } - _ => {} - } + let method = resolve_method(strip_timed_prefix(method)); + RpcMethodMetrics::new_for_method(method).record_error(); } // --------------------------------------------------------------------------- @@ -423,73 +515,36 @@ mod tests { "debug_traceBlockByNumber" ); assert_eq!(strip_timed_prefix("timed_trace_block"), "trace_block"); - assert_eq!(strip_timed_prefix("timed_trace_transaction"), "trace_transaction"); } #[test] fn test_strip_timed_prefix_without_prefix() { assert_eq!(strip_timed_prefix("debug_traceBlockByNumber"), "debug_traceBlockByNumber"); - assert_eq!(strip_timed_prefix("trace_block"), "trace_block"); assert_eq!(strip_timed_prefix("unknown_method"), "unknown_method"); } #[test] - fn test_strip_timed_prefix_edge_cases() { - assert_eq!(strip_timed_prefix("timed_"), ""); - assert_eq!(strip_timed_prefix(""), ""); - assert_eq!( - strip_timed_prefix("TIMED_debug_traceBlockByNumber"), - "TIMED_debug_traceBlockByNumber" - ); - // Only strips once - assert_eq!(strip_timed_prefix("timed_timed_trace_block"), "timed_trace_block"); + fn test_resolve_method_known() { + assert_eq!(resolve_method("debug_traceBlockByNumber"), "debug_traceBlockByNumber"); + assert_eq!(resolve_method("trace_block"), "trace_block"); + } + + #[test] + fn test_resolve_method_unknown() { + 
assert_eq!(resolve_method("nonexistent"), "unknown"); } #[test] fn test_timed_aliases_consistency() { - // Every alias must start with the TIMED_PREFIX for &(alias, _original) in TIMED_METHOD_ALIASES { - assert!( - alias.starts_with(TIMED_PREFIX), - "Alias '{}' does not start with '{}'", - alias, - TIMED_PREFIX - ); + assert!(alias.starts_with(TIMED_PREFIX)); } } #[test] fn test_timed_aliases_match_originals() { - // Stripping the prefix from each alias must yield the original method name for &(alias, original) in TIMED_METHOD_ALIASES { - assert_eq!( - strip_timed_prefix(alias), - original, - "Alias '{}' does not map back to '{}'", - alias, - original - ); - } - } - - #[test] - fn test_timed_aliases_cover_all_methods() { - let all_methods = [ - METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, - METHOD_DEBUG_TRACE_BLOCK_BY_HASH, - METHOD_DEBUG_TRACE_TRANSACTION, - METHOD_DEBUG_GET_CACHE_STATUS, - METHOD_TRACE_BLOCK, - METHOD_TRACE_TRANSACTION, - ]; - let aliased_originals: Vec<&str> = - TIMED_METHOD_ALIASES.iter().map(|&(_, orig)| orig).collect(); - for method in all_methods { - assert!( - aliased_originals.contains(&method), - "Method '{}' has no timed_ alias in TIMED_METHOD_ALIASES", - method - ); + assert_eq!(strip_timed_prefix(alias), original); } } } diff --git a/bin/debug-trace-server/src/response_cache.rs b/bin/debug-trace-server/src/response_cache.rs index 6d4935b..8e56758 100644 --- a/bin/debug-trace-server/src/response_cache.rs +++ b/bin/debug-trace-server/src/response_cache.rs @@ -446,6 +446,7 @@ impl ResponseCache { self.inner.cache.insert(key.clone(), cached); self.inner.indices.insert(block_hash, block_number, key); + self.update_size_metrics(); } /// Gets a cached response or computes it, coalescing concurrent requests for the same key. 
@@ -487,6 +488,7 @@ impl ResponseCache { let _ = guard.insert(cached); self.inner.indices.insert(block_hash, block_number, key); + self.update_size_metrics(); self.inner.misses.fetch_add(1, Ordering::Relaxed); metrics.record_miss(); @@ -548,6 +550,14 @@ impl ResponseCache { self.inner.cache.weight() } + /// Updates prometheus cache size gauges with current cache state. + fn update_size_metrics(&self) { + let entry_count = self.inner.cache.len(); + let total_bytes = self.inner.cache.weight() as usize; + self.inner.metrics_debug_trace.set_size(entry_count, total_bytes); + self.inner.metrics_trace.set_size(entry_count, total_bytes); + } + /// Returns cache statistics and updates prometheus metrics. pub fn stats(&self) -> CacheStats { let entry_count = self.inner.cache.len(); diff --git a/bin/debug-trace-server/src/rpc_service.rs b/bin/debug-trace-server/src/rpc_service.rs index 29ea3f2..5c0223e 100644 --- a/bin/debug-trace-server/src/rpc_service.rs +++ b/bin/debug-trace-server/src/rpc_service.rs @@ -22,9 +22,9 @@ use validator_core::chain_spec::ChainSpec; use crate::{ data_provider::{BlockData, DataProvider}, metrics::{ - self, RpcGlobalMetrics, TracingMetrics, METHOD_DEBUG_TRACE_BLOCK_BY_HASH, - METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, METHOD_DEBUG_TRACE_TRANSACTION, METHOD_TRACE_BLOCK, - METHOD_TRACE_TRANSACTION, + self, DataSourceMetrics, EvmExecutionMetrics, ResponseSizeMetrics, RpcGlobalMetrics, + SingleFlightMetrics, METHOD_DEBUG_TRACE_BLOCK_BY_HASH, METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, + METHOD_DEBUG_TRACE_TRANSACTION, METHOD_TRACE_BLOCK, METHOD_TRACE_TRANSACTION, }, response_cache::{CachedResource, ResponseCache, ResponseVariant}, }; @@ -279,9 +279,9 @@ async fn compute_debug_trace_block( chain_spec: &ChainSpec, data: &BlockData, opts: GethDebugTracingOptions, + method_name: &'static str, ) -> Result { let start = Instant::now(); - let tracing_metrics = TracingMetrics::new_for_tracer("geth"); let results = validator_core::trace_block( chain_spec, @@ -293,13 +293,15 
@@ async fn compute_debug_trace_block( .map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?; let trace_ms = start.elapsed().as_millis(); - tracing_metrics.record_block(data.block.transactions.len(), start.elapsed().as_secs_f64()); + EvmExecutionMetrics::new_for_method(method_name) + .record(start.elapsed().as_secs_f64(), data.block.transactions.len()); let value = serde_json::to_value(&results) .map_err(|e| rpc_err(format!("Serialization failed: {e}")))?; let serialize_ms = start.elapsed().as_millis() - trace_ms; let response_size = value.to_string().len(); + ResponseSizeMetrics::new_for_method(method_name).record(response_size); if trace_ms >= SLOW_STAGE_THRESHOLD_MS || serialize_ms >= SLOW_STAGE_THRESHOLD_MS { warn!( @@ -319,9 +321,9 @@ async fn compute_debug_trace_block( async fn compute_parity_trace_block( chain_spec: &ChainSpec, data: &BlockData, + method_name: &'static str, ) -> Result { let start = Instant::now(); - let tracing_metrics = TracingMetrics::new_for_tracer("parity"); let results = validator_core::parity_trace_block( chain_spec, @@ -331,9 +333,14 @@ async fn compute_parity_trace_block( ) .map_err(|e| rpc_err(format!("Trace execution failed: {e}")))?; - tracing_metrics.record_block(data.block.transactions.len(), start.elapsed().as_secs_f64()); + EvmExecutionMetrics::new_for_method(method_name) + .record(start.elapsed().as_secs_f64(), data.block.transactions.len()); + + let value = + serde_json::to_value(results).map_err(|e| rpc_err(format!("Serialization failed: {e}")))?; + ResponseSizeMetrics::new_for_method(method_name).record(value.to_string().len()); - serde_json::to_value(results).map_err(|e| rpc_err(format!("Serialization failed: {e}"))) + Ok(value) } // --------------------------------------------------------------------------- @@ -353,6 +360,8 @@ fn check_cache_by_number( let total_ms = start.elapsed().as_secs_f64() * 1000.0; metrics::record_rpc_request(method_name, total_ms / 1000.0); + 
DataSourceMetrics::new_for_source("cache").record(); + SingleFlightMetrics::new_for_type("bypassed").record(); trace!( method = method_name, @@ -378,6 +387,8 @@ fn check_cache_by_hash( let total_ms = start.elapsed().as_secs_f64() * 1000.0; metrics::record_rpc_request(method_name, total_ms / 1000.0); + DataSourceMetrics::new_for_source("cache").record(); + SingleFlightMetrics::new_for_type("bypassed").record(); trace!( method = method_name, @@ -460,10 +471,16 @@ impl DebugTraceRpcServer for RpcContext { // Stage 4: Execute trace let t3 = Instant::now(); - let result = - compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| { - metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); - })?; + let result = compute_debug_trace_block( + &self.chain_spec, + &data, + opts, + METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER, + ) + .await + .inspect_err(|_| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_NUMBER); + })?; let trace_ms = t3.elapsed().as_millis(); // Stage 5: Cache result @@ -531,10 +548,16 @@ impl DebugTraceRpcServer for RpcContext { block_data_err_by_hash(block_hash, e) })?; let block_num = data.block.header.number; - let result = - compute_debug_trace_block(&self.chain_spec, &data, opts).await.inspect_err(|_| { - metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); - })?; + let result = compute_debug_trace_block( + &self.chain_spec, + &data, + opts, + METHOD_DEBUG_TRACE_BLOCK_BY_HASH, + ) + .await + .inspect_err(|_| { + metrics::record_rpc_error(METHOD_DEBUG_TRACE_BLOCK_BY_HASH); + })?; // Cache and record metrics self.response_cache.insert( @@ -566,6 +589,7 @@ impl DebugTraceRpcServer for RpcContext { tx_data_err(e) })?; + let evm_start = Instant::now(); let result = validator_core::trace_transaction( &self.chain_spec, &data.block, @@ -578,10 +602,11 @@ impl DebugTraceRpcServer for RpcContext { metrics::record_rpc_error(METHOD_DEBUG_TRACE_TRANSACTION); rpc_err(format!("Trace execution failed: {e}")) })?; + 
EvmExecutionMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION) + .record(evm_start.elapsed().as_secs_f64(), 1); let elapsed = start.elapsed(); metrics::record_rpc_request(METHOD_DEBUG_TRACE_TRANSACTION, elapsed.as_secs_f64()); - TracingMetrics::new_for_tracer("geth").record_transaction(elapsed.as_secs_f64()); if elapsed > SLOW_REQUEST_THRESHOLD { warn!( @@ -594,7 +619,11 @@ impl DebugTraceRpcServer for RpcContext { ); } - serde_json::to_value(&result).map_err(|e| rpc_err(format!("Serialization failed: {e}"))) + let value = serde_json::to_value(&result) + .map_err(|e| rpc_err(format!("Serialization failed: {e}")))?; + ResponseSizeMetrics::new_for_method(METHOD_DEBUG_TRACE_TRANSACTION) + .record(value.to_string().len()); + Ok(value) } async fn get_cache_status(&self) -> RpcResult { @@ -651,8 +680,9 @@ impl TraceRpcServer for RpcContext { })?; let block_hash = data.block.header.hash; - let result = - compute_parity_trace_block(&self.chain_spec, &data).await.inspect_err(|_| { + let result = compute_parity_trace_block(&self.chain_spec, &data, METHOD_TRACE_BLOCK) + .await + .inspect_err(|_| { metrics::record_rpc_error(METHOD_TRACE_BLOCK); })?; @@ -690,6 +720,7 @@ impl TraceRpcServer for RpcContext { } }; + let evm_start = Instant::now(); let result = validator_core::parity_trace_transaction( &self.chain_spec, &data.block, @@ -701,10 +732,11 @@ impl TraceRpcServer for RpcContext { metrics::record_rpc_error(METHOD_TRACE_TRANSACTION); rpc_err(format!("Trace execution failed: {e}")) })?; + EvmExecutionMetrics::new_for_method(METHOD_TRACE_TRANSACTION) + .record(evm_start.elapsed().as_secs_f64(), 1); let elapsed = start.elapsed(); metrics::record_rpc_request(METHOD_TRACE_TRANSACTION, elapsed.as_secs_f64()); - TracingMetrics::new_for_tracer("parity").record_transaction(elapsed.as_secs_f64()); if elapsed > SLOW_REQUEST_THRESHOLD { warn!( @@ -717,7 +749,11 @@ impl TraceRpcServer for RpcContext { ); } - serde_json::to_value(&result).map_err(|e| rpc_err(format!("Serialization 
failed: {e}"))) + let value = serde_json::to_value(&result) + .map_err(|e| rpc_err(format!("Serialization failed: {e}")))?; + ResponseSizeMetrics::new_for_method(METHOD_TRACE_TRANSACTION) + .record(value.to_string().len()); + Ok(value) } }