Skip to content

Commit 6274ba0

Browse files
joostjager and claude committed
Add deferred monitor write support to chanmon_consistency fuzz target
Add a `deferred` flag to `TestChainMonitor` that controls whether the underlying `ChainMonitor` queues operations instead of executing them immediately. The flag is derived from the fuzz input alongside the existing monitor style bits, so each of the three nodes can independently run in deferred or immediate mode. In deferred mode, `watch_channel` and `update_channel` always return `InProgress`. A new `flush_and_update_latest_monitors` method drains the queued operations and, when the persister reports `Completed`, promotes the pending shadow monitor snapshots to persisted state. This method is called before `release_pending_monitor_events` and at each point where the fuzzer completes pending monitor updates. On node reload, deferred monitors are flushed immediately after `watch_channel` so the node starts with a consistent state. AI tools were used in preparing this commit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d830f10 commit 6274ba0

1 file changed

Lines changed: 62 additions & 3 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 62 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -266,12 +266,13 @@ struct TestChainMonitor {
266266
Arc<KeyProvider>,
267267
>,
268268
>,
269+
pub deferred: bool,
269270
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
270271
}
271272
impl TestChainMonitor {
272273
pub fn new(
273274
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
274-
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
275+
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
275276
) -> Self {
276277
Self {
277278
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -282,14 +283,44 @@ impl TestChainMonitor {
282283
Arc::clone(&persister),
283284
Arc::clone(&keys),
284285
keys.get_peer_storage_key(),
285-
false,
286+
deferred,
286287
)),
287288
logger,
288289
keys,
289290
persister,
291+
deferred,
290292
latest_monitors: Mutex::new(new_hash_map()),
291293
}
292294
}
295+
296+
/// Flushes all deferred monitor operations and, if the persister reports success, promotes
297+
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
298+
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
299+
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
300+
/// snapshots rather than relying on the persister's storage.
301+
///
302+
/// This simulates the pattern of snapshotting the pending count, persisting the
303+
/// `ChannelManager`, then flushing the queued monitor writes.
304+
fn flush_and_update_latest_monitors(&self) {
305+
let count = self.chain_monitor.pending_operation_count();
306+
if count == 0 {
307+
return;
308+
}
309+
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
310+
self.chain_monitor.flush(count, &self.logger);
311+
let persister_res = *self.persister.update_ret.lock().unwrap();
312+
// Only update our local tracking state when the persister signals completion. When
313+
// persistence is still in-progress, the monitors stay in the pending set so that a
314+
// simulated restart can still reload from the last fully-persisted snapshot.
315+
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
316+
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
317+
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
318+
state.persisted_monitor_id = id;
319+
state.persisted_monitor = data;
320+
}
321+
}
322+
}
323+
}
293324
}
294325
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
295326
fn watch_channel(
@@ -299,6 +330,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
299330
monitor.write(&mut ser).unwrap();
300331
let monitor_id = monitor.get_latest_update_id();
301332
let res = self.chain_monitor.watch_channel(channel_id, monitor);
333+
if self.deferred {
334+
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
335+
}
302336
let state = match res {
303337
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
304338
persisted_monitor_id: monitor_id,
@@ -348,6 +382,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
348382
let mut ser = VecWriter(Vec::new());
349383
deserialized_monitor.write(&mut ser).unwrap();
350384
let res = self.chain_monitor.update_channel(channel_id, update);
385+
if self.deferred {
386+
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
387+
}
351388
match res {
352389
chain::ChannelMonitorUpdateStatus::Completed => {
353390
map_entry.persisted_monitor_id = update.update_id;
@@ -364,6 +401,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
364401
fn release_pending_monitor_events(
365402
&self,
366403
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
404+
if self.deferred {
405+
self.flush_and_update_latest_monitors();
406+
}
367407
return self.chain_monitor.release_pending_monitor_events();
368408
}
369409
}
@@ -891,6 +931,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
891931
ChannelMonitorUpdateStatus::Completed
892932
}),
893933
];
934+
let deferred = [
935+
initial_mon_styles & 0b001_000 != 0,
936+
initial_mon_styles & 0b010_000 != 0,
937+
initial_mon_styles & 0b100_000 != 0,
938+
];
894939

895940
let mut chain_state = ChainState::new();
896941
let mut node_height_a: u32 = 0;
@@ -919,6 +964,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
919964
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
920965
}),
921966
Arc::clone(&keys_manager),
967+
deferred[$node_id as usize],
922968
));
923969

924970
let mut config = UserConfig::default();
@@ -971,6 +1017,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
9711017
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
9721018
}),
9731019
Arc::clone(keys),
1020+
deferred[node_id as usize],
9741021
));
9751022

9761023
let mut config = UserConfig::default();
@@ -1037,18 +1084,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
10371084
let manager =
10381085
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
10391086
let res = (manager.1, chain_monitor.clone());
1087+
let expected_status = if deferred[node_id as usize] {
1088+
ChannelMonitorUpdateStatus::InProgress
1089+
} else {
1090+
ChannelMonitorUpdateStatus::Completed
1091+
};
10401092
for (channel_id, mon) in monitors.drain() {
10411093
assert_eq!(
10421094
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
1043-
Ok(ChannelMonitorUpdateStatus::Completed)
1095+
Ok(expected_status)
10441096
);
10451097
}
1098+
if deferred[node_id as usize] {
1099+
let count = chain_monitor.chain_monitor.pending_operation_count();
1100+
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
1101+
}
10461102
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
10471103
res
10481104
};
10491105

10501106
macro_rules! complete_all_pending_monitor_updates {
10511107
($monitor: expr) => {{
1108+
$monitor.flush_and_update_latest_monitors();
10521109
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
10531110
for (id, data) in state.pending_monitors.drain(..) {
10541111
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
@@ -2008,6 +2065,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
20082065
|monitor: &Arc<TestChainMonitor>,
20092066
chan_funding,
20102067
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
2068+
monitor.flush_and_update_latest_monitors();
20112069
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
20122070
assert!(
20132071
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -2024,6 +2082,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
20242082
};
20252083

20262084
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
2085+
monitor.flush_and_update_latest_monitors();
20272086
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
20282087
assert!(
20292088
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

0 commit comments

Comments (0)