Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/homedb/core/src/index/btree_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub trait IndexQueryHandle<K: 'static + BtreeKey, V: 'static + BtreeValue>: Send
fn results(&self) -> &[(K, V)];
fn has_more(&self) -> bool;
fn into_any_send(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
/// Take ownership of the results, avoiding a clone. Default falls back to `to_vec()`.
fn into_results(self: Box<Self>) -> Vec<(K, V)>;
}

/// Convert to Box<dyn Any> for downcast in query_next_batch. Requires IndexQueryHandle: Any.
Expand Down
74 changes: 53 additions & 21 deletions src/homedb/core/src/index/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ impl PlainRangeIterator {
reverse: bool,
key_spec: KeySpec,
) -> Self {
let results = handle.results().to_vec();
let has_more = handle.has_more();
let (mut results, handle) = if handle.has_more() {
(handle.results().to_vec(), Some(handle))
} else {
(handle.into_results(), None)
};
results.reverse();
Self {
index,
handle: if has_more { Some(handle) } else { None },
handle,
current_batch: results,
start_key,
end_key,
Expand All @@ -60,8 +64,7 @@ impl PlainRangeIterator {
#[maybe_async_cfg::maybe(keep_self, sync(feature = "sync_frontend"), async(feature = "async_frontend"))]
async fn next(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
loop {
if !self.current_batch.is_empty() {
let (key, value) = self.current_batch.remove(0);
if let Some((key, value)) = self.current_batch.pop() {
return Ok(Some((key.into_vec(), value.into_vec())));
}

Expand All @@ -72,8 +75,14 @@ impl PlainRangeIterator {
.query_next_batch(h)
.await
.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?;
self.current_batch = next_handle.results().to_vec();
self.handle = if next_handle.has_more() { Some(next_handle) } else { None };
let (mut results, handle) = if next_handle.has_more() {
(next_handle.results().to_vec(), Some(next_handle))
} else {
(next_handle.into_results(), None)
};
results.reverse();
self.current_batch = results;
self.handle = handle;
}
_ => return Ok(None),
}
Expand Down Expand Up @@ -102,8 +111,14 @@ impl PlainRangeIterator {
return Ok(false);
}

self.current_batch = new_handle.results().to_vec();
self.handle = if new_handle.has_more() { Some(new_handle) } else { None };
let (mut results, handle) = if new_handle.has_more() {
(new_handle.results().to_vec(), Some(new_handle))
} else {
(new_handle.into_results(), None)
};
results.reverse();
self.current_batch = results;
self.handle = handle;
Ok(true)
}
}
Expand All @@ -129,6 +144,7 @@ struct MvccRangeIterator {
handle: Option<Box<dyn IndexQueryHandle<MvccKey<DbKey>, MvccValue<DbValue>>>>,
current_batch: Vec<(MvccKey<DbKey>, MvccValue<DbValue>)>,
snapshot_ts: u64,
filter: Arc<dyn GetFilter<MvccKey<DbKey>, MvccValue<DbValue>>>,
start_key: Vec<u8>,
end_key: Vec<u8>,
batch_size: u32,
Expand All @@ -149,13 +165,20 @@ impl MvccRangeIterator {
snapshot_ts: u64,
key_spec: KeySpec,
) -> Self {
let results = handle.results().to_vec();
let has_more = handle.has_more();
let filter: Arc<dyn GetFilter<MvccKey<DbKey>, MvccValue<DbValue>>> =
Arc::new(MvccQueryFilter::new(snapshot_ts));
let (mut results, handle) = if handle.has_more() {
(handle.results().to_vec(), Some(handle))
} else {
(handle.into_results(), None)
};
results.reverse();
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not able to follow this change as well.

Self {
index,
handle: if has_more { Some(handle) } else { None },
handle,
current_batch: results,
snapshot_ts,
filter,
start_key,
end_key,
batch_size,
Expand All @@ -169,8 +192,7 @@ impl MvccRangeIterator {
loop {
// The MvccQueryFilter passed at query time already removed all MVCC duplicates,
// too-new versions, and tombstones. Drain the batch as plain key-value pairs.
if !self.current_batch.is_empty() {
let (mvcc_key, mvcc_val) = self.current_batch.remove(0);
if let Some((mvcc_key, mvcc_val)) = self.current_batch.pop() {
let user_key_bytes = mvcc_key.inner.into_vec();
let value_bytes = mvcc_val.into_inner().map(|v| v.into_vec()).unwrap_or_default();
return Ok(Some((user_key_bytes, value_bytes)));
Expand All @@ -184,8 +206,14 @@ impl MvccRangeIterator {
.query_next_batch(h)
.await
.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?;
self.current_batch = next_handle.results().to_vec();
self.handle = if next_handle.has_more() { Some(next_handle) } else { None };
let (mut results, handle) = if next_handle.has_more() {
(next_handle.results().to_vec(), Some(next_handle))
} else {
(next_handle.into_results(), None)
};
results.reverse();
self.current_batch = results;
self.handle = handle;
}
_ => return Ok(None),
}
Expand All @@ -212,20 +240,24 @@ impl MvccRangeIterator {
};

let range = BtreeKeyRange::new(range_start, true, range_end, self.reverse);
let filter: Arc<dyn GetFilter<MvccKey<DbKey>, MvccValue<DbValue>>> =
Arc::new(MvccQueryFilter::new(self.snapshot_ts));
let new_handle = self
.index
.query(range, self.batch_size, Some(filter), self.reverse)
.query(range, self.batch_size, Some(Arc::clone(&self.filter)), self.reverse)
.await
.map_err(|e| HomeDbError::BtreeError(format!("{:?}", e)))?;

if new_handle.results().is_empty() {
return Ok(false);
}

self.current_batch = new_handle.results().to_vec();
self.handle = if new_handle.has_more() { Some(new_handle) } else { None };
let (mut results, handle) = if new_handle.has_more() {
(new_handle.results().to_vec(), Some(new_handle))
} else {
(new_handle.into_results(), None)
};
results.reverse();
self.current_batch = results;
self.handle = handle;
Ok(true)
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/homedb/core/src/index/sharded_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ impl<K: 'static + BtreeKey, V: 'static + BtreeValue> IndexQueryHandle<K, V> for
fn results(&self) -> &[(K, V)] { &self.results }
fn has_more(&self) -> bool { self.has_more_impl() }
fn into_any_send(self: Box<Self>) -> Box<dyn std::any::Any + Send> { self }
fn into_results(self: Box<Self>) -> Vec<(K, V)> {
self.results
}
}

fn to_sharded_query_handle<K: 'static + BtreeKey, V: 'static + BtreeValue>(
Expand Down
6 changes: 5 additions & 1 deletion src/homedb/core/src/index/unsharded_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ impl<K: 'static + BtreeKey, V: 'static + BtreeValue> IndexQueryHandle<K, V> for
fn has_more(&self) -> bool {
self.0.has_more()
}

fn into_any_send(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
self
}

fn into_results(self: Box<Self>) -> Vec<(K, V)> {
self.0.results
}
}

pub struct UnshardedBtree<K: 'static + BtreeKey, V: 'static + BtreeValue> {
Expand Down
9 changes: 8 additions & 1 deletion src/homedb/mem_db/src/table_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! no MVCC branching.

use std::sync::Arc;
use homestore::index::btree::BtreeConfig;
use homestore::index::btree::{BtreeConfig, MergePolicy};
use homedb_core::{IndexOps, MvccGc, MvccOps, NonTxnOps, RangeIterator, Snapshot, SnapshotOps};
use crate::{HomeDbError, KeySpec, KeyType, PrefixType, Result, TableSpec, ValueSpec};

Expand Down Expand Up @@ -74,6 +74,13 @@ impl TableIndex {
config.leaf_node_variant = node_variant;
config.int_node_variant = node_variant;

// MVCC tables with inline/background GC: disable node merges to prevent
// NodeNotFound errors when concurrent GC removes trigger node merges
// while foreground puts are traversing the tree.
if spec.mvcc_enabled {
config.merge_policy = MergePolicy::Never;
}

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't want to merge this particular change and let it be in local changes as we should get to the bottom of NodeNotFound errors, the perf number wouldn't be representable with this issue

// Prefix compression is only meaningful for plain tables.
if !spec.mvcc_enabled {
if let PrefixType::Prefixable(Some(prefix_size)) = spec.key_spec.prefix_type {
Expand Down
4 changes: 2 additions & 2 deletions src/homestore/index/btree/detail/mutate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ where

match self.put_one_walk(root, &mut req).await {
Ok(hb) => { hit_boundary = hb; break; }
Err(BtreeError::Retry) => continue, // Retriable errors
Err(e) => return Err(e), // Non-retriable errors
Err(BtreeError::Retry) => continue,
Err(e) => return Err(e),
}
}
Ok((req.into_stats(), hit_boundary))
Expand Down