diff --git a/crates/cold-mdbx/src/backend.rs b/crates/cold-mdbx/src/backend.rs index bcf7c28..86ed5c6 100644 --- a/crates/cold-mdbx/src/backend.rs +++ b/crates/cold-mdbx/src/backend.rs @@ -10,9 +10,9 @@ use crate::{ }; use alloy::{consensus::transaction::Recovered, primitives::BlockNumber}; use signet_cold::{ - BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter, - HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, - ZenithHeaderSpecifier, + BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead, + ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, + SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, }; use signet_hot::{ KeySer, MAX_KEY_SIZE, ValSer, @@ -191,6 +191,7 @@ fn produce_log_stream_blocking( /// This backend stores historical blockchain data in an MDBX database. /// It implements the [`ColdStorage`] trait for use with the cold storage /// task runner. +#[derive(Clone)] pub struct MdbxColdBackend { /// The MDBX environment. env: DatabaseEnv, @@ -646,7 +647,7 @@ impl MdbxColdBackend { } } -impl ColdStorage for MdbxColdBackend { +impl ColdStorageRead for MdbxColdBackend { async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult> { Ok(self.get_header_inner(spec)?) } @@ -757,20 +758,24 @@ impl ColdStorage for MdbxColdBackend { .map_err(MdbxColdError::from)?; Ok(latest) } +} - async fn append_block(&self, data: BlockData) -> ColdResult<()> { +impl ColdStorageWrite for MdbxColdBackend { + async fn append_block(&mut self, data: BlockData) -> ColdResult<()> { Ok(self.append_block_inner(data)?) } - async fn append_blocks(&self, data: Vec) -> ColdResult<()> { + async fn append_blocks(&mut self, data: Vec) -> ColdResult<()> { Ok(self.append_blocks_inner(data)?) } - async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { + async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> { Ok(self.truncate_above_inner(block)?) } +} - async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { +impl ColdStorage for MdbxColdBackend { + async fn drain_above(&mut self, block: BlockNumber) -> ColdResult>> { Ok(self.drain_above_inner(block)?) } } diff --git a/crates/cold-sql/src/backend.rs b/crates/cold-sql/src/backend.rs index 3921319..22684f2 100644 --- a/crates/cold-sql/src/backend.rs +++ b/crates/cold-sql/src/backend.rs @@ -35,9 +35,9 @@ use alloy::{ }, }; use signet_cold::{ - BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter, - HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, - ZenithHeaderSpecifier, + BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead, + ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, + SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, }; use signet_storage_types::{ ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader, @@ -971,7 +971,7 @@ fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8 // ColdStorage implementation // ============================================================================ -impl ColdStorage for SqlColdBackend { +impl ColdStorageRead for SqlColdBackend { async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult> { let Some(block_num) = self.resolve_header_spec(spec).await? else { return Ok(None); @@ -1364,12 +1364,14 @@ impl ColdStorage for SqlColdBackend { .map_err(SqlColdError::from)?; Ok(row.get::, _>(COL_MAX_BN).map(from_i64)) } +} - async fn append_block(&self, data: BlockData) -> ColdResult<()> { +impl ColdStorageWrite for SqlColdBackend { + async fn append_block(&mut self, data: BlockData) -> ColdResult<()> { self.insert_block(data).await.map_err(ColdStorageError::from) } - async fn append_blocks(&self, data: Vec) -> ColdResult<()> { + async fn append_blocks(&mut self, data: Vec) -> ColdResult<()> { let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?; for block_data in data { write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?; @@ -1378,7 +1380,7 @@ impl ColdStorage for SqlColdBackend { Ok(()) } - async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { + async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> { let bn = to_i64(block); let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?; @@ -1398,6 +1400,8 @@ impl ColdStorage for SqlColdBackend { } } +impl ColdStorage for SqlColdBackend {} + #[cfg(all(test, feature = "test-utils"))] mod tests { use super::*; diff --git a/crates/cold/src/lib.rs b/crates/cold/src/lib.rs index 866293c..dd61d99 100644 --- a/crates/cold/src/lib.rs +++ b/crates/cold/src/lib.rs @@ -159,7 +159,7 @@ pub use cold_receipt::ColdReceipt; mod stream; pub use stream::{StreamParams, produce_log_stream_default}; mod traits; -pub use traits::{BlockData, ColdStorage, LogStream}; +pub use traits::{BlockData, ColdStorage, ColdStorageRead, ColdStorageWrite, LogStream}; pub mod connect; pub use connect::ColdConnect; diff --git a/crates/cold/src/mem.rs b/crates/cold/src/mem.rs index f513767..e810c77 100644 --- a/crates/cold/src/mem.rs +++ b/crates/cold/src/mem.rs @@ -4,9 +4,9 @@ //! It is primarily intended for testing and development. use crate::{ - BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter, - HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, - ZenithHeaderSpecifier, + BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead, + ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, + SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, }; use alloy::primitives::{B256, BlockNumber}; use signet_storage_types::{ @@ -47,6 +47,7 @@ struct MemColdBackendInner { /// /// This backend is thread-safe and suitable for concurrent access. /// All operations are protected by an async read-write lock. +#[derive(Clone)] pub struct MemColdBackend { inner: Arc>, } @@ -100,7 +101,7 @@ impl MemColdBackendInner { } } -impl ColdStorage for MemColdBackend { +impl ColdStorageRead for MemColdBackend { async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult> { let inner = self.inner.read().await; match spec { @@ -274,8 +275,10 @@ impl ColdStorage for MemColdBackend { let inner = self.inner.read().await; Ok(inner.headers.last_key_value().map(|(k, _)| *k)) } +} - async fn append_block(&self, data: BlockData) -> ColdResult<()> { +impl ColdStorageWrite for MemColdBackend { + async fn append_block(&mut self, data: BlockData) -> ColdResult<()> { let mut inner = self.inner.write().await; let block = data.block_number(); @@ -323,20 +326,22 @@ impl ColdStorage for MemColdBackend { Ok(()) } - async fn append_blocks(&self, data: Vec) -> ColdResult<()> { + async fn append_blocks(&mut self, data: Vec) -> ColdResult<()> { for block_data in data { self.append_block(block_data).await?; } Ok(()) } - async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { + async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> { let mut inner = self.inner.write().await; inner.truncate_above(block); Ok(()) } +} - async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { +impl ColdStorage for MemColdBackend { + async fn drain_above(&mut self, block: BlockNumber) -> ColdResult>> { let mut inner = self.inner.write().await; // Collect receipts for blocks above `block` in ascending order diff --git a/crates/cold/src/stream.rs b/crates/cold/src/stream.rs index c709d56..6793dfe 100644 --- a/crates/cold/src/stream.rs +++ b/crates/cold/src/stream.rs @@ -1,13 +1,13 @@ //! Log-streaming helper for backends without snapshot semantics. -use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog}; +use crate::{ColdResult, ColdStorageError, ColdStorageRead, Filter, HeaderSpecifier, RpcLog}; use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption}; use tokio::sync::mpsc; /// Parameters for a log-streaming request. /// /// Bundles the block range, limits, channel, and deadline that every -/// [`ColdStorage::produce_log_stream`] implementation needs. +/// [`ColdStorageRead::produce_log_stream`] implementation needs. #[derive(Debug)] pub struct StreamParams { /// First block in range (inclusive). @@ -28,13 +28,13 @@ pub struct StreamParams { /// /// Captures an anchor hash from the `to` block at the start and /// re-checks it before each block to detect reorgs. Uses -/// [`ColdStorage::get_header`] for anchor checks and -/// [`ColdStorage::get_logs`] with single-block filters per block. +/// [`ColdStorageRead::get_header`] for anchor checks and +/// [`ColdStorageRead::get_logs`] with single-block filters per block. /// /// Backends that hold a consistent read snapshot (MDBX, PostgreSQL /// with REPEATABLE READ) should provide their own -/// [`ColdStorage::produce_log_stream`] implementation instead. -pub async fn produce_log_stream_default( +/// [`ColdStorageRead::produce_log_stream`] implementation instead. +pub async fn produce_log_stream_default( backend: &B, filter: &Filter, params: StreamParams, diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs index c6bd6df..c247d90 100644 --- a/crates/cold/src/task/runner.rs +++ b/crates/cold/src/task/runner.rs @@ -17,13 +17,13 @@ //! //! The task owns the streaming configuration (max deadline, concurrency //! limit) and delegates the streaming loop to the backend via -//! [`ColdStorage::produce_log_stream`]. Callers supply a per-request +//! [`ColdStorageRead::produce_log_stream`]. Callers supply a per-request //! deadline that is clamped to the task's configured maximum. use super::cache::ColdCache; use crate::{ ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle, - ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier, + ColdStorageRead, ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier, TransactionSpecifier, }; use signet_storage_types::{RecoveredTx, SealedHeader}; @@ -51,12 +51,12 @@ const MAX_CONCURRENT_STREAMS: usize = 8; /// Channel buffer size for streaming operations. const STREAM_CHANNEL_BUFFER: usize = 256; -/// Shared state for the cold storage task, holding the backend and cache. +/// Shared state for the cold storage task, holding the read backend and cache. /// /// This is wrapped in an `Arc` so that spawned read handlers can access /// the backend and cache without moving ownership. struct ColdStorageTaskInner { - backend: B, + read_backend: B, cache: Mutex, max_stream_deadline: Duration, stream_semaphore: Arc, @@ -66,13 +66,13 @@ struct ColdStorageTaskInner { stream_tracker: TaskTracker, } -impl ColdStorageTaskInner { +impl ColdStorageTaskInner { /// Fetch a header from the backend and cache the result. async fn fetch_and_cache_header( &self, spec: HeaderSpecifier, ) -> ColdResult> { - let r = self.backend.get_header(spec).await; + let r = self.read_backend.get_header(spec).await; if let Ok(Some(ref h)) = r { self.cache.lock().await.put_header(h.number, h.clone()); } @@ -84,7 +84,7 @@ impl ColdStorageTaskInner { &self, spec: TransactionSpecifier, ) -> ColdResult>> { - let r = self.backend.get_transaction(spec).await; + let r = self.read_backend.get_transaction(spec).await; if let Ok(Some(ref c)) = r { let meta = c.meta(); self.cache @@ -100,7 +100,7 @@ impl ColdStorageTaskInner { &self, spec: ReceiptSpecifier, ) -> ColdResult> { - let r = self.backend.get_receipt(spec).await; + let r = self.read_backend.get_receipt(spec).await; if let Ok(Some(ref c)) = r { self.cache.lock().await.put_receipt((c.block_number, c.transaction_index), c.clone()); } @@ -123,7 +123,7 @@ impl ColdStorageTaskInner { let _ = resp.send(result); } ColdReadRequest::GetHeaders { specs, resp } => { - let _ = resp.send(self.backend.get_headers(specs).await); + let _ = resp.send(self.read_backend.get_headers(specs).await); } ColdReadRequest::GetTransaction { spec, resp } => { let result = if let TransactionSpecifier::BlockAndIndex { block, index } = &spec { @@ -138,10 +138,10 @@ impl ColdStorageTaskInner { let _ = resp.send(result); } ColdReadRequest::GetTransactionsInBlock { block, resp } => { - let _ = resp.send(self.backend.get_transactions_in_block(block).await); + let _ = resp.send(self.read_backend.get_transactions_in_block(block).await); } ColdReadRequest::GetTransactionCount { block, resp } => { - let _ = resp.send(self.backend.get_transaction_count(block).await); + let _ = resp.send(self.read_backend.get_transaction_count(block).await); } ColdReadRequest::GetReceipt { spec, resp } => { let result = if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec { @@ -156,25 +156,25 @@ impl ColdStorageTaskInner { let _ = resp.send(result); } ColdReadRequest::GetReceiptsInBlock { block, resp } => { - let _ = resp.send(self.backend.get_receipts_in_block(block).await); + let _ = resp.send(self.read_backend.get_receipts_in_block(block).await); } ColdReadRequest::GetSignetEvents { spec, resp } => { - let _ = resp.send(self.backend.get_signet_events(spec).await); + let _ = resp.send(self.read_backend.get_signet_events(spec).await); } ColdReadRequest::GetZenithHeader { spec, resp } => { - let _ = resp.send(self.backend.get_zenith_header(spec).await); + let _ = resp.send(self.read_backend.get_zenith_header(spec).await); } ColdReadRequest::GetZenithHeaders { spec, resp } => { - let _ = resp.send(self.backend.get_zenith_headers(spec).await); + let _ = resp.send(self.read_backend.get_zenith_headers(spec).await); } ColdReadRequest::GetLogs { filter, max_logs, resp } => { - let _ = resp.send(self.backend.get_logs(&filter, max_logs).await); + let _ = resp.send(self.read_backend.get_logs(&filter, max_logs).await); } ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => { let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await); } ColdReadRequest::GetLatestBlock { resp } => { - let _ = resp.send(self.backend.get_latest_block().await); + let _ = resp.send(self.read_backend.get_latest_block().await); } } } @@ -183,7 +183,7 @@ impl ColdStorageTaskInner { /// /// Acquires a concurrency permit, resolves the block range, then /// spawns a producer task that delegates to - /// [`ColdStorage::produce_log_stream`]. + /// [`ColdStorageRead::produce_log_stream`]. async fn handle_stream_logs( self: &Arc, filter: crate::Filter, @@ -201,7 +201,7 @@ impl ColdStorageTaskInner { let to = match filter.get_to_block() { Some(to) => to, None => { - let Some(latest) = self.backend.get_latest_block().await? else { + let Some(latest) = self.read_backend.get_latest_block().await? else { let (_tx, rx) = mpsc::channel(1); return Ok(ReceiverStream::new(rx)); }; @@ -212,45 +212,17 @@ impl ColdStorageTaskInner { let effective = deadline.min(self.max_stream_deadline); let deadline_instant = tokio::time::Instant::now() + effective; let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER); - let inner = Arc::clone(self); + let stream_backend = self.read_backend.clone(); self.stream_tracker.spawn(async move { let _permit = permit; let params = crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant }; - inner.backend.produce_log_stream(&filter, params).await; + stream_backend.produce_log_stream(&filter, params).await; }); Ok(ReceiverStream::new(rx)) } - - /// Handle a write request, invalidating the cache on truncation. - async fn handle_write(&self, req: ColdWriteRequest) { - match req { - ColdWriteRequest::AppendBlock(boxed) => { - let result = self.backend.append_block(boxed.data).await; - let _ = boxed.resp.send(result); - } - ColdWriteRequest::AppendBlocks { data, resp } => { - let result = self.backend.append_blocks(data).await; - let _ = resp.send(result); - } - ColdWriteRequest::TruncateAbove { block, resp } => { - let result = self.backend.truncate_above(block).await; - if result.is_ok() { - self.cache.lock().await.invalidate_above(block); - } - let _ = resp.send(result); - } - ColdWriteRequest::DrainAbove { block, resp } => { - let result = self.backend.drain_above(block).await; - if result.is_ok() { - self.cache.lock().await.invalidate_above(block); - } - let _ = resp.send(result); - } - } - } } /// The cold storage task that processes requests. @@ -278,16 +250,17 @@ impl ColdStorageTaskInner { /// /// The task owns the streaming configuration (max deadline, concurrency /// limit) and delegates the streaming loop to the backend via -/// [`ColdStorage::produce_log_stream`]. Callers supply a per-request +/// [`ColdStorageRead::produce_log_stream`]. Callers supply a per-request /// deadline that is clamped to the task's configured maximum. /// /// # Caching /// /// Transaction, receipt, and header lookups are served from an LRU cache /// when possible. Cache entries are invalidated on -/// [`truncate_above`](crate::ColdStorage::truncate_above) to handle reorgs. +/// [`truncate_above`](crate::ColdStorageWrite::truncate_above) to handle reorgs. pub struct ColdStorageTask { inner: Arc>, + write_backend: B, read_receiver: mpsc::Receiver, write_receiver: mpsc::Receiver, cancel_token: CancellationToken, @@ -306,14 +279,16 @@ impl ColdStorageTask { pub fn new(backend: B, cancel_token: CancellationToken) -> (Self, ColdStorageHandle) { let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE); let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE); + let read_backend = backend.clone(); let task = Self { inner: Arc::new(ColdStorageTaskInner { - backend, + read_backend, cache: Mutex::new(ColdCache::new()), max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE, stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)), stream_tracker: TaskTracker::new(), }), + write_backend: backend, read_receiver, write_receiver, cancel_token, @@ -333,6 +308,39 @@ impl ColdStorageTask { handle } + /// Handle a write request using the exclusively owned write backend. + /// + /// Called inline in the run loop (never spawned). The write backend + /// is not shared — no lock acquisition needed. The drain-before-write + /// step in `run()` ensures no read tasks are populating the cache + /// concurrently, preventing stale cache entries after truncation. + async fn handle_write(&mut self, req: ColdWriteRequest) { + match req { + ColdWriteRequest::AppendBlock(boxed) => { + let result = self.write_backend.append_block(boxed.data).await; + let _ = boxed.resp.send(result); + } + ColdWriteRequest::AppendBlocks { data, resp } => { + let result = self.write_backend.append_blocks(data).await; + let _ = resp.send(result); + } + ColdWriteRequest::TruncateAbove { block, resp } => { + let result = self.write_backend.truncate_above(block).await; + if result.is_ok() { + self.inner.cache.lock().await.invalidate_above(block); + } + let _ = resp.send(result); + } + ColdWriteRequest::DrainAbove { block, resp } => { + let result = self.write_backend.drain_above(block).await; + if result.is_ok() { + self.inner.cache.lock().await.invalidate_above(block); + } + let _ = resp.send(result); + } + } + } + /// Run the task, processing requests until shutdown. #[instrument(skip(self), name = "cold_storage_task")] pub async fn run(mut self) { @@ -359,7 +367,7 @@ impl ColdStorageTask { self.task_tracker.wait().await; self.task_tracker.reopen(); - self.inner.handle_write(req).await; + self.handle_write(req).await; } maybe_read = self.read_receiver.recv() => { diff --git a/crates/cold/src/traits.rs b/crates/cold/src/traits.rs index d5bde67..d351d4c 100644 --- a/crates/cold/src/traits.rs +++ b/crates/cold/src/traits.rs @@ -1,9 +1,10 @@ -//! Core trait definition for cold storage backends. +//! Core trait definitions for cold storage backends. //! -//! The [`ColdStorage`] trait defines the interface that all cold storage -//! backends must implement. Backends are responsible for data organization, -//! indexing, and keying - the trait is agnostic to these implementation -//! details. +//! The cold storage interface is split into three traits: +//! +//! - [`ColdStorageRead`] — read-only access (`&self`, `Clone`) +//! - [`ColdStorageWrite`] — write access (`&mut self`) +//! - [`ColdStorage`] — supertrait combining both, with `drain_above` use crate::{ ColdReceipt, ColdResult, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, @@ -80,34 +81,20 @@ impl From for BlockData { } } -/// Unified cold storage backend trait. -/// -/// Backend is responsible for all data organization, indexing, and keying. -/// The trait is agnostic to how the backend stores or indexes data. +/// Read-only cold storage backend trait. /// -/// All methods are async and return futures that are `Send`. +/// All methods take `&self` and return `Send` futures. Implementations +/// must be `Clone + Send + Sync + 'static` so that read backends can be +/// shared across tasks (e.g. via `Arc` or cheap cloning). /// /// # Implementation Guide /// /// Implementers must ensure: /// -/// - **Append-only ordering**: `append_block` must enforce monotonically -/// increasing block numbers. Attempting to append a block with a number <= -/// the current latest should return an error. -/// -/// - **Atomic truncation**: `truncate_above` must remove all data for blocks -/// N+1 and higher atomically. Partial truncation is not acceptable. -/// -/// - **Index maintenance**: Hash-based lookups (e.g., header by hash, -/// transaction by hash) require the implementation to maintain appropriate -/// indexes. These indexes must be updated during `append_block` and cleaned -/// during `truncate_above`. -/// -/// - **Consistent reads**: Read operations should return consistent snapshots. -/// A read started before a write completes should not see partial data from -/// that write. -/// -pub trait ColdStorage: Send + Sync + 'static { +/// - **Consistent reads**: Read operations should return consistent +/// snapshots. A read started before a write completes should not see +/// partial data from that write. +pub trait ColdStorageRead: Clone + Send + Sync + 'static { // --- Headers --- /// Get a header by specifier. @@ -226,28 +213,61 @@ pub trait ColdStorage: Send + Sync + 'static { /// All errors are sent through `sender`. When this method returns, /// the sender is dropped, closing the stream. /// - /// [`get_header`]: ColdStorage::get_header - /// [`get_logs`]: ColdStorage::get_logs + /// [`get_header`]: ColdStorageRead::get_header + /// [`get_logs`]: ColdStorageRead::get_logs /// [`produce_log_stream_default`]: crate::produce_log_stream_default fn produce_log_stream( &self, filter: &Filter, params: StreamParams, ) -> impl Future + Send; +} - // --- Write operations --- - +/// Write-only cold storage backend trait. +/// +/// All methods take `&mut self` and return `Send` futures. The write +/// backend is exclusively owned by the task runner — no synchronization +/// is needed. +/// +/// # Implementation Guide +/// +/// Implementers must ensure: +/// +/// - **Append-only ordering**: `append_block` must enforce monotonically +/// increasing block numbers. Attempting to append a block with a number <= +/// the current latest should return an error. +/// +/// - **Atomic truncation**: `truncate_above` must remove all data for blocks +/// N+1 and higher atomically. Partial truncation is not acceptable. +/// +/// - **Index maintenance**: Hash-based lookups (e.g., header by hash, +/// transaction by hash) require the implementation to maintain appropriate +/// indexes. These indexes must be updated during `append_block` and cleaned +/// during `truncate_above`. +pub trait ColdStorageWrite: Send + 'static { /// Append a single block to cold storage. - fn append_block(&self, data: BlockData) -> impl Future> + Send; + fn append_block(&mut self, data: BlockData) -> impl Future> + Send; /// Append multiple blocks to cold storage. - fn append_blocks(&self, data: Vec) -> impl Future> + Send; + fn append_blocks( + &mut self, + data: Vec, + ) -> impl Future> + Send; /// Truncate all data above the given block number (exclusive). /// /// This removes block N+1 and higher from all tables. Used for reorg handling. - fn truncate_above(&self, block: BlockNumber) -> impl Future> + Send; + fn truncate_above(&mut self, block: BlockNumber) + -> impl Future> + Send; +} +/// Combined read and write cold storage backend trait. +/// +/// Combines [`ColdStorageRead`] and [`ColdStorageWrite`] and provides +/// [`drain_above`](ColdStorage::drain_above), which reads receipts then +/// truncates. The default implementation is correct but not atomic; +/// backends should override with an atomic version when possible. +pub trait ColdStorage: ColdStorageRead + ColdStorageWrite { /// Read and remove all blocks above the given block number. /// /// Returns receipts for each block above `block` in ascending order, @@ -259,7 +279,7 @@ pub trait ColdStorage: Send + Sync + 'static { /// not atomic. Backends should override with an atomic version /// when possible. fn drain_above( - &self, + &mut self, block: BlockNumber, ) -> impl Future>>> + Send { async move { diff --git a/crates/storage/src/either.rs b/crates/storage/src/either.rs index 4aa90c6..3020f19 100644 --- a/crates/storage/src/either.rs +++ b/crates/storage/src/either.rs @@ -7,9 +7,9 @@ use alloy::primitives::BlockNumber; use signet_cold::{ - BlockData, ColdConnect, ColdReceipt, ColdResult, ColdStorage, Confirmed, Filter, - HeaderSpecifier, ReceiptSpecifier, SignetEventsSpecifier, StreamParams, TransactionSpecifier, - ZenithHeaderSpecifier, + BlockData, ColdConnect, ColdReceipt, ColdResult, ColdStorage, ColdStorageRead, + ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, SignetEventsSpecifier, + StreamParams, TransactionSpecifier, ZenithHeaderSpecifier, }; use signet_cold_mdbx::{MdbxColdBackend, MdbxConnector}; use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; @@ -44,7 +44,7 @@ impl Either { } /// Enum to hold either cold backend type. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum EitherCold { /// MDBX cold backend. Mdbx(MdbxColdBackend), @@ -70,9 +70,9 @@ macro_rules! dispatch_async { }; } -// Implement ColdStorage for EitherCold by dispatching to inner type +// Implement ColdStorageRead for EitherCold by dispatching to inner type #[allow(clippy::manual_async_fn)] -impl ColdStorage for EitherCold { +impl ColdStorageRead for EitherCold { fn get_header( &self, spec: HeaderSpecifier, @@ -162,20 +162,39 @@ impl ColdStorage for EitherCold { ) -> impl Future + Send { dispatch_async!(self, produce_log_stream(filter, params)) } +} - fn append_block(&self, data: BlockData) -> impl Future> + Send { +#[allow(clippy::manual_async_fn)] +impl ColdStorageWrite for EitherCold { + fn append_block(&mut self, data: BlockData) -> impl Future> + Send { dispatch_async!(self, append_block(data)) } - fn append_blocks(&self, data: Vec) -> impl Future> + Send { + fn append_blocks( + &mut self, + data: Vec, + ) -> impl Future> + Send { dispatch_async!(self, append_blocks(data)) } - fn truncate_above(&self, block: BlockNumber) -> impl Future> + Send { + fn truncate_above( + &mut self, + block: BlockNumber, + ) -> impl Future> + Send { dispatch_async!(self, truncate_above(block)) } } +#[allow(clippy::manual_async_fn)] +impl ColdStorage for EitherCold { + fn drain_above( + &mut self, + block: BlockNumber, + ) -> impl Future>>> + Send { + dispatch_async!(self, drain_above(block)) + } +} + // When SQL features are enabled #[cfg(any(feature = "postgres", feature = "sqlite"))] impl ColdConnect for Either { diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 44ff5eb..48dc3c6 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -81,7 +81,10 @@ pub use signet_cold_mdbx::MdbxConnector; pub use signet_cold_sql::SqlConnector; // Re-export key types for convenience -pub use signet_cold::{ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageTask}; +pub use signet_cold::{ + ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageRead, ColdStorageTask, + ColdStorageWrite, +}; pub use signet_cold_mdbx::MdbxColdBackend; pub use signet_hot::{ HistoryError, HistoryRead, HistoryWrite, HotKv,