Merged
77 changes: 76 additions & 1 deletion benches/benches.rs
@@ -24,7 +24,8 @@ criterion_group!(
writebatch_sorting,
hasher,
txid_from_hex,
block_cache
block_cache,
history_multi_get
);
criterion_main!(benches);

@@ -543,3 +544,77 @@ pub fn block_cache(c: &mut Criterion) {
group.finish();
}
}

pub fn history_multi_get(c: &mut Criterion) {
const DEFAULT_NUM_HISTORY_KEYS: u64 = 500_000;
let num_history_keys = std::env::var("WATERFALLS_HISTORY_BENCH_ROWS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.unwrap_or(DEFAULT_NUM_HISTORY_KEYS);

let cache = Cache::new_hyper_clock_cache(8 * 1024 * 1024, 0);
let dir = tempfile::TempDir::new().unwrap();
let db = open_cache_bench_db(dir.path(), &cache);
populate_cache_bench_db(&db, 0, num_history_keys);

let cf = db.cf_handle("history").unwrap();
for lookup_count in [20usize, 512] {
let mut rng = thread_rng();
let scripts: Vec<u64> = (0..lookup_count)
.map(|_| rng.next_u64() % (num_history_keys * 2))
.collect();

let mut group = c.benchmark_group(format!(
"history_multi_get/{lookup_count}/{num_history_keys}"
));
group.sample_size(20);

group.bench_function("batched_unsorted", |b| {
b.iter(|| {
black_box(bench_raw_history_multi_get_old(&db, &cf, &scripts));
});
});

group.bench_function("batched_sorted_reordered", |b| {
b.iter(|| {
black_box(bench_raw_history_multi_get_new(&db, &cf, &scripts));
});
});

group.finish();
}
}

fn bench_raw_history_multi_get_old(
db: &DB,
cf: &impl rocksdb::AsColumnFamilyRef,
scripts: &[u64],
) -> usize {
let keys: Vec<_> = scripts.iter().map(|script| script.to_be_bytes()).collect();
let results = db.batched_multi_get_cf(cf, keys.iter(), false);
results
.into_iter()
.map(|result| result.unwrap().as_ref().map_or(0, |value| value.len()))
.sum()
}

fn bench_raw_history_multi_get_new(
db: &DB,
cf: &impl rocksdb::AsColumnFamilyRef,
scripts: &[u64],
) -> usize {
let mut indexed_keys: Vec<_> = scripts
.iter()
.enumerate()
.map(|(index, script)| (index, script.to_be_bytes()))
.collect();
indexed_keys.sort_unstable_by_key(|(_, key)| *key);

let sorted_results = db.batched_multi_get_cf(cf, indexed_keys.iter().map(|(_, key)| key), true);
let mut reordered = vec![0usize; scripts.len()];
for ((index, _), result) in indexed_keys.into_iter().zip(sorted_results.into_iter()) {
reordered[index] = result.unwrap().as_ref().map_or(0, |value| value.len());
}

reordered.into_iter().sum()
}
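
The two helpers above are what the benchmark contrasts: a plain `batched_multi_get_cf` call versus sorting the keys first, passing `sorted_input = true`, and scattering the results back into the caller's order. A minimal standalone sketch of that sort-then-restore pattern, assuming an already-open `rocksdb::DB` and column family handle (the function name and the fixed 8-byte keys are illustrative):

use rocksdb::{AsColumnFamilyRef, DB};

// Sketch: query RocksDB with sorted keys (so it can skip its own sorting step),
// then put each value back at the position its key originally had.
fn multi_get_preserving_order(
    db: &DB,
    cf: &impl AsColumnFamilyRef,
    keys: &[[u8; 8]],
) -> Vec<Option<Vec<u8>>> {
    // Remember each key's original position before sorting.
    let mut indexed: Vec<(usize, [u8; 8])> = keys.iter().copied().enumerate().collect();
    indexed.sort_unstable_by_key(|(_, key)| *key);

    // `sorted_input = true` promises RocksDB the keys arrive in ascending order.
    let sorted = db.batched_multi_get_cf(cf, indexed.iter().map(|(_, key)| key), true);

    // Scatter the results back into the caller's original order.
    let mut out = vec![None; keys.len()];
    for ((index, _), value) in indexed.into_iter().zip(sorted) {
        out[index] = value.expect("rocksdb read error").map(|slice| slice.to_vec());
    }
    out
}

The benchmark's `reordered` vector plays the same role as `out` here; whether the extra sort pays for itself is exactly what the two bench functions measure.
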
25 changes: 16 additions & 9 deletions src/server/mempool.rs
Expand Up @@ -40,7 +40,7 @@ impl Mempool {
&mut self,
db: &AnyStore,
removed_txids: &[crate::be::Txid],
txs: &[(crate::be::Txid, be::Transaction)],
txs: &[(crate::be::Txid, &be::Transaction)],
) {
self.remove(removed_txids);
self.add(db, txs);
@@ -63,7 +63,7 @@ impl Mempool {
.retain(|k, _| !txids.contains(&k.txid.into()));
}

fn add(&mut self, db: &AnyStore, txs: &[(crate::be::Txid, be::Transaction)]) {
fn add(&mut self, db: &AnyStore, txs: &[(crate::be::Txid, &be::Transaction)]) {
// update the unconfirmed utxo set
let outputs_created = txs
.iter()
@@ -142,15 +142,22 @@ impl Mempool {
}
}

pub fn seen(&self, script_hashes: &[ScriptHash]) -> Vec<Vec<TxSeen>> {
pub fn append_seen(&self, script_hashes: &[ScriptHash], out: &mut [Vec<TxSeen>]) {
for (h, tx_seens) in script_hashes.iter().zip(out.iter_mut()) {
let txid_positions = self.hash_txids.get(h).map(Vec::as_slice).unwrap_or(&[]);
tx_seens.reserve(txid_positions.len());
tx_seens.extend(
txid_positions
.iter()
.map(|(txid, position)| TxSeen::mempool(*txid, V::from_raw(*position))),
);
}
}

pub fn has_seen(&self, script_hashes: &[ScriptHash]) -> Vec<bool> {
let mut result = Vec::with_capacity(script_hashes.len());
for h in script_hashes {
let txid_positions = self.hash_txids.get(h).map(Vec::as_slice).unwrap_or(&[]);
let tx_seens: Vec<TxSeen> = txid_positions
.into_iter()
.map(|(txid, position)| TxSeen::mempool(*txid, V::from_raw(*position)))
.collect();
result.push(tx_seens);
result.push(self.hash_txids.get(h).is_some_and(|entries| !entries.is_empty()));
}
result
}
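
`append_seen` extends per-script vectors supplied by the caller instead of allocating a fresh `Vec<Vec<TxSeen>>`, and `has_seen` answers the cheaper "any activity?" question with plain booleans. A self-contained sketch of the same pattern with simplified stand-in types (`u64` for `ScriptHash`, `u32` for a `TxSeen` entry; `TinyMempool` is illustrative, not the crate's type):

use std::collections::HashMap;

// Simplified model of the append-into-caller-buffers pattern: DB results and
// mempool results end up in the same per-script vector, with one reserve/extend
// per script instead of a second Vec<Vec<_>> allocation.
struct TinyMempool {
    hash_txids: HashMap<u64, Vec<u32>>, // script hash -> mempool tx positions
}

impl TinyMempool {
    fn append_seen(&self, script_hashes: &[u64], out: &mut [Vec<u32>]) {
        for (h, entries) in script_hashes.iter().zip(out.iter_mut()) {
            let found = self.hash_txids.get(h).map(Vec::as_slice).unwrap_or(&[]);
            entries.reserve(found.len());
            entries.extend(found.iter().copied());
        }
    }

    fn has_seen(&self, script_hashes: &[u64]) -> Vec<bool> {
        script_hashes
            .iter()
            .map(|h| self.hash_txids.get(h).is_some_and(|e| !e.is_empty()))
            .collect()
    }
}

fn main() {
    let mempool = TinyMempool {
        hash_txids: HashMap::from([(7u64, vec![1, 2])]),
    };
    // `out` already holds the confirmed history; mempool entries get appended to it.
    let mut out = vec![vec![0u32], Vec::new()];
    mempool.append_seen(&[7, 8], &mut out);
    assert_eq!(out, vec![vec![0, 1, 2], vec![]]);
    assert_eq!(mempool.has_seen(&[7, 8]), vec![true, false]);
}
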
112 changes: 53 additions & 59 deletions src/server/route.rs
Expand Up @@ -18,7 +18,7 @@ use hyper::{
use prometheus::Encoder;
use serde::Serialize;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashSet},
hash::{DefaultHasher, Hash, Hasher},
str::FromStr,
sync::Arc,
@@ -201,10 +201,9 @@ pub async fn route(
.map_err(|_| Error::BodyReadTimeout)?
.map_err(|_| Error::BodyTooLarge)?
.to_bytes();
let result = std::str::from_utf8(&whole_body)
.map_err(|e| Error::String(e.to_string()))?
.to_string();
let tx = be::Transaction::from_str(&result, network.into())
let tx_hex = std::str::from_utf8(&whole_body)
.map_err(|e| Error::String(e.to_string()))?;
let tx = be::Transaction::from_str(tx_hex, network.into())
.map_err(|e| Error::String(e.to_string()))?;
let result = client.lock().await.broadcast(&tx).await;
match result {
@@ -386,27 +385,23 @@ fn parse_query(
max_addresses: usize,
network: Network,
) -> Result<WaterfallRequest, Error> {
let mut params = form_urlencoded::parse(query.as_bytes())
.into_owned()
.collect::<HashMap<String, String>>();

let page = params
.get("page")
.map(|e| e.parse().unwrap_or(0))
.unwrap_or(0u16);

let to_index = params
.get("to_index")
.map(|e| e.parse().unwrap_or(0))
.unwrap_or(0u32);

let utxo_only = params
.get("utxo_only")
.map(|e| e.parse().unwrap_or(false))
.unwrap_or(false);

let descriptor = params.remove("descriptor");
let addresses = params.remove("addresses");
let mut page = 0u16;
let mut to_index = 0u32;
let mut utxo_only = false;
let mut descriptor = None;
let mut addresses = None;

for (key, value) in form_urlencoded::parse(query.as_bytes()) {
match key.as_ref() {
"page" => page = value.parse().unwrap_or(0),
"to_index" => to_index = value.parse().unwrap_or(0),
"utxo_only" => utxo_only = value.parse().unwrap_or(false),
"descriptor" => descriptor = Some(value.into_owned()),
"addresses" => addresses = Some(value.into_owned()),
_ => {}
}
}
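
This replaces the previous collect-into-`HashMap<String, String>` step with a single pass over the decoded pairs; unknown keys are skipped and, on repeated keys, the last occurrence wins. A runnable sketch of the pattern in isolation (it only assumes the `form_urlencoded` crate the handler already uses; the query string is made up):

fn main() {
    // form_urlencoded::parse yields percent-decoded (key, value) pairs lazily,
    // so one pass with a match replaces building an intermediate HashMap.
    let query = "page=2&utxo_only=true&unknown=x";
    let mut page = 0u16;
    let mut utxo_only = false;
    for (key, value) in form_urlencoded::parse(query.as_bytes()) {
        match key.as_ref() {
            "page" => page = value.parse().unwrap_or(0),
            "utxo_only" => utxo_only = value.parse().unwrap_or(false),
            _ => {} // unknown keys are ignored, as before
        }
    }
    assert_eq!((page, utxo_only), (2, true));
}
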

match (descriptor, addresses) {
(Some(_), Some(_)) => Err(Error::CannotSpecifyBothDescriptorAndAddresses),
(Some(desc_str), None) => {
@@ -461,18 +456,19 @@ fn parse_descriptor_query(
is_testnet_or_regtest: bool,
network: Network,
) -> Result<be::Descriptor, Error> {
let params = form_urlencoded::parse(query.as_bytes())
.into_owned()
.collect::<HashMap<String, String>>();
let mut descriptor = None;
for (key, value) in form_urlencoded::parse(query.as_bytes()) {
if key == "descriptor" {
descriptor = Some(value.into_owned());
}
}

let desc_str = params
.get("descriptor")
.ok_or(Error::AtLeastOneFieldMandatory)?;
let desc_str = descriptor.ok_or(Error::AtLeastOneFieldMandatory)?;

let desc_str = if is_likely_age_encrypted(desc_str) {
encryption::decrypt(desc_str, key)?
let desc_str = if is_likely_age_encrypted(&desc_str) {
encryption::decrypt(&desc_str, key)?
} else {
desc_str.clone()
desc_str
};

let descriptor = be::Descriptor::from_str(&desc_str, network)?;
@@ -555,8 +551,13 @@ async fn handle_single_address(
})
.collect();

let seen_mempool = state.mempool.lock().await.seen(&script_hash).remove(0);
result.extend(seen_mempool.iter().map(|tx_seen| EsploraTx {
let mut seen_mempool = vec![Vec::new()];
state
.mempool
.lock()
.await
.append_seen(&script_hash, &mut seen_mempool);
result.extend(seen_mempool[0].iter().map(|tx_seen| EsploraTx {
txid: tx_seen.txid,
status: Status {
block_height: Some(-1),
@@ -577,10 +578,8 @@
}
}

let result = serde_json::to_string(&result).unwrap();

any_resp(
result.into_bytes(),
serde_json::to_vec(&result).unwrap(),
hyper::StatusCode::OK,
Some("application/json"),
Some(state.cache_control_seconds),
@@ -677,9 +676,10 @@ async fn handle_waterfalls_req(
for tx_seen in tx_seens.iter_mut() {
if tx_seen.height > 0 {
// unconfirmed txs have height 0; we don't want to map those to the genesis block
let (hash, ts) = blocks_hash_ts[tx_seen.height as usize];
tx_seen.block_hash = Some(hash);
tx_seen.block_timestamp = Some(ts);
if let Some((hash, ts)) = blocks_hash_ts.get(tx_seen.height as usize) {
tx_seen.block_hash = Some(*hash);
tx_seen.block_timestamp = Some(*ts);
}

if !utxo_only_req {
// setting v to undefined avoids serializing it, since it is not needed for a full history scan
@@ -718,10 +718,8 @@
minicbor::encode(&waterfall_response, &mut bytes).unwrap();
bytes
} else {
serde_json::to_string(&waterfall_response)
serde_json::to_vec(&waterfall_response)
.expect("does not contain a map with non-string keys")
.as_bytes()
.to_vec()
};

let m = sign_response(&state.secp, &state.wif_key, &result);
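
Several handlers in this file switch from `serde_json::to_string(..)` followed by `.as_bytes().to_vec()` to `serde_json::to_vec(..)`, serializing straight into the response body bytes and skipping the intermediate `String` plus the extra copy. A tiny sketch of the equivalence (the `Tip` struct is made up; assumes serde's derive feature):

use serde::Serialize;

#[derive(Serialize)]
struct Tip {
    height: u32,
}

fn main() {
    let tip = Tip { height: 42 };
    // New path: serialize directly into the bytes that become the HTTP body.
    let direct: Vec<u8> = serde_json::to_vec(&tip).expect("no non-string map keys");
    // Old path: a String first, then a second copy of its bytes.
    let via_string: Vec<u8> = serde_json::to_string(&tip).unwrap().as_bytes().to_vec();
    assert_eq!(direct, via_string);
}
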
@@ -778,13 +776,13 @@ async fn handle_last_used_index(
derive_script_hashes_batch(state, desc, batch_start, GAP_LIMIT).await;

// Check which scripts have history (either confirmed or mempool)
let seen_blockchain = db.get_history(&scripts).unwrap();
let seen_mempool = state.mempool.lock().await.seen(&scripts);
let seen_blockchain = db.has_history(&scripts).unwrap();
let seen_mempool = state.mempool.lock().await.has_seen(&scripts);

// Find the max index with activity in this batch
let mut batch_has_activity = false;
for (i, (conf, unconf)) in seen_blockchain.iter().zip(seen_mempool.iter()).enumerate() {
if !conf.is_empty() || !unconf.is_empty() {
if *conf || *unconf {
last_used_for_chain = Some(batch_start + i as u32);
batch_has_activity = true;
}
Expand All @@ -811,9 +809,7 @@ async fn handle_last_used_index(
tip: tip_hash,
};

let result = serde_json::to_string(&response)
.expect("serialization cannot fail")
.into_bytes();
let result = serde_json::to_vec(&response).expect("serialization cannot fail");

log::info!(
"{id:x}: last_used_index external={:?} internal={:?}, elapsed: {:.2?}",
@@ -873,13 +869,11 @@ async fn find_scripts(
scripts: Vec<u64>,
) -> bool {
let mut seen_blockchain = db.get_history(&scripts).unwrap();
let seen_mempool = state.mempool.lock().await.seen(&scripts);

for (conf, unconf) in seen_blockchain.iter_mut().zip(seen_mempool.iter()) {
for tx_seen in unconf {
conf.push(tx_seen.clone());
}
}
state
.mempool
.lock()
.await
.append_seen(&scripts, &mut seen_blockchain);
let is_last = seen_blockchain.iter().all(|e| e.is_empty());
result.extend(seen_blockchain);
is_last