-
Notifications
You must be signed in to change notification settings - Fork 9
feat: implement parallel filter matching #303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| use alloc::vec::Vec; | ||
| use dashcore::bip158::BlockFilter; | ||
| use dashcore::prelude::CoreBlockHeight; | ||
| use dashcore::{Address, BlockHash}; | ||
| use rayon::prelude::{IntoParallelIterator, ParallelIterator}; | ||
| use std::collections::{BTreeSet, HashMap}; | ||
|
|
||
| #[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] | ||
ZocoLini marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pub struct FilterMatchKey { | ||
| height: CoreBlockHeight, | ||
| hash: BlockHash, | ||
| } | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot give suggestions about this struct since I don't know how is it gonna be used |
||
| impl FilterMatchKey { | ||
| pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self { | ||
| Self { | ||
| height, | ||
| hash, | ||
| } | ||
| } | ||
| pub fn height(&self) -> CoreBlockHeight { | ||
| self.height | ||
| } | ||
| pub fn hash(&self) -> &BlockHash { | ||
| &self.hash | ||
| } | ||
| } | ||
|
|
||
| /// Check compact filters for addresses and return the keys that matched. | ||
| pub fn check_compact_filters_for_addresses( | ||
| input: &HashMap<FilterMatchKey, BlockFilter>, | ||
| addresses: Vec<Address>, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we are only using the hash inside FilterMatchKey it feels overkill to have a new struct for that. I also see kinda excesive a HashMap when we are only iterating over the entries, isn't possible to do: pub fn check_compact_filters_for_addresses(
input: &[BlockFilter],
addresses: &[Address],
) -> BTreeSet<&BlockFilter> {
let script_pubkey_bytes: Vec<Vec<u8>> =
addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect();
input
.into_par_iter()
.filter_map(|(filter)| {
filter
.match_any(filter.block_hash, script_pubkey_bytes.iter().map(|v| v.as_slice()))
.unwrap_or(false)
.then_some(key.clone())
})
.collect()
}And if we need the height at some point after this method call, we can query the storage using the filter_block_hash. About the returned collections, since idk the usage of it I cant tell if a Vec<&BlockFilter> can do the job
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's done like this to get sorted filter match outputs to request/process blocks in height order. Sure there might be other ways to do this but thats how its handled now in the sync rewrite. You can later come up with something else if you need to. |
||
| ) -> BTreeSet<FilterMatchKey> { | ||
| let script_pubkey_bytes: Vec<Vec<u8>> = | ||
| addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect(); | ||
|
|
||
| input | ||
| .into_par_iter() | ||
| .filter_map(|(key, filter)| { | ||
| filter | ||
| .match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice())) | ||
| .unwrap_or(false) | ||
| .then_some(key.clone()) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::Network; | ||
| use dashcore::{Block, Transaction}; | ||
|
|
||
| #[test] | ||
| fn test_empty_input_returns_empty() { | ||
| let result = check_compact_filters_for_addresses(&HashMap::new(), vec![]); | ||
| assert!(result.is_empty()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_empty_addresses_returns_empty() { | ||
| let address = Address::dummy(Network::Regtest, 1); | ||
| let tx = Transaction::dummy(&address, 0..0, &[1]); | ||
| let block = Block::dummy(100, vec![tx]); | ||
| let filter = BlockFilter::dummy(&block); | ||
| let key = FilterMatchKey::new(100, block.block_hash()); | ||
|
|
||
| let mut input = HashMap::new(); | ||
| input.insert(key.clone(), filter); | ||
|
|
||
| let output = check_compact_filters_for_addresses(&input, vec![]); | ||
| assert!(!output.contains(&key)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_matching_filter() { | ||
| let address = Address::dummy(Network::Regtest, 1); | ||
| let tx = Transaction::dummy(&address, 0..0, &[1]); | ||
| let block = Block::dummy(100, vec![tx]); | ||
| let filter = BlockFilter::dummy(&block); | ||
| let key = FilterMatchKey::new(100, block.block_hash()); | ||
|
|
||
| let mut input = HashMap::new(); | ||
| input.insert(key.clone(), filter); | ||
|
|
||
| let output = check_compact_filters_for_addresses(&input, vec![address]); | ||
| assert!(output.contains(&key)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_non_matching_filter() { | ||
| let address = Address::dummy(Network::Regtest, 1); | ||
| let address_other = Address::dummy(Network::Regtest, 2); | ||
|
|
||
| let tx = Transaction::dummy(&address_other, 0..0, &[1]); | ||
| let block = Block::dummy(100, vec![tx]); | ||
| let filter = BlockFilter::dummy(&block); | ||
| let key = FilterMatchKey::new(100, block.block_hash()); | ||
|
|
||
| let mut input = HashMap::new(); | ||
| input.insert(key.clone(), filter); | ||
|
|
||
| let output = check_compact_filters_for_addresses(&input, vec![address]); | ||
| assert!(!output.contains(&key)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_batch_mixed_results() { | ||
| let unrelated_address = Address::dummy(Network::Regtest, 0); | ||
| let address_1 = Address::dummy(Network::Regtest, 1); | ||
| let address_2 = Address::dummy(Network::Regtest, 2); | ||
|
|
||
| let tx_1 = Transaction::dummy(&address_1, 0..0, &[1]); | ||
| let block_1 = Block::dummy(100, vec![tx_1]); | ||
| let filter_1 = BlockFilter::dummy(&block_1); | ||
| let key_1 = FilterMatchKey::new(100, block_1.block_hash()); | ||
|
|
||
| let tx_2 = Transaction::dummy(&address_2, 0..0, &[2]); | ||
| let block_2 = Block::dummy(200, vec![tx_2]); | ||
| let filter_2 = BlockFilter::dummy(&block_2); | ||
| let key_2 = FilterMatchKey::new(200, block_2.block_hash()); | ||
|
|
||
| let tx_3 = Transaction::dummy(&unrelated_address, 0..0, &[10]); | ||
| let block_3 = Block::dummy(300, vec![tx_3]); | ||
| let filter_3 = BlockFilter::dummy(&block_3); | ||
| let key_3 = FilterMatchKey::new(300, block_3.block_hash()); | ||
|
|
||
| let mut input = HashMap::new(); | ||
| input.insert(key_1.clone(), filter_1); | ||
| input.insert(key_2.clone(), filter_2); | ||
| input.insert(key_3.clone(), filter_3); | ||
|
|
||
| let output = check_compact_filters_for_addresses(&input, vec![address_1, address_2]); | ||
| assert_eq!(output.len(), 2); | ||
| assert!(output.contains(&key_1)); | ||
| assert!(output.contains(&key_2)); | ||
| assert!(!output.contains(&key_3)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_output_sorted_by_height() { | ||
| let address = Address::dummy(Network::Regtest, 1); | ||
|
|
||
| // Create blocks at different heights (inserted in non-sorted order) | ||
| let heights = [500, 100, 300, 200, 400]; | ||
| let mut input = HashMap::new(); | ||
|
|
||
| for (i, &height) in heights.iter().enumerate() { | ||
| let tx = Transaction::dummy(&address, 0..0, &[i as u64]); | ||
| let block = Block::dummy(height, vec![tx]); | ||
| let filter = BlockFilter::dummy(&block); | ||
| let key = FilterMatchKey::new(height, block.block_hash()); | ||
| input.insert(key, filter); | ||
| } | ||
|
|
||
| let output = check_compact_filters_for_addresses(&input, vec![address]); | ||
|
|
||
| // Verify output is sorted by height (ascending) | ||
| let heights_out: Vec<CoreBlockHeight> = output.iter().map(|k| k.height()).collect(); | ||
| let mut sorted_heights = heights_out.clone(); | ||
| sorted_heights.sort(); | ||
|
|
||
| assert_eq!(heights_out, sorted_heights); | ||
| assert_eq!(heights_out, vec![100, 200, 300, 400, 500]); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,9 +4,8 @@ use alloc::string::String; | |
| use alloc::vec::Vec; | ||
| use async_trait::async_trait; | ||
| use core::fmt::Write as _; | ||
| use dashcore::bip158::BlockFilter; | ||
| use dashcore::prelude::CoreBlockHeight; | ||
| use dashcore::{Block, BlockHash, Transaction}; | ||
| use dashcore::{Address, Block, Transaction}; | ||
| use key_wallet::transaction_checking::transaction_router::TransactionRouter; | ||
| use key_wallet::transaction_checking::TransactionContext; | ||
| use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface; | ||
|
|
@@ -58,30 +57,8 @@ impl<T: WalletInfoInterface + Send + Sync + 'static> WalletInterface for WalletM | |
| .await; | ||
| } | ||
|
|
||
| async fn check_compact_filter(&mut self, filter: &BlockFilter, block_hash: &BlockHash) -> bool { | ||
| // Collect all scripts we're watching | ||
| let mut script_bytes = Vec::new(); | ||
|
|
||
| // Get all wallet addresses for this network | ||
| for info in self.wallet_infos.values() { | ||
| let monitored = info.monitored_addresses(); | ||
| for address in monitored { | ||
| script_bytes.push(address.script_pubkey().as_bytes().to_vec()); | ||
| } | ||
| } | ||
|
|
||
| // If we don't watch any scripts for this network, there can be no match. | ||
| // Note: BlockFilterReader::match_any returns true for an empty query set, | ||
| // so we must guard this case explicitly to avoid false positives. | ||
| let hit = if script_bytes.is_empty() { | ||
| false | ||
| } else { | ||
| filter | ||
| .match_any(block_hash, &mut script_bytes.iter().map(|s| s.as_slice())) | ||
| .unwrap_or(false) | ||
| }; | ||
|
|
||
| hit | ||
| fn monitored_addresses(&self) -> Vec<Address> { | ||
| self.monitored_addresses() | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why dont you include the matching.rs module logic inside this method?? That way we can also take rid of the unit tests you wrote in matching.rs since they are testing the same logic the new integrations tests are testing
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to match for specific addresses too so moving the logic in here wouldnt work. But i dropped this and added |
||
|
|
||
| async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec<String>)> { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.