diff --git a/Cargo.lock b/Cargo.lock index 02a1930c5c..287ccc7dce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2930,7 +2930,7 @@ dependencies = [ [[package]] name = "google-cloud-gax" -version = "1.10.0" +version = "1.10.1" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index 6062af7060..9e5fb01d9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -470,7 +470,7 @@ tokio-stream = { default-features = false, version = "0.1.16" } # Local packages used as dependencies. google-cloud-auth = { default-features = false, version = "1.10.0", path = "src/auth" } -google-cloud-gax = { default-features = false, version = "1.10.0", path = "src/gax" } +google-cloud-gax = { default-features = false, version = "1.10.1", path = "src/gax" } gaxi = { default-features = false, version = "0.7.13", path = "src/gax-internal", package = "google-cloud-gax-internal" } wkt = { default-features = false, version = "1.4.0", path = "src/wkt", package = "google-cloud-wkt" } google-cloud-wkt = { default-features = false, version = "1.4.0", path = "src/wkt", package = "google-cloud-wkt" } diff --git a/src/gax-internal/src/grpc.rs b/src/gax-internal/src/grpc.rs index cff24a17f2..52a94aaa5f 100644 --- a/src/gax-internal/src/grpc.rs +++ b/src/gax-internal/src/grpc.rs @@ -39,7 +39,7 @@ use google_cloud_gax::polling_error_policy::{ Aip194Strict as PollingAip194Strict, PollingErrorPolicy, }; use google_cloud_gax::response::{Parts, Response}; -use google_cloud_gax::retry_loop_internal::{effective_timeout, retry_loop}; +use google_cloud_gax::retry_loop_internal::retry_loop; use google_cloud_gax::retry_policy::{ Aip194Strict as RetryAip194Strict, RetryPolicy, RetryPolicyExt as _, }; @@ -77,6 +77,7 @@ pub struct Client { retry_throttler: SharedRetryThrottler, polling_error_policy: Arc, polling_backoff_policy: Arc, + attempt_timeout: Option, } impl Client { @@ -140,6 +141,7 @@ impl Client { polling_backoff_policy: config .polling_backoff_policy .unwrap_or_else(|| Arc::new(ExponentialBackoff::default())), + attempt_timeout: config.attempt_timeout, }) } @@ -276,8 +278,10 @@ impl Client { let headers = self.add_auth_headers(headers).await?; let metadata = tonic::MetadataMap::from_headers(headers); let mut request = ::tonic::Request::from_parts(metadata, extensions, request); - if let Some(attempt_timeout) = options.attempt_timeout() { - request.set_timeout(*attempt_timeout); + if let Some(timeout) = + crate::options::resolve_effective_timeout(&options, self.attempt_timeout, None) + { + request.set_timeout(timeout); } let codec = tonic_prost::ProstCodec::::default(); let mut inner = self.inner.clone(); @@ -408,7 +412,9 @@ impl Client { let metadata = tonic::MetadataMap::from_headers(headers); let mut request = ::tonic::Request::from_parts(metadata, extensions, request); - if let Some(timeout) = effective_timeout(options, remaining_time) { + if let Some(timeout) = + crate::options::resolve_effective_timeout(options, self.attempt_timeout, remaining_time) + { request.set_timeout(timeout); } let codec = tonic_prost::ProstCodec::::default(); diff --git a/src/gax-internal/src/http.rs b/src/gax-internal/src/http.rs index f02ceb5386..4eea5d9693 100644 --- a/src/gax-internal/src/http.rs +++ b/src/gax-internal/src/http.rs @@ -43,7 +43,7 @@ use google_cloud_gax::polling_error_policy::{ Aip194Strict as PollingAip194Strict, PollingErrorPolicy, }; use google_cloud_gax::response::{Parts, Response}; -use google_cloud_gax::retry_loop_internal::{effective_timeout, retry_loop}; +use google_cloud_gax::retry_loop_internal::retry_loop; use google_cloud_gax::retry_policy::{ Aip194Strict as RetryAip194Strict, RetryPolicy, RetryPolicyExt as _, }; @@ -68,6 +68,7 @@ pub struct ReqwestClient { retry_throttler: SharedRetryThrottler, polling_error_policy: Arc, polling_backoff_policy: Arc, + attempt_timeout: Option, instrumentation: Option<&'static crate::options::InstrumentationClientInfo>, _tracing_enabled: bool, universe_domain: String, @@ -127,6 +128,7 @@ impl ReqwestClient { instrumentation: None, _tracing_enabled: tracing_enabled, universe_domain, + attempt_timeout: config.attempt_timeout, transport_metric: None, }) } @@ -388,9 +390,14 @@ impl ReqwestClient { options: &RequestOptions, remaining_time: Option, ) -> Result { - builder = effective_timeout(options, remaining_time) - .into_iter() - .fold(builder, |b, t| b.timeout(t)); + let timeout = crate::options::resolve_effective_timeout( + options, + self.attempt_timeout, + remaining_time, + ); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } let mut headers = match self.cred.headers(Extensions::new()).await { Err(e) => return Err(Error::authentication(e)), diff --git a/src/gax-internal/src/options.rs b/src/gax-internal/src/options.rs index 9c1635be38..c6cde0c7ec 100644 --- a/src/gax-internal/src/options.rs +++ b/src/gax-internal/src/options.rs @@ -43,6 +43,26 @@ pub fn tracing_enabled(config: &ClientConfig) -> bool { .unwrap_or(false) } +/// Resolve the effective per-attempt timeout for a request, taking into +/// account a per-request attempt timeout and an optional client-level default. +pub(crate) fn resolve_effective_timeout( + options: &google_cloud_gax::options::RequestOptions, + client_attempt_timeout: Option, + remaining_time: Option, +) -> Option { + let attempt = options + .attempt_timeout() + .as_ref() + .copied() + .or(client_attempt_timeout); + match (attempt, remaining_time) { + (None, None) => None, + (None, Some(t)) => Some(t), + (Some(t), None) => Some(t), + (Some(a), Some(r)) => Some(std::cmp::min(a, r)), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/gax/Cargo.toml b/src/gax/Cargo.toml index 40fd144b66..e1a739a240 100644 --- a/src/gax/Cargo.toml +++ b/src/gax/Cargo.toml @@ -18,7 +18,7 @@ name = "google-cloud-gax" # version of all downstream dependencies. For details see: # https://github.com/googleapis/google-cloud-rust/issues/3237 # https://github.com/googleapis/google-cloud-rust/issues/3265 -version = "1.10.0" +version = "1.10.1" description = "Google Cloud Client Libraries for Rust" # Inherit other attributes from the workspace. authors.workspace = true diff --git a/src/gax/src/client_builder.rs b/src/gax/src/client_builder.rs index 13304aa757..9d7b040f1c 100644 --- a/src/gax/src/client_builder.rs +++ b/src/gax/src/client_builder.rs @@ -386,6 +386,16 @@ impl ClientBuilder { self } + /// Configure the per-attempt timeout used as the client default. + /// + /// When set, this timeout will be used for each attempt of a request unless + /// a per-request attempt timeout is provided via RequestOptions. Per-request + /// settings take precedence. + pub fn with_attempt_timeout>(mut self, v: V) -> Self { + self.config.attempt_timeout = Some(v.into()); + self + } + /// Configure the retry throttler. /// /// Advanced applications may want to configure a retry throttler to @@ -514,6 +524,7 @@ pub mod internal { pub retry_throttler: SharedRetryThrottler, pub polling_error_policy: Option>, pub polling_backoff_policy: Option>, + pub attempt_timeout: Option, pub disable_automatic_decompression: bool, pub disable_follow_redirects: bool, pub grpc_subchannel_count: Option, @@ -535,6 +546,7 @@ pub mod internal { retry_throttler: Arc::new(Mutex::new(AdaptiveThrottler::default())), polling_error_policy: None, polling_backoff_policy: None, + attempt_timeout: None, disable_automatic_decompression: false, disable_follow_redirects: false, grpc_subchannel_count: None, @@ -643,6 +655,7 @@ pub mod examples { #[cfg(test)] mod tests { use super::*; + use std::time::Duration; #[tokio::test] async fn build_default() { @@ -749,6 +762,18 @@ pub mod examples { ); } + #[tokio::test] + async fn attempt_timeout() { + let timeout = Duration::from_secs(42); + let client = Client::builder() + .with_attempt_timeout(timeout) + .build() + .await + .unwrap(); + let config = client.0; + assert_eq!(config.attempt_timeout, Some(timeout)); + } + #[tokio::test] async fn retry_policy() { use crate::retry_policy::RetryPolicyExt;