diff --git a/Cargo.toml b/Cargo.toml index c34aab4..a461c5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ alloy-primitives = "0.8" alloy-consensus = "0.9" alloy-rlp = "0.3" alloy-eips = "0.9" +bytes = "1.5" rusqlite = { version = "0.38", features = ["bundled"] } [workspace.lints.rust] # missing_docs = "deny" diff --git a/crates/app/node/Cargo.toml b/crates/app/node/Cargo.toml index d2f5d10..eb922f6 100644 --- a/crates/app/node/Cargo.toml +++ b/crates/app/node/Cargo.toml @@ -9,6 +9,7 @@ rust-version.workspace = true description = "Reusable dev-node runner for Evolve applications." [dependencies] +bytes = { workspace = true } commonware-runtime = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/app/node/src/lib.rs b/crates/app/node/src/lib.rs index dd690f5..0def792 100644 --- a/crates/app/node/src/lib.rs +++ b/crates/app/node/src/lib.rs @@ -20,16 +20,20 @@ use borsh::{BorshDeserialize, BorshSerialize}; use commonware_runtime::tokio::{Config as TokioConfig, Context as TokioContext, Runner}; use commonware_runtime::{Runner as RunnerTrait, Spawner}; use evolve_chain_index::{ChainStateProvider, ChainStateProviderConfig, PersistentChainIndex}; +use evolve_core::encoding::Encodable; use evolve_core::ReadonlyKV; use evolve_eth_jsonrpc::{start_server_with_subscriptions, RpcServerConfig, SubscriptionManager}; use evolve_mempool::{new_shared_mempool, Mempool, MempoolTx, SharedMempool}; use evolve_rpc_types::SyncStatus; -use evolve_server::StfExecutor; use evolve_server::{ load_chain_state, save_chain_state, ChainState, DevConfig, DevConsensus, CHAIN_STATE_KEY, }; +use evolve_server::{OnBlockArchive, StfExecutor}; use evolve_stf_traits::{AccountsCodeStorage, StateChange, Transaction}; -use evolve_storage::{MockStorage, Operation, Storage, StorageConfig}; +use evolve_storage::types::BlockHash as ArchiveBlockHash; +use evolve_storage::{ + BlockStorage, BlockStorageConfig, MockStorage, Operation, 
Storage, StorageConfig,
+};
 use evolve_tx_eth::TxContext;
 use std::future::Future;
@@ -84,7 +88,7 @@ pub fn build_dev_node_with_mempool(
     config: DevConfig,
 ) -> DevNodeMempoolHandles
 where
-    Tx: Transaction + MempoolTx + Send + Sync + 'static,
+    Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
     S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
     Codes: AccountsCodeStorage + Send + Sync + 'static,
     Stf: StfExecutor + Send + Sync + 'static,
@@ -163,6 +167,47 @@ pub struct GenesisOutput {
 
 type RuntimeContext = TokioContext;
 
+/// Build the block archive callback.
+///
+/// Creates a `BlockStorage` backed by the commonware archive and returns an
+/// `OnBlockArchive` callback that queues each produced block for a writer task.
+/// Archival is best-effort: a full queue or a failed write drops the block (logged).
+///
+/// # Panics
+///
+/// Panics if block storage initialization fails.
+async fn build_block_archive(context: TokioContext) -> OnBlockArchive {
+    let config = BlockStorageConfig::default();
+    let store = BlockStorage::new(context, config)
+        .await
+        .expect("failed to initialize block archive storage");
+
+    let (tx, mut rx) = tokio::sync::mpsc::channel::<(u64, ArchiveBlockHash, bytes::Bytes)>(64);
+
+    // Single consumer task writes blocks in the order they were queued.
+    tokio::spawn(async move {
+        let mut store = store;
+        while let Some((block_number, block_hash, block_bytes)) = rx.recv().await {
+            if let Err(e) = store.put_sync(block_number, block_hash, block_bytes).await {
+                tracing::error!("Failed to archive block {}: {:?}", block_number, e);
+            }
+        }
+    });
+
+    tracing::info!("Block archive storage enabled");
+
+    Arc::new(move |block_number, block_hash, block_bytes| {
+        let hash_bytes = ArchiveBlockHash::new(block_hash.0);
+        if let Err(e) = tx.try_send((block_number, hash_bytes, block_bytes)) {
+            tracing::error!(
+                "Block archive channel full or closed for block {}: {}",
+                block_number,
+                e
+            );
+        }
+    })
+}
+
 /// Run the dev node with default settings (RPC enabled).
 pub fn run_dev_node<
     Stf,
@@ -184,7 +229,7 @@ pub fn run_dev_node<
     run_genesis: RunGenesis,
     build_storage: BuildStorage,
 ) where
