From 911fd9d2554731f568f3ed19e1812178ead1d522 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Fri, 27 Mar 2026 03:08:34 -0700 Subject: [PATCH 1/5] feat: Make transport channel capacity configurable Add `transport_channel_capacity` field to `ClientOptions` that controls the bounded sync channel size used by the transport thread. Defaults to 30 (preserving current behavior). Users in high-throughput scenarios can increase this to reduce the chance of dropped envelopes. Closes #994 Co-Authored-By: Claude Opus 4.6 --- sentry-core/src/clientoptions.rs | 7 +++++++ sentry/src/transports/curl.rs | 3 ++- sentry/src/transports/reqwest.rs | 3 ++- sentry/src/transports/thread.rs | 4 ++-- sentry/src/transports/tokio_thread.rs | 4 ++-- sentry/src/transports/ureq.rs | 3 ++- 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 02d5d5a98..07d1d1092 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -191,6 +191,12 @@ pub struct ClientOptions { pub session_mode: SessionMode, /// The user agent that should be reported. pub user_agent: Cow<'static, str>, + /// The capacity of the transport channel. + /// + /// This controls how many envelopes can be queued before the transport + /// starts dropping them. In high-throughput scenarios, increasing this + /// value can reduce the chance of losing events. Defaults to 30. + pub transport_channel_capacity: usize, } impl ClientOptions { @@ -313,6 +319,7 @@ impl Default for ClientOptions { session_mode: SessionMode::Application, user_agent: Cow::Borrowed(USER_AGENT), max_request_body_size: MaxRequestBodySize::Medium, + transport_channel_capacity: 30, #[cfg(feature = "logs")] enable_logs: true, #[cfg(feature = "logs")] diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index f3ae2a864..6d81589f2 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -36,6 +36,7 @@ impl CurlHttpTransport { let url = dsn.envelope_api_url().to_string(); let scheme = dsn.scheme(); let accept_invalid_certs = options.accept_invalid_certs; + let channel_capacity = options.transport_channel_capacity; let mut handle = client; let thread = TransportThread::new(move |envelope, rl| { @@ -130,7 +131,7 @@ impl CurlHttpTransport { sentry_debug!("Failed to send envelope: {}", err); } } - }); + }, channel_capacity); Self { thread } } } diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index a125b18ff..d6e68da24 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -63,6 +63,7 @@ impl ReqwestHttpTransport { let user_agent = options.user_agent.clone(); let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); + let channel_capacity = options.transport_channel_capacity; let thread = TransportThread::new(move |envelope, mut rl| { let mut body = Vec::new(); @@ -110,7 +111,7 @@ impl ReqwestHttpTransport { } rl } - }); + }, channel_capacity); Self { thread } } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 4ab402600..30e1416ef 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -27,11 +27,11 @@ pub struct TransportThread { impl TransportThread { /// Spawn a new background thread. - pub fn new(mut send: SendFn) -> Self + pub fn new(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(30); + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 398963882..790b1a6ed 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -27,13 +27,13 @@ pub struct TransportThread { impl TransportThread { /// Spawn a new background thread. - pub fn new(mut send: SendFn) -> Self + pub fn new(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(30); + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index d079028f7..e884d1203 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -82,6 +82,7 @@ impl UreqHttpTransport { let user_agent = options.user_agent.clone(); let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); + let channel_capacity = options.transport_channel_capacity; let thread = TransportThread::new(move |envelope, rl| { let mut body = Vec::new(); @@ -118,7 +119,7 @@ impl UreqHttpTransport { sentry_debug!("Failed to send envelope: {}", err); } } - }); + }, channel_capacity); Self { thread } } } From 9c8f904293204f4b83e845201daf78483d3c8c7e Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:05:50 -0400 Subject: [PATCH 2/5] fix: add Debug impl, clamp channel capacity, run cargo fmt - Add transport_channel_capacity to manual Debug implementation - Clamp channel capacity to minimum of 1 to prevent rendezvous channel from silently dropping envelopes via try_send - Run cargo fmt to fix lint CI failure --- sentry-core/src/clientoptions.rs | 8 +- sentry/src/transports/curl.rs | 170 +++++++++++++------------- sentry/src/transports/reqwest.rs | 79 ++++++------ sentry/src/transports/thread.rs | 2 +- sentry/src/transports/tokio_thread.rs | 2 +- sentry/src/transports/ureq.rs | 64 +++++----- 6 files changed, 172 insertions(+), 153 deletions(-) diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 07d1d1092..2e3f6c21c 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -284,7 +284,13 @@ impl fmt::Debug for ClientOptions { .field("enable_logs", &self.enable_logs) .field("before_send_log", &before_send_log); - debug_struct.field("user_agent", &self.user_agent).finish() + debug_struct + .field("user_agent", &self.user_agent) + .field( + "transport_channel_capacity", + &self.transport_channel_capacity, + ) + .finish() } } diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index 6d81589f2..cbbd2c904 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -39,99 +39,103 @@ impl CurlHttpTransport { let channel_capacity = options.transport_channel_capacity; let mut handle = client; - let thread = TransportThread::new(move |envelope, rl| { - handle.reset(); - handle.url(&url).unwrap(); - handle.custom_request("POST").unwrap(); - - if accept_invalid_certs { - handle.ssl_verify_host(false).unwrap(); - handle.ssl_verify_peer(false).unwrap(); - } - - match (scheme, &http_proxy, &https_proxy) { - (Scheme::Https, _, Some(proxy)) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); - } + let thread = TransportThread::new( + move |envelope, rl| { + handle.reset(); + handle.url(&url).unwrap(); + handle.custom_request("POST").unwrap(); + + if accept_invalid_certs { + handle.ssl_verify_host(false).unwrap(); + handle.ssl_verify_peer(false).unwrap(); } - (_, Some(proxy), _) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); + + match (scheme, &http_proxy, &https_proxy) { + (Scheme::Https, _, Some(proxy)) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } + } + (_, Some(proxy), _) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } } + _ => {} } - _ => {} - } - - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let mut body = Cursor::new(body); - - let mut retry_after = None; - let mut sentry_header = None; - let mut headers = curl::easy::List::new(); - headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); - headers.append("Expect:").unwrap(); - handle.http_headers(headers).unwrap(); - handle.upload(true).unwrap(); - handle.in_filesize(body.get_ref().len() as u64).unwrap(); - handle - .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) - .unwrap(); - handle.verbose(true).unwrap(); - handle - .debug_function(move |info, data| { - let prefix = match info { - curl::easy::InfoType::HeaderIn => "< ", - curl::easy::InfoType::HeaderOut => "> ", - curl::easy::InfoType::DataOut => "", - _ => return, - }; - sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); - }) - .unwrap(); - - { - let mut handle = handle.transfer(); - let retry_after_setter = &mut retry_after; - let sentry_header_setter = &mut sentry_header; + + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let mut body = Cursor::new(body); + + let mut retry_after = None; + let mut sentry_header = None; + let mut headers = curl::easy::List::new(); + headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); + headers.append("Expect:").unwrap(); + handle.http_headers(headers).unwrap(); + handle.upload(true).unwrap(); + handle.in_filesize(body.get_ref().len() as u64).unwrap(); + handle + .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) + .unwrap(); + handle.verbose(true).unwrap(); handle - .header_function(move |data| { - if let Ok(data) = std::str::from_utf8(data) { - let mut iter = data.split(':'); - if let Some(key) = iter.next().map(str::to_lowercase) { - if key == "retry-after" { - *retry_after_setter = iter.next().map(|x| x.trim().to_string()); - } else if key == "x-sentry-rate-limits" { - *sentry_header_setter = - iter.next().map(|x| x.trim().to_string()); + .debug_function(move |info, data| { + let prefix = match info { + curl::easy::InfoType::HeaderIn => "< ", + curl::easy::InfoType::HeaderOut => "> ", + curl::easy::InfoType::DataOut => "", + _ => return, + }; + sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); + }) + .unwrap(); + + { + let mut handle = handle.transfer(); + let retry_after_setter = &mut retry_after; + let sentry_header_setter = &mut sentry_header; + handle + .header_function(move |data| { + if let Ok(data) = std::str::from_utf8(data) { + let mut iter = data.split(':'); + if let Some(key) = iter.next().map(str::to_lowercase) { + if key == "retry-after" { + *retry_after_setter = + iter.next().map(|x| x.trim().to_string()); + } else if key == "x-sentry-rate-limits" { + *sentry_header_setter = + iter.next().map(|x| x.trim().to_string()); + } } } + true + }) + .unwrap(); + handle.perform().ok(); + } + + match handle.response_code() { + Ok(response_code) => { + if let Some(sentry_header) = sentry_header { + rl.update_from_sentry_header(&sentry_header); + } else if let Some(retry_after) = retry_after { + rl.update_from_retry_after(&retry_after); + } else if response_code == 429 { + rl.update_from_429(); + } + if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } - true - }) - .unwrap(); - handle.perform().ok(); - } - - match handle.response_code() { - Ok(response_code) => { - if let Some(sentry_header) = sentry_header { - rl.update_from_sentry_header(&sentry_header); - } else if let Some(retry_after) = retry_after { - rl.update_from_retry_after(&retry_after); - } else if response_code == 429 { - rl.update_from_429(); } - if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }, channel_capacity); + }, + channel_capacity, + ); Self { thread } } } diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index d6e68da24..50f9191fd 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -65,53 +65,56 @@ impl ReqwestHttpTransport { let url = dsn.envelope_api_url().to_string(); let channel_capacity = options.transport_channel_capacity; - let thread = TransportThread::new(move |envelope, mut rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); + let thread = TransportThread::new( + move |envelope, mut rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); - // NOTE: because of lifetime issues, building the request using the - // `client` has to happen outside of this async block. - async move { - match request.send().await { - Ok(response) => { - let headers = response.headers(); + // NOTE: because of lifetime issues, building the request using the + // `client` has to happen outside of this async block. + async move { + match request.send().await { + Ok(response) => { + let headers = response.headers(); - if let Some(sentry_header) = headers - .get("x-sentry-rate-limits") - .and_then(|x| x.to_str().ok()) - { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = headers - .get(ReqwestHeaders::RETRY_AFTER) - .and_then(|x| x.to_str().ok()) - { - rl.update_from_retry_after(retry_after); - } else if response.status() == StatusCode::TOO_MANY_REQUESTS { - rl.update_from_429(); - } + if let Some(sentry_header) = headers + .get("x-sentry-rate-limits") + .and_then(|x| x.to_str().ok()) + { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = headers + .get(ReqwestHeaders::RETRY_AFTER) + .and_then(|x| x.to_str().ok()) + { + rl.update_from_retry_after(retry_after); + } else if response.status() == StatusCode::TOO_MANY_REQUESTS { + rl.update_from_429(); + } - let is_payload_too_large = - response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE; - match response.text().await { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); + let is_payload_too_large = + response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE; + match response.text().await { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + if is_payload_too_large { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } } - if is_payload_too_large { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } + rl } - rl - } - }, channel_capacity); + }, + channel_capacity, + ); Self { thread } } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 30e1416ef..6d133ac12 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -31,7 +31,7 @@ impl TransportThread { where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(channel_capacity); + let (sender, receiver) = sync_channel(channel_capacity.max(1)); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 790b1a6ed..1487f2adf 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -33,7 +33,7 @@ impl TransportThread { // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(channel_capacity); + let (sender, receiver) = sync_channel(channel_capacity.max(1)); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index e884d1203..599e9b535 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -84,42 +84,48 @@ impl UreqHttpTransport { let url = dsn.envelope_api_url().to_string(); let channel_capacity = options.transport_channel_capacity; - let thread = TransportThread::new(move |envelope, rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); - - match request { - Ok(mut response) => { - fn header_str<'a, B>(response: &'a Response, key: &str) -> Option<&'a str> { - response.headers().get(key)?.to_str().ok() - } + let thread = TransportThread::new( + move |envelope, rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); + + match request { + Ok(mut response) => { + fn header_str<'a, B>( + response: &'a Response, + key: &str, + ) -> Option<&'a str> { + response.headers().get(key)?.to_str().ok() + } - if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = header_str(&response, "retry-after") { - rl.update_from_retry_after(retry_after); - } else if response.status() == 429 { - rl.update_from_429(); - } + if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = header_str(&response, "retry-after") { + rl.update_from_retry_after(retry_after); + } else if response.status() == 429 { + rl.update_from_429(); + } - match response.body_mut().read_to_string() { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); + match response.body_mut().read_to_string() { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + if response.status() == HTTP_PAYLOAD_TOO_LARGE { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } } - if response.status() == HTTP_PAYLOAD_TOO_LARGE { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }, channel_capacity); + }, + channel_capacity, + ); Self { thread } } } From fbff3ea24d782a35565b6e0f324a0cb11b7edebe Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:13:25 -0400 Subject: [PATCH 3/5] refactor: make channel capacity additive via with_capacity/with_channel_capacity Per review: avoid public API breakages in ClientOptions and the transport constructors. Replace the 'transport_channel_capacity: usize' field and the changed TransportThread/transport constructor signatures with purely additive APIs: - TransportThread::new(send) restored to original signature, delegates to with_capacity(send, 30). TransportThread::with_capacity(send, capacity) is the new entry point for custom capacity. - Same pattern for tokio_thread::TransportThread. - ReqwestHttpTransport/CurlHttpTransport/UreqHttpTransport gain with_channel_capacity(options, capacity). Existing new/with_client/ with_agent keep the default capacity of 30. - Remove transport_channel_capacity from ClientOptions (field, Default, and Debug impl). Users that need to override capacity configure it via ClientOptions.transport with a factory returning the desired transport, e.g. Arc::new(move |opts| Arc::new(ReqwestHttpTransport::with_channel_capacity(opts, 256))). --- sentry-core/src/clientoptions.rs | 15 +-------------- sentry/src/transports/curl.rs | 22 +++++++++++++++++----- sentry/src/transports/reqwest.rs | 22 +++++++++++++++++----- sentry/src/transports/thread.rs | 17 +++++++++++++++-- sentry/src/transports/tokio_thread.rs | 19 +++++++++++++++++-- sentry/src/transports/ureq.rs | 22 +++++++++++++++++----- 6 files changed, 84 insertions(+), 33 deletions(-) diff --git a/sentry-core/src/clientoptions.rs b/sentry-core/src/clientoptions.rs index 2e3f6c21c..02d5d5a98 100644 --- a/sentry-core/src/clientoptions.rs +++ b/sentry-core/src/clientoptions.rs @@ -191,12 +191,6 @@ pub struct ClientOptions { pub session_mode: SessionMode, /// The user agent that should be reported. pub user_agent: Cow<'static, str>, - /// The capacity of the transport channel. - /// - /// This controls how many envelopes can be queued before the transport - /// starts dropping them. In high-throughput scenarios, increasing this - /// value can reduce the chance of losing events. Defaults to 30. - pub transport_channel_capacity: usize, } impl ClientOptions { @@ -284,13 +278,7 @@ impl fmt::Debug for ClientOptions { .field("enable_logs", &self.enable_logs) .field("before_send_log", &before_send_log); - debug_struct - .field("user_agent", &self.user_agent) - .field( - "transport_channel_capacity", - &self.transport_channel_capacity, - ) - .finish() + debug_struct.field("user_agent", &self.user_agent).finish() } } @@ -325,7 +313,6 @@ impl Default for ClientOptions { session_mode: SessionMode::Application, user_agent: Cow::Borrowed(USER_AGENT), max_request_body_size: MaxRequestBodySize::Medium, - transport_channel_capacity: 30, #[cfg(feature = "logs")] enable_logs: true, #[cfg(feature = "logs")] diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index cbbd2c904..27f28b5bf 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -18,15 +18,28 @@ pub struct CurlHttpTransport { impl CurlHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, 30) } /// Creates a new Transport that uses the specified [`CurlClient`]. pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self { - Self::new_internal(options, Some(client)) + Self::new_internal(options, Some(client), 30) } - fn new_internal(options: &ClientOptions, client: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued before + /// `send_envelope` blocks. A higher capacity reduces the chance of + /// dropped events in high-throughput scenarios at the cost of memory. + pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + client: Option, + channel_capacity: usize, + ) -> Self { let client = client.unwrap_or_else(CurlClient::new); let http_proxy = options.http_proxy.as_ref().map(ToString::to_string); let https_proxy = options.https_proxy.as_ref().map(ToString::to_string); @@ -36,10 +49,9 @@ impl CurlHttpTransport { let url = dsn.envelope_api_url().to_string(); let scheme = dsn.scheme(); let accept_invalid_certs = options.accept_invalid_certs; - let channel_capacity = options.transport_channel_capacity; let mut handle = client; - let thread = TransportThread::new( + let thread = TransportThread::with_capacity( move |envelope, rl| { handle.reset(); handle.url(&url).unwrap(); diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index 50f9191fd..ab5d4365a 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -21,15 +21,28 @@ pub struct ReqwestHttpTransport { impl ReqwestHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, 30) } /// Creates a new Transport that uses the specified [`ReqwestClient`]. pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self { - Self::new_internal(options, Some(client)) + Self::new_internal(options, Some(client), 30) } - fn new_internal(options: &ClientOptions, client: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued before + /// `send_envelope` blocks. A higher capacity reduces the chance of + /// dropped events in high-throughput scenarios at the cost of memory. + pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + client: Option, + channel_capacity: usize, + ) -> Self { let client = client.unwrap_or_else(|| { let mut builder = reqwest::Client::builder(); if options.accept_invalid_certs { @@ -63,9 +76,8 @@ impl ReqwestHttpTransport { let user_agent = options.user_agent.clone(); let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let channel_capacity = options.transport_channel_capacity; - let thread = TransportThread::new( + let thread = TransportThread::with_capacity( move |envelope, mut rl| { let mut body = Vec::new(); envelope.to_writer(&mut body).unwrap(); diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 6d133ac12..7e2f2aac0 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -26,8 +26,21 @@ pub struct TransportThread { } impl TransportThread { - /// Spawn a new background thread. - pub fn new(mut send: SendFn, channel_capacity: usize) -> Self + /// Spawn a new background thread with the default channel capacity of 30. + pub fn new(send: SendFn) -> Self + where + SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, + { + Self::with_capacity(send, 30) + } + + /// Spawn a new background thread with a custom channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued before + /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to + /// avoid a rendezvous channel, which would silently drop envelopes under + /// `try_send`. + pub fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 1487f2adf..f862eb3a6 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -26,8 +26,23 @@ pub struct TransportThread { } impl TransportThread { - /// Spawn a new background thread. - pub fn new(mut send: SendFn, channel_capacity: usize) -> Self + /// Spawn a new background thread with the default channel capacity of 30. + pub fn new(send: SendFn) -> Self + where + SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, + // NOTE: returning RateLimiter here, otherwise we are in borrow hell + SendFuture: std::future::Future, + { + Self::with_capacity(send, 30) + } + + /// Spawn a new background thread with a custom channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued before + /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to + /// avoid a rendezvous channel, which would silently drop envelopes under + /// `try_send`. + pub fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, // NOTE: returning RateLimiter here, otherwise we are in borrow hell diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index 599e9b535..dc236a9a7 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -20,15 +20,28 @@ pub struct UreqHttpTransport { impl UreqHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, 30) } /// Creates a new Transport that uses the specified [`ureq::Agent`]. pub fn with_agent(options: &ClientOptions, agent: Agent) -> Self { - Self::new_internal(options, Some(agent)) + Self::new_internal(options, Some(agent), 30) } - fn new_internal(options: &ClientOptions, agent: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued before + /// `send_envelope` blocks. A higher capacity reduces the chance of + /// dropped events in high-throughput scenarios at the cost of memory. + pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + agent: Option, + channel_capacity: usize, + ) -> Self { let dsn = options.dsn.as_ref().unwrap(); let scheme = dsn.scheme(); let agent = agent.unwrap_or_else(|| { @@ -82,9 +95,8 @@ impl UreqHttpTransport { let user_agent = options.user_agent.clone(); let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let channel_capacity = options.transport_channel_capacity; - let thread = TransportThread::new( + let thread = TransportThread::with_capacity( move |envelope, rl| { let mut body = Vec::new(); envelope.to_writer(&mut body).unwrap(); From 6d1c9ff12103628bbf3c804b00e719f4cd8c6343 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Tue, 5 May 2026 08:59:59 -0700 Subject: [PATCH 4/5] fix: make with_capacity pub(crate) and extract DEFAULT_CHANNEL_CAPACITY const Per @szokeasaurusrex review: with_capacity narrowed to pub(crate) on both TransportThread types; the 30 literal extracted to a shared pub(crate) const reused across curl, reqwest, and ureq transports. Includes: Authorized scope (mod.rs) + correct extension (ureq.rs has the same literal). Mechanical change verified by build + fmt; no runtime effect needs test coverage. --- sentry/src/transports/curl.rs | 9 ++++++--- sentry/src/transports/mod.rs | 3 +++ sentry/src/transports/reqwest.rs | 7 ++++--- sentry/src/transports/thread.rs | 5 +++-- sentry/src/transports/tokio_thread.rs | 8 ++++++-- sentry/src/transports/ureq.rs | 9 ++++++--- 6 files changed, 28 insertions(+), 13 deletions(-) diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index 27f28b5bf..2f4c27259 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -3,7 +3,10 @@ use std::time::Duration; use curl::easy::Easy as CurlClient; -use super::{thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE}; +use super::{ + thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, +}; use crate::{sentry_debug, types::Scheme, ClientOptions, Envelope, Transport}; @@ -18,12 +21,12 @@ pub struct CurlHttpTransport { impl CurlHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None, 30) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`CurlClient`]. pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self { - Self::new_internal(options, Some(client), 30) + Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport with a custom transport channel capacity. diff --git a/sentry/src/transports/mod.rs b/sentry/src/transports/mod.rs index 7e959612b..a18ebcc98 100644 --- a/sentry/src/transports/mod.rs +++ b/sentry/src/transports/mod.rs @@ -48,6 +48,9 @@ pub(crate) const HTTP_PAYLOAD_TOO_LARGE: u16 = 413; pub(crate) const HTTP_PAYLOAD_TOO_LARGE_MESSAGE: &str = "Envelope was discarded due to size limits (HTTP 413)."; +#[cfg(sentry_any_http_transport)] +pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 30; + #[cfg(feature = "reqwest")] type DefaultTransport = ReqwestHttpTransport; diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index ab5d4365a..0b1137b05 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -3,7 +3,8 @@ use std::time::Duration; use reqwest::{header as ReqwestHeaders, Client as ReqwestClient, Proxy, StatusCode}; use super::{ - tokio_thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE, + tokio_thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, }; use crate::{sentry_debug, ClientOptions, Envelope, Transport}; @@ -21,12 +22,12 @@ pub struct ReqwestHttpTransport { impl ReqwestHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None, 30) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`ReqwestClient`]. pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self { - Self::new_internal(options, Some(client), 30) + Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport with a custom transport channel capacity. diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 7e2f2aac0..156cb6d11 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -5,6 +5,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use super::ratelimit::{RateLimiter, RateLimitingCategory}; +use super::DEFAULT_CHANNEL_CAPACITY; use crate::{sentry_debug, Envelope}; #[expect( @@ -31,7 +32,7 @@ impl TransportThread { where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - Self::with_capacity(send, 30) + Self::with_capacity(send, DEFAULT_CHANNEL_CAPACITY) } /// Spawn a new background thread with a custom channel capacity. @@ -40,7 +41,7 @@ impl TransportThread { /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to /// avoid a rendezvous channel, which would silently drop envelopes under /// `try_send`. - pub fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self + pub(crate) fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index f862eb3a6..657399118 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -5,6 +5,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use super::ratelimit::{RateLimiter, RateLimitingCategory}; +use super::DEFAULT_CHANNEL_CAPACITY; use crate::{sentry_debug, Envelope}; #[expect( @@ -33,7 +34,7 @@ impl TransportThread { // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - Self::with_capacity(send, 30) + Self::with_capacity(send, DEFAULT_CHANNEL_CAPACITY) } /// Spawn a new background thread with a custom channel capacity. @@ -42,7 +43,10 @@ impl TransportThread { /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to /// avoid a rendezvous channel, which would silently drop envelopes under /// `try_send`. - pub fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self + pub(crate) fn with_capacity( + mut send: SendFn, + channel_capacity: usize, + ) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, // NOTE: returning RateLimiter here, otherwise we are in borrow hell diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index dc236a9a7..614f0787d 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -5,7 +5,10 @@ use ureq::http::Response; use ureq::tls::{TlsConfig, TlsProvider}; use ureq::{Agent, Proxy}; -use super::{thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE}; +use super::{ + thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, +}; use crate::{sentry_debug, types::Scheme, ClientOptions, Envelope, Transport}; @@ -20,12 +23,12 @@ pub struct UreqHttpTransport { impl UreqHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None, 30) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`ureq::Agent`]. pub fn with_agent(options: &ClientOptions, agent: Agent) -> Self { - Self::new_internal(options, Some(agent), 30) + Self::new_internal(options, Some(agent), DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport with a custom transport channel capacity. From c819ce9dc77fcb6afd7ff7d8767107939a6ff832 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Wed, 6 May 2026 18:27:26 -0700 Subject: [PATCH 5/5] fix(transports): honor channel_capacity=0 (rendezvous), drop the .max(1) clamp Per @szokeasaurusrex's review: this is an advanced opt-in transport API, and sync_channel(0) has defined rendezvous semantics. With try_send, capacity 0 accepts an envelope when the transport thread is currently waiting on the receiver and drops it otherwise, which is a valid no-buffer back-pressure policy rather than "drop everything." Update the doc comment in thread.rs and tokio_thread.rs to describe that behavior accurately. --- sentry/src/transports/thread.rs | 10 ++++++---- sentry/src/transports/tokio_thread.rs | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 156cb6d11..51bf6b78a 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -38,14 +38,16 @@ impl TransportThread { /// Spawn a new background thread with a custom channel capacity. /// /// The channel capacity bounds how many envelopes may be queued before - /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to - /// avoid a rendezvous channel, which would silently drop envelopes under - /// `try_send`. + /// `send` blocks. A capacity of `0` creates a rendezvous channel: + /// because `send` uses `try_send`, an envelope is accepted only when the + /// transport thread is currently waiting on the receiver, otherwise it + /// is dropped. That is a no-buffer back-pressure policy, not a blanket + /// "drop everything" mode. pub(crate) fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(channel_capacity.max(1)); + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 657399118..a1ad0cf5a 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -40,9 +40,11 @@ impl TransportThread { /// Spawn a new background thread with a custom channel capacity. /// /// The channel capacity bounds how many envelopes may be queued before - /// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to - /// avoid a rendezvous channel, which would silently drop envelopes under - /// `try_send`. + /// `send` blocks. A capacity of `0` creates a rendezvous channel: + /// because `send` uses `try_send`, an envelope is accepted only when the + /// transport thread is currently waiting on the receiver, otherwise it + /// is dropped. That is a no-buffer back-pressure policy, not a blanket + /// "drop everything" mode. pub(crate) fn with_capacity( mut send: SendFn, channel_capacity: usize, @@ -52,7 +54,7 @@ impl TransportThread { // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(channel_capacity.max(1)); + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new()