From 13e922d292db9563713faf8c4721033ab0f21f7d Mon Sep 17 00:00:00 2001 From: zealsham Date: Fri, 22 May 2026 01:49:21 +0100 Subject: [PATCH 1/2] Fix resume succeding prematurely This addresses #1545, upon resuming a session where the sender posted the original proposal but went ofline, the cli succeds prematurely and outputs "payjoin transaction detected in mempool" because the cli treats any Ok(_) as scucess. This also addresses a subtle issue where after a certain period without tx detected in the mempool the the monitor_proposal function returned Err("Timeout waiting for payment confirmation after 5s"). That looked like a failure even though the session should stay open in Monitor. And finally the missleading "All resumed sessions completed" message. Co-authored-by: xstoicunicornx --- payjoin-cli/src/app/v2/mod.rs | 62 ++++++++++++++++++++++++----------- payjoin-cli/tests/e2e.rs | 37 ++++++++++++++++++++- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/payjoin-cli/src/app/v2/mod.rs b/payjoin-cli/src/app/v2/mod.rs index 67dca2910..556afbcde 100644 --- a/payjoin-cli/src/app/v2/mod.rs +++ b/payjoin-cli/src/app/v2/mod.rs @@ -299,7 +299,6 @@ impl AppTrait for App { Ok(()) } - #[allow(clippy::incompatible_msrv)] async fn resume_payjoins(&self) -> Result<()> { let recv_session_ids = self.db.get_recv_session_ids()?; let send_session_ids = self.db.get_send_session_ids()?; @@ -309,7 +308,8 @@ impl AppTrait for App { return Ok(()); } - let mut tasks = Vec::new(); + //let mut tasks = Vec::new(); + let mut tasks: Vec<(String, tokio::task::JoinHandle>)> = Vec::new(); // Process receiver sessions for session_id in recv_session_ids { @@ -317,12 +317,18 @@ impl AppTrait for App { let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone()); match replay_receiver_event_log(&recv_persister) { Ok((receiver_state, _)) => { - tasks.push(tokio::spawn(async move { - self_clone.process_receiver_session(receiver_state, &recv_persister).await - })); + tasks.push(( + session_id.to_string(), + tokio::spawn(async move { + self_clone + .process_receiver_session(receiver_state, &recv_persister) + .await + }), + )); } Err(e) => { tracing::error!("An error {:?} occurred while replaying receiver session", e); + println!("Session {session_id} receiver failed to replay - {e}"); Self::close_failed_session(&recv_persister, &session_id, "receiver"); } } @@ -334,12 +340,17 @@ impl AppTrait for App { match replay_sender_event_log(&sender_persister) { Ok((sender_state, _)) => { let self_clone = self.clone(); - tasks.push(tokio::spawn(async move { - self_clone.process_sender_session(sender_state, &sender_persister).await - })); + + tasks.push(( + session_id.clone().to_string(), + tokio::spawn(async move { + self_clone.process_sender_session(sender_state, &sender_persister).await + }), + )); } Err(e) => { tracing::error!("An error {:?} occurred while replaying Sender session", e); + println!("Session {session_id} sender failed to replay - {e}"); Self::close_failed_session(&sender_persister, &session_id, "sender"); } } @@ -348,12 +359,27 @@ impl AppTrait for App { let mut interrupt = self.interrupt.clone(); tokio::select! { _ = async { - for task in tasks { - let _ = task.await; + let mut all_completed = true; + + for (session_id, task) in tasks { + match task.await { + Ok(Ok(())) => { + println!("Session {session_id} completed."); + } + Ok(Err(e)) => { + println!("Session {session_id} error: {e:#}"); + all_completed = false; + } + Err(e) => { + println!("Session {session_id} panicked or was cancelled: {e:?}"); + all_completed = false; + } + } } - } => { + if all_completed { println!("All resumed sessions completed."); } + } => {} _ = interrupt.changed() => { println!("Resumed sessions were interrupted."); } @@ -571,9 +597,8 @@ impl App { sender: Sender, persister: &SenderPersister, ) -> Result<()> { - let (req, ctx) = sender.create_v2_post_request( - self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(), - )?; + let relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?; + let (req, ctx) = sender.create_v2_post_request(relay.as_str())?; let response = self.post_request(req).await?; let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?; println!("Posted Original PSBT..."); @@ -859,15 +884,12 @@ impl App { .save(persister); match check_result { - Ok(_) => { + Ok(OptionalTransitionOutcome::Progress(())) => { println!("Payjoin transaction detected in the mempool!"); return Ok(()); } - Err(_) => { - // keep polling - - continue; - } + Ok(OptionalTransitionOutcome::Stasis(_)) => continue, + Err(_) => continue, } } }) diff --git a/payjoin-cli/tests/e2e.rs b/payjoin-cli/tests/e2e.rs index 827c5543b..e0e3b4ab4 100644 --- a/payjoin-cli/tests/e2e.rs +++ b/payjoin-cli/tests/e2e.rs @@ -313,6 +313,24 @@ mod e2e { .expect("Failed to execute payjoin-cli"); respond_with_payjoin(cli_receive_resumer).await?; + let cli_receive_resumer = Command::new(payjoin_cli) + .arg("--root-certificate") + .arg(cert_path) + .arg("--rpchost") + .arg(&receiver_rpchost) + .arg("--cookie-file") + .arg(cookie_file) + .arg("--db-path") + .arg(&receiver_db_path) + .arg("--ohttp-relays") + .arg(ohttp_relay) + .arg("resume") + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("Failed to execute payjoin-cli"); + check_resume_not_completed(cli_receive_resumer).await?; + let cli_send_resumer = Command::new(payjoin_cli) .arg("--root-certificate") .arg(cert_path) @@ -458,7 +476,24 @@ mod e2e { .await?; terminate(cli_resumer).await.expect("Failed to kill payjoin-cli"); - assert!(res.is_some(), "Expected all resumed sessions completed"); + assert!(res.is_some(), "Expected resume summary with all sessions closed"); + Ok(()) + } + + async fn check_resume_not_completed(mut cli_resumer: Child) -> Result<()> { + let mut stdout = + cli_resumer.stdout.take().expect("Failed to take stdout of child process"); + let timeout = tokio::time::Duration::from_secs(10); + let res = tokio::time::timeout( + timeout, + wait_for_stdout_match(&mut stdout, |line| { + line.contains("All resumed sessions completed.") + || line.contains("No sessions to resume.") + }), + ) + .await?; + terminate(cli_resumer).await.expect("Failed to kill payjoin-cli"); + assert!(res.is_none(), "Expected resumed sessions not yet compeleted"); Ok(()) } Ok(()) From fb014508a55309782eb748e74fb8232a80804c4e Mon Sep 17 00:00:00 2001 From: zealsham Date: Wed, 27 May 2026 13:43:21 +0100 Subject: [PATCH 2/2] Add descriptive message to monitor failing to detect TX in mempool --- payjoin-cli/src/app/v2/mod.rs | 11 +---------- payjoin-cli/tests/e2e.rs | 9 ++++----- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/payjoin-cli/src/app/v2/mod.rs b/payjoin-cli/src/app/v2/mod.rs index 556afbcde..3abfa225c 100644 --- a/payjoin-cli/src/app/v2/mod.rs +++ b/payjoin-cli/src/app/v2/mod.rs @@ -308,7 +308,6 @@ impl AppTrait for App { return Ok(()); } - //let mut tasks = Vec::new(); let mut tasks: Vec<(String, tokio::task::JoinHandle>)> = Vec::new(); // Process receiver sessions @@ -340,7 +339,6 @@ impl AppTrait for App { match replay_sender_event_log(&sender_persister) { Ok((sender_state, _)) => { let self_clone = self.clone(); - tasks.push(( session_id.clone().to_string(), tokio::spawn(async move { @@ -359,7 +357,6 @@ impl AppTrait for App { let mut interrupt = self.interrupt.clone(); tokio::select! { _ = async { - let mut all_completed = true; for (session_id, task) in tasks { match task.await { @@ -368,17 +365,12 @@ impl AppTrait for App { } Ok(Err(e)) => { println!("Session {session_id} error: {e:#}"); - all_completed = false; } Err(e) => { println!("Session {session_id} panicked or was cancelled: {e:?}"); - all_completed = false; } } } - if all_completed { - println!("All resumed sessions completed."); - } } => {} _ = interrupt.changed() => { println!("Resumed sessions were interrupted."); @@ -898,8 +890,7 @@ impl App { match result { Ok(ok) => ok, Err(_) => Err(anyhow!( - "Timeout waiting for payment confirmation after {:?}", - timeout_duration + "No payjoin transaction detected in mempool within {timeout_duration:?} seconds, stopping." )), } } diff --git a/payjoin-cli/tests/e2e.rs b/payjoin-cli/tests/e2e.rs index e0e3b4ab4..2d6f45406 100644 --- a/payjoin-cli/tests/e2e.rs +++ b/payjoin-cli/tests/e2e.rs @@ -470,13 +470,13 @@ mod e2e { let res = tokio::time::timeout( timeout, wait_for_stdout_match(&mut stdout, |line| { - line.contains("All resumed sessions completed.") + line.starts_with("Session") && line.ends_with("completed.") }), ) .await?; terminate(cli_resumer).await.expect("Failed to kill payjoin-cli"); - assert!(res.is_some(), "Expected resume summary with all sessions closed"); + assert!(res.is_some(), "Expected all resumed sessions completed"); Ok(()) } @@ -487,13 +487,12 @@ mod e2e { let res = tokio::time::timeout( timeout, wait_for_stdout_match(&mut stdout, |line| { - line.contains("All resumed sessions completed.") - || line.contains("No sessions to resume.") + line.starts_with("Session") && line.ends_with("completed.") }), ) .await?; terminate(cli_resumer).await.expect("Failed to kill payjoin-cli"); - assert!(res.is_none(), "Expected resumed sessions not yet compeleted"); + assert!(res.is_none(), "Expected resumed sessions not yet completed"); Ok(()) } Ok(())