-    Tx: Transaction + MempoolTx + Send + Sync + 'static,
+    Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
     Codes: AccountsCodeStorage + Send + Sync + 'static,
     S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
     Stf: StfExecutor + Send + Sync + 'static,
@@ -233,7 +278,7 @@ pub fn run_dev_node_with_rpc<
     build_storage: BuildStorage,
     rpc_config: RpcConfig,
 ) where
-    Tx: Transaction + MempoolTx + Send + Sync + 'static,
+    Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
     Codes: AccountsCodeStorage + Send + Sync + 'static,
     S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
     Stf: StfExecutor + Send + Sync + 'static,
@@ -283,6 +328,7 @@ pub fn run_dev_node_with_rpc<
         async move {
             // Clone context early since build_storage takes ownership
             let context_for_shutdown = context.clone();
+            let context_for_archive = context.clone();
             let storage = (build_storage)(context, storage_config)
                 .await
                 .expect("failed to create storage");
@@ -323,6 +369,9 @@ pub fn run_dev_node_with_rpc<
                 ..Default::default()
             };
 
+            // Build block archive callback (always on)
+            let archive_cb = build_block_archive(context_for_archive).await;
+
             // 
Set up RPC infrastructure if enabled let rpc_handle = if rpc_config.enabled { // Create chain index backed by SQLite @@ -369,17 +418,18 @@ pub fn run_dev_node_with_rpc< .expect("failed to start RPC server"); // Create DevConsensus with RPC support - let dev: Arc> = Arc::new( - DevConsensus::with_rpc( - stf, - storage, - codes, - dev_config, - chain_index, - subscriptions, - ) - .with_indexing_enabled(rpc_config.enable_block_indexing), - ); + let consensus = DevConsensus::with_rpc( + stf, + storage, + codes, + dev_config, + chain_index, + subscriptions, + ) + .with_indexing_enabled(rpc_config.enable_block_indexing) + .with_block_archive(archive_cb); + let dev: Arc> = + Arc::new(consensus); tracing::info!( "Block interval: {:?}, starting at height {}", @@ -421,8 +471,10 @@ pub fn run_dev_node_with_rpc< Some(handle) } else { // No RPC - use simple DevConsensus + let consensus = DevConsensus::new(stf, storage, codes, dev_config) + .with_block_archive(archive_cb); let dev: Arc> = - Arc::new(DevConsensus::new(stf, storage, codes, dev_config)); + Arc::new(consensus); tracing::info!( "Block interval: {:?}, starting at height {}", @@ -501,7 +553,7 @@ pub fn run_dev_node_with_rpc_and_mempool< build_storage: BuildStorage, rpc_config: RpcConfig, ) where - Tx: Transaction + MempoolTx + Send + Sync + 'static, + Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static, Codes: AccountsCodeStorage + Send + Sync + 'static, S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, Stf: StfExecutor + Send + Sync + 'static, @@ -711,6 +763,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< async move { let context_for_shutdown = context.clone(); + let context_for_archive = context.clone(); let storage = (build_storage)(context, storage_config) .await .expect("failed to create storage"); @@ -751,6 +804,9 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< let mempool: SharedMempool> = new_shared_mempool(); + // Build block archive callback (always on) + let archive_cb = 
build_block_archive(context_for_archive).await; + let rpc_handle = if rpc_config.enabled { let chain_index = Arc::new( PersistentChainIndex::new(&chain_index_db_path) @@ -791,19 +847,19 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< .await .expect("failed to start RPC server"); + let consensus = DevConsensus::with_rpc_and_mempool( + stf, + storage, + codes, + dev_config, + chain_index, + subscriptions, + mempool.clone(), + ) + .with_indexing_enabled(rpc_config.enable_block_indexing) + .with_block_archive(archive_cb); let dev: Arc> = - Arc::new( - DevConsensus::with_rpc_and_mempool( - stf, - storage, - codes, - dev_config, - chain_index, - subscriptions, - mempool.clone(), - ) - .with_indexing_enabled(rpc_config.enable_block_indexing), - ); + Arc::new(consensus); tracing::info!( "Block interval: {:?}, max_txs_per_block: {}, starting at height {}", @@ -841,8 +897,10 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< Some(handle) } else { + let consensus = DevConsensus::with_mempool(stf, storage, codes, dev_config, mempool) + .with_block_archive(archive_cb); let dev: Arc> = - Arc::new(DevConsensus::with_mempool(stf, storage, codes, dev_config, mempool)); + Arc::new(consensus); tracing::info!( "Block interval: {:?}, max_txs_per_block: {}, starting at height {}", @@ -952,7 +1010,7 @@ pub fn init_dev_node< run_genesis: RunGenesis, build_storage: BuildStorage, ) where - Tx: Transaction + MempoolTx + Send + Sync + 'static, + Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static, Codes: AccountsCodeStorage + Send + Sync + 'static, S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, Stf: StfExecutor + Send + Sync + 'static, diff --git a/crates/app/server/Cargo.toml b/crates/app/server/Cargo.toml index 43e49f3..444c35c 100644 --- a/crates/app/server/Cargo.toml +++ b/crates/app/server/Cargo.toml @@ -19,6 +19,7 @@ evolve_tx_eth = { workspace = true } alloy-primitives = { workspace = true } borsh = { workspace = true } +bytes = { workspace = true } tokio = { 
workspace = true } async-trait = { workspace = true } tracing = { workspace = true } diff --git a/crates/app/server/src/block.rs b/crates/app/server/src/block.rs index f52db71..5a82d48 100644 --- a/crates/app/server/src/block.rs +++ b/crates/app/server/src/block.rs @@ -4,6 +4,8 @@ //! working with the Evolve STF. use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; +use borsh::{BorshDeserialize, BorshSerialize}; +use evolve_core::encoding::Encodable; use evolve_core::BlockContext; use evolve_stf_traits::Block as BlockTrait; @@ -170,6 +172,52 @@ impl BlockTrait for Block { } } +/// Borsh-serializable block snapshot for archive storage. +/// +/// Contains the essential block data needed for P2P sync and block serving. +/// Transactions are stored as their encoded byte representation. +#[derive(BorshSerialize, BorshDeserialize)] +pub struct ArchivedBlock { + pub number: u64, + pub timestamp: u64, + pub parent_hash: [u8; 32], + pub state_root: [u8; 32], + pub block_hash: [u8; 32], + pub gas_limit: u64, + pub gas_used: u64, + pub transactions: Vec>, +} + +impl Block { + /// Create an archived snapshot of this block for storage. + /// + /// Encodes each transaction via `Encodable::encode()` and captures + /// the block hash, state root, and gas used from execution results. + pub fn to_archived( + &self, + block_hash: B256, + state_root: B256, + gas_used: u64, + ) -> Result { + let transactions: Vec> = self + .transactions + .iter() + .map(|tx| tx.encode()) + .collect::, _>>()?; + + Ok(ArchivedBlock { + number: self.header.number, + timestamp: self.header.timestamp, + parent_hash: self.header.parent_hash.0, + state_root: state_root.0, + block_hash: block_hash.0, + gas_limit: self.header.gas_limit, + gas_used, + transactions, + }) + } +} + /// Builder for creating blocks. 
#[derive(Debug)] pub struct BlockBuilder { diff --git a/crates/app/server/src/dev.rs b/crates/app/server/src/dev.rs index dd0fe70..6098093 100644 --- a/crates/app/server/src/dev.rs +++ b/crates/app/server/src/dev.rs @@ -38,6 +38,7 @@ use crate::error::ServerError; use alloy_primitives::{Address, B256}; use commonware_runtime::Spawner; use evolve_chain_index::{build_index_data, BlockMetadata, ChainIndex}; +use evolve_core::encoding::Encodable; use evolve_core::ReadonlyKV; use evolve_eth_jsonrpc::SharedSubscriptionManager; use evolve_mempool::{Mempool, MempoolTx, SharedMempool}; @@ -52,6 +53,11 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; +/// Callback invoked after each block is produced with its archived bytes. +/// +/// Arguments: block number, block hash, borsh-encoded `ArchivedBlock` bytes. +pub type OnBlockArchive = Arc; + /// Configuration for dev mode block production. #[derive(Debug, Clone)] pub struct DevConfig { @@ -172,6 +178,8 @@ pub struct DevConsensus { subscriptions: Option, /// Whether block indexing is enabled on block production. index_blocks: bool, + /// Optional callback for archiving block data. + on_block_archive: Option, /// Phantom for Tx type. _tx: std::marker::PhantomData, } @@ -197,6 +205,7 @@ impl DevConsensus DevConsensus { chain_index: Some(chain_index), subscriptions: Some(subscriptions), index_blocks: true, + on_block_archive: None, _tx: std::marker::PhantomData, } } @@ -257,6 +267,7 @@ impl DevConsensus { chain_index: Some(chain_index), subscriptions: Some(subscriptions), index_blocks: true, + on_block_archive: None, _tx: std::marker::PhantomData, } } @@ -291,6 +302,7 @@ impl DevConsensus { chain_index: None, subscriptions: None, index_blocks: false, + on_block_archive: None, _tx: std::marker::PhantomData, } } @@ -340,11 +352,17 @@ impl DevConsensus { self.index_blocks = enabled; self } + + /// Set a callback for archiving block data after production. 
+    pub fn with_block_archive(mut self, cb: OnBlockArchive) -> Self {
+        self.on_block_archive = Some(cb);
+        self
+    }
 }
 
 impl DevConsensus
 where
-    Tx: Transaction + MempoolTx + Send + Sync + 'static,
+    Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
     S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
     Codes: AccountsCodeStorage + Send + Sync + 'static,
     Stf: StfExecutor + Send + Sync + 'static,
@@ -413,6 +431,31 @@ where
         *self.state.last_hash.write().await = block_hash;
         self.state.last_timestamp.store(timestamp, Ordering::SeqCst);
 
+        // Archive the block if a callback is configured.
+        // The callback is invoked inline (not via a spawned task) so blocks reach
+        // the archive in production order; callbacks must therefore not block.
+        if let Some(ref cb) = self.on_block_archive {
+            match block.to_archived(block_hash, state_root, gas_used) {
+                Ok(archived) => match borsh::to_vec(&archived) {
+                    Ok(encoded) => cb(height, block_hash, bytes::Bytes::from(encoded)),
+                    Err(e) => {
+                        tracing::warn!(
+                            "Failed to borsh-encode archived block {}: {:?}",
+                            height,
+                            e
+                        );
+                    }
+                },
+                Err(e) => {
+                    tracing::warn!(
+                        "Failed to encode transactions for block {}: {:?}",
+                        height,
+                        e
+                    );
+                }
+            }
+        }
+
         // Index the block for RPC queries if chain index is configured.
         // This runs off the block-production hot path so indexing I/O does not delay
         // execution of subsequent blocks.
@@ -542,7 +585,7 @@ where
 /// Works with any transaction type that implements `MempoolTx`.
impl DevConsensus where - Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, + Tx: Transaction + MempoolTx + Encodable + Clone + Send + Sync + 'static, S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, Codes: AccountsCodeStorage + Send + Sync + 'static, Stf: StfExecutor + Send + Sync + 'static, diff --git a/crates/app/server/src/lib.rs b/crates/app/server/src/lib.rs index fa21e33..8ca2f9d 100644 --- a/crates/app/server/src/lib.rs +++ b/crates/app/server/src/lib.rs @@ -27,8 +27,10 @@ pub mod dev; mod error; mod persistence; -pub use block::{Block, BlockBuilder, BlockHeader}; -pub use dev::{DevConfig, DevConsensus, NoopChainIndex, ProducedBlock, StfExecutor}; +pub use block::{ArchivedBlock, Block, BlockBuilder, BlockHeader}; +pub use dev::{ + DevConfig, DevConsensus, NoopChainIndex, OnBlockArchive, ProducedBlock, StfExecutor, +}; pub use error::ServerError; pub use evolve_mempool::{ new_shared_mempool, Mempool, MempoolError, MempoolResult, MempoolTx, SharedMempool, diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 883c3eb..09d151e 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -7,7 +7,7 @@ version = "0.1.0" [dependencies] evolve_core.workspace = true async-trait = "0.1" -bytes = "1.5" +bytes = { workspace = true } commonware-codec.workspace = true commonware-cryptography.workspace = true commonware-runtime.workspace = true diff --git a/crates/storage/src/block_store.rs b/crates/storage/src/block_store.rs new file mode 100644 index 0000000..caa5bfa --- /dev/null +++ b/crates/storage/src/block_store.rs @@ -0,0 +1,598 @@ +//! Archive-backed block storage that is independent of the QMDB state root. +//! +//! ## Design Rationale +//! +//! QMDB is used for application state (account data, balances, etc.) and produces +//! a Merkle root (`CommitHash`) on every `commit()` call. Any KV pair written via +//! QMDB's `batch()` + `commit()` affects this Merkle root. +//! +//! 
Block data (headers, transaction lists, receipts) must NOT affect the app hash. +//! This is achieved by using `commonware_storage::archive::prunable::Archive` as a +//! completely separate storage backend. +//! +//! The archive: +//! - Uses two partitions (key journal + value blob) that are independent from QMDB +//! - Does not participate in any Merkle computation +//! - Supports lookups by block number (`u64`) or block hash (`[u8; 32]`) +//! - Supports pruning old blocks by minimum block number +//! +//! ## State Machine +//! +//! The archive is a single mutable struct (not a state machine like QMDB). +//! Writes require `&mut self`; reads require `&self`. +//! +//! ```text +//! BlockStorage::new() → archive initialized +//! put(block_num, block_hash, block_bytes) → appended to journal +//! get_by_number(block_num) → reads from journal +//! get_by_hash(block_hash) → reads from journal via in-memory key index +//! prune(min_block) → removes sections older than min_block +//! sync() → fsync pending writes +//! ``` + +use crate::types::{BlockHash, BlockStorageConfig}; +use commonware_codec::RangeCfg; +use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_storage::{ + archive::{ + prunable::{Archive, Config as ArchiveConfig}, + Archive as ArchiveTrait, Identifier, + }, + translator::EightCap, +}; +use std::num::NonZeroUsize; +use thiserror::Error; + +/// Error types for block storage operations. +#[derive(Debug, Error)] +pub enum BlockStorageError { + #[error("archive error: {0}")] + Archive(#[from] commonware_storage::archive::Error), + + #[error("invalid configuration: {0}")] + InvalidConfig(String), +} + +/// Page size for the key journal buffer pool (bytes). +const KEY_JOURNAL_PAGE_SIZE: u16 = 4096; + +/// Number of cached pages in the key journal buffer pool. +/// Total cache: KEY_JOURNAL_PAGE_SIZE * KEY_JOURNAL_CACHE_PAGES = 256KB by default. 
+const KEY_JOURNAL_CACHE_PAGES: usize = 64; + +/// Archive-backed block storage. +/// +/// Stores block data indexed by both block number and block hash. Independent +/// of QMDB — writes here have no effect on the app hash / commit hash. +/// +/// Block data is stored as raw bytes (`bytes::Bytes`). The caller is responsible +/// for serializing and deserializing block contents. +/// +/// # Thread Safety +/// +/// `BlockStorage` requires `&mut self` for writes and `&self` for reads. It +/// should be wrapped in a lock (`tokio::sync::Mutex`) when shared across tasks. +pub struct BlockStorage +where + C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, +{ + archive: Archive, +} + +impl BlockStorage +where + C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, +{ + /// Initialize block storage using the provided runtime context and config. + /// + /// If block data was previously stored, the in-memory index is rebuilt from + /// the key journal on startup (no value reads are performed during init). + pub async fn new(context: C, config: BlockStorageConfig) -> Result { + let blocks_per_section = + std::num::NonZeroU64::new(config.blocks_per_section).ok_or_else(|| { + BlockStorageError::InvalidConfig("blocks_per_section must be non-zero".to_string()) + })?; + + let key_write_buffer = NonZeroUsize::new(config.key_write_buffer).ok_or_else(|| { + BlockStorageError::InvalidConfig("key_write_buffer must be non-zero".to_string()) + })?; + + let value_write_buffer = NonZeroUsize::new(config.value_write_buffer).ok_or_else(|| { + BlockStorageError::InvalidConfig("value_write_buffer must be non-zero".to_string()) + })?; + + let replay_buffer = NonZeroUsize::new(config.replay_buffer).ok_or_else(|| { + BlockStorageError::InvalidConfig("replay_buffer must be non-zero".to_string()) + })?; + + // Buffer pool for the key journal. 
+ let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap(); + let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap(); + let key_buffer_pool = PoolRef::new(page_size, cache_pages); + + let cfg = ArchiveConfig { + translator: EightCap, + key_partition: format!("{}-block-index", config.partition_prefix), + key_buffer_pool, + value_partition: format!("{}-block-data", config.partition_prefix), + // No compression by default. Blocks are often already compressed (gzip/zstd + // at the application layer), so double-compression wastes CPU. + compression: None, + // `bytes::Bytes` uses `RangeCfg` as its codec config. + // An unbounded range accepts blocks of any size. + codec_config: RangeCfg::from(..), + items_per_section: blocks_per_section, + key_write_buffer, + value_write_buffer, + replay_buffer, + }; + + let archive = Archive::init(context, cfg).await?; + + Ok(Self { archive }) + } + + /// Store a block by its block number and hash. + /// + /// Both `block_number` and `block_hash` are assumed to be globally unique. + /// If `block_number` already exists, this is a no-op (idempotent). + /// + /// Returns an error if the block number is older than the current prune horizon. + #[allow(clippy::disallowed_types)] // Instant is for metrics only, not consensus. + pub async fn put( + &mut self, + block_number: u64, + block_hash: BlockHash, + block_bytes: bytes::Bytes, + ) -> Result<(), BlockStorageError> { + let start = std::time::Instant::now(); + self.archive + .put(block_number, block_hash, block_bytes) + .await?; + tracing::debug!( + block_number, + elapsed_us = start.elapsed().as_micros(), + "block stored" + ); + Ok(()) + } + + /// Retrieve block bytes by block number. + /// + /// Returns `None` if the block number is not present (never stored, or pruned). + pub async fn get_by_number( + &self, + block_number: u64, + ) -> Result, BlockStorageError> { + Ok(self.archive.get(Identifier::Index(block_number)).await?) 
+ } + + /// Retrieve block bytes by block hash. + /// + /// Returns `None` if the hash is not in the index (never stored, or pruned). + pub async fn get_by_hash( + &self, + block_hash: &BlockHash, + ) -> Result, BlockStorageError> { + Ok(self.archive.get(Identifier::Key(block_hash)).await?) + } + + /// Check whether a block number exists in the archive. + pub async fn has_block_number(&self, block_number: u64) -> Result { + Ok(self.archive.has(Identifier::Index(block_number)).await?) + } + + /// Check whether a block hash exists in the archive. + pub async fn has_block_hash(&self, block_hash: &BlockHash) -> Result { + Ok(self.archive.has(Identifier::Key(block_hash)).await?) + } + + /// Return the lowest stored block number, or `None` if the archive is empty. + pub fn first_block_number(&self) -> Option { + self.archive.first_index() + } + + /// Return the highest stored block number, or `None` if the archive is empty. + pub fn last_block_number(&self) -> Option { + self.archive.last_index() + } + + /// Prune all blocks with `block_number < min_block`. + /// + /// Pruning is done at section granularity (`blocks_per_section`). The actual + /// prune horizon may be lower than `min_block` when `min_block` is not aligned + /// to a section boundary. + /// + /// Calling prune with a `min_block` lower than the current prune horizon is a + /// no-op (safe to call repeatedly). + pub async fn prune(&mut self, min_block: u64) -> Result<(), BlockStorageError> { + self.archive.prune(min_block).await?; + tracing::debug!(min_block, "block storage pruned"); + Ok(()) + } + + /// Sync all pending writes to durable storage. + /// + /// Must be called periodically to ensure crash safety. Each call to `put` + /// buffers data in memory; `sync` flushes it to the underlying journal blobs. + pub async fn sync(&mut self) -> Result<(), BlockStorageError> { + self.archive.sync().await?; + Ok(()) + } + + /// Store a block and immediately sync. 
+ /// + /// Equivalent to `put` followed by `sync`. Use when write durability is + /// required for each block (e.g., during block finalization). + #[allow(clippy::disallowed_types)] // Instant is for metrics only, not consensus. + pub async fn put_sync( + &mut self, + block_number: u64, + block_hash: BlockHash, + block_bytes: bytes::Bytes, + ) -> Result<(), BlockStorageError> { + let start = std::time::Instant::now(); + self.archive + .put_sync(block_number, block_hash, block_bytes) + .await?; + tracing::debug!( + block_number, + elapsed_us = start.elapsed().as_micros(), + "block stored (durable)" + ); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_runtime::tokio::{Config as TokioConfig, Runner}; + use commonware_runtime::Runner as RunnerTrait; + use commonware_utils::sequence::FixedBytes; + use tempfile::TempDir; + + fn make_block_hash(n: u8) -> BlockHash { + FixedBytes::new([n; 32]) + } + + fn make_block_bytes(content: &[u8]) -> bytes::Bytes { + bytes::Bytes::copy_from_slice(content) + } + + #[test] + fn test_block_storage_basic() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + // Store a block + let block_number = 1u64; + let block_hash = make_block_hash(1); + let block_bytes = make_block_bytes(b"block data for block 1"); + + store + .put(block_number, block_hash.clone(), block_bytes.clone()) + .await + .unwrap(); + + // Retrieve by block number + let retrieved = store.get_by_number(block_number).await.unwrap(); + assert_eq!(retrieved, Some(block_bytes.clone())); + + // Retrieve by block hash + let retrieved = store.get_by_hash(&block_hash).await.unwrap(); + assert_eq!(retrieved, Some(block_bytes)); + + // Non-existent 
block + let missing = store.get_by_number(999).await.unwrap(); + assert_eq!(missing, None); + }); + } + + #[test] + fn test_block_storage_has() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + let block_hash = make_block_hash(42); + + // Before storing + assert!(!store.has_block_number(5).await.unwrap()); + assert!(!store.has_block_hash(&block_hash).await.unwrap()); + + // After storing + store + .put(5, block_hash.clone(), make_block_bytes(b"block 5")) + .await + .unwrap(); + + assert!(store.has_block_number(5).await.unwrap()); + assert!(store.has_block_hash(&block_hash).await.unwrap()); + }); + } + + #[test] + fn test_block_storage_first_last_index() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + assert_eq!(store.first_block_number(), None); + assert_eq!(store.last_block_number(), None); + + store + .put(10, make_block_hash(10), make_block_bytes(b"block 10")) + .await + .unwrap(); + store + .put(20, make_block_hash(20), make_block_bytes(b"block 20")) + .await + .unwrap(); + store + .put(30, make_block_hash(30), make_block_bytes(b"block 30")) + .await + .unwrap(); + + assert_eq!(store.first_block_number(), Some(10)); + assert_eq!(store.last_block_number(), Some(30)); + }); + } + + #[test] + fn test_block_storage_idempotent_put() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + 
.with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + let block_number = 1u64; + let block_hash = make_block_hash(1); + let block_bytes_v1 = make_block_bytes(b"first write"); + let block_bytes_v2 = make_block_bytes(b"second write - should be ignored"); + + // First put + store + .put(block_number, block_hash.clone(), block_bytes_v1.clone()) + .await + .unwrap(); + + // Second put with same index - should be idempotent (no-op) + store + .put(block_number, block_hash.clone(), block_bytes_v2) + .await + .unwrap(); + + // Value should be from first put + let retrieved = store.get_by_number(block_number).await.unwrap().unwrap(); + assert_eq!(retrieved, block_bytes_v1); + }); + } + + #[test] + fn test_block_storage_multiple_blocks() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + // Store 10 blocks + for i in 0..10u64 { + let hash = make_block_hash(i as u8); + let data = format!("block data {i}"); + store + .put(i, hash, make_block_bytes(data.as_bytes())) + .await + .unwrap(); + } + + // Verify all 10 blocks are retrievable + for i in 0..10u64 { + let result = store.get_by_number(i).await.unwrap(); + let expected = format!("block data {i}"); + assert_eq!(result, Some(make_block_bytes(expected.as_bytes()))); + } + }); + } + + #[test] + fn test_block_storage_large_block() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = 
Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + // Store a block larger than the QMDB 4KB value chunk limit (>4092 bytes) + // Block storage has no such constraint. + let large_block: Vec = (0..=255u8).cycle().take(1_000_000).collect(); + let block_bytes = bytes::Bytes::from(large_block.clone()); + + store + .put(1, make_block_hash(1), block_bytes.clone()) + .await + .unwrap(); + + let retrieved = store.get_by_number(1).await.unwrap().unwrap(); + assert_eq!(retrieved.len(), 1_000_000); + assert_eq!(retrieved.as_ref(), large_block.as_slice()); + }); + } + + #[test] + fn test_block_storage_sync_and_retrieve() { + let temp_dir = TempDir::new().unwrap(); + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let config = BlockStorageConfig::default(); + let mut store = BlockStorage::new(context, config).await.unwrap(); + + let block_bytes = make_block_bytes(b"durable block"); + + store + .put_sync(1, make_block_hash(1), block_bytes.clone()) + .await + .unwrap(); + + let retrieved = store.get_by_number(1).await.unwrap(); + assert_eq!(retrieved, Some(block_bytes)); + }); + } + + /// Set up a QMDB + BlockStorage pair for isolation tests. 
+ async fn setup_qmdb_and_block_store( + context: commonware_runtime::tokio::Context, + state_config: crate::types::StorageConfig, + ) -> ( + crate::QmdbStorage, + BlockStorage, + ) { + let qmdb = crate::QmdbStorage::new(context.clone(), state_config) + .await + .unwrap(); + let block_config = BlockStorageConfig { + partition_prefix: "test-blocks".to_string(), + ..Default::default() + }; + let block_store = BlockStorage::new(context, block_config).await.unwrap(); + (qmdb, block_store) + } + + /// Verifies block storage writes do NOT affect the QMDB commit hash. + #[test] + fn test_block_storage_does_not_affect_commit_hash() { + let temp_dir = TempDir::new().unwrap(); + let state_config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let (qmdb, mut block_store) = setup_qmdb_and_block_store(context, state_config).await; + + qmdb.apply_batch(vec![crate::types::Operation::Set { + key: b"account_1".to_vec(), + value: b"balance_100".to_vec(), + }]) + .await + .unwrap(); + let hash_before = qmdb.commit_state().await.unwrap(); + + for i in 0..100u64 { + let hash = make_block_hash(i as u8); + let data = format!("block {i} data with lots of content"); + block_store + .put(i, hash, make_block_bytes(data.as_bytes())) + .await + .unwrap(); + } + block_store.sync().await.unwrap(); + + let hash_after = qmdb.commit_state().await.unwrap(); + assert_eq!( + hash_before.as_bytes(), + hash_after.as_bytes(), + "block storage writes must not change the QMDB commit hash" + ); + }); + } + + /// Verifies QMDB remains functional after block storage writes. 
+ #[test] + fn test_qmdb_functional_after_block_storage_writes() { + let temp_dir = TempDir::new().unwrap(); + let state_config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + runner.start(|context| async move { + let (qmdb, mut block_store) = setup_qmdb_and_block_store(context, state_config).await; + + qmdb.apply_batch(vec![crate::types::Operation::Set { + key: b"account_1".to_vec(), + value: b"balance_100".to_vec(), + }]) + .await + .unwrap(); + let hash_before = qmdb.commit_state().await.unwrap(); + + for i in 0..10u64 { + let hash = make_block_hash(i as u8); + block_store + .put(i, hash, make_block_bytes(b"block data")) + .await + .unwrap(); + } + block_store.sync().await.unwrap(); + + qmdb.apply_batch(vec![crate::types::Operation::Set { + key: b"account_2".to_vec(), + value: b"balance_200".to_vec(), + }]) + .await + .unwrap(); + let hash_after = qmdb.commit_state().await.unwrap(); + + assert_ne!( + hash_before.as_bytes(), + hash_after.as_bytes(), + "new state write must change the QMDB commit hash" + ); + }); + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 686abb4..8aefdc7 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,3 +1,4 @@ +pub mod block_store; pub mod cache; pub mod metrics; pub mod mock; @@ -5,6 +6,7 @@ pub mod qmdb_impl; pub mod types; pub mod warming; +pub use block_store::{BlockStorage, BlockStorageError}; pub use cache::{CachedStorage, CachedValue, DbCache, ShardedDbCache}; pub use metrics::{OptionalMetrics, StorageMetrics}; pub use mock::MockStorage; diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs index b0be899..b9b7bcc 100644 --- a/crates/storage/src/types.rs +++ b/crates/storage/src/types.rs @@ -2,6 +2,59 @@ use 
commonware_utils::sequence::FixedBytes; use evolve_core::{define_error, ErrorCode}; use std::fmt; +/// A 32-byte block hash used as a key in block storage. +/// +/// This is a fixed-size key suitable for use with `commonware_storage::archive::prunable::Archive`. +pub type BlockHash = FixedBytes<32>; + +/// Configuration for the block archive storage. +/// +/// Block storage is implemented as a `commonware_storage::archive::prunable::Archive`, which is +/// completely independent of QMDB. Writes to block storage do NOT affect the app hash +/// (state root / `CommitHash`). +#[derive(Debug, Clone)] +pub struct BlockStorageConfig { + /// Partition prefix used for block storage journal files. + /// + /// Two partitions are created from this prefix: + /// - `{prefix}-block-index` for the key journal + /// - `{prefix}-block-data` for the value blob + pub partition_prefix: String, + + /// Number of blocks per archive section (granularity of pruning). + /// + /// Lower values allow finer-grained pruning at the cost of more open file handles. + /// Default: 65536 (prune in ~64k block increments). + pub blocks_per_section: u64, + + /// Write buffer size for the key journal, in bytes. + /// + /// Default: 1MB + pub key_write_buffer: usize, + + /// Write buffer size for the value blob, in bytes. + /// + /// Default: 4MB (blocks can be large) + pub value_write_buffer: usize, + + /// Read buffer size for journal replay on startup, in bytes. + /// + /// Default: 4096 bytes + pub replay_buffer: usize, +} + +impl Default for BlockStorageConfig { + fn default() -> Self { + Self { + partition_prefix: "evolve-blocks".to_string(), + blocks_per_section: 65_536, + key_write_buffer: 1024 * 1024, // 1MB + value_write_buffer: 4 * 1024 * 1024, // 4MB + replay_buffer: 4096, + } + } +} + /// Represents a commit hash from the storage layer #[derive(Clone, Copy, PartialEq, Eq, Hash)] pub struct CommitHash([u8; 32]);