diff --git a/payjoin-cli/src/app/v2/mod.rs b/payjoin-cli/src/app/v2/mod.rs index 67dca2910..3abfa225c 100644 --- a/payjoin-cli/src/app/v2/mod.rs +++ b/payjoin-cli/src/app/v2/mod.rs @@ -299,7 +299,6 @@ impl AppTrait for App { Ok(()) } - #[allow(clippy::incompatible_msrv)] async fn resume_payjoins(&self) -> Result<()> { let recv_session_ids = self.db.get_recv_session_ids()?; let send_session_ids = self.db.get_send_session_ids()?; @@ -309,7 +308,7 @@ impl AppTrait for App { return Ok(()); } - let mut tasks = Vec::new(); + let mut tasks: Vec<(String, tokio::task::JoinHandle>)> = Vec::new(); // Process receiver sessions for session_id in recv_session_ids { @@ -317,12 +316,18 @@ impl AppTrait for App { let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone()); match replay_receiver_event_log(&recv_persister) { Ok((receiver_state, _)) => { - tasks.push(tokio::spawn(async move { - self_clone.process_receiver_session(receiver_state, &recv_persister).await - })); + tasks.push(( + session_id.to_string(), + tokio::spawn(async move { + self_clone + .process_receiver_session(receiver_state, &recv_persister) + .await + }), + )); } Err(e) => { tracing::error!("An error {:?} occurred while replaying receiver session", e); + println!("Session {session_id} receiver failed to replay - {e}"); Self::close_failed_session(&recv_persister, &session_id, "receiver"); } } @@ -334,12 +339,16 @@ impl AppTrait for App { match replay_sender_event_log(&sender_persister) { Ok((sender_state, _)) => { let self_clone = self.clone(); - tasks.push(tokio::spawn(async move { - self_clone.process_sender_session(sender_state, &sender_persister).await - })); + tasks.push(( + session_id.clone().to_string(), + tokio::spawn(async move { + self_clone.process_sender_session(sender_state, &sender_persister).await + }), + )); } Err(e) => { tracing::error!("An error {:?} occurred while replaying Sender session", e); + println!("Session {session_id} sender failed to replay - {e}"); Self::close_failed_session(&sender_persister, &session_id, "sender"); } } @@ -348,12 +357,21 @@ impl AppTrait for App { let mut interrupt = self.interrupt.clone(); tokio::select! { _ = async { - for task in tasks { - let _ = task.await; + + for (session_id, task) in tasks { + match task.await { + Ok(Ok(())) => { + println!("Session {session_id} completed."); + } + Ok(Err(e)) => { + println!("Session {session_id} error: {e:#}"); + } + Err(e) => { + println!("Session {session_id} panicked or was cancelled: {e:?}"); + } + } } - } => { - println!("All resumed sessions completed."); - } + } => {} _ = interrupt.changed() => { println!("Resumed sessions were interrupted."); } @@ -571,9 +589,8 @@ impl App { sender: Sender, persister: &SenderPersister, ) -> Result<()> { - let (req, ctx) = sender.create_v2_post_request( - self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(), - )?; + let relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?; + let (req, ctx) = sender.create_v2_post_request(relay.as_str())?; let response = self.post_request(req).await?; let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?; println!("Posted Original PSBT..."); @@ -859,15 +876,12 @@ impl App { .save(persister); match check_result { - Ok(_) => { + Ok(OptionalTransitionOutcome::Progress(())) => { println!("Payjoin transaction detected in the mempool!"); return Ok(()); } - Err(_) => { - // keep polling - - continue; - } + Ok(OptionalTransitionOutcome::Stasis(_)) => continue, + Err(_) => continue, } } }) @@ -876,8 +890,7 @@ impl App { match result { Ok(ok) => ok, Err(_) => Err(anyhow!( - "Timeout waiting for payment confirmation after {:?}", - timeout_duration + "No payjoin transaction detected in mempool within {timeout_duration:?} seconds, stopping." )), } } diff --git a/payjoin-cli/tests/e2e.rs b/payjoin-cli/tests/e2e.rs index 827c5543b..2d6f45406 100644 --- a/payjoin-cli/tests/e2e.rs +++ b/payjoin-cli/tests/e2e.rs @@ -313,6 +313,24 @@ mod e2e { .expect("Failed to execute payjoin-cli"); respond_with_payjoin(cli_receive_resumer).await?; + let cli_receive_resumer = Command::new(payjoin_cli) + .arg("--root-certificate") + .arg(cert_path) + .arg("--rpchost") + .arg(&receiver_rpchost) + .arg("--cookie-file") + .arg(cookie_file) + .arg("--db-path") + .arg(&receiver_db_path) + .arg("--ohttp-relays") + .arg(ohttp_relay) + .arg("resume") + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .expect("Failed to execute payjoin-cli"); + check_resume_not_completed(cli_receive_resumer).await?; + let cli_send_resumer = Command::new(payjoin_cli) .arg("--root-certificate") .arg(cert_path) @@ -452,7 +470,7 @@ mod e2e { let res = tokio::time::timeout( timeout, wait_for_stdout_match(&mut stdout, |line| { - line.contains("All resumed sessions completed.") + line.starts_with("Session") && line.ends_with("completed.") }), ) .await?; @@ -461,6 +479,22 @@ mod e2e { assert!(res.is_some(), "Expected all resumed sessions completed"); Ok(()) } + + async fn check_resume_not_completed(mut cli_resumer: Child) -> Result<()> { + let mut stdout = + cli_resumer.stdout.take().expect("Failed to take stdout of child process"); + let timeout = tokio::time::Duration::from_secs(10); + let res = tokio::time::timeout( + timeout, + wait_for_stdout_match(&mut stdout, |line| { + line.starts_with("Session") && line.ends_with("completed.") + }), + ) + .await?; + terminate(cli_resumer).await.expect("Failed to kill payjoin-cli"); + assert!(res.is_none(), "Expected resumed sessions not yet completed"); + Ok(()) + } Ok(()) }