diff --git a/Cargo.lock b/Cargo.lock index e2f0a21d..e2fcb4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,6 +2589,7 @@ dependencies = [ "objectstore-metrics", "objectstore-types", "reqwest 0.12.28", + "sentry", "serde", "serde_json", "tempfile", diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index 65910336..25293f27 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -27,6 +27,7 @@ reqwest = { workspace = true, features = [ "multipart", "json", ] } +sentry = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index cb54dbf0..d444247d 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; +use sentry::{Hub, SentryFutureExt, TransactionContext}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use crate::error::{Error, Result}; @@ -152,31 +153,49 @@ where { objectstore_metrics::counter!("service.task.start": 1, "operation" => operation); + let hub = Hub::current(); + let span = hub.configure_scope(|scope| scope.get_span()); + + let new_hub = Hub::new_from_top(hub); + let transaction = new_hub.start_transaction(TransactionContext::continue_from_span( + operation, + "tokio.task", + span, + )); + + let scope_guard = new_hub.push_scope(); + new_hub.configure_scope(|scope| scope.set_span(Some(transaction.clone().into()))); + let (tx, rx) = tokio::sync::oneshot::channel(); - tokio::spawn(async move { - let start = tokio::time::Instant::now(); - let result = std::panic::AssertUnwindSafe(f) - .catch_unwind() - .await - .unwrap_or_else(|payload| Err(Error::panic(payload))); - - if let Err(ref e) = result { - tracing::error!( - operation, - error = e as &dyn std::error::Error, - "Task failed" - ); - } + tokio::spawn( + async move { + let start = tokio::time::Instant::now(); + let result = std::panic::AssertUnwindSafe(f) + .catch_unwind() + .await + .unwrap_or_else(|payload| Err(Error::panic(payload))); + + if let Err(ref e) = result { + tracing::error!( + operation, + error = e as &dyn std::error::Error, + "Task failed" + ); + } - objectstore_metrics::distribution!( - "service.task.duration"@s: start.elapsed(), - "operation" => operation, - "outcome" => if result.is_ok() { "success" } else { "error" } - ); + objectstore_metrics::distribution!( + "service.task.duration"@s: start.elapsed(), + "operation" => operation, + "outcome" => if result.is_ok() { "success" } else { "error" } + ); - let _ = tx.send(result); - drop(guard); - }); + let _ = tx.send(result); + drop(guard); + transaction.finish(); + drop(scope_guard); + } + .bind_hub(new_hub), + ); rx.await.map_err(|_| Error::Dropped)? }