Skip to content

Commit 40e909a

Browse files
joostjager and claude committed
Defer MonitorUpdatingPersister writes to flush()
Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus:: InProgress and the actual writes happen when flush() is called. This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors. The flush() method takes an optional count parameter to flush only a specific number of queued writes. The background processor captures the queue size before persisting the channel manager, then flushes exactly that many writes afterward. This prevents flushing monitor updates that arrived after the manager state was captured. Key changes: - Add PendingWrite enum to represent queued write/remove operations - Add pending_writes queue to MonitorUpdatingPersisterAsyncInner - Add pending_write_count() and flush(count) to Persist trait and ChainMonitor - ChainMonitor::flush() calls channel_monitor_updated for each completed write - Update Persist impl to queue writes and return InProgress - Call flush() in background processor after channel manager persistence - Remove unused event_notifier from AsyncPersister Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b4fb555 commit 40e909a

3 files changed

Lines changed: 265 additions & 194 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,11 @@ where
11521152

11531153
let mut futures = Joiner::new();
11541154

1155+
// Capture the number of pending monitor writes before persisting the channel manager.
1156+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1157+
// monitor updates that arrived after the manager state was captured.
1158+
let pending_monitor_writes = chain_monitor.pending_write_count();
1159+
11551160
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11561161
log_trace!(logger, "Persisting ChannelManager...");
11571162

@@ -1349,6 +1354,14 @@ where
13491354
res?;
13501355
}
13511356

1357+
// Flush the monitor writes that were pending before we persisted the channel manager.
1358+
// Any writes that arrived after are left in the queue for the next iteration.
1359+
if pending_monitor_writes > 0 {
1360+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1361+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1362+
}
1363+
}
1364+
13521365
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13531366
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13541367
}) {
@@ -1413,6 +1426,12 @@ where
14131426
channel_manager.get_cm().encode(),
14141427
)
14151428
.await?;
1429+
1430+
// Flush all pending monitor writes after final channel manager persistence.
1431+
if let Err(e) = chain_monitor.flush(None) {
1432+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1433+
}
1434+
14161435
if let Some(ref scorer) = scorer {
14171436
kv_store
14181437
.write(
@@ -1722,6 +1741,9 @@ impl BackgroundProcessor {
17221741
channel_manager.get_cm().timer_tick_occurred();
17231742
last_freshness_call = Instant::now();
17241743
}
1744+
// Capture the number of pending monitor writes before persisting the channel manager.
1745+
let pending_monitor_writes = chain_monitor.pending_write_count();
1746+
17251747
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17261748
log_trace!(logger, "Persisting ChannelManager...");
17271749
(kv_store.write(
@@ -1733,6 +1755,13 @@ impl BackgroundProcessor {
17331755
log_trace!(logger, "Done persisting ChannelManager.");
17341756
}
17351757

1758+
// Flush the monitor writes that were pending before we persisted the channel manager.
1759+
if pending_monitor_writes > 0 {
1760+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1761+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1762+
}
1763+
}
1764+
17361765
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
17371766
log_trace!(logger, "Persisting LiquidityManager...");
17381767
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1882,12 @@ impl BackgroundProcessor {
18531882
CHANNEL_MANAGER_PERSISTENCE_KEY,
18541883
channel_manager.get_cm().encode(),
18551884
)?;
1885+
1886+
// Flush all pending monitor writes after final channel manager persistence.
1887+
if let Err(e) = chain_monitor.flush(None) {
1888+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1889+
}
1890+
18561891
if let Some(ref scorer) = scorer {
18571892
kv_store.write(
18581893
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,

lightning/src/chain/chainmonitor.rs

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
3939
use crate::chain::transaction::{OutPoint, TransactionData};
4040
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
4141
use crate::events::{self, Event, EventHandler, ReplayEvent};
42+
use crate::io;
4243
use crate::ln::channel_state::ChannelDetails;
4344
#[cfg(peer_storage)]
4445
use crate::ln::msgs::PeerStorage;
@@ -208,6 +209,26 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
208209
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
209210
Vec::new()
210211
}
212+
213+
/// Returns the number of pending writes in the queue.
214+
///
215+
/// This can be used to capture the queue size before persisting the channel manager,
216+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
217+
fn pending_write_count(&self) -> usize {
218+
0
219+
}
220+
221+
/// Flushes pending writes to the underlying storage.
222+
///
223+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
224+
/// If `count` is `None`, all pending writes are flushed.
225+
///
226+
/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
227+
/// from persist methods), this method should write queued data to storage.
228+
///
229+
/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
230+
/// any write failed.
231+
fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error>;
211232
}
212233

213234
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -272,7 +293,6 @@ pub struct AsyncPersister<
272293
FE::Target: FeeEstimator,
273294
{
274295
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
275-
event_notifier: Arc<Notifier>,
276296
}
277297

278298
impl<
@@ -320,17 +340,15 @@ where
320340
&self, monitor_name: MonitorName,
321341
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
322342
) -> ChannelMonitorUpdateStatus {
323-
let notifier = Arc::clone(&self.event_notifier);
324-
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
343+
self.persister.queue_new_channel(monitor_name, monitor);
325344
ChannelMonitorUpdateStatus::InProgress
326345
}
327346

328347
fn update_persisted_channel(
329348
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
330349
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
331350
) -> ChannelMonitorUpdateStatus {
332-
let notifier = Arc::clone(&self.event_notifier);
333-
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
351+
self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
334352
ChannelMonitorUpdateStatus::InProgress
335353
}
336354

@@ -341,6 +359,14 @@ where
341359
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
342360
self.persister.get_and_clear_completed_updates()
343361
}
362+
363+
fn pending_write_count(&self) -> usize {
364+
self.persister.pending_write_count()
365+
}
366+
367+
fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
368+
crate::util::persist::poll_sync_future(self.persister.flush(count))
369+
}
344370
}
345371

