diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index f3ae2a86..2f4c2725 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -3,7 +3,10 @@ use std::time::Duration; use curl::easy::Easy as CurlClient; -use super::{thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE}; +use super::{ + thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, +}; use crate::{sentry_debug, types::Scheme, ClientOptions, Envelope, Transport}; @@ -18,15 +21,28 @@ pub struct CurlHttpTransport { impl CurlHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`CurlClient`]. pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self { - Self::new_internal(options, Some(client)) + Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY) } - fn new_internal(options: &ClientOptions, client: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued; once the + /// queue is full, `send_envelope` drops further envelopes. A higher capacity + /// reduces the chance of dropped events in high-throughput scenarios at the cost of memory.
+ pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + client: Option, + channel_capacity: usize, + ) -> Self { let client = client.unwrap_or_else(CurlClient::new); let http_proxy = options.http_proxy.as_ref().map(ToString::to_string); let https_proxy = options.https_proxy.as_ref().map(ToString::to_string); @@ -38,99 +54,103 @@ impl CurlHttpTransport { let accept_invalid_certs = options.accept_invalid_certs; let mut handle = client; - let thread = TransportThread::new(move |envelope, rl| { - handle.reset(); - handle.url(&url).unwrap(); - handle.custom_request("POST").unwrap(); - - if accept_invalid_certs { - handle.ssl_verify_host(false).unwrap(); - handle.ssl_verify_peer(false).unwrap(); - } - - match (scheme, &http_proxy, &https_proxy) { - (Scheme::Https, _, Some(proxy)) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); - } + let thread = TransportThread::with_capacity( + move |envelope, rl| { + handle.reset(); + handle.url(&url).unwrap(); + handle.custom_request("POST").unwrap(); + + if accept_invalid_certs { + handle.ssl_verify_host(false).unwrap(); + handle.ssl_verify_peer(false).unwrap(); } - (_, Some(proxy), _) => { - if let Err(err) = handle.proxy(proxy) { - sentry_debug!("invalid proxy: {:?}", err); + + match (scheme, &http_proxy, &https_proxy) { + (Scheme::Https, _, Some(proxy)) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } } + (_, Some(proxy), _) => { + if let Err(err) = handle.proxy(proxy) { + sentry_debug!("invalid proxy: {:?}", err); + } + } + _ => {} } - _ => {} - } - - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let mut body = Cursor::new(body); - - let mut retry_after = None; - let mut sentry_header = None; - let mut headers = curl::easy::List::new(); - 
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); - headers.append("Expect:").unwrap(); - handle.http_headers(headers).unwrap(); - handle.upload(true).unwrap(); - handle.in_filesize(body.get_ref().len() as u64).unwrap(); - handle - .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) - .unwrap(); - handle.verbose(true).unwrap(); - handle - .debug_function(move |info, data| { - let prefix = match info { - curl::easy::InfoType::HeaderIn => "< ", - curl::easy::InfoType::HeaderOut => "> ", - curl::easy::InfoType::DataOut => "", - _ => return, - }; - sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim()); - }) - .unwrap(); - - { - let mut handle = handle.transfer(); - let retry_after_setter = &mut retry_after; - let sentry_header_setter = &mut sentry_header; + + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let mut body = Cursor::new(body); + + let mut retry_after = None; + let mut sentry_header = None; + let mut headers = curl::easy::List::new(); + headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap(); + headers.append("Expect:").unwrap(); + handle.http_headers(headers).unwrap(); + handle.upload(true).unwrap(); + handle.in_filesize(body.get_ref().len() as u64).unwrap(); + handle + .read_function(move |buf| Ok(body.read(buf).unwrap_or(0))) + .unwrap(); + handle.verbose(true).unwrap(); handle - .header_function(move |data| { - if let Ok(data) = std::str::from_utf8(data) { - let mut iter = data.split(':'); - if let Some(key) = iter.next().map(str::to_lowercase) { - if key == "retry-after" { - *retry_after_setter = iter.next().map(|x| x.trim().to_string()); - } else if key == "x-sentry-rate-limits" { - *sentry_header_setter = - iter.next().map(|x| x.trim().to_string()); + .debug_function(move |info, data| { + let prefix = match info { + curl::easy::InfoType::HeaderIn => "< ", + curl::easy::InfoType::HeaderOut => "> ", + curl::easy::InfoType::DataOut => "", + _ => return, + }; + sentry_debug!("curl: {}{}", 
prefix, String::from_utf8_lossy(data).trim()); + }) + .unwrap(); + + { + let mut handle = handle.transfer(); + let retry_after_setter = &mut retry_after; + let sentry_header_setter = &mut sentry_header; + handle + .header_function(move |data| { + if let Ok(data) = std::str::from_utf8(data) { + let mut iter = data.split(':'); + if let Some(key) = iter.next().map(str::to_lowercase) { + if key == "retry-after" { + *retry_after_setter = + iter.next().map(|x| x.trim().to_string()); + } else if key == "x-sentry-rate-limits" { + *sentry_header_setter = + iter.next().map(|x| x.trim().to_string()); + } } } + true + }) + .unwrap(); + handle.perform().ok(); + } + + match handle.response_code() { + Ok(response_code) => { + if let Some(sentry_header) = sentry_header { + rl.update_from_sentry_header(&sentry_header); + } else if let Some(retry_after) = retry_after { + rl.update_from_retry_after(&retry_after); + } else if response_code == 429 { + rl.update_from_429(); + } + if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } - true - }) - .unwrap(); - handle.perform().ok(); - } - - match handle.response_code() { - Ok(response_code) => { - if let Some(sentry_header) = sentry_header { - rl.update_from_sentry_header(&sentry_header); - } else if let Some(retry_after) = retry_after { - rl.update_from_retry_after(&retry_after); - } else if response_code == 429 { - rl.update_from_429(); } - if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }); + }, + channel_capacity, + ); Self { thread } } } diff --git a/sentry/src/transports/mod.rs b/sentry/src/transports/mod.rs index 7e959612..a18ebcc9 100644 --- a/sentry/src/transports/mod.rs +++ b/sentry/src/transports/mod.rs @@ -48,6 +48,9 @@ pub(crate) const HTTP_PAYLOAD_TOO_LARGE: u16 = 
413; pub(crate) const HTTP_PAYLOAD_TOO_LARGE_MESSAGE: &str = "Envelope was discarded due to size limits (HTTP 413)."; +#[cfg(sentry_any_http_transport)] +pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 30; + #[cfg(feature = "reqwest")] type DefaultTransport = ReqwestHttpTransport; diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index a125b18f..0b1137b0 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -3,7 +3,8 @@ use std::time::Duration; use reqwest::{header as ReqwestHeaders, Client as ReqwestClient, Proxy, StatusCode}; use super::{ - tokio_thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE, + tokio_thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, }; use crate::{sentry_debug, ClientOptions, Envelope, Transport}; @@ -21,15 +22,28 @@ impl ReqwestHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`ReqwestClient`]. pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self { - Self::new_internal(options, Some(client)) + Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY) } - fn new_internal(options: &ClientOptions, client: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued; once the + /// queue is full, `send_envelope` drops further envelopes. A higher capacity + /// reduces the chance of dropped events in high-throughput scenarios at the cost of memory.
+ pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + client: Option, + channel_capacity: usize, + ) -> Self { let client = client.unwrap_or_else(|| { let mut builder = reqwest::Client::builder(); if options.accept_invalid_certs { @@ -64,53 +78,56 @@ impl ReqwestHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new(move |envelope, mut rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); + let thread = TransportThread::with_capacity( + move |envelope, mut rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body); - // NOTE: because of lifetime issues, building the request using the - // `client` has to happen outside of this async block. - async move { - match request.send().await { - Ok(response) => { - let headers = response.headers(); + // NOTE: because of lifetime issues, building the request using the + // `client` has to happen outside of this async block. 
+ async move { + match request.send().await { + Ok(response) => { + let headers = response.headers(); - if let Some(sentry_header) = headers - .get("x-sentry-rate-limits") - .and_then(|x| x.to_str().ok()) - { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = headers - .get(ReqwestHeaders::RETRY_AFTER) - .and_then(|x| x.to_str().ok()) - { - rl.update_from_retry_after(retry_after); - } else if response.status() == StatusCode::TOO_MANY_REQUESTS { - rl.update_from_429(); - } + if let Some(sentry_header) = headers + .get("x-sentry-rate-limits") + .and_then(|x| x.to_str().ok()) + { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = headers + .get(ReqwestHeaders::RETRY_AFTER) + .and_then(|x| x.to_str().ok()) + { + rl.update_from_retry_after(retry_after); + } else if response.status() == StatusCode::TOO_MANY_REQUESTS { + rl.update_from_429(); + } - let is_payload_too_large = - response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE; - match response.text().await { - Err(err) => { - sentry_debug!("Failed to read sentry response: {}", err); + let is_payload_too_large = + response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE; + match response.text().await { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + if is_payload_too_large { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } } - if is_payload_too_large { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } + rl } - rl - } - }); + }, + channel_capacity, + ); Self { thread } } } diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 4ab40260..51bf6b78 100644 --- a/sentry/src/transports/thread.rs +++ 
b/sentry/src/transports/thread.rs @@ -5,6 +5,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use super::ratelimit::{RateLimiter, RateLimitingCategory}; +use super::DEFAULT_CHANNEL_CAPACITY; use crate::{sentry_debug, Envelope}; #[expect( @@ -26,12 +27,27 @@ pub struct TransportThread { } impl TransportThread { - /// Spawn a new background thread. - pub fn new(mut send: SendFn) -> Self + /// Spawn a new background thread with the default channel capacity (`DEFAULT_CHANNEL_CAPACITY`). + pub fn new(send: SendFn) -> Self where SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { - let (sender, receiver) = sync_channel(30); + Self::with_capacity(send, DEFAULT_CHANNEL_CAPACITY) + } + + /// Spawn a new background thread with a custom channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued; when the + /// queue is full, `send` drops the envelope. A capacity of `0` creates a + /// rendezvous channel: because `send` uses `try_send`, an envelope is + /// accepted only when the transport thread is currently waiting on the + /// receiver, otherwise it is dropped. That is a no-buffer back-pressure + /// policy, not a blanket "drop everything" mode.
+ pub(crate) fn with_capacity(mut send: SendFn, channel_capacity: usize) -> Self + where + SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, + { + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs index 39896388..a1ad0cf5 100644 --- a/sentry/src/transports/tokio_thread.rs +++ b/sentry/src/transports/tokio_thread.rs @@ -5,6 +5,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use super::ratelimit::{RateLimiter, RateLimitingCategory}; +use super::DEFAULT_CHANNEL_CAPACITY; use crate::{sentry_debug, Envelope}; #[expect( @@ -26,14 +27,34 @@ pub struct TransportThread { } impl TransportThread { - /// Spawn a new background thread. - pub fn new(mut send: SendFn) -> Self + /// Spawn a new background thread with the default channel capacity (`DEFAULT_CHANNEL_CAPACITY`). + pub fn new(send: SendFn) -> Self where SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, // NOTE: returning RateLimiter here, otherwise we are in borrow hell SendFuture: std::future::Future, { - let (sender, receiver) = sync_channel(30); + Self::with_capacity(send, DEFAULT_CHANNEL_CAPACITY) + } + + /// Spawn a new background thread with a custom channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued; when the + /// queue is full, `send` drops the envelope. A capacity of `0` creates a + /// rendezvous channel: because `send` uses `try_send`, an envelope is + /// accepted only when the transport thread is currently waiting on the + /// receiver, otherwise it is dropped. That is a no-buffer back-pressure + /// policy, not a blanket "drop everything" mode.
+ pub(crate) fn with_capacity( + mut send: SendFn, + channel_capacity: usize, + ) -> Self + where + SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, + // NOTE: returning RateLimiter here, otherwise we are in borrow hell + SendFuture: std::future::Future, + { + let (sender, receiver) = sync_channel(channel_capacity); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_worker = shutdown.clone(); let handle = thread::Builder::new() diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index d079028f..614f0787 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -5,7 +5,10 @@ use ureq::http::Response; use ureq::tls::{TlsConfig, TlsProvider}; use ureq::{Agent, Proxy}; -use super::{thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE}; +use super::{ + thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE, + HTTP_PAYLOAD_TOO_LARGE_MESSAGE, +}; use crate::{sentry_debug, types::Scheme, ClientOptions, Envelope, Transport}; @@ -20,15 +23,28 @@ pub struct UreqHttpTransport { impl UreqHttpTransport { /// Creates a new Transport. pub fn new(options: &ClientOptions) -> Self { - Self::new_internal(options, None) + Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY) } /// Creates a new Transport that uses the specified [`ureq::Agent`]. pub fn with_agent(options: &ClientOptions, agent: Agent) -> Self { - Self::new_internal(options, Some(agent)) + Self::new_internal(options, Some(agent), DEFAULT_CHANNEL_CAPACITY) } - fn new_internal(options: &ClientOptions, agent: Option) -> Self { + /// Creates a new Transport with a custom transport channel capacity. + /// + /// The channel capacity bounds how many envelopes may be queued; once the + /// queue is full, `send_envelope` drops further envelopes. A higher capacity + /// reduces the chance of dropped events in high-throughput scenarios at the cost of memory.
+ pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self { + Self::new_internal(options, None, channel_capacity) + } + + fn new_internal( + options: &ClientOptions, + agent: Option, + channel_capacity: usize, + ) -> Self { let dsn = options.dsn.as_ref().unwrap(); let scheme = dsn.scheme(); let agent = agent.unwrap_or_else(|| { @@ -83,42 +99,48 @@ impl UreqHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new(move |envelope, rl| { - let mut body = Vec::new(); - envelope.to_writer(&mut body).unwrap(); - let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); - - match request { - Ok(mut response) => { - fn header_str<'a, B>(response: &'a Response, key: &str) -> Option<&'a str> { - response.headers().get(key)?.to_str().ok() - } + let thread = TransportThread::with_capacity( + move |envelope, rl| { + let mut body = Vec::new(); + envelope.to_writer(&mut body).unwrap(); + let request = agent.post(&url).header("X-Sentry-Auth", &auth).send(&body); + + match request { + Ok(mut response) => { + fn header_str<'a, B>( + response: &'a Response, + key: &str, + ) -> Option<&'a str> { + response.headers().get(key)?.to_str().ok() + } - if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { - rl.update_from_sentry_header(sentry_header); - } else if let Some(retry_after) = header_str(&response, "retry-after") { - rl.update_from_retry_after(retry_after); - } else if response.status() == 429 { - rl.update_from_429(); - } + if let Some(sentry_header) = header_str(&response, "x-sentry-rate-limits") { + rl.update_from_sentry_header(sentry_header); + } else if let Some(retry_after) = header_str(&response, "retry-after") { + rl.update_from_retry_after(retry_after); + } else if response.status() == 429 { + rl.update_from_429(); + } - match response.body_mut().read_to_string() { - Err(err) => { - sentry_debug!("Failed to 
read sentry response: {}", err); + match response.body_mut().read_to_string() { + Err(err) => { + sentry_debug!("Failed to read sentry response: {}", err); + } + Ok(text) => { + sentry_debug!("Get response: `{}`", text); + } } - Ok(text) => { - sentry_debug!("Get response: `{}`", text); + if response.status() == HTTP_PAYLOAD_TOO_LARGE { + sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); } } - if response.status() == HTTP_PAYLOAD_TOO_LARGE { - sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}"); + Err(err) => { + sentry_debug!("Failed to send envelope: {}", err); } } - Err(err) => { - sentry_debug!("Failed to send envelope: {}", err); - } - } - }); + }, + channel_capacity, + ); Self { thread } } }