diff --git a/benches/benches.rs b/benches/benches.rs index 9da896a..c69c68e 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -24,7 +24,8 @@ criterion_group!( writebatch_sorting, hasher, txid_from_hex, - block_cache + block_cache, + history_multi_get ); criterion_main!(benches); @@ -543,3 +544,77 @@ pub fn block_cache(c: &mut Criterion) { group.finish(); } } + +pub fn history_multi_get(c: &mut Criterion) { + const DEFAULT_NUM_HISTORY_KEYS: u64 = 500_000; + let num_history_keys = std::env::var("WATERFALLS_HISTORY_BENCH_ROWS") + .ok() + .and_then(|value| value.parse::<u64>().ok()) + .unwrap_or(DEFAULT_NUM_HISTORY_KEYS); + + let cache = Cache::new_hyper_clock_cache(8 * 1024 * 1024, 0); + let dir = tempfile::TempDir::new().unwrap(); + let db = open_cache_bench_db(dir.path(), &cache); + populate_cache_bench_db(&db, 0, num_history_keys); + + let cf = db.cf_handle("history").unwrap(); + for lookup_count in [20usize, 512] { + let mut rng = thread_rng(); + let scripts: Vec<u64> = (0..lookup_count) + .map(|_| rng.next_u64() % (num_history_keys * 2)) + .collect(); + + let mut group = c.benchmark_group(format!( + "history_multi_get/{lookup_count}/{num_history_keys}" + )); + group.sample_size(20); + + group.bench_function("batched_unsorted", |b| { + b.iter(|| { + black_box(bench_raw_history_multi_get_old(&db, &cf, &scripts)); + }); + }); + + group.bench_function("batched_sorted_reordered", |b| { + b.iter(|| { + black_box(bench_raw_history_multi_get_new(&db, &cf, &scripts)); + }); + }); + + group.finish(); + } +} + +fn bench_raw_history_multi_get_old( + db: &DB, + cf: &impl rocksdb::AsColumnFamilyRef, + scripts: &[u64], +) -> usize { + let keys: Vec<_> = scripts.iter().map(|script| script.to_be_bytes()).collect(); + let results = db.batched_multi_get_cf(cf, keys.iter(), false); + results + .into_iter() + .map(|result| result.unwrap().as_ref().map_or(0, |value| value.len())) + .sum() +} + +fn bench_raw_history_multi_get_new( + db: &DB, + cf: &impl rocksdb::AsColumnFamilyRef, + 
scripts: &[u64], +) -> usize { + let mut indexed_keys: Vec<_> = scripts + .iter() + .enumerate() + .map(|(index, script)| (index, script.to_be_bytes())) + .collect(); + indexed_keys.sort_unstable_by_key(|(_, key)| *key); + + let sorted_results = db.batched_multi_get_cf(cf, indexed_keys.iter().map(|(_, key)| key), true); + let mut reordered = vec![0usize; scripts.len()]; + for ((index, _), result) in indexed_keys.into_iter().zip(sorted_results.into_iter()) { + reordered[index] = result.unwrap().as_ref().map_or(0, |value| value.len()); + } + + reordered.into_iter().sum() +} diff --git a/src/server/mempool.rs b/src/server/mempool.rs index 36327b9..b72fb33 100644 --- a/src/server/mempool.rs +++ b/src/server/mempool.rs @@ -40,7 +40,7 @@ impl Mempool { &mut self, db: &AnyStore, removed_txids: &[crate::be::Txid], - txs: &[(crate::be::Txid, be::Transaction)], + txs: &[(crate::be::Txid, &be::Transaction)], ) { self.remove(removed_txids); self.add(db, txs); @@ -63,7 +63,7 @@ impl Mempool { .retain(|k, _| !txids.contains(&k.txid.into())); } - fn add(&mut self, db: &AnyStore, txs: &[(crate::be::Txid, be::Transaction)]) { + fn add(&mut self, db: &AnyStore, txs: &[(crate::be::Txid, &be::Transaction)]) { // update the unconfirmed utxo set let outputs_created = txs .iter() @@ -142,15 +142,22 @@ impl Mempool { } } - pub fn seen(&self, script_hashes: &[ScriptHash]) -> Vec<Vec<TxSeen>> { + pub fn append_seen(&self, script_hashes: &[ScriptHash], out: &mut [Vec<TxSeen>]) { + for (h, tx_seens) in script_hashes.iter().zip(out.iter_mut()) { + let txid_positions = self.hash_txids.get(h).map(Vec::as_slice).unwrap_or(&[]); + tx_seens.reserve(txid_positions.len()); + tx_seens.extend( + txid_positions + .iter() + .map(|(txid, position)| TxSeen::mempool(*txid, V::from_raw(*position))), + ); + } + } + + pub fn has_seen(&self, script_hashes: &[ScriptHash]) -> Vec<bool> { let mut result = Vec::with_capacity(script_hashes.len()); for h in script_hashes { - let txid_positions = 
self.hash_txids.get(h).map(Vec::as_slice).unwrap_or(&[]); - let tx_seens: Vec<TxSeen> = txid_positions - .into_iter() - .map(|(txid, position)| TxSeen::mempool(*txid, V::from_raw(*position))) - .collect(); - result.push(tx_seens); + result.push(self.hash_txids.get(h).is_some_and(|entries| !entries.is_empty())); } result } diff --git a/src/server/route.rs b/src/server/route.rs index a2b9305..a135c87 100644 --- a/src/server/route.rs +++ b/src/server/route.rs @@ -18,7 +18,7 @@ use hyper::{ use prometheus::Encoder; use serde::Serialize; use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}, str::FromStr, sync::Arc, @@ -201,10 +201,9 @@ pub async fn route( .map_err(|_| Error::BodyReadTimeout)? .map_err(|_| Error::BodyTooLarge)? .to_bytes(); - let result = std::str::from_utf8(&whole_body) - .map_err(|e| Error::String(e.to_string()))? - .to_string(); - let tx = be::Transaction::from_str(&result, network.into()) + let tx_hex = std::str::from_utf8(&whole_body) + .map_err(|e| Error::String(e.to_string()))?; + let tx = be::Transaction::from_str(tx_hex, network.into()) .map_err(|e| Error::String(e.to_string()))?; let result = client.lock().await.broadcast(&tx).await; match result { @@ -386,27 +385,23 @@ fn parse_query( max_addresses: usize, network: Network, ) -> Result { - let mut params = form_urlencoded::parse(query.as_bytes()) - .into_owned() - .collect::<HashMap<_, _>>(); - - let page = params - .get("page") - .map(|e| e.parse().unwrap_or(0)) - .unwrap_or(0u16); - - let to_index = params - .get("to_index") - .map(|e| e.parse().unwrap_or(0)) - .unwrap_or(0u32); - - let utxo_only = params - .get("utxo_only") - .map(|e| e.parse().unwrap_or(false)) - .unwrap_or(false); - - let descriptor = params.remove("descriptor"); - let addresses = params.remove("addresses"); + let mut page = 0u16; + let mut to_index = 0u32; + let mut utxo_only = false; + let mut descriptor = None; + let mut addresses = None; + + for (key, value) in 
form_urlencoded::parse(query.as_bytes()) { + match key.as_ref() { + "page" => page = value.parse().unwrap_or(0), + "to_index" => to_index = value.parse().unwrap_or(0), + "utxo_only" => utxo_only = value.parse().unwrap_or(false), + "descriptor" => descriptor = Some(value.into_owned()), + "addresses" => addresses = Some(value.into_owned()), + _ => {} + } + } + match (descriptor, addresses) { (Some(_), Some(_)) => Err(Error::CannotSpecifyBothDescriptorAndAddresses), (Some(desc_str), None) => { @@ -461,18 +456,19 @@ fn parse_descriptor_query( is_testnet_or_regtest: bool, network: Network, ) -> Result { - let params = form_urlencoded::parse(query.as_bytes()) - .into_owned() - .collect::<HashMap<_, _>>(); + let mut descriptor = None; + for (key, value) in form_urlencoded::parse(query.as_bytes()) { + if key == "descriptor" { + descriptor = Some(value.into_owned()); + } + } - let desc_str = params - .get("descriptor") - .ok_or(Error::AtLeastOneFieldMandatory)?; + let desc_str = descriptor.ok_or(Error::AtLeastOneFieldMandatory)?; - let desc_str = if is_likely_age_encrypted(desc_str) { - encryption::decrypt(desc_str, key)? + let desc_str = if is_likely_age_encrypted(&desc_str) { + encryption::decrypt(&desc_str, key)? 
} else { - desc_str.clone() + desc_str }; let descriptor = be::Descriptor::from_str(&desc_str, network)?; @@ -555,8 +551,13 @@ async fn handle_single_address( }) .collect(); - let seen_mempool = state.mempool.lock().await.seen(&script_hash).remove(0); - result.extend(seen_mempool.iter().map(|tx_seen| EsploraTx { + let mut seen_mempool = vec![Vec::new()]; + state + .mempool + .lock() + .await + .append_seen(&script_hash, &mut seen_mempool); + result.extend(seen_mempool[0].iter().map(|tx_seen| EsploraTx { txid: tx_seen.txid, status: Status { block_height: Some(-1), @@ -577,10 +578,8 @@ async fn handle_single_address( } } - let result = serde_json::to_string(&result).unwrap(); - any_resp( - result.into_bytes(), + serde_json::to_vec(&result).unwrap(), hyper::StatusCode::OK, Some("application/json"), Some(state.cache_control_seconds), @@ -677,9 +676,10 @@ async fn handle_waterfalls_req( for tx_seen in tx_seens.iter_mut() { if tx_seen.height > 0 { // unconfirmed has height 0, we don't want to map those to the genesis block - let (hash, ts) = blocks_hash_ts[tx_seen.height as usize]; - tx_seen.block_hash = Some(hash); - tx_seen.block_timestamp = Some(ts); + if let Some((hash, ts)) = blocks_hash_ts.get(tx_seen.height as usize) { + tx_seen.block_hash = Some(*hash); + tx_seen.block_timestamp = Some(*ts); + } if !utxo_only_req { // setting v to undefined avoids to serialize it since is not needed for full history scan @@ -718,10 +718,8 @@ async fn handle_waterfalls_req( minicbor::encode(&waterfall_response, &mut bytes).unwrap(); bytes } else { - serde_json::to_string(&waterfall_response) + serde_json::to_vec(&waterfall_response) .expect("does not contain a map with non-string keys") - .as_bytes() - .to_vec() }; let m = sign_response(&state.secp, &state.wif_key, &result); @@ -778,13 +776,13 @@ async fn handle_last_used_index( derive_script_hashes_batch(state, desc, batch_start, GAP_LIMIT).await; // Check which scripts have history (either confirmed or mempool) - let 
seen_blockchain = db.get_history(&scripts).unwrap(); - let seen_mempool = state.mempool.lock().await.seen(&scripts); + let seen_blockchain = db.has_history(&scripts).unwrap(); + let seen_mempool = state.mempool.lock().await.has_seen(&scripts); // Find the max index with activity in this batch let mut batch_has_activity = false; for (i, (conf, unconf)) in seen_blockchain.iter().zip(seen_mempool.iter()).enumerate() { - if !conf.is_empty() || !unconf.is_empty() { + if *conf || *unconf { last_used_for_chain = Some(batch_start + i as u32); batch_has_activity = true; } @@ -811,9 +809,7 @@ async fn handle_last_used_index( tip: tip_hash, }; - let result = serde_json::to_string(&response) - .expect("serialization cannot fail") - .into_bytes(); + let result = serde_json::to_vec(&response).expect("serialization cannot fail"); log::info!( "{id:x}: last_used_index external={:?} internal={:?}, elapsed: {:.2?}", @@ -873,13 +869,11 @@ async fn find_scripts( scripts: Vec<ScriptHash>, ) -> bool { let mut seen_blockchain = db.get_history(&scripts).unwrap(); - let seen_mempool = state.mempool.lock().await.seen(&scripts); - - for (conf, unconf) in seen_blockchain.iter_mut().zip(seen_mempool.iter()) { - for tx_seen in unconf { - conf.push(tx_seen.clone()); - } - } + state + .mempool + .lock() + .await + .append_seen(&scripts, &mut seen_blockchain); let is_last = seen_blockchain.iter().all(|e| e.is_empty()); result.extend(seen_blockchain); is_last } diff --git a/src/store/db.rs b/src/store/db.rs index b8f0b6f..f649fb9 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -7,7 +7,8 @@ use elements::{ }; use fxhash::FxHasher; use rocksdb::{ - BlockBasedOptions, BoundColumnFamily, Cache, DBCompressionType, MergeOperands, Options, DB, + BlockBasedOptions, BoundColumnFamily, Cache, DBCompressionType, DBPinnableSlice, + MergeOperands, Options, DB, }; use crate::V; @@ -76,6 +77,39 @@ const VEC_TX_SEEN_MAX_SIZE: usize = 50; // 32 bytes (txid) + 9 bytes (height) + const VEC_TX_SEEN_MIN_SIZE: usize = 34; // 32 
bytes (txid) + 1 byte (height) + 1 byte (v) impl DBStore { + fn raw_history_multi_get( + &self, + scripts: &[ScriptHash], + ) -> Result<Vec<Option<DBPinnableSlice<'_>>>> { + if scripts.is_empty() { + return Ok(vec![]); + } + + // Benchmarks on temporary RocksDB instances consistently showed that sorting + // the history keys before batched_multi_get_cf is a net win, including the + // production-shaped 20-lookups case, even after reordering results back. + let cf = self.history_cf(); + let mut indexed_keys: Vec<_> = scripts + .iter() + .enumerate() + .map(|(index, script)| (index, script.to_be_bytes())) + .collect(); + indexed_keys.sort_unstable_by_key(|(_, key)| *key); + let sorted_results = self.db.batched_multi_get_cf( + &cf, + indexed_keys.iter().map(|(_, key)| key), + true, + ); + let mut reordered: Vec<Option<DBPinnableSlice<'_>>> = std::iter::repeat_with(|| None) + .take(scripts.len()) + .collect(); + for ((index, _), result) in indexed_keys.into_iter().zip(sorted_results.into_iter()) { + reordered[index] = result.map_err(anyhow::Error::from)?; + } + + Ok(reordered) + } + fn create_cf_descriptors(shared_db_cache_mb: u64) -> Vec<ColumnFamilyDescriptor> { let cache_size = (shared_db_cache_mb * 1024 * 1024) as usize; // HyperClockCache is lock-free, reducing mutex contention under concurrent reads. 
@@ -559,17 +593,14 @@ impl Store for DBStore { } fn get_utxos(&self, outpoints: &[OutPoint]) -> Result<Vec<Option<u64>>> { - let mut keys = Vec::with_capacity(outpoints.len()); let cf = self.utxo_cf(); - for outpoint in outpoints { - keys.push((&cf, serialize_outpoint(outpoint))); - } - let db_results = self.db.multi_get_cf(keys); + let keys: Vec<_> = outpoints.iter().map(serialize_outpoint).collect(); + let db_results = self.db.batched_multi_get_cf(&cf, keys.iter(), false); let result: Vec<_> = db_results .into_iter() .map(|e| { e.unwrap().map(|e| { - let bytes = e.try_into().unwrap(); + let bytes = e.as_ref().try_into().unwrap(); u64::from_be_bytes(bytes) }) }) .collect(); @@ -583,18 +614,9 @@ .with_label_values(&["all"]) .start_timer(); - if scripts.is_empty() { - return Ok(vec![]); - } - let mut keys = Vec::with_capacity(scripts.len()); - let cf = self.history_cf(); - for script in scripts { - keys.push((&cf, script.to_be_bytes())); - } - let db_results = self.db.multi_get_cf(keys); + let db_results = self.raw_history_multi_get(scripts)?; let mut result = Vec::with_capacity(scripts.len()); for db_result in db_results { - let db_result = db_result?; match db_result { None => result.push(vec![]), Some(e) => { @@ -607,6 +629,21 @@ Ok(result) } + fn has_history(&self, scripts: &[ScriptHash]) -> Result<Vec<bool>> { + let timer = crate::WATERFALLS_DB_HISTORY_HISTOGRAM + .with_label_values(&["all"]) + .start_timer(); + + let result = self + .raw_history_multi_get(scripts)? 
+ .into_iter() + .map(|entry| entry.is_some_and(|bytes| !bytes.is_empty())) + .collect(); + + timer.observe_duration(); + Ok(result) + } + fn update( &self, block_meta: &BlockMeta, diff --git a/src/store/memory.rs b/src/store/memory.rs index 5e477af..5c0da7d 100644 --- a/src/store/memory.rs +++ b/src/store/memory.rs @@ -53,6 +53,15 @@ impl Store for MemoryStore { Ok(result) } + fn has_history(&self, scripts: &[crate::ScriptHash]) -> anyhow::Result<Vec<bool>> { + let history = self.history.lock().unwrap(); + let mut result = Vec::with_capacity(scripts.len()); + for script in scripts { + result.push(history.get(script).is_some_and(|entries| !entries.is_empty())); + } + Ok(result) + } + fn update( &self, block_meta: &BlockMeta, diff --git a/src/store/mod.rs b/src/store/mod.rs index 0d83577..d7659fe 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -42,6 +42,9 @@ pub trait Store { /// Get history of multiple (usually 20 like the gap limit) scripts hash at once fn get_history(&self, scripts: &[ScriptHash]) -> Result<Vec<Vec<TxSeen>>>; + /// Check whether multiple scripts have any history without decoding full entries. 
+ fn has_history(&self, scripts: &[ScriptHash]) -> Result<Vec<bool>>; + /// update the store with all the data from the last block fn update( &self, @@ -92,6 +95,14 @@ impl Store for AnyStore { } } + fn has_history(&self, scripts: &[ScriptHash]) -> Result<Vec<bool>> { + match self { + #[cfg(feature = "db")] + AnyStore::Db(d) => d.has_history(scripts), + AnyStore::Mem(m) => m.has_history(scripts), + } + } + fn update( &self, block_meta: &BlockMeta, diff --git a/src/threads/mempool.rs b/src/threads/mempool.rs index 6798d97..484ffae 100644 --- a/src/threads/mempool.rs +++ b/src/threads/mempool.rs @@ -53,7 +53,7 @@ async fn sync_mempool_once( let db = &state.store; let tip = state.tip_height().await; - let new: Vec<_> = current.difference(mempool_txids).collect(); + let new: Vec<_> = current.difference(mempool_txids).cloned().collect(); let removed: Vec<_> = mempool_txids.difference(&current).cloned().collect(); crate::MEMPOOL_NEW_TXS_COUNTER.inc_by(new.len() as u64); let is_big_delta = new.len() >= BIG_MEMPOOL_DELTA_THRESHOLD @@ -75,8 +75,7 @@ async fn sync_mempool_once( // it's fine if the zmq thread wait for us. 
// and it's also beneficial so that the final mempool_cache.clear() does not delete tx we didn't use yet let mut mempool_cache = state.mempool_cache.lock().await; - let mut txs = vec![]; - for new_txid in new { + for new_txid in &new { let cached_tx = mempool_cache.get(new_txid).cloned(); cache_counter("mempool_cache", cached_tx.is_some()); let tx = if let Some(tx) = cached_tx { @@ -86,7 +85,6 @@ async fn sync_mempool_once( }; match tx { Ok(tx) => { - txs.push((*new_txid, tx.clone())); mempool_cache.insert(*new_txid, tx); } Err(e) => { @@ -102,6 +100,10 @@ async fn sync_mempool_once( } } } + let txs: Vec<_> = new + .iter() + .filter_map(|txid| mempool_cache.get(txid).map(|tx| (*txid, tx))) + .collect(); { let mut m = state.mempool.lock().await; m.update(db, &removed, &txs); diff --git a/tests/data/liquid-testnet-descriptors.md b/tests/data/liquid-testnet-descriptors.md new file mode 100644 index 0000000..0879795 --- /dev/null +++ b/tests/data/liquid-testnet-descriptors.md @@ -0,0 +1,3 @@ +ct(slip77(1bda6cd71a1e206e3eb793e5a4d98a46c3fa473c9ab7bdef9bb9c814764d6614),elwpkh([cb4ba44a/84'/1'/0']tpubDDrybtUajFcgXC85rvwPsh1oU7Azx4kJ9BAiRzMbByqK7UnVXY3gDRJPwEDfaQwguNUZFzrhavJGgEhbsfuebyxUSZQnjLezWVm2Vdqb7UM/<0;1>/*))#za9ktavp # big wallet more than 6k txs + +ct(slip77(ac53739ddde9fdf6bba3dbc51e989b09aa8c9cdce7b7d7eddd49cec86ddf71f7),elwpkh([93970d14/84'/1'/0']tpubDC3BrFCCjXq4jAceV8k6UACxDDJCFb1eb7R7BiKYUGZdNagEhNfJoYtUrRdci9JFs1meiGGModvmNm8PrqkrEjJ6mpt6gA1DRNU8vu7GqXH/<0;1>/*))#u0y4axgs # wallet with 82 txs @April 2026