diff --git a/crates/host-rpc/src/error.rs b/crates/host-rpc/src/error.rs index acd0f5b..1fbd52d 100644 --- a/crates/host-rpc/src/error.rs +++ b/crates/host-rpc/src/error.rs @@ -19,8 +19,9 @@ pub enum RpcHostError { #[error("missing block {0}")] MissingBlock(BlockNumberOrTag), - /// Walk exhaustion recovery requires a cached finalized block number, - /// but none has been fetched yet. - #[error("no cached finalized block number for exhaustion recovery")] - NoFinalizedBlock, + /// The first block of a backfill batch does not chain to the last + /// emitted block (parent-hash mismatch). A reorg occurred during the + /// gap between exhaustion and backfill. + #[error("backfill continuity break: parent hash mismatch after exhaustion recovery")] + BackfillContinuityBreak, } diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs index a8bc2ec..a1929ff 100644 --- a/crates/host-rpc/src/notifier.rs +++ b/crates/host-rpc/src/notifier.rs @@ -84,6 +84,10 @@ pub struct RpcHostNotifier

{ /// Maximum number of concurrent RPC block fetches. max_rpc_concurrency: usize, + /// The last block emitted in a notification, tracked as (number, hash). + /// Used as the authoritative delivery position for exhaustion recovery. + last_emitted: Option<(u64, B256)>, + /// Seconds per slot, used for epoch calculation. slot_seconds: u64, @@ -98,6 +102,7 @@ impl

core::fmt::Debug for RpcHostNotifier

{ .field("buffer_capacity", &self.buffer_capacity) .field("max_rpc_concurrency", &self.max_rpc_concurrency) .field("backfill_from", &self.backfill_from) + .field("last_emitted", &self.last_emitted) .finish_non_exhaustive() } } @@ -127,6 +132,7 @@ where backfill_from: None, backfill_batch_size, max_rpc_concurrency, + last_emitted: None, slot_seconds, genesis_timestamp, } @@ -436,13 +442,14 @@ where return Ok(None); } WalkResult::Exhausted => { - let finalized = self.cached_finalized.ok_or(RpcHostError::NoFinalizedBlock)?; + let resume_from = self.last_emitted.map(|(n, _)| n + 1); warn!( buffer_capacity = self.buffer_capacity, - finalized, "walk exhausted buffer, resetting to backfill from finalized" + ?resume_from, + "walk exhausted buffer, resetting to backfill from last emitted" ); self.chain_view.clear(); - self.backfill_from = Some(finalized); + self.backfill_from = resume_from; self.last_tag_epoch = None; crate::metrics::record_handle_new_head_duration(start.elapsed()); return Ok(None); @@ -451,6 +458,7 @@ where let count = new_chain.len(); let blocks = self.fetch_blocks_by_hash(&new_chain).await?; self.extend_view(&new_chain); + self.last_emitted = new_chain.last().copied(); info!(blocks = count, "chain advanced"); crate::metrics::inc_blocks_fetched(count as u64, "frontfill"); HostNotificationKind::ChainCommitted { new: Arc::new(RpcChainSegment::new(blocks)) } @@ -460,6 +468,7 @@ where let depth = old_tip.saturating_sub(fork_number) + 1; let blocks = self.fetch_blocks_by_hash(&new_chain).await?; self.reorg_view(fork_number, &new_chain); + self.last_emitted = new_chain.last().copied(); info!(depth, fork_number, new_blocks = count, "chain reorged"); crate::metrics::inc_blocks_fetched(count as u64, "frontfill"); crate::metrics::inc_reorgs(depth); @@ -490,8 +499,8 @@ where /// Process a backfill batch if pending. /// - /// Backfills by number up to `(latest - buffer_capacity)` to leave room - /// for hash-based frontfill of recent blocks. + /// Backfills by number up to `(latest - buffer_capacity / 2)` to leave + /// half the buffer depth for hash-based frontfill of recent blocks. #[tracing::instrument( level = "info", skip_all, @@ -519,7 +528,7 @@ where } }; - let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64); + let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2); if from > backfill_ceiling { self.backfill_from = None; info!("backfill complete, switching to frontfill"); @@ -538,10 +547,30 @@ where } }; + // Verify parent-hash continuity with last emitted block. + if let Some((_, expected_parent)) = self.last_emitted + && let Some(first) = blocks.first() + && first.parent_hash() != expected_parent + { + warn!( + expected = %expected_parent, + actual = %first.parent_hash(), + "parent hash mismatch after exhaustion recovery" + ); + self.chain_view.clear(); + crate::metrics::record_backfill_batch(start.elapsed()); + return Some(Err(RpcHostError::BackfillContinuityBreak)); + } + let view_entries: Vec<(u64, B256)> = blocks.iter().map(|b| (b.number(), b.hash())).collect(); self.extend_view(&view_entries); + // Update delivery high-water mark. + if let Some(&(n, h)) = view_entries.last() { + self.last_emitted = Some((n, h)); + } + let backfill_done = to >= backfill_ceiling; if backfill_done { self.backfill_from = None; @@ -617,6 +646,7 @@ where fn set_head(&mut self, block_number: u64) { self.backfill_from = Some(block_number); + self.last_emitted = None; } fn set_backfill_thresholds(&mut self, max_blocks: Option) {