Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions dash-spv/src/storage/blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//! Block storage for persisting full blocks that contain wallet-relevant transactions.

use std::collections::HashSet;
use std::path::PathBuf;

use crate::error::StorageResult;
use crate::storage::segments::{Persistable, SegmentCache};
use crate::storage::PersistentStorage;
use crate::types::HashedBlock;
use async_trait::async_trait;
use dashcore::prelude::CoreBlockHeight;
use tokio::sync::RwLock;

/// Trait for block storage operations.
#[async_trait]
pub trait BlockStorage: Send + Sync + 'static {
/// Store a block at a specific height.
async fn store_block(
&mut self,
height: CoreBlockHeight,
block: HashedBlock,
) -> StorageResult<()>;

/// Load a single block by height.
async fn load_block(&self, height: CoreBlockHeight) -> StorageResult<Option<HashedBlock>>;
}

/// Persistent storage for full blocks using segmented files.
pub struct PersistentBlockStorage {
/// Block storage segments.
blocks: RwLock<SegmentCache<HashedBlock>>,
/// Set of available block heights used for fast lookups and to bypass sentinel loading and gap
/// detection asserts (in debug builds) in the underlying segment implementation.
available_heights: HashSet<CoreBlockHeight>,
}

impl PersistentBlockStorage {
const FOLDER_NAME: &str = "blocks";
}

#[async_trait]
impl PersistentStorage for PersistentBlockStorage {
async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
let storage_path = storage_path.into();
let blocks_folder = storage_path.join(Self::FOLDER_NAME);

tracing::debug!("Opening PersistentBlockStorage from {:?}", blocks_folder);

let mut blocks: SegmentCache<HashedBlock> =
SegmentCache::load_or_new(&blocks_folder).await?;

let mut available_heights = HashSet::new();

if let (Some(start), Some(end)) = (blocks.start_height(), blocks.tip_height()) {
let hashed_blocks = blocks.get_items(start..end + 1).await?;
let sentinel = HashedBlock::sentinel();
for (i, hashed_block) in hashed_blocks.iter().enumerate() {
if hashed_block != &sentinel {
available_heights.insert(start + i as CoreBlockHeight);
}
}
}

Ok(Self {
blocks: RwLock::new(blocks),
available_heights,
})
}

async fn persist(&mut self, storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
let blocks_folder = storage_path.into().join(Self::FOLDER_NAME);
tokio::fs::create_dir_all(&blocks_folder).await?;
self.blocks.write().await.persist(&blocks_folder).await;
Ok(())
}
}

#[async_trait]
impl BlockStorage for PersistentBlockStorage {
async fn store_block(&mut self, height: u32, hashed_block: HashedBlock) -> StorageResult<()> {
self.available_heights.insert(height);
self.blocks.write().await.store_items_at_height(&[hashed_block], height).await
}

async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
// This early return avoids unnecessary disk lookups and bypasses sentinel loading and gap
// detection asserts (in debug builds) in the underlying segment implementation.
if !self.available_heights.contains(&height) {
return Ok(None);
}
Ok(self.blocks.write().await.get_items(height..height + 1).await?.first().cloned())
}
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

#[tokio::test]
async fn test_store_and_load_block() {
let temp_dir = TempDir::new().unwrap();
let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();

let hashed_block = HashedBlock::dummy(100, vec![]);
storage.store_block(100, hashed_block.clone()).await.unwrap();

let loaded = storage.load_block(100).await.unwrap();
assert_eq!(loaded, Some(hashed_block));
}

#[tokio::test]
async fn test_persistence_across_reopen() {
let temp_dir = TempDir::new().unwrap();
let hashed_block = HashedBlock::dummy(100, vec![]);

{
let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
storage.store_block(100, hashed_block.clone()).await.unwrap();
storage.persist(temp_dir.path()).await.unwrap();
}

{
let storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();
let loaded = storage.load_block(100).await.unwrap();
assert_eq!(loaded, Some(hashed_block));
}
}

#[tokio::test]
async fn test_load_nonexistent_block() {
let temp_dir = TempDir::new().unwrap();
let storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();

let loaded = storage.load_block(999).await.unwrap();
assert!(loaded.is_none());
}

