Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ alloy-primitives = "0.8"
alloy-consensus = "0.9"
alloy-rlp = "0.3"
alloy-eips = "0.9"
bytes = "1.5"
rusqlite = { version = "0.38", features = ["bundled"] }
[workspace.lints.rust]
# missing_docs = "deny"
Expand Down
1 change: 1 addition & 0 deletions crates/app/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rust-version.workspace = true
description = "Reusable dev-node runner for Evolve applications."

[dependencies]
bytes = { workspace = true }
commonware-runtime = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
122 changes: 90 additions & 32 deletions crates/app/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ use borsh::{BorshDeserialize, BorshSerialize};
use commonware_runtime::tokio::{Config as TokioConfig, Context as TokioContext, Runner};
use commonware_runtime::{Runner as RunnerTrait, Spawner};
use evolve_chain_index::{ChainStateProvider, ChainStateProviderConfig, PersistentChainIndex};
use evolve_core::encoding::Encodable;
use evolve_core::ReadonlyKV;
use evolve_eth_jsonrpc::{start_server_with_subscriptions, RpcServerConfig, SubscriptionManager};
use evolve_mempool::{new_shared_mempool, Mempool, MempoolTx, SharedMempool};
use evolve_rpc_types::SyncStatus;
use evolve_server::StfExecutor;
use evolve_server::{
load_chain_state, save_chain_state, ChainState, DevConfig, DevConsensus, CHAIN_STATE_KEY,
};
use evolve_server::{OnBlockArchive, StfExecutor};
use evolve_stf_traits::{AccountsCodeStorage, StateChange, Transaction};
use evolve_storage::{MockStorage, Operation, Storage, StorageConfig};
use evolve_storage::types::BlockHash as ArchiveBlockHash;
use evolve_storage::{
BlockStorage, BlockStorageConfig, MockStorage, Operation, Storage, StorageConfig,
};
use evolve_tx_eth::TxContext;
use std::future::Future;

Expand Down Expand Up @@ -84,7 +88,7 @@ pub fn build_dev_node_with_mempool<Stf, Codes, S, Tx>(
config: DevConfig,
) -> DevNodeMempoolHandles<Stf, S, Codes, Tx>
where
Tx: Transaction + MempoolTx + Send + Sync + 'static,
Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
Codes: AccountsCodeStorage + Send + Sync + 'static,
Stf: StfExecutor<Tx, S, Codes> + Send + Sync + 'static,
Expand Down Expand Up @@ -163,6 +167,47 @@ pub struct GenesisOutput<G> {

type RuntimeContext = TokioContext;

/// Build the block archive callback.
///
/// Creates a `BlockStorage` backed by the commonware archive and returns
/// an `OnBlockArchive` callback that writes each produced block into it.
///
/// # Panics
///
/// Panics if block storage initialization fails. Block archival is a required
/// subsystem — all produced blocks must be persisted.
/// Build the block archive callback.
///
/// Creates a `BlockStorage` backed by the commonware archive and returns
/// an `OnBlockArchive` callback that writes each produced block into it.
/// Writes are funneled through a bounded mpsc channel into a single consumer
/// task so blocks are persisted in submission order.
///
/// # Panics
///
/// Panics if block storage initialization fails. Block archival is a required
/// subsystem — all produced blocks must be persisted.
async fn build_block_archive(context: TokioContext) -> OnBlockArchive {
    let config = BlockStorageConfig::default();
    // Bind as `mut` here: the consumer task below needs mutable access for
    // writes, and this avoids an extra `let mut store = store;` rebind.
    let mut store = BlockStorage::new(context, config)
        .await
        .expect("failed to initialize block archive storage");

    // Bounded channel (64 in-flight blocks) so a slow store cannot grow
    // memory without bound; saturation is surfaced via `try_send` failure.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<(u64, ArchiveBlockHash, bytes::Bytes)>(64);

    // Single consumer task ensures blocks are written in order.
    tokio::spawn(async move {
        while let Some((block_number, block_hash, block_bytes)) = rx.recv().await {
            if let Err(e) = store.put_sync(block_number, block_hash, block_bytes).await {
                tracing::warn!("Failed to archive block {}: {:?}", block_number, e);
            }
        }
    });

    tracing::info!("Block archive storage enabled");

    Arc::new(move |block_number, block_hash, block_bytes| {
        let hash_bytes = ArchiveBlockHash::new(block_hash.0);
        // Best-effort handoff: if the channel is full or the consumer task is
        // gone, this block is DROPPED from the archive and only logged.
        // NOTE(review): this conflicts with the "all produced blocks must be
        // persisted" contract above — consider an async/blocking send if
        // strict durability is required.
        if let Err(e) = tx.try_send((block_number, hash_bytes, block_bytes)) {
            tracing::warn!(
                "Block archive channel full or closed for block {}: {}",
                block_number,
                e
            );
        }
    })
}

/// Run the dev node with default settings (RPC enabled).
pub fn run_dev_node<
Stf,
Expand All @@ -184,7 +229,7 @@ pub fn run_dev_node<
run_genesis: RunGenesis,
build_storage: BuildStorage,
) where
Tx: Transaction + MempoolTx + Send + Sync + 'static,
Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
Codes: AccountsCodeStorage + Send + Sync + 'static,
S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
Stf: StfExecutor<Tx, S, Codes> + Send + Sync + 'static,
Expand Down Expand Up @@ -233,7 +278,7 @@ pub fn run_dev_node_with_rpc<
build_storage: BuildStorage,
rpc_config: RpcConfig,
) where
Tx: Transaction + MempoolTx + Send + Sync + 'static,
Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
Codes: AccountsCodeStorage + Send + Sync + 'static,
S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
Stf: StfExecutor<Tx, S, Codes> + Send + Sync + 'static,
Expand Down Expand Up @@ -283,6 +328,7 @@ pub fn run_dev_node_with_rpc<
async move {
// Clone context early since build_storage takes ownership
let context_for_shutdown = context.clone();
let context_for_archive = context.clone();
let storage = (build_storage)(context, storage_config)
.await
.expect("failed to create storage");
Expand Down Expand Up @@ -323,6 +369,9 @@ pub fn run_dev_node_with_rpc<
..Default::default()
};

