From ff61cfb12319f8d0ccf51084e089643707fc9ad6 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 11 Mar 2026 11:39:48 +0100 Subject: [PATCH 1/8] fix(service): Bind Sentry Hub in concurrency::spawn_metered --- Cargo.lock | 1 + objectstore-service/Cargo.toml | 1 + objectstore-service/src/concurrency.rs | 48 ++++++++++++++------------ 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2f0a21d..e2fcb4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,6 +2589,7 @@ dependencies = [ "objectstore-metrics", "objectstore-types", "reqwest 0.12.28", + "sentry", "serde", "serde_json", "tempfile", diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index 65910336..25293f27 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -27,6 +27,7 @@ reqwest = { workspace = true, features = [ "multipart", "json", ] } +sentry = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index cb54dbf0..c2b48926 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; +use sentry::{Hub, SentryFutureExt}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use crate::error::{Error, Result}; @@ -153,30 +154,33 @@ where objectstore_metrics::counter!("service.task.start": 1, "operation" => operation); let (tx, rx) = tokio::sync::oneshot::channel(); - tokio::spawn(async move { - let start = tokio::time::Instant::now(); - let result = std::panic::AssertUnwindSafe(f) - .catch_unwind() - .await - .unwrap_or_else(|payload| Err(Error::panic(payload))); - - if let Err(ref e) = result { - tracing::error!( - operation, - error = e as &dyn std::error::Error, - "Task failed" - ); - } + tokio::spawn( + async move { + let start = tokio::time::Instant::now(); + let result = std::panic::AssertUnwindSafe(f) + .catch_unwind() + .await + .unwrap_or_else(|payload| Err(Error::panic(payload))); + + if let Err(ref e) = result { + tracing::error!( + operation, + error = e as &dyn std::error::Error, + "Task failed" + ); + } - objectstore_metrics::distribution!( - "service.task.duration"@s: start.elapsed(), - "operation" => operation, - "outcome" => if result.is_ok() { "success" } else { "error" } - ); + objectstore_metrics::distribution!( + "service.task.duration"@s: start.elapsed(), + "operation" => operation, + "outcome" => if result.is_ok() { "success" } else { "error" } + ); - let _ = tx.send(result); - drop(guard); - }); + let _ = tx.send(result); + drop(guard); + } + .bind_hub(Hub::current()), + ); rx.await.map_err(|_| Error::Dropped)? } From 1b6cfaeeff65c3f240f04a33a75642fa073a183b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 11 Mar 2026 12:06:14 +0100 Subject: [PATCH 2/8] use new_from_top instead --- objectstore-service/src/concurrency.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index c2b48926..cca54d00 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -179,7 +179,7 @@ where let _ = tx.send(result); drop(guard); } - .bind_hub(Hub::current()), + .bind_hub(Hub::new_from_top(Hub::current())), ); rx.await.map_err(|_| Error::Dropped)? } From 41fdb43689587b3b6c3f298de6e697923bfcf49d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:14:37 +0100 Subject: [PATCH 3/8] fix --- objectstore-service/src/concurrency.rs | 5 +++-- objectstore-service/src/service.rs | 21 ++++++++++++++++++++- objectstore-service/src/streaming.rs | 10 +++++++--- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index cca54d00..11c73d06 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -145,11 +145,12 @@ impl Drop for ConcurrencyPermit { /// `service.task.duration` (distribution) when the task completes, both tagged /// with the given `operation` name. The duration tag includes an `outcome` of /// `"success"` or `"error"`. -pub async fn spawn_metered(operation: &'static str, guard: G, f: F) -> Result +pub async fn spawn_metered(operation: &'static str, guard: G, f: F, hub: H) -> Result where T: Send + 'static, G: Send + 'static, F: Future> + Send + 'static, + H: Into>, { objectstore_metrics::counter!("service.task.start": 1, "operation" => operation); @@ -179,7 +180,7 @@ where let _ = tx.send(result); drop(guard); } - .bind_hub(Hub::new_from_top(Hub::current())), + .bind_hub(hub), ); rx.await.map_err(|_| Error::Dropped)? } diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index e0bb94d2..d16b2103 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -10,6 +10,7 @@ use std::path::Path; use std::sync::Arc; use objectstore_types::metadata::Metadata; +use sentry::{Hub, SentryFutureExt, Transaction, TransactionContext}; use crate::PayloadStream; use crate::backend::common::BoxedBackend; @@ -251,7 +252,25 @@ impl StorageService { objectstore_metrics::counter!("service.concurrency.rejected": 1); })?; - crate::concurrency::spawn_metered(operation, permit, f).await + let hub = Hub::current(); + let span = hub.configure_scope(|scope| scope.get_span()); + + let new_hub = Hub::new_from_top(hub); + let ctx = TransactionContext::continue_from_span("StorageService::spawn", operation, span); + let tx = new_hub.start_transaction(ctx); + + struct TransactionGuard { + inner: Option, + } + impl Drop for TransactionGuard { + fn drop(&mut self) { + self.inner.take().map(|x| x.finish()); + } + } + let tx_guard = TransactionGuard { inner: Some(tx) }; + + let to_drop = (permit, tx_guard); + crate::concurrency::spawn_metered(operation, to_drop, f, new_hub).await } /// Creates or overwrites an object. diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 9d999cb0..24990c76 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -41,6 +41,7 @@ use std::sync::Arc; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; +use sentry::Hub; use crate::PayloadStream; use crate::concurrency::ConcurrencyPermit; @@ -244,9 +245,12 @@ impl StreamExecutor { Err(e) => return (idx, Err(e)), }; - let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move { - execute_operation(tiered, context, op).await - }); + let spawn = crate::concurrency::spawn_metered( + op.kind(), + permit, + async move { execute_operation(tiered, context, op).await }, + Hub::current(), + ); (idx, spawn.await.map_err(E::from)) } From 100217904b988c7421d2a6a0a2b730168267fc55 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:20:39 +0100 Subject: [PATCH 4/8] fix --- objectstore-service/src/concurrency.rs | 23 +++++++++++++++++++++++ objectstore-service/src/service.rs | 12 ++---------- objectstore-service/src/streaming.rs | 18 +++++++++++++++--- 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index 11c73d06..7e45fc1a 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -135,6 +135,29 @@ impl Drop for ConcurrencyPermit { } } +/// RAII guard that finishes a [`sentry::Transaction`] on drop. +/// +/// Wraps the transaction in an `Option` so it can be consumed in `Drop::drop`, +/// which only receives `&mut self`. +pub(crate) struct TransactionGuard { + inner: Option, +} + +impl TransactionGuard { + /// Creates a new guard that will finish the given transaction on drop. + pub(crate) fn new(tx: sentry::Transaction) -> Self { + Self { inner: Some(tx) } + } +} + +impl Drop for TransactionGuard { + fn drop(&mut self) { + if let Some(tx) = self.inner.take() { + tx.finish(); + } + } +} + /// Spawns a future on a dedicated task with panic isolation and timing metrics. /// /// The `guard` is moved into the spawned task and dropped after the future diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index d16b2103..aa5ee357 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -10,7 +10,7 @@ use std::path::Path; use std::sync::Arc; use objectstore_types::metadata::Metadata; -use sentry::{Hub, SentryFutureExt, Transaction, TransactionContext}; +use sentry::{Hub, TransactionContext}; use crate::PayloadStream; use crate::backend::common::BoxedBackend; @@ -259,15 +259,7 @@ impl StorageService { let ctx = TransactionContext::continue_from_span("StorageService::spawn", operation, span); let tx = new_hub.start_transaction(ctx); - struct TransactionGuard { - inner: Option, - } - impl Drop for TransactionGuard { - fn drop(&mut self) { - self.inner.take().map(|x| x.finish()); - } - } - let tx_guard = TransactionGuard { inner: Some(tx) }; + let tx_guard = crate::concurrency::TransactionGuard::new(tx); let to_drop = (permit, tx_guard); crate::concurrency::spawn_metered(operation, to_drop, f, new_hub).await diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 24990c76..02a0d84a 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -41,7 +41,7 @@ use std::sync::Arc; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; -use sentry::Hub; +use sentry::{Hub, TransactionContext}; use crate::PayloadStream; use crate::concurrency::ConcurrencyPermit; @@ -245,11 +245,23 @@ impl StreamExecutor { Err(e) => return (idx, Err(e)), }; + let hub = Hub::current(); + let parent_span = hub.configure_scope(|scope| scope.get_span()); + let new_hub = Hub::new_from_top(hub); + let tx_ctx = TransactionContext::continue_from_span( + "StreamExecutor::execute", + op.kind(), + parent_span, + ); + let tx = new_hub.start_transaction(tx_ctx); + + let tx_guard = crate::concurrency::TransactionGuard::new(tx); + let spawn = crate::concurrency::spawn_metered( op.kind(), - permit, + (permit, tx_guard), async move { execute_operation(tiered, context, op).await }, - Hub::current(), + new_hub, ); (idx, spawn.await.map_err(E::from)) From e2f08111cc1fdf5cf343fa23f750e10abaa94e2c Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:22:48 +0100 Subject: [PATCH 5/8] fix --- objectstore-service/src/service.rs | 1 - objectstore-service/src/streaming.rs | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index aa5ee357..d02e8607 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -258,7 +258,6 @@ impl StorageService { let new_hub = Hub::new_from_top(hub); let ctx = TransactionContext::continue_from_span("StorageService::spawn", operation, span); let tx = new_hub.start_transaction(ctx); - let tx_guard = crate::concurrency::TransactionGuard::new(tx); let to_drop = (permit, tx_guard); diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 02a0d84a..13fea775 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -256,10 +256,11 @@ impl StreamExecutor { let tx = new_hub.start_transaction(tx_ctx); let tx_guard = crate::concurrency::TransactionGuard::new(tx); + let to_drop = (permit, tx_guard); let spawn = crate::concurrency::spawn_metered( op.kind(), - (permit, tx_guard), + to_drop, async move { execute_operation(tiered, context, op).await }, new_hub, ); From 2a2279e3c25bd3bb8d50ae8dddc6fc30d9295cd7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:51:24 +0100 Subject: [PATCH 6/8] set tx on the scope --- objectstore-service/src/service.rs | 7 +++++-- objectstore-service/src/streaming.rs | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index d02e8607..64e5f822 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -258,9 +258,12 @@ impl StorageService { let new_hub = Hub::new_from_top(hub); let ctx = TransactionContext::continue_from_span("StorageService::spawn", operation, span); let tx = new_hub.start_transaction(ctx); - let tx_guard = crate::concurrency::TransactionGuard::new(tx); - let to_drop = (permit, tx_guard); + let scope_guard = new_hub.push_scope(); + new_hub.configure_scope(|scope| scope.set_span(Some(tx.clone().into()))); + let tx_guard = crate::concurrency::TransactionGuard::new(tx.clone()); + + let to_drop = (permit, tx_guard, scope_guard); crate::concurrency::spawn_metered(operation, to_drop, f, new_hub).await } diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 13fea775..07076b99 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -255,9 +255,11 @@ impl StreamExecutor { ); let tx = new_hub.start_transaction(tx_ctx); - let tx_guard = crate::concurrency::TransactionGuard::new(tx); - let to_drop = (permit, tx_guard); + let scope_guard = new_hub.push_scope(); + new_hub.configure_scope(|scope| scope.set_span(Some(tx.clone().into()))); + let tx_guard = crate::concurrency::TransactionGuard::new(tx.clone()); + let to_drop = (permit, tx_guard, scope_guard); let spawn = crate::concurrency::spawn_metered( op.kind(), to_drop, From c2417c76dc298e77b47b431241df4fa4e673ef7d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:37:55 +0100 Subject: [PATCH 7/8] improve --- objectstore-service/src/concurrency.rs | 45 +++++++++++--------------- objectstore-service/src/service.rs | 14 +------- objectstore-service/src/streaming.rs | 24 ++------------ 3 files changed, 22 insertions(+), 61 deletions(-) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index 7e45fc1a..6c681dae 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; -use sentry::{Hub, SentryFutureExt}; +use sentry::{Hub, SentryFutureExt, TransactionContext, protocol::Span}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use crate::error::{Error, Result}; @@ -135,29 +135,6 @@ impl Drop for ConcurrencyPermit { } } -/// RAII guard that finishes a [`sentry::Transaction`] on drop. -/// -/// Wraps the transaction in an `Option` so it can be consumed in `Drop::drop`, -/// which only receives `&mut self`. -pub(crate) struct TransactionGuard { - inner: Option, -} - -impl TransactionGuard { - /// Creates a new guard that will finish the given transaction on drop. - pub(crate) fn new(tx: sentry::Transaction) -> Self { - Self { inner: Some(tx) } - } -} - -impl Drop for TransactionGuard { - fn drop(&mut self) { - if let Some(tx) = self.inner.take() { - tx.finish(); - } - } -} - /// Spawns a future on a dedicated task with panic isolation and timing metrics. /// /// The `guard` is moved into the spawned task and dropped after the future @@ -168,15 +145,27 @@ impl Drop for TransactionGuard { /// `service.task.duration` (distribution) when the task completes, both tagged /// with the given `operation` name. The duration tag includes an `outcome` of /// `"success"` or `"error"`. -pub async fn spawn_metered(operation: &'static str, guard: G, f: F, hub: H) -> Result +pub async fn spawn_metered(operation: &'static str, guard: G, f: F) -> Result where T: Send + 'static, G: Send + 'static, F: Future> + Send + 'static, - H: Into>, { objectstore_metrics::counter!("service.task.start": 1, "operation" => operation); + let hub = Hub::current(); + let span = hub.configure_scope(|scope| scope.get_span()); + + let new_hub = Hub::new_from_top(hub); + let transaction = new_hub.start_transaction(TransactionContext::continue_from_span( + operation, + "tokio.task", + span, + )); + + let scope_guard = new_hub.push_scope(); + new_hub.configure_scope(|scope| scope.set_span(Some(transaction.clone().into()))); + let (tx, rx) = tokio::sync::oneshot::channel(); tokio::spawn( async move { @@ -202,8 +191,10 @@ where let _ = tx.send(result); drop(guard); + transaction.finish(); + drop(scope_guard); } - .bind_hub(hub), + .bind_hub(new_hub), ); rx.await.map_err(|_| Error::Dropped)? } diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 64e5f822..8a11c21c 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -252,19 +252,7 @@ impl StorageService { objectstore_metrics::counter!("service.concurrency.rejected": 1); })?; - let hub = Hub::current(); - let span = hub.configure_scope(|scope| scope.get_span()); - - let new_hub = Hub::new_from_top(hub); - let ctx = TransactionContext::continue_from_span("StorageService::spawn", operation, span); - let tx = new_hub.start_transaction(ctx); - - let scope_guard = new_hub.push_scope(); - new_hub.configure_scope(|scope| scope.set_span(Some(tx.clone().into()))); - let tx_guard = crate::concurrency::TransactionGuard::new(tx.clone()); - - let to_drop = (permit, tx_guard, scope_guard); - crate::concurrency::spawn_metered(operation, to_drop, f, new_hub).await + crate::concurrency::spawn_metered(operation, permit, f).await } /// Creates or overwrites an object. diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index 07076b99..c018d345 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -245,27 +245,9 @@ impl StreamExecutor { Err(e) => return (idx, Err(e)), }; - let hub = Hub::current(); - let parent_span = hub.configure_scope(|scope| scope.get_span()); - let new_hub = Hub::new_from_top(hub); - let tx_ctx = TransactionContext::continue_from_span( - "StreamExecutor::execute", - op.kind(), - parent_span, - ); - let tx = new_hub.start_transaction(tx_ctx); - - let scope_guard = new_hub.push_scope(); - new_hub.configure_scope(|scope| scope.set_span(Some(tx.clone().into()))); - let tx_guard = crate::concurrency::TransactionGuard::new(tx.clone()); - - let to_drop = (permit, tx_guard, scope_guard); - let spawn = crate::concurrency::spawn_metered( - op.kind(), - to_drop, - async move { execute_operation(tiered, context, op).await }, - new_hub, - ); + let spawn = crate::concurrency::spawn_metered(op.kind(), permit, async move { + execute_operation(tiered, context, op).await + }); (idx, spawn.await.map_err(E::from)) } From 400a4282a5967558d74a18502a3e7b0a398c3827 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:39:16 +0100 Subject: [PATCH 8/8] fix --- objectstore-service/src/concurrency.rs | 2 +- objectstore-service/src/service.rs | 1 - objectstore-service/src/streaming.rs | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/objectstore-service/src/concurrency.rs b/objectstore-service/src/concurrency.rs index 6c681dae..d444247d 100644 --- a/objectstore-service/src/concurrency.rs +++ b/objectstore-service/src/concurrency.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; -use sentry::{Hub, SentryFutureExt, TransactionContext, protocol::Span}; +use sentry::{Hub, SentryFutureExt, TransactionContext}; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use crate::error::{Error, Result}; diff --git a/objectstore-service/src/service.rs b/objectstore-service/src/service.rs index 8a11c21c..e0bb94d2 100644 --- a/objectstore-service/src/service.rs +++ b/objectstore-service/src/service.rs @@ -10,7 +10,6 @@ use std::path::Path; use std::sync::Arc; use objectstore_types::metadata::Metadata; -use sentry::{Hub, TransactionContext}; use crate::PayloadStream; use crate::backend::common::BoxedBackend; diff --git a/objectstore-service/src/streaming.rs b/objectstore-service/src/streaming.rs index c018d345..9d999cb0 100644 --- a/objectstore-service/src/streaming.rs +++ b/objectstore-service/src/streaming.rs @@ -41,7 +41,6 @@ use std::sync::Arc; use futures_util::{Stream, StreamExt}; use objectstore_types::metadata::Metadata; -use sentry::{Hub, TransactionContext}; use crate::PayloadStream; use crate::concurrency::ConcurrencyPermit;