From d1f62d8f0fa7e2f6c8e183001c315179d49f4314 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Fri, 19 Dec 2025 19:50:14 +0000 Subject: [PATCH 1/9] dropped unused code --- dash-spv/src/sync/headers/manager.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/dash-spv/src/sync/headers/manager.rs b/dash-spv/src/sync/headers/manager.rs index 4498c16d9..cf7d6e8bb 100644 --- a/dash-spv/src/sync/headers/manager.rs +++ b/dash-spv/src/sync/headers/manager.rs @@ -48,7 +48,6 @@ pub struct HeaderSyncManager { config: ClientConfig, tip_manager: ChainTipManager, checkpoint_manager: CheckpointManager, - reorg_config: ReorgConfig, chain_state: Arc>, syncing_headers: bool, last_sync_progress: std::time::Instant, @@ -77,7 +76,6 @@ impl HeaderSyncManager { config: config.clone(), tip_manager: ChainTipManager::new(reorg_config.max_forks), checkpoint_manager, - reorg_config, chain_state, syncing_headers: false, last_sync_progress: std::time::Instant::now(), From fc9112344ee33316a6db28da24a85ff86df08701 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Mon, 22 Dec 2025 16:34:41 +0000 Subject: [PATCH 2/9] removed current_filter_tip.
never updated or queried --- dash-spv-ffi/tests/unit/test_type_conversions.rs | 1 - dash-spv/src/storage/chainstate.rs | 5 ----- dash-spv/src/types.rs | 5 ----- 3 files changed, 11 deletions(-) diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs index 264593c8f..a899881b6 100644 --- a/dash-spv-ffi/tests/unit/test_type_conversions.rs +++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs @@ -165,7 +165,6 @@ mod tests { let state = dash_spv::ChainState { last_chainlock_height: None, last_chainlock_hash: None, - current_filter_tip: None, masternode_engine: None, last_masternode_diff_height: None, sync_base_height: 0, diff --git a/dash-spv/src/storage/chainstate.rs b/dash-spv/src/storage/chainstate.rs index c6c3b69af..7b0afba50 100644 --- a/dash-spv/src/storage/chainstate.rs +++ b/dash-spv/src/storage/chainstate.rs @@ -44,7 +44,6 @@ impl ChainStateStorage for PersistentChainStateStorage { let state_data = serde_json::json!({ "last_chainlock_height": state.last_chainlock_height, "last_chainlock_hash": state.last_chainlock_hash, - "current_filter_tip": state.current_filter_tip, "last_masternode_diff_height": state.last_masternode_diff_height, "sync_base_height": state.sync_base_height, }); @@ -80,10 +79,6 @@ impl ChainStateStorage for PersistentChainStateStorage { .get("last_chainlock_hash") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()), - current_filter_tip: value - .get("current_filter_tip") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse().ok()), masternode_engine: None, last_masternode_diff_height: value .get("last_masternode_diff_height") diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index c4ff0803f..fccf27c45 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -20,7 +20,6 @@ use std::time::{Duration, Instant, SystemTime}; use dashcore::{ block::Header as BlockHeader, consensus::{Decodable, Encodable}, - hash_types::FilterHeader, network::constants::NetworkExt, 
sml::masternode_list_engine::MasternodeListEngine, Amount, BlockHash, Network, Transaction, Txid, @@ -265,9 +264,6 @@ pub struct ChainState { /// Last ChainLock hash. pub last_chainlock_hash: Option, - /// Current filter tip. - pub current_filter_tip: Option, - /// Masternode list engine. pub masternode_engine: Option, @@ -381,7 +377,6 @@ impl std::fmt::Debug for ChainState { f.debug_struct("ChainState") .field("last_chainlock_height", &self.last_chainlock_height) .field("last_chainlock_hash", &self.last_chainlock_hash) - .field("current_filter_tip", &self.current_filter_tip) .field("last_masternode_diff_height", &self.last_masternode_diff_height) .field("sync_base_height", &self.sync_base_height) .finish() From 74a52a8b46151646488b805488837cde86e059df Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Mon, 22 Dec 2025 16:54:09 +0000 Subject: [PATCH 3/9] removed masternode_engine. it was being updated but not queried --- .../tests/unit/test_type_conversions.rs | 1 - dash-spv/src/chain/chainlock_test.rs | 8 ++--- dash-spv/src/client/core.rs | 2 +- dash-spv/src/client/lifecycle.rs | 8 ++--- dash-spv/src/storage/chainstate.rs | 1 - dash-spv/src/sync/headers/manager.rs | 2 ++ dash-spv/src/types.rs | 29 ++----------------- dash-spv/tests/header_sync_test.rs | 4 +-- 8 files changed, 14 insertions(+), 41 deletions(-) diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs index a899881b6..65cfc1a90 100644 --- a/dash-spv-ffi/tests/unit/test_type_conversions.rs +++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs @@ -165,7 +165,6 @@ mod tests { let state = dash_spv::ChainState { last_chainlock_height: None, last_chainlock_hash: None, - masternode_engine: None, last_masternode_diff_height: None, sync_base_height: 0, }; diff --git a/dash-spv/src/chain/chainlock_test.rs b/dash-spv/src/chain/chainlock_test.rs index 96f3b2cae..b99d168c1 100644 --- a/dash-spv/src/chain/chainlock_test.rs +++ 
b/dash-spv/src/chain/chainlock_test.rs @@ -5,7 +5,7 @@ mod tests { storage::{BlockHeaderStorage, DiskStorageManager}, types::ChainState, }; - use dashcore::{Header, Network}; + use dashcore::{Header}; #[tokio::test] async fn test_chainlock_processing() { @@ -13,7 +13,7 @@ mod tests { let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(Network::Testnet); + let chain_state = ChainState::new_for_network(); let chainlock = ChainLock::dummy(1000); @@ -41,7 +41,7 @@ mod tests { let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(Network::Testnet); + let chain_state = ChainState::new_for_network(); let chainlock1 = ChainLock::dummy(1000); @@ -69,7 +69,7 @@ mod tests { #[tokio::test] async fn test_reorganization_protection() { let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(Network::Testnet); + let chain_state = ChainState::new_for_network(); let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index b41a7aeae..7dcdc4f1c 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -211,7 +211,7 @@ impl DashSpvClient DashSpvClient @@ -281,11 +281,7 @@ impl DashSpvClient { config: ClientConfig, tip_manager: ChainTipManager, checkpoint_manager: CheckpointManager, + reorg_config: ReorgConfig, chain_state: Arc>, syncing_headers: bool, last_sync_progress: std::time::Instant, @@ -76,6 +77,7 @@ impl HeaderSyncManager { config: config.clone(), tip_manager: ChainTipManager::new(reorg_config.max_forks), checkpoint_manager, + reorg_config, chain_state, syncing_headers: false, last_sync_progress: 
std::time::Instant::now(), diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index fccf27c45..a07de2739 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -20,8 +20,6 @@ use std::time::{Duration, Instant, SystemTime}; use dashcore::{ block::Header as BlockHeader, consensus::{Decodable, Encodable}, - network::constants::NetworkExt, - sml::masternode_list_engine::MasternodeListEngine, Amount, BlockHash, Network, Transaction, Txid, }; use serde::{Deserialize, Serialize}; @@ -264,9 +262,6 @@ pub struct ChainState { /// Last ChainLock hash. pub last_chainlock_hash: Option, - /// Masternode list engine. - pub masternode_engine: Option, - /// Last masternode diff height processed. pub last_masternode_diff_height: Option, @@ -281,16 +276,9 @@ impl ChainState { } /// Create a new chain state for the given network. - pub fn new_for_network(network: Network) -> Self { + pub fn new_for_network() -> Self { let mut state = Self::default(); - // Initialize masternode engine for the network - let mut engine = MasternodeListEngine::default_for_network(network); - if let Some(genesis_hash) = network.known_genesis_block_hash() { - engine.feed_block_height(0, genesis_hash); - } - state.masternode_engine = Some(engine); - // Initialize checkpoint fields state.sync_base_height = 0; @@ -335,26 +323,15 @@ impl ChainState { } /// Initialize chain state from a checkpoint. 
- pub fn init_from_checkpoint( - &mut self, - checkpoint_height: u32, - checkpoint_header: BlockHeader, - network: Network, - ) { + pub fn init_from_checkpoint(&mut self, checkpoint_height: u32, network: Network) { // Set sync base height to checkpoint self.sync_base_height = checkpoint_height; tracing::info!( - "Initialized ChainState from checkpoint - height: {}, hash: {}, network: {:?}", + "Initialized ChainState from checkpoint - height: {}, network: {:?}", checkpoint_height, - checkpoint_header.block_hash(), network ); - - // Initialize masternode engine for the network, starting from checkpoint - let mut engine = MasternodeListEngine::default_for_network(network); - engine.feed_block_height(checkpoint_height, checkpoint_header.block_hash()); - self.masternode_engine = Some(engine); } /// Get the absolute height for a given index in our headers vector. diff --git a/dash-spv/tests/header_sync_test.rs b/dash-spv/tests/header_sync_test.rs index 8714e4b26..d661fb04d 100644 --- a/dash-spv/tests/header_sync_test.rs +++ b/dash-spv/tests/header_sync_test.rs @@ -95,14 +95,14 @@ async fn test_prepare_sync(sync_base_height: u32, header_count: usize) { let expected_tip_hash = headers.last().unwrap().block_hash(); // Create and store chain state - let mut chain_state = ChainState::new_for_network(Network::Dash); + let mut chain_state = ChainState::new_for_network(); chain_state.sync_base_height = sync_base_height; storage.store_chain_state(&chain_state).await.expect("Failed to store chain state"); storage.store_headers(&headers).await.expect("Failed to store headers"); // Create HeaderSyncManager and load from storage let config = ClientConfig::new(Network::Dash); - let chain_state_arc = Arc::new(RwLock::new(ChainState::new_for_network(Network::Dash))); + let chain_state_arc = Arc::new(RwLock::new(ChainState::new_for_network())); let mut header_sync = HeaderSyncManager::::new( &config, ReorgConfig::default(), From 945503eb8996a12e741e12c8a63fed96fd9e104a Mon Sep 17 
00:00:00 2001 From: Borja Castellano Date: Mon, 22 Dec 2025 17:25:43 +0000 Subject: [PATCH 4/9] removed last_masternode_diff_height and SyncProgress is created using the storage MasternodeState stored --- dash-spv-ffi/src/types.rs | 2 - .../tests/unit/test_type_conversions.rs | 2 - dash-spv/src/client/status_display.rs | 15 +- dash-spv/src/storage/chainstate.rs | 5 - dash-spv/src/storage/state.rs | 673 ++++++++++++++++++ dash-spv/src/types.rs | 4 - 6 files changed, 686 insertions(+), 15 deletions(-) create mode 100644 dash-spv/src/storage/state.rs diff --git a/dash-spv-ffi/src/types.rs b/dash-spv-ffi/src/types.rs index e05504e59..ef4c936fe 100644 --- a/dash-spv-ffi/src/types.rs +++ b/dash-spv-ffi/src/types.rs @@ -181,7 +181,6 @@ impl From for FFIDetailedSyncProgress { #[repr(C)] pub struct FFIChainState { - pub masternode_height: u32, pub last_chainlock_height: u32, pub last_chainlock_hash: FFIString, pub current_filter_tip: u32, @@ -190,7 +189,6 @@ pub struct FFIChainState { impl From for FFIChainState { fn from(state: ChainState) -> Self { FFIChainState { - masternode_height: state.last_masternode_diff_height.unwrap_or(0), last_chainlock_height: state.last_chainlock_height.unwrap_or(0), last_chainlock_hash: FFIString::new( &state.last_chainlock_hash.map(|h| h.to_string()).unwrap_or_default(), diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs index 65cfc1a90..2c272a0f0 100644 --- a/dash-spv-ffi/tests/unit/test_type_conversions.rs +++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs @@ -165,13 +165,11 @@ mod tests { let state = dash_spv::ChainState { last_chainlock_height: None, last_chainlock_hash: None, - last_masternode_diff_height: None, sync_base_height: 0, }; let ffi_state = FFIChainState::from(state); - assert_eq!(ffi_state.masternode_height, 0); assert_eq!(ffi_state.last_chainlock_height, 0); assert_eq!(ffi_state.current_filter_tip, 0); diff --git a/dash-spv/src/client/status_display.rs 
b/dash-spv/src/client/status_display.rs index cc3e9aee5..7a04910cc 100644 --- a/dash-spv/src/client/status_display.rs +++ b/dash-spv/src/client/status_display.rs @@ -115,16 +115,27 @@ impl<'a, S: StorageManager, W: WalletInterface> StatusDisplay<'a, S, W> { // Calculate the actual header height considering checkpoint sync let header_height = self.calculate_header_height(&state).await; - // Get filter header height from storage let storage = self.storage.lock().await; + + // Get filter header height from storage let filter_header_height = storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0); + + // Get masternode height from storage + let masternode_height = storage + .load_masternode_state() + .await + .ok() + .flatten() + .map(|state| state.last_height) + .unwrap_or(0); + drop(storage); Ok(SyncProgress { header_height, filter_header_height, - masternode_height: state.last_masternode_diff_height.unwrap_or(0), + masternode_height, peer_count: 1, // TODO: Get from network manager filter_sync_available: false, // TODO: Get from network manager filters_downloaded: filters_received, diff --git a/dash-spv/src/storage/chainstate.rs b/dash-spv/src/storage/chainstate.rs index 207fbdceb..432c670ed 100644 --- a/dash-spv/src/storage/chainstate.rs +++ b/dash-spv/src/storage/chainstate.rs @@ -44,7 +44,6 @@ impl ChainStateStorage for PersistentChainStateStorage { let state_data = serde_json::json!({ "last_chainlock_height": state.last_chainlock_height, "last_chainlock_hash": state.last_chainlock_hash, - "last_masternode_diff_height": state.last_masternode_diff_height, "sync_base_height": state.sync_base_height, }); @@ -79,10 +78,6 @@ impl ChainStateStorage for PersistentChainStateStorage { .get("last_chainlock_hash") .and_then(|v| v.as_str()) .and_then(|s| s.parse().ok()), - last_masternode_diff_height: value - .get("last_masternode_diff_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32), sync_base_height: value .get("sync_base_height") .and_then(|v| 
v.as_u64()) diff --git a/dash-spv/src/storage/state.rs b/dash-spv/src/storage/state.rs new file mode 100644 index 000000000..09dce0ad2 --- /dev/null +++ b/dash-spv/src/storage/state.rs @@ -0,0 +1,673 @@ +//! State persistence and StorageManager trait implementation. + +use async_trait::async_trait; +use std::collections::HashMap; + +use dashcore::{block::Header as BlockHeader, BlockHash, Txid}; + +use crate::error::StorageResult; +use crate::storage::headers::save_index_to_disk; +use crate::storage::{MasternodeState, StorageManager}; +use crate::types::{ChainState, MempoolState, UnconfirmedTransaction}; + +use super::io::atomic_write; +use super::manager::DiskStorageManager; + +impl DiskStorageManager { + /// Store chain state to disk. + pub async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { + // Store other state as JSON + let state_data = serde_json::json!({ + "last_chainlock_height": state.last_chainlock_height, + "last_chainlock_hash": state.last_chainlock_hash, + "sync_base_height": state.sync_base_height, + }); + + let path = self.base_path.join("state/chain.json"); + let json = state_data.to_string(); + atomic_write(&path, json.as_bytes()).await?; + + Ok(()) + } + + /// Load chain state from disk. 
+ pub async fn load_chain_state(&self) -> StorageResult> { + let path = self.base_path.join("state/chain.json"); + if !path.exists() { + return Ok(None); + } + + let content = tokio::fs::read_to_string(path).await?; + let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| { + crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e)) + })?; + + let state = ChainState { + last_chainlock_height: value + .get("last_chainlock_height") + .and_then(|v| v.as_u64()) + .map(|h| h as u32), + last_chainlock_hash: value + .get("last_chainlock_hash") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse().ok()), + sync_base_height: value + .get("sync_base_height") + .and_then(|v| v.as_u64()) + .map(|h| h as u32) + .unwrap_or(0), + }; + + Ok(Some(state)) + } + + /// Store masternode state. + pub async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { + let path = self.base_path.join("state/masternode.json"); + let json = serde_json::to_string_pretty(state).map_err(|e| { + crate::error::StorageError::Serialization(format!( + "Failed to serialize masternode state: {}", + e + )) + })?; + + atomic_write(&path, json.as_bytes()).await?; + Ok(()) + } + + /// Load masternode state. + pub async fn load_masternode_state(&self) -> StorageResult> { + let path = self.base_path.join("state/masternode.json"); + if !path.exists() { + return Ok(None); + } + + let content = tokio::fs::read_to_string(path).await?; + let state = serde_json::from_str(&content).map_err(|e| { + crate::error::StorageError::Serialization(format!( + "Failed to deserialize masternode state: {}", + e + )) + })?; + + Ok(Some(state)) + } + + /// Store a ChainLock. 
+ pub async fn store_chain_lock( + &mut self, + height: u32, + chain_lock: &dashcore::ChainLock, + ) -> StorageResult<()> { + let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); + let data = bincode::serialize(chain_lock).map_err(|e| { + crate::error::StorageError::WriteFailed(format!( + "Failed to serialize chain lock: {}", + e + )) + })?; + + atomic_write(&path, &data).await?; + tracing::debug!("Stored chain lock at height {}", height); + Ok(()) + } + + /// Load a ChainLock. + pub async fn load_chain_lock(&self, height: u32) -> StorageResult> { + let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); + + if !path.exists() { + return Ok(None); + } + + let data = tokio::fs::read(&path).await?; + let chain_lock = bincode::deserialize(&data).map_err(|e| { + crate::error::StorageError::ReadFailed(format!( + "Failed to deserialize chain lock: {}", + e + )) + })?; + + Ok(Some(chain_lock)) + } + + /// Get ChainLocks in a height range. + pub async fn get_chain_locks( + &self, + start_height: u32, + end_height: u32, + ) -> StorageResult> { + let chainlocks_dir = self.base_path.join("chainlocks"); + + if !chainlocks_dir.exists() { + return Ok(Vec::new()); + } + + let mut chain_locks = Vec::new(); + let mut entries = tokio::fs::read_dir(&chainlocks_dir).await?; + + while let Some(entry) = entries.next_entry().await? 
{ + let file_name = entry.file_name(); + let file_name_str = file_name.to_string_lossy(); + + // Parse height from filename + if let Some(height_str) = + file_name_str.strip_prefix("chainlock_").and_then(|s| s.strip_suffix(".bin")) + { + if let Ok(height) = height_str.parse::() { + if height >= start_height && height <= end_height { + let path = entry.path(); + let data = tokio::fs::read(&path).await?; + if let Ok(chain_lock) = bincode::deserialize(&data) { + chain_locks.push((height, chain_lock)); + } + } + } + } + } + + // Sort by height + chain_locks.sort_by_key(|(h, _)| *h); + Ok(chain_locks) + } + + /// Store metadata. + pub async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { + let path = self.base_path.join(format!("state/{}.dat", key)); + atomic_write(&path, value).await?; + Ok(()) + } + + /// Load metadata. + pub async fn load_metadata(&self, key: &str) -> StorageResult>> { + let path = self.base_path.join(format!("state/{}.dat", key)); + if !path.exists() { + return Ok(None); + } + + let data = tokio::fs::read(path).await?; + Ok(Some(data)) + } + + /// Clear all storage. 
+ pub async fn clear(&mut self) -> StorageResult<()> { + // First, stop the background worker to avoid races with file deletion + self.stop_worker(); + + // Clear in-memory state + self.block_headers.write().await.clear_in_memory(); + self.filter_headers.write().await.clear_in_memory(); + self.filters.write().await.clear_in_memory(); + + self.header_hash_index.write().await.clear(); + self.mempool_transactions.write().await.clear(); + *self.mempool_state.write().await = None; + + // Remove all files and directories under base_path + if self.base_path.exists() { + // Best-effort removal; if concurrent files appear, retry once + match tokio::fs::remove_dir_all(&self.base_path).await { + Ok(_) => {} + Err(e) => { + // Retry once after a short delay to handle transient races + if e.kind() == std::io::ErrorKind::Other + || e.kind() == std::io::ErrorKind::DirectoryNotEmpty + { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + tokio::fs::remove_dir_all(&self.base_path).await?; + } else { + return Err(crate::error::StorageError::Io(e)); + } + } + } + tokio::fs::create_dir_all(&self.base_path).await?; + } + + // Recreate expected subdirectories + tokio::fs::create_dir_all(self.base_path.join("headers")).await?; + tokio::fs::create_dir_all(self.base_path.join("filters")).await?; + tokio::fs::create_dir_all(self.base_path.join("state")).await?; + + // Restart the background worker for future operations + self.start_worker().await; + + Ok(()) + } + + /// Shutdown the storage manager. + pub async fn shutdown(&mut self) { + self.stop_worker(); + + // Persist all dirty data + self.save_dirty().await; + } + + /// Save all dirty data. 
+ pub(super) async fn save_dirty(&self) { + self.filter_headers.write().await.persist().await; + self.block_headers.write().await.persist().await; + self.filters.write().await.persist().await; + + let path = self.base_path.join("headers/index.dat"); + let index = self.header_hash_index.read().await; + if let Err(e) = save_index_to_disk(&path, &index).await { + tracing::error!("Failed to persist header index: {}", e); + } + } +} + +/// Mempool storage methods +impl DiskStorageManager { + /// Store a mempool transaction. + pub async fn store_mempool_transaction( + &mut self, + txid: &Txid, + tx: &UnconfirmedTransaction, + ) -> StorageResult<()> { + self.mempool_transactions.write().await.insert(*txid, tx.clone()); + Ok(()) + } + + /// Remove a mempool transaction. + pub async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { + self.mempool_transactions.write().await.remove(txid); + Ok(()) + } + + /// Get a mempool transaction. + pub async fn get_mempool_transaction( + &self, + txid: &Txid, + ) -> StorageResult> { + Ok(self.mempool_transactions.read().await.get(txid).cloned()) + } + + /// Get all mempool transactions. + pub async fn get_all_mempool_transactions( + &self, + ) -> StorageResult> { + Ok(self.mempool_transactions.read().await.clone()) + } + + /// Store mempool state. + pub async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { + *self.mempool_state.write().await = Some(state.clone()); + Ok(()) + } + + /// Load mempool state. + pub async fn load_mempool_state(&self) -> StorageResult> { + Ok(self.mempool_state.read().await.clone()) + } + + /// Clear mempool. 
+ pub async fn clear_mempool(&mut self) -> StorageResult<()> { + self.mempool_transactions.write().await.clear(); + *self.mempool_state.write().await = None; + Ok(()) + } +} + +#[async_trait] +impl StorageManager for DiskStorageManager { + async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { + self.store_headers(headers).await + } + + async fn load_headers(&self, range: std::ops::Range) -> StorageResult> { + self.block_headers.write().await.get_items(range).await + } + + async fn get_header(&self, height: u32) -> StorageResult> { + if self.get_tip_height().await.is_none_or(|tip_height| height > tip_height) { + return Ok(None); + } + + if self.get_start_height().await.is_none_or(|start_height| height < start_height) { + return Ok(None); + } + + Ok(self.block_headers.write().await.get_items(height..height + 1).await?.first().copied()) + } + + async fn get_tip_height(&self) -> Option { + self.block_headers.read().await.tip_height() + } + + async fn get_start_height(&self) -> Option { + self.block_headers.read().await.start_height() + } + + async fn get_stored_headers_len(&self) -> u32 { + let headers_guard = self.block_headers.read().await; + let start_height = if let Some(start_height) = headers_guard.start_height() { + start_height + } else { + return 0; + }; + + let end_height = if let Some(end_height) = headers_guard.tip_height() { + end_height + } else { + return 0; + }; + + end_height - start_height + 1 + } + + async fn store_filter_headers( + &mut self, + headers: &[dashcore::hash_types::FilterHeader], + ) -> StorageResult<()> { + self.filter_headers.write().await.store_items(headers).await + } + + async fn load_filter_headers( + &self, + range: std::ops::Range, + ) -> StorageResult> { + self.filter_headers.write().await.get_items(range).await + } + + async fn get_filter_header( + &self, + height: u32, + ) -> StorageResult> { + Ok(self.filter_headers.write().await.get_items(height..height + 1).await?.first().copied()) + } + + async 
fn get_filter_tip_height(&self) -> StorageResult> { + Ok(self.filter_headers.read().await.tip_height()) + } + + async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { + Self::store_masternode_state(self, state).await + } + + async fn load_masternode_state(&self) -> StorageResult> { + Self::load_masternode_state(self).await + } + + async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { + Self::store_chain_state(self, state).await + } + + async fn load_chain_state(&self) -> StorageResult> { + Self::load_chain_state(self).await + } + + async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> { + self.filters.write().await.store_items_at_height(&[filter.to_vec()], height).await + } + + async fn load_filters(&self, range: std::ops::Range) -> StorageResult>> { + self.filters.write().await.get_items(range).await + } + + async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { + Self::store_metadata(self, key, value).await + } + + async fn load_metadata(&self, key: &str) -> StorageResult>> { + Self::load_metadata(self, key).await + } + + async fn clear(&mut self) -> StorageResult<()> { + Self::clear(self).await + } + + async fn clear_filters(&mut self) -> StorageResult<()> { + // Stop worker to prevent concurrent writes to filter directories + self.stop_worker(); + + // Clear in-memory and on-disk filter headers segments + self.filter_headers.write().await.clear_all().await?; + self.filters.write().await.clear_all().await?; + + // Restart background worker for future operations + self.start_worker().await; + + Ok(()) + } + + async fn get_header_height_by_hash(&self, hash: &BlockHash) -> StorageResult> { + Self::get_header_height_by_hash(self, hash).await + } + + async fn store_chain_lock( + &mut self, + height: u32, + chain_lock: &dashcore::ChainLock, + ) -> StorageResult<()> { + Self::store_chain_lock(self, height, chain_lock).await + } + + async fn 
load_chain_lock(&self, height: u32) -> StorageResult> { + Self::load_chain_lock(self, height).await + } + + async fn get_chain_locks( + &self, + start_height: u32, + end_height: u32, + ) -> StorageResult> { + Self::get_chain_locks(self, start_height, end_height).await + } + + async fn store_mempool_transaction( + &mut self, + txid: &Txid, + tx: &UnconfirmedTransaction, + ) -> StorageResult<()> { + Self::store_mempool_transaction(self, txid, tx).await + } + + async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { + Self::remove_mempool_transaction(self, txid).await + } + + async fn get_mempool_transaction( + &self, + txid: &Txid, + ) -> StorageResult> { + Self::get_mempool_transaction(self, txid).await + } + + async fn get_all_mempool_transactions( + &self, + ) -> StorageResult> { + Self::get_all_mempool_transactions(self).await + } + + async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { + Self::store_mempool_state(self, state).await + } + + async fn load_mempool_state(&self) -> StorageResult> { + Self::load_mempool_state(self).await + } + + async fn clear_mempool(&mut self) -> StorageResult<()> { + Self::clear_mempool(self).await + } + + async fn shutdown(&mut self) -> StorageResult<()> { + Self::shutdown(self).await; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::{block::Version, pow::CompactTarget}; + use dashcore_hashes::Hash; + use tempfile::TempDir; + + fn build_headers(count: usize) -> Vec { + let mut headers = Vec::with_capacity(count); + let mut prev_hash = BlockHash::from_byte_array([0u8; 32]); + + for i in 0..count { + let header = BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: prev_hash, + merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array( + [(i % 255) as u8; 32], + ) + .into(), + time: 1 + i as u32, + bits: CompactTarget::from_consensus(0x1d00ffff), + nonce: i as u32, + }; + prev_hash = header.block_hash(); + 
headers.push(header); + } + + headers + } + + #[tokio::test] + async fn test_load_headers() -> Result<(), Box> { + // Create a temporary directory for the test + let temp_dir = TempDir::new()?; + let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()) + .await + .expect("Unable to create storage"); + + // Create a test header + let test_header = BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: BlockHash::from_byte_array([1; 32]), + merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array([2; 32]).into(), + time: 12345, + bits: CompactTarget::from_consensus(0x1d00ffff), + nonce: 67890, + }; + + // Store just one header + storage.store_headers(&[test_header]).await?; + + let loaded_headers = storage.load_headers(0..1).await?; + + // Should only get back the one header we stored + assert_eq!(loaded_headers.len(), 1); + assert_eq!(loaded_headers[0], test_header); + + Ok(()) + } + + #[tokio::test] + async fn test_checkpoint_storage_indexing() -> StorageResult<()> { + use dashcore::TxMerkleNode; + use tempfile::tempdir; + + let temp_dir = tempdir().expect("Failed to create temp dir"); + let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; + + // Create test headers starting from checkpoint height + let checkpoint_height = 1_100_000; + let headers: Vec = (0..100) + .map(|i| BlockHeader { + version: Version::from_consensus(1), + prev_blockhash: BlockHash::from_byte_array([i as u8; 32]), + merkle_root: TxMerkleNode::from_byte_array([(i + 1) as u8; 32]), + time: 1234567890 + i, + bits: CompactTarget::from_consensus(0x1a2b3c4d), + nonce: 67890 + i, + }) + .collect(); + + let mut base_state = ChainState::new(); + base_state.sync_base_height = checkpoint_height; + storage.store_chain_state(&base_state).await?; + + storage.store_headers_at_height(&headers, checkpoint_height).await?; + assert_eq!(storage.get_stored_headers_len().await, headers.len() as u32); + + // Verify headers are stored at correct 
blockchain heights + let header_at_base = storage.get_header(checkpoint_height).await?; + assert_eq!( + header_at_base.expect("Header at base blockchain height should exist"), + headers[0] + ); + + let header_at_ending = storage.get_header(checkpoint_height + 99).await?; + assert_eq!( + header_at_ending.expect("Header at ending blockchain height should exist"), + headers[99] + ); + + // Test the reverse index (hash -> blockchain height) + let hash_0 = headers[0].block_hash(); + let height_0 = storage.get_header_height_by_hash(&hash_0).await?; + assert_eq!( + height_0, + Some(checkpoint_height), + "Hash should map to blockchain height 1,100,000" + ); + + let hash_99 = headers[99].block_hash(); + let height_99 = storage.get_header_height_by_hash(&hash_99).await?; + assert_eq!( + height_99, + Some(checkpoint_height + 99), + "Hash should map to blockchain height 1,100,099" + ); + + // Store chain state to persist sync_base_height + let mut chain_state = ChainState::new(); + chain_state.sync_base_height = checkpoint_height; + storage.store_chain_state(&chain_state).await?; + + // Force save to disk + storage.save_dirty().await; + + drop(storage); + + // Create a new storage instance to test index rebuilding + let storage2 = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; + + // Verify the index was rebuilt correctly + let height_after_rebuild = storage2.get_header_height_by_hash(&hash_0).await?; + assert_eq!( + height_after_rebuild, + Some(checkpoint_height), + "After index rebuild, hash should still map to blockchain height 1,100,000" + ); + + // Verify header can still be retrieved by blockchain height after reload + let header_after_reload = storage2.get_header(checkpoint_height).await?; + assert!( + header_after_reload.is_some(), + "Header at base blockchain height should exist after reload" + ); + assert_eq!(header_after_reload.unwrap(), headers[0]); + + Ok(()) + } + + #[tokio::test] + async fn test_shutdown_flushes_index() -> Result<(), Box> { + let 
temp_dir = TempDir::new()?; + let base_path = temp_dir.path().to_path_buf(); + let headers = build_headers(11_000); + let last_hash = headers.last().unwrap().block_hash(); + + { + let mut storage = DiskStorageManager::new(base_path.clone()).await?; + + storage.store_headers(&headers[..10_000]).await?; + storage.save_dirty().await; + + storage.store_headers(&headers[10_000..]).await?; + storage.shutdown().await; + } + + let storage = DiskStorageManager::new(base_path).await?; + let height = storage.get_header_height_by_hash(&last_hash).await?; + assert_eq!(height, Some(10_999)); + + Ok(()) + } +} diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index a07de2739..f121cfa49 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -262,9 +262,6 @@ pub struct ChainState { /// Last ChainLock hash. pub last_chainlock_hash: Option, - /// Last masternode diff height processed. - pub last_masternode_diff_height: Option, - /// Base height when syncing from a checkpoint (0 if syncing from genesis). 
pub sync_base_height: u32, } @@ -354,7 +351,6 @@ impl std::fmt::Debug for ChainState { f.debug_struct("ChainState") .field("last_chainlock_height", &self.last_chainlock_height) .field("last_chainlock_hash", &self.last_chainlock_hash) - .field("last_masternode_diff_height", &self.last_masternode_diff_height) .field("sync_base_height", &self.sync_base_height) .finish() } From 0fdaeee146eec66fc574d830b9e040566345a8dd Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Mon, 5 Jan 2026 17:47:12 +0000 Subject: [PATCH 5/9] removed new_for_network ChainState constructor --- dash-spv/src/chain/chainlock_test.rs | 6 +++--- dash-spv/src/client/core.rs | 2 +- dash-spv/src/client/lifecycle.rs | 2 +- dash-spv/src/types.rs | 10 ---------- dash-spv/tests/header_sync_test.rs | 4 ++-- 5 files changed, 7 insertions(+), 17 deletions(-) diff --git a/dash-spv/src/chain/chainlock_test.rs b/dash-spv/src/chain/chainlock_test.rs index b99d168c1..f76185fb0 100644 --- a/dash-spv/src/chain/chainlock_test.rs +++ b/dash-spv/src/chain/chainlock_test.rs @@ -13,7 +13,7 @@ mod tests { let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(); + let chain_state = ChainState::new(); let chainlock = ChainLock::dummy(1000); @@ -41,7 +41,7 @@ mod tests { let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(); + let chain_state = ChainState::new(); let chainlock1 = ChainLock::dummy(1000); @@ -69,7 +69,7 @@ mod tests { #[tokio::test] async fn test_reorganization_protection() { let chainlock_manager = ChainLockManager::new(true); - let chain_state = ChainState::new_for_network(); + let chain_state = ChainState::new(); let mut storage = DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); 
diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index 7dcdc4f1c..99ea3b619 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -211,7 +211,7 @@ impl DashSpvClient DashSpvClient diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index f121cfa49..e50d3b383 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -272,16 +272,6 @@ impl ChainState { Self::default() } - /// Create a new chain state for the given network. - pub fn new_for_network() -> Self { - let mut state = Self::default(); - - // Initialize checkpoint fields - state.sync_base_height = 0; - - state - } - /// Whether the chain was synced from a checkpoint rather than genesis. pub fn synced_from_checkpoint(&self) -> bool { self.sync_base_height > 0 diff --git a/dash-spv/tests/header_sync_test.rs b/dash-spv/tests/header_sync_test.rs index d661fb04d..2c33c3b97 100644 --- a/dash-spv/tests/header_sync_test.rs +++ b/dash-spv/tests/header_sync_test.rs @@ -95,14 +95,14 @@ async fn test_prepare_sync(sync_base_height: u32, header_count: usize) { let expected_tip_hash = headers.last().unwrap().block_hash(); // Create and store chain state - let mut chain_state = ChainState::new_for_network(); + let mut chain_state = ChainState::new(); chain_state.sync_base_height = sync_base_height; storage.store_chain_state(&chain_state).await.expect("Failed to store chain state"); storage.store_headers(&headers).await.expect("Failed to store headers"); // Create HeaderSyncManager and load from storage let config = ClientConfig::new(Network::Dash); - let chain_state_arc = Arc::new(RwLock::new(ChainState::new_for_network())); + let chain_state_arc = Arc::new(RwLock::new(ChainState::new())); let mut header_sync = HeaderSyncManager::::new( &config, ReorgConfig::default(), From 460a2d4fb4d5846fc563063665f7297cd0382bed Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Mon, 5 Jan 2026 17:54:13 +0000 Subject: [PATCH 6/9] removed network param from 
initi_from_checkpoint --- dash-spv/src/client/lifecycle.rs | 2 +- dash-spv/src/types.rs | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 3596a1569..3a6d22dbe 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -281,7 +281,7 @@ impl DashSpvClient Date: Mon, 5 Jan 2026 18:09:05 +0000 Subject: [PATCH 7/9] removed unused ChainState methods --- dash-spv/src/types.rs | 51 ------------------------------------------- 1 file changed, 51 deletions(-) diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index 22a577fa0..9d7d6b337 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -272,43 +272,6 @@ impl ChainState { Self::default() } - /// Whether the chain was synced from a checkpoint rather than genesis. - pub fn synced_from_checkpoint(&self) -> bool { - self.sync_base_height > 0 - } - - /// Update chain lock status - pub fn update_chain_lock(&mut self, height: u32, hash: BlockHash) { - // Only update if this is a newer chain lock - if self.last_chainlock_height.is_none_or(|h| height > h) { - self.last_chainlock_height = Some(height); - self.last_chainlock_hash = Some(hash); - } - } - - /// Check if a block at given height is chain-locked - pub fn is_height_chain_locked(&self, height: u32) -> bool { - self.last_chainlock_height.is_some_and(|locked_height| height <= locked_height) - } - - /// Check if we have a chain lock - pub fn has_chain_lock(&self) -> bool { - self.last_chainlock_height.is_some() - } - - /// Get the last chain-locked height - pub fn get_last_chainlock_height(&self) -> Option { - self.last_chainlock_height - } - - /// Get filter matched heights (placeholder for now) - /// In a real implementation, this would track heights where filters matched wallet transactions - pub fn get_filter_matched_heights(&self) -> Option> { - // For now, return an empty vector as we don't track this yet - // This would 
typically be populated during filter sync when matches are found - Some(Vec::new()) - } - /// Initialize chain state from a checkpoint. pub fn init_from_checkpoint(&mut self, checkpoint_height: u32) { // Set sync base height to checkpoint @@ -316,20 +279,6 @@ impl ChainState { tracing::info!("Initialized ChainState from checkpoint - height: {}", checkpoint_height); } - - /// Get the absolute height for a given index in our headers vector. - pub fn index_to_height(&self, index: usize) -> u32 { - self.sync_base_height + index as u32 - } - - /// Get the index in our headers vector for a given absolute height. - pub fn height_to_index(&self, height: u32) -> Option { - if height < self.sync_base_height { - None - } else { - Some((height - self.sync_base_height) as usize) - } - } } impl std::fmt::Debug for ChainState { From 6726a0692b720b949efe02001e2dbfe3cbc0e68f Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Mon, 5 Jan 2026 18:12:45 +0000 Subject: [PATCH 8/9] removed last_chainlock_hash field, only updated and persisted --- dash-spv-ffi/src/types.rs | 4 ---- dash-spv-ffi/tests/unit/test_type_conversions.rs | 7 ------- dash-spv/src/client/chainlock.rs | 1 - dash-spv/src/storage/state.rs | 5 ----- dash-spv/src/types.rs | 4 ---- 5 files changed, 21 deletions(-) diff --git a/dash-spv-ffi/src/types.rs b/dash-spv-ffi/src/types.rs index ef4c936fe..3f62709e7 100644 --- a/dash-spv-ffi/src/types.rs +++ b/dash-spv-ffi/src/types.rs @@ -182,7 +182,6 @@ impl From for FFIDetailedSyncProgress { #[repr(C)] pub struct FFIChainState { pub last_chainlock_height: u32, - pub last_chainlock_hash: FFIString, pub current_filter_tip: u32, } @@ -190,9 +189,6 @@ impl From for FFIChainState { fn from(state: ChainState) -> Self { FFIChainState { last_chainlock_height: state.last_chainlock_height.unwrap_or(0), - last_chainlock_hash: FFIString::new( - &state.last_chainlock_hash.map(|h| h.to_string()).unwrap_or_default(), - ), current_filter_tip: 0, // FilterHeader not directly convertible to 
u32 } } diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs index 2c272a0f0..c83094b7b 100644 --- a/dash-spv-ffi/tests/unit/test_type_conversions.rs +++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs @@ -164,7 +164,6 @@ mod tests { fn test_chain_state_none_values() { let state = dash_spv::ChainState { last_chainlock_height: None, - last_chainlock_hash: None, sync_base_height: 0, }; @@ -172,12 +171,6 @@ mod tests { assert_eq!(ffi_state.last_chainlock_height, 0); assert_eq!(ffi_state.current_filter_tip, 0); - - unsafe { - let hash_str = FFIString::from_ptr(ffi_state.last_chainlock_hash.ptr).unwrap(); - assert_eq!(hash_str, ""); - dash_spv_ffi_string_destroy(ffi_state.last_chainlock_hash); - } } #[test] diff --git a/dash-spv/src/client/chainlock.rs b/dash-spv/src/client/chainlock.rs index 2c374ca4c..2dc989191 100644 --- a/dash-spv/src/client/chainlock.rs +++ b/dash-spv/src/client/chainlock.rs @@ -66,7 +66,6 @@ impl DashSpvClient, - /// Last ChainLock hash. - pub last_chainlock_hash: Option, - /// Base height when syncing from a checkpoint (0 if syncing from genesis). 
pub sync_base_height: u32, } @@ -285,7 +282,6 @@ impl std::fmt::Debug for ChainState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChainState") .field("last_chainlock_height", &self.last_chainlock_height) - .field("last_chainlock_hash", &self.last_chainlock_hash) .field("sync_base_height", &self.sync_base_height) .finish() } From 8127425e2223e782a25ad92754e8d78e95bbb877 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Thu, 8 Jan 2026 21:16:28 +0000 Subject: [PATCH 9/9] removed not existing field ChainState.last_chainlock_height from storage logic --- dash-spv/src/chain/chainlock_test.rs | 2 +- dash-spv/src/storage/chainstate.rs | 5 - dash-spv/src/storage/state.rs | 668 --------------------------- 3 files changed, 1 insertion(+), 674 deletions(-) delete mode 100644 dash-spv/src/storage/state.rs diff --git a/dash-spv/src/chain/chainlock_test.rs b/dash-spv/src/chain/chainlock_test.rs index f76185fb0..b153296b6 100644 --- a/dash-spv/src/chain/chainlock_test.rs +++ b/dash-spv/src/chain/chainlock_test.rs @@ -5,7 +5,7 @@ mod tests { storage::{BlockHeaderStorage, DiskStorageManager}, types::ChainState, }; - use dashcore::{Header}; + use dashcore::Header; #[tokio::test] async fn test_chainlock_processing() { diff --git a/dash-spv/src/storage/chainstate.rs b/dash-spv/src/storage/chainstate.rs index 432c670ed..dc44f32a0 100644 --- a/dash-spv/src/storage/chainstate.rs +++ b/dash-spv/src/storage/chainstate.rs @@ -43,7 +43,6 @@ impl ChainStateStorage for PersistentChainStateStorage { async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { let state_data = serde_json::json!({ "last_chainlock_height": state.last_chainlock_height, - "last_chainlock_hash": state.last_chainlock_hash, "sync_base_height": state.sync_base_height, }); @@ -74,10 +73,6 @@ impl ChainStateStorage for PersistentChainStateStorage { .get("last_chainlock_height") .and_then(|v| v.as_u64()) .map(|h| h as u32), - last_chainlock_hash: value - 
.get("last_chainlock_hash") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse().ok()), sync_base_height: value .get("sync_base_height") .and_then(|v| v.as_u64()) diff --git a/dash-spv/src/storage/state.rs b/dash-spv/src/storage/state.rs deleted file mode 100644 index f5691bf30..000000000 --- a/dash-spv/src/storage/state.rs +++ /dev/null @@ -1,668 +0,0 @@ -//! State persistence and StorageManager trait implementation. - -use async_trait::async_trait; -use std::collections::HashMap; - -use dashcore::{block::Header as BlockHeader, BlockHash, Txid}; - -use crate::error::StorageResult; -use crate::storage::headers::save_index_to_disk; -use crate::storage::{MasternodeState, StorageManager}; -use crate::types::{ChainState, MempoolState, UnconfirmedTransaction}; - -use super::io::atomic_write; -use super::manager::DiskStorageManager; - -impl DiskStorageManager { - /// Store chain state to disk. - pub async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { - // Store other state as JSON - let state_data = serde_json::json!({ - "last_chainlock_height": state.last_chainlock_height, - "sync_base_height": state.sync_base_height, - }); - - let path = self.base_path.join("state/chain.json"); - let json = state_data.to_string(); - atomic_write(&path, json.as_bytes()).await?; - - Ok(()) - } - - /// Load chain state from disk. 
- pub async fn load_chain_state(&self) -> StorageResult> { - let path = self.base_path.join("state/chain.json"); - if !path.exists() { - return Ok(None); - } - - let content = tokio::fs::read_to_string(path).await?; - let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| { - crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e)) - })?; - - let state = ChainState { - last_chainlock_height: value - .get("last_chainlock_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32), - sync_base_height: value - .get("sync_base_height") - .and_then(|v| v.as_u64()) - .map(|h| h as u32) - .unwrap_or(0), - }; - - Ok(Some(state)) - } - - /// Store masternode state. - pub async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { - let path = self.base_path.join("state/masternode.json"); - let json = serde_json::to_string_pretty(state).map_err(|e| { - crate::error::StorageError::Serialization(format!( - "Failed to serialize masternode state: {}", - e - )) - })?; - - atomic_write(&path, json.as_bytes()).await?; - Ok(()) - } - - /// Load masternode state. - pub async fn load_masternode_state(&self) -> StorageResult> { - let path = self.base_path.join("state/masternode.json"); - if !path.exists() { - return Ok(None); - } - - let content = tokio::fs::read_to_string(path).await?; - let state = serde_json::from_str(&content).map_err(|e| { - crate::error::StorageError::Serialization(format!( - "Failed to deserialize masternode state: {}", - e - )) - })?; - - Ok(Some(state)) - } - - /// Store a ChainLock. 
- pub async fn store_chain_lock( - &mut self, - height: u32, - chain_lock: &dashcore::ChainLock, - ) -> StorageResult<()> { - let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); - let data = bincode::serialize(chain_lock).map_err(|e| { - crate::error::StorageError::WriteFailed(format!( - "Failed to serialize chain lock: {}", - e - )) - })?; - - atomic_write(&path, &data).await?; - tracing::debug!("Stored chain lock at height {}", height); - Ok(()) - } - - /// Load a ChainLock. - pub async fn load_chain_lock(&self, height: u32) -> StorageResult> { - let path = self.base_path.join("chainlocks").join(format!("chainlock_{:08}.bin", height)); - - if !path.exists() { - return Ok(None); - } - - let data = tokio::fs::read(&path).await?; - let chain_lock = bincode::deserialize(&data).map_err(|e| { - crate::error::StorageError::ReadFailed(format!( - "Failed to deserialize chain lock: {}", - e - )) - })?; - - Ok(Some(chain_lock)) - } - - /// Get ChainLocks in a height range. - pub async fn get_chain_locks( - &self, - start_height: u32, - end_height: u32, - ) -> StorageResult> { - let chainlocks_dir = self.base_path.join("chainlocks"); - - if !chainlocks_dir.exists() { - return Ok(Vec::new()); - } - - let mut chain_locks = Vec::new(); - let mut entries = tokio::fs::read_dir(&chainlocks_dir).await?; - - while let Some(entry) = entries.next_entry().await? 
{ - let file_name = entry.file_name(); - let file_name_str = file_name.to_string_lossy(); - - // Parse height from filename - if let Some(height_str) = - file_name_str.strip_prefix("chainlock_").and_then(|s| s.strip_suffix(".bin")) - { - if let Ok(height) = height_str.parse::() { - if height >= start_height && height <= end_height { - let path = entry.path(); - let data = tokio::fs::read(&path).await?; - if let Ok(chain_lock) = bincode::deserialize(&data) { - chain_locks.push((height, chain_lock)); - } - } - } - } - } - - // Sort by height - chain_locks.sort_by_key(|(h, _)| *h); - Ok(chain_locks) - } - - /// Store metadata. - pub async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { - let path = self.base_path.join(format!("state/{}.dat", key)); - atomic_write(&path, value).await?; - Ok(()) - } - - /// Load metadata. - pub async fn load_metadata(&self, key: &str) -> StorageResult>> { - let path = self.base_path.join(format!("state/{}.dat", key)); - if !path.exists() { - return Ok(None); - } - - let data = tokio::fs::read(path).await?; - Ok(Some(data)) - } - - /// Clear all storage. 
- pub async fn clear(&mut self) -> StorageResult<()> { - // First, stop the background worker to avoid races with file deletion - self.stop_worker(); - - // Clear in-memory state - self.block_headers.write().await.clear_in_memory(); - self.filter_headers.write().await.clear_in_memory(); - self.filters.write().await.clear_in_memory(); - - self.header_hash_index.write().await.clear(); - self.mempool_transactions.write().await.clear(); - *self.mempool_state.write().await = None; - - // Remove all files and directories under base_path - if self.base_path.exists() { - // Best-effort removal; if concurrent files appear, retry once - match tokio::fs::remove_dir_all(&self.base_path).await { - Ok(_) => {} - Err(e) => { - // Retry once after a short delay to handle transient races - if e.kind() == std::io::ErrorKind::Other - || e.kind() == std::io::ErrorKind::DirectoryNotEmpty - { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - tokio::fs::remove_dir_all(&self.base_path).await?; - } else { - return Err(crate::error::StorageError::Io(e)); - } - } - } - tokio::fs::create_dir_all(&self.base_path).await?; - } - - // Recreate expected subdirectories - tokio::fs::create_dir_all(self.base_path.join("headers")).await?; - tokio::fs::create_dir_all(self.base_path.join("filters")).await?; - tokio::fs::create_dir_all(self.base_path.join("state")).await?; - - // Restart the background worker for future operations - self.start_worker().await; - - Ok(()) - } - - /// Shutdown the storage manager. - pub async fn shutdown(&mut self) { - self.stop_worker(); - - // Persist all dirty data - self.save_dirty().await; - } - - /// Save all dirty data. 
- pub(super) async fn save_dirty(&self) { - self.filter_headers.write().await.persist().await; - self.block_headers.write().await.persist().await; - self.filters.write().await.persist().await; - - let path = self.base_path.join("headers/index.dat"); - let index = self.header_hash_index.read().await; - if let Err(e) = save_index_to_disk(&path, &index).await { - tracing::error!("Failed to persist header index: {}", e); - } - } -} - -/// Mempool storage methods -impl DiskStorageManager { - /// Store a mempool transaction. - pub async fn store_mempool_transaction( - &mut self, - txid: &Txid, - tx: &UnconfirmedTransaction, - ) -> StorageResult<()> { - self.mempool_transactions.write().await.insert(*txid, tx.clone()); - Ok(()) - } - - /// Remove a mempool transaction. - pub async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { - self.mempool_transactions.write().await.remove(txid); - Ok(()) - } - - /// Get a mempool transaction. - pub async fn get_mempool_transaction( - &self, - txid: &Txid, - ) -> StorageResult> { - Ok(self.mempool_transactions.read().await.get(txid).cloned()) - } - - /// Get all mempool transactions. - pub async fn get_all_mempool_transactions( - &self, - ) -> StorageResult> { - Ok(self.mempool_transactions.read().await.clone()) - } - - /// Store mempool state. - pub async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { - *self.mempool_state.write().await = Some(state.clone()); - Ok(()) - } - - /// Load mempool state. - pub async fn load_mempool_state(&self) -> StorageResult> { - Ok(self.mempool_state.read().await.clone()) - } - - /// Clear mempool. 
- pub async fn clear_mempool(&mut self) -> StorageResult<()> { - self.mempool_transactions.write().await.clear(); - *self.mempool_state.write().await = None; - Ok(()) - } -} - -#[async_trait] -impl StorageManager for DiskStorageManager { - async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> { - self.store_headers(headers).await - } - - async fn load_headers(&self, range: std::ops::Range) -> StorageResult> { - self.block_headers.write().await.get_items(range).await - } - - async fn get_header(&self, height: u32) -> StorageResult> { - if self.get_tip_height().await.is_none_or(|tip_height| height > tip_height) { - return Ok(None); - } - - if self.get_start_height().await.is_none_or(|start_height| height < start_height) { - return Ok(None); - } - - Ok(self.block_headers.write().await.get_items(height..height + 1).await?.first().copied()) - } - - async fn get_tip_height(&self) -> Option { - self.block_headers.read().await.tip_height() - } - - async fn get_start_height(&self) -> Option { - self.block_headers.read().await.start_height() - } - - async fn get_stored_headers_len(&self) -> u32 { - let headers_guard = self.block_headers.read().await; - let start_height = if let Some(start_height) = headers_guard.start_height() { - start_height - } else { - return 0; - }; - - let end_height = if let Some(end_height) = headers_guard.tip_height() { - end_height - } else { - return 0; - }; - - end_height - start_height + 1 - } - - async fn store_filter_headers( - &mut self, - headers: &[dashcore::hash_types::FilterHeader], - ) -> StorageResult<()> { - self.filter_headers.write().await.store_items(headers).await - } - - async fn load_filter_headers( - &self, - range: std::ops::Range, - ) -> StorageResult> { - self.filter_headers.write().await.get_items(range).await - } - - async fn get_filter_header( - &self, - height: u32, - ) -> StorageResult> { - Ok(self.filter_headers.write().await.get_items(height..height + 1).await?.first().copied()) - } - - async 
fn get_filter_tip_height(&self) -> StorageResult> { - Ok(self.filter_headers.read().await.tip_height()) - } - - async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> { - Self::store_masternode_state(self, state).await - } - - async fn load_masternode_state(&self) -> StorageResult> { - Self::load_masternode_state(self).await - } - - async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> { - Self::store_chain_state(self, state).await - } - - async fn load_chain_state(&self) -> StorageResult> { - Self::load_chain_state(self).await - } - - async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> { - self.filters.write().await.store_items_at_height(&[filter.to_vec()], height).await - } - - async fn load_filters(&self, range: std::ops::Range) -> StorageResult>> { - self.filters.write().await.get_items(range).await - } - - async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> { - Self::store_metadata(self, key, value).await - } - - async fn load_metadata(&self, key: &str) -> StorageResult>> { - Self::load_metadata(self, key).await - } - - async fn clear(&mut self) -> StorageResult<()> { - Self::clear(self).await - } - - async fn clear_filters(&mut self) -> StorageResult<()> { - // Stop worker to prevent concurrent writes to filter directories - self.stop_worker(); - - // Clear in-memory and on-disk filter headers segments - self.filter_headers.write().await.clear_all().await?; - self.filters.write().await.clear_all().await?; - - // Restart background worker for future operations - self.start_worker().await; - - Ok(()) - } - - async fn get_header_height_by_hash(&self, hash: &BlockHash) -> StorageResult> { - Self::get_header_height_by_hash(self, hash).await - } - - async fn store_chain_lock( - &mut self, - height: u32, - chain_lock: &dashcore::ChainLock, - ) -> StorageResult<()> { - Self::store_chain_lock(self, height, chain_lock).await - } - - async fn 
load_chain_lock(&self, height: u32) -> StorageResult> { - Self::load_chain_lock(self, height).await - } - - async fn get_chain_locks( - &self, - start_height: u32, - end_height: u32, - ) -> StorageResult> { - Self::get_chain_locks(self, start_height, end_height).await - } - - async fn store_mempool_transaction( - &mut self, - txid: &Txid, - tx: &UnconfirmedTransaction, - ) -> StorageResult<()> { - Self::store_mempool_transaction(self, txid, tx).await - } - - async fn remove_mempool_transaction(&mut self, txid: &Txid) -> StorageResult<()> { - Self::remove_mempool_transaction(self, txid).await - } - - async fn get_mempool_transaction( - &self, - txid: &Txid, - ) -> StorageResult> { - Self::get_mempool_transaction(self, txid).await - } - - async fn get_all_mempool_transactions( - &self, - ) -> StorageResult> { - Self::get_all_mempool_transactions(self).await - } - - async fn store_mempool_state(&mut self, state: &MempoolState) -> StorageResult<()> { - Self::store_mempool_state(self, state).await - } - - async fn load_mempool_state(&self) -> StorageResult> { - Self::load_mempool_state(self).await - } - - async fn clear_mempool(&mut self) -> StorageResult<()> { - Self::clear_mempool(self).await - } - - async fn shutdown(&mut self) -> StorageResult<()> { - Self::shutdown(self).await; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use dashcore::{block::Version, pow::CompactTarget}; - use dashcore_hashes::Hash; - use tempfile::TempDir; - - fn build_headers(count: usize) -> Vec { - let mut headers = Vec::with_capacity(count); - let mut prev_hash = BlockHash::from_byte_array([0u8; 32]); - - for i in 0..count { - let header = BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: prev_hash, - merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array( - [(i % 255) as u8; 32], - ) - .into(), - time: 1 + i as u32, - bits: CompactTarget::from_consensus(0x1d00ffff), - nonce: i as u32, - }; - prev_hash = header.block_hash(); - 
headers.push(header); - } - - headers - } - - #[tokio::test] - async fn test_load_headers() -> Result<(), Box> { - // Create a temporary directory for the test - let temp_dir = TempDir::new()?; - let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()) - .await - .expect("Unable to create storage"); - - // Create a test header - let test_header = BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: BlockHash::from_byte_array([1; 32]), - merkle_root: dashcore::hashes::sha256d::Hash::from_byte_array([2; 32]).into(), - time: 12345, - bits: CompactTarget::from_consensus(0x1d00ffff), - nonce: 67890, - }; - - // Store just one header - storage.store_headers(&[test_header]).await?; - - let loaded_headers = storage.load_headers(0..1).await?; - - // Should only get back the one header we stored - assert_eq!(loaded_headers.len(), 1); - assert_eq!(loaded_headers[0], test_header); - - Ok(()) - } - - #[tokio::test] - async fn test_checkpoint_storage_indexing() -> StorageResult<()> { - use dashcore::TxMerkleNode; - use tempfile::tempdir; - - let temp_dir = tempdir().expect("Failed to create temp dir"); - let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; - - // Create test headers starting from checkpoint height - let checkpoint_height = 1_100_000; - let headers: Vec = (0..100) - .map(|i| BlockHeader { - version: Version::from_consensus(1), - prev_blockhash: BlockHash::from_byte_array([i as u8; 32]), - merkle_root: TxMerkleNode::from_byte_array([(i + 1) as u8; 32]), - time: 1234567890 + i, - bits: CompactTarget::from_consensus(0x1a2b3c4d), - nonce: 67890 + i, - }) - .collect(); - - let mut base_state = ChainState::new(); - base_state.sync_base_height = checkpoint_height; - storage.store_chain_state(&base_state).await?; - - storage.store_headers_at_height(&headers, checkpoint_height).await?; - assert_eq!(storage.get_stored_headers_len().await, headers.len() as u32); - - // Verify headers are stored at correct 
blockchain heights - let header_at_base = storage.get_header(checkpoint_height).await?; - assert_eq!( - header_at_base.expect("Header at base blockchain height should exist"), - headers[0] - ); - - let header_at_ending = storage.get_header(checkpoint_height + 99).await?; - assert_eq!( - header_at_ending.expect("Header at ending blockchain height should exist"), - headers[99] - ); - - // Test the reverse index (hash -> blockchain height) - let hash_0 = headers[0].block_hash(); - let height_0 = storage.get_header_height_by_hash(&hash_0).await?; - assert_eq!( - height_0, - Some(checkpoint_height), - "Hash should map to blockchain height 1,100,000" - ); - - let hash_99 = headers[99].block_hash(); - let height_99 = storage.get_header_height_by_hash(&hash_99).await?; - assert_eq!( - height_99, - Some(checkpoint_height + 99), - "Hash should map to blockchain height 1,100,099" - ); - - // Store chain state to persist sync_base_height - let mut chain_state = ChainState::new(); - chain_state.sync_base_height = checkpoint_height; - storage.store_chain_state(&chain_state).await?; - - // Force save to disk - storage.save_dirty().await; - - drop(storage); - - // Create a new storage instance to test index rebuilding - let storage2 = DiskStorageManager::new(temp_dir.path().to_path_buf()).await?; - - // Verify the index was rebuilt correctly - let height_after_rebuild = storage2.get_header_height_by_hash(&hash_0).await?; - assert_eq!( - height_after_rebuild, - Some(checkpoint_height), - "After index rebuild, hash should still map to blockchain height 1,100,000" - ); - - // Verify header can still be retrieved by blockchain height after reload - let header_after_reload = storage2.get_header(checkpoint_height).await?; - assert!( - header_after_reload.is_some(), - "Header at base blockchain height should exist after reload" - ); - assert_eq!(header_after_reload.unwrap(), headers[0]); - - Ok(()) - } - - #[tokio::test] - async fn test_shutdown_flushes_index() -> Result<(), Box> { - let 
temp_dir = TempDir::new()?; - let base_path = temp_dir.path().to_path_buf(); - let headers = build_headers(11_000); - let last_hash = headers.last().unwrap().block_hash(); - - { - let mut storage = DiskStorageManager::new(base_path.clone()).await?; - - storage.store_headers(&headers[..10_000]).await?; - storage.save_dirty().await; - - storage.store_headers(&headers[10_000..]).await?; - storage.shutdown().await; - } - - let storage = DiskStorageManager::new(base_path).await?; - let height = storage.get_header_height_by_hash(&last_hash).await?; - assert_eq!(height, Some(10_999)); - - Ok(()) - } -}