// Build block archive callback (always on)
let archive_cb = build_block_archive(context_for_archive).await;

// Set up RPC infrastructure if enabled
let rpc_handle = if rpc_config.enabled {
// Create chain index backed by SQLite
Expand Down Expand Up @@ -369,17 +418,18 @@ pub fn run_dev_node_with_rpc<
.expect("failed to start RPC server");

// Create DevConsensus with RPC support
let dev: Arc<DevConsensus<Stf, S, Codes, Tx, PersistentChainIndex>> = Arc::new(
DevConsensus::with_rpc(
stf,
storage,
codes,
dev_config,
chain_index,
subscriptions,
)
.with_indexing_enabled(rpc_config.enable_block_indexing),
);
let consensus = DevConsensus::with_rpc(
stf,
storage,
codes,
dev_config,
chain_index,
subscriptions,
)
.with_indexing_enabled(rpc_config.enable_block_indexing)
.with_block_archive(archive_cb);
let dev: Arc<DevConsensus<Stf, S, Codes, Tx, PersistentChainIndex>> =
Arc::new(consensus);

tracing::info!(
"Block interval: {:?}, starting at height {}",
Expand Down Expand Up @@ -421,8 +471,10 @@ pub fn run_dev_node_with_rpc<
Some(handle)
} else {
// No RPC - use simple DevConsensus
let consensus = DevConsensus::new(stf, storage, codes, dev_config)
.with_block_archive(archive_cb);
let dev: Arc<DevConsensus<Stf, S, Codes, Tx, evolve_server::NoopChainIndex>> =
Arc::new(DevConsensus::new(stf, storage, codes, dev_config));
Arc::new(consensus);

tracing::info!(
"Block interval: {:?}, starting at height {}",
Expand Down Expand Up @@ -501,7 +553,7 @@ pub fn run_dev_node_with_rpc_and_mempool<
build_storage: BuildStorage,
rpc_config: RpcConfig,
) where
Tx: Transaction + MempoolTx + Send + Sync + 'static,
Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
Codes: AccountsCodeStorage + Send + Sync + 'static,
S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
Stf: StfExecutor<Tx, S, Codes> + Send + Sync + 'static,
Expand Down Expand Up @@ -711,6 +763,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<

async move {
let context_for_shutdown = context.clone();
let context_for_archive = context.clone();
let storage = (build_storage)(context, storage_config)
.await
.expect("failed to create storage");
Expand Down Expand Up @@ -751,6 +804,9 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<

let mempool: SharedMempool<Mempool<TxContext>> = new_shared_mempool();

// Build block archive callback (always on)
let archive_cb = build_block_archive(context_for_archive).await;

let rpc_handle = if rpc_config.enabled {
let chain_index = Arc::new(
PersistentChainIndex::new(&chain_index_db_path)
Expand Down Expand Up @@ -791,19 +847,19 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<
.await
.expect("failed to start RPC server");

let consensus = DevConsensus::with_rpc_and_mempool(
stf,
storage,
codes,
dev_config,
chain_index,
subscriptions,
mempool.clone(),
)
.with_indexing_enabled(rpc_config.enable_block_indexing)
.with_block_archive(archive_cb);
let dev: Arc<DevConsensus<Stf, S, Codes, TxContext, PersistentChainIndex>> =
Arc::new(
DevConsensus::with_rpc_and_mempool(
stf,
storage,
codes,
dev_config,
chain_index,
subscriptions,
mempool.clone(),
)
.with_indexing_enabled(rpc_config.enable_block_indexing),
);
Arc::new(consensus);

tracing::info!(
"Block interval: {:?}, max_txs_per_block: {}, starting at height {}",
Expand Down Expand Up @@ -841,8 +897,10 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<

Some(handle)
} else {
let consensus = DevConsensus::with_mempool(stf, storage, codes, dev_config, mempool)
.with_block_archive(archive_cb);
let dev: Arc<DevConsensus<Stf, S, Codes, TxContext, evolve_server::NoopChainIndex>> =
Arc::new(DevConsensus::with_mempool(stf, storage, codes, dev_config, mempool));
Arc::new(consensus);

tracing::info!(
"Block interval: {:?}, max_txs_per_block: {}, starting at height {}",
Expand Down Expand Up @@ -952,7 +1010,7 @@ pub fn init_dev_node<
run_genesis: RunGenesis,
build_storage: BuildStorage,
) where
Tx: Transaction + MempoolTx + Send + Sync + 'static,
Tx: Transaction + MempoolTx + Encodable + Send + Sync + 'static,
Codes: AccountsCodeStorage + Send + Sync + 'static,
S: ReadonlyKV + Storage + Clone + Send + Sync + 'static,
Stf: StfExecutor<Tx, S, Codes> + Send + Sync + 'static,
Expand Down
1 change: 1 addition & 0 deletions crates/app/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ evolve_tx_eth = { workspace = true }

alloy-primitives = { workspace = true }
borsh = { workspace = true }
bytes = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
Expand Down
48 changes: 48 additions & 0 deletions crates/app/server/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! working with the Evolve STF.

use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use borsh::{BorshDeserialize, BorshSerialize};
use evolve_core::encoding::Encodable;
use evolve_core::BlockContext;
use evolve_stf_traits::Block as BlockTrait;

Expand Down Expand Up @@ -170,6 +172,52 @@ impl<Tx> BlockTrait<Tx> for Block<Tx> {
}
}

/// Borsh-serializable block snapshot for archive storage.
///
/// Contains the essential block data needed for P2P sync and block serving.
/// Transactions are stored as their encoded byte representation.
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub struct ArchivedBlock {
    /// Block height.
    pub number: u64,
    /// Block timestamp, copied from the block header — presumably seconds
    /// since epoch; confirm against the header producer.
    pub timestamp: u64,
    /// Hash of the parent block.
    pub parent_hash: [u8; 32],
    /// State root after executing this block.
    pub state_root: [u8; 32],
    /// Hash of this block.
    pub block_hash: [u8; 32],
    /// Gas limit of the block.
    pub gas_limit: u64,
    /// Total gas consumed by the block's transactions.
    pub gas_used: u64,
    /// Transactions in execution order, each encoded via `Encodable::encode()`.
    pub transactions: Vec<Vec<u8>>,
}

impl<Tx: Encodable> Block<Tx> {
    /// Create an archived snapshot of this block for storage.
    ///
    /// Encodes each transaction via `Encodable::encode()` and captures
    /// the block hash, state root, and gas used from execution results.
    /// Returns the first encoding error encountered, if any.
    pub fn to_archived(
        &self,
        block_hash: B256,
        state_root: B256,
        gas_used: u64,
    ) -> Result<ArchivedBlock, evolve_core::ErrorCode> {
        // Encode transactions up front, preallocating for the known count;
        // the first failure aborts the whole snapshot.
        let mut encoded_txs = Vec::with_capacity(self.transactions.len());
        for tx in &self.transactions {
            encoded_txs.push(tx.encode()?);
        }

        let header = &self.header;
        Ok(ArchivedBlock {
            number: header.number,
            timestamp: header.timestamp,
            parent_hash: header.parent_hash.0,
            state_root: state_root.0,
            block_hash: block_hash.0,
            gas_limit: header.gas_limit,
            gas_used,
            transactions: encoded_txs,
        })
    }
}

/// Builder for creating blocks.
#[derive(Debug)]
pub struct BlockBuilder<Tx> {
Expand Down
Loading
Loading