diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 8ff85d0fc24..5124b74b8bd 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -21,7 +21,7 @@ use lightning::util::test_utils::{ }; use lightning_liquidity::lsps0::ser::LSPS_MESSAGE_TYPE_ID; -use lightning_liquidity::LiquidityManagerSync; +use lightning_liquidity::{DummyOnionMessageInterceptor, LiquidityManagerSync}; use core::time::Duration; @@ -87,6 +87,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&tx_broadcaster), None, None, + DummyOnionMessageInterceptor, ) .unwrap(), ); diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 4d6e770c099..a49b20f3f9d 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -71,9 +71,9 @@ use lightning::util::wakers::Future; use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; -use lightning_liquidity::ALiquidityManager; #[cfg(feature = "std")] use lightning_liquidity::ALiquidityManagerSync; +use lightning_liquidity::{ALiquidityManager, DummyOnionMessageInterceptor}; use core::ops::Deref; use core::time::Duration; @@ -463,6 +463,7 @@ pub const NO_LIQUIDITY_MANAGER: Option< BroadcasterInterface = &(dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync), + OMI = DummyOnionMessageInterceptor, > + Send + Sync, >, @@ -485,6 +486,7 @@ pub const NO_LIQUIDITY_MANAGER_SYNC: Option< BroadcasterInterface = &(dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync), + OMI = DummyOnionMessageInterceptor, > + Send + Sync, >, @@ -1972,7 +1974,9 @@ mod tests { use lightning::util::test_utils; use lightning::{get_event, get_event_msg}; use lightning_liquidity::utils::time::DefaultTimeProvider; - use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync}; + use lightning_liquidity::{ + ALiquidityManagerSync, DummyOnionMessageInterceptor, LiquidityManager, 
LiquidityManagerSync, + }; use lightning_persister::fs_store::v1::FilesystemStore; use lightning_rapid_gossip_sync::RapidGossipSync; use std::collections::VecDeque; @@ -2556,6 +2560,7 @@ mod tests { Arc::clone(&tx_broadcaster), None, None, + DummyOnionMessageInterceptor, ) .unwrap(), ); diff --git a/lightning-liquidity/src/lib.rs b/lightning-liquidity/src/lib.rs index 2f1d5bd01e7..e2de3bb1655 100644 --- a/lightning-liquidity/src/lib.rs +++ b/lightning-liquidity/src/lib.rs @@ -74,6 +74,6 @@ mod tests; pub mod utils; pub use manager::{ - ALiquidityManager, ALiquidityManagerSync, LiquidityClientConfig, LiquidityManager, - LiquidityManagerSync, LiquidityServiceConfig, + ALiquidityManager, ALiquidityManagerSync, DummyOnionMessageInterceptor, LiquidityClientConfig, + LiquidityManager, LiquidityManagerSync, LiquidityServiceConfig, }; diff --git a/lightning-liquidity/src/lsps2/event.rs b/lightning-liquidity/src/lsps2/event.rs index 502429b79ec..9ca20863387 100644 --- a/lightning-liquidity/src/lsps2/event.rs +++ b/lightning-liquidity/src/lsps2/event.rs @@ -49,7 +49,17 @@ pub enum LSPS2ClientEvent { /// When the invoice is paid, the LSP will open a channel with the previously agreed upon /// parameters to you. /// + /// For BOLT11 JIT invoices, `intercept_scid` and `cltv_expiry_delta` can be used in a route + /// hint. + /// + /// For BOLT12 JIT flows, register these parameters for your offer id on an + /// [`LSPS2BOLT12Router`] and then proceed with the regular BOLT12 offer + /// flow. The router will inject the LSPS2-specific blinded payment path when creating the + /// invoice. + /// /// **Note: ** This event will *not* be persisted across restarts. + /// + /// [`LSPS2BOLT12Router`]: crate::lsps2::router::LSPS2BOLT12Router InvoiceParametersReady { /// The identifier of the issued bLIP-52 / LSPS2 `buy` request, as returned by /// [`LSPS2ClientHandler::select_opening_params`]. 
diff --git a/lightning-liquidity/src/lsps2/mod.rs b/lightning-liquidity/src/lsps2/mod.rs index 1d5fb76d3b4..684ad9b26f7 100644 --- a/lightning-liquidity/src/lsps2/mod.rs +++ b/lightning-liquidity/src/lsps2/mod.rs @@ -13,5 +13,6 @@ pub mod client; pub mod event; pub mod msgs; pub(crate) mod payment_queue; +pub mod router; pub mod service; pub mod utils; diff --git a/lightning-liquidity/src/lsps2/router.rs b/lightning-liquidity/src/lsps2/router.rs new file mode 100644 index 00000000000..74832739f04 --- /dev/null +++ b/lightning-liquidity/src/lsps2/router.rs @@ -0,0 +1,540 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Router helpers for combining LSPS2 with BOLT12 offer flows. + +use alloc::vec::Vec; + +use crate::prelude::{new_hash_map, HashMap}; +use crate::sync::Mutex; + +use bitcoin::secp256k1::{self, PublicKey, Secp256k1}; + +use lightning::blinded_path::message::{ + BlindedMessagePath, MessageContext, MessageForwardNode, OffersContext, +}; +use lightning::blinded_path::payment::{ + BlindedPaymentPath, Bolt12OfferContext, ForwardTlvs, PaymentConstraints, PaymentContext, + PaymentForwardNode, PaymentRelay, ReceiveTlvs, +}; +use lightning::ln::channel_state::ChannelDetails; +use lightning::ln::channelmanager::{PaymentId, MIN_FINAL_CLTV_EXPIRY_DELTA}; +use lightning::offers::offer::OfferId; +use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath}; +use lightning::routing::router::{InFlightHtlcs, Route, RouteParameters, Router}; +use lightning::sign::{EntropySource, ReceiveAuthKey}; +use lightning::types::features::BlindedHopFeatures; +use lightning::types::payment::PaymentHash; + +/// LSPS2 invoice parameters required to construct BOLT12 blinded payment paths through an LSP. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct LSPS2Bolt12InvoiceParameters { + /// The LSP node id to use as the blinded path introduction node. + pub counterparty_node_id: PublicKey, + /// The LSPS2 intercept short channel id. + pub intercept_scid: u64, + /// The CLTV expiry delta the LSP requires for forwarding over `intercept_scid`. + pub cltv_expiry_delta: u32, +} + +/// A router wrapper that injects LSPS2-specific BOLT12 blinded paths for registered offer ids +/// while delegating all other routing behavior to the inner routers. +/// +/// For **payment** blinded paths (in invoices), it injects the intercept SCID as the forwarding +/// hop so that the LSP can intercept the HTLC and open a JIT channel. +/// +/// For **message** blinded paths (in offers), it injects the intercept SCID as the +/// [`MessageForwardNode::short_channel_id`] for compact encoding, resulting in significantly +/// smaller offers when bech32-encoded (e.g., for QR codes). The LSP must register the intercept +/// SCID for interception via [`OnionMessageInterceptor::register_scid_for_interception`] so that +/// forwarded messages using the compact encoding are intercepted rather than dropped. +/// +/// [`OnionMessageInterceptor::register_scid_for_interception`]: lightning::onion_message::messenger::OnionMessageInterceptor::register_scid_for_interception +pub struct LSPS2BOLT12Router { + inner_router: R, + inner_message_router: MR, + entropy_source: ES, + offer_to_invoice_params: Mutex>, +} + +impl LSPS2BOLT12Router { + /// Constructs a new wrapper around `inner_router` and `inner_message_router`. + pub fn new(inner_router: R, inner_message_router: MR, entropy_source: ES) -> Self { + Self { + inner_router, + inner_message_router, + entropy_source, + offer_to_invoice_params: Mutex::new(new_hash_map()), + } + } + + /// Registers LSPS2 parameters to be used when generating blinded payment paths for `offer_id`. 
+ pub fn register_offer( + &self, offer_id: OfferId, invoice_params: LSPS2Bolt12InvoiceParameters, + ) -> Option { + self.offer_to_invoice_params.lock().unwrap().insert(offer_id.0, invoice_params) + } + + /// Removes any previously registered LSPS2 parameters for `offer_id`. + pub fn unregister_offer(&self, offer_id: &OfferId) -> Option { + self.offer_to_invoice_params.lock().unwrap().remove(&offer_id.0) + } + + /// Clears all LSPS2 parameters previously registered via [`Self::register_offer`]. + pub fn clear_registered_offers(&self) { + self.offer_to_invoice_params.lock().unwrap().clear(); + } + + fn registered_lsps2_params( + &self, payment_context: &PaymentContext, + ) -> Option { + // We intentionally only match `Bolt12Offer` here and not `AsyncBolt12Offer`, as LSPS2 + // JIT channels are not applicable to async (always-online) BOLT12 offer flows. + let Bolt12OfferContext { offer_id, .. } = match payment_context { + PaymentContext::Bolt12Offer(context) => context, + _ => return None, + }; + + self.offer_to_invoice_params.lock().unwrap().get(&offer_id.0).copied() + } +} + +impl Router + for LSPS2BOLT12Router +{ + fn find_route( + &self, payer: &PublicKey, route_params: &RouteParameters, + first_hops: Option<&[&ChannelDetails]>, inflight_htlcs: InFlightHtlcs, + ) -> Result { + self.inner_router.find_route(payer, route_params, first_hops, inflight_htlcs) + } + + fn find_route_with_id( + &self, payer: &PublicKey, route_params: &RouteParameters, + first_hops: Option<&[&ChannelDetails]>, inflight_htlcs: InFlightHtlcs, + payment_hash: PaymentHash, payment_id: PaymentId, + ) -> Result { + self.inner_router.find_route_with_id( + payer, + route_params, + first_hops, + inflight_htlcs, + payment_hash, + payment_id, + ) + } + + fn create_blinded_payment_paths( + &self, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey, + first_hops: Vec, tlvs: ReceiveTlvs, amount_msats: Option, + secp_ctx: &Secp256k1, + ) -> Result, ()> { + let lsps2_invoice_params = match 
self.registered_lsps2_params(&tlvs.payment_context) { + Some(params) => params, + None => { + return self.inner_router.create_blinded_payment_paths( + recipient, + local_node_receive_key, + first_hops, + tlvs, + amount_msats, + secp_ctx, + ) + }, + }; + + let payment_relay = PaymentRelay { + cltv_expiry_delta: u16::try_from(lsps2_invoice_params.cltv_expiry_delta) + .map_err(|_| ())?, + fee_proportional_millionths: 0, + fee_base_msat: 0, + }; + let payment_constraints = PaymentConstraints { + max_cltv_expiry: tlvs + .payment_constraints + .max_cltv_expiry + .saturating_add(lsps2_invoice_params.cltv_expiry_delta), + htlc_minimum_msat: 0, + }; + + let forward_node = PaymentForwardNode { + tlvs: ForwardTlvs { + short_channel_id: lsps2_invoice_params.intercept_scid, + payment_relay, + payment_constraints, + features: BlindedHopFeatures::empty(), + next_blinding_override: None, + }, + node_id: lsps2_invoice_params.counterparty_node_id, + htlc_maximum_msat: u64::MAX, + }; + + // We deliberately use `BlindedPaymentPath::new` without dummy hops here. Since the LSP + // is the introduction node and already knows the recipient, adding dummy hops would not + // provide meaningful privacy benefits in the LSPS2 JIT channel context. 
+ let path = BlindedPaymentPath::new( + &[forward_node], + recipient, + local_node_receive_key, + tlvs, + u64::MAX, + MIN_FINAL_CLTV_EXPIRY_DELTA, + &self.entropy_source, + secp_ctx, + )?; + + Ok(vec![path]) + } +} + +impl MessageRouter + for LSPS2BOLT12Router +{ + fn find_path( + &self, sender: PublicKey, peers: Vec, destination: Destination, + ) -> Result { + self.inner_message_router.find_path(sender, peers, destination) + } + + fn create_blinded_paths( + &self, recipient: PublicKey, local_node_receive_key: ReceiveAuthKey, + context: MessageContext, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + // Inject intercept SCIDs for size-constrained contexts (offer QR codes) so that + // the message blinded path uses compact SCID encoding instead of full pubkeys. + // We use the first matching intercept SCID for each peer since the message path + // is only used for routing InvoiceRequests, not for payment interception. + let peers = match &context { + MessageContext::Offers(OffersContext::InvoiceRequest { .. 
}) => { + let params = self.offer_to_invoice_params.lock().unwrap(); + peers + .into_iter() + .map(|mut peer| { + if let Some(p) = + params.values().find(|p| p.counterparty_node_id == peer.node_id) + { + peer.short_channel_id = Some(p.intercept_scid); + } + peer + }) + .collect() + }, + _ => peers, + }; + + self.inner_message_router.create_blinded_paths( + recipient, + local_node_receive_key, + context, + peers, + secp_ctx, + ) + } +} + +#[cfg(test)] +mod tests { + use super::{LSPS2BOLT12Router, LSPS2Bolt12InvoiceParameters}; + + use bitcoin::network::Network; + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + + use lightning::blinded_path::payment::{ + Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs, + }; + use lightning::blinded_path::NodeIdLookUp; + use lightning::ln::channel_state::ChannelDetails; + use lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY_DELTA; + use lightning::offers::invoice_request::InvoiceRequestFields; + use lightning::offers::offer::OfferId; + use lightning::routing::router::{InFlightHtlcs, Route, RouteParameters, Router}; + use lightning::sign::{EntropySource, NodeSigner, ReceiveAuthKey, Recipient}; + use lightning::types::payment::PaymentSecret; + use lightning::util::test_utils::TestKeysInterface; + + use crate::sync::Mutex; + + use core::sync::atomic::{AtomicUsize, Ordering}; + + struct RecordingLookup { + next_node_id: PublicKey, + short_channel_id: Mutex>, + } + + impl NodeIdLookUp for RecordingLookup { + fn next_node_id(&self, short_channel_id: u64) -> Option { + *self.short_channel_id.lock().unwrap() = Some(short_channel_id); + Some(self.next_node_id) + } + } + + #[derive(Clone)] + struct TestEntropy; + + impl EntropySource for TestEntropy { + fn get_secure_random_bytes(&self) -> [u8; 32] { + [42; 32] + } + } + + struct MockMessageRouter; + + impl lightning::onion_message::messenger::MessageRouter for MockMessageRouter { + fn find_path( + &self, _sender: PublicKey, _peers: Vec, 
+ _destination: lightning::onion_message::messenger::Destination, + ) -> Result { + Err(()) + } + + fn create_blinded_paths< + T: bitcoin::secp256k1::Signing + bitcoin::secp256k1::Verification, + >( + &self, _recipient: PublicKey, _local_node_receive_key: lightning::sign::ReceiveAuthKey, + _context: lightning::blinded_path::message::MessageContext, + _peers: Vec, + _secp_ctx: &Secp256k1, + ) -> Result, ()> { + Err(()) + } + } + + struct MockRouter { + create_blinded_payment_paths_calls: AtomicUsize, + } + + impl MockRouter { + fn new() -> Self { + Self { create_blinded_payment_paths_calls: AtomicUsize::new(0) } + } + + fn create_blinded_payment_paths_calls(&self) -> usize { + self.create_blinded_payment_paths_calls.load(Ordering::Acquire) + } + } + + impl Router for MockRouter { + fn find_route( + &self, _payer: &PublicKey, _route_params: &RouteParameters, + _first_hops: Option<&[&ChannelDetails]>, _inflight_htlcs: InFlightHtlcs, + ) -> Result { + Err("mock router") + } + + fn create_blinded_payment_paths< + T: bitcoin::secp256k1::Signing + bitcoin::secp256k1::Verification, + >( + &self, _recipient: PublicKey, _local_node_receive_key: ReceiveAuthKey, + _first_hops: Vec, _tlvs: ReceiveTlvs, _amount_msats: Option, + _secp_ctx: &Secp256k1, + ) -> Result, ()> { + self.create_blinded_payment_paths_calls.fetch_add(1, Ordering::AcqRel); + Err(()) + } + } + + fn pubkey(byte: u8) -> PublicKey { + let secret_key = SecretKey::from_slice(&[byte; 32]).unwrap(); + PublicKey::from_secret_key(&Secp256k1::new(), &secret_key) + } + + fn bolt12_offer_tlvs(offer_id: OfferId) -> ReceiveTlvs { + ReceiveTlvs { + payment_secret: PaymentSecret([2; 32]), + payment_constraints: PaymentConstraints { max_cltv_expiry: 100, htlc_minimum_msat: 1 }, + payment_context: PaymentContext::Bolt12Offer(Bolt12OfferContext { + offer_id, + invoice_request: InvoiceRequestFields { + payer_signing_pubkey: pubkey(9), + quantity: None, + payer_note_truncated: None, + human_readable_name: None, + }, + }), + } + } 
+ + fn bolt12_refund_tlvs() -> ReceiveTlvs { + ReceiveTlvs { + payment_secret: PaymentSecret([2; 32]), + payment_constraints: PaymentConstraints { max_cltv_expiry: 100, htlc_minimum_msat: 1 }, + payment_context: PaymentContext::Bolt12Refund(Bolt12RefundContext {}), + } + } + + #[test] + fn creates_lsps2_blinded_path_for_registered_offer() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + + let offer_id = OfferId([8; 32]); + let lsp_keys = TestKeysInterface::new(&[43; 32], Network::Testnet); + let lsp_node_id = lsp_keys.get_node_id(Recipient::Node).unwrap(); + + let expected_scid = 42; + let expected_cltv_delta = 48; + let recipient = pubkey(10); + + router.register_offer( + offer_id, + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: lsp_node_id, + intercept_scid: expected_scid, + cltv_expiry_delta: expected_cltv_delta, + }, + ); + + let secp_ctx = Secp256k1::new(); + let mut paths = router + .create_blinded_payment_paths( + recipient, + ReceiveAuthKey([3; 32]), + Vec::new(), + bolt12_offer_tlvs(offer_id), + Some(5_000), + &secp_ctx, + ) + .unwrap(); + + assert_eq!(paths.len(), 1); + let mut path = paths.pop().unwrap(); + assert_eq!( + path.introduction_node(), + &lightning::blinded_path::IntroductionNode::NodeId(lsp_node_id) + ); + assert_eq!(path.payinfo.fee_base_msat, 0); + assert_eq!(path.payinfo.fee_proportional_millionths, 0); + assert_eq!( + path.payinfo.cltv_expiry_delta, + expected_cltv_delta as u16 + MIN_FINAL_CLTV_EXPIRY_DELTA + ); + + let lookup = + RecordingLookup { next_node_id: recipient, short_channel_id: Mutex::new(None) }; + path.advance_path_by_one(&lsp_keys, &lookup, &secp_ctx).unwrap(); + assert_eq!(*lookup.short_channel_id.lock().unwrap(), Some(expected_scid)); + } + + #[test] + fn delegates_when_offer_is_not_registered() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = 
LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + let secp_ctx = Secp256k1::new(); + + let result = router.create_blinded_payment_paths( + pubkey(10), + ReceiveAuthKey([3; 32]), + Vec::new(), + bolt12_refund_tlvs(), + Some(10_000), + &secp_ctx, + ); + + assert!(result.is_err()); + assert_eq!(router.inner_router.create_blinded_payment_paths_calls(), 1); + } + + #[test] + fn delegates_when_offer_id_is_not_registered() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + let secp_ctx = Secp256k1::new(); + + // Use a Bolt12Offer context with an OfferId that was never registered. + let unregistered_offer_id = OfferId([99; 32]); + let result = router.create_blinded_payment_paths( + pubkey(10), + ReceiveAuthKey([3; 32]), + Vec::new(), + bolt12_offer_tlvs(unregistered_offer_id), + Some(10_000), + &secp_ctx, + ); + + assert!(result.is_err()); + assert_eq!(router.inner_router.create_blinded_payment_paths_calls(), 1); + } + + #[test] + fn rejects_out_of_range_cltv_delta() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + + let offer_id = OfferId([11; 32]); + router.register_offer( + offer_id, + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: pubkey(12), + intercept_scid: 21, + cltv_expiry_delta: u32::from(u16::MAX) + 1, + }, + ); + + let secp_ctx = Secp256k1::new(); + let result = router.create_blinded_payment_paths( + pubkey(13), + ReceiveAuthKey([3; 32]), + Vec::new(), + bolt12_offer_tlvs(offer_id), + Some(1_000), + &secp_ctx, + ); + + assert!(result.is_err()); + } + + #[test] + fn can_unregister_offer() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + + let offer_id = OfferId([1; 32]); + let 
params = LSPS2Bolt12InvoiceParameters { + counterparty_node_id: pubkey(2), + intercept_scid: 7, + cltv_expiry_delta: 40, + }; + assert_eq!(router.register_offer(offer_id, params), None); + assert_eq!(router.unregister_offer(&offer_id), Some(params)); + assert_eq!(router.unregister_offer(&offer_id), None); + } + + #[test] + fn can_clear_registered_offers() { + let inner_router = MockRouter::new(); + let entropy_source = TestEntropy; + let router = LSPS2BOLT12Router::new(inner_router, MockMessageRouter, entropy_source); + + router.register_offer( + OfferId([1; 32]), + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: pubkey(2), + intercept_scid: 7, + cltv_expiry_delta: 40, + }, + ); + router.register_offer( + OfferId([2; 32]), + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: pubkey(3), + intercept_scid: 8, + cltv_expiry_delta: 41, + }, + ); + + router.clear_registered_offers(); + assert_eq!(router.unregister_offer(&OfferId([1; 32])), None); + assert_eq!(router.unregister_offer(&OfferId([2; 32])), None); + } +} diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index b7f6f2fc64d..5fb8836bb2d 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -31,6 +31,7 @@ use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue}; use crate::lsps2::utils::{ compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params, }; +use crate::manager::DummyOnionMessageInterceptor; use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard}; use crate::persist::{ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, @@ -45,6 +46,7 @@ use lightning::events::HTLCHandlingFailureType; use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId}; use lightning::ln::msgs::{ErrorAction, LightningError}; use lightning::ln::types::ChannelId; +use 
lightning::onion_message::messenger::OnionMessageInterceptor; use lightning::util::errors::APIError; use lightning::util::logger::Level; use lightning::util::ser::Writeable; @@ -631,17 +633,20 @@ impl PeerState { }); } - fn prune_expired_request_state(&mut self) { + fn prune_expired_request_state(&mut self) -> Vec { + let mut pruned_scids = Vec::new(); self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| { if entry.is_prunable() { // We abort the flow, and prune any data kept. self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid); self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid); self.needs_persist |= true; + pruned_scids.push(*intercept_scid); return false; } true }); + pruned_scids } fn pending_requests_and_channels(&self) -> usize { @@ -702,8 +707,12 @@ macro_rules! get_or_insert_peer_state_entry { } /// The main object allowing to send and receive bLIP-52 / LSPS2 messages. -pub struct LSPS2ServiceHandler -where +pub struct LSPS2ServiceHandler< + CM: Deref, + K: KVStore + Clone, + T: BroadcasterInterface, + OMI: OnionMessageInterceptor + Send + Sync = DummyOnionMessageInterceptor, +> where CM::Target: AChannelManager, { channel_manager: CM, @@ -717,9 +726,15 @@ where total_pending_requests: AtomicUsize, config: LSPS2ServiceConfig, persistence_in_flight: AtomicUsize, + onion_message_interceptor: OMI, } -impl LSPS2ServiceHandler +impl< + CM: Deref, + K: KVStore + Clone, + T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync, + > LSPS2ServiceHandler where CM::Target: AChannelManager, { @@ -727,7 +742,7 @@ where pub(crate) fn new( per_peer_state: HashMap>, pending_messages: Arc, pending_events: Arc>, channel_manager: CM, kv_store: K, tx_broadcaster: T, - config: LSPS2ServiceConfig, + config: LSPS2ServiceConfig, onion_message_interceptor: OMI, ) -> Result { let mut peer_by_intercept_scid = new_hash_map(); let mut peer_by_channel_id = new_hash_map(); @@ -756,6 +771,14 
@@ where } } + // Register all peers and SCIDs with active intercept SCIDs for onion message + // interception, so that messages for offline peers are held rather than dropped. + // Both peer-based and SCID-based registration are needed to support clients using + // either pubkey or compact SCID encoding in their message blinded paths. + for (scid, node_id) in &peer_by_intercept_scid { + onion_message_interceptor.register_scid_for_interception(*scid, *node_id); + } + Ok(Self { pending_messages, pending_events, @@ -768,6 +791,7 @@ where kv_store, tx_broadcaster, config, + onion_message_interceptor, }) } @@ -776,6 +800,25 @@ where &self.config } + /// Cleans up `peer_by_intercept_scid` entries for the given SCIDs, and deregisters the peer + /// from onion message interception if they have no remaining active intercept SCIDs. + fn cleanup_intercept_scids(&self, pruned_scids: &[u64]) { + if pruned_scids.is_empty() { + return; + } + + { + let mut peer_by_intercept_scid = self.peer_by_intercept_scid.write().unwrap(); + for scid in pruned_scids { + peer_by_intercept_scid.remove(scid); + } + } + + for scid in pruned_scids { + self.onion_message_interceptor.deregister_scid_for_interception(*scid); + } + } + /// Returns whether the peer has any active LSPS2 requests. 
pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool { let outer_state_lock = self.per_peer_state.read().unwrap(); @@ -921,6 +964,9 @@ where peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id); } + self.onion_message_interceptor + .register_scid_for_interception(intercept_scid, *counterparty_node_id); + let outbound_jit_channel = OutboundJITChannel::new( buy_request.payment_size_msat, buy_request.opening_fee_params, @@ -988,19 +1034,19 @@ where payment_hash: PaymentHash, ) -> Result<(), APIError> { let event_queue_notifier = self.pending_events.notifier(); - let mut should_persist = None; + let should_persist; - if let Some(counterparty_node_id) = - self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid) - { + let counterparty_node_id = + self.peer_by_intercept_scid.read().unwrap().get(&intercept_scid).copied(); + if let Some(counterparty_node_id) = counterparty_node_id { let outer_state_lock = self.per_peer_state.read().unwrap(); - match outer_state_lock.get(counterparty_node_id) { + match outer_state_lock.get(&counterparty_node_id) { Some(inner_state_lock) => { let mut peer_state = inner_state_lock.lock().unwrap(); if let Some(jit_channel) = peer_state.outbound_channels_by_intercept_scid.get_mut(&intercept_scid) { - should_persist = Some(*counterparty_node_id); + should_persist = Some(counterparty_node_id); let htlc = InterceptedHTLC { intercept_id, expected_outbound_amount_msat, @@ -1009,7 +1055,7 @@ where match jit_channel.htlc_intercepted(htlc) { Ok(Some(HTLCInterceptedAction::OpenChannel(open_channel_params))) => { let event = LSPS2ServiceEvent::OpenChannel { - their_network_key: counterparty_node_id.clone(), + their_network_key: counterparty_node_id, amt_to_forward_msat: open_channel_params.amt_to_forward_msat, opening_fee_msat: open_channel_params.opening_fee_msat, user_channel_id: jit_channel.user_channel_id, @@ -1021,7 +1067,7 @@ where self.channel_manager.get_cm().forward_intercepted_htlc( 
intercept_id, &channel_id, - *counterparty_node_id, + counterparty_node_id, expected_outbound_amount_msat, )?; }, @@ -1038,7 +1084,7 @@ where self.channel_manager.get_cm().forward_intercepted_htlc( intercept_id, &channel_id, - *counterparty_node_id, + counterparty_node_id, amount_to_forward_msat, )?; } @@ -1051,10 +1097,14 @@ where peer_state .outbound_channels_by_intercept_scid .remove(&intercept_scid); - // TODO: cleanup peer_by_intercept_scid + self.cleanup_intercept_scids(&[intercept_scid]); return Err(APIError::APIMisuseError { err: e.err }); }, } + } else { + return Err(APIError::APIMisuseError { + err: format!("No JIT channel state found for scid: {}", intercept_scid), + }); } peer_state.needs_persist |= should_persist.is_some(); @@ -1270,6 +1320,7 @@ where pub async fn channel_open_abandoned( &self, counterparty_node_id: &PublicKey, user_channel_id: u128, ) -> Result<(), APIError> { + let intercept_scid; { let outer_state_lock = self.per_peer_state.read().unwrap(); let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| { @@ -1279,7 +1330,7 @@ where })?; let mut peer_state = inner_state_lock.lock().unwrap(); - let intercept_scid = peer_state + let removed_intercept_scid = peer_state .intercept_scid_by_user_channel_id .get(&user_channel_id) .copied() @@ -1292,13 +1343,13 @@ where let jit_channel = peer_state .outbound_channels_by_intercept_scid - .get(&intercept_scid) + .get(&removed_intercept_scid) .ok_or_else(|| APIError::APIMisuseError { - err: format!( - "Failed to map intercept_scid {} for user_channel_id {} to a channel.", - intercept_scid, user_channel_id, - ), - })?; + err: format!( + "Failed to map intercept_scid {} for user_channel_id {} to a channel.", + removed_intercept_scid, user_channel_id, + ), + })?; let is_pending = matches!( jit_channel.state, @@ -1313,12 +1364,17 @@ where }); } + intercept_scid = removed_intercept_scid; peer_state.intercept_scid_by_user_channel_id.remove(&user_channel_id); - 
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid); - peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid); + peer_state.outbound_channels_by_intercept_scid.remove(&removed_intercept_scid); + peer_state + .intercept_scid_by_channel_id + .retain(|_, &mut scid| scid != removed_intercept_scid); peer_state.needs_persist |= true; } + self.cleanup_intercept_scids(&[intercept_scid]); + self.persist_peer_state(*counterparty_node_id).await.map_err(|e| { APIError::APIMisuseError { err: format!("Failed to persist peer state for {}: {}", counterparty_node_id, e), @@ -1805,6 +1861,7 @@ where loop { let mut need_remove = Vec::new(); let mut need_persist = Vec::new(); + let mut pruned_scids = Vec::new(); { // First build a list of peers to persist and prune with the read lock. This allows @@ -1812,7 +1869,7 @@ where let outer_state_lock = self.per_peer_state.read().unwrap(); for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() { let mut peer_state_lock = inner_state_lock.lock().unwrap(); - peer_state_lock.prune_expired_request_state(); + pruned_scids.extend(peer_state_lock.prune_expired_request_state()); let is_prunable = peer_state_lock.is_prunable(); if is_prunable { need_remove.push(*counterparty_node_id); @@ -1822,6 +1879,8 @@ where } } + self.cleanup_intercept_scids(&pruned_scids); + for counterparty_node_id in need_persist.into_iter() { debug_assert!(!need_remove.contains(&counterparty_node_id)); self.persist_peer_state(counterparty_node_id).await?; @@ -1858,6 +1917,22 @@ where debug_assert!(false); } } + if future_opt.is_some() { + // Clean up handler-level maps for the removed peer. 
+ let removed_scids: Vec = self + .peer_by_intercept_scid + .read() + .unwrap() + .iter() + .filter(|(_, nid)| **nid == counterparty_node_id) + .map(|(scid, _)| *scid) + .collect(); + self.cleanup_intercept_scids(&removed_scids); + self.peer_by_channel_id + .write() + .unwrap() + .retain(|_, node_id| *node_id != counterparty_node_id); + } if let Some(future) = future_opt { future.await?; did_persist = true; @@ -1879,14 +1954,19 @@ where } pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) { - let outer_state_lock = self.per_peer_state.read().unwrap(); - if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - // We clean up the peer state, but leave removing the peer entry to the prune logic in - // `persist` which removes it from the store. - peer_state_lock.prune_pending_requests(); - peer_state_lock.prune_expired_request_state(); + let mut pruned_scids = Vec::new(); + { + let outer_state_lock = self.per_peer_state.read().unwrap(); + if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + // We clean up the peer state, but leave removing the peer entry to the prune logic in + // `persist` which removes it from the store. + peer_state_lock.prune_pending_requests(); + pruned_scids = peer_state_lock.prune_expired_request_state(); + } } + + self.cleanup_intercept_scids(&pruned_scids); } /// Checks if the JIT channel with the given `user_channel_id` needs manual broadcast. 
@@ -2050,8 +2130,12 @@ where } } -impl LSPSProtocolMessageHandler - for LSPS2ServiceHandler +impl< + CM: Deref, + K: KVStore + Clone, + T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync, + > LSPSProtocolMessageHandler for LSPS2ServiceHandler where CM::Target: AChannelManager, { @@ -2128,18 +2212,24 @@ pub struct LSPS2ServiceHandlerSync< CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync = DummyOnionMessageInterceptor, > where CM::Target: AChannelManager, { - inner: &'a LSPS2ServiceHandler, + inner: &'a LSPS2ServiceHandler, } -impl<'a, CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone> - LSPS2ServiceHandlerSync<'a, CM, K, T> +impl< + 'a, + CM: Deref, + K: KVStore + Clone, + T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync, + > LSPS2ServiceHandlerSync<'a, CM, K, T, OMI> where CM::Target: AChannelManager, { - pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler) -> Self { + pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler) -> Self { Self { inner } } @@ -2360,6 +2450,7 @@ mod tests { use crate::lsps0::ser::LSPSDateTime; use bitcoin::{absolute::LockTime, transaction::Version}; + use core::str::FromStr; const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000; @@ -2445,6 +2536,122 @@ mod tests { ); } + #[test] + #[cfg(feature = "time")] + fn persist_cleans_pruned_intercept_scids_from_handler_maps() { + use alloc::collections::VecDeque; + + use crate::events::EventQueue; + use crate::message_queue::MessageQueue; + use crate::prelude::new_hash_map; + use crate::sync::{Arc, Mutex}; + use crate::utils::async_poll::dummy_waker; + + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + use lightning::ln::functional_test_utils::{ + create_chanmon_cfgs, create_network, create_node_cfgs, create_node_chanmgrs, + }; + use lightning::util::test_utils::TestStore; + use lightning::util::wakers::Notifier; + + use core::pin::pin; + use core::task; 
+ + fn pubkey(byte: u8) -> PublicKey { + let secret_key = SecretKey::from_slice(&[byte; 32]).unwrap(); + PublicKey::from_secret_key(&Secp256k1::new(), &secret_key) + } + + fn opening_fee_params(valid_until: LSPSDateTime) -> LSPS2OpeningFeeParams { + LSPS2OpeningFeeParams { + min_fee_msat: 100, + proportional: 21, + valid_until, + min_lifetime: 144, + max_client_to_self_delay: 128, + min_payment_size_msat: 1, + max_payment_size_msat: 100_000_000, + promise: "ignore".to_string(), + } + } + + let chanmon_cfgs = create_chanmon_cfgs(1); + let node_cfgs = create_node_cfgs(1, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]); + let nodes = create_network(1, &node_cfgs, &node_chanmgrs); + let node = &nodes[0]; + + let counterparty_node_id = pubkey(3); + let expired_scid = 41; + let active_scid = 42; + + let mut peer_state = PeerState::new(); + peer_state.insert_outbound_channel( + expired_scid, + OutboundJITChannel::new( + Some(1_000_000), + opening_fee_params(LSPSDateTime::from_str("2020-01-01T00:00:00Z").unwrap()), + 1, + true, + ), + ); + peer_state.insert_outbound_channel( + active_scid, + OutboundJITChannel::new( + Some(1_000_000), + opening_fee_params(LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap()), + 2, + true, + ), + ); + + let mut per_peer_state = new_hash_map(); + per_peer_state.insert(counterparty_node_id, Mutex::new(peer_state)); + + let kv_store = Arc::new(TestStore::new(false)); + let pending_messages = Arc::new(MessageQueue::new(Arc::new(Notifier::new()))); + let pending_events = Arc::new(EventQueue::new( + VecDeque::new(), + Arc::clone(&kv_store), + Arc::new(Notifier::new()), + )); + let handler = LSPS2ServiceHandler::new( + per_peer_state, + pending_messages, + pending_events, + node.node, + Arc::clone(&kv_store), + node.tx_broadcaster, + LSPS2ServiceConfig { promise_secret: [42; 32] }, + DummyOnionMessageInterceptor, + ) + .unwrap(); + + let mut fut = pin!(handler.persist()); + let mut waker = dummy_waker(); + let 
mut ctx = task::Context::from_waker(&mut waker); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(Ok(true)) => {}, + task::Poll::Pending => { + kv_store.complete_all_async_writes(); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(Ok(true)) => {}, + res => panic!("Unexpected persist result after writes: {:?}", res), + } + }, + res => panic!("Unexpected persist result: {:?}", res), + } + + let peer_by_intercept_scid = handler.peer_by_intercept_scid.read().unwrap(); + assert_eq!(peer_by_intercept_scid.get(&expired_scid), None); + assert_eq!(peer_by_intercept_scid.get(&active_scid), Some(&counterparty_node_id)); + + let per_peer_state = handler.per_peer_state.read().unwrap(); + let peer_state = per_peer_state.get(&counterparty_node_id).unwrap().lock().unwrap(); + assert!(!peer_state.outbound_channels_by_intercept_scid.contains_key(&expired_scid)); + assert!(peer_state.outbound_channels_by_intercept_scid.contains_key(&active_scid)); + } + #[test] fn test_jit_channel_state_mpp() { let payment_size_msat = Some(500_000_000); diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index f1b098dbfaa..98fb371e9da 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -47,6 +47,7 @@ use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::{ErrorAction, LightningError}; use lightning::ln::peer_handler::CustomMessageHandler; use lightning::ln::wire::CustomMessageReader; +use lightning::onion_message::messenger::OnionMessageInterceptor; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::logger::Level; use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper}; @@ -64,6 +65,19 @@ use core::task; const LSPS_FEATURE_BIT: usize = 729; +/// A no-op [`OnionMessageInterceptor`] that can be used when LSPS2 onion-message interception is +/// not needed. 
+#[derive(Clone, Copy, Debug, Default)] +pub struct DummyOnionMessageInterceptor; + +impl OnionMessageInterceptor for DummyOnionMessageInterceptor { + fn register_scid_for_interception(&self, _scid: u64, _peer_node_id: PublicKey) {} + + fn deregister_scid_for_interception(&self, _scid: u64) -> bool { + false + } +} + /// A server-side configuration for [`LiquidityManager`]. /// /// Allows end-users to configure options when using the [`LiquidityManager`] @@ -117,6 +131,8 @@ pub trait ALiquidityManager { type TP: Deref + Clone; /// A type implementing [`BroadcasterInterface`]. type BroadcasterInterface: BroadcasterInterface + Clone; + /// A type implementing [`OnionMessageInterceptor`]. + type OMI: OnionMessageInterceptor + Send + Sync; /// Returns a reference to the actual [`LiquidityManager`] object. fn get_lm( &self, @@ -127,6 +143,7 @@ pub trait ALiquidityManager { Self::K, Self::TP, Self::BroadcasterInterface, + Self::OMI, >; } @@ -137,7 +154,8 @@ impl< K: KVStore + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > ALiquidityManager for LiquidityManager + OMI: OnionMessageInterceptor + Send + Sync, + > ALiquidityManager for LiquidityManager where CM::Target: AChannelManager, TP::Target: TimeProvider, @@ -150,7 +168,8 @@ where type TimeProvider = TP::Target; type TP = TP; type BroadcasterInterface = T; - fn get_lm(&self) -> &LiquidityManager { + type OMI = OMI; + fn get_lm(&self) -> &LiquidityManager { self } } @@ -178,6 +197,8 @@ pub trait ALiquidityManagerSync { type TP: Deref + Clone; /// A type implementing [`BroadcasterInterface`]. type BroadcasterInterface: BroadcasterInterface + Clone; + /// A type implementing [`OnionMessageInterceptor`]. + type OMI: OnionMessageInterceptor + Send + Sync; /// Returns the inner async [`LiquidityManager`] for testing purposes. 
#[cfg(any(test, feature = "_test_utils"))] fn get_lm_async( @@ -189,6 +210,7 @@ pub trait ALiquidityManagerSync { KVStoreSyncWrapper, Self::TP, Self::BroadcasterInterface, + Self::OMI, >; /// Returns a reference to the actual [`LiquidityManager`] object. fn get_lm( @@ -200,6 +222,7 @@ pub trait ALiquidityManagerSync { Self::KS, Self::TP, Self::BroadcasterInterface, + Self::OMI, >; } @@ -210,7 +233,8 @@ impl< KS: Deref + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > ALiquidityManagerSync for LiquidityManagerSync + OMI: OnionMessageInterceptor + Send + Sync, + > ALiquidityManagerSync for LiquidityManagerSync where CM::Target: AChannelManager, KS::Target: KVStoreSync, @@ -225,6 +249,7 @@ where type TimeProvider = TP::Target; type TP = TP; type BroadcasterInterface = T; + type OMI = OMI; /// Returns the inner async [`LiquidityManager`] for testing purposes. #[cfg(any(test, feature = "_test_utils"))] fn get_lm_async( @@ -236,10 +261,11 @@ where KVStoreSyncWrapper, Self::TP, Self::BroadcasterInterface, + Self::OMI, > { &self.inner } - fn get_lm(&self) -> &LiquidityManagerSync { + fn get_lm(&self) -> &LiquidityManagerSync { self } } @@ -270,6 +296,7 @@ pub struct LiquidityManager< K: KVStore + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync = DummyOnionMessageInterceptor, > where CM::Target: AChannelManager, TP::Target: TimeProvider, @@ -283,7 +310,7 @@ pub struct LiquidityManager< lsps0_service_handler: Option, lsps1_service_handler: Option>, lsps1_client_handler: Option>, - lsps2_service_handler: Option>, + lsps2_service_handler: Option>, lsps2_client_handler: Option>, lsps5_service_handler: Option>, lsps5_client_handler: Option>, @@ -299,7 +326,8 @@ impl< CM: Deref + Clone, K: KVStore + Clone, T: BroadcasterInterface + Clone, - > LiquidityManager + OMI: OnionMessageInterceptor + Send + Sync, + > LiquidityManager where CM::Target: AChannelManager, { @@ -309,7 +337,7 @@ where pub async fn new( 
entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store: K, transaction_broadcaster: T, service_config: Option, - client_config: Option, + client_config: Option, onion_message_interceptor: OMI, ) -> Result { Self::new_with_custom_time_provider( entropy_source, @@ -320,6 +348,7 @@ where service_config, client_config, DefaultTimeProvider, + onion_message_interceptor, ) .await } @@ -332,7 +361,8 @@ impl< K: KVStore + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > LiquidityManager + OMI: OnionMessageInterceptor + Send + Sync, + > LiquidityManager where CM::Target: AChannelManager, TP::Target: TimeProvider, @@ -349,6 +379,7 @@ where entropy_source: ES, node_signer: NS, channel_manager: CM, transaction_broadcaster: T, kv_store: K, service_config: Option, client_config: Option, time_provider: TP, + onion_message_interceptor: OMI, ) -> Result { let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new()); let pending_messages = @@ -377,7 +408,7 @@ where let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() { if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() { if let Some(number) = - as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER + as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER { supported_protocols.push(number); } @@ -391,6 +422,7 @@ where kv_store.clone(), transaction_broadcaster.clone(), lsps2_service_config.clone(), + onion_message_interceptor, )?) } else { None @@ -540,7 +572,7 @@ where /// Returns a reference to the LSPS2 server-side handler. /// /// The returned hendler allows to initiate the LSPS2 service-side flow. 
- pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler> { + pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler> { self.lsps2_service_handler.as_ref() } @@ -753,7 +785,8 @@ impl< K: KVStore + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > CustomMessageReader for LiquidityManager + OMI: OnionMessageInterceptor + Send + Sync, + > CustomMessageReader for LiquidityManager where CM::Target: AChannelManager, TP::Target: TimeProvider, @@ -779,7 +812,8 @@ impl< K: KVStore + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > CustomMessageHandler for LiquidityManager + OMI: OnionMessageInterceptor + Send + Sync, + > CustomMessageHandler for LiquidityManager where CM::Target: AChannelManager, TP::Target: TimeProvider, @@ -913,12 +947,13 @@ pub struct LiquidityManagerSync< KS: Deref + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, + OMI: OnionMessageInterceptor + Send + Sync = DummyOnionMessageInterceptor, > where CM::Target: AChannelManager, KS::Target: KVStoreSync, TP::Target: TimeProvider, { - inner: LiquidityManager, TP, T>, + inner: LiquidityManager, TP, T, OMI>, } #[cfg(feature = "time")] @@ -928,7 +963,8 @@ impl< CM: Deref + Clone, KS: Deref + Clone, T: BroadcasterInterface + Clone, - > LiquidityManagerSync + OMI: OnionMessageInterceptor + Send + Sync, + > LiquidityManagerSync where CM::Target: AChannelManager, KS::Target: KVStoreSync, @@ -939,7 +975,7 @@ where pub fn new( entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store_sync: KS, transaction_broadcaster: T, service_config: Option, - client_config: Option, + client_config: Option, onion_message_interceptor: OMI, ) -> Result { let kv_store = KVStoreSyncWrapper(kv_store_sync); @@ -951,6 +987,7 @@ where transaction_broadcaster, service_config, client_config, + onion_message_interceptor, )); let mut waker = dummy_waker(); @@ -973,7 +1010,8 @@ impl< KS: Deref + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > 
LiquidityManagerSync + OMI: OnionMessageInterceptor + Send + Sync, + > LiquidityManagerSync where CM::Target: AChannelManager, KS::Target: KVStoreSync, @@ -986,6 +1024,7 @@ where entropy_source: ES, node_signer: NS, channel_manager: CM, kv_store_sync: KS, transaction_broadcaster: T, service_config: Option, client_config: Option, time_provider: TP, + onion_message_interceptor: OMI, ) -> Result { let kv_store = KVStoreSyncWrapper(kv_store_sync); let mut fut = pin!(LiquidityManager::new_with_custom_time_provider( @@ -997,6 +1036,7 @@ where service_config, client_config, time_provider, + onion_message_interceptor, )); let mut waker = dummy_waker(); @@ -1053,7 +1093,7 @@ where /// Wraps [`LiquidityManager::lsps2_service_handler`]. pub fn lsps2_service_handler<'a>( &'a self, - ) -> Option, T>> { + ) -> Option, T, OMI>> { self.inner.lsps2_service_handler.as_ref().map(|r| LSPS2ServiceHandlerSync::from_inner(r)) } @@ -1134,7 +1174,8 @@ impl< KS: Deref + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > CustomMessageReader for LiquidityManagerSync + OMI: OnionMessageInterceptor + Send + Sync, + > CustomMessageReader for LiquidityManagerSync where CM::Target: AChannelManager, KS::Target: KVStoreSync, @@ -1156,7 +1197,8 @@ impl< KS: Deref + Clone, TP: Deref + Clone, T: BroadcasterInterface + Clone, - > CustomMessageHandler for LiquidityManagerSync + OMI: OnionMessageInterceptor + Send + Sync, + > CustomMessageHandler for LiquidityManagerSync where CM::Target: AChannelManager, KS::Target: KVStoreSync, diff --git a/lightning-liquidity/tests/common/mod.rs b/lightning-liquidity/tests/common/mod.rs index 2716df7c0a3..9156a13a1c9 100644 --- a/lightning-liquidity/tests/common/mod.rs +++ b/lightning-liquidity/tests/common/mod.rs @@ -1,7 +1,10 @@ #![cfg(test)] use lightning_liquidity::utils::time::TimeProvider; -use lightning_liquidity::{LiquidityClientConfig, LiquidityManagerSync, LiquidityServiceConfig}; +use lightning_liquidity::{ + DummyOnionMessageInterceptor, 
LiquidityClientConfig, LiquidityManagerSync, + LiquidityServiceConfig, +}; use lightning::ln::functional_test_utils::{Node, TestChannelManager}; use lightning::util::test_utils::{TestBroadcaster, TestKeysInterface, TestStore}; @@ -36,6 +39,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>( Some(service_config), None, Arc::clone(&time_provider), + DummyOnionMessageInterceptor, ) .unwrap(); @@ -48,6 +52,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>( None, Some(client_config), time_provider, + DummyOnionMessageInterceptor, ) .unwrap(); @@ -127,6 +132,7 @@ pub(crate) struct LiquidityNode<'a, 'b, 'c> { Arc, Arc, &'c TestBroadcaster, + DummyOnionMessageInterceptor, >, } @@ -140,6 +146,7 @@ impl<'a, 'b, 'c> LiquidityNode<'a, 'b, 'c> { Arc, Arc, &'c TestBroadcaster, + DummyOnionMessageInterceptor, >, ) -> Self { Self { inner: node, liquidity_manager } diff --git a/lightning-liquidity/tests/lsps1_integration_tests.rs b/lightning-liquidity/tests/lsps1_integration_tests.rs index a177b338ad7..4f95db676f7 100644 --- a/lightning-liquidity/tests/lsps1_integration_tests.rs +++ b/lightning-liquidity/tests/lsps1_integration_tests.rs @@ -17,7 +17,10 @@ use lightning_liquidity::lsps1::msgs::{ }; use lightning_liquidity::lsps1::service::{LSPS1ServiceConfig, PaymentMethod}; use lightning_liquidity::utils::time::DefaultTimeProvider; -use lightning_liquidity::{LiquidityClientConfig, LiquidityManagerSync, LiquidityServiceConfig}; +use lightning_liquidity::{ + DummyOnionMessageInterceptor, LiquidityClientConfig, LiquidityManagerSync, + LiquidityServiceConfig, +}; use lightning::ln::functional_test_utils::{ create_chanmon_cfgs, create_node_cfgs, create_node_chanmgrs, @@ -434,6 +437,7 @@ fn lsps1_service_handler_persistence_across_restarts() { Some(service_config), None, Arc::clone(&time_provider), + DummyOnionMessageInterceptor, ) .unwrap(); @@ -454,6 +458,7 @@ fn lsps1_service_handler_persistence_across_restarts() { None, Some(client_config), time_provider, + 
DummyOnionMessageInterceptor, ) .unwrap(); @@ -1087,6 +1092,7 @@ fn lsps1_expired_orders_are_pruned_and_not_persisted() { Some(service_config), None, Arc::clone(&time_provider), + DummyOnionMessageInterceptor, ) .unwrap(); @@ -1106,6 +1112,7 @@ fn lsps1_expired_orders_are_pruned_and_not_persisted() { None, Some(client_config), time_provider, + DummyOnionMessageInterceptor, ) .unwrap(); diff --git a/lightning-liquidity/tests/lsps2_integration_tests.rs b/lightning-liquidity/tests/lsps2_integration_tests.rs index b8a4a5adebb..2ac8dd6bc4b 100644 --- a/lightning-liquidity/tests/lsps2_integration_tests.rs +++ b/lightning-liquidity/tests/lsps2_integration_tests.rs @@ -7,26 +7,40 @@ use common::{ get_lsps_message, LSPSNodes, LSPSNodesWithPayer, LiquidityNode, }; -use lightning::events::{ClosureReason, Event}; +use lightning::events::{ClosureReason, Event, EventsProvider}; use lightning::get_event_msg; use lightning::ln::channelmanager::{OptionalBolt11PaymentParams, PaymentId}; use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::BaseMessageHandler; use lightning::ln::msgs::ChannelMessageHandler; use lightning::ln::msgs::MessageSendEvent; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::types::ChannelId; +use lightning::offers::invoice_request::InvoiceRequestFields; +use lightning::offers::offer::OfferId; +use lightning::routing::router::{InFlightHtlcs, Route, RouteParameters, Router}; +use lightning::sign::{RandomBytes, ReceiveAuthKey}; +use lightning::onion_message::messenger::NullMessageRouter; use lightning_liquidity::events::LiquidityEvent; use lightning_liquidity::lsps0::ser::LSPSDateTime; use lightning_liquidity::lsps2::client::LSPS2ClientConfig; use lightning_liquidity::lsps2::event::LSPS2ClientEvent; use lightning_liquidity::lsps2::event::LSPS2ServiceEvent; use lightning_liquidity::lsps2::msgs::LSPS2RawOpeningFeeParams; +use lightning_liquidity::lsps2::router::{LSPS2BOLT12Router, LSPS2Bolt12InvoiceParameters}; use 
lightning_liquidity::lsps2::service::LSPS2ServiceConfig; use lightning_liquidity::lsps2::utils::is_valid_opening_fee_params; use lightning_liquidity::utils::time::{DefaultTimeProvider, TimeProvider}; -use lightning_liquidity::{LiquidityClientConfig, LiquidityManagerSync, LiquidityServiceConfig}; +use lightning_liquidity::{ + DummyOnionMessageInterceptor, LiquidityClientConfig, LiquidityManagerSync, + LiquidityServiceConfig, +}; +use lightning::blinded_path::payment::{ + Bolt12OfferContext, PaymentConstraints, PaymentContext, ReceiveTlvs, +}; +use lightning::blinded_path::NodeIdLookUp; use lightning::ln::channelmanager::{InterceptId, MIN_FINAL_CLTV_EXPIRY_DELTA}; use lightning::ln::functional_test_utils::{ create_chanmon_cfgs, create_node_cfgs, create_node_chanmgrs, @@ -56,6 +70,46 @@ use std::time::Duration; const MAX_PENDING_REQUESTS_PER_PEER: usize = 10; const MAX_TOTAL_PENDING_REQUESTS: usize = 1000; +struct RecordingLookup { + next_node_id: PublicKey, + short_channel_id: std::sync::Mutex<Option<u64>>, +} + +impl NodeIdLookUp for RecordingLookup { + fn next_node_id(&self, short_channel_id: u64) -> Option<PublicKey> { + *self.short_channel_id.lock().unwrap() = Some(short_channel_id); + Some(self.next_node_id) + } +} + +struct FailingRouter; + +impl FailingRouter { + fn new() -> Self { + Self + } +} + +impl Router for FailingRouter { + fn find_route( + &self, _payer: &PublicKey, _route_params: &RouteParameters, + _first_hops: Option<&[&lightning::ln::channel_state::ChannelDetails]>, + _inflight_htlcs: InFlightHtlcs, + ) -> Result<Route, &'static str> { + Err("failing test router") + } + + fn create_blinded_payment_paths< + T: bitcoin::secp256k1::Signing + bitcoin::secp256k1::Verification, + >( + &self, _recipient: PublicKey, _local_node_receive_key: ReceiveAuthKey, + _first_hops: Vec<lightning::ln::channel_state::ChannelDetails>, _tlvs: ReceiveTlvs, + _amount_msats: Option<u64>, _secp_ctx: &Secp256k1<T>, + ) -> Result<Vec<lightning::blinded_path::payment::BlindedPaymentPath>, ()> { + Err(()) + } +} + fn build_lsps2_configs() -> ([u8; 32], LiquidityServiceConfig, LiquidityClientConfig) { let promise_secret = [42; 32];
let lsps2_service_config = LSPS2ServiceConfig { promise_secret }; @@ -556,6 +610,20 @@ fn channel_open_abandoned() { // Call channel_open_abandoned service_handler.channel_open_abandoned(&client_node_id, user_channel_id).unwrap(); + let result = service_handler.htlc_intercepted( + intercept_scid, + InterceptId([3; 32]), + payment_size_msat.unwrap(), + PaymentHash([4; 32]), + ); + assert!(result.is_err()); + match result.unwrap_err() { + APIError::APIMisuseError { err } => { + assert!(err.contains("Unknown scid provided")); + }, + other => panic!("Unexpected error type: {:?}", other), + } + // Verify the channel is gone by trying to abandon it again, which should fail let result = service_handler.channel_open_abandoned(&client_node_id, user_channel_id); assert!(result.is_err()); @@ -1079,6 +1147,7 @@ fn lsps2_service_handler_persistence_across_restarts() { Some(service_config), None, time_provider, + DummyOnionMessageInterceptor, ) .unwrap(); @@ -1476,6 +1545,556 @@ fn execute_lsps2_dance( } } +#[test] +fn bolt12_custom_router_uses_lsps2_intercept_scid() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, promise_secret) = setup_test_lsps2_nodes_with_payer(nodes); + + let service_node_id = lsps_nodes.service_node.inner.node.get_our_node_id(); + let client_node_id = lsps_nodes.client_node.inner.node.get_our_node_id(); + + let intercept_scid = lsps_nodes.service_node.node.get_intercept_scid(); + let cltv_expiry_delta = 72; + + execute_lsps2_dance( + &lsps_nodes, + intercept_scid, + 42, + cltv_expiry_delta, + promise_secret, + Some(250_000), + 1_000, + ); + + let inner_router = FailingRouter::new(); + let router = LSPS2BOLT12Router::new( + inner_router, + NullMessageRouter {}, + lsps_nodes.client_node.keys_manager, + ); + let offer_id = OfferId([42; 32]); + + 
router.register_offer( + offer_id, + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: service_node_id, + intercept_scid, + cltv_expiry_delta, + }, + ); + + let tlvs = ReceiveTlvs { + payment_secret: lightning_types::payment::PaymentSecret([7; 32]), + payment_constraints: PaymentConstraints { max_cltv_expiry: 50, htlc_minimum_msat: 1 }, + payment_context: PaymentContext::Bolt12Offer(Bolt12OfferContext { + offer_id, + invoice_request: InvoiceRequestFields { + payer_signing_pubkey: lsps_nodes.payer_node.node.get_our_node_id(), + quantity: None, + payer_note_truncated: None, + human_readable_name: None, + }, + }), + }; + + let secp_ctx = Secp256k1::new(); + let mut paths = router + .create_blinded_payment_paths( + client_node_id, + ReceiveAuthKey([3; 32]), + Vec::new(), + tlvs, + Some(100_000), + &secp_ctx, + ) + .unwrap(); + + assert_eq!(paths.len(), 1); + let mut path = paths.pop().unwrap(); + assert_eq!( + path.introduction_node(), + &lightning::blinded_path::IntroductionNode::NodeId(service_node_id) + ); + assert_eq!(path.payinfo.fee_base_msat, 0); + assert_eq!(path.payinfo.fee_proportional_millionths, 0); + + let lookup = RecordingLookup { + next_node_id: client_node_id, + short_channel_id: std::sync::Mutex::new(None), + }; + path.advance_path_by_one(lsps_nodes.service_node.keys_manager, &lookup, &secp_ctx).unwrap(); + assert_eq!(*lookup.short_channel_id.lock().unwrap(), Some(intercept_scid)); +} + +#[test] +fn bolt12_lsps2_end_to_end_test() { + // End-to-end test of the BOLT12 + LSPS2 JIT channel flow. Three nodes: payer, service, client. + // client_trusts_lsp=true; funding transaction broadcast happens after client claims the HTLC. 
+ let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + + let mut service_node_config = test_default_channel_config(); + service_node_config.htlc_interception_flags = HTLCInterceptionFlags::ToInterceptSCIDs as u8; + + let mut client_node_config = test_default_channel_config(); + client_node_config.accept_inbound_channels = true; + client_node_config.channel_config.accept_underpaying_htlcs = true; + + let node_chanmgrs = create_node_chanmgrs( + 3, + &node_cfgs, + &[Some(service_node_config), Some(client_node_config), None], + ); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, promise_secret) = setup_test_lsps2_nodes_with_payer(nodes); + let LSPSNodesWithPayer { ref service_node, ref client_node, ref payer_node } = lsps_nodes; + + let payer_node_id = payer_node.node.get_our_node_id(); + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + + let service_handler = service_node.liquidity_manager.lsps2_service_handler().unwrap(); + + create_chan_between_nodes_with_value(&payer_node, &service_node.inner, 2_000_000, 100_000); + + let intercept_scid = service_node.node.get_intercept_scid(); + let user_channel_id = 42; + let cltv_expiry_delta: u32 = 144; + let payment_size_msat = Some(1_000_000); + let fee_base_msat = 1_000; + + execute_lsps2_dance( + &lsps_nodes, + intercept_scid, + user_channel_id, + cltv_expiry_delta, + promise_secret, + payment_size_msat, + fee_base_msat, + ); + + // Disconnect payer from client to ensure deterministic onion message routing through service. 
+ payer_node.node.peer_disconnected(client_node_id); + client_node.node.peer_disconnected(payer_node_id); + payer_node.onion_messenger.peer_disconnected(client_node_id); + client_node.onion_messenger.peer_disconnected(payer_node_id); + + #[cfg(c_bindings)] + let offer = { + let mut offer_builder = client_node.node.create_offer_builder().unwrap(); + offer_builder.amount_msats(payment_size_msat.unwrap()); + offer_builder.build().unwrap() + }; + #[cfg(not(c_bindings))] + let offer = client_node + .node + .create_offer_builder() + .unwrap() + .amount_msats(payment_size_msat.unwrap()) + .build() + .unwrap(); + + let lsps2_router = Arc::new(LSPS2BOLT12Router::new( + FailingRouter::new(), + NullMessageRouter {}, + Arc::new(RandomBytes::new([43; 32])), + )); + lsps2_router.register_offer( + offer.id(), + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: service_node_id, + intercept_scid, + cltv_expiry_delta, + }, + ); + + let lsps2_router = Arc::clone(&lsps2_router); + *client_node.router.override_create_blinded_payment_paths.lock().unwrap() = + Some(Box::new(move |recipient, local_node_receive_key, first_hops, tlvs, amount_msats| { + let secp_ctx = Secp256k1::new(); + lsps2_router.create_blinded_payment_paths( + recipient, + local_node_receive_key, + first_hops, + tlvs, + amount_msats, + &secp_ctx, + ) + })); + + let payment_id = PaymentId([1; 32]); + payer_node.node.pay_for_offer(&offer, None, payment_id, Default::default()).unwrap(); + + let onion_msg = payer_node + .onion_messenger + .next_onion_message_for_peer(service_node_id) + .expect("Payer should send InvoiceRequest toward service"); + service_node.onion_messenger.handle_onion_message(payer_node_id, &onion_msg); + + let fwd_msg = service_node + .onion_messenger + .next_onion_message_for_peer(client_node_id) + .expect("Service should forward InvoiceRequest to client"); + client_node.onion_messenger.handle_onion_message(service_node_id, &fwd_msg); + + let onion_msg = client_node + .onion_messenger + 
.next_onion_message_for_peer(service_node_id) + .expect("Client should send Invoice toward service"); + service_node.onion_messenger.handle_onion_message(client_node_id, &onion_msg); + + let fwd_msg = service_node + .onion_messenger + .next_onion_message_for_peer(payer_node_id) + .expect("Service should forward Invoice to payer"); + payer_node.onion_messenger.handle_onion_message(service_node_id, &fwd_msg); + + check_added_monitors(&payer_node, 1); + let events = payer_node.node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + let ev = SendEvent::from_event(events[0].clone()); + + service_node.inner.node.handle_update_add_htlc(payer_node_id, &ev.msgs[0]); + do_commitment_signed_dance(&service_node.inner, &payer_node, &ev.commitment_msg, false, true); + service_node.inner.node.process_pending_htlc_forwards(); + + let events = service_node.inner.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (payment_hash, expected_outbound_amount_msat) = match &events[0] { + Event::HTLCIntercepted { + intercept_id, + requested_next_hop_scid, + payment_hash, + expected_outbound_amount_msat, + .. 
+ } => { + assert_eq!(*requested_next_hop_scid, intercept_scid); + + service_handler + .htlc_intercepted( + *requested_next_hop_scid, + *intercept_id, + *expected_outbound_amount_msat, + *payment_hash, + ) + .unwrap(); + (*payment_hash, expected_outbound_amount_msat) + }, + other => panic!("Expected HTLCIntercepted event, got: {:?}", other), + }; + + let open_channel_event = service_node.liquidity_manager.next_event().unwrap(); + + match open_channel_event { + LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::OpenChannel { + their_network_key, + amt_to_forward_msat, + opening_fee_msat, + user_channel_id: uc_id, + intercept_scid: iscd, + }) => { + assert_eq!(their_network_key, client_node_id); + assert_eq!(amt_to_forward_msat, payment_size_msat.unwrap() - fee_base_msat); + assert_eq!(opening_fee_msat, fee_base_msat); + assert_eq!(uc_id, user_channel_id); + assert_eq!(iscd, intercept_scid); + }, + other => panic!("Expected OpenChannel event, got: {:?}", other), + }; + + let result = + service_handler.channel_needs_manual_broadcast(user_channel_id, &client_node_id).unwrap(); + assert!(result, "Channel should require manual broadcast"); + + let (channel_id, funding_tx) = create_channel_with_manual_broadcast( + &service_node_id, + &client_node_id, + &service_node, + &client_node, + user_channel_id, + expected_outbound_amount_msat, + true, + ); + + service_handler.channel_ready(user_channel_id, &channel_id, &client_node_id).unwrap(); + + service_node.inner.node.process_pending_htlc_forwards(); + + let pay_event = { + { + let mut added_monitors = + service_node.inner.chain_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + added_monitors.clear(); + } + let mut events = service_node.inner.node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + SendEvent::from_event(events.remove(0)) + }; + + client_node.inner.node.handle_update_add_htlc(service_node_id, &pay_event.msgs[0]); + do_commitment_signed_dance( + &client_node.inner, + 
&service_node.inner, + &pay_event.commitment_msg, + false, + true, + ); + client_node.inner.node.process_pending_htlc_forwards(); + + let client_events = client_node.inner.node.get_and_clear_pending_events(); + assert_eq!(client_events.len(), 1); + let preimage = match &client_events[0] { + Event::PaymentClaimable { payment_hash: ph, purpose, .. } => { + assert_eq!(*ph, payment_hash); + purpose.preimage() + }, + other => panic!("Expected PaymentClaimable event on client, got: {:?}", other), + }; + + let broadcasted = service_node.inner.tx_broadcaster.txn_broadcasted.lock().unwrap(); + assert!(broadcasted.is_empty(), "There should be no broadcasted txs yet"); + drop(broadcasted); + + client_node.inner.node.claim_funds(preimage.unwrap()); + + claim_and_assert_forwarded_only( + &payer_node, + &service_node.inner, + &client_node.inner, + preimage.unwrap(), + ); + + let service_events = service_node.node.get_and_clear_pending_events(); + assert_eq!(service_events.len(), 1); + + let total_fee_msat = match service_events[0].clone() { + Event::PaymentForwarded { + prev_htlcs, + next_htlcs, + skimmed_fee_msat, + total_fee_earned_msat, + .. + } => { + assert_eq!(prev_htlcs[0].node_id, Some(payer_node_id)); + assert_eq!(next_htlcs[0].node_id, Some(client_node_id)); + service_handler.payment_forwarded(channel_id, skimmed_fee_msat.unwrap_or(0)).unwrap(); + Some(total_fee_earned_msat.unwrap() - skimmed_fee_msat.unwrap()) + }, + _ => panic!("Expected PaymentForwarded event, got: {:?}", service_events[0]), + }; + + let broadcasted = service_node.inner.tx_broadcaster.txn_broadcasted.lock().unwrap(); + assert!(broadcasted.iter().any(|b| b.compute_txid() == funding_tx.compute_txid())); + + expect_payment_sent(&payer_node, preimage.unwrap(), Some(total_fee_msat), true, true); +} + +#[test] +fn bolt12_lsps2_compact_message_path_test() { + // Tests that LSPS2 BOLT12 offers work with compact SCID-based message blinded paths. 
+ // The client's offer uses an intercept SCID instead of the full pubkey for the next hop + // in the message blinded path. When the service node receives a forwarded InvoiceRequest + // with the unresolvable intercept SCID, it emits OnionMessageIntercepted instead of + // dropping the message. The test then forwards the message to the connected client. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + + let mut service_node_config = test_default_channel_config(); + service_node_config.htlc_interception_flags = HTLCInterceptionFlags::ToInterceptSCIDs as u8; + + let mut client_node_config = test_default_channel_config(); + client_node_config.accept_inbound_channels = true; + client_node_config.channel_config.accept_underpaying_htlcs = true; + + let node_chanmgrs = create_node_chanmgrs( + 3, + &node_cfgs, + &[Some(service_node_config), Some(client_node_config), None], + ); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, promise_secret) = setup_test_lsps2_nodes_with_payer(nodes); + let LSPSNodesWithPayer { ref service_node, ref client_node, ref payer_node } = lsps_nodes; + + let payer_node_id = payer_node.node.get_our_node_id(); + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + + create_chan_between_nodes_with_value(&payer_node, &service_node.inner, 2_000_000, 100_000); + + let intercept_scid = service_node.node.get_intercept_scid(); + let user_channel_id = 42; + let cltv_expiry_delta: u32 = 144; + let payment_size_msat = Some(1_000_000); + let fee_base_msat = 1_000; + + execute_lsps2_dance( + &lsps_nodes, + intercept_scid, + user_channel_id, + cltv_expiry_delta, + promise_secret, + payment_size_msat, + fee_base_msat, + ); + + // Register the intercept SCID for onion message interception on the service node. 
+ // This enables the service to intercept forwarded messages addressed by SCID rather than + // dropping them when NodeIdLookUp can't resolve the fake intercept SCID. + service_node.onion_messenger.register_scid_for_interception(intercept_scid, client_node_id); + + // Configure the client's message router to use compact SCID encoding for message + // blinded paths through the service node. + client_node.message_router.peers_override.lock().unwrap().push(service_node_id); + client_node + .message_router + .forward_node_scid_override + .lock() + .unwrap() + .insert(service_node_id, intercept_scid); + + // Disconnect payer from client so messages route through service. + payer_node.node.peer_disconnected(client_node_id); + client_node.node.peer_disconnected(payer_node_id); + payer_node.onion_messenger.peer_disconnected(client_node_id); + client_node.onion_messenger.peer_disconnected(payer_node_id); + + // Disconnect service from client so the service must intercept the compact SCID-based + // InvoiceRequest instead of forwarding it immediately after resolving the registered SCID. 
+ service_node.node.peer_disconnected(client_node_id); + client_node.node.peer_disconnected(service_node_id); + service_node.onion_messenger.peer_disconnected(client_node_id); + client_node.onion_messenger.peer_disconnected(service_node_id); + + #[cfg(c_bindings)] + let offer = { + let mut offer_builder = client_node.node.create_offer_builder().unwrap(); + offer_builder.amount_msats(payment_size_msat.unwrap()); + offer_builder.build().unwrap() + }; + #[cfg(not(c_bindings))] + let offer = client_node + .node + .create_offer_builder() + .unwrap() + .amount_msats(payment_size_msat.unwrap()) + .build() + .unwrap(); + + let lsps2_router = Arc::new(LSPS2BOLT12Router::new( + FailingRouter::new(), + NullMessageRouter {}, + Arc::new(RandomBytes::new([43; 32])), + )); + lsps2_router.register_offer( + offer.id(), + LSPS2Bolt12InvoiceParameters { + counterparty_node_id: service_node_id, + intercept_scid, + cltv_expiry_delta, + }, + ); + + let lsps2_router = Arc::clone(&lsps2_router); + *client_node.router.override_create_blinded_payment_paths.lock().unwrap() = + Some(Box::new(move |recipient, local_node_receive_key, first_hops, tlvs, amount_msats| { + let secp_ctx = Secp256k1::new(); + lsps2_router.create_blinded_payment_paths( + recipient, + local_node_receive_key, + first_hops, + tlvs, + amount_msats, + &secp_ctx, + ) + })); + + // Payer sends InvoiceRequest toward the service node. + let payment_id = PaymentId([1; 32]); + payer_node.node.pay_for_offer(&offer, None, payment_id, Default::default()).unwrap(); + + let onion_msg = payer_node + .onion_messenger + .next_onion_message_for_peer(service_node_id) + .expect("Payer should send InvoiceRequest toward service"); + service_node.onion_messenger.handle_onion_message(payer_node_id, &onion_msg); + + // The service node can't resolve the intercept SCID via NodeIdLookUp (no real channel), + // so the message is intercepted via SCID-based interception. + // It should NOT be available as a normal forwarded message. 
+ assert!( + service_node.onion_messenger.next_onion_message_for_peer(client_node_id).is_none(), + "Message should be intercepted, not forwarded directly" + ); + + // Process the OnionMessageIntercepted event and forward the message. + let events = core::cell::RefCell::new(Vec::new()); + service_node.onion_messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e))); + let events = events.into_inner(); + + let intercepted_msg = events + .into_iter() + .find_map(|e| match e { + Event::OnionMessageIntercepted { peer_node_id, message } => { + assert_eq!(peer_node_id, client_node_id); + Some(message) + }, + _ => None, + }) + .expect("Service should emit OnionMessageIntercepted for SCID-based forward"); + + // Reconnect the service and client, then forward the intercepted message. + reconnect_nodes(ReconnectArgs::new(&service_node.inner, &client_node.inner)); + + // Forward the intercepted message to the reconnected client. + service_node + .onion_messenger + .forward_onion_message(intercepted_msg, &client_node_id) + .expect("Should succeed since client reconnected"); + + let fwd_msg = service_node + .onion_messenger + .next_onion_message_for_peer(client_node_id) + .expect("Service should have forwarded message to client"); + client_node.onion_messenger.handle_onion_message(service_node_id, &fwd_msg); + + // Client should respond with an Invoice back through the service to the payer. + let onion_msg = client_node + .onion_messenger + .next_onion_message_for_peer(service_node_id) + .expect("Client should send Invoice toward service"); + service_node.onion_messenger.handle_onion_message(client_node_id, &onion_msg); + + let fwd_msg = service_node + .onion_messenger + .next_onion_message_for_peer(payer_node_id) + .expect("Service should forward Invoice to payer"); + payer_node.onion_messenger.handle_onion_message(service_node_id, &fwd_msg); + + // Payer should have queued an HTLC payment. 
+ check_added_monitors(&payer_node, 1); + let events = payer_node.node.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + let ev = SendEvent::from_event(events[0].clone()); + + // Verify the payment gets intercepted at the service node on the intercept SCID. + service_node.inner.node.handle_update_add_htlc(payer_node_id, &ev.msgs[0]); + do_commitment_signed_dance(&service_node.inner, &payer_node, &ev.commitment_msg, false, true); + service_node.inner.node.process_pending_htlc_forwards(); + + let events = service_node.inner.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match &events[0] { + Event::HTLCIntercepted { requested_next_hop_scid, .. } => { + assert_eq!(*requested_next_hop_scid, intercept_scid); + }, + other => panic!("Expected HTLCIntercepted event, got: {:?}", other), + }; +} + fn create_channel_with_manual_broadcast( service_node_id: &PublicKey, client_node_id: &PublicKey, service_node: &LiquidityNode, client_node: &LiquidityNode, user_channel_id: u128, expected_outbound_amount_msat: &u64, diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 2b32b4dcbc6..4989cd72b96 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -38,7 +38,9 @@ use lightning_liquidity::lsps5::service::{ use lightning_liquidity::lsps5::validator::{LSPS5Validator, MAX_RECENT_SIGNATURES}; use lightning_liquidity::utils::time::{DefaultTimeProvider, TimeProvider}; use lightning_liquidity::LiquidityManagerSync; -use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; +use lightning_liquidity::{ + DummyOnionMessageInterceptor, LiquidityClientConfig, LiquidityServiceConfig, +}; use lightning_types::payment::PaymentHash; @@ -1604,6 +1606,7 @@ fn lsps5_service_handler_persistence_across_restarts() { Some(service_config), None, Arc::clone(&time_provider), + DummyOnionMessageInterceptor, ) 
.unwrap(); diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs index 75e2aaf3c5f..26e1c5b102f 100644 --- a/lightning/src/onion_message/functional_tests.rs +++ b/lightning/src/onion_message/functional_tests.rs @@ -1173,6 +1173,58 @@ fn intercept_offline_peer_oms() { pass_along_path(&vec![nodes.remove(1), final_node_vec.remove(0)]); } +#[test] +fn intercept_offline_peer_oms_registered_by_scid() { + let mut nodes = create_nodes(3); + let fake_scid = 42; + + nodes[1].messenger.register_scid_for_interception(fake_scid, nodes[2].node_id); + + let message = TestCustomMessage::Pong; + let intermediate_nodes = + [MessageForwardNode { node_id: nodes[1].node_id, short_channel_id: Some(fake_scid) }]; + let blinded_path = BlindedMessagePath::new( + &intermediate_nodes, + nodes[2].node_id, + nodes[2].messenger.node_signer.get_receive_auth_key(), + MessageContext::Custom(Vec::new()), + false, + &*nodes[2].entropy_source, + &Secp256k1::new(), + ); + let destination = Destination::BlindedPath(blinded_path); + let instructions = MessageSendInstructions::WithoutReplyPath { destination }; + + disconnect_peers(&nodes[1], &nodes[2]); + nodes[0].messenger.send_onion_message(message, instructions).unwrap(); + let mut final_node_vec = nodes.split_off(2); + pass_along_path(&nodes); + + let mut events = release_events(&nodes[1]); + assert_eq!(events.len(), 1); + let onion_message = match events.remove(0) { + Event::OnionMessageIntercepted { peer_node_id, message } => { + assert_eq!(peer_node_id, final_node_vec[0].node_id); + message + }, + _ => panic!(), + }; + + connect_peers(&nodes[1], &final_node_vec[0]); + let peer_conn_ev = release_events(&nodes[1]); + assert_eq!(peer_conn_ev.len(), 1); + match peer_conn_ev[0] { + Event::OnionMessagePeerConnected { peer_node_id } => { + assert_eq!(peer_node_id, final_node_vec[0].node_id); + }, + _ => panic!(), + } + + nodes[1].messenger.forward_onion_message(onion_message, 
&final_node_vec[0].node_id).unwrap(); + final_node_vec[0].custom_message_handler.expect_message(TestCustomMessage::Pong); + pass_along_path(&vec![nodes.remove(1), final_node_vec.remove(0)]); +} + #[test] fn spec_test_vector() { let node_cfgs = [ diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index f94eb7877f5..be454211e5d 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -125,6 +125,62 @@ impl< } } +/// A trait for registering peers and SCIDs for onion message interception. +/// +/// When a peer is registered for interception and is currently offline, any onion messages +/// intended to be forwarded to them will generate an [`Event::OnionMessageIntercepted`] instead +/// of being dropped. When a registered peer connects, an [`Event::OnionMessagePeerConnected`] +/// will be generated. +/// +/// Additionally, SCIDs (short channel IDs) can be registered for interception. When an onion +/// message is forwarded with a [`NextMessageHop::ShortChannelId`] that cannot be resolved via +/// [`NodeIdLookUp`] but is registered here, an [`Event::OnionMessageIntercepted`] will be +/// generated using the associated peer's node ID. This enables compact SCID-based encoding in +/// blinded message paths for scenarios like LSPS2 JIT channels where the SCID is a fake +/// intercept SCID that does not correspond to a real channel. +/// +/// [`OnionMessenger`] implements this trait, but it is also useful as a trait object to allow +/// external components (e.g., an LSPS2 service) to register peers for interception without +/// needing to know the concrete [`OnionMessenger`] type. 
+/// +/// [`NextMessageHop::ShortChannelId`]: crate::blinded_path::message::NextMessageHop::ShortChannelId +/// [`Event::OnionMessageIntercepted`]: crate::events::Event::OnionMessageIntercepted +/// [`Event::OnionMessagePeerConnected`]: crate::events::Event::OnionMessagePeerConnected +pub trait OnionMessageInterceptor { + /// Registers a short channel ID for onion message interception. + /// + /// See [`OnionMessenger::register_scid_for_interception`] for more details. + fn register_scid_for_interception(&self, scid: u64, peer_node_id: PublicKey); + + /// Deregisters a short channel ID from onion message interception. + /// + /// See [`OnionMessenger::deregister_scid_for_interception`] for more details. + /// + /// Returns whether the SCID was previously registered. + fn deregister_scid_for_interception(&self, scid: u64) -> bool; +} + +impl< + ES: EntropySource, + NS: NodeSigner, + L: Logger, + NL: NodeIdLookUp, + MR: MessageRouter, + OMH: OffersMessageHandler, + APH: AsyncPaymentsMessageHandler, + DRH: DNSResolverMessageHandler, + CMH: CustomOnionMessageHandler, + > OnionMessageInterceptor for OnionMessenger +{ + fn register_scid_for_interception(&self, scid: u64, peer_node_id: PublicKey) { + OnionMessenger::register_scid_for_interception(self, scid, peer_node_id) + } + + fn deregister_scid_for_interception(&self, scid: u64) -> bool { + OnionMessenger::deregister_scid_for_interception(self, scid) + } +} + /// A sender, receiver and forwarder of [`OnionMessage`]s. 
/// /// # Handling Messages @@ -273,6 +329,7 @@ pub struct OnionMessenger< dns_resolver_handler: DRH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, + scids_registered_for_interception: Mutex>, pending_intercepted_msgs_events: Mutex>, pending_peer_connected_events: Mutex>, pending_events_processor: AtomicBool, @@ -1453,6 +1510,7 @@ impl< dns_resolver_handler: dns_resolver, custom_handler, intercept_messages_for_offline_peers, + scids_registered_for_interception: Mutex::new(new_hash_map()), pending_intercepted_msgs_events: Mutex::new(Vec::new()), pending_peer_connected_events: Mutex::new(Vec::new()), pending_events_processor: AtomicBool::new(false), @@ -1470,6 +1528,34 @@ impl< self.async_payments_handler = async_payments_handler; } + /// Registers a short channel ID for onion message interception, associating it with + /// `peer_node_id`. + /// + /// When an onion message is forwarded with a [`NextMessageHop::ShortChannelId`] that cannot + /// be resolved via [`NodeIdLookUp`] but matches a registered SCID, an + /// [`Event::OnionMessageIntercepted`] will be generated using the associated `peer_node_id`. + /// + /// This is useful for services like LSPS2 where fake intercept SCIDs are used in compact + /// blinded message paths. The SCID does not correspond to a real channel, so + /// [`NodeIdLookUp`] cannot resolve it, but the message should still be intercepted rather + /// than dropped. + /// + /// Use [`Self::deregister_scid_for_interception`] to stop intercepting messages for this + /// SCID. + /// + /// [`NextMessageHop::ShortChannelId`]: crate::blinded_path::message::NextMessageHop::ShortChannelId + /// [`Event::OnionMessageIntercepted`]: crate::events::Event::OnionMessageIntercepted + pub fn register_scid_for_interception(&self, scid: u64, peer_node_id: PublicKey) { + self.scids_registered_for_interception.lock().unwrap().insert(scid, peer_node_id); + } + + /// Deregisters a short channel ID from onion message interception. 
+ /// + /// Returns whether the SCID was previously registered. + pub fn deregister_scid_for_interception(&self, scid: u64) -> bool { + self.scids_registered_for_interception.lock().unwrap().remove(&scid).is_some() + } + /// Sends an [`OnionMessage`] based on its [`MessageSendInstructions`]. pub fn send_onion_message( &self, contents: T, instructions: MessageSendInstructions, @@ -1659,15 +1745,32 @@ impl< fn enqueue_forwarded_onion_message( &self, next_hop: NextMessageHop, onion_message: OnionMessage, log_suffix: fmt::Arguments, ) -> Result<(), SendError> { - let next_node_id = match next_hop { - NextMessageHop::NodeId(pubkey) => pubkey, - NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) { - Some(pubkey) => pubkey, - None => { - log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {} {}", scid, log_suffix); - return Err(SendError::GetNodeIdFailed); + let (next_node_id, is_registered_for_interception) = { + let scids_registered_for_interception = + self.scids_registered_for_interception.lock().unwrap(); + match next_hop { + NextMessageHop::NodeId(pubkey) => { + let is_registered = + scids_registered_for_interception.values().any(|nid| *nid == pubkey); + (pubkey, is_registered) }, - }, + NextMessageHop::ShortChannelId(scid) => { + match self.node_id_lookup.next_node_id(scid) { + Some(pubkey) => (pubkey, false), + None => { + // The SCID is unknown to NodeIdLookUp (not a real channel). Check + // if it's registered for SCID-based interception before dropping. 
+ match scids_registered_for_interception.get(&scid).copied() { + Some(peer_node_id) => (peer_node_id, true), + None => { + log_trace!(self.logger, "Dropping forwarded onion message: unable to resolve next hop using SCID {} {}", scid, log_suffix); + return Err(SendError::GetNodeIdFailed); + }, + } + }, + } + }, + } }; let mut message_recipients = self.message_recipients.lock().unwrap(); @@ -1686,6 +1789,9 @@ impl< .entry(next_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); + let should_intercept = + self.intercept_messages_for_offline_peers || is_registered_for_interception; + match message_recipients.entry(next_node_id) { hash_map::Entry::Occupied(mut e) if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) => @@ -1699,7 +1805,7 @@ impl< ); Ok(()) }, - _ if self.intercept_messages_for_offline_peers => { + _ if should_intercept => { log_trace!( self.logger, "Generating OnionMessageIntercepted event for peer {} {}", @@ -2142,7 +2248,13 @@ impl< .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); } - if self.intercept_messages_for_offline_peers { + let is_registered_for_interception = self + .scids_registered_for_interception + .lock() + .unwrap() + .values() + .any(|nid| *nid == their_node_id); + if self.intercept_messages_for_offline_peers || is_registered_for_interception { let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); pending_peer_connected_events diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index abcc24adf8d..66e9dce0695 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -165,6 +165,23 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { } } +/// Override closure type for [`TestRouter::override_create_blinded_payment_paths`]. 
+/// +/// This closure is called instead of the default [`Router::create_blinded_payment_paths`] +/// implementation when set, receiving the actual [`ReceiveTlvs`] so tests can construct custom +/// blinded payment paths using the same TLVs the caller generated. +pub type BlindedPaymentPathOverrideFn = Box< + dyn Fn( + PublicKey, + ReceiveAuthKey, + Vec, + ReceiveTlvs, + Option, + ) -> Result, ()> + + Send + + Sync, +>; + pub struct TestRouter<'a> { pub router: DefaultRouter< Arc>, @@ -177,6 +194,7 @@ pub struct TestRouter<'a> { pub network_graph: Arc>, pub next_routes: Mutex>)>>, pub next_blinded_payment_paths: Mutex>, + pub override_create_blinded_payment_paths: Mutex>, pub scorer: &'a RwLock, } @@ -188,6 +206,7 @@ impl<'a> TestRouter<'a> { let entropy_source = Arc::new(RandomBytes::new([42; 32])); let next_routes = Mutex::new(VecDeque::new()); let next_blinded_payment_paths = Mutex::new(Vec::new()); + let override_create_blinded_payment_paths = Mutex::new(None); Self { router: DefaultRouter::new( Arc::clone(&network_graph), @@ -199,6 +218,7 @@ impl<'a> TestRouter<'a> { network_graph, next_routes, next_blinded_payment_paths, + override_create_blinded_payment_paths, scorer, } } @@ -321,6 +341,12 @@ impl<'a> Router for TestRouter<'a> { first_hops: Vec, tlvs: ReceiveTlvs, amount_msats: Option, secp_ctx: &Secp256k1, ) -> Result, ()> { + if let Some(override_fn) = + self.override_create_blinded_payment_paths.lock().unwrap().as_ref() + { + return override_fn(recipient, local_node_receive_key, first_hops, tlvs, amount_msats); + } + let mut expected_paths = self.next_blinded_payment_paths.lock().unwrap(); if expected_paths.is_empty() { self.router.create_blinded_payment_paths( @@ -366,6 +392,7 @@ pub enum TestMessageRouterInternal<'a> { pub struct TestMessageRouter<'a> { pub inner: TestMessageRouterInternal<'a>, pub peers_override: Mutex>, + pub forward_node_scid_override: Mutex>, } impl<'a> TestMessageRouter<'a> { @@ -378,6 +405,7 @@ impl<'a> TestMessageRouter<'a> { 
entropy_source, )), peers_override: Mutex::new(Vec::new()), + forward_node_scid_override: Mutex::new(new_hash_map()), } } @@ -390,6 +418,7 @@ impl<'a> TestMessageRouter<'a> { entropy_source, )), peers_override: Mutex::new(Vec::new()), + forward_node_scid_override: Mutex::new(new_hash_map()), } } } @@ -421,9 +450,13 @@ impl<'a> MessageRouter for TestMessageRouter<'a> { { let peers_override = self.peers_override.lock().unwrap(); if !peers_override.is_empty() { + let scid_override = self.forward_node_scid_override.lock().unwrap(); let peer_override_nodes: Vec<_> = peers_override .iter() - .map(|pk| MessageForwardNode { node_id: *pk, short_channel_id: None }) + .map(|pk| MessageForwardNode { + node_id: *pk, + short_channel_id: scid_override.get(pk).copied(), + }) .collect(); peers = peer_override_nodes; }