diff --git a/src/http.rs b/src/http.rs index 76aff937631d3..550abc4a0e334 100644 --- a/src/http.rs +++ b/src/http.rs @@ -35,6 +35,7 @@ use vector_lib::sensitive_string::SensitiveString; #[cfg(feature = "aws-core")] use crate::aws::AwsAuthentication; +use crate::sinks::util::http::ConnectionConfig; use crate::{ config::ProxyConfig, internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent}, @@ -92,7 +93,26 @@ where tls_settings: impl Into, proxy_config: &ProxyConfig, ) -> Result, HttpError> { - HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut Client::builder()) + HttpClient::new_with_connection_config(tls_settings, proxy_config, None) + } + + pub fn new_with_connection_config( + tls_settings: impl Into, + proxy_config: &ProxyConfig, + connection_config: Option, + ) -> Result, HttpError> { + let mut builder = Client::builder(); + + if let Some(config) = connection_config { + if let Some(idle_secs) = config.idle_timeout_secs { + builder.pool_idle_timeout(Duration::from_secs(idle_secs)); + } + if let Some(max_idle) = config.pool_idle_per_host { + builder.pool_max_idle_per_host(max_idle); + } + } + + HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut builder) } pub fn new_with_custom_client( diff --git a/src/sinks/axiom.rs b/src/sinks/axiom.rs index dfc4ab124dcc1..c30d679d78a66 100644 --- a/src/sinks/axiom.rs +++ b/src/sinks/axiom.rs @@ -133,6 +133,7 @@ impl SinkConfig for AxiomConfig { ), payload_prefix: "".into(), // Always newline delimited JSON payload_suffix: "".into(), // Always newline delimited JSON + connection: None, }; http_sink_config.build(cx).await diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 1060ee5c7a22a..cb6d498666ed8 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -10,6 +10,7 @@ use super::{ request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder, InvalidHostSnafu, Request, VersionType, }; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::{HttpClient, MaybeAuth, QueryParameterValue, QueryParameters}, sinks::{ @@ -190,6 +191,7 @@ impl ElasticsearchCommon { &request, &tls_settings, proxy_config, + config.connection, ) .await { @@ -341,6 +343,7 @@ async fn get_version( request: &RequestConfig, tls_settings: &TlsSettings, proxy_config: &ProxyConfig, + connection_config: Option, ) -> crate::Result { #[derive(Deserialize)] struct Version { @@ -351,7 +354,11 @@ async fn get_version( version: Option, } - let client = HttpClient::new(tls_settings.clone(), proxy_config)?; + let client = HttpClient::new_with_connection_config( + tls_settings.clone(), + proxy_config, + connection_config, + )?; let response = get( base_url, auth, diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index ba639c6ec794b..7fe503969b0a4 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -6,6 +6,7 @@ use std::{ use futures::{FutureExt, TryFutureExt}; use vector_lib::configurable::configurable_component; +use crate::sinks::util::http::ConnectionConfig; use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext}, @@ -218,6 +219,15 @@ pub struct ElasticsearchConfig { )] #[configurable(derived)] pub acknowledgements: AcknowledgementsConfig, + + /// Connection-level settings for the underlying HTTP client. + /// + /// This allows configuring parameters like connection idle timeout and + /// maximum idle connections per host. Useful when running behind load + /// balancers with strict idle policies. + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } fn default_doc_type() -> String { @@ -255,6 +265,7 @@ impl Default for ElasticsearchConfig { data_stream: None, metrics: None, acknowledgements: Default::default(), + connection: None, } } } @@ -541,7 +552,11 @@ impl SinkConfig for ElasticsearchConfig { let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?; let common = commons[0].clone(); - let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?; + let client = HttpClient::new_with_connection_config( + common.tls_settings.clone(), + cx.proxy(), + self.connection, + )?; let request_limits = self.request.tower.into_settings(); diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 4094768c34a43..560fc6ce419fb 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -21,6 +21,7 @@ use super::{ }; #[cfg(feature = "aws-core")] use crate::aws::AwsAuthentication; +use crate::sinks::util::http::ConnectionConfig; #[cfg(feature = "aws-core")] use crate::sinks::util::http::SigV4Config; use crate::{ @@ -107,6 +108,10 @@ pub struct HttpSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } /// HTTP method. @@ -163,7 +168,11 @@ impl From for Method { impl HttpSinkConfig { fn build_http_client(&self, cx: &SinkContext) -> crate::Result { let tls = TlsSettings::from_options(self.tls.as_ref())?; - Ok(HttpClient::new(tls, cx.proxy())?) + Ok(HttpClient::new_with_connection_config( + tls, + cx.proxy(), + self.connection, + )?) } pub(super) fn build_encoder(&self) -> crate::Result> { @@ -392,6 +401,7 @@ mod tests { acknowledgements: AcknowledgementsConfig::default(), payload_prefix: String::new(), payload_suffix: String::new(), + connection: None, }; let external_resource = ExternalResource::new( @@ -413,4 +423,32 @@ mod tests { } register_validatable_component!(HttpSinkConfig); + + #[test] + fn deserialize_connection_config_defaults() { + let cfg: ConnectionConfig = serde_yaml::from_str("{}").unwrap(); + // Defaults should be None + assert!(cfg.idle_timeout_secs.is_none()); + assert!(cfg.pool_idle_per_host.is_none()); + } + + #[test] + fn http_sink_config_with_connection() { + let yaml = r#" +uri: "http://example.com" +encoding: + codec: "json" +connection: + idle_timeout_secs: 120 + pool_idle_per_host: 20 +"#; + + let cfg: HttpSinkConfig = serde_yaml::from_str(yaml).unwrap(); + + assert_eq!(cfg.uri.uri, "http://example.com"); + assert!(cfg.connection.is_some()); + let conn = cfg.connection.unwrap(); + assert_eq!(conn.idle_timeout_secs, Some(120)); + assert_eq!(conn.pool_idle_per_host, Some(20)); + } } diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 363877380c308..dfe6bfb121836 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -60,6 +60,7 @@ fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + connection: Default::default(), } } diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index ba6b00f066407..5566e83847e9f 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -209,6 +209,7 @@ impl HumioLogsConfig { timestamp_key: Some(config_timestamp_key_target_path()), endpoint_target: EndpointTarget::Event, auto_extract_timestamp: None, + connection: None, } } } diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs index 90d26da27e202..d572a841f26ff 100644 --- a/src/sinks/opentelemetry/mod.rs +++ b/src/sinks/opentelemetry/mod.rs @@ -48,6 +48,7 @@ impl Default for Protocol { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + connection: None, }) } } diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 99bfc1fbd6d16..ac8a7b98971c6 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -13,6 +13,7 @@ use super::{ service::{HttpRequestBuilder, MetadataFields}, EndpointTarget, }; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::HttpClient, internal_events::TemplateRenderingError, @@ -50,6 +51,24 @@ pub fn create_client( Ok(HttpClient::new(tls_settings, proxy_config)?) } +pub fn create_client_with_connection_config( + tls: Option<&TlsConfig>, + proxy_config: &ProxyConfig, + connection: Option, +) -> crate::Result { + let tls_settings = TlsSettings::from_options(tls)?; + + if let Some(conn) = connection { + Ok(HttpClient::new_with_connection_config( + tls_settings, + proxy_config, + Some(conn), + )?) + } else { + Ok(HttpClient::new(tls_settings, proxy_config)?) + } +} + // TODO: `HttpBatchService` has been deprecated for direct use in sinks. // This sink should undergo a refactor to utilize the `HttpService` // instead, which extracts much of the boilerplate code for `Service`. diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 7479a58d0b2ba..61c33b43d7c68 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -6,6 +6,9 @@ use vector_lib::{ sensitive_string::SensitiveString, }; +use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; +use crate::sinks::splunk_hec::common::create_client_with_connection_config; +use crate::sinks::util::http::ConnectionConfig; use crate::{ http::HttpClient, sinks::{ @@ -20,8 +23,6 @@ use crate::{ }, }; -use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; - /// Configuration for the `splunk_hec_logs` sink. #[configurable_component(sink( "splunk_hec_logs", @@ -152,6 +153,15 @@ pub struct HecLogsSinkConfig { #[configurable(metadata(docs::advanced))] #[serde(default = "default_endpoint_target")] pub endpoint_target: EndpointTarget, + + /// Connection-level settings for the underlying HTTP client. + /// + /// This allows configuring parameters like connection idle timeout and + /// maximum idle connections per host. Useful when running behind load + /// balancers with strict idle policies. + #[configurable(derived)] + #[serde(default)] + pub connection: Option, } const fn default_endpoint_target() -> EndpointTarget { @@ -178,6 +188,7 @@ impl GenerateConfig for HecLogsSinkConfig { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, + connection: None, }) .unwrap() } @@ -191,7 +202,14 @@ impl SinkConfig for HecLogsSinkConfig { return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into()); } - let client = create_client(self.tls.as_ref(), cx.proxy())?; + //let client = create_client(self.tls.as_ref(), cx.proxy())?; + + let client = if let Some(conn) = &self.connection { + create_client_with_connection_config(self.tls.as_ref(), cx.proxy(), Some(*conn))? + } else { + create_client(self.tls.as_ref(), cx.proxy())? + }; + let healthcheck = build_healthcheck( self.endpoint.clone(), self.default_token.inner().to_owned(), @@ -334,6 +352,7 @@ mod tests { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, + connection: None, }; let endpoint = format!("{endpoint}/services/collector/raw"); @@ -360,4 +379,25 @@ mod tests { } register_validatable_component!(HecLogsSinkConfig); + + #[test] + fn splunk_config_with_connection() { + let yaml = r#" +default_token: "test_token" +endpoint: "http://splunk:8088" +encoding: + codec: "json" +connection: + idle_timeout_secs: 120 + pool_idle_per_host: 15 +"#; + + let cfg: HecLogsSinkConfig = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(cfg.default_token.inner(), "test_token"); + assert!(cfg.connection.is_some()); + + let conn = cfg.connection.unwrap(); + assert_eq!(conn.idle_timeout_secs, Some(120)); + assert_eq!(conn.pool_idle_per_host, Some(15)); + } } diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 794fff1a4b684..f4c08508c0619 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -237,6 +237,7 @@ async fn splunk_passthrough_token() { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, + connection: None, }; let cx = SinkContext::default(); diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index aa8ba4bb3446e..248d952bd061b 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -633,6 +633,25 @@ impl RequestConfig { } } +#[configurable_component] +#[configurable(title = "Configuration for connection behavior in the HTTP client.")] +#[configurable(description = "Configuration for connection behavior in the HTTP client.")] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct ConnectionConfig { + /// Maximum idle time for a connection before it’s closed (seconds) + pub idle_timeout_secs: Option, + + /// Maximum number of idle connections to keep per host + pub pool_idle_per_host: Option, +} + +impl ConnectionConfig { + pub const DEFAULT: Self = Self { + idle_timeout_secs: None, + pool_idle_per_host: None, + }; +} + #[derive(Debug, Snafu)] pub enum HeaderValidationError { #[snafu(display("{}: {}", source, name))] diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 0be523bf8fe19..b0733b9b0f879 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1357,6 +1357,7 @@ mod tests { timestamp_key: None, auto_extract_timestamp: None, endpoint_target: Default::default(), + connection: None, } .build(SinkContext::default()) .await