346372
/// An implementation of [`chain::Watch`] for monitoring channels.
@@ -440,7 +466,6 @@ impl<
440466
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
441467
_our_peerstorage_encryption_key: PeerStorageKey,
442468
) -> Self {
443-
let event_notifier = Arc::new(Notifier::new());
444469
Self {
445470
monitors: RwLock::new(new_hash_map()),
446471
chain_source,
@@ -450,8 +475,8 @@ impl<
450475
_entropy_source,
451476
pending_monitor_events: Mutex::new(Vec::new()),
452477
highest_chain_height: AtomicUsize::new(0),
453-
event_notifier: Arc::clone(&event_notifier),
454-
persister: AsyncPersister { persister, event_notifier },
478+
event_notifier: Arc::new(Notifier::new()),
479+
persister: AsyncPersister { persister },
455480
pending_send_only_events: Mutex::new(Vec::new()),
456481
#[cfg(peer_storage)]
457482
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -742,6 +767,33 @@ where
742767
.collect()
743768
}
744769

770+
/// Returns the number of pending writes in the persister queue.
771+
///
772+
/// This can be used to capture the queue size before persisting the channel manager,
773+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
774+
pub fn pending_write_count(&self) -> usize {
775+
self.persister.pending_write_count()
776+
}
777+
778+
/// Flushes pending writes to the underlying storage.
779+
///
780+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
781+
/// If `count` is `None`, all pending writes are flushed.
782+
///
783+
/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
784+
/// from persist methods), this method writes queued data to storage and signals
785+
/// completion to the channel manager via [`Self::channel_monitor_updated`].
786+
///
787+
/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
788+
/// any write failed. Note that even if an error is returned, some writes may have succeeded.
789+
pub fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
790+
let completed = self.persister.flush(count)?;
791+
for (channel_id, update_id) in &completed {
792+
let _ = self.channel_monitor_updated(*channel_id, *update_id);
793+
}
794+
Ok(completed)
795+
}
796+
745797
#[cfg(any(test, feature = "_test_utils"))]
746798
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
747799
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor

0 commit comments

Comments (0)