From 2dfcb576edf8c362df9dabd8b051edfaef9c63fb Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 8 May 2026 13:37:27 +0200 Subject: [PATCH 1/3] fix(rs-platform-wallet): close same-UTXO concurrent-selection race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #3585 closes broadcast-failure atomicity on `send_to_addresses` but leaves the headline race open: two callers acquiring the wallet write lock independently still snapshot the same `spendable` set, both select the same outpoint, both broadcast, and the loser gets an opaque network rejection — never `ConcurrentSpendConflict`. Add a per-wallet `OutpointReservations` set (RAII guard, panic-safe via poisoned-mutex recovery). Coin selection consults the set under the write lock; selected outpoints are reserved before the lock drops, so a second concurrent caller sees them filtered out of its spendable snapshot and short-circuits with the new typed `NoSpendableInputs` error *before* hitting the network. Successful broadcasts transition inputs from "reserved" to "spent" via `check_core_transaction(Mempool)` under the second write lock — no observable gap. Failed broadcasts and panics release the reservation through `Drop`, so the next call picks up the released UTXOs. Adds three reservation-set unit tests plus two race-property tests: - concurrent same-UTXO sends → exactly one Ok, one `NoSpendableInputs` (the loser must NOT reach the broadcaster — that's the bug being closed) - broadcast failure releases the reservation so the same UTXO can be retried `tokio::sync::RwLock` is already in use; DET's original `!Send` blocker that forced the weaker asset-lock pattern does not apply here, so we take Option B (reservation set) over Option A (lock-held mark-spent + Err-rollback) — sidesteps building a rollback primitive in `key-wallet` that `check_core_transaction` does not expose today. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/rs-platform-wallet/src/error.rs | 6 + .../src/wallet/core/broadcast.rs | 366 +++++++++++++++++- .../rs-platform-wallet/src/wallet/core/mod.rs | 1 + .../src/wallet/core/reservations.rs | 155 ++++++++ .../src/wallet/core/wallet.rs | 7 + 5 files changed, 527 insertions(+), 8 deletions(-) create mode 100644 packages/rs-platform-wallet/src/wallet/core/reservations.rs diff --git a/packages/rs-platform-wallet/src/error.rs b/packages/rs-platform-wallet/src/error.rs index ed724b1f161..ce505753b9b 100644 --- a/packages/rs-platform-wallet/src/error.rs +++ b/packages/rs-platform-wallet/src/error.rs @@ -63,6 +63,12 @@ pub enum PlatformWalletError { #[error("Transaction builder selected an unavailable UTXO (concurrent spend); retry")] ConcurrentSpendConflict, + #[error( + "no spendable inputs available for {context} \ + (other in-flight transactions reserved the wallet's UTXOs; retry once they confirm)" + )] + NoSpendableInputs { context: String }, + #[error("Asset lock proof waiting failed: {0}")] AssetLockProofWait(String), diff --git a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs index 732e398eb59..b71e1f21ecc 100644 --- a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs +++ b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs @@ -32,6 +32,24 @@ impl CoreWallet { /// Uses key-wallet's [`TransactionBuilder`] for UTXO selection, fee /// estimation, and signing. Change is sent to the next internal address /// of the specified account. + /// + /// ## Race-safety against concurrent calls on the same wallet + /// + /// Coin selection consults a per-wallet **reservation set** (see + /// [`super::reservations`]) under the write lock. Selected outpoints + /// are reserved before the lock is dropped, so a second concurrent + /// caller — which acquires the write lock after this one — sees the + /// reserved outpoints filtered out of its spendable snapshot. If that + /// leaves the second caller with insufficient inputs, it short-circuits + /// with [`PlatformWalletError::NoSpendableInputs`] *before* touching + /// the network. The reservation is held until either: + /// + /// - broadcast succeeds → `check_core_transaction(Mempool, …)` marks + /// the inputs spent under the second write lock, then the guard + /// drops; the spent transition is observable to other callers with + /// no gap, or + /// - any error path → the guard drops and the outpoints are released, + /// so a retry can pick them up again. pub async fn send_to_addresses( &self, account_type: StandardAccountType, @@ -47,7 +65,7 @@ impl CoreWallet { )); } - let (tx, xpub) = { + let (tx, xpub, _reservation) = { let mut wm = self.wallet_manager.write().await; let (wallet, info) = wm.get_wallet_and_info_mut(&self.wallet_id).ok_or_else(|| { crate::error::PlatformWalletError::WalletNotFound( @@ -76,12 +94,26 @@ impl CoreWallet { )) })?; + // Snapshot spendable UTXOs minus any in-flight reservations from + // a concurrent `send_to_addresses` on this handle. Any outpoint + // already reserved is owned by another caller's still-pending + // broadcast and must not be selected here. let spendable: Vec<_> = account .spendable_utxos(current_height) .into_iter() + .filter(|utxo| !self.reservations.contains(&utxo.outpoint)) .cloned() .collect(); + if spendable.is_empty() { + return Err(PlatformWalletError::NoSpendableInputs { + context: format!( + "{:?} account {} (all UTXOs reserved by in-flight transactions)", + account_type, account_index + ), + }); + } + let xpub = wallet_accounts .get(&account_index) .map(|a| a.account_xpub) @@ -141,7 +173,25 @@ impl CoreWallet { None }, ) - .map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?; + .map_err(|e| { + // Insufficient/no-utxo errors from coin selection map + // to the typed `NoSpendableInputs` variant so callers + // can distinguish "race-loser" from "network rejected + // my tx". `select_inputs` wraps the underlying + // `SelectionError` in a `BuilderError`; we string-match + // here because the typed wrapper is not exposed. + let msg = e.to_string(); + if msg.contains("Insufficient funds") || msg.contains("No UTXOs available") { + PlatformWalletError::NoSpendableInputs { + context: format!( + "{:?} account {} ({})", + account_type, account_index, msg + ), + } + } else { + PlatformWalletError::TransactionBuild(msg) + } + })?; let tx = builder .build() @@ -163,7 +213,18 @@ impl CoreWallet { return Err(PlatformWalletError::ConcurrentSpendConflict); } - (tx, xpub) + // Reserve the selected outpoints *before* releasing the write + // lock. The next caller acquiring the lock will see these + // outpoints filtered out and either select disjoint inputs or + // short-circuit with `NoSpendableInputs`. + // + // The guard is held until the end of the function: success + // path drops it after `check_core_transaction` has marked the + // inputs spent (no observable gap); error paths drop it on + // unwinding the `Result`, releasing the outpoints for retry. + let reservation = self.reservations.reserve(selected.into_iter().collect()); + + (tx, xpub, reservation) }; // Broadcast first; if the network rejects we leave wallet state @@ -177,11 +238,10 @@ impl CoreWallet { // Now that the tx is in flight, register it as a mempool transaction // so subsequent callers see the inputs as spent and don't reselect - // them. The trade-off is that two callers racing between the lock - // drop above and the broadcast can both pick the same UTXOs; the - // network resolves that race exactly as it does on `v3.1-dev` - // today, but neither caller corrupts local state on a transient - // broadcast failure. + // them. The reservation guard above kept those inputs filtered out + // for concurrent callers throughout the broadcast `await`; this + // call transitions them from "reserved" to "spent" before the + // guard drops, so the spent state is observable with no gap. // // Broadcast-first semantics: by the time we get here the network has // already accepted the transaction, so the two warning paths below @@ -272,6 +332,13 @@ impl CoreWallet { } } + // Reservation guard drops here, releasing the outpoints. The + // `check_core_transaction` call above already marked them spent + // under the write lock — there is no observable gap during which + // both the reservation is gone and the spent state isn't yet + // visible. + drop(_reservation); + Ok(tx) } } @@ -409,4 +476,287 @@ mod tests { "broadcaster must be called exactly once on a successful broadcast" ); } + + // ----------------------------------------------------------------- + // Race-closing tests: same-UTXO concurrent `send_to_addresses`. + // + // The audit (`/tmp/pr3585-race-audit.md`) captures the property: + // two callers A and B must not both broadcast against the same + // outpoint. The reservation set guarantees B short-circuits with + // `NoSpendableInputs` *before* hitting the network — never with a + // `TransactionBroadcast` failure (that would mean B reached the + // broadcaster, which is exactly the bug being closed). + // ----------------------------------------------------------------- + + use std::collections::BTreeMap; + use std::time::Duration; + + use dashcore::hashes::Hash; + use dashcore::{Address as DashAddress, OutPoint, TxOut}; + use key_wallet::wallet::initialization::WalletAccountCreationOptions; + use key_wallet::wallet::Wallet; + use key_wallet::Utxo; + use tokio::sync::Notify; + + use crate::wallet::platform_wallet::PlatformWalletInfo; + use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; + + /// Mock broadcaster that gates the broadcast on an external `Notify`. + /// Lets the test pin caller A inside its `await` while caller B + /// races for the wallet's write lock. + struct GatedBroadcaster { + gate: Arc, + succeed: bool, + } + + #[async_trait] + impl TransactionBroadcaster for GatedBroadcaster { + async fn broadcast(&self, transaction: &Transaction) -> Result { + // Wait until the test signals the gate. Allows the test to + // observe the wallet state mid-broadcast (specifically: the + // reservation set populated, the input not yet marked spent). + self.gate.notified().await; + if self.succeed { + Ok(transaction.txid()) + } else { + Err(PlatformWalletError::TransactionBroadcast( + "mock failure".to_string(), + )) + } + } + } + + /// Always-failing mock broadcaster — used to assert that a failed + /// broadcast releases the reservation so a retry can pick up the + /// same UTXO. + struct FailingBroadcaster; + + #[async_trait] + impl TransactionBroadcaster for FailingBroadcaster { + async fn broadcast(&self, _transaction: &Transaction) -> Result { + Err(PlatformWalletError::TransactionBroadcast( + "always fails".to_string(), + )) + } + } + + /// Build a single-wallet `WalletManager` containing one BIP-44 + /// account (index 0) funded with one large UTXO at the account's + /// first receive address. Returns the wallet manager handle, the + /// wallet id, and a recipient address (a separate derived address + /// in the same account — funding/sending to the same address is + /// not the property under test). + fn build_funded_wallet_manager( + utxo_value: u64, + ) -> ( + Arc>>, + crate::wallet::platform_wallet::WalletId, + DashAddress, + ) { + let wallet = Wallet::new_random(Network::Testnet, WalletAccountCreationOptions::Default) + .expect("test wallet"); + + let xpub = wallet + .accounts + .standard_bip44_accounts + .get(&0) + .expect("bip44 account 0") + .account_xpub; + let mut wallet_info = ManagedWalletInfo::from_wallet(&wallet, 0); + + // Lift the synced height well past the UTXO height so the + // `min_confirmations >= 1` filter in coin selection accepts the + // UTXO. Without this, the UTXO appears in `spendable_utxos` (its + // own `is_spendable` check passes) but `select_coins_with_size` + // filters it out via the confirmation-count guard. + use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface as _; + wallet_info.update_synced_height(100); + + let funding_address = wallet_info + .accounts + .standard_bip44_accounts + .get_mut(&0) + .expect("managed bip44 account 0") + .next_receive_address(Some(&xpub), true) + .expect("derive receive address"); + + let outpoint = OutPoint::new(Txid::from_byte_array([7u8; 32]), 0); + let mut utxo = Utxo::new( + outpoint, + TxOut { + value: utxo_value, + script_pubkey: funding_address.script_pubkey(), + }, + funding_address, + 1, + false, + ); + utxo.is_confirmed = true; + wallet_info + .accounts + .standard_bip44_accounts + .get_mut(&0) + .expect("managed bip44 account 0") + .utxos + .insert(outpoint, utxo); + + let info = PlatformWalletInfo { + core_wallet: wallet_info, + balance: Arc::new(WalletBalance::new()), + identity_manager: crate::wallet::identity::IdentityManager::new(), + tracked_asset_locks: BTreeMap::new(), + }; + + let mut wm: WalletManager = WalletManager::new(Network::Testnet); + let wallet_id = wm.insert_wallet(wallet, info).expect("insert"); + + // Recipient — use the second receive address as a stable target. + let recipient = { + let info = wm.get_wallet_info_mut(&wallet_id).expect("info"); + info.core_wallet + .accounts + .standard_bip44_accounts + .get_mut(&0) + .expect("acc") + .next_receive_address(Some(&xpub), true) + .expect("derive recipient") + }; + + (Arc::new(RwLock::new(wm)), wallet_id, recipient) + } + + fn make_core_wallet_for_manager( + wm: Arc>>, + wallet_id: crate::wallet::platform_wallet::WalletId, + broadcaster: Arc, + ) -> CoreWallet { + let sdk = Arc::new(dash_sdk::SdkBuilder::new_mock().build().expect("mock sdk")); + CoreWallet::new( + sdk, + wm, + wallet_id, + broadcaster, + Arc::new(WalletBalance::new()), + ) + } + + /// Race test — the headline property: two concurrent `send_to_addresses` + /// calls on the same wallet, against the same single spendable UTXO, + /// must yield exactly one network broadcast. The loser must short-circuit + /// with [`PlatformWalletError::NoSpendableInputs`] *before* hitting the + /// network — i.e. the loser's error must NOT be a `TransactionBroadcast` + /// failure (that would mean it reached the broadcaster, which is exactly + /// the bug we're closing). + #[tokio::test] + async fn concurrent_same_utxo_sends_resolve_via_reservation_set() { + use key_wallet::account::account_type::StandardAccountType; + + let (wm, wallet_id, recipient) = build_funded_wallet_manager(2_000_000); + let gate = Arc::new(Notify::new()); + let broadcaster: Arc = Arc::new(GatedBroadcaster { + gate: Arc::clone(&gate), + succeed: true, + }); + let core = make_core_wallet_for_manager(wm, wallet_id, broadcaster); + + let send_value = 100_000; + let outputs_a = vec![(recipient.clone(), send_value)]; + let outputs_b = vec![(recipient.clone(), send_value)]; + + // Spawn caller A. It will reserve the only spendable outpoint + // under the wallet write lock, drop the lock, and block on the + // broadcast `Notify`. + let core_a = core.clone(); + let a_handle = tokio::spawn(async move { + core_a + .send_to_addresses(StandardAccountType::BIP44Account, 0, outputs_a) + .await + }); + + // Give A enough scheduler time to acquire the lock, build the + // tx, reserve the outpoint, and reach the gated broadcast. + // The property is monotonic — once A is in the broadcast + // `await`, the reservation is in place forever until the gate + // fires. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Caller B starts now. The wallet's only UTXO is reserved by A, + // so B's spendable snapshot is empty → `NoSpendableInputs`. + let b_result = core + .send_to_addresses(StandardAccountType::BIP44Account, 0, outputs_b) + .await; + + match &b_result { + Err(PlatformWalletError::NoSpendableInputs { context }) => { + assert!( + context.contains("reserved") + || context.contains("Insufficient") + || context.contains("No UTXOs"), + "B's NoSpendableInputs context should mention reservation \ + or insufficient/no-utxos; got: {context}" + ); + } + other => panic!( + "B must short-circuit with NoSpendableInputs (the race-loser \ + must not reach the broadcaster); got: {other:?}" + ), + } + + // Now release A's broadcast. + gate.notify_one(); + + let a_result = a_handle.await.expect("a task panicked"); + assert!( + a_result.is_ok(), + "A must succeed once its broadcast gate fires; got: {a_result:?}" + ); + } + + /// On broadcast failure, the reservation must be released so the + /// caller can retry. This is the regression-tripwire for the + /// reservation guard's Drop semantics. + #[tokio::test] + async fn broadcast_failure_releases_reservation_for_retry() { + use key_wallet::account::account_type::StandardAccountType; + + let (wm, wallet_id, recipient) = build_funded_wallet_manager(2_000_000); + let broadcaster: Arc = Arc::new(FailingBroadcaster); + let core = make_core_wallet_for_manager(wm, wallet_id, broadcaster); + + let outputs = vec![(recipient.clone(), 100_000)]; + + // First call fails at the broadcast step → guard drops → + // reservation released. The change-address index is also rolled + // back by virtue of #3585's peek-then-commit pattern. + let first = core + .send_to_addresses(StandardAccountType::BIP44Account, 0, outputs.clone()) + .await; + assert!( + matches!(first, Err(PlatformWalletError::TransactionBroadcast(_))), + "first call must surface broadcast failure; got: {first:?}" + ); + + // Reservation set is now empty — verifiable through behaviour: + // a second call sees the same UTXO as spendable again. We + // can't broadcast successfully (broadcaster always fails) but + // the second call must reach the broadcaster, not short-circuit + // with `NoSpendableInputs` (which would mean the reservation + // leaked). + let second = core + .send_to_addresses(StandardAccountType::BIP44Account, 0, outputs) + .await; + match second { + Err(PlatformWalletError::TransactionBroadcast(_)) => { + // Expected — reservation released, coin selection + // succeeded, broadcaster rejected as designed. + } + Err(PlatformWalletError::NoSpendableInputs { .. }) => { + panic!( + "reservation leaked after broadcast failure — second \ + call should have selected the released UTXO" + ); + } + other => panic!("unexpected second call result: {other:?}"), + } + } } diff --git a/packages/rs-platform-wallet/src/wallet/core/mod.rs b/packages/rs-platform-wallet/src/wallet/core/mod.rs index 106a4108c22..e068dfacb4d 100644 --- a/packages/rs-platform-wallet/src/wallet/core/mod.rs +++ b/packages/rs-platform-wallet/src/wallet/core/mod.rs @@ -1,6 +1,7 @@ pub mod balance; pub mod balance_handler; mod broadcast; +mod reservations; pub mod wallet; pub use balance::WalletBalance; diff --git a/packages/rs-platform-wallet/src/wallet/core/reservations.rs b/packages/rs-platform-wallet/src/wallet/core/reservations.rs new file mode 100644 index 00000000000..d739cce866b --- /dev/null +++ b/packages/rs-platform-wallet/src/wallet/core/reservations.rs @@ -0,0 +1,155 @@ +//! Per-wallet outpoint reservation set. +//! +//! Closes the same-UTXO concurrent-selection race in +//! [`CoreWallet::send_to_addresses`](super::broadcast). Two callers racing +//! for the same wallet take the write lock independently — between dropping +//! that lock and finishing the network broadcast, neither has yet marked +//! its inputs spent in `ManagedWalletInfo`. Without a reservation set, both +//! select the same UTXOs and one must lose at the network layer with an +//! opaque broadcast rejection. +//! +//! With this set, the **first** caller adds its selected outpoints to the +//! reservations under the write lock; the **second** caller — also under +//! the write lock — sees those outpoints filtered out of its spendable +//! snapshot and short-circuits with a typed +//! [`PlatformWalletError::NoSpendableInputs`](crate::PlatformWalletError) +//! before ever touching the network. +//! +//! ## Lifetime +//! +//! Reservations are held by an RAII [`OutpointReservationGuard`]. On drop +//! (success, error, or panic) the outpoints are released. A successful +//! broadcast is reconciled by `check_core_transaction(Mempool, …)` *before* +//! the guard drops, so the inputs transition from "reserved" to "spent" with +//! no observable gap to other callers on the same wallet handle. +//! +//! ## Scope +//! +//! Reservations are **per-wallet-instance**. Multiple `PlatformWallet`s in +//! the same process do not false-conflict; multiple processes sharing a +//! wallet on disk are out of scope (the existing `tokio::sync::RwLock` +//! already does not protect against that case). + +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; + +use dashcore::OutPoint; + +/// Per-wallet set of outpoints that have been selected for an in-flight +/// broadcast but not yet marked spent in `ManagedWalletInfo`. +/// +/// Cheaply cloneable: holds an `Arc>` internally. All clones share +/// the same set. +#[derive(Debug, Default, Clone)] +pub(crate) struct OutpointReservations { + inner: Arc>>, +} + +impl OutpointReservations { + /// Create an empty reservation set. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Test whether `outpoint` is currently reserved. + /// + /// Used by coin selection to filter out in-flight outpoints from the + /// spendable snapshot. + pub(crate) fn contains(&self, outpoint: &OutPoint) -> bool { + let guard = self + .inner + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + guard.contains(outpoint) + } + + /// Reserve `outpoints`, returning an RAII guard that releases them on + /// drop. The guard must be held until the broadcast outcome is + /// reconciled into wallet state (success → `check_core_transaction` + /// has run; failure → caller has propagated the error). + pub(crate) fn reserve(&self, outpoints: Vec) -> OutpointReservationGuard { + { + let mut guard = self + .inner + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + for op in &outpoints { + guard.insert(*op); + } + } + OutpointReservationGuard { + reservations: Arc::clone(&self.inner), + outpoints, + } + } +} + +/// RAII guard releasing reservations on drop. +/// +/// Drop is infallible and panic-safe — the underlying `Mutex` is recovered +/// from poisoning so a panicking caller still releases its outpoints. +#[must_use = "dropping the guard immediately releases the reservation"] +pub(crate) struct OutpointReservationGuard { + reservations: Arc>>, + outpoints: Vec, +} + +impl Drop for OutpointReservationGuard { + fn drop(&mut self) { + let mut guard = self + .reservations + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + for op in &self.outpoints { + guard.remove(op); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::hashes::Hash; + use dashcore::Txid; + + fn op(n: u32) -> OutPoint { + OutPoint::new(Txid::all_zeros(), n) + } + + #[test] + fn reserve_then_drop_releases() { + let res = OutpointReservations::new(); + let a = op(1); + { + let _g = res.reserve(vec![a]); + assert!(res.contains(&a)); + } + assert!(!res.contains(&a)); + } + + #[test] + fn second_reservation_is_disjoint() { + let res = OutpointReservations::new(); + let a = op(1); + let b = op(2); + let _g1 = res.reserve(vec![a]); + let _g2 = res.reserve(vec![b]); + assert!(res.contains(&a)); + assert!(res.contains(&b)); + } + + #[test] + fn poisoned_mutex_still_releases() { + let res = OutpointReservations::new(); + let a = op(7); + let res_clone = res.clone(); + let _ = std::thread::spawn(move || { + let _g = res_clone.reserve(vec![a]); + panic!("intentional"); + }) + .join(); + // Guard dropped during unwind — outpoint must be released even + // though the mutex was poisoned. + assert!(!res.contains(&a)); + } +} diff --git a/packages/rs-platform-wallet/src/wallet/core/wallet.rs b/packages/rs-platform-wallet/src/wallet/core/wallet.rs index 5a29db29002..83a4a662a88 100644 --- a/packages/rs-platform-wallet/src/wallet/core/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/core/wallet.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use super::balance::WalletBalance; +use super::reservations::OutpointReservations; use dashcore::Address as DashAddress; use tokio::sync::RwLock; @@ -31,6 +32,10 @@ pub struct CoreWallet { pub(crate) broadcaster: Arc, /// Lock-free balance for UI reads. balance: Arc, + /// Outpoints currently reserved by an in-flight `send_to_addresses` + /// call on this handle. Closes the same-UTXO concurrent-selection + /// race — see [`super::reservations`]. + pub(crate) reservations: OutpointReservations, } impl CoreWallet { @@ -47,6 +52,7 @@ impl CoreWallet { wallet_id, broadcaster, balance, + reservations: OutpointReservations::new(), } } @@ -244,6 +250,7 @@ impl Clone for CoreWallet { wallet_id: self.wallet_id, broadcaster: Arc::clone(&self.broadcaster), balance: Arc::clone(&self.balance), + reservations: self.reservations.clone(), } } } From 9ab8e6869b14781624a9e1a71c53de5b8c829156 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 8 May 2026 14:28:52 +0200 Subject: [PATCH 2/3] docs(rs-platform-wallet): tighten comments in race-close (#3622 review) Drop obvious comments, cap non-obvious ones at 3 lines per reviewer feedback. No behavior change. Co-Authored-By: Claude Sonnet 4.6 --- .../src/wallet/core/broadcast.rs | 163 ++++-------------- .../src/wallet/core/reservations.rs | 36 +--- 2 files changed, 41 insertions(+), 158 deletions(-) diff --git a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs index b71e1f21ecc..bdf0ff3d61f 100644 --- a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs +++ b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs @@ -29,27 +29,11 @@ impl CoreWallet { /// Build, sign, and broadcast a payment to the given addresses. /// - /// Uses key-wallet's [`TransactionBuilder`] for UTXO selection, fee - /// estimation, and signing. Change is sent to the next internal address - /// of the specified account. - /// - /// ## Race-safety against concurrent calls on the same wallet - /// - /// Coin selection consults a per-wallet **reservation set** (see - /// [`super::reservations`]) under the write lock. Selected outpoints - /// are reserved before the lock is dropped, so a second concurrent - /// caller — which acquires the write lock after this one — sees the - /// reserved outpoints filtered out of its spendable snapshot. If that - /// leaves the second caller with insufficient inputs, it short-circuits - /// with [`PlatformWalletError::NoSpendableInputs`] *before* touching - /// the network. The reservation is held until either: - /// - /// - broadcast succeeds → `check_core_transaction(Mempool, …)` marks - /// the inputs spent under the second write lock, then the guard - /// drops; the spent transition is observable to other callers with - /// no gap, or - /// - any error path → the guard drops and the outpoints are released, - /// so a retry can pick them up again. + /// Uses key-wallet's [`TransactionBuilder`] for UTXO selection, fee estimation, and signing. + /// Change is sent to the next internal address of the specified account. Concurrent calls on + /// the same wallet handle are race-safe via the reservation set in [`super::reservations`]: + /// the second caller short-circuits with [`PlatformWalletError::NoSpendableInputs`] before + /// touching the network if all UTXOs are reserved by an in-flight broadcast. pub async fn send_to_addresses( &self, account_type: StandardAccountType, @@ -174,12 +158,9 @@ impl CoreWallet { }, ) .map_err(|e| { - // Insufficient/no-utxo errors from coin selection map - // to the typed `NoSpendableInputs` variant so callers - // can distinguish "race-loser" from "network rejected - // my tx". `select_inputs` wraps the underlying - // `SelectionError` in a `BuilderError`; we string-match - // here because the typed wrapper is not exposed. + // Map coin-selection failures to `NoSpendableInputs` so callers can + // distinguish a race-loser from a network rejection. String-matching because + // the underlying `SelectionError` is not exposed through `BuilderError`. let msg = e.to_string(); if msg.contains("Insufficient funds") || msg.contains("No UTXOs available") { PlatformWalletError::NoSpendableInputs { @@ -197,11 +178,8 @@ impl CoreWallet { .build() .map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?; - // Defense-in-depth: by builder contract `tx.input` outpoints are - // a subset of the height-aware `spendable` set we passed to - // `select_inputs`, so this branch is unreachable in normal - // operation. Marking inputs spent is deferred to after broadcast - // (see #3466) regardless. + // Defense-in-depth: unreachable under normal builder contract but guards against + // a future regression where `select_inputs` picks an outpoint outside `spendable`. let selected: BTreeSet = tx.input.iter().map(|txin| txin.previous_output).collect(); let spendable_outpoints: BTreeSet = @@ -213,70 +191,27 @@ impl CoreWallet { return Err(PlatformWalletError::ConcurrentSpendConflict); } - // Reserve the selected outpoints *before* releasing the write - // lock. The next caller acquiring the lock will see these - // outpoints filtered out and either select disjoint inputs or - // short-circuit with `NoSpendableInputs`. - // - // The guard is held until the end of the function: success - // path drops it after `check_core_transaction` has marked the - // inputs spent (no observable gap); error paths drop it on - // unwinding the `Result`, releasing the outpoints for retry. + // Reserve before releasing the lock so the next caller sees these outpoints + // filtered out. Guard held until `check_core_transaction` marks them spent + // (success) or the error unwinds (failure → outpoints released for retry). let reservation = self.reservations.reserve(selected.into_iter().collect()); (tx, xpub, reservation) }; - // Broadcast first; if the network rejects we leave wallet state - // untouched so the caller can retry without manual sync repair. - // This is intentional even if the remote accepted the transaction - // but the broadcast path returned an error: in that ambiguous case - // later attempts may reuse the same inputs locally, but the network - // rejects the duplicate spend instead of us marking UTXOs spent for - // a transaction that might not have propagated. + // Broadcast first — on error we leave wallet state untouched so the caller can retry. + // If the network accepted but the call errored (ambiguous outcome), a retry will be + // rejected as a duplicate spend rather than us marking UTXOs spent prematurely. self.broadcast_transaction(&tx).await?; - // Now that the tx is in flight, register it as a mempool transaction - // so subsequent callers see the inputs as spent and don't reselect - // them. The reservation guard above kept those inputs filtered out - // for concurrent callers throughout the broadcast `await`; this - // call transitions them from "reserved" to "spent" before the - // guard drops, so the spent state is observable with no gap. - // - // Broadcast-first semantics: by the time we get here the network has - // already accepted the transaction, so the two warning paths below - // intentionally do NOT convert into a post-success `Err`. They - // simply mean local wallet state did not get updated to reflect the - // mempool spend / change output. Recovery in both cases: - // - // * The next `send_to_addresses` from the same handle may reselect - // the same UTXOs because they still look spendable locally. That - // follow-up transaction will be rejected by the network as a - // duplicate spend (the broadcaster surfaces that as an error to - // the caller), so funds are never double-spent on-chain. - // * Once mempool/block sync catches up, the wallet will see the - // original transaction and reconcile its UTXO set, after which - // subsequent sends pick up the correct change outputs. - // - // The two cases differ in what they imply: - // - // * `!check_result.is_relevant` is the expected transient: the - // wallet just hasn't ingested the tx yet (or some derivation - // path/script is unrecognised), and a later sync will fix it. - // * The `else` branch (wallet missing in the manager) is NOT a - // normal transient — the broadcast succeeded against a - // `CoreWallet` handle whose underlying wallet entry is gone - // from the manager. That is a broken/inconsistent local handle - // and the warning exists so operators can spot it; future - // sends through the same handle will keep failing the lookup - // above and surface a clean `WalletNotFound` error. + // Mark inputs spent under the write lock, transitioning them from "reserved" to "spent" + // before the reservation guard drops — no observable gap for concurrent callers. + // Warning paths below do NOT return Err: the network already accepted the tx. { let mut wm = self.wallet_manager.write().await; if let Some((wallet, info)) = wm.get_wallet_mut_and_info_mut(&self.wallet_id) { - // Broadcast succeeded — commit the change-address advance now - // so a future send picks up a fresh index. Doing this before - // the broadcast would burn a derivation index on a network - // rejection, widening the gap-limit window on retry. + // Commit the change-address advance post-broadcast; doing it before would burn + // a derivation index on network rejection, widening the gap-limit window. let change_account = match account_type { StandardAccountType::BIP44Account => info .core_wallet @@ -309,10 +244,8 @@ impl CoreWallet { .check_core_transaction(&tx, TransactionContext::Mempool, wallet, true, true) .await; if !check_result.is_relevant { - // CMT-004: own-built tx unrecognised by our own checker - // is an internal-invariant violation, not a transient. - // Structured `error!` with stable fields so operators can - // alert independent of message text. + // CMT-004: own-built tx unrecognised by our checker — internal invariant + // violation, not a transient. Stable event field for operator alerting. tracing::error!( target: "platform_wallet::broadcast", event = "post_broadcast_unrelated_to_own_wallet", @@ -332,11 +265,8 @@ impl CoreWallet { } } - // Reservation guard drops here, releasing the outpoints. The - // `check_core_transaction` call above already marked them spent - // under the write lock — there is no observable gap during which - // both the reservation is gone and the spent state isn't yet - // visible. + // Explicit drop: inputs are already marked spent above; no gap between + // "reservation released" and "spent visible" to concurrent callers. drop(_reservation); Ok(tx) @@ -477,16 +407,9 @@ mod tests { ); } - // ----------------------------------------------------------------- // Race-closing tests: same-UTXO concurrent `send_to_addresses`. - // - // The audit (`/tmp/pr3585-race-audit.md`) captures the property: - // two callers A and B must not both broadcast against the same - // outpoint. The reservation set guarantees B short-circuits with - // `NoSpendableInputs` *before* hitting the network — never with a - // `TransactionBroadcast` failure (that would mean B reached the - // broadcaster, which is exactly the bug being closed). - // ----------------------------------------------------------------- + // B must short-circuit with `NoSpendableInputs` before the network — a `TransactionBroadcast` + // failure from B would mean the bug is still open. use std::collections::BTreeMap; use std::time::Duration; @@ -564,11 +487,8 @@ mod tests { .account_xpub; let mut wallet_info = ManagedWalletInfo::from_wallet(&wallet, 0); - // Lift the synced height well past the UTXO height so the - // `min_confirmations >= 1` filter in coin selection accepts the - // UTXO. Without this, the UTXO appears in `spendable_utxos` (its - // own `is_spendable` check passes) but `select_coins_with_size` - // filters it out via the confirmation-count guard. + // Height must be well past UTXO height: `select_coins_with_size` enforces + // `min_confirmations >= 1`, which requires synced_height > utxo_height. use key_wallet::wallet::managed_wallet_info::wallet_info_interface::WalletInfoInterface as _; wallet_info.update_synced_height(100); @@ -640,13 +560,9 @@ mod tests { ) } - /// Race test — the headline property: two concurrent `send_to_addresses` - /// calls on the same wallet, against the same single spendable UTXO, - /// must yield exactly one network broadcast. The loser must short-circuit - /// with [`PlatformWalletError::NoSpendableInputs`] *before* hitting the - /// network — i.e. the loser's error must NOT be a `TransactionBroadcast` - /// failure (that would mean it reached the broadcaster, which is exactly - /// the bug we're closing). + /// Two concurrent `send_to_addresses` calls on one wallet with one UTXO must yield exactly + /// one broadcast. The loser must get [`PlatformWalletError::NoSpendableInputs`] — never + /// `TransactionBroadcast` (that would mean it reached the network, which is the bug closed). #[tokio::test] async fn concurrent_same_utxo_sends_resolve_via_reservation_set() { use key_wallet::account::account_type::StandardAccountType; @@ -673,11 +589,8 @@ mod tests { .await }); - // Give A enough scheduler time to acquire the lock, build the - // tx, reserve the outpoint, and reach the gated broadcast. - // The property is monotonic — once A is in the broadcast - // `await`, the reservation is in place forever until the gate - // fires. + // Let A acquire the lock, reserve the outpoint, and block on the gate. + // Monotonic property: once A is inside the broadcast `await`, the reservation holds. tokio::time::sleep(Duration::from_millis(50)).await; // Caller B starts now. The wallet's only UTXO is reserved by A, @@ -736,12 +649,8 @@ mod tests { "first call must surface broadcast failure; got: {first:?}" ); - // Reservation set is now empty — verifiable through behaviour: - // a second call sees the same UTXO as spendable again. We - // can't broadcast successfully (broadcaster always fails) but - // the second call must reach the broadcaster, not short-circuit - // with `NoSpendableInputs` (which would mean the reservation - // leaked). + // Reservation released: the second call must reach the broadcaster (same UTXO visible), + // not short-circuit with `NoSpendableInputs` (which would indicate a leaked reservation). let second = core .send_to_addresses(StandardAccountType::BIP44Account, 0, outputs) .await; diff --git a/packages/rs-platform-wallet/src/wallet/core/reservations.rs b/packages/rs-platform-wallet/src/wallet/core/reservations.rs index d739cce866b..88dba7afd11 100644 --- a/packages/rs-platform-wallet/src/wallet/core/reservations.rs +++ b/packages/rs-platform-wallet/src/wallet/core/reservations.rs @@ -1,34 +1,9 @@ -//! Per-wallet outpoint reservation set. +//! Per-wallet outpoint reservation set for [`CoreWallet::send_to_addresses`](super::broadcast). //! -//! Closes the same-UTXO concurrent-selection race in -//! [`CoreWallet::send_to_addresses`](super::broadcast). Two callers racing -//! for the same wallet take the write lock independently — between dropping -//! that lock and finishing the network broadcast, neither has yet marked -//! its inputs spent in `ManagedWalletInfo`. Without a reservation set, both -//! select the same UTXOs and one must lose at the network layer with an -//! opaque broadcast rejection. -//! -//! With this set, the **first** caller adds its selected outpoints to the -//! reservations under the write lock; the **second** caller — also under -//! the write lock — sees those outpoints filtered out of its spendable -//! snapshot and short-circuits with a typed -//! [`PlatformWalletError::NoSpendableInputs`](crate::PlatformWalletError) -//! before ever touching the network. -//! -//! ## Lifetime -//! -//! Reservations are held by an RAII [`OutpointReservationGuard`]. On drop -//! (success, error, or panic) the outpoints are released. A successful -//! broadcast is reconciled by `check_core_transaction(Mempool, …)` *before* -//! the guard drops, so the inputs transition from "reserved" to "spent" with -//! no observable gap to other callers on the same wallet handle. -//! -//! ## Scope -//! -//! Reservations are **per-wallet-instance**. Multiple `PlatformWallet`s in -//! the same process do not false-conflict; multiple processes sharing a -//! wallet on disk are out of scope (the existing `tokio::sync::RwLock` -//! already does not protect against that case). +//! Closes the same-UTXO concurrent-selection race: the first caller reserves its selected +//! outpoints under the write lock; subsequent callers filter them out and short-circuit with +//! [`PlatformWalletError::NoSpendableInputs`](crate::PlatformWalletError) before hitting the +//! network. Reservations are released by an RAII guard on success, error, or panic. use std::collections::HashSet; use std::sync::{Arc, Mutex}; @@ -46,7 +21,6 @@ pub(crate) struct OutpointReservations { } impl OutpointReservations { - /// Create an empty reservation set. pub(crate) fn new() -> Self { Self::default() } From 86d599e52684088a4af306ce77181e7a5f4a3bba Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 8 May 2026 14:36:58 +0200 Subject: [PATCH 3/3] refactor(rs-platform-wallet): tighten race-close test + reservation snapshot (#3622 review) - Replace sleep-based race coordination with Notify handshake. - Assert broadcaster called exactly once across concurrent senders. - Snapshot reservation set once for spendable filter (drop per-UTXO lock). - Pin BuilderError error-text regression test; flag typed-wrapper TODO. --- .../src/wallet/core/broadcast.rs | 79 ++++++++++++++----- .../src/wallet/core/reservations.rs | 16 +++- 2 files changed, 74 insertions(+), 21 deletions(-) diff --git a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs index bdf0ff3d61f..636427244d2 100644 --- a/packages/rs-platform-wallet/src/wallet/core/broadcast.rs +++ b/packages/rs-platform-wallet/src/wallet/core/broadcast.rs @@ -79,13 +79,13 @@ impl CoreWallet { })?; // Snapshot spendable UTXOs minus any in-flight reservations from - // a concurrent `send_to_addresses` on this handle. Any outpoint - // already reserved is owned by another caller's still-pending - // broadcast and must not be selected here. + // a concurrent `send_to_addresses` on this handle. Single lock + // acquisition for the whole filter pass. + let reserved = self.reservations.snapshot(); let spendable: Vec<_> = account .spendable_utxos(current_height) .into_iter() - .filter(|utxo| !self.reservations.contains(&utxo.outpoint)) + .filter(|utxo| !reserved.contains(&utxo.outpoint)) .cloned() .collect(); @@ -158,9 +158,9 @@ impl CoreWallet { }, ) .map_err(|e| { - // Map coin-selection failures to `NoSpendableInputs` so callers can - // distinguish a race-loser from a network rejection. String-matching because - // the underlying `SelectionError` is not exposed through `BuilderError`. + // Map coin-selection failures to `NoSpendableInputs`. String-match pinned by + // `builder_error_text_contract_for_no_inputs`. + // TODO(typed-wrapper): drop once upstream exposes `SelectionError` typed. let msg = e.to_string(); if msg.contains("Insufficient funds") || msg.contains("No UTXOs available") { PlatformWalletError::NoSpendableInputs { @@ -412,7 +412,6 @@ mod tests { // failure from B would mean the bug is still open. use std::collections::BTreeMap; - use std::time::Duration; use dashcore::hashes::Hash; use dashcore::{Address as DashAddress, OutPoint, TxOut}; @@ -425,19 +424,20 @@ mod tests { use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; /// Mock broadcaster that gates the broadcast on an external `Notify`. - /// Lets the test pin caller A inside its `await` while caller B - /// races for the wallet's write lock. + /// `entered` fires the moment `broadcast()` is awaited — by then the + /// caller has reserved its outpoints and dropped the wallet write lock. struct GatedBroadcaster { gate: Arc, + entered: Arc, + calls: AtomicUsize, succeed: bool, } #[async_trait] impl TransactionBroadcaster for GatedBroadcaster { async fn broadcast(&self, transaction: &Transaction) -> Result { - // Wait until the test signals the gate. Allows the test to - // observe the wallet state mid-broadcast (specifically: the - // reservation set populated, the input not yet marked spent). + self.calls.fetch_add(1, Ordering::SeqCst); + self.entered.notify_one(); self.gate.notified().await; if self.succeed { Ok(transaction.txid()) @@ -569,11 +569,18 @@ mod tests { let (wm, wallet_id, recipient) = build_funded_wallet_manager(2_000_000); let gate = Arc::new(Notify::new()); - let broadcaster: Arc = Arc::new(GatedBroadcaster { + let entered = Arc::new(Notify::new()); + let broadcaster = Arc::new(GatedBroadcaster { gate: Arc::clone(&gate), + entered: Arc::clone(&entered), + calls: AtomicUsize::new(0), succeed: true, }); - let core = make_core_wallet_for_manager(wm, wallet_id, broadcaster); + let core = make_core_wallet_for_manager( + wm, + wallet_id, + Arc::clone(&broadcaster) as Arc, + ); let send_value = 100_000; let outputs_a = vec![(recipient.clone(), send_value)]; @@ -589,9 +596,9 @@ mod tests { .await }); - // Let A acquire the lock, reserve the outpoint, and block on the gate. - // Monotonic property: once A is inside the broadcast `await`, the reservation holds. - tokio::time::sleep(Duration::from_millis(50)).await; + // Deterministic handshake: wait until A has reached the broadcast gate. + // By that point A has reserved the outpoint and dropped the wallet write lock. + entered.notified().await; // Caller B starts now. The wallet's only UTXO is reserved by A, // so B's spendable snapshot is empty → `NoSpendableInputs`. @@ -623,6 +630,13 @@ mod tests { a_result.is_ok(), "A must succeed once its broadcast gate fires; got: {a_result:?}" ); + + // Pin "loser never reached the network" directly: only A invoked the broadcaster. + assert_eq!( + broadcaster.calls.load(Ordering::SeqCst), + 1, + "broadcaster must be called exactly once across both concurrent senders" + ); } /// On broadcast failure, the reservation must be released so the @@ -668,4 +682,33 @@ mod tests { other => panic!("unexpected second call result: {other:?}"), } } + + /// Pins the upstream error text the production string-match in + /// `send_to_addresses` depends on. If `key-wallet` ever rephrases + /// "Insufficient funds" / "No UTXOs available", this test breaks + /// loudly so the matcher can be updated (or, ideally, replaced + /// with a typed `SelectionError` once upstream exposes it). + #[test] + fn builder_error_text_contract_for_no_inputs() { + use key_wallet::wallet::managed_wallet_info::coin_selection::SelectionStrategy; + use key_wallet::wallet::managed_wallet_info::transaction_builder::TransactionBuilder; + + let (_, _, recipient) = build_funded_wallet_manager(2_000_000); + + let result = TransactionBuilder::new() + .add_output(&recipient, 100_000) + .expect("add_output") + .select_inputs(&[], SelectionStrategy::LargestFirst, 100, |_| None); + + let err = match result { + Ok(_) => panic!("empty UTXO slice must fail coin selection"), + Err(e) => e, + }; + let msg = err.to_string(); + assert!( + msg.contains("Insufficient funds") || msg.contains("No UTXOs available"), + "production string-match in send_to_addresses depends on these tokens; \ + got: {msg}" + ); + } } diff --git a/packages/rs-platform-wallet/src/wallet/core/reservations.rs b/packages/rs-platform-wallet/src/wallet/core/reservations.rs index 88dba7afd11..070c60e96a3 100644 --- a/packages/rs-platform-wallet/src/wallet/core/reservations.rs +++ b/packages/rs-platform-wallet/src/wallet/core/reservations.rs @@ -26,9 +26,7 @@ impl OutpointReservations { } /// Test whether `outpoint` is currently reserved. - /// - /// Used by coin selection to filter out in-flight outpoints from the - /// spendable snapshot. + #[cfg(test)] pub(crate) fn contains(&self, outpoint: &OutPoint) -> bool { let guard = self .inner @@ -37,6 +35,18 @@ impl OutpointReservations { guard.contains(outpoint) } + /// Clone the current reservation set under a single lock acquisition. + /// + /// Callers filter spendable UTXOs against the returned snapshot to + /// avoid one mutex lock per candidate outpoint. + pub(crate) fn snapshot(&self) -> HashSet { + let guard = self + .inner + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + guard.clone() + } + /// Reserve `outpoints`, returning an RAII guard that releases them on /// drop. The guard must be held until the broadcast outcome is /// reconciled into wallet state (success → `check_core_transaction`