Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ impl AppTrait for App {
Ok(())
}

#[allow(clippy::incompatible_msrv)]
async fn resume_payjoins(&self) -> Result<()> {
let recv_session_ids = self.db.get_recv_session_ids()?;
let send_session_ids = self.db.get_send_session_ids()?;
Expand All @@ -309,20 +308,26 @@ impl AppTrait for App {
return Ok(());
}

let mut tasks = Vec::new();
let mut tasks: Vec<(String, tokio::task::JoinHandle<Result<()>>)> = Vec::new();

// Process receiver sessions
for session_id in recv_session_ids {
let self_clone = self.clone();
let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone());
match replay_receiver_event_log(&recv_persister) {
Ok((receiver_state, _)) => {
tasks.push(tokio::spawn(async move {
self_clone.process_receiver_session(receiver_state, &recv_persister).await
}));
tasks.push((
session_id.to_string(),
tokio::spawn(async move {
self_clone
.process_receiver_session(receiver_state, &recv_persister)
.await
}),
));
}
Err(e) => {
tracing::error!("An error {:?} occurred while replaying receiver session", e);
println!("Session {session_id} receiver failed to replay - {e}");
Self::close_failed_session(&recv_persister, &session_id, "receiver");
}
}
Expand All @@ -334,12 +339,16 @@ impl AppTrait for App {
match replay_sender_event_log(&sender_persister) {
Ok((sender_state, _)) => {
let self_clone = self.clone();
tasks.push(tokio::spawn(async move {
self_clone.process_sender_session(sender_state, &sender_persister).await
}));
tasks.push((
session_id.clone().to_string(),
tokio::spawn(async move {
self_clone.process_sender_session(sender_state, &sender_persister).await
}),
));
}
Err(e) => {
tracing::error!("An error {:?} occurred while replaying Sender session", e);
println!("Session {session_id} sender failed to replay - {e}");
Self::close_failed_session(&sender_persister, &session_id, "sender");
}
}
Expand All @@ -348,12 +357,21 @@ impl AppTrait for App {
let mut interrupt = self.interrupt.clone();
tokio::select! {
_ = async {
for task in tasks {
let _ = task.await;

for (session_id, task) in tasks {
match task.await {
Ok(Ok(())) => {
println!("Session {session_id} completed.");
}
Ok(Err(e)) => {
println!("Session {session_id} error: {e:#}");
}
Err(e) => {
println!("Session {session_id} panicked or was cancelled: {e:?}");
}
}
}
} => {
println!("All resumed sessions completed.");
}
} => {}
_ = interrupt.changed() => {
println!("Resumed sessions were interrupted.");
}
Expand Down Expand Up @@ -571,9 +589,8 @@ impl App {
sender: Sender<WithReplyKey>,
persister: &SenderPersister,
) -> Result<()> {
let (req, ctx) = sender.create_v2_post_request(
self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(),
)?;
let relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?;
let (req, ctx) = sender.create_v2_post_request(relay.as_str())?;
let response = self.post_request(req).await?;
let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?;
println!("Posted Original PSBT...");
Expand Down Expand Up @@ -859,15 +876,12 @@ impl App {
.save(persister);

match check_result {
Ok(_) => {
Ok(OptionalTransitionOutcome::Progress(())) => {
println!("Payjoin transaction detected in the mempool!");
return Ok(());
}
Err(_) => {
// keep polling

continue;
}
Ok(OptionalTransitionOutcome::Stasis(_)) => continue,
Err(_) => continue,
Comment on lines +879 to +884
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really the meat of the fix (we were previously returning on Stasis). Would be nice to see some test coverage like https://github.com/payjoin/rust-payjoin/pull/1575/changes#diff-162f6469a0779bd5b6f735aa260e95f854b0e8c0d7cc9da278a2c2f51d394c15R316

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xstoicunicornx since your branch already has the needed test coverage, i can borrow that heavily and then credit you as co-author in the commit

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course!

}
}
})
Expand All @@ -876,8 +890,7 @@ impl App {
match result {
Ok(ok) => ok,
Err(_) => Err(anyhow!(
"Timeout waiting for payment confirmation after {:?}",
timeout_duration
"No payjoin transaction detected in mempool within {timeout_duration:?} seconds, stopping."
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"No payjoin transaction detected in mempool within {timeout_duration:?} seconds, stopping."
"No payjoin transaction detected in mempool within {timeout_duration:?}, stopping."

I was wrong about adding the "seconds", its already included as part of the timeout_duration.

)),
}
}
Expand Down
36 changes: 35 additions & 1 deletion payjoin-cli/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,24 @@ mod e2e {
.expect("Failed to execute payjoin-cli");
respond_with_payjoin(cli_receive_resumer).await?;

let cli_receive_resumer = Command::new(payjoin_cli)
.arg("--root-certificate")
.arg(cert_path)
.arg("--rpchost")
.arg(&receiver_rpchost)
.arg("--cookie-file")
.arg(cookie_file)
.arg("--db-path")
.arg(&receiver_db_path)
.arg("--ohttp-relays")
.arg(ohttp_relay)
.arg("resume")
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.expect("Failed to execute payjoin-cli");
check_resume_not_completed(cli_receive_resumer).await?;

let cli_send_resumer = Command::new(payjoin_cli)
.arg("--root-certificate")
.arg(cert_path)
Expand Down Expand Up @@ -452,7 +470,7 @@ mod e2e {
let res = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut stdout, |line| {
line.contains("All resumed sessions completed.")
line.starts_with("Session") && line.ends_with("completed.")
}),
)
.await?;
Expand All @@ -461,6 +479,22 @@ mod e2e {
assert!(res.is_some(), "Expected all resumed sessions completed");
Ok(())
}

async fn check_resume_not_completed(mut cli_resumer: Child) -> Result<()> {
let mut stdout =
cli_resumer.stdout.take().expect("Failed to take stdout of child process");
let timeout = tokio::time::Duration::from_secs(10);
let res = tokio::time::timeout(
timeout,
wait_for_stdout_match(&mut stdout, |line| {
line.starts_with("Session") && line.ends_with("completed.")
}),
)
.await?;
terminate(cli_resumer).await.expect("Failed to kill payjoin-cli");
assert!(res.is_none(), "Expected resumed sessions not yet completed");
Ok(())
}
Ok(())
}

Expand Down
Loading