diff --git a/src/homedb/core/src/index/btree_index.rs b/src/homedb/core/src/index/btree_index.rs index a8dd284d4..2019c6f2f 100644 --- a/src/homedb/core/src/index/btree_index.rs +++ b/src/homedb/core/src/index/btree_index.rs @@ -11,6 +11,8 @@ pub trait IndexQueryHandle: Send fn results(&self) -> &[(K, V)]; fn has_more(&self) -> bool; fn into_any_send(self: Box) -> Box; + /// Take ownership of the results, avoiding a clone. Default falls back to `to_vec()`. + fn into_results(self: Box) -> Vec<(K, V)>; } /// Convert to Box for downcast in query_next_batch. Requires IndexQueryHandle: Any. diff --git a/src/homedb/core/src/index/iterator.rs b/src/homedb/core/src/index/iterator.rs index 8cd848de4..5c929edeb 100644 --- a/src/homedb/core/src/index/iterator.rs +++ b/src/homedb/core/src/index/iterator.rs @@ -43,11 +43,15 @@ impl PlainRangeIterator { reverse: bool, key_spec: KeySpec, ) -> Self { - let results = handle.results().to_vec(); - let has_more = handle.has_more(); + let (mut results, handle) = if handle.has_more() { + (handle.results().to_vec(), Some(handle)) + } else { + (handle.into_results(), None) + }; + results.reverse(); Self { index, - handle: if has_more { Some(handle) } else { None }, + handle, current_batch: results, start_key, end_key, @@ -60,8 +64,7 @@ impl PlainRangeIterator { #[maybe_async_cfg::maybe(keep_self, sync(feature = "sync_frontend"), async(feature = "async_frontend"))] async fn next(&mut self) -> Result, Vec)>> { loop { - if !self.current_batch.is_empty() { - let (key, value) = self.current_batch.remove(0); + if let Some((key, value)) = self.current_batch.pop() { return Ok(Some((key.into_vec(), value.into_vec()))); } @@ -72,8 +75,14 @@ impl PlainRangeIterator { .query_next_batch(h) .await .map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?; - self.current_batch = next_handle.results().to_vec(); - self.handle = if next_handle.has_more() { Some(next_handle) } else { None }; + let (mut results, handle) = if next_handle.has_more() { + (next_handle.results().to_vec(), Some(next_handle)) + } else { + (next_handle.into_results(), None) + }; + results.reverse(); + self.current_batch = results; + self.handle = handle; } _ => return Ok(None), } @@ -102,8 +111,14 @@ impl PlainRangeIterator { return Ok(false); } - self.current_batch = new_handle.results().to_vec(); - self.handle = if new_handle.has_more() { Some(new_handle) } else { None }; + let (mut results, handle) = if new_handle.has_more() { + (new_handle.results().to_vec(), Some(new_handle)) + } else { + (new_handle.into_results(), None) + }; + results.reverse(); + self.current_batch = results; + self.handle = handle; Ok(true) } } @@ -129,6 +144,7 @@ struct MvccRangeIterator { handle: Option, MvccValue>>>, current_batch: Vec<(MvccKey, MvccValue)>, snapshot_ts: u64, + filter: Arc, MvccValue>>, start_key: Vec, end_key: Vec, batch_size: u32, @@ -149,13 +165,20 @@ impl MvccRangeIterator { snapshot_ts: u64, key_spec: KeySpec, ) -> Self { - let results = handle.results().to_vec(); - let has_more = handle.has_more(); + let filter: Arc, MvccValue>> = + Arc::new(MvccQueryFilter::new(snapshot_ts)); + let (mut results, handle) = if handle.has_more() { + (handle.results().to_vec(), Some(handle)) + } else { + (handle.into_results(), None) + }; + results.reverse(); Self { index, - handle: if has_more { Some(handle) } else { None }, + handle, current_batch: results, snapshot_ts, + filter, start_key, end_key, batch_size, @@ -169,8 +192,7 @@ impl MvccRangeIterator { loop { // The MvccQueryFilter passed at query time already removed all MVCC duplicates, // too-new versions, and tombstones. Drain the batch as plain key-value pairs. - if !self.current_batch.is_empty() { - let (mvcc_key, mvcc_val) = self.current_batch.remove(0); + if let Some((mvcc_key, mvcc_val)) = self.current_batch.pop() { let user_key_bytes = mvcc_key.inner.into_vec(); let value_bytes = mvcc_val.into_inner().map(|v| v.into_vec()).unwrap_or_default(); return Ok(Some((user_key_bytes, value_bytes))); @@ -184,8 +206,14 @@ impl MvccRangeIterator { .query_next_batch(h) .await .map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?; - self.current_batch = next_handle.results().to_vec(); - self.handle = if next_handle.has_more() { Some(next_handle) } else { None }; + let (mut results, handle) = if next_handle.has_more() { + (next_handle.results().to_vec(), Some(next_handle)) + } else { + (next_handle.into_results(), None) + }; + results.reverse(); + self.current_batch = results; + self.handle = handle; } _ => return Ok(None), } @@ -212,11 +240,9 @@ impl MvccRangeIterator { }; let range = BtreeKeyRange::new(range_start, true, range_end, self.reverse); - let filter: Arc, MvccValue>> = - Arc::new(MvccQueryFilter::new(self.snapshot_ts)); let new_handle = self .index - .query(range, self.batch_size, Some(filter), self.reverse) + .query(range, self.batch_size, Some(Arc::clone(&self.filter)), self.reverse) .await .map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?; @@ -224,8 +250,14 @@ impl MvccRangeIterator { return Ok(false); } - self.current_batch = new_handle.results().to_vec(); - self.handle = if new_handle.has_more() { Some(new_handle) } else { None }; + let (mut results, handle) = if new_handle.has_more() { + (new_handle.results().to_vec(), Some(new_handle)) + } else { + (new_handle.into_results(), None) + }; + results.reverse(); + self.current_batch = results; + self.handle = handle; Ok(true) } } diff --git a/src/homedb/core/src/index/sharded_btree.rs b/src/homedb/core/src/index/sharded_btree.rs index b68924e49..47c43a31b 100644 --- a/src/homedb/core/src/index/sharded_btree.rs +++ b/src/homedb/core/src/index/sharded_btree.rs @@ -197,6 +197,9 @@ impl IndexQueryHandle for fn results(&self) -> &[(K, V)] { &self.results } fn has_more(&self) -> bool { self.has_more_impl() } fn into_any_send(self: Box) -> Box { self } + fn into_results(self: Box) -> Vec<(K, V)> { + self.results + } } fn to_sharded_query_handle( diff --git a/src/homedb/core/src/index/unsharded_btree.rs b/src/homedb/core/src/index/unsharded_btree.rs index e99ea9c11..fdf85fa91 100644 --- a/src/homedb/core/src/index/unsharded_btree.rs +++ b/src/homedb/core/src/index/unsharded_btree.rs @@ -36,10 +36,14 @@ impl IndexQueryHandle for fn has_more(&self) -> bool { self.0.has_more() } - + fn into_any_send(self: Box) -> Box { self } + + fn into_results(self: Box) -> Vec<(K, V)> { + self.0.results + } } pub struct UnshardedBtree { diff --git a/src/homedb/mem_db/src/table_index.rs b/src/homedb/mem_db/src/table_index.rs index 20c8c9e47..f1e4d6cc6 100644 --- a/src/homedb/mem_db/src/table_index.rs +++ b/src/homedb/mem_db/src/table_index.rs @@ -19,7 +19,7 @@ //! no MVCC branching. use std::sync::Arc; -use homestore::index::btree::BtreeConfig; +use homestore::index::btree::{BtreeConfig, MergePolicy}; use homedb_core::{IndexOps, MvccGc, MvccOps, NonTxnOps, RangeIterator, Snapshot, SnapshotOps}; use crate::{HomeDbError, KeySpec, KeyType, PrefixType, Result, TableSpec, ValueSpec}; @@ -74,6 +74,13 @@ impl TableIndex { config.leaf_node_variant = node_variant; config.int_node_variant = node_variant; + // MVCC tables with inline/background GC: disable node merges to prevent + // NodeNotFound errors when concurrent GC removes trigger node merges + // while foreground puts are traversing the tree. + if spec.mvcc_enabled { + config.merge_policy = MergePolicy::Never; + } + // Prefix compression is only meaningful for plain tables. if !spec.mvcc_enabled { if let PrefixType::Prefixable(Some(prefix_size)) = spec.key_spec.prefix_type { diff --git a/src/homestore/index/btree/detail/mutate.rs b/src/homestore/index/btree/detail/mutate.rs index 78dd000e6..ac98fceaa 100644 --- a/src/homestore/index/btree/detail/mutate.rs +++ b/src/homestore/index/btree/detail/mutate.rs @@ -64,8 +64,8 @@ where match self.put_one_walk(root, &mut req).await { Ok(hb) => { hit_boundary = hb; break; } - Err(BtreeError::Retry) => continue, // Retriable errors - Err(e) => return Err(e), // Non-retriable errors + Err(BtreeError::Retry) => continue, + Err(e) => return Err(e), } } Ok((req.into_stats(), hit_boundary))