Skip to content

Commit 0c005d0

Browse files
joostjager and claude committed
Add DeferredChainMonitor wrapper for batched monitor persistence
Introduce DeferredChainMonitor, a wrapper around ChainMonitor that queues watch_channel and update_channel operations, returning InProgress until flush() is called. This enables batched persistence of monitor updates after ChannelManager persistence, ensuring correct ordering. The wrapper implements all public traits that ChainMonitor supports (Listen, Confirm, EventsProvider, etc.) as pass-throughs, allowing it to be used as a drop-in replacement. Includes comprehensive tests covering the full channel lifecycle with payment flows using DeferredChainMonitor. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1f5cef4 commit 0c005d0

4 files changed

Lines changed: 1146 additions & 291 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 39 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;
3232

3333
use lightning::chain;
3434
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35-
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
35+
use lightning::chain::chainmonitor::Persist;
36+
use lightning::chain::deferred::DeferredChainMonitor;
3637
#[cfg(feature = "std")]
3738
use lightning::events::EventHandler;
3839
#[cfg(feature = "std")]
@@ -853,7 +854,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
853854
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
854855
/// # fn disconnect_socket(&mut self) {}
855856
/// # }
856-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857+
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857858
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
858859
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
859860
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +965,9 @@ pub async fn process_events_async<
964965
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
965966
EventHandler: Fn(Event) -> EventHandlerFuture,
966967
ES: Deref,
967-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
968+
M: Deref<
969+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
970+
>,
968971
CM: Deref,
969972
OM: Deref,
970973
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1155,7 +1158,7 @@ where
11551158
// Capture the number of pending monitor writes before persisting the channel manager.
11561159
// We'll only flush this many writes after the manager is persisted, to avoid flushing
11571160
// monitor updates that arrived after the manager state was captured.
1158-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1161+
let pending_monitor_writes = chain_monitor.pending_operation_count();
11591162

11601163
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11611164
log_trace!(logger, "Persisting ChannelManager...");
@@ -1427,7 +1430,7 @@ where
14271430
.await?;
14281431

14291432
// Flush all pending monitor writes after final channel manager persistence.
1430-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1433+
let pending_monitor_writes = chain_monitor.pending_operation_count();
14311434
if pending_monitor_writes > 0 {
14321435
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
14331436
chain_monitor.flush(pending_monitor_writes);
@@ -1485,7 +1488,9 @@ pub async fn process_events_async_with_kv_store_sync<
14851488
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
14861489
EventHandler: Fn(Event) -> EventHandlerFuture,
14871490
ES: Deref,
1488-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1491+
M: Deref<
1492+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1493+
>,
14891494
CM: Deref,
14901495
OM: Deref,
14911496
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1600,7 +1605,15 @@ impl BackgroundProcessor {
16001605
ES: 'static + Deref + Send,
16011606
M: 'static
16021607
+ Deref<
1603-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1608+
Target = DeferredChainMonitor<
1609+
<CM::Target as AChannelManager>::Signer,
1610+
CF,
1611+
T,
1612+
F,
1613+
L,
1614+
P,
1615+
ES,
1616+
>,
16041617
>
16051618
+ Send
16061619
+ Sync,
@@ -1744,7 +1757,7 @@ impl BackgroundProcessor {
17441757
}
17451758

17461759
// Capture the number of pending monitor writes before persisting the channel manager.
1747-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1760+
let pending_monitor_writes = chain_monitor.pending_operation_count();
17481761

17491762
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17501763
log_trace!(logger, "Persisting ChannelManager...");
@@ -1885,7 +1898,7 @@ impl BackgroundProcessor {
18851898
)?;
18861899

18871900
// Flush all pending monitor writes after final channel manager persistence.
1888-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1901+
let pending_monitor_writes = chain_monitor.pending_operation_count();
18891902
if pending_monitor_writes > 0 {
18901903
log_trace!(
18911904
logger,
@@ -1978,7 +1991,7 @@ mod tests {
19781991
use core::sync::atomic::{AtomicBool, Ordering};
19791992
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19801993
use lightning::chain::transaction::OutPoint;
1981-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1994+
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
19821995
use lightning::events::{Event, PathFailure, ReplayEvent};
19831996
use lightning::ln::channelmanager;
19841997
use lightning::ln::channelmanager::{
@@ -2068,7 +2081,7 @@ mod tests {
20682081
Arc<test_utils::TestLogger>,
20692082
>;
20702083

2071-
type ChainMonitor = chainmonitor::ChainMonitor<
2084+
type ChainMonitor = deferred::DeferredChainMonitor<
20722085
InMemorySigner,
20732086
Arc<test_utils::TestChainSource>,
20742087
Arc<test_utils::TestBroadcaster>,
@@ -2496,7 +2509,7 @@ mod tests {
24962509
let now = Duration::from_secs(genesis_block.header.time as u64);
24972510
let keys_manager =
24982511
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2499-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2512+
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
25002513
Some(Arc::clone(&chain_source)),
25012514
Arc::clone(&tx_broadcaster),
25022515
Arc::clone(&logger),
@@ -2640,19 +2653,25 @@ mod tests {
26402653
tx.clone(),
26412654
)
26422655
.unwrap();
2656+
// Flush deferred monitor operations so messages aren't held back
2657+
$node_a.chain_monitor.flush_all();
26432658
let msg_a = get_event_msg!(
26442659
$node_a,
26452660
MessageSendEvent::SendFundingCreated,
26462661
$node_b.node.get_our_node_id()
26472662
);
26482663
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2664+
// Flush node_b's monitor so it releases the FundingSigned message
2665+
$node_b.chain_monitor.flush_all();
26492666
get_event!($node_b, Event::ChannelPending);
26502667
let msg_b = get_event_msg!(
26512668
$node_b,
26522669
MessageSendEvent::SendFundingSigned,
26532670
$node_a.node.get_our_node_id()
26542671
);
26552672
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2673+
// Flush node_a's monitor for the final update
2674+
$node_a.chain_monitor.flush_all();
26562675
get_event!($node_a, Event::ChannelPending);
26572676
tx
26582677
}};
@@ -3099,11 +3118,17 @@ mod tests {
30993118
.node
31003119
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
31013120
.unwrap();
3121+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3122+
nodes[0].chain_monitor.flush_all();
31023123
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
31033124
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3125+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3126+
nodes[1].chain_monitor.flush_all();
31043127
get_event!(nodes[1], Event::ChannelPending);
31053128
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
31063129
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3130+
// Flush node_0's monitor for the funding_signed update
3131+
nodes[0].chain_monitor.flush_all();
31073132
channel_pending_recv
31083133
.recv_timeout(EVENT_DEADLINE)
31093134
.expect("ChannelPending not handled within deadline");
@@ -3164,6 +3189,8 @@ mod tests {
31643189
error_message.to_string(),
31653190
)
31663191
.unwrap();
3192+
// Flush the monitor update triggered by force close so the commitment tx is broadcasted
3193+
nodes[0].chain_monitor.flush_all();
31673194
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31683195
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31693196

0 commit comments

Comments (0)