diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 6072bc8274ea0..dbc1ca54ed078 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -22,7 +22,7 @@ use snafu::Snafu; use stream_cancel::{Trigger, Tripwire}; use tower::ServiceBuilder; use tower_http::compression::CompressionLayer; -use tracing::{Instrument, Span}; +use tracing::{Instrument, Span, error, info, warn}; use vector_lib::{ ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, @@ -32,6 +32,9 @@ use vector_lib::{ }, }; +#[cfg(feature = "kubernetes")] +use kube::{Api, Client}; + use super::collector::{MetricCollector, StringCollector}; use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext}, @@ -39,7 +42,7 @@ use crate::{ Event, EventStatus, Finalizable, metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue}, }, - http::{Auth, build_http_trace_layer}, + http::build_http_trace_layer, internal_events::PrometheusNormalizationError, sinks::{ Healthcheck, VectorSink, @@ -58,6 +61,159 @@ enum BuildError { FlushPeriodTooShort { min: u64 }, } +/// Authentication configuration for the Prometheus exporter. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(tag = "strategy")] +#[serde(rename_all = "lowercase")] +#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))] +pub enum PrometheusExporterAuth { + /// Basic authentication. + Basic { + /// The basic authentication username. + #[configurable(metadata(docs::examples = "username"))] + user: String, + + /// The basic authentication password. + #[configurable(metadata(docs::examples = "password"))] + password: vector_lib::sensitive_string::SensitiveString, + }, + + /// Bearer authentication. + Bearer { + /// The bearer authentication token. + token: vector_lib::sensitive_string::SensitiveString, + }, + + /// Custom Authorization Header Value. + Custom { + /// Custom string value of the Authorization header. + #[configurable(metadata(docs::examples = "CUSTOM_PREFIX ${TOKEN}"))] + value: String, + }, + + #[cfg(feature = "kubernetes")] + /// Kubernetes SubjectAccessReview authentication. + /// + /// Validates Bearer tokens using Kubernetes TokenReview and SubjectAccessReview APIs. + /// Supports both resource-based and nonResourceURL-based authorization. + /// + /// ## Required RBAC Permissions + /// + /// Vector's ServiceAccount must have permissions to create TokenReview and SubjectAccessReview resources: + /// + /// ```yaml + /// apiVersion: rbac.authorization.k8s.io/v1 + /// kind: ClusterRole + /// metadata: + /// name: vector-token-validator + /// rules: + /// - apiGroups: ["authentication.k8s.io"] + /// resources: ["tokenreviews"] + /// verbs: ["create"] + /// - apiGroups: ["authorization.k8s.io"] + /// resources: ["subjectaccessreviews"] + /// verbs: ["create"] + /// ``` + /// + /// ## How it Works + /// + /// 1. Client (e.g., Prometheus) sends request with `Authorization: Bearer ` + /// 2. Vector extracts the Bearer token from the request + /// 3. Vector uses its own ServiceAccount to authenticate to the Kubernetes API + /// 4. Vector calls TokenReview API with the client's token to validate it and get user identity + /// 5. Vector calls SubjectAccessReview API to check if that user has the specified permissions + /// 6. Vector allows or denies the request based on the SubjectAccessReview response + /// + /// ## Configuration Examples + /// + /// NonResourceURL-based (for /metrics, /healthz, etc.): + /// ```toml + /// [sinks.prometheus.auth] + /// strategy = "sar" + /// path = "/metrics" + /// verb = "get" + /// ``` + /// + /// Resource-based (for Kubernetes resources): + /// ```toml + /// [sinks.prometheus.auth] + /// strategy = "sar" + /// resource = "pods" + /// verb = "get" + /// resource_group = "" + /// ``` + /// + Sar { + /// The URL path to check access for (nonResourceURL). + /// + /// Use this for API endpoints like /metrics, /healthz, /api. + /// Must start with "/" and match the nonResourceURLs in the client's RBAC. + /// + /// Mutually exclusive with `resource`. Specify either `path` OR `resource`, not both. + /// + /// Example RBAC rule for nonResourceURL: + /// ```yaml + /// - nonResourceURLs: ["/metrics"] + /// verbs: ["get"] + /// ``` + #[serde(default)] + #[configurable(metadata(docs::examples = "/metrics"))] + path: Option, + + /// The resource to check access for (Kubernetes resource). + /// + /// Use this for Kubernetes resources like pods, services, configmaps. + /// Mutually exclusive with `path`. Specify either `path` OR `resource`, not both. + /// + /// Example RBAC rule for resource: + /// ```yaml + /// - apiGroups: [""] + /// resources: ["metrics"] + /// verbs: ["get"] + /// ``` + #[serde(default)] + #[configurable(metadata(docs::examples = "metrics"))] + resource: Option, + + /// The verb to check. + /// + /// For resources: "get", "list", "watch", "create", "update", "delete" + /// For nonResourceURLs: typically "get" or "post" + #[configurable(metadata(docs::examples = "get"))] + verb: String, + + /// The API group for the resource (only used with `resource`, not `path`). + /// + /// Leave empty ("") for core Kubernetes resources. + /// Use the API group name for custom resources (e.g., "metrics.k8s.io"). + #[serde(default)] + #[configurable(metadata(docs::examples = ""))] + resource_group: String, + + /// The namespace to check access in (only used with `resource`, not `path`). + /// + /// If specified, checks for namespaced resource access. + /// If not specified (None), checks for cluster-scoped access. + #[serde(default)] + namespace: Option, + + /// Override the user to check access for. If not specified, uses the user from the TokenReview. + /// Typically left unset to validate the actual token holder's permissions. + #[serde(default)] + #[configurable(metadata( + docs::examples = "system:serviceaccount:my-namespace:myserviceaccount" + ))] + user: Option, + + /// Override the groups to check access for. If not specified, uses the groups from the TokenReview. + /// Typically left unset to validate the actual token holder's permissions. + #[serde(default)] + #[configurable(metadata(docs::examples = "system:authenticated"))] + groups: Option>, + }, +} + /// Configuration for the `prometheus_exporter` sink. #[serde_as] #[configurable_component(sink( @@ -87,7 +243,7 @@ pub struct PrometheusExporterConfig { pub address: SocketAddr, #[configurable(derived)] - pub auth: Option, + pub auth: Option, #[configurable(derived)] pub tls: Option, @@ -308,29 +464,218 @@ impl Hash for MetricRef { } } -fn authorized(req: &Request, auth: &Option) -> bool { +/// Parameters for SubjectAccessReview authorization check. +#[cfg(feature = "kubernetes")] +struct SarAuthParams<'a> { + verb: &'a str, + path: Option<&'a str>, + resource: Option<&'a str>, + resource_group: Option<&'a str>, + namespace: &'a Option, + user: &'a Option, + groups: &'a Option>, +} + +/// Validates a Bearer token using Kubernetes TokenReview and SubjectAccessReview. +/// +/// This function supports both resource-based and nonResourceURL-based authorization: +/// - For resource-based: provide `resource`, `resource_group`, and optionally `namespace` +/// - For nonResourceURL-based: provide `path` +/// +/// This function: +/// 1. Uses Vector's own service account to authenticate to the K8s API +/// 2. Validates the client's token using TokenReview API +/// 3. Extracts user identity from the TokenReview response +/// 4. Checks permissions using SubjectAccessReview API (with ResourceAttributes or NonResourceAttributes) +#[cfg(feature = "kubernetes")] +async fn validate_token_with_sar( + client: &Client, + token: &str, + params: SarAuthParams<'_>, +) -> crate::Result { + use k8s_openapi::api::authentication::v1::{TokenReview, TokenReviewSpec}; + use k8s_openapi::api::authorization::v1::{ + NonResourceAttributes, ResourceAttributes, SubjectAccessReview, SubjectAccessReviewSpec, + }; + + debug!( + message = "Validating bearer token", + path = ?params.path, + resource = ?params.resource + ); + + // Step 1: Validate the client's token using TokenReview + let token_review = TokenReview { + spec: TokenReviewSpec { + token: Some(token.to_string()), + audiences: None, + }, + ..Default::default() + }; + + debug!(message = "Calling TokenReview API"); + let token_api: Api = Api::all(client.clone()); + let token_result = token_api.create(&Default::default(), &token_review).await?; + + // Check if token is valid + let token_status = token_result + .status + .ok_or("TokenReview returned no status")?; + + if !token_status.authenticated.unwrap_or(false) { + warn!(message = "Token authentication failed via TokenReview"); + return Ok(false); + } + + // Extract user info from the validated token + let user_info = token_status + .user + .ok_or("TokenReview returned no user info")?; + + // Log the authenticated user + debug!( + message = "Token authenticated successfully", + username = ?user_info.username, + uid = ?user_info.uid, + groups = ?user_info.groups, + extra = ?user_info.extra + ); + + // Determine the user and groups to check + let check_user = params.user.clone().or(user_info.username); + let check_groups = params.groups.clone().or(user_info.groups); + + // Step 2: Create SubjectAccessReview with appropriate attributes + let sar = match (params.path, params.resource) { + (Some(p), None) => { + // NonResourceURL-based authorization + let non_resource_attrs = NonResourceAttributes { + path: Some(p.to_string()), + verb: Some(params.verb.to_string()), + }; + + debug!( + message = "Calling SubjectAccessReview API for nonResourceURL", + user = ?check_user, + groups = ?check_groups, + path = %p, + verb = %params.verb + ); + + SubjectAccessReview { + spec: SubjectAccessReviewSpec { + non_resource_attributes: Some(non_resource_attrs), + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + } + } + (None, Some(r)) => { + // Resource-based authorization + let resource_attrs = ResourceAttributes { + group: Some(params.resource_group.unwrap_or("").to_string()), + resource: Some(r.to_string()), + verb: Some(params.verb.to_string()), + namespace: params.namespace.clone(), + ..Default::default() + }; + + debug!( + message = "Calling SubjectAccessReview API for resource", + user = ?check_user, + groups = ?check_groups, + resource = %r, + verb = %params.verb, + resource_group = %params.resource_group.unwrap_or(""), + namespace = ?params.namespace + ); + + SubjectAccessReview { + spec: SubjectAccessReviewSpec { + resource_attributes: Some(resource_attrs), + user: check_user.clone(), + groups: check_groups.clone(), + ..Default::default() + }, + ..Default::default() + } + } + _ => { + return Err("Must specify either 'path' or 'resource', not both or neither".into()); + } + }; + + // Step 3: Check if the user has the required permissions + let sar_api: Api = Api::all(client.clone()); + let sar_result = sar_api.create(&Default::default(), &sar).await?; + + let allowed = sar_result + .status + .as_ref() + .map(|s| s.allowed) + .unwrap_or(false); + + // Log the SubjectAccessReview result + if allowed { + debug!( + message = "SubjectAccessReview allowed access", + user = ?check_user, + path = ?params.path, + resource = ?params.resource, + verb = %params.verb + ); + } else { + warn!( + message = "SubjectAccessReview denied access", + user = ?check_user, + path = ?params.path, + resource = ?params.resource, + verb = %params.verb, + reason = ?sar_result.status.as_ref().and_then(|s| s.reason.as_ref()), + evaluation_error = ?sar_result.status.as_ref().and_then(|s| s.evaluation_error.as_ref()) + ); + } + + Ok(allowed) +} + +/// Extracts the Bearer token from the Authorization header. +fn extract_bearer_token(req: &Request) -> Option { + req.headers() + .get(hyper::header::AUTHORIZATION)? + .to_str() + .ok()? + .strip_prefix("Bearer ") + .map(|s| s.to_string()) +} + +fn authorized(req: &Request, auth: &Option) -> bool { if let Some(auth) = auth { let headers = req.headers(); if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) { let encoded_credentials = match auth { - Auth::Basic { user, password } => Some(HeaderValue::from_str( + PrometheusExporterAuth::Basic { user, password } => Some(HeaderValue::from_str( format!( "Basic {}", BASE64_STANDARD.encode(format!("{}:{}", user, password.inner())) ) .as_str(), )), - Auth::Bearer { token } => Some(HeaderValue::from_str( + PrometheusExporterAuth::Bearer { token } => Some(HeaderValue::from_str( format!("Bearer {}", token.inner()).as_str(), )), - Auth::Custom { value } => Some(HeaderValue::from_str(value)), - #[cfg(feature = "aws-core")] - _ => None, + PrometheusExporterAuth::Custom { value } => Some(HeaderValue::from_str(value)), + #[cfg(feature = "kubernetes")] + PrometheusExporterAuth::Sar { .. } => { + // SubjectAccessReview is handled asynchronously in check_authorization + // This should never be reached + return false; + } }; - if let Some(Ok(encoded_credentials)) = encoded_credentials - && auth_header == encoded_credentials - { + if matches!(encoded_credentials, Some(Ok(ref creds)) if auth_header == creds) { return true; } } @@ -343,23 +688,28 @@ fn authorized(req: &Request, auth: &Option) -> bool { #[derive(Clone)] struct Handler { - auth: Option, + auth: Option, default_namespace: Option, buckets: Box<[f64]>, quantiles: Box<[f64]>, bytes_sent: Registered, events_sent: Registered, + #[cfg(feature = "kubernetes")] + kube_client: Option, } impl Handler { - fn handle( + async fn handle( &self, req: Request, metrics: &RwLock>, ) -> Response { let mut response = Response::new(Body::empty()); - match (authorized(&req, &self.auth), req.method(), req.uri().path()) { + // Check authorization - SAR takes precedence over basic auth when both token and SAR config present + let is_authorized = self.check_authorization(&req).await; + + match (is_authorized, req.method(), req.uri().path()) { (false, _, _) => { *response.status_mut() = StatusCode::UNAUTHORIZED; response.headers_mut().insert( @@ -411,6 +761,75 @@ impl Handler { response } + + async fn check_authorization(&self, req: &Request) -> bool { + // Handle SubjectAccessReview authentication + #[cfg(feature = "kubernetes")] + if let Some(PrometheusExporterAuth::Sar { + path, + resource, + verb, + resource_group, + namespace, + user, + groups, + }) = &self.auth + { + // Ensure we have a Kubernetes client + let client = match &self.kube_client { + Some(c) => c, + None => { + error!( + message = + "SubjectAccessReview configured but Kubernetes client not initialized" + ); + return false; + } + }; + + // Validate token with SubjectAccessReview + if let Some(token) = extract_bearer_token(req) { + debug!(message = "Extracted Bearer token from request"); + + match validate_token_with_sar( + client, + &token, + SarAuthParams { + verb, + path: path.as_deref(), + resource: resource.as_deref(), + resource_group: Some(resource_group.as_str()), + namespace, + user, + groups, + }, + ) + .await + { + Ok(allowed) => { + return allowed; + } + Err(e) => { + error!( + message = "Failed to validate token with SubjectAccessReview", + error = %e, + path = ?path, + resource = ?resource + ); + return false; + } + } + } else { + warn!( + message = "SubjectAccessReview configured but no Bearer token provided in Authorization header" + ); + return false; + } + } + + // Fall back to standard auth (Basic, Bearer, Custom) + authorized(req, &self.auth) + } } impl PrometheusExporter { @@ -427,6 +846,29 @@ impl PrometheusExporter { return Ok(()); } + // Create Kubernetes client if SAR authentication is configured + #[cfg(feature = "kubernetes")] + let kube_client = if matches!(self.config.auth, Some(PrometheusExporterAuth::Sar { .. })) { + match Client::try_default().await { + Ok(client) => { + info!( + message = + "Kubernetes client initialized for SubjectAccessReview authentication" + ); + Some(client) + } + Err(e) => { + error!( + message = "Failed to initialize Kubernetes client for SubjectAccessReview authentication", + error = %e + ); + return Err(Box::new(e)); + } + } + } else { + None + }; + let handler = Handler { bytes_sent: register!(BytesSent::from(Protocol::HTTP)), events_sent: register!(EventsSent::from(Output(None))), @@ -434,6 +876,8 @@ impl PrometheusExporter { buckets: self.config.buckets.clone().into(), quantiles: self.config.quantiles.clone().into(), auth: self.config.auth.clone(), + #[cfg(feature = "kubernetes")] + kube_client, }; let span = Span::current(); @@ -445,9 +889,13 @@ impl PrometheusExporter { let handler = handler.clone(); let inner = service_fn(move |req| { - let response = handler.handle(req, &metrics); + let handler = handler.clone(); + let metrics = Arc::clone(&metrics); - future::ok::<_, Infallible>(response) + async move { + let response = handler.handle(req, &metrics).await; + Ok::<_, Infallible>(response) + } }); let service = ServiceBuilder::new() @@ -683,7 +1131,7 @@ mod tests { let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let auth_config = Auth::Basic { + let auth_config = PrometheusExporterAuth::Basic { user: "user".to_string(), password: SensitiveString::from("password".to_string()), }; @@ -720,7 +1168,7 @@ mod tests { let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let auth_config = Auth::Bearer { + let auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; @@ -756,7 +1204,7 @@ mod tests { let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let server_auth_config = Auth::Bearer { + let server_auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; @@ -773,11 +1221,11 @@ mod tests { let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]); let events = vec![event1, event2]; - let server_auth_config = Auth::Bearer { + let server_auth_config = PrometheusExporterAuth::Bearer { token: SensitiveString::from("token".to_string()), }; - let client_auth_config = Auth::Basic { + let client_auth_config = PrometheusExporterAuth::Basic { user: "user".to_string(), password: SensitiveString::from("password".to_string()), }; @@ -982,8 +1430,8 @@ mod tests { } async fn export_and_fetch_with_auth( - server_auth_config: Option, - client_auth_config: Option, + server_auth_config: Option, + client_auth_config: Option, mut events: Vec, suppress_timestamp: bool, ) -> Result { @@ -1027,7 +1475,33 @@ mod tests { .expect("Error creating request."); if let Some(client_auth_config) = client_auth_config { - client_auth_config.apply(&mut request); + match client_auth_config { + PrometheusExporterAuth::Basic { user, password } => { + let credentials = format!("{}:{}", user, password.inner()); + let encoded = BASE64_STANDARD.encode(credentials.as_bytes()); + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&format!("Basic {}", encoded)).unwrap(), + ); + } + PrometheusExporterAuth::Bearer { token } => { + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&format!("Bearer {}", token.inner())).unwrap(), + ); + } + PrometheusExporterAuth::Custom { value } => { + request.headers_mut().insert( + hyper::header::AUTHORIZATION, + HeaderValue::from_str(&value).unwrap(), + ); + } + #[cfg(feature = "kubernetes")] + PrometheusExporterAuth::Sar { .. } => { + // SAR auth is server-side only, not used for client requests in tests + panic!("SAR auth cannot be used for client-side requests in tests"); + } + } } let proxy = ProxyConfig::default();