#[tokio::test]
async fn test_returns_none_for_gaps() {
let temp_dir = TempDir::new().unwrap();
let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();

// Store blocks at non-contiguous height
let hashed_block_1 = HashedBlock::dummy(100, vec![]);
let hashed_block_2 = HashedBlock::dummy(200, vec![]);

storage.store_block(100, hashed_block_1.clone()).await.unwrap();
storage.store_block(200, hashed_block_2.clone()).await.unwrap();

// Stored blocks should load correctly
assert_eq!(storage.load_block(100).await.unwrap(), Some(hashed_block_1));
assert_eq!(storage.load_block(200).await.unwrap(), Some(hashed_block_2));

// Height in between (gap) should return None, not a sentinel
assert_eq!(storage.load_block(150).await.unwrap(), None);

// Heights outside range should also return None
assert_eq!(storage.load_block(50).await.unwrap(), None);
assert_eq!(storage.load_block(250).await.unwrap(), None);
}
}
22 changes: 21 additions & 1 deletion dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod types;

mod block_headers;
mod blocks;
mod chainstate;
mod filter_headers;
mod filters;
Expand Down Expand Up @@ -33,10 +34,11 @@ use crate::storage::lockfile::LockFile;
use crate::storage::masternode::PersistentMasternodeStateStorage;
use crate::storage::metadata::PersistentMetadataStorage;
use crate::storage::transactions::PersistentTransactionStorage;
use crate::types::{MempoolState, UnconfirmedTransaction};
use crate::types::{HashedBlock, MempoolState, UnconfirmedTransaction};
use crate::{ChainState, ClientConfig};

