Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 69 additions & 17 deletions bin/debug-trace-server/src/data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use tokio::sync::broadcast;
use tracing::{debug, instrument, trace, warn};
use validator_core::{withdrawals::MptWitness, LightWitness, RpcClient, ValidatorDB};

use crate::metrics::{ChainSyncMetrics, UpstreamMetrics};
use crate::metrics::{
ChainSyncMetrics, DataSourceMetrics, SingleFlightMetrics, UpstreamMetrics, WitnessSourceMetrics,
};

/// Block data bundle containing all information needed for stateless execution.
///
Expand Down Expand Up @@ -154,6 +156,8 @@ impl DataProvider {
elapsed_ms = start.elapsed().as_millis() as u64,
"Block data retrieved from local DB"
);
DataSourceMetrics::new_for_source("db").record();
SingleFlightMetrics::new_for_type("bypassed").record();
self.record_block_distance(data.block.header.number);
return Ok(data);
}
Expand Down Expand Up @@ -318,6 +322,7 @@ impl DataProvider {
// Subscribe to the existing request
let mut receiver = sender.subscribe();
drop(sender); // Release the lock
SingleFlightMetrics::new_for_type("coalesced").record();
trace!(
block_hash = %block_hash,
"Joining existing in-flight request"
Expand All @@ -332,6 +337,7 @@ impl DataProvider {
// Create a new broadcast channel for this request
let (tx, _) = broadcast::channel(1);
self.in_flight.insert(block_hash, tx.clone());
SingleFlightMetrics::new_for_type("new").record();

trace!(
block_hash = %block_hash,
Expand Down Expand Up @@ -374,22 +380,30 @@ impl DataProvider {
let block_number = header.number;
let fetch_header_ms = start.elapsed().as_millis();

// Step 2: Fetch witness and full block in parallel
let witness_start = std::time::Instant::now();
let full_block_start = std::time::Instant::now();

let (witness_result, full_block_result) = tokio::join!(
self.fetch_witness_with_fallback(block_number, header.hash),
self.rpc_client.get_block(BlockId::Hash(block_hash.into()), true),
// Step 2: Fetch witness and full block in parallel, timing each independently
let (witness_timed, block_timed) = tokio::join!(
async {
let start = std::time::Instant::now();
let result = self.fetch_witness_with_fallback(block_number, header.hash).await;
(result, start.elapsed())
},
async {
let start = std::time::Instant::now();
let result =
self.rpc_client.get_block(BlockId::Hash(block_hash.into()), true).await;
(result, start.elapsed())
},
);

let fetch_witness_ms = witness_start.elapsed().as_millis();
upstream_witness.record_request(witness_result.is_ok(), fetch_witness_ms as f64 / 1000.0);
let (witness_result, witness_elapsed) = witness_timed;
let (full_block_result, block_elapsed) = block_timed;

let fetch_witness_ms = witness_elapsed.as_millis();
upstream_witness.record_request(witness_result.is_ok(), witness_elapsed.as_secs_f64());
let (salt_witness, _mpt_witness) = witness_result?;

let fetch_full_block_ms = full_block_start.elapsed().as_millis();
upstream_block
.record_request(full_block_result.is_ok(), fetch_full_block_ms as f64 / 1000.0);
let fetch_full_block_ms = block_elapsed.as_millis();
upstream_block.record_request(full_block_result.is_ok(), block_elapsed.as_secs_f64());
let block = full_block_result?;

// Step 3: Convert SaltWitness to LightWitness
Expand Down Expand Up @@ -438,8 +452,12 @@ impl DataProvider {
block_number: u64,
block_hash: B256,
) -> Result<(SaltWitness, MptWitness)> {
let cf_metrics = WitnessSourceMetrics::new_for_source("cloudflare");
let start = std::time::Instant::now();
match self.rpc_client.get_witness_from_cloudflare(block_number, block_hash).await {
Ok(result) => {
cf_metrics.record_request(true, start.elapsed().as_secs_f64());
cf_metrics.record_size(estimate_witness_size(&result.0, &result.1));
trace!(
block_number,
block_hash = %block_hash,
Expand All @@ -448,6 +466,7 @@ impl DataProvider {
Ok(result)
}
Err(e) => {
cf_metrics.record_request(false, start.elapsed().as_secs_f64());
warn!(
block_number,
block_hash = %block_hash,
Expand Down Expand Up @@ -482,6 +501,8 @@ impl DataProvider {
None => true, // No DB means treat all blocks as "new" (try witness endpoint with retry)
};

let wg_metrics = WitnessSourceMetrics::new_for_source("witness_generator");

if is_new_block {
// Block is newer than DB max: retry witness endpoint until timeout
trace!(
Expand All @@ -490,17 +511,27 @@ impl DataProvider {
"Block is new, using retry loop for witness endpoint"
);

let start = std::time::Instant::now();
match self.fetch_witness_with_retry(block_number, block_hash).await {
Ok(result) => Ok(result),
Ok(result) => {
wg_metrics.record_request(true, start.elapsed().as_secs_f64());
wg_metrics.record_size(estimate_witness_size(&result.0, &result.1));
DataSourceMetrics::new_for_source("witness_generator").record();
Ok(result)
}
Err(e) => {
wg_metrics.record_request(false, start.elapsed().as_secs_f64());
// Timeout reached, try Cloudflare fallback
if self.rpc_client.has_cloudflare_provider() {
trace!(
block_number,
%e,
"Witness endpoint timeout, falling back to Cloudflare"
);
self.get_witness_from_cloudflare(block_number, block_hash).await
let result =
self.get_witness_from_cloudflare(block_number, block_hash).await?;
DataSourceMetrics::new_for_source("cloudflare").record();
Ok(result)
} else {
Err(e)
}
Expand All @@ -514,17 +545,27 @@ impl DataProvider {
"Block is old/pruned, single attempt then Cloudflare fallback"
);

let start = std::time::Instant::now();
match self.rpc_client.get_witness(block_number, block_hash).await {
Ok(result) => Ok(result),
Ok(result) => {
wg_metrics.record_request(true, start.elapsed().as_secs_f64());
wg_metrics.record_size(estimate_witness_size(&result.0, &result.1));
DataSourceMetrics::new_for_source("witness_generator").record();
Ok(result)
}
Err(e) => {
wg_metrics.record_request(false, start.elapsed().as_secs_f64());
// Single attempt failed, try Cloudflare fallback
if self.rpc_client.has_cloudflare_provider() {
trace!(
block_number,
%e,
"Witness endpoint failed, falling back to Cloudflare"
);
self.get_witness_from_cloudflare(block_number, block_hash).await
let result =
self.get_witness_from_cloudflare(block_number, block_hash).await?;
DataSourceMetrics::new_for_source("cloudflare").record();
Ok(result)
} else {
// No Cloudflare configured and witness fetch failed
Err(eyre::eyre!("Block witness not found for block {}", block_number))
Expand Down Expand Up @@ -663,6 +704,17 @@ impl DataProvider {
}
}

/// Estimates the total size of witness data in bytes (approximate, avoids serialization).
fn estimate_witness_size(salt: &SaltWitness, mpt: &MptWitness) -> usize {
    // Per-entry estimate: SaltKey (8 bytes) + Option<SaltValue> (~95 bytes) ≈ 103 bytes.
    let kvs_bytes = salt.kvs.len() * 103;
    // Proof estimate: 64 bytes per parent commitment, a fixed ~576-byte IPA proof,
    // and 5 bytes per level entry.
    let proof_bytes =
        576 + salt.proof.parents_commitments.len() * 64 + salt.proof.levels.len() * 5;
    // MPT estimate: 32-byte storage root plus the raw state node bytes.
    let state_bytes: usize = mpt.state.iter().map(|node| node.len()).sum();
    kvs_bytes + proof_bytes + 32 + state_bytes
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions bin/debug-trace-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ async fn history_pruner(
let earliest =
validator_db.get_earliest_local_block().ok().flatten().map(|(n, _)| n).unwrap_or(0);
chain_sync_metrics.set_db_block_range(earliest, current_tip);

// Update DB file size metric
if let Ok(m) = std::fs::metadata(&db_path) {
chain_sync_metrics.set_db_size(m.len());
}
}

tokio::time::sleep(interval).await;
Expand Down
Loading