From dedb6075fa2533234b6327dd736eff497a444fdd Mon Sep 17 00:00:00 2001
From: xdustinface
Date: Thu, 29 Jan 2026 20:20:28 +0100
Subject: [PATCH] feat: introduce `PersistentBlockStorage`

This allows storing blocks during sync so they do not need to be
re-fetched for a potential rescan. At the moment we don't have
persistent wallets in the CLI, and storing the blocks also helps there:
restarts take ~15s on my machine compared to ~50s without cached
blocks. I was thinking about making block storage optional via config,
although I think some "clear sync cache" command for the SPV client
would be better, since usually there are not many blocks stored anyway
unless the wallet is very heavily used.
---
 dash-spv/src/storage/blocks.rs               | 162 +++++++++++++++++++
 dash-spv/src/storage/mod.rs                  |  22 ++-
 dash-spv/src/storage/segments.rs             |  17 +-
 dash-spv/src/sync/legacy/message_handlers.rs |   6 +
 dash-spv/src/test_utils/mod.rs               |   1 +
 dash-spv/src/test_utils/types.rs             |   9 ++
 dash-spv/src/types.rs                        |  67 +++++++-
 7 files changed, 280 insertions(+), 4 deletions(-)
 create mode 100644 dash-spv/src/storage/blocks.rs
 create mode 100644 dash-spv/src/test_utils/types.rs

diff --git a/dash-spv/src/storage/blocks.rs b/dash-spv/src/storage/blocks.rs
new file mode 100644
index 000000000..a7d368912
--- /dev/null
+++ b/dash-spv/src/storage/blocks.rs
@@ -0,0 +1,162 @@
+//! Block storage for persisting full blocks that contain wallet-relevant transactions.
+
+use std::collections::HashSet;
+use std::path::PathBuf;
+
+use crate::error::StorageResult;
+use crate::storage::segments::{Persistable, SegmentCache};
+use crate::storage::PersistentStorage;
+use crate::types::HashedBlock;
+use async_trait::async_trait;
+use dashcore::prelude::CoreBlockHeight;
+use tokio::sync::RwLock;
+
+/// Trait for block storage operations.
+#[async_trait]
+pub trait BlockStorage: Send + Sync + 'static {
+    /// Store a block at a specific height.
+    async fn store_block(
+        &mut self,
+        height: CoreBlockHeight,
+        block: HashedBlock,
+    ) -> StorageResult<()>;
+
+    /// Load a single block by height.
+    async fn load_block(&self, height: CoreBlockHeight) -> StorageResult<Option<HashedBlock>>;
+}
+
+/// Persistent storage for full blocks using segmented files.
+pub struct PersistentBlockStorage {
+    /// Block storage segments.
+    blocks: RwLock<SegmentCache<HashedBlock>>,
+    /// Set of available block heights used for fast lookups and to bypass sentinel loading and gap
+    /// detection asserts (in debug builds) in the underlying segment implementation.
+    available_heights: HashSet<CoreBlockHeight>,
+}
+
+impl PersistentBlockStorage {
+    const FOLDER_NAME: &str = "blocks";
+}
+
+#[async_trait]
+impl PersistentStorage for PersistentBlockStorage {
+    async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
+        let storage_path = storage_path.into();
+        let blocks_folder = storage_path.join(Self::FOLDER_NAME);
+
+        tracing::debug!("Opening PersistentBlockStorage from {:?}", blocks_folder);
+
+        let mut blocks: SegmentCache<HashedBlock> =
+            SegmentCache::load_or_new(&blocks_folder).await?;
+
+        let mut available_heights = HashSet::new();
+
+        if let (Some(start), Some(end)) = (blocks.start_height(), blocks.tip_height()) {
+            let hashed_blocks = blocks.get_items(start..end + 1).await?;
+            let sentinel = HashedBlock::sentinel();
+            for (i, hashed_block) in hashed_blocks.iter().enumerate() {
+                if hashed_block != &sentinel {
+                    available_heights.insert(start + i as CoreBlockHeight);
+                }
+            }
+        }
+
+        Ok(Self {
+            blocks: RwLock::new(blocks),
+            available_heights,
+        })
+    }
+
+    async fn persist(&mut self, storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
+        let blocks_folder = storage_path.into().join(Self::FOLDER_NAME);
+        tokio::fs::create_dir_all(&blocks_folder).await?;
+        self.blocks.write().await.persist(&blocks_folder).await;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl BlockStorage for PersistentBlockStorage {
+    async fn store_block(&mut self, height: u32, hashed_block: HashedBlock) -> StorageResult<()> {
+        self.available_heights.insert(height);
+        self.blocks.write().await.store_items_at_height(&[hashed_block], height).await
+    }
+
+    async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
+        // This early return avoids unnecessary disk lookups and bypasses sentinel loading and gap
+        // detection asserts (in debug builds) in the underlying segment implementation.
+        if !self.available_heights.contains(&height) {
+            return Ok(None);
+        }
+        Ok(self.blocks.write().await.get_items(height..height + 1).await?.first().cloned())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn test_store_and_load_block() {
+        let temp_dir = TempDir::new().unwrap();
+        let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
+
+        let hashed_block = HashedBlock::dummy(100, vec![]);
+        storage.store_block(100, hashed_block.clone()).await.unwrap();
+
+        let loaded = storage.load_block(100).await.unwrap();
+        assert_eq!(loaded, Some(hashed_block));
+    }
+
+    #[tokio::test]
+    async fn test_persistence_across_reopen() {
+        let temp_dir = TempDir::new().unwrap();
+        let hashed_block = HashedBlock::dummy(100, vec![]);
+
+        {
+            let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
+            storage.store_block(100, hashed_block.clone()).await.unwrap();
+            storage.persist(temp_dir.path()).await.unwrap();
+        }
+
+        {
+            let storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
+            let loaded = storage.load_block(100).await.unwrap();
+            assert_eq!(loaded, Some(hashed_block));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_load_nonexistent_block() {
+        let temp_dir = TempDir::new().unwrap();
+        let storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
+
+        let loaded = storage.load_block(999).await.unwrap();
+        assert!(loaded.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_returns_none_for_gaps() {
+        let temp_dir = TempDir::new().unwrap();
+        let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
+
+        // Store blocks at non-contiguous heights
+        let hashed_block_1 = HashedBlock::dummy(100, vec![]);
+        let hashed_block_2 = HashedBlock::dummy(200, vec![]);
+
+        storage.store_block(100, hashed_block_1.clone()).await.unwrap();
+        storage.store_block(200, hashed_block_2.clone()).await.unwrap();
+
+        // Stored blocks should load correctly
+        assert_eq!(storage.load_block(100).await.unwrap(), Some(hashed_block_1));
+        assert_eq!(storage.load_block(200).await.unwrap(), Some(hashed_block_2));
+
+        // Height in between (gap) should return None, not a sentinel
+        assert_eq!(storage.load_block(150).await.unwrap(), None);
+
+        // Heights outside range should also return None
+        assert_eq!(storage.load_block(50).await.unwrap(), None);
+        assert_eq!(storage.load_block(250).await.unwrap(), None);
+    }
+}
diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs
index 03f9731ad..e1cacdc11 100644
--- a/dash-spv/src/storage/mod.rs
+++ b/dash-spv/src/storage/mod.rs
@@ -3,6 +3,7 @@
 pub mod types;
 
 mod block_headers;
+mod blocks;
 mod chainstate;
 mod filter_headers;
 mod filters;
@@ -33,10 +34,11 @@ use crate::storage::lockfile::LockFile;
 use crate::storage::masternode::PersistentMasternodeStateStorage;
 use crate::storage::metadata::PersistentMetadataStorage;
 use crate::storage::transactions::PersistentTransactionStorage;
-use crate::types::{MempoolState, UnconfirmedTransaction};
+use crate::types::{HashedBlock, MempoolState, UnconfirmedTransaction};
 use crate::{ChainState, ClientConfig};
 
 pub use crate::storage::block_headers::BlockHeaderStorage;
+pub use crate::storage::blocks::{BlockStorage, PersistentBlockStorage};
 pub use crate::storage::chainstate::ChainStateStorage;
 pub use crate::storage::filter_headers::FilterHeaderStorage;
 pub use crate::storage::filters::FilterStorage;
@@ -61,6 +63,7 @@ pub trait StorageManager:
     BlockHeaderStorage
     + FilterHeaderStorage
     + FilterStorage
+    + BlockStorage
     + TransactionStorage
     + MetadataStorage
     + ChainStateStorage
@@ -85,6 +88,7 @@ pub struct DiskStorageManager {
     block_headers: Arc<RwLock<PersistentBlockHeaderStorage>>,
     filter_headers: Arc<RwLock<PersistentFilterHeaderStorage>>,
     filters: Arc<RwLock<PersistentFilterStorage>>,
+    blocks: Arc<RwLock<PersistentBlockStorage>>,
     transactions: Arc<RwLock<PersistentTransactionStorage>>,
     metadata: Arc<RwLock<PersistentMetadataStorage>>,
     chainstate: Arc<RwLock<PersistentChainStateStorage>>,
@@ -121,6 +125,7 @@ impl DiskStorageManager {
                 PersistentFilterHeaderStorage::open(&storage_path).await?,
             )),
             filters: Arc::new(RwLock::new(PersistentFilterStorage::open(&storage_path).await?)),
+            blocks: Arc::new(RwLock::new(PersistentBlockStorage::open(&storage_path).await?)),
             transactions: Arc::new(RwLock::new(
                 PersistentTransactionStorage::open(&storage_path).await?,
             )),
@@ -155,6 +160,7 @@ impl DiskStorageManager {
         let block_headers = Arc::clone(&self.block_headers);
         let filter_headers = Arc::clone(&self.filter_headers);
         let filters = Arc::clone(&self.filters);
+        let blocks = Arc::clone(&self.blocks);
         let transactions = Arc::clone(&self.transactions);
         let metadata = Arc::clone(&self.metadata);
         let chainstate = Arc::clone(&self.chainstate);
@@ -171,6 +177,7 @@ impl DiskStorageManager {
                 let _ = block_headers.write().await.persist(&storage_path).await;
                 let _ = filter_headers.write().await.persist(&storage_path).await;
                 let _ = filters.write().await.persist(&storage_path).await;
+                let _ = blocks.write().await.persist(&storage_path).await;
                 let _ = transactions.write().await.persist(&storage_path).await;
                 let _ = metadata.write().await.persist(&storage_path).await;
                 let _ = chainstate.write().await.persist(&storage_path).await;
@@ -194,6 +201,7 @@ impl DiskStorageManager {
         let _ = self.block_headers.write().await.persist(storage_path).await;
         let _ = self.filter_headers.write().await.persist(storage_path).await;
         let _ = self.filters.write().await.persist(storage_path).await;
+        let _ = self.blocks.write().await.persist(storage_path).await;
         let _ = self.transactions.write().await.persist(storage_path).await;
         let _ = self.metadata.write().await.persist(storage_path).await;
         let _ = self.chainstate.write().await.persist(storage_path).await;
@@ -232,6 +240,7 @@ impl StorageManager for DiskStorageManager {
         self.filter_headers =
             Arc::new(RwLock::new(PersistentFilterHeaderStorage::open(storage_path).await?));
         self.filters = Arc::new(RwLock::new(PersistentFilterStorage::open(storage_path).await?));
+        self.blocks = Arc::new(RwLock::new(PersistentBlockStorage::open(storage_path).await?));
         self.transactions =
             Arc::new(RwLock::new(PersistentTransactionStorage::open(storage_path).await?));
         self.metadata = Arc::new(RwLock::new(PersistentMetadataStorage::open(storage_path).await?));
@@ -326,6 +335,17 @@ impl filters::FilterStorage for DiskStorageManager {
     }
 }
 
+#[async_trait]
+impl BlockStorage for DiskStorageManager {
+    async fn store_block(&mut self, height: u32, block: HashedBlock) -> StorageResult<()> {
+        self.blocks.write().await.store_block(height, block).await
+    }
+
+    async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
+        self.blocks.read().await.load_block(height).await
+    }
+}
+
 #[async_trait]
 impl transactions::TransactionStorage for DiskStorageManager {
     async fn store_mempool_transaction(
diff --git a/dash-spv/src/storage/segments.rs b/dash-spv/src/storage/segments.rs
index d98b55d96..6eb23dccf 100644
--- a/dash-spv/src/storage/segments.rs
+++ b/dash-spv/src/storage/segments.rs
@@ -13,12 +13,15 @@ use dashcore::{
     block::{Header as BlockHeader, Version},
     consensus::{encode, Decodable, Encodable},
     hash_types::FilterHeader,
-    BlockHash, CompactTarget,
+    Block, BlockHash, CompactTarget,
 };
 use dashcore_hashes::Hash;
 
 use crate::{
-    error::StorageResult, storage::io::atomic_write, types::HashedBlockHeader, StorageError,
+    error::StorageResult,
+    storage::io::atomic_write,
+    types::{HashedBlock, HashedBlockHeader},
+    StorageError,
 };
 
 pub trait Persistable: Sized + Encodable + Decodable + PartialEq + Clone {
@@ -59,6 +62,16 @@ impl Persistable for FilterHeader {
     }
 }
 
+impl Persistable for HashedBlock {
+    fn sentinel() -> Self {
+        let block = Block {
+            header: *HashedBlockHeader::sentinel().header(),
+            txdata: Vec::new(),
+        };
+        Self::from(block)
+    }
+}
+
 /// In-memory cache for all segments of items
 #[derive(Debug)]
 pub struct SegmentCache<T: Persistable> {
diff --git a/dash-spv/src/sync/legacy/message_handlers.rs b/dash-spv/src/sync/legacy/message_handlers.rs
index 759998ae3..957fc6a10 100644
--- a/dash-spv/src/sync/legacy/message_handlers.rs
+++ b/dash-spv/src/sync/legacy/message_handlers.rs
@@ -12,6 +12,7 @@ use super::phases::SyncPhase;
 use crate::error::{SyncError, SyncResult};
 use crate::network::{Message, NetworkManager};
 use crate::storage::StorageManager;
+use crate::types::HashedBlock;
 use key_wallet_manager::wallet_interface::WalletInterface;
 use key_wallet_manager::wallet_manager::{check_compact_filters_for_addresses, FilterMatchKey};
 
@@ -647,6 +648,11 @@ impl SyncManager) -> Self {
+        Self::from(Block::dummy(height, transactions))
+    }
+}
diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs
index 0fac3e048..ddf44a498 100644
--- a/dash-spv/src/types.rs
+++ b/dash-spv/src/types.rs
@@ -21,7 +21,7 @@ use dashcore::{
     hash_types::FilterHeader,
     network::constants::NetworkExt,
     sml::masternode_list_engine::MasternodeListEngine,
-    Amount, BlockHash, Network, Transaction, Txid,
+    Amount, Block, BlockHash, Network, Transaction, Txid,
 };
 use serde::{Deserialize, Serialize};
 
@@ -109,6 +109,71 @@ impl Decodable for HashedBlockHeader {
     }
 }
 
+/// A block with its cached hash to avoid expensive X11 recomputation.
+#[derive(Debug, Clone)]
+pub struct HashedBlock {
+    hash: BlockHash,
+    block: Block,
+}
+
+impl HashedBlock {
+    /// Get a reference to the cached block hash.
+    pub fn hash(&self) -> &BlockHash {
+        &self.hash
+    }
+
+    /// Get a reference to the block.
+    pub fn block(&self) -> &Block {
+        &self.block
+    }
+}
+
+impl From<Block> for HashedBlock {
+    fn from(block: Block) -> Self {
+        Self {
+            hash: block.block_hash(),
+            block,
+        }
+    }
+}
+
+impl From<&Block> for HashedBlock {
+    fn from(block: &Block) -> Self {
+        Self {
+            hash: block.block_hash(),
+            block: block.clone(),
+        }
+    }
+}
+
+impl PartialEq for HashedBlock {
+    fn eq(&self, other: &Self) -> bool {
+        self.block == other.block
+    }
+}
+
+impl Encodable for HashedBlock {
+    #[inline]
+    fn consensus_encode<W: io::Write + ?Sized>(
+        &self,
+        writer: &mut W,
+    ) -> Result<usize, io::Error> {
+        Ok(self.hash().consensus_encode(writer)? + self.block().consensus_encode(writer)?)
+    }
+}
+
+impl Decodable for HashedBlock {
+    #[inline]
+    fn consensus_decode<R: io::Read + ?Sized>(
+        reader: &mut R,
+    ) -> Result<Self, encode::Error> {
+        Ok(Self {
+            hash: BlockHash::consensus_decode(reader)?,
+            block: Block::consensus_decode(reader)?,
+        })
+    }
+}
+
 /// Unique identifier for a peer connection.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct PeerId(pub u64);
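
Usage sketch (appended for review, not part of the diff above): the snippet mirrors the new test_persistence_across_reopen test and shows how the pieces introduced here compose across a restart. It is a minimal sketch under assumptions: the dash_spv::... import paths, the re-export of the PersistentStorage trait, the availability of the HashedBlock::dummy helper outside the crate's own test modules, and StorageError implementing std::error::Error are not guaranteed by this patch.

// Sketch only; paths and helper availability are assumptions, see note above.
use dash_spv::storage::{BlockStorage, PersistentBlockStorage, PersistentStorage};
use dash_spv::types::HashedBlock;
use tempfile::TempDir;

#[tokio::test]
async fn block_cache_survives_restart() -> Result<(), Box<dyn std::error::Error>> {
    let dir = TempDir::new()?;

    // First "run": cache a block that matched a wallet filter during sync.
    {
        let mut storage = PersistentBlockStorage::open(dir.path()).await?;
        storage.store_block(100, HashedBlock::dummy(100, vec![])).await?;
        // Flush the `blocks/` segment files to disk.
        storage.persist(dir.path()).await?;
    }

    // Second "run" (restart): the cached block is served from disk instead of
    // being re-fetched, while heights that were never stored come back as `None`.
    let storage = PersistentBlockStorage::open(dir.path()).await?;
    assert!(storage.load_block(100).await?.is_some());
    assert!(storage.load_block(101).await?.is_none());

    Ok(())
}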