pub use crate::storage::block_headers::BlockHeaderStorage;
pub use crate::storage::blocks::{BlockStorage, PersistentBlockStorage};
pub use crate::storage::chainstate::ChainStateStorage;
pub use crate::storage::filter_headers::FilterHeaderStorage;
pub use crate::storage::filters::FilterStorage;
Expand All @@ -61,6 +63,7 @@ pub trait StorageManager:
BlockHeaderStorage
+ FilterHeaderStorage
+ FilterStorage
+ BlockStorage
+ TransactionStorage
+ MetadataStorage
+ ChainStateStorage
Expand All @@ -85,6 +88,7 @@ pub struct DiskStorageManager {
block_headers: Arc<RwLock<PersistentBlockHeaderStorage>>,
filter_headers: Arc<RwLock<PersistentFilterHeaderStorage>>,
filters: Arc<RwLock<PersistentFilterStorage>>,
blocks: Arc<RwLock<PersistentBlockStorage>>,
transactions: Arc<RwLock<PersistentTransactionStorage>>,
metadata: Arc<RwLock<PersistentMetadataStorage>>,
chainstate: Arc<RwLock<PersistentChainStateStorage>>,
Expand Down Expand Up @@ -121,6 +125,7 @@ impl DiskStorageManager {
PersistentFilterHeaderStorage::open(&storage_path).await?,
)),
filters: Arc::new(RwLock::new(PersistentFilterStorage::open(&storage_path).await?)),
blocks: Arc::new(RwLock::new(PersistentBlockStorage::open(&storage_path).await?)),
transactions: Arc::new(RwLock::new(
PersistentTransactionStorage::open(&storage_path).await?,
)),
Expand Down Expand Up @@ -155,6 +160,7 @@ impl DiskStorageManager {
let block_headers = Arc::clone(&self.block_headers);
let filter_headers = Arc::clone(&self.filter_headers);
let filters = Arc::clone(&self.filters);
let blocks = Arc::clone(&self.blocks);
let transactions = Arc::clone(&self.transactions);
let metadata = Arc::clone(&self.metadata);
let chainstate = Arc::clone(&self.chainstate);
Expand All @@ -171,6 +177,7 @@ impl DiskStorageManager {
let _ = block_headers.write().await.persist(&storage_path).await;
let _ = filter_headers.write().await.persist(&storage_path).await;
let _ = filters.write().await.persist(&storage_path).await;
let _ = blocks.write().await.persist(&storage_path).await;
let _ = transactions.write().await.persist(&storage_path).await;
let _ = metadata.write().await.persist(&storage_path).await;
let _ = chainstate.write().await.persist(&storage_path).await;
Expand All @@ -194,6 +201,7 @@ impl DiskStorageManager {
let _ = self.block_headers.write().await.persist(storage_path).await;
let _ = self.filter_headers.write().await.persist(storage_path).await;
let _ = self.filters.write().await.persist(storage_path).await;
let _ = self.blocks.write().await.persist(storage_path).await;
let _ = self.transactions.write().await.persist(storage_path).await;
let _ = self.metadata.write().await.persist(storage_path).await;
let _ = self.chainstate.write().await.persist(storage_path).await;
Expand Down Expand Up @@ -232,6 +240,7 @@ impl StorageManager for DiskStorageManager {
self.filter_headers =
Arc::new(RwLock::new(PersistentFilterHeaderStorage::open(storage_path).await?));
self.filters = Arc::new(RwLock::new(PersistentFilterStorage::open(storage_path).await?));
self.blocks = Arc::new(RwLock::new(PersistentBlockStorage::open(storage_path).await?));
self.transactions =
Arc::new(RwLock::new(PersistentTransactionStorage::open(storage_path).await?));
self.metadata = Arc::new(RwLock::new(PersistentMetadataStorage::open(storage_path).await?));
Expand Down Expand Up @@ -326,6 +335,17 @@ impl filters::FilterStorage for DiskStorageManager {
}
}

#[async_trait]
impl BlockStorage for DiskStorageManager {
async fn store_block(&mut self, height: u32, block: HashedBlock) -> StorageResult<()> {
self.blocks.write().await.store_block(height, block).await
}

async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
self.blocks.read().await.load_block(height).await
}
}

#[async_trait]
impl transactions::TransactionStorage for DiskStorageManager {
async fn store_mempool_transaction(
Expand Down
17 changes: 15 additions & 2 deletions dash-spv/src/storage/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use dashcore::{
block::{Header as BlockHeader, Version},
consensus::{encode, Decodable, Encodable},
hash_types::FilterHeader,
BlockHash, CompactTarget,
Block, BlockHash, CompactTarget,
};
use dashcore_hashes::Hash;

use crate::{
error::StorageResult, storage::io::atomic_write, types::HashedBlockHeader, StorageError,
error::StorageResult,
storage::io::atomic_write,
types::{HashedBlock, HashedBlockHeader},
StorageError,
};

pub trait Persistable: Sized + Encodable + Decodable + PartialEq + Clone {
Expand Down Expand Up @@ -59,6 +62,16 @@ impl Persistable for FilterHeader {
}
}

impl Persistable for HashedBlock {
fn sentinel() -> Self {
let block = Block {
header: *HashedBlockHeader::sentinel().header(),
txdata: Vec::new(),
};
Self::from(block)
}
}

/// In-memory cache for all segments of items
#[derive(Debug)]
pub struct SegmentCache<I: Persistable> {
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/sync/legacy/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::phases::SyncPhase;
use crate::error::{SyncError, SyncResult};
use crate::network::{Message, NetworkManager};
use crate::storage::StorageManager;
use crate::types::HashedBlock;
use key_wallet_manager::wallet_interface::WalletInterface;
use key_wallet_manager::wallet_manager::{check_compact_filters_for_addresses, FilterMatchKey};

Expand Down Expand Up @@ -647,6 +648,11 @@ impl<S: StorageManager, N: NetworkManager, W: WalletInterface> SyncManager<S, N,

let result = wallet.process_block(block, block_height).await;

storage
.store_block(block_height, HashedBlock::from(block))
.await
.map_err(|e| SyncError::Storage(e.to_string()))?;

drop(wallet);

let total_relevant = result.relevant_tx_count();
Expand Down
1 change: 1 addition & 0 deletions dash-spv/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ mod chain_work;
mod checkpoint;
mod filter;
mod network;
mod types;

pub use network::{test_socket_address, MockNetworkManager};
9 changes: 9 additions & 0 deletions dash-spv/src/test_utils/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::types::HashedBlock;
use dashcore::prelude::CoreBlockHeight;
use dashcore::{Block, Transaction};

impl HashedBlock {
pub fn dummy(height: CoreBlockHeight, transactions: Vec<Transaction>) -> Self {
Self::from(Block::dummy(height, transactions))
}
}
Loading
Loading