From 390e37a874b6b15b7b4252deb078a314a02cee5c Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 12 May 2026 16:17:41 +1200 Subject: [PATCH 1/4] Move proving inputs from DB to file --- crates/store/src/blocks.rs | 55 ++++ .../down.sql | 5 + .../up.sql | 12 + crates/store/src/db/mod.rs | 114 +------- .../src/db/models/queries/block_headers.rs | 179 ------------- crates/store/src/db/models/queries/mod.rs | 4 +- crates/store/src/db/schema.rs | 2 - crates/store/src/db/tests.rs | 246 +----------------- crates/store/src/errors.rs | 2 + crates/store/src/proven_tip.rs | 153 +++++++++++ crates/store/src/server/mod.rs | 25 +- crates/store/src/server/proof_scheduler.rs | 111 ++++---- crates/store/src/state/apply_block.rs | 25 +- crates/store/src/state/apply_proof.rs | 4 +- crates/store/src/state/mod.rs | 27 +- 15 files changed, 341 insertions(+), 623 deletions(-) create mode 100644 crates/store/src/db/migrations/2026051200000_remove_proving_inputs/down.sql create mode 100644 crates/store/src/db/migrations/2026051200000_remove_proving_inputs/up.sql diff --git a/crates/store/src/blocks.rs b/crates/store/src/blocks.rs index 749ef02892..f2b152b1b6 100644 --- a/crates/store/src/blocks.rs +++ b/crates/store/src/blocks.rs @@ -132,6 +132,47 @@ impl BlockStore { } } + // PROVING INPUTS STORAGE + // -------------------------------------------------------------------------------------------- + + #[instrument( + target = COMPONENT, + name = "store.block_store.save_proving_inputs", + skip_all, + err, + fields(block.number = block_num.as_u32(), inputs_size = data.len()) + )] + pub async fn save_proving_inputs( + &self, + block_num: BlockNumber, + data: &[u8], + ) -> std::io::Result<()> { + let (epoch_path, inputs_path) = self.epoch_inputs_path(block_num)?; + if !epoch_path.exists() { + tokio::fs::create_dir_all(epoch_path).await?; + } + tokio::fs::write(inputs_path, data).await + } + + pub async fn load_proving_inputs( + &self, + block_num: BlockNumber, + ) -> std::io::Result>> { + match 
tokio::fs::read(self.inputs_path(block_num)).await { + Ok(data) => Ok(Some(data)), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err), + } + } + + pub async fn delete_proving_inputs(&self, block_num: BlockNumber) -> std::io::Result<()> { + match tokio::fs::remove_file(self.inputs_path(block_num)).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err), + } + } + // HELPER FUNCTIONS // -------------------------------------------------------------------------------------------- @@ -163,6 +204,20 @@ impl BlockStore { Ok((epoch_path.to_path_buf(), proof_path)) } + fn inputs_path(&self, block_num: BlockNumber) -> PathBuf { + let block_num = block_num.as_u32(); + let epoch = block_num >> 16; + let epoch_dir = self.store_dir.join(format!("{epoch:04x}")); + epoch_dir.join(format!("inputs_{block_num:08x}.dat")) + } + + fn epoch_inputs_path(&self, block_num: BlockNumber) -> std::io::Result<(PathBuf, PathBuf)> { + let inputs_path = self.inputs_path(block_num); + let epoch_path = inputs_path.parent().ok_or(std::io::Error::from(ErrorKind::NotFound))?; + + Ok((epoch_path.to_path_buf(), inputs_path)) + } + pub fn display(&self) -> std::path::Display<'_> { self.store_dir.display() } diff --git a/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/down.sql b/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/down.sql new file mode 100644 index 0000000000..62614bdd90 --- /dev/null +++ b/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/down.sql @@ -0,0 +1,5 @@ +-- Restore proving_inputs and proven_in_sequence columns (data is not recovered). 
+ALTER TABLE block_headers ADD COLUMN proving_inputs BLOB; +ALTER TABLE block_headers ADD COLUMN proven_in_sequence BOOLEAN NOT NULL DEFAULT FALSE; +CREATE INDEX block_headers_proven_desc ON block_headers(block_num DESC) WHERE proving_inputs IS NULL; +CREATE INDEX block_headers_proven_in_sequence ON block_headers(block_num DESC) WHERE proven_in_sequence = TRUE; diff --git a/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/up.sql b/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/up.sql new file mode 100644 index 0000000000..1591dd0f48 --- /dev/null +++ b/crates/store/src/db/migrations/2026051200000_remove_proving_inputs/up.sql @@ -0,0 +1,12 @@ +-- Move proving inputs out of the database. +-- +-- Proving inputs are large BLOBs that only serve the proof scheduler; they now live as +-- `inputs_.dat` files in the block store alongside block data and proofs. +-- The proven-in-sequence tip is tracked by a small `proven_tip` file in the data directory. +-- +-- Drop indexes that reference the columns being removed first (required by SQLite before DROP +-- COLUMN can succeed for indexed columns). +DROP INDEX block_headers_proven_desc; +DROP INDEX block_headers_proven_in_sequence; +ALTER TABLE block_headers DROP COLUMN proving_inputs; +ALTER TABLE block_headers DROP COLUMN proven_in_sequence; diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 89b36a7fae..57a219c9cf 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use anyhow::Context; use diesel::{Connection, QueryableByName, RunQueryDsl, SqliteConnection}; use miden_node_proto::domain::account::AccountInfo; -use miden_node_proto::{BlockProofRequest, generated as proto}; +use miden_node_proto::generated as proto; use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; @@ -273,7 +273,7 @@ impl Db { // Insert genesis block data. 
let genesis_block = genesis.into_inner(); - conn.transaction(move |conn| models::queries::apply_block(conn, &genesis_block, &[], None)) + conn.transaction(move |conn| models::queries::apply_block(conn, &genesis_block, &[])) .context("failed to insert genesis block")?; Ok(()) } @@ -540,10 +540,9 @@ impl Db { acquire_done: oneshot::Receiver<()>, signed_block: SignedBlock, notes: Vec<(NoteRecord, Option)>, - proving_inputs: Option, ) -> Result<()> { self.transact("apply block", move |conn| -> Result<()> { - models::queries::apply_block(conn, &signed_block, ¬es, proving_inputs)?; + models::queries::apply_block(conn, &signed_block, ¬es)?; // XXX FIXME TODO free floating mutex MUST NOT exist // it doesn't bind it properly to the data locked! @@ -565,61 +564,6 @@ impl Db { .await } - /// Marks a previously committed block as proven and advances the proven-in-sequence tip. - /// - /// Atomically clears `proving_inputs` for the given block, then walks forward from the - /// current proven-in-sequence tip through consecutive proven blocks, marking each as - /// proven-in-sequence. - /// - /// Returns the new tip of blocks that are proven in-sequence (which may have been unchanged by - /// this function). - #[instrument(target = COMPONENT, skip_all, err)] - pub async fn mark_proven_and_advance_sequence( - &self, - block_num: BlockNumber, - ) -> Result { - self.transact("mark block proven", move |conn| { - mark_proven_and_advance_sequence(conn, block_num) - }) - .await - } - - /// Returns the proving inputs for a given block number, if stored. - #[instrument(level = "debug", target = COMPONENT, skip_all, err)] - pub async fn select_block_proving_inputs( - &self, - block_num: BlockNumber, - ) -> Result> { - self.transact("select block proving inputs", move |conn| { - models::queries::select_block_proving_inputs(conn, block_num) - }) - .await - } - - /// Returns unproven block numbers greater than `after`, in ascending order, up to `limit`. 
- #[instrument(level = "debug", target = COMPONENT, skip_all, err)] - pub async fn select_unproven_blocks( - &self, - after: BlockNumber, - limit: usize, - ) -> Result> { - self.transact("select unproven blocks", move |conn| { - models::queries::select_unproven_blocks(conn, after, limit) - }) - .await - } - - /// Returns the highest block number that has been proven in sequence. - /// - /// This includes the genesis block, which is not technically proven, but treated as such. - #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] - pub async fn proven_chain_tip(&self) -> Result { - self.transact("select latest proven block num", |conn| { - models::queries::select_latest_proven_in_sequence_block_num(conn) - }) - .await - } - /// Selects storage map values for syncing storage maps for a specific account ID. /// /// The returned values are the latest known values up to `block_range.end()`, and no values @@ -828,55 +772,3 @@ impl Db { .await } } - -/// Mark a committed block as proven and advance the proven-in-sequence tip. -/// -/// This is intended to atomically (when run in a transaction): -/// 1. Clears `proving_inputs` for the given block (marking it proven). -/// 2. Queries all blocks where `proving_inputs IS NULL AND proven_in_sequence = FALSE`. -/// 3. Walks forward from the current proven-in-sequence tip through consecutive proven blocks and -/// sets `proven_in_sequence = TRUE` for each. -/// -/// Returns the new tip of blocks that are proven in-sequence (which may have been unchanged by this -/// function). -/// -/// Returns [`DatabaseError::DataCorrupted`] if any proven-but-not-in-sequence block is found at -/// or below the current tip, as that indicates a consistency bug. -pub(crate) fn mark_proven_and_advance_sequence( - conn: &mut SqliteConnection, - block_num: BlockNumber, -) -> Result { - // Clear proving_inputs for the specified block. 
- models::queries::clear_block_proving_inputs(conn, block_num)?; - - // Get the current proven-in-sequence tip (highest in-sequence). - let current_tip = models::queries::select_latest_proven_in_sequence_block_num(conn)?; - let mut new_tip = current_tip; - - // Get all blocks that are proven but not yet marked in-sequence. - let unsequenced = models::queries::select_proven_not_in_sequence_blocks(conn)?; - - // Walk forward from the tip through consecutive proven blocks. - for candidate in unsequenced { - if candidate <= current_tip { - return Err(DatabaseError::DataCorrupted(format!( - "block {candidate} is proven but not marked in-sequence while the tip is at {current_tip}" - ))); - } - if candidate == new_tip.child() { - // Walk the tip forward. - new_tip = candidate; - } else { - // Sequence has been broken. Discontinue walking tip forward. - break; - } - } - - // Mark the newly contiguous blocks as proven-in-sequence. - if new_tip > current_tip { - let block_from = current_tip.child(); - models::queries::mark_blocks_as_proven_in_sequence(conn, block_from, new_tip)?; - } - - Ok(new_tip) -} diff --git a/crates/store/src/db/models/queries/block_headers.rs b/crates/store/src/db/models/queries/block_headers.rs index 1f2348d1ac..eb0dab2f0d 100644 --- a/crates/store/src/db/models/queries/block_headers.rs +++ b/crates/store/src/db/models/queries/block_headers.rs @@ -13,7 +13,6 @@ use diesel::{ }; use miden_crypto::Word; use miden_crypto::dsa::ecdsa_k256_keccak::Signature; -use miden_node_proto::BlockProofRequest; use miden_node_utils::limiter::{QueryParamBlockLimit, QueryParamLimiter}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::utils::serde::{Deserializable, Serializable}; @@ -206,8 +205,6 @@ pub struct BlockHeaderInsert { pub block_header: Vec, pub signature: Vec, pub commitment: Vec, - pub proving_inputs: Option>, - pub proven_in_sequence: bool, } /// Insert a [`BlockHeader`] to the DB using the given [`SqliteConnection`]. 
@@ -229,189 +226,13 @@ pub(crate) fn insert_block_header( conn: &mut SqliteConnection, block_header: &BlockHeader, signature: &Signature, - proving_inputs: Option, ) -> Result { - // Genesis block has no proving inputs and is treated as proven in sequence. - // Non-genesis blocks without proving inputs are replica blocks: they arrive pre-proven from - // an upstream store and will not be scheduled for local proving (proven_in_sequence = false). - let proven_in_sequence = block_header.block_num() == BlockNumber::GENESIS; let row = BlockHeaderInsert { block_num: block_header.block_num().to_raw_sql(), block_header: block_header.to_bytes(), signature: signature.to_bytes(), commitment: BlockHeaderCommitment::new(block_header).to_raw_sql(), - proving_inputs: proving_inputs.map(|inputs| inputs.to_bytes()), - proven_in_sequence, }; let count = diesel::insert_into(schema::block_headers::table).values(&[row]).execute(conn)?; Ok(count) } - -/// Select the proving inputs for a given block number. -/// -/// # Returns -/// -/// `None` if the block does not exist or has no proving inputs stored. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT proving_inputs -/// FROM block_headers -/// WHERE block_num = ?1 -/// ``` -pub(crate) fn select_block_proving_inputs( - conn: &mut SqliteConnection, - block_num: BlockNumber, -) -> Result, DatabaseError> { - let inputs: Option>> = - SelectDsl::select(schema::block_headers::table, schema::block_headers::proving_inputs) - .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql())) - .get_result(conn) - .optional()?; - inputs - .flatten() - .map(|bytes| BlockProofRequest::read_from_bytes(&bytes)) - .transpose() - .map_err(Into::into) -} - -/// Clear `proving_inputs` for the given block, marking it as proven. -/// -/// # Raw SQL -/// -/// ```sql -/// UPDATE block_headers -/// SET proving_inputs = NULL -/// WHERE block_num = ? 
-/// ``` -pub(crate) fn clear_block_proving_inputs( - conn: &mut SqliteConnection, - block_num: BlockNumber, -) -> Result<(), DatabaseError> { - diesel::update( - schema::block_headers::table - .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql())), - ) - .set(schema::block_headers::proving_inputs.eq(None::>)) - .execute(conn)?; - - Ok(()) -} - -/// Select block numbers that are proven (`proving_inputs IS NULL`) but not yet marked -/// in-sequence, ordered ascending. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT block_num -/// FROM block_headers -/// WHERE proving_inputs IS NULL -/// AND proven_in_sequence = FALSE -/// ORDER BY block_num ASC -/// ``` -pub(crate) fn select_proven_not_in_sequence_blocks( - conn: &mut SqliteConnection, -) -> Result, DatabaseError> { - let block_nums: Vec = - SelectDsl::select(schema::block_headers::table, schema::block_headers::block_num) - .filter(schema::block_headers::proving_inputs.is_null()) - .filter(schema::block_headers::proven_in_sequence.eq(false)) - .order(schema::block_headers::block_num.asc()) - .load(conn)?; - - block_nums - .into_iter() - .map(BlockNumber::from_raw_sql) - .collect::, _>>() - .map_err(Into::into) -} - -/// Mark blocks in the range `[block_from, block_to]` as proven in sequence. -/// -/// # Raw SQL -/// -/// ```sql -/// UPDATE block_headers -/// SET proven_in_sequence = TRUE -/// WHERE block_num >= ? AND block_num <= ? -/// ``` -pub(crate) fn mark_blocks_as_proven_in_sequence( - conn: &mut SqliteConnection, - block_from: BlockNumber, - block_to: BlockNumber, -) -> Result<(), DatabaseError> { - diesel::update( - schema::block_headers::table - .filter(schema::block_headers::block_num.ge(block_from.to_raw_sql())) - .filter(schema::block_headers::block_num.le(block_to.to_raw_sql())), - ) - .set(schema::block_headers::proven_in_sequence.eq(true)) - .execute(conn)?; - - Ok(()) -} - -/// Select unproven block numbers greater than `after`, in ascending order, up to `limit`. 
-/// -/// A block is unproven when its `proving_inputs` are non-NULL. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT block_num -/// FROM block_headers -/// WHERE proving_inputs IS NOT NULL -/// AND block_num > ? -/// ORDER BY block_num ASC -/// LIMIT ? -/// ``` -pub(crate) fn select_unproven_blocks( - conn: &mut SqliteConnection, - after: BlockNumber, - limit: usize, -) -> Result, DatabaseError> { - let block_nums: Vec = - SelectDsl::select(schema::block_headers::table, schema::block_headers::block_num) - .filter(schema::block_headers::proving_inputs.is_not_null()) - .filter(schema::block_headers::block_num.gt(after.to_raw_sql())) - .order(schema::block_headers::block_num.asc()) - .limit(i64::try_from(limit).expect("unproven block number limit should fit in i64")) - .load(conn)?; - - block_nums - .into_iter() - .map(BlockNumber::from_raw_sql) - .collect::, _>>() - .map_err(Into::into) -} - -/// Select the highest block number that has been proven in an unbroken sequence from genesis. -/// -/// A block is marked `proven_in_sequence` when it and all its ancestors have been proven. This -/// is maintained by the proof scheduler as blocks complete proving (potentially out of order). -/// -/// The genesis block is always inserted with `proven_in_sequence = TRUE`. -/// -/// This function is expected to only ever be called after a genesis block has been inserted into -/// the database. As such, if no proven-in-sequence block is found, it is treated as an error. 
-/// -/// # Raw SQL -/// -/// ```sql -/// SELECT MAX(block_num) -/// FROM block_headers -/// WHERE proven_in_sequence = TRUE -/// ``` -pub(crate) fn select_latest_proven_in_sequence_block_num( - conn: &mut SqliteConnection, -) -> Result { - let block_num: i64 = - SelectDsl::select(schema::block_headers::table, schema::block_headers::block_num) - .filter(schema::block_headers::proven_in_sequence.eq(true)) - .order(schema::block_headers::block_num.desc()) - .first(conn)?; - - BlockNumber::from_raw_sql(block_num).map_err(Into::into) -} diff --git a/crates/store/src/db/models/queries/mod.rs b/crates/store/src/db/models/queries/mod.rs index 7fcc143dba..43a7e58216 100644 --- a/crates/store/src/db/models/queries/mod.rs +++ b/crates/store/src/db/models/queries/mod.rs @@ -26,7 +26,6 @@ //! considered unnecessary boilerplate by default. use diesel::SqliteConnection; -use miden_node_proto::BlockProofRequest; use miden_protocol::block::SignedBlock; use miden_protocol::note::Nullifier; @@ -54,11 +53,10 @@ pub(crate) fn apply_block( conn: &mut SqliteConnection, block: &SignedBlock, notes: &[(NoteRecord, Option)], - proving_inputs: Option, ) -> Result { let mut count = 0; // Note: ordering here is important as the relevant tables have FK dependencies. - count += insert_block_header(conn, block.header(), block.signature(), proving_inputs)?; + count += insert_block_header(conn, block.header(), block.signature())?; count += upsert_accounts(conn, block.body().updated_accounts(), block.header().block_num())?; count += insert_scripts(conn, notes.iter().map(|(note, _)| note))?; count += insert_notes(conn, notes)?; diff --git a/crates/store/src/db/schema.rs b/crates/store/src/db/schema.rs index 38986fd3ae..60660afcbf 100644 --- a/crates/store/src/db/schema.rs +++ b/crates/store/src/db/schema.rs @@ -49,8 +49,6 @@ diesel::table! 
{ block_header -> Binary, signature -> Binary, commitment -> Binary, - proving_inputs -> Nullable, - proven_in_sequence -> Bool, } } diff --git a/crates/store/src/db/tests.rs b/crates/store/src/db/tests.rs index 1c582066ae..081577d7a0 100644 --- a/crates/store/src/db/tests.rs +++ b/crates/store/src/db/tests.rs @@ -2,8 +2,7 @@ use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use assert_matches::assert_matches; -use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection}; -use miden_node_proto::BlockProofRequest; +use diesel::{Connection, SqliteConnection}; use miden_node_proto::domain::account::{AccountSummary, StorageMapEntries}; use miden_node_utils::fee::{test_fee, test_fee_params}; use miden_protocol::account::auth::{AuthScheme, PublicKeyCommitment}; @@ -28,11 +27,9 @@ use miden_protocol::account::{ StorageSlotName, }; use miden_protocol::asset::{Asset, FungibleAsset}; -use miden_protocol::batch::OrderedBatches; use miden_protocol::block::{ BlockAccountUpdate, BlockHeader, - BlockInputs, BlockNoteIndex, BlockNoteTree, BlockNumber, @@ -67,7 +64,6 @@ use miden_protocol::transaction::{ InputNoteCommitment, InputNotes, OrderedTransactionHeaders, - PartialBlockchain, TransactionHeader, TransactionId, }; @@ -85,7 +81,6 @@ use crate::account_state_forest::HISTORICAL_BLOCK_RETENTION; use crate::db::migrations::apply_migrations; use crate::db::models::queries::{StorageMapValue, insert_account_storage_map_value}; use crate::db::models::{Page, queries, utils}; -use crate::db::schema; use crate::errors::DatabaseError; fn create_db() -> SqliteConnection { @@ -112,28 +107,8 @@ fn create_block(conn: &mut SqliteConnection, block_num: BlockNumber) { let dummy_signature = SecretKey::new().sign(block_header.commitment()); - let proving_inputs = if block_num == BlockNumber::GENESIS { - None - } else { - Some(dummy_proving_inputs(&block_header)) - }; - conn.transaction(|conn| { - queries::insert_block_header(conn, &block_header, &dummy_signature, 
proving_inputs)?; - // For non-genesis blocks, simulate the block having been proven and marked in sequence - // so that tests which don't care about proving state get a fully-proven chain. - if block_num != BlockNumber::GENESIS { - use crate::db::models::conv::SqlTypeConvert; - diesel::update( - schema::block_headers::table - .filter(schema::block_headers::block_num.eq(block_num.to_raw_sql())), - ) - .set(( - schema::block_headers::proving_inputs.eq(None::>), - schema::block_headers::proven_in_sequence.eq(true), - )) - .execute(conn)?; - } + queries::insert_block_header(conn, &block_header, &dummy_signature)?; Ok::<_, DatabaseError>(()) }) .unwrap(); @@ -774,13 +749,7 @@ fn db_block_header() { // test insertion let dummy_signature = SecretKey::new().sign(block_header.commitment()); - queries::insert_block_header( - conn, - &block_header, - &dummy_signature, - Some(dummy_proving_inputs(&block_header)), - ) - .unwrap(); + queries::insert_block_header(conn, &block_header, &dummy_signature).unwrap(); // test fetch unknown block header let block_number = 1; @@ -812,13 +781,7 @@ fn db_block_header() { ); let dummy_signature = SecretKey::new().sign(block_header2.commitment()); - queries::insert_block_header( - conn, - &block_header2, - &dummy_signature, - Some(dummy_proving_inputs(&block_header2)), - ) - .unwrap(); + queries::insert_block_header(conn, &block_header2, &dummy_signature).unwrap(); let res = queries::select_block_header_by_block_num(conn, None).unwrap(); assert_eq!(res.unwrap(), block_header2); @@ -2419,13 +2382,7 @@ fn db_roundtrip_block_header() { // Insert let dummy_signature = SecretKey::new().sign(block_header.commitment()); - queries::insert_block_header( - &mut conn, - &block_header, - &dummy_signature, - Some(dummy_proving_inputs(&block_header)), - ) - .unwrap(); + queries::insert_block_header(&mut conn, &block_header, &dummy_signature).unwrap(); // Retrieve let retrieved = @@ -3934,196 +3891,3 @@ fn 
account_state_forest_preserves_mixed_slots_independently() { let map_a_root_at_1 = forest.get_storage_map_root(account_id, &slot_map_a, block_1); assert!(map_a_root_at_1.is_some(), "Map A block 1 should be pruned"); } - -// PROVEN IN SEQUENCE TESTS -// ================================================================================================ - -/// Creates a minimal dummy `BlockProofRequest` for test purposes. -fn dummy_proving_inputs(block_header: &BlockHeader) -> BlockProofRequest { - BlockProofRequest { - tx_batches: OrderedBatches::new(vec![]), - block_header: block_header.clone(), - block_inputs: BlockInputs::new( - BlockHeader::mock(0, None, None, &[], EMPTY_WORD), - PartialBlockchain::default(), - std::collections::BTreeMap::new(), - std::collections::BTreeMap::new(), - std::collections::BTreeMap::new(), - ), - } -} - -fn create_unproven_block(conn: &mut SqliteConnection, block_num: BlockNumber) { - let block_header = BlockHeader::new( - 1_u8.into(), - num_to_word(2), - block_num, - num_to_word(4), - num_to_word(5), - num_to_word(6), - num_to_word(7), - num_to_word(8), - num_to_word(9), - SecretKey::new().public_key(), - test_fee_params(), - 11_u8.into(), - ); - - let dummy_signature = SecretKey::new().sign(block_header.commitment()); - conn.transaction(|conn| { - queries::insert_block_header( - conn, - &block_header, - &dummy_signature, - Some(dummy_proving_inputs(&block_header)), - ) - }) - .unwrap(); -} - -#[test] -fn select_latest_proven_block_num_only_genesis() { - let mut conn = create_db(); - - // Genesis block (block 0) is proven at insert time (proving_inputs = None). - create_block(&mut conn, BlockNumber::GENESIS); - - let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); - assert_eq!(latest, BlockNumber::GENESIS); -} - -#[test] -fn mark_block_proven_advances_in_sequence_for_consecutive_blocks() { - let mut conn = create_db(); - - // Insert genesis (proven + in-sequence) and three unproven blocks. 
- create_block(&mut conn, BlockNumber::GENESIS); - for i in 1u32..=3 { - create_unproven_block(&mut conn, BlockNumber::from(i)); - } - - // Mark all three as proven in order. Each call atomically advances the in-sequence tip. - for i in 1u32..=3 { - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(i)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(i)); - } - - let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); - assert_eq!(latest, BlockNumber::from(3u32)); -} - -#[test] -fn mark_block_proven_with_hole_does_not_advance_past_gap() { - let mut conn = create_db(); - - // Insert genesis + blocks 1..=4 as unproven. - create_block(&mut conn, BlockNumber::GENESIS); - for i in 1u32..=4 { - create_unproven_block(&mut conn, BlockNumber::from(i)); - } - - // Prove block 1 — advances tip to 1. - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - - // Prove blocks 3, 4 (skipping 2) — cannot advance past the gap. - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - - // Latest proven in sequence should be 1 (blocks 3, 4 are proven but not in sequence). - let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); - assert_eq!(latest, BlockNumber::from(1u32)); -} - -#[test] -fn mark_block_proven_filling_hole_advances_through_all_consecutive() { - let mut conn = create_db(); - - // Insert genesis + blocks 1..=4 as unproven. - create_block(&mut conn, BlockNumber::GENESIS); - for i in 1u32..=4 { - create_unproven_block(&mut conn, BlockNumber::from(i)); - } - - // Prove blocks out of order: 1, 3, 4 first. 
- let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(4u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - - assert_eq!( - queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(), - BlockNumber::from(1u32), - ); - - // Now prove block 2, filling the hole. Should advance tip through to 4. - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(2u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(4u32)); - - // Now all blocks through 4 are proven in sequence. - let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); - assert_eq!(latest, BlockNumber::from(4u32)); -} - -#[test] -fn select_unproven_blocks_skips_proven() { - let mut conn = create_db(); - - // Genesis is proven. Add blocks 1..=5, some proven and some not. - create_block(&mut conn, BlockNumber::GENESIS); - for i in 1u32..=5 { - create_unproven_block(&mut conn, BlockNumber::from(i)); - } - - // Prove blocks 1 and 3. - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(3u32)).unwrap(); - - // Unproven blocks after genesis should be 2, 4, 5. 
- let unproven = queries::select_unproven_blocks(&mut conn, BlockNumber::GENESIS, 10).unwrap(); - assert_eq!( - unproven, - vec![BlockNumber::from(2u32), BlockNumber::from(4u32), BlockNumber::from(5u32),] - ); -} - -#[test] -fn select_unproven_blocks_respects_limit() { - let mut conn = create_db(); - - create_block(&mut conn, BlockNumber::GENESIS); - for i in 1u32..=5 { - create_unproven_block(&mut conn, BlockNumber::from(i)); - } - - let unproven = queries::select_unproven_blocks(&mut conn, BlockNumber::GENESIS, 2).unwrap(); - assert_eq!(unproven, vec![BlockNumber::from(1u32), BlockNumber::from(2u32)]); -} - -#[test] -fn mark_block_proven_is_idempotent_for_in_sequence() { - let mut conn = create_db(); - - create_block(&mut conn, BlockNumber::GENESIS); - create_unproven_block(&mut conn, BlockNumber::from(1u32)); - - // First call marks block 1 proven and advances it in-sequence. - let new_tip = - super::mark_proven_and_advance_sequence(&mut conn, BlockNumber::from(1u32)).unwrap(); - assert_eq!(new_tip, BlockNumber::from(1u32)); - - let latest = queries::select_latest_proven_in_sequence_block_num(&mut conn).unwrap(); - assert_eq!(latest, BlockNumber::from(1u32)); -} diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index 12df2d5bee..b953526ca4 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -130,6 +130,8 @@ pub enum StateInitializationError { DataDirectoryLoadError(#[source] std::io::Error), #[error("failed to load block store")] BlockStoreLoadError(#[source] std::io::Error), + #[error("failed to load proven tip file")] + ProvenTipFileLoadError(#[source] std::io::Error), #[error("failed to load database")] DatabaseLoadError(#[source] DatabaseError), #[error("account state forest error")] diff --git a/crates/store/src/proven_tip.rs b/crates/store/src/proven_tip.rs index bde5bdbb9a..5df7d9922b 100644 --- a/crates/store/src/proven_tip.rs +++ b/crates/store/src/proven_tip.rs @@ -1,3 +1,5 @@ +use std::path::{Path, PathBuf}; 
+
 use miden_protocol::block::BlockNumber;
 use tokio::sync::watch;

@@ -40,8 +42,58 @@ impl ProvenTipWriter {
     }
 }

+// PROVEN TIP FILE
+// ================================================================================================
+
+/// File-backed store for the proven chain tip.
+///
+/// Persists the proven-in-sequence tip as a little-endian `u32` (4 bytes) at the given path.
+/// Writes are atomic: a temp file is written and renamed over the target.
+///
+/// Multiple [`ProvenTipFile`] instances at the same path are safe as long as only one writer
+/// is active at a time.
+#[derive(Debug, Clone)]
+pub struct ProvenTipFile {
+    path: PathBuf,
+}
+
+impl ProvenTipFile {
+    /// Creates a new proven tip file initialised to the genesis block.
+    pub fn bootstrap(path: PathBuf) -> std::io::Result<Self> {
+        let file = Self { path };
+        file.save(BlockNumber::GENESIS)?;
+        Ok(file)
+    }
+
+    /// Opens an existing proven tip file and reads the stored tip.
+    pub fn load(path: PathBuf) -> std::io::Result<(Self, BlockNumber)> {
+        let tip = Self::read_from(&path)?;
+        Ok((Self { path }, tip))
+    }
+
+    /// Atomically writes `tip` to the file (write to temp, then rename).
+    pub fn save(&self, tip: BlockNumber) -> std::io::Result<()> {
+        let tmp = self.path.with_extension("tmp");
+        fs_err::write(&tmp, tip.as_u32().to_le_bytes())?;
+        fs_err::rename(&tmp, &self.path)
+    }
+
+    fn read_from(path: &Path) -> std::io::Result<BlockNumber> {
+        let bytes = fs_err::read(path)?;
+        let arr: [u8; 4] = bytes.try_into().map_err(|_| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "proven tip file has unexpected size (expected 4 bytes)",
+            )
+        })?;
+        Ok(BlockNumber::from(u32::from_le_bytes(arr)))
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use tempfile::tempdir;
+
     use super::*;

     #[test]
@@ -65,4 +117,105 @@ mod tests {
         writer.advance(BlockNumber::from(15u32));
         assert_eq!(writer.read(), BlockNumber::from(15u32));
     }
+
+    // PROVEN TIP FILE TESTS
+    // ============================================================================================
+
+    fn load_tip(path: &std::path::Path) -> BlockNumber {
+        ProvenTipFile::load(path.to_path_buf()).unwrap().1
+    }
+
+    #[test]
+    fn bootstrap_initialises_to_genesis() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+
+        ProvenTipFile::bootstrap(path.clone()).unwrap();
+        assert_eq!(load_tip(&path), BlockNumber::GENESIS);
+    }
+
+    #[test]
+    fn save_persists_to_disk() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+        let file = ProvenTipFile::bootstrap(path.clone()).unwrap();
+
+        for n in [1u32, 42, 1000, u32::MAX] {
+            let tip = BlockNumber::from(n);
+            file.save(tip).unwrap();
+            assert_eq!(load_tip(&path), tip);
+        }
+    }
+
+    #[test]
+    fn load_returns_last_saved_tip() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+
+        let file = ProvenTipFile::bootstrap(path.clone()).unwrap();
+        let tip = BlockNumber::from(99u32);
+        file.save(tip).unwrap();
+        drop(file);
+
+        let (_, loaded_tip) = ProvenTipFile::load(path).unwrap();
+        assert_eq!(loaded_tip, tip);
+    }
+
+    #[test]
+    fn sequential_saves_preserve_latest() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+        let file = ProvenTipFile::bootstrap(path.clone()).unwrap();
+
+        for n in 1u32..=10 {
+            file.save(BlockNumber::from(n)).unwrap();
+        }
+        assert_eq!(load_tip(&path), BlockNumber::from(10u32));
+    }
+
+    #[test]
+    fn clone_writes_to_same_file() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+        let file = ProvenTipFile::bootstrap(path.clone()).unwrap();
+        let clone = file.clone();
+
+        file.save(BlockNumber::from(7u32)).unwrap();
+        assert_eq!(load_tip(&path), BlockNumber::from(7u32));
+
+        clone.save(BlockNumber::from(13u32)).unwrap();
+        assert_eq!(load_tip(&path), BlockNumber::from(13u32));
+    }
+
+    #[test]
+    fn load_missing_file_returns_error() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("does_not_exist");
+
+        let result = ProvenTipFile::load(path);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn load_corrupt_file_returns_invalid_data() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+        // Write 3 bytes instead of 4.
+        fs_err::write(&path, [0u8, 1, 2]).unwrap();
+
+        let err = ProvenTipFile::load(path).unwrap_err();
+        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
+    }
+
+    #[test]
+    fn save_is_atomic_no_temp_file_remains() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("proven_tip");
+        let file = ProvenTipFile::bootstrap(path.clone()).unwrap();
+
+        file.save(BlockNumber::from(5u32)).unwrap();
+
+        // The .tmp sidecar must not persist after a successful save.
+ assert!(!path.with_extension("tmp").exists()); + } } diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 135e6fd34a..8147994f20 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -22,7 +22,7 @@ use crate::blocks::BlockStore; use crate::db::Db; use crate::errors::ApplyBlockError; use crate::genesis::GenesisBlock; -use crate::proven_tip::ProvenTipWriter; +use crate::proven_tip::{ProvenTipFile, ProvenTipWriter}; use crate::server::replica_sync::{BlockReplicaSync, ProofReplicaSync}; use crate::state::{ProofCache, State}; use crate::{BlockProver, COMPONENT}; @@ -97,13 +97,18 @@ impl Store { })?; tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded"); - let block_store = data_directory.block_store_dir(); + let block_store_path = data_directory.block_store_dir(); let block_store = - BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| { - format!("failed to bootstrap block store at {}", block_store.display()) + BlockStore::bootstrap(block_store_path.clone(), &genesis).with_context(|| { + format!("failed to bootstrap block store at {}", block_store_path.display()) })?; tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created"); + let proven_tip_path = data_directory.proven_tip_path(); + ProvenTipFile::bootstrap(proven_tip_path.clone()).with_context(|| { + format!("failed to bootstrap proven tip file at {}", proven_tip_path.display()) + })?; + // Create the genesis block and insert it into the database. 
let database_filepath = data_directory.database_path(); Db::bootstrap(database_filepath.clone(), genesis).with_context(|| { @@ -124,7 +129,7 @@ impl Store { let (termination_ask, mut termination_signal) = tokio::sync::mpsc::channel::(1); - let (state, tx_proven_tip) = + let (state, tx_proven_tip, proven_tip_file) = State::load(&self.data_directory, self.storage_options, termination_ask) .await .context("failed to load state")?; @@ -143,6 +148,7 @@ impl Store { block_prover_url, max_concurrent_proofs, tx_proven_tip, + proven_tip_file, self.grpc_options, self.rpc_listener, ) @@ -181,6 +187,7 @@ impl Store { block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, tx_proven_tip: ProvenTipWriter, + proven_tip_file: ProvenTipFile, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ) -> anyhow::Result { @@ -195,6 +202,7 @@ impl Store { block_prover_url, max_concurrent_proofs, tx_proven_tip, + proven_tip_file, proof_cache, ) .await; @@ -257,6 +265,7 @@ impl Store { block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, proven_tip: ProvenTipWriter, + proven_tip_file: ProvenTipFile, proof_cache: ProofCache, ) -> ( tokio::task::JoinHandle>, @@ -272,11 +281,11 @@ impl Store { let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip); let handle = proof_scheduler::spawn( - state.db().clone(), block_prover, state.block_store(), chain_tip_rx, proven_tip, + proven_tip_file, max_concurrent_proofs, proof_cache, ); @@ -422,6 +431,10 @@ impl DataDirectory { self.0.join("miden-store.sqlite3") } + pub fn proven_tip_path(&self) -> PathBuf { + self.0.join("proven_tip") + } + pub fn display(&self) -> std::path::Display<'_> { self.0.display() } diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index b3b95f9e52..4ce4d9227f 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -2,14 +2,14 @@ //! //! The [`proof_scheduler`] is spawned as an internal Store task. 
It: //! -//! 1. Tracks `chain_tip` via a [`watch::Receiver`] and `latest_proven_block` locally. +//! 1. Tracks `chain_tip` via a [`watch::Receiver`] and `highest_scheduled` locally. //! 2. Maintains up to `max_concurrent_proofs` in-flight proving jobs via a [`JoinSet`]. //! 3. Blocks may be proven out of order since proving jobs run concurrently. Completed proofs are -//! buffered and committed to the block store and database in ascending block-number order. -//! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried -//! internally within its proving task, subject to an overall per-block time budget. -//! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler -//! returns the error to the caller for node shutdown. +//! buffered and committed to the block store in ascending block-number order. +//! 4. On transient errors (prover failures, timeouts), the failed block is retried internally +//! within its proving task, subject to an overall per-block time budget. +//! 5. On fatal errors (e.g. missing proving inputs files), the scheduler returns the error to the +//! caller for node shutdown. 
use std::collections::BTreeMap; use std::num::NonZeroUsize; @@ -18,7 +18,9 @@ use std::time::Duration; use anyhow::Context; use miden_crypto::utils::Serializable; +use miden_node_proto::BlockProofRequest; use miden_protocol::block::{BlockNumber, BlockProof}; +use miden_protocol::utils::serde::Deserializable; use miden_remote_prover_client::RemoteProverClientError; use thiserror::Error; use tokio::sync::watch; @@ -27,9 +29,8 @@ use tracing::{Instrument, info, instrument}; use crate::COMPONENT; use crate::blocks::BlockStore; -use crate::db::Db; -use crate::errors::{DatabaseError, ProofSchedulerError}; -use crate::proven_tip::ProvenTipWriter; +use crate::errors::ProofSchedulerError; +use crate::proven_tip::{ProvenTipFile, ProvenTipWriter}; use crate::server::block_prover_client::{BlockProver, StoreProverError}; use crate::state::{ProofCache, ProofNotification}; @@ -62,10 +63,16 @@ impl ProofTaskJoinSet { } /// Spawns a new task to prove a block. - fn spawn(&mut self, db: &Arc, block_prover: &Arc, block_num: BlockNumber) { - let db = Arc::clone(db); + fn spawn( + &mut self, + block_store: &Arc, + block_prover: &Arc, + block_num: BlockNumber, + ) { + let block_store = Arc::clone(block_store); let block_prover = Arc::clone(block_prover); - self.0.spawn(async move { prove_block(&db, &block_prover, block_num).await }); + self.0 + .spawn(async move { prove_block(&block_store, &block_prover, block_num).await }); } /// Returns the result of the next completed task, or pends forever if the set is empty. @@ -88,27 +95,28 @@ impl ProofTaskJoinSet { /// Spawns the proof scheduler as a background tokio task. /// -/// The scheduler uses `chain_tip_rx` to learn about newly committed blocks and queries the DB -/// for unproven blocks to prove. After each proof is saved, the result is pushed into -/// `proof_cache` and the proven tip watch is advanced so replica subscribers are notified. 
+/// The scheduler uses `chain_tip_rx` to learn about newly committed blocks and checks the +/// block store for proving inputs files to determine which blocks need proving. After each proof +/// is saved, the result is pushed into `proof_cache` and the proven tip watch and file are +/// updated so replica subscribers are notified. /// /// Returns a [`JoinHandle`] that resolves when the scheduler encounters a fatal error or /// completes unexpectedly. pub fn spawn( - db: Arc, block_prover: Arc, block_store: Arc, chain_tip_rx: watch::Receiver, proven_tip: ProvenTipWriter, + proven_tip_file: ProvenTipFile, max_concurrent_proofs: NonZeroUsize, proof_cache: ProofCache, ) -> JoinHandle> { tokio::spawn(run( - db, block_prover, block_store, chain_tip_rx, proven_tip, + proven_tip_file, max_concurrent_proofs, proof_cache, )) @@ -119,16 +127,17 @@ pub fn spawn( /// Maintains a pool of concurrent proving jobs via [`JoinSet`], fills them up to /// `max_concurrent_proofs`, and drains completed results. /// -/// Unproven blocks are discovered by querying the database each iteration. +/// Unproven blocks are discovered by comparing the proven tip against the chain tip: every block +/// in the range `(proven_tip, chain_tip]` has a proving inputs file in the block store. /// -/// Returns `Err` on irrecoverable errors (missing/corrupt proving inputs, DB write failures). +/// Returns `Err` on irrecoverable errors (missing proving inputs, I/O failures). /// Transient errors are retried internally. async fn run( - db: Arc, block_prover: Arc, block_store: Arc, mut chain_tip_rx: watch::Receiver, proven_tip: ProvenTipWriter, + proven_tip_file: ProvenTipFile, max_concurrent_proofs: NonZeroUsize, proof_cache: ProofCache, ) -> anyhow::Result<()> { @@ -137,27 +146,24 @@ async fn run( // In-flight proving tasks. let mut proving_tasks = ProofTaskJoinSet::new(); - // Highest block number that is in-flight or has been proven. Used to avoid re-querying - // blocks we've already scheduled. 
Initialized from the in-sequence tip so we skip + // Highest block number that is in-flight or has been proven. Used to avoid re-scheduling + // blocks we've already dispatched. Initialized from the proven tip so we skip // already-proven blocks on restart. - let mut highest_scheduled = db.proven_chain_tip().await?; + let mut highest_scheduled = proven_tip.read(); // Completed proofs waiting to be committed in order. let mut pending: BTreeMap> = BTreeMap::new(); loop { - // Query the DB for unproven blocks beyond what we've already scheduled. - let capacity = max_concurrent_proofs.get() - proving_tasks.len(); - if capacity > 0 { - let unproven = db.select_unproven_blocks(highest_scheduled, capacity).await?; - - if let Some(&last) = unproven.last() { - highest_scheduled = last; - } - - for block_num in unproven { - proving_tasks.spawn(&db, &block_prover, block_num); + // Schedule blocks up to chain_tip that haven't been scheduled yet. + let chain_tip = *chain_tip_rx.borrow(); + while proving_tasks.len() < max_concurrent_proofs.get() { + let next = highest_scheduled.child(); + if next > chain_tip { + break; } + proving_tasks.spawn(&block_store, &block_prover, next); + highest_scheduled = next; } // Wait for either a job to complete or the chain tip to advance. @@ -171,13 +177,14 @@ async fn run( let mut next = proven_tip.read().child(); while let Some(proof_bytes) = pending.remove(&next) { block_store.save_proof(next, &proof_bytes).await?; - let tip = db.mark_proven_and_advance_sequence(next).await?; + block_store.delete_proving_inputs(next).await?; + proven_tip_file.save(next)?; proof_cache.push(next, ProofNotification::new(next, proof_bytes)); - proven_tip.advance(tip); + proven_tip.advance(next); next = next.child(); } }, - // New chain tip received — re-query for unproven blocks on next iteration. + // New chain tip received — re-enter the scheduling loop on next iteration. 
result = chain_tip_rx.changed() => { if result.is_err() { info!(target: COMPONENT, "Chain tip channel closed, proof scheduler exiting"); @@ -195,7 +202,7 @@ async fn run( #[instrument(target = COMPONENT, name = "prove_block", skip_all, fields(block.number=block_num.as_u32()), err)] async fn prove_block( - db: &Db, + block_store: &BlockStore, block_prover: &BlockProver, block_num: BlockNumber, ) -> anyhow::Result<(BlockNumber, Vec)> { @@ -213,7 +220,7 @@ async fn prove_block( let result = tokio::time::timeout( BLOCK_PROVE_ATTEMPT_TIMEOUT, - generate_block_proof(db, block_prover, block_num), + generate_block_proof(block_store, block_prover, block_num), ) .instrument(attempt_span.clone()) .await; @@ -240,23 +247,24 @@ async fn prove_block( ))? } -/// Generates a block proof by loading inputs from the DB and invoking the block prover. -/// -/// Records `block_commitment` on `parent_span` once the block header is available. +/// Generates a block proof by loading inputs from the block store and invoking the block prover. #[instrument(target = COMPONENT, name = "prove_block.generate", skip_all, fields(block.number=block_num.as_u32()), err)] async fn generate_block_proof( - db: &Db, + block_store: &BlockStore, block_prover: &BlockProver, block_num: BlockNumber, ) -> Result { - let request = db - .select_block_proving_inputs(block_num) + let bytes = block_store + .load_proving_inputs(block_num) .await - .map_err(ProveBlockError::from_db_error)? + .map_err(|e| ProveBlockError::Transient(e.into()))? .ok_or_else(|| { ProveBlockError::Fatal(ProofSchedulerError::MissingProvingInputs(block_num)) })?; + let request = BlockProofRequest::read_from_bytes(&bytes) + .map_err(|e| ProveBlockError::Fatal(ProofSchedulerError::DeserializationFailed(e)))?; + let proof = block_prover .prove(request.tx_batches, request.block_inputs, &request.block_header) .await @@ -274,21 +282,12 @@ enum ProveBlockError { /// An irrecoverable error that should cause node shutdown. 
#[error("fatal error")] Fatal(#[source] ProofSchedulerError), - /// A transient error (DB read, prover failure). The outer loop will retry. + /// A transient error (I/O, prover failure). The outer loop will retry. #[error("transient error: {0}")] Transient(Box), } impl ProveBlockError { - fn from_db_error(err: DatabaseError) -> Self { - match err { - DatabaseError::DeserializationError(err) => { - Self::Fatal(ProofSchedulerError::DeserializationFailed(err)) - }, - _ => Self::Transient(err.into()), - } - } - fn from_prover_error(err: StoreProverError) -> Self { match err { StoreProverError::RemoteProvingFailed(RemoteProverClientError::InvalidEndpoint( diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index c3cc48826a..233040c316 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -63,17 +63,23 @@ impl State { self.validate_block_header(header, body).await?; - // Save the block to the block store. In a case of a rolled-back DB transaction, the - // in-memory state will be unchanged, but the block might still be written into the - // block store. Thus, such block should be considered as block candidates, but not - // finalized blocks. So we should check for the latest block when getting block from - // the store. + // Save the block and proving inputs to the block store. In a case of a rolled-back DB + // transaction, the in-memory state will be unchanged, but these files might still be + // written. Such blocks should be considered candidates, not finalized blocks. let signed_block_bytes = signed_block.to_bytes(); // Clone before moving into the block-save task so we can cache for replicas at commit. 
let cache_bytes = signed_block_bytes.clone(); + let inputs_bytes = proving_inputs.as_ref().map(Serializable::to_bytes); let store = Arc::clone(&self.block_store); let block_save_task = tokio::spawn( - async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(), + async move { + store.save_block(block_num, &signed_block_bytes).await?; + if let Some(bytes) = inputs_bytes { + store.save_proving_inputs(block_num, &bytes).await?; + } + Ok::<(), std::io::Error>(()) + } + .in_current_span(), ); let ( @@ -106,11 +112,8 @@ impl State { // spawned. let db = Arc::clone(&self.db); let db_update_task = tokio::spawn( - async move { - db.apply_block(allow_acquire, acquire_done, signed_block, notes, proving_inputs) - .await - } - .in_current_span(), + async move { db.apply_block(allow_acquire, acquire_done, signed_block, notes).await } + .in_current_span(), ); // Wait for the message from the DB update task, that we ready to commit the DB diff --git a/crates/store/src/state/apply_proof.rs b/crates/store/src/state/apply_proof.rs index 2cfd562228..fce480739a 100644 --- a/crates/store/src/state/apply_proof.rs +++ b/crates/store/src/state/apply_proof.rs @@ -15,9 +15,9 @@ impl State { proof_bytes: Vec, ) -> anyhow::Result<()> { self.block_store.save_proof(block_num, &proof_bytes).await?; - let tip = self.db.mark_proven_and_advance_sequence(block_num).await?; + self.proven_tip_file.save(block_num)?; self.proof_cache.push(block_num, ProofNotification::new(block_num, proof_bytes)); - self.proven_tip.advance(tip); + self.proven_tip.advance(block_num); Ok(()) } } diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index c52cccfe25..c75d8639bd 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -53,7 +53,7 @@ use crate::errors::{ GetCurrentBlockchainDataError, StateInitializationError, }; -use crate::proven_tip::ProvenTipWriter; +use crate::proven_tip::{ProvenTipFile, ProvenTipWriter}; use crate::{COMPONENT, 
DataDirectory}; /// Number of recent committed blocks held in the in-memory cache for replica subscriptions. @@ -153,6 +153,10 @@ pub struct State { /// `apply_proof`. proven_tip: ProvenTipWriter, + /// File-backed persistence for the proven tip. Updated alongside `proven_tip` whenever the + /// proven-in-sequence tip advances. + proven_tip_file: ProvenTipFile, + /// Watch sender fired after each block is committed. Replicas subscribe via /// `subscribe_committed_tip()` to be woken when new blocks arrive. committed_tip_tx: watch::Sender, @@ -172,14 +176,15 @@ impl State { /// Loads the state from the data directory. /// - /// Returns `(Self, ProvenTipWriter)`. The `ProvenTipWriter` is used by the proof scheduler to - /// advance the proven tip; callers can subscribe to tip changes via the methods on `Self`. + /// Returns `(Self, ProvenTipWriter, ProvenTipFile)`. The `ProvenTipWriter` and `ProvenTipFile` + /// are used by the proof scheduler (in block-producer mode) to advance and persist the proven + /// tip; callers can subscribe to tip changes via the methods on `Self`. #[instrument(target = COMPONENT, skip_all)] pub async fn load( data_path: &Path, storage_options: StorageOptions, termination_ask: tokio::sync::mpsc::Sender, - ) -> Result<(Self, ProvenTipWriter), StateInitializationError> { + ) -> Result<(Self, ProvenTipWriter, ProvenTipFile), StateInitializationError> { let data_directory = DataDirectory::load(data_path.to_path_buf()) .map_err(StateInitializationError::DataDirectoryLoadError)?; @@ -227,9 +232,10 @@ impl State { let writer = Mutex::new(()); let db = Arc::new(db); - // Initialize the proven tip from database. - let proven_tip_init = - db.proven_chain_tip().await.map_err(StateInitializationError::DatabaseError)?; + // Initialize the proven tip from the filesystem file. 
+ let proven_tip_path = data_directory.proven_tip_path(); + let (proven_tip_file, proven_tip_init) = ProvenTipFile::load(proven_tip_path) + .map_err(StateInitializationError::ProvenTipFileLoadError)?; let (proven_tip, _rx) = ProvenTipWriter::new(proven_tip_init); // Committed-tip watch: fires after each successful apply_block. @@ -244,19 +250,16 @@ impl State { writer, termination_ask, proven_tip: proven_tip.clone(), + proven_tip_file: proven_tip_file.clone(), committed_tip_tx, block_cache: BlockCache::new(BLOCK_CACHE_CAPACITY), proof_cache: ProofCache::new(PROOF_CACHE_CAPACITY), }, proven_tip, + proven_tip_file, )) } - /// Returns the database. - pub(crate) fn db(&self) -> Arc { - Arc::clone(&self.db) - } - /// Returns the block store. pub(crate) fn block_store(&self) -> Arc { Arc::clone(&self.block_store) From 272ca3f660f97a5953a638e3f8de30d1934ac051 Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 13 May 2026 14:07:25 +1200 Subject: [PATCH 2/4] Reorder proof save and inputs delete --- crates/store/src/server/proof_scheduler.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/store/src/server/proof_scheduler.rs b/crates/store/src/server/proof_scheduler.rs index 4ce4d9227f..680a637d6d 100644 --- a/crates/store/src/server/proof_scheduler.rs +++ b/crates/store/src/server/proof_scheduler.rs @@ -176,9 +176,11 @@ async fn run( // Commit all consecutive proofs in ascending order. let mut next = proven_tip.read().child(); while let Some(proof_bytes) = pending.remove(&next) { + // Save the proof and tip file before deleting the proving inputs. block_store.save_proof(next, &proof_bytes).await?; - block_store.delete_proving_inputs(next).await?; proven_tip_file.save(next)?; + block_store.delete_proving_inputs(next).await?; + // Notify subscribers of the proof. 
proof_cache.push(next, ProofNotification::new(next, proof_bytes)); proven_tip.advance(next); next = next.child(); From cf33bac48f41b268042f4818be635a96f90589ea Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 13 May 2026 14:27:04 +1200 Subject: [PATCH 3/4] Remove save proving inputs from apply block pipeline --- crates/store/src/server/block_producer.rs | 9 +++++-- crates/store/src/server/replica_sync.rs | 2 +- crates/store/src/state/apply_block.rs | 32 +++++++++++------------ 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs index 1cbc4126ed..c19ddfe7fe 100644 --- a/crates/store/src/server/block_producer.rs +++ b/crates/store/src/server/block_producer.rs @@ -101,6 +101,12 @@ impl block_producer_server::BlockProducer for BlockProducerApi { block_header: header.clone(), block_inputs, }; + let block_num = header.block_num(); + self.inner + .state + .save_proving_inputs(block_num, &proving_inputs) + .await + .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; // We perform the apply block work in a separate task. This prevents the caller // cancelling the request and thereby cancelling the task at an arbitrary point of @@ -111,14 +117,13 @@ impl block_producer_server::BlockProducer for BlockProducerApi { let this = self.clone(); tokio::spawn( async move { - let block_num = header.block_num(); let signed_block = SignedBlock::new(header, body, signature) .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; // Note: This is an internal endpoint, so its safe to expose the full error // report. 
this.inner .state - .apply_block(signed_block, Some(proving_inputs)) + .apply_block(signed_block) .await .inspect(|_| { if let Err(err) = this.chain_tip_sender.send(block_num) { diff --git a/crates/store/src/server/replica_sync.rs b/crates/store/src/server/replica_sync.rs index b4d33259be..0bb0553443 100644 --- a/crates/store/src/server/replica_sync.rs +++ b/crates/store/src/server/replica_sync.rs @@ -111,7 +111,7 @@ impl ReplicaSync for BlockReplicaSync { let event = result?; let block = SignedBlock::read_from_bytes(&event.block) .context("failed to deserialize block from upstream")?; - self.state.apply_block(block, None).await?; + self.state.apply_block(block).await?; } Ok(()) diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index 233040c316..787100d25b 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -1,12 +1,12 @@ use std::sync::Arc; -use miden_node_proto::BlockProofRequest; +use miden_node_proto::domain::proof_request::BlockProofRequest; use miden_node_utils::ErrorReport; use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::account_tree::AccountMutationSet; use miden_protocol::block::nullifier_tree::NullifierMutationSet; -use miden_protocol::block::{BlockBody, BlockHeader, SignedBlock}; +use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock}; use miden_protocol::note::{NoteDetails, Nullifier}; use miden_protocol::transaction::OutputNote; use miden_protocol::utils::serde::Serializable; @@ -43,16 +43,9 @@ impl State { /// - the in-memory structures are updated, including the latest block pointer and the lock is /// released. /// - /// # Errors - /// - /// Returns an error if `proving_inputs` is `None` and the block is not the genesis block. // TODO: This span is logged in a root span, we should connect it to the parent span. 
#[instrument(target = COMPONENT, skip_all, err)] - pub async fn apply_block( - &self, - signed_block: SignedBlock, - proving_inputs: Option, - ) -> Result<(), ApplyBlockError> { + pub async fn apply_block(&self, signed_block: SignedBlock) -> Result<(), ApplyBlockError> { let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?; let header = signed_block.header(); @@ -63,20 +56,16 @@ impl State { self.validate_block_header(header, body).await?; - // Save the block and proving inputs to the block store. In a case of a rolled-back DB - // transaction, the in-memory state will be unchanged, but these files might still be - // written. Such blocks should be considered candidates, not finalized blocks. + // Save the block to the block store. In a case of a rolled-back DB transaction, the + // in-memory state will be unchanged, but the file might still be written. Such blocks + // should be considered candidates, not finalized blocks. let signed_block_bytes = signed_block.to_bytes(); // Clone before moving into the block-save task so we can cache for replicas at commit. let cache_bytes = signed_block_bytes.clone(); - let inputs_bytes = proving_inputs.as_ref().map(Serializable::to_bytes); let store = Arc::clone(&self.block_store); let block_save_task = tokio::spawn( async move { store.save_block(block_num, &signed_block_bytes).await?; - if let Some(bytes) = inputs_bytes { - store.save_proving_inputs(block_num, &bytes).await?; - } Ok::<(), std::io::Error>(()) } .in_current_span(), @@ -193,6 +182,15 @@ impl State { Ok(()) } + /// Saves the proving inputs for the given block to the block store. + pub async fn save_proving_inputs( + &self, + block_num: BlockNumber, + proving_inputs: &BlockProofRequest, + ) -> std::io::Result<()> { + self.block_store.save_proving_inputs(block_num, &proving_inputs.to_bytes()).await + } + /// Validates that the block header is consistent with the block body and the current state. 
#[instrument(target = COMPONENT, skip_all, err)] async fn validate_block_header( From dd223f99dd6d392d19122800df4fb28676c1cb30 Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 13 May 2026 14:30:00 +1200 Subject: [PATCH 4/4] RM unnecessary err --- crates/store/src/state/apply_block.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index 787100d25b..4d0a20c190 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -42,7 +42,6 @@ impl State { /// use the fresh data. /// - the in-memory structures are updated, including the latest block pointer and the lock is /// released. - /// // TODO: This span is logged in a root span, we should connect it to the parent span. #[instrument(target = COMPONENT, skip_all, err)] pub async fn apply_block(&self, signed_block: SignedBlock) -> Result<(), ApplyBlockError> { @@ -64,11 +63,7 @@ impl State { let cache_bytes = signed_block_bytes.clone(); let store = Arc::clone(&self.block_store); let block_save_task = tokio::spawn( - async move { - store.save_block(block_num, &signed_block_bytes).await?; - Ok::<(), std::io::Error>(()) - } - .in_current_span(), + async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(), ); let ( @@ -188,7 +183,9 @@ impl State { block_num: BlockNumber, proving_inputs: &BlockProofRequest, ) -> std::io::Result<()> { - self.block_store.save_proving_inputs(block_num, &proving_inputs.to_bytes()).await + self.block_store + .save_proving_inputs(block_num, &proving_inputs.to_bytes()) + .await } /// Validates that the block header is consistent with the block body and the current state.