From 05bdcc0d815de39c678a4b33606afaddda3d8170 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Tue, 21 Apr 2026 16:47:16 -0400 Subject: [PATCH 1/5] LOG-9375: Enhance the prometheus exporter with SAR auth strategy --- src/sinks/prometheus/exporter.rs | 612 +++++++++++++++++++++++++++++-- 1 file changed, 590 insertions(+), 22 deletions(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 6072bc8274ea0..e4b7bf844be7c 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -22,7 +22,7 @@ use snafu::Snafu; use stream_cancel::{Trigger, Tripwire}; use tower::ServiceBuilder; use tower_http::compression::CompressionLayer; -use tracing::{Instrument, Span}; +use tracing::{Instrument, Span, error, info, warn}; use vector_lib::{ ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, @@ -32,6 +32,9 @@ use vector_lib::{ }, }; +#[cfg(feature = "kubernetes")] +use kube::{Api, Client}; + use super::collector::{MetricCollector, StringCollector}; use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext}, @@ -39,7 +42,7 @@ use crate::{ Event, EventStatus, Finalizable, metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue}, }, - http::{Auth, build_http_trace_layer}, + http::build_http_trace_layer, internal_events::PrometheusNormalizationError, sinks::{ Healthcheck, VectorSink, @@ -58,6 +61,159 @@ enum BuildError { FlushPeriodTooShort { min: u64 }, } +/// Authentication configuration for the Prometheus exporter. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "strategy")] +#[serde(rename_all = "lowercase")] +#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))] +pub enum PrometheusExporterAuth { + /// Basic authentication. + Basic { + /// The basic authentication username. + #[configurable(metadata(docs::examples = "username"))] + user: String, + + /// The basic authentication password. + #[configurable(metadata(docs::examples = "password"))] + password: vector_lib::sensitive_string::SensitiveString, + }, + + /// Bearer authentication. + Bearer { + /// The bearer authentication token. + token: vector_lib::sensitive_string::SensitiveString, + }, + + /// Custom Authorization Header Value. + Custom { + /// Custom string value of the Authorization header. + #[configurable(metadata(docs::examples = "CUSTOM_PREFIX ${TOKEN}"))] + value: String, + }, + + #[cfg(feature = "kubernetes")] + /// Kubernetes SubjectAccessReview authentication. + /// + /// Validates Bearer tokens using Kubernetes TokenReview and SubjectAccessReview APIs. + /// Supports both resource-based and nonResourceURL-based authorization. + /// + /// ## Required RBAC Permissions + /// + /// Vector's ServiceAccount must have permissions to create TokenReview and SubjectAccessReview resources: + /// + /// ```yaml + /// apiVersion: rbac.authorization.k8s.io/v1 + /// kind: ClusterRole + /// metadata: + /// name: vector-token-validator + /// rules: + /// - apiGroups: ["authentication.k8s.io"] + /// resources: ["tokenreviews"] + /// verbs: ["create"] + /// - apiGroups: ["authorization.k8s.io"] + /// resources: ["subjectaccessreviews"] + /// verbs: ["create"] + /// ``` + /// + /// ## How it Works + /// + /// 1. Client (e.g., Prometheus) sends request with `Authorization: Bearer ` + /// 2. Vector extracts the Bearer token from the request + /// 3. Vector uses its own ServiceAccount to authenticate to the Kubernetes API + /// 4. Vector calls TokenReview API with the client's token to validate it and get user identity + /// 5. Vector calls SubjectAccessReview API to check if that user has the specified permissions + /// 6. Vector allows or denies the request based on the SubjectAccessReview response + /// + /// ## Configuration Examples + /// + /// NonResourceURL-based (for /metrics, /healthz, etc.): + /// ```toml + /// [sinks.prometheus.auth] + /// strategy = "sar" + /// path = "/metrics" + /// verb = "get" + /// ``` + /// + /// Resource-based (for Kubernetes resources): + /// ```toml + /// [sinks.prometheus.auth] + /// strategy = "sar" + /// resource = "pods" + /// verb = "get" + /// resource_group = "" + /// ``` + /// + Sar { + /// The URL path to check access for (nonResourceURL). + /// + /// Use this for API endpoints like /metrics, /healthz, /api. + /// Must start with "/" and match the nonResourceURLs in the client's RBAC. + /// + /// Mutually exclusive with `resource`. Specify either `path` OR `resource`, not both. + /// + /// Example RBAC rule for nonResourceURL: + /// ```yaml + /// - nonResourceURLs: ["/metrics"] + /// verbs: ["get"] + /// ``` + #[serde(default)] + #[configurable(metadata(docs::examples = "/metrics"))] + path: Option, + + /// The resource to check access for (Kubernetes resource). + /// + /// Use this for Kubernetes resources like pods, services, configmaps. + /// Mutually exclusive with `path`. Specify either `path` OR `resource`, not both. + /// + /// Example RBAC rule for resource: + /// ```yaml + /// - apiGroups: [""] + /// resources: ["metrics"] + /// verbs: ["get"] + /// ``` + #[serde(default)] + #[configurable(metadata(docs::examples = "metrics"))] + resource: Option, + + /// The verb to check. + /// + /// For resources: "get", "list", "watch", "create", "update", "delete" + /// For nonResourceURLs: typically "get" or "post" + #[configurable(metadata(docs::examples = "get"))] + verb: String, + + /// The API group for the resource (only used with `resource`, not `path`). + /// + /// Leave empty ("") for core Kubernetes resources. + /// Use the API group name for custom resources (e.g., "metrics.k8s.io"). + #[serde(default)] + #[configurable(metadata(docs::examples = ""))] + resource_group: String, + + /// The namespace to check access in (only used with `resource`, not `path`). + /// + /// If specified, checks for namespaced resource access. + /// If not specified (None), checks for cluster-scoped access. + #[serde(default)] + namespace: Option, + + /// Override the user to check access for. If not specified, uses the user from the TokenReview. + /// Typically left unset to validate the actual token holder's permissions. + #[serde(default)] + #[configurable(metadata( + docs::examples = "system:serviceaccount:my-namespace:myserviceaccount" + ))] + user: Option, + + /// Override the groups to check access for. If not specified, uses the groups from the TokenReview. + /// Typically left unset to validate the actual token holder's permissions. + #[serde(default)] + #[configurable(metadata(docs::examples = "system:authenticated"))] + groups: Option>, + }, +} + /// Configuration for the `prometheus_exporter` sink. #[serde_as] #[configurable_component(sink( @@ -87,7 +243,7 @@ pub struct PrometheusExporterConfig { pub address: SocketAddr, #[configurable(derived)] - pub auth: Option, + pub auth: Option, #[configurable(derived)] pub tls: Option, @@ -308,24 +464,296 @@ impl Hash for MetricRef { } } -fn authorized(req: &Request, auth: &Option) -> bool { +/// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview. +/// +/// This function: +/// 1. Uses Vector's own service account to authenticate to the K8s API +/// 2. Validates the client's token using TokenReview API +/// 3. Extracts user identity from the TokenReview response +/// 4. Checks permissions using SubjectAccessReview API +#[cfg(feature = "kubernetes")] +async fn validate_token_with_sar( + token: &str, + resource: &str, + verb: &str, + resource_group: &str, + namespace: &Option, + user: &Option, + groups: &Option>, +) -> crate::Result { + use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; + use k8s_openapi::api::authorization::v1::{ + ResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, + }; + + debug!(message = "Validating bearer token"); + + // Create a Kubernetes client using Vector's own service account + // This uses the in-cluster configuration with Vector's token + let client = Client::try_default().await?; + + // Step 1: Validate the client's token using TokenReview + let token_review = TokenReview { + spec: TokenReviewSpec { + token: Some(token.to_string()), + audiences: None, + }, + ..Default::default() + }; + + debug!(message = "Calling TokenReview API"); + let token_api: Api = Api::all(client.clone()); + let token_result = token_api.create(&Default::default(), &token_review).await?; + + // Check if token is valid + let token_status = token_result + .status + .ok_or("TokenReview returned no status")?; + + if !token_status.authenticated.unwrap_or(false) { + warn!(message = "Token authentication failed via TokenReview"); + return Ok(false); + } + + // Extract user info from the validated token + let user_info = token_status + .user + .ok_or("TokenReview returned no user info")?; + + // Log the authenticated user + debug!( + message = "Token authenticated successfully", + username = ?user_info.username, + uid = ?user_info.uid, + groups = ?user_info.groups, + extra = ?user_info.extra + ); + + // Step 2: Create SubjectAccessReview with the validated user info + let resource_attrs = ResourceAttributes { + group: Some(resource_group.to_string()), + resource: Some(resource.to_string()), + verb: Some(verb.to_string()), + namespace: namespace.clone(), + ..Default::default() + }; + + // Determine the user and groups to check + let check_user = user.clone().or(user_info.username); + let check_groups = groups.clone().or(user_info.groups); + + let sar = SubjectAccessReview { + spec: SubjectAccessReviewSpec { + resource_attributes: Some(resource_attrs), + // Use user/groups from config if specified, otherwise use from token + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + }; + + // Log the SubjectAccessReview request + debug!( + message = "Calling SubjectAccessReview API", + user = ?check_user, + groups = ?check_groups, + resource = %resource, + verb = %verb, + resource_group = %resource_group, + namespace = ?namespace + ); + + // Step 3: Check if the user has the required permissions + let sar_api: Api = Api::all(client); + let sar_result = sar_api.create(&Default::default(), &sar).await?; + + let allowed = sar_result + .status + .as_ref() + .map(|s| s.allowed) + .unwrap_or(false); + + // Log the SubjectAccessReview result + if allowed { + debug!( + message = "SubjectAccessReview allowed access", + user = ?check_user, + resource = %resource, + verb = %verb + ); + } else { + warn!( + message = "SubjectAccessReview denied access", + user = ?check_user, + resource = %resource, + verb = %verb, + reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), + evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) + ); + } + + Ok(allowed) +} + +/// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview for nonResourceURLs. +/// +/// This function: +/// 1. Uses Vector's own service account to authenticate to the K8s API +/// 2. Validates the client's token using TokenReview API +/// 3. Extracts user identity from the TokenReview response +/// 4. Checks nonResourceURL permissions using SubjectAccessReview API +#[cfg(feature = "kubernetes")] +async fn validate_token_with_sar_nonresource( + token: &str, + path: &str, + verb: &str, + user: &Option, + groups: &Option>, +) -> crate::Result { + use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; + use k8s_openapi::api::authorization::v1::{ + NonResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, + }; + + debug!( + message = "Validating bearer token for nonResourceURL", + path = %path + ); + + // Create a Kubernetes client using Vector's own service account + let client = Client::try_default().await?; + + // Step 1: Validate the client's token using TokenReview + let token_review = TokenReview { + spec: TokenReviewSpec { + token: Some(token.to_string()), + audiences: None, + }, + ..Default::default() + }; + + debug!(message = "Calling TokenReview API"); + let token_api: Api = Api::all(client.clone()); + let token_result = token_api.create(&Default::default(), &token_review).await?; + + // Check if token is valid + let token_status = token_result + .status + .ok_or("TokenReview returned no status")?; + + if !token_status.authenticated.unwrap_or(false) { + warn!(message = "Token authentication failed via TokenReview"); + return Ok(false); + } + + // Extract user info from the validated token + let user_info = token_status + .user + .ok_or("TokenReview returned no user info")?; + + // Log the authenticated user + debug!( + message = "Token authenticated successfully", + username = ?user_info.username, + uid = ?user_info.uid, + groups = ?user_info.groups, + extra = ?user_info.extra + ); + + // Step 2: Create SubjectAccessReview with nonResourceAttributes + let non_resource_attrs = NonResourceAttributes { + path: Some(path.to_string()), + verb: Some(verb.to_string()), + }; + + // Determine the user and groups to check + let check_user = user.clone().or(user_info.username); + let check_groups = groups.clone().or(user_info.groups); + + let sar = SubjectAccessReview { + spec: SubjectAccessReviewSpec { + non_resource_attributes: Some(non_resource_attrs), + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + }; + + // Log the SubjectAccessReview request + debug!( + message = "Calling SubjectAccessReview API for nonResourceURL", + user = ?check_user, + groups = ?check_groups, + path = %path, + verb = %verb + ); + + // Step 3: Check if the user has the required permissions + let sar_api: Api = Api::all(client); + let sar_result = sar_api.create(&Default::default(), &sar).await?; + + let allowed = sar_result + .status + .as_ref() + .map(|s| s.allowed) + .unwrap_or(false); + + // Log the SubjectAccessReview result + if allowed { + debug!( + message = "SubjectAccessReview allowed access to nonResourceURL", + user = ?check_user, + path = %path, + verb = %verb + ); + } else { + warn!( + message = "SubjectAccessReview denied access to nonResourceURL", + user = ?check_user, + path = %path, + verb = %verb, + reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), + evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) + ); + } + + Ok(allowed) +} + +/// Extracts the Bearer token from the Authorization header. +fn extract_bearer_token(req: &Request) -> Option { + req.headers() + .get(hyper::header::AUTHORIZATION)? + .to_str() + .ok()? + .strip_prefix("Bearer ") + .map(|s| s.to_string()) +} + +fn authorized(req: &Request, auth: &Option) -> bool { if let Some(auth) = auth { let headers = req.headers(); if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) { let encoded_credentials = match auth { - Auth::Basic { user, password } => Some(HeaderValue::from_str( + PrometheusExporterAuth::Basic { user, password } => Some(HeaderValue::from_str( format!( "Basic {}", BASE64_STANDARD.encode(format!("{}:{}", user, password.inner())) ) .as_str(), )), - Auth::Bearer { token } => Some(HeaderValue::from_str( + PrometheusExporterAuth::Bearer { token } => Some(HeaderValue::from_str( format!("Bearer {}", token.inner()).as_str(), )), - Auth::Custom { value } => Some(HeaderValue::from_str(value)), - #[cfg(feature = "aws-core")] - _ => None, + PrometheusExporterAuth::Custom { value } => Some(HeaderValue::from_str(value)), + #[cfg(feature = "kubernetes")] + PrometheusExporterAuth::Sar { .. } => { + // SubjectAccessReview is handled asynchronously in check_authorization + // This should never be reached + return false; + } }; if let Some(Ok(encoded_credentials)) = encoded_credentials @@ -343,7 +771,7 @@ fn authorized(req: &Request, auth: &Option) -> bool { #[derive(Clone)] struct Handler { - auth: Option, + auth: Option, default_namespace: Option, buckets: Box<[f64]>, quantiles: Box<[f64]>, @@ -352,14 +780,17 @@ struct Handler { } impl Handler { - fn handle( + async fn handle( &self, req: Request, metrics: &RwLock>, ) -> Response { let mut response = Response::new(Body::empty()); - match (authorized(&req, &self.auth), req.method(), req.uri().path()) { + // Check authorization - SAR takes precedence over basic auth when both token and SAR config present + let is_authorized = self.check_authorization(&req).await; + + match (is_authorized, req.method(), req.uri().path()) { (false, _, _) => { *response.status_mut() = StatusCode::UNAUTHORIZED; response.headers_mut().insert( @@ -411,6 +842,113 @@ impl Handler { response } + + async fn check_authorization(&self, req: &Request) -> bool { + // Handle SubjectAccessReview authentication + #[cfg(feature = "kubernetes")] + if let Some(PrometheusExporterAuth::Sar { + path, + resource, + verb, + resource_group, + namespace, + user, + groups, + }) = &self.auth + { + // Determine whether to use nonResourceURL or resource-based validation + match (path, resource) { + (Some(p), None) => { + // NonResourceURL-based authorization + debug!( + message = "Using SubjectAccessReview authentication for nonResourceURL", + path = %p, + verb = %verb + ); + + if let Some(token) = extract_bearer_token(req) { + debug!(message = "Extracted Bearer token from request"); + + match validate_token_with_sar_nonresource(&token, p, verb, user, groups) + .await + { + Ok(allowed) => { + return allowed; + } + Err(e) => { + error!( + message = "Failed to validate token with SubjectAccessReview for nonResourceURL", + error = %e + ); + return false; + } + } + } else { + warn!( + message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" + ); + return false; + } + } + (None, Some(r)) => { + // Resource-based authorization + debug!( + message = "Using SubjectAccessReview authentication for resource", + resource = %r, + verb = %verb, + resource_group = %resource_group + ); + + if let Some(token) = extract_bearer_token(req) { + debug!(message = "Extracted Bearer token from request"); + + match validate_token_with_sar( + &token, + r, + verb, + resource_group, + namespace, + user, + groups, + ) + .await + { + Ok(allowed) => { + return allowed; + } + Err(e) => { + error!( + message = "Failed to validate token with SubjectAccessReview for resource", + error = %e + ); + return false; + } + } + } else { + warn!( + message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" + ); + return false; + } + } + (Some(_), Some(_)) => { + error!( + message = "SubjectAccessReview configuration error: cannot specify both 'path' and 'resource'" + ); + return false; + } + (None, None) => { + error!( + message = "SubjectAccessReview configuration error: must specify either 'path' or 'resource'" + ); + return false; + } + } + } + + // Fall back to standard auth (Basic, Bearer, Custom) + authorized(req, &self.auth) + } } impl PrometheusExporter { @@ -445,9 +983,13 @@ impl PrometheusExporter { let handler = handler.clone(); let inner = service_fn(move |req| { - let response = handler.handle(req, &metrics); + let handler = handler.clone(); + let metrics = Arc::clone(&metrics); - future::ok::<_, Infallible>(response) + async move { + let response = handler.handle(req, &metrics).await; + Ok::<_, Infallible>(response) + } }); let service = ServiceBuilder::new() @@ -683,7 +1225,7 @@ mod tests { let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let auth_config = Auth::Basic { + let auth_config = PrometheusExporterAuth::Basic { user: "user".to_string(), password: SensitiveString::from("password".to_string()), }; @@ -720,7 +1262,7 @@ mod tests { let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let auth_config = Auth::Bearer { + let auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; @@ -756,7 +1298,7 @@ mod tests { let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let server_auth_config = Auth::Bearer { + let server_auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; @@ -773,11 +1315,11 @@ mod tests { let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let server_auth_config = Auth::Bearer { + let server_auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; - let client_auth_config = Auth::Basic { + let client_auth_config = PrometheusExporterAuth::Basic { user: "user".to_string(), password: SensitiveString::from("password".to_string()), }; @@ -982,8 +1524,8 @@ mod tests { } async fn export_and_fetch_with_auth( - server_auth_config: Option, - client_auth_config: Option, + server_auth_config: Option, + client_auth_config: Option, mut events: Vec, suppress_timestamp: bool, ) -> Result { @@ -1027,7 +1569,33 @@ mod tests { .expect("Error creating request."); if let Some(client_auth_config) = client_auth_config { - client_auth_config.apply(&mut request); + match client_auth_config { + PrometheusExporterAuth::Basic { user, password } => { + let credentials = format!("{}:{}", user, password.inner()); + let encoded = BASE64_STANDARD.encode(credentials.as_bytes()); + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap(), + ); + } + PrometheusExporterAuth::Bearer { token } => { + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&format!("Bearer {}", token.inner())).unwrap(), + ); + } + PrometheusExporterAuth::Custom { value } => { + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&value).unwrap(), + ); + } + #[cfg(feature = "kubernetes")] + PrometheusExporterAuth::Sar { .. } => { + // SAR auth is server-side only, not used for client requests in tests + panic!("SAR auth cannot be used for client-side requests in tests"); + } + } } let proxy = ProxyConfig::default(); From d5c171ab94fa91d017dc1884bc86bfc8519b41d3 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Tue, 5 May 2026 12:13:10 -0400 Subject: [PATCH 2/5] remove sar redudency and inst. client once --- src/sinks/prometheus/exporter.rs | 386 +++++++++++-------------------- 1 file changed, 140 insertions(+), 246 deletions(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index e4b7bf844be7c..197eca86d90d6 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -466,31 +466,37 @@ impl Hash for MetricRef { /// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview. /// +/// This function supports both resource-based and nonResourceURL-based authorization: +/// - For resource-based: provide `resource`, `resource_group`, and optionally `namespace` +/// - For nonResourceURL-based: provide `path` +/// /// This function: /// 1. Uses Vector's own service account to authenticate to the K8s API /// 2. Validates the client's token using TokenReview API /// 3. Extracts user identity from the TokenReview response -/// 4. Checks permissions using SubjectAccessReview API +/// 4. Checks permissions using SubjectAccessReview API (with ResourceAttributes or NonResourceAttributes) #[cfg(feature = "kubernetes")] async fn validate_token_with_sar( + client: &Client, token: &str, - resource: &str, verb: &str, - resource_group: &str, + path: Option<&str>, + resource: Option<&str>, + resource_group: Option<&str>, namespace: &Option, user: &Option, groups: &Option>, ) -> crate::Result { use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; use k8s_openapi::api::authorization::v1::{ - ResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, + NonResourceAttributes, ResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, }; - debug!(message = "Validating bearer token"); - - // Create a Kubernetes client using Vector's own service account - // This uses the in-cluster configuration with Vector's token - let client = Client::try_default().await?; + debug!( + message = "Validating bearer token", + path = ?path, + resource = ?resource + ); // Step 1: Validate the client's token using TokenReview let token_review = TokenReview { @@ -529,169 +535,74 @@ async fn validate_token_with_sar( extra = ?user_info.extra ); - // Step 2: Create SubjectAccessReview with the validated user info - let resource_attrs = ResourceAttributes { - group: Some(resource_group.to_string()), - resource: Some(resource.to_string()), - verb: Some(verb.to_string()), - namespace: namespace.clone(), - ..Default::default() - }; - // Determine the user and groups to check let check_user = user.clone().or(user_info.username); let check_groups = groups.clone().or(user_info.groups); - let sar = SubjectAccessReview { - spec: SubjectAccessReviewSpec { - resource_attributes: Some(resource_attrs), - // Use user/groups from config if specified, otherwise use from token - user: check_user.clone(), - groups: check_groups.clone(), - ..Default::default() - }, - ..Default::default() - }; - - // Log the SubjectAccessReview request - debug!( - message = "Calling SubjectAccessReview API", - user = ?check_user, - groups = ?check_groups, - resource = %resource, - verb = %verb, - resource_group = %resource_group, - namespace = ?namespace - ); - - // Step 3: Check if the user has the required permissions - let sar_api: Api = Api::all(client); - let sar_result = sar_api.create(&Default::default(), &sar).await?; - - let allowed = sar_result - .status - .as_ref() - .map(|s| s.allowed) - .unwrap_or(false); - - // Log the SubjectAccessReview result - if allowed { - debug!( - message = "SubjectAccessReview allowed access", - user = ?check_user, - resource = %resource, - verb = %verb - ); - } else { - warn!( - message = "SubjectAccessReview denied access", - user = ?check_user, - resource = %resource, - verb = %verb, - reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), - evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) - ); - } - - Ok(allowed) -} - -/// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview for nonResourceURLs. -/// -/// This function: -/// 1. Uses Vector's own service account to authenticate to the K8s API -/// 2. Validates the client's token using TokenReview API -/// 3. Extracts user identity from the TokenReview response -/// 4. Checks nonResourceURL permissions using SubjectAccessReview API -#[cfg(feature = "kubernetes")] -async fn validate_token_with_sar_nonresource( - token: &str, - path: &str, - verb: &str, - user: &Option, - groups: &Option>, -) -> crate::Result { - use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; - use k8s_openapi::api::authorization::v1::{ - NonResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, - }; - - debug!( - message = "Validating bearer token for nonResourceURL", - path = %path - ); - - // Create a Kubernetes client using Vector's own service account - let client = Client::try_default().await?; - - // Step 1: Validate the client's token using TokenReview - let token_review = TokenReview { - spec: TokenReviewSpec { - token: Some(token.to_string()), - audiences: None, - }, - ..Default::default() - }; - - debug!(message = "Calling TokenReview API"); - let token_api: Api = Api::all(client.clone()); - let token_result = token_api.create(&Default::default(), &token_review).await?; - - // Check if token is valid - let token_status = token_result - .status - .ok_or("TokenReview returned no status")?; - - if !token_status.authenticated.unwrap_or(false) { - warn!(message = "Token authentication failed via TokenReview"); - return Ok(false); - } - - // Extract user info from the validated token - let user_info = token_status - .user - .ok_or("TokenReview returned no user info")?; + // Step 2: Create SubjectAccessReview with appropriate attributes + let sar = match (path, resource) { + (Some(p), None) => { + // NonResourceURL-based authorization + let non_resource_attrs = NonResourceAttributes { + path: Some(p.to_string()), + verb: Some(verb.to_string()), + }; - // Log the authenticated user - debug!( - message = "Token authenticated successfully", - username = ?user_info.username, - uid = ?user_info.uid, - groups = ?user_info.groups, - extra = ?user_info.extra - ); + debug!( + message = "Calling SubjectAccessReview API for nonResourceURL", + user = ?check_user, + groups = ?check_groups, + path = %p, + verb = %verb + ); - // Step 2: Create SubjectAccessReview with nonResourceAttributes - let non_resource_attrs = NonResourceAttributes { - path: Some(path.to_string()), - verb: Some(verb.to_string()), - }; + SubjectAccessReview { + spec: SubjectAccessReviewSpec { + non_resource_attributes: Some(non_resource_attrs), + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + } + } + (None, Some(r)) => { + // Resource-based authorization + let resource_attrs = ResourceAttributes { + group: Some(resource_group.unwrap_or("").to_string()), + resource: Some(r.to_string()), + verb: Some(verb.to_string()), + namespace: namespace.clone(), + ..Default::default() + }; - // Determine the user and groups to check - let check_user = user.clone().or(user_info.username); - let check_groups = groups.clone().or(user_info.groups); + debug!( + message = "Calling SubjectAccessReview API for resource", + user = ?check_user, + groups = ?check_groups, + resource = %r, + verb = %verb, + resource_group = %resource_group.unwrap_or(""), + namespace = ?namespace + ); - let sar = SubjectAccessReview { - spec: SubjectAccessReviewSpec { - non_resource_attributes: Some(non_resource_attrs), - user: check_user.clone(), - groups: check_groups.clone(), - ..Default::default() - }, - ..Default::default() + SubjectAccessReview { + spec: SubjectAccessReviewSpec { + resource_attributes: Some(resource_attrs), + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + } + } + _ => { + return Err("Must specify either 'path' or 'resource', not both or neither".into()); + } }; - // Log the SubjectAccessReview request - debug!( - message = "Calling SubjectAccessReview API for nonResourceURL", - user = ?check_user, - groups = ?check_groups, - path = %path, - verb = %verb - ); - // Step 3: Check if the user has the required permissions - let sar_api: Api = Api::all(client); + let sar_api: Api = Api::all(client.clone()); let sar_result = sar_api.create(&Default::default(), &sar).await?; let allowed = sar_result @@ -703,16 +614,18 @@ async fn validate_token_with_sar_nonresource( // Log the SubjectAccessReview result if allowed { debug!( - message = "SubjectAccessReview allowed access to nonResourceURL", + message = "SubjectAccessReview allowed access", user = ?check_user, - path = %path, + path = ?path, + resource = ?resource, verb = %verb ); } else { warn!( - message = "SubjectAccessReview denied access to nonResourceURL", + message = "SubjectAccessReview denied access", user = ?check_user, - path = %path, + path = ?path, + resource = ?resource, verb = %verb, reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) @@ -777,6 +690,8 @@ struct Handler { quantiles: Box<[f64]>, bytes_sent: Registered, events_sent: Registered, + #[cfg(feature = "kubernetes")] + kube_client: Option, } impl Handler { @@ -856,93 +771,50 @@ impl Handler { groups, }) = &self.auth { - // Determine whether to use nonResourceURL or resource-based validation - match (path, resource) { - (Some(p), None) => { - // NonResourceURL-based authorization - debug!( - message = "Using SubjectAccessReview authentication for nonResourceURL", - path = %p, - verb = %verb - ); - - if let Some(token) = extract_bearer_token(req) { - debug!(message = "Extracted Bearer token from request"); - - match validate_token_with_sar_nonresource(&token, p, verb, user, groups) - .await - { - Ok(allowed) => { - return allowed; - } - Err(e) => { - error!( - message = "Failed to validate token with SubjectAccessReview for nonResourceURL", - error = %e - ); - return false; - } - } - } else { - warn!( - message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" - ); - return false; - } + // Ensure we have a Kubernetes client + let client = match &self.kube_client { + Some(c) => c, + None => { + error!(message = "SubjectAccessReview configured but Kubernetes client not initialized"); + return false; } - (None, Some(r)) => { - // Resource-based authorization - debug!( - message = "Using SubjectAccessReview authentication for resource", - resource = %r, - verb = %verb, - resource_group = %resource_group - ); + }; - if let Some(token) = extract_bearer_token(req) { - debug!(message = "Extracted Bearer token from request"); - - match validate_token_with_sar( - &token, - r, - verb, - resource_group, - namespace, - user, - groups, - ) - .await - { - Ok(allowed) => { - return allowed; - } - Err(e) => { - error!( - message = "Failed to validate token with SubjectAccessReview for resource", - error = %e - ); - return false; - } - } - } else { - warn!( - message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" + // Validate token with SubjectAccessReview + if let Some(token) = extract_bearer_token(req) { + debug!(message = "Extracted Bearer token from request"); + + match validate_token_with_sar( + client, + &token, + verb, + path.as_deref(), + resource.as_deref(), + resource_group.as_deref(), + namespace, + user, + groups, + ) + .await + { + Ok(allowed) => { + return allowed; + } + Err(e) => { + error!( + message = "Failed to validate token with SubjectAccessReview", + error = %e, + path = ?path, + resource = ?resource ); return false; } } - (Some(_), Some(_)) => { - error!( - message = "SubjectAccessReview configuration error: cannot specify both 'path' and 'resource'" - ); - return false; - } - (None, None) => { - error!( - message = "SubjectAccessReview configuration error: must specify either 'path' or 'resource'" - ); - return false; - } + } else { + warn!( + message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" + ); + return false; } } @@ -965,6 +837,26 @@ impl PrometheusExporter { return Ok(()); } + // Create Kubernetes client if SAR authentication is configured + #[cfg(feature = "kubernetes")] + let kube_client = if matches!(self.config.auth, Some(PrometheusExporterAuth::Sar { .. })) { + match Client::try_default().await { + Ok(client) => { + info!(message = "Kubernetes client initialized for SubjectAccessReview authentication"); + Some(client) + } + Err(e) => { + error!( + message = "Failed to initialize Kubernetes client for SubjectAccessReview authentication", + error = %e + ); + return Err(Box::new(e)); + } + } + } else { + None + }; + let handler = Handler { bytes_sent: register!(BytesSent::from(Protocol::HTTP)), events_sent: register!(EventsSent::from(Output(None))), @@ -972,6 +864,8 @@ impl PrometheusExporter { buckets: self.config.buckets.clone().into(), quantiles: self.config.quantiles.clone().into(), auth: self.config.auth.clone(), + #[cfg(feature = "kubernetes")] + kube_client, }; let span = Span::current(); From 6bc870e588f2508d50d298542c17425557a879a4 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Tue, 5 May 2026 14:02:58 -0400 Subject: [PATCH 3/5] fix compile error --- src/sinks/prometheus/exporter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 197eca86d90d6..4f340dbddb9b2 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -790,7 +790,7 @@ impl Handler { verb, path.as_deref(), resource.as_deref(), - resource_group.as_deref(), + Some(resource_group.as_str()), namespace, user, groups, From 898ab9f77b8c881aef0c46d7f23384c6ddd152dd Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Tue, 5 May 2026 16:29:31 -0400 Subject: [PATCH 4/5] fix clippy and fmt errors --- src/sinks/prometheus/exporter.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 4f340dbddb9b2..50ea604893cb9 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -669,10 +669,10 @@ fn authorized(req: &Request, auth: &Option c, None => { - error!(message = "SubjectAccessReview configured but Kubernetes client not initialized"); + error!( + message = + "SubjectAccessReview configured but Kubernetes client not initialized" + ); return false; } }; @@ -842,7 +845,10 @@ impl PrometheusExporter { let kube_client = if matches!(self.config.auth, Some(PrometheusExporterAuth::Sar { .. })) { match Client::try_default().await { Ok(client) => { - info!(message = "Kubernetes client initialized for SubjectAccessReview authentication"); + info!( + message = + "Kubernetes client initialized for SubjectAccessReview authentication" + ); Some(client) } Err(e) => { From c7a3408fbccfcae28511ad90c7b986c08f0b1b84 Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Wed, 6 May 2026 09:49:47 -0400 Subject: [PATCH 5/5] fix clippy failures --- src/sinks/prometheus/exporter.rs | 80 +++++++++++++++++--------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 50ea604893cb9..dbc1ca54ed078 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -464,6 +464,18 @@ impl Hash for MetricRef { } } +/// Parameters for SubjectAccessReview authorization check. +#[cfg(feature = "kubernetes")] +struct SarAuthParams<'a> { + verb: &'a str, + path: Option<&'a str>, + resource: Option<&'a str>, + resource_group: Option<&'a str>, + namespace: &'a Option, + user: &'a Option, + groups: &'a Option>, +} + /// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview. /// /// This function supports both resource-based and nonResourceURL-based authorization: @@ -479,13 +491,7 @@ impl Hash for MetricRef { async fn validate_token_with_sar( client: &Client, token: &str, - verb: &str, - path: Option<&str>, - resource: Option<&str>, - resource_group: Option<&str>, - namespace: &Option, - user: &Option, - groups: &Option>, + params: SarAuthParams<'_>, ) -> crate::Result { use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; use k8s_openapi::api::authorization::v1::{ @@ -494,8 +500,8 @@ async fn validate_token_with_sar( debug!( message = "Validating bearer token", - path = ?path, - resource = ?resource + path = ?params.path, + resource = ?params.resource ); // Step 1: Validate the client's token using TokenReview @@ -536,16 +542,16 @@ async fn validate_token_with_sar( ); // Determine the user and groups to check - let check_user = user.clone().or(user_info.username); - let check_groups = groups.clone().or(user_info.groups); + let check_user = params.user.clone().or(user_info.username); + let check_groups = params.groups.clone().or(user_info.groups); // Step 2: Create SubjectAccessReview with appropriate attributes - let sar = match (path, resource) { + let sar = match (params.path, params.resource) { (Some(p), None) => { // NonResourceURL-based authorization let non_resource_attrs = NonResourceAttributes { path: Some(p.to_string()), - verb: Some(verb.to_string()), + verb: Some(params.verb.to_string()), }; debug!( @@ -553,7 +559,7 @@ async fn validate_token_with_sar( user = ?check_user, groups = ?check_groups, path = %p, - verb = %verb + verb = %params.verb ); SubjectAccessReview { @@ -569,10 +575,10 @@ async fn validate_token_with_sar( (None, Some(r)) => { // Resource-based authorization let resource_attrs = ResourceAttributes { - group: Some(resource_group.unwrap_or("").to_string()), + group: Some(params.resource_group.unwrap_or("").to_string()), resource: Some(r.to_string()), - verb: Some(verb.to_string()), - namespace: namespace.clone(), + verb: Some(params.verb.to_string()), + namespace: params.namespace.clone(), ..Default::default() }; @@ -581,9 +587,9 @@ async fn validate_token_with_sar( user = ?check_user, groups = ?check_groups, resource = %r, - verb = %verb, - resource_group = %resource_group.unwrap_or(""), - namespace = ?namespace + verb = %params.verb, + resource_group = %params.resource_group.unwrap_or(""), + namespace = ?params.namespace ); SubjectAccessReview { @@ -616,17 +622,17 @@ async fn validate_token_with_sar( debug!( message = "SubjectAccessReview allowed access", user = ?check_user, - path = ?path, - resource = ?resource, - verb = %verb + path = ?params.path, + resource = ?params.resource, + verb = %params.verb ); } else { warn!( message = "SubjectAccessReview denied access", user = ?check_user, - path = ?path, - resource = ?resource, - verb = %verb, + path = ?params.path, + resource = ?params.resource, + verb = %params.verb, reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) ); @@ -669,10 +675,8 @@ fn authorized(req: &Request, auth: &Option