diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 1f51678111f..3104b943436 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -154,6 +154,7 @@ concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT," console,https://github.com/console-rs/console,MIT,The console Authors console-api,https://github.com/tokio-rs/console,MIT,"Eliza Weisman , Tokio Contributors " console-subscriber,https://github.com/tokio-rs/console,MIT,"Eliza Weisman , Tokio Contributors " +const-hex,https://github.com/danipopes/const-hex,MIT OR Apache-2.0,DaniPopes <57450786+DaniPopes@users.noreply.github.com> const-oid,https://github.com/RustCrypto/formats/tree/master/const-oid,Apache-2.0 OR MIT,RustCrypto Developers const-random,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck const-random-macro,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck @@ -545,6 +546,7 @@ proc-macro2-diagnostics,https://github.com/SergioBenitez/proc-macro2-diagnostics procfs,https://github.com/eminence/procfs,MIT OR Apache-2.0,Andrew Chin procfs-core,https://github.com/eminence/procfs,MIT OR Apache-2.0,Andrew Chin prometheus,https://github.com/tikv/rust-prometheus,Apache-2.0,"overvenus@gmail.com, siddontang@gmail.com, vistaswx@gmail.com" +proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " prost-build,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d099964a383..89eefe2bed7 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2071,6 +2071,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-hex" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531185e432bb31db1ecda541e9e7ab21468d4d844ad7505e0546a49b4945d49b" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "proptest", + "serde_core", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -5664,6 +5676,7 @@ dependencies = [ "opentelemetry_sdk", "prost 0.14.3", "reqwest", + "serde_json", "thiserror 2.0.18", "tokio", "tonic 0.14.5", @@ -5676,9 +5689,13 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ + "base64 0.22.1", + "const-hex", "opentelemetry", "opentelemetry_sdk", "prost 0.14.3", + "serde", + "serde_json", "tonic 0.14.5", "tonic-prost", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index d252652b27f..9242390d898 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -169,7 +169,7 @@ openssl-probe = "0.1" opentelemetry = "0.31" opentelemetry-appender-tracing = "0.31" opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] } +opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic", "http-json"] } ouroboros = "0.18" parquet = { version = "57", default-features = false, features = ["arrow", "zstd", "snap", "variant_experimental"] } percent-encoding = "2.3" diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index f64961822f3..6ce2276dbad 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -19,6 +19,7 @@ use anyhow::Context; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{Protocol as OtlpWireProtocol, WithExportConfig}; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; @@ -39,6 +40,54 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; use crate::QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum OtlpProtocol { + Grpc, + HttpProtobuf, + HttpJson, +} + +fn parse_otlp_protocol(protocol_str: &str) -> anyhow::Result { + const OTLP_PROTOCOL_GRPC: &str = "grpc"; + const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; + const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; + + match protocol_str { + OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc), + OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf), + OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson), + other => anyhow::bail!( + "unsupported OTLP protocol `{other}`, supported values are `{OTLP_PROTOCOL_GRPC}`, \ + `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and `{OTLP_PROTOCOL_HTTP_JSON}`" + ), + } +} + +/// Resolves the OTLP protocol from candidates in priority order, defaulting to gRPC. +fn resolve_otlp_protocol(candidates: &[Option<&str>]) -> anyhow::Result { + match candidates.iter().flatten().next() { + Some(protocol_str) => parse_otlp_protocol(protocol_str), + None => Ok(OtlpProtocol::Grpc), + } +} + +macro_rules! build_otlp_exporter { + ($builder:expr, $protocol:expr) => { + match $protocol { + OtlpProtocol::Grpc => $builder.with_tonic().build(), + OtlpProtocol::HttpProtobuf => $builder + .with_http() + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => $builder + .with_http() + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + }; +} + #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; @@ -98,10 +147,16 @@ pub fn setup_logging_and_tracing( // Note on disabling ANSI characters: setting the ansi boolean on event format is insufficient. // It is thus set on layers, see https://github.com/tokio-rs/tracing/issues/1817 let provider_opt = if get_bool_from_env(QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY, false) { - let span_exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .build() - .context("failed to initialize OpenTelemetry OTLP exporter")?; + let global_protocol = env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok(); + let traces_protocol = resolve_otlp_protocol(&[ + env::var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL") + .ok() + .as_deref(), + global_protocol.as_deref(), + ])?; + let span_exporter = + build_otlp_exporter!(opentelemetry_otlp::SpanExporter::builder(), traces_protocol) + .context("failed to initialize OTLP traces exporter")?; let span_processor = trace::BatchSpanProcessor::builder(span_exporter) .with_batch_config( BatchConfigBuilder::default() @@ -117,10 +172,13 @@ pub fn setup_logging_and_tracing( .with_attribute(KeyValue::new("service.version", build_info.version.clone())) .build(); - let logs_exporter = opentelemetry_otlp::LogExporter::builder() - .with_tonic() - .build() - .context("failed to initialize OpenTelemetry OTLP logs")?; + let logs_protocol = resolve_otlp_protocol(&[ + env::var("OTEL_EXPORTER_OTLP_LOGS_PROTOCOL").ok().as_deref(), + global_protocol.as_deref(), + ])?; + let logs_exporter = + build_otlp_exporter!(opentelemetry_otlp::LogExporter::builder(), logs_protocol) + .context("failed to initialize OTLP logs exporter")?; let logger_provider = SdkLoggerProvider::builder() .with_resource(resource.clone()) @@ -430,6 +488,42 @@ mod tests { use super::*; + #[test] + fn test_resolve_otlp_protocol_defaults_to_grpc() { + let protocol = resolve_otlp_protocol(&[None, None]).unwrap(); + assert_eq!(protocol, OtlpProtocol::Grpc); + } + + #[test] + fn test_resolve_otlp_protocol_first_candidate_takes_priority() { + let protocol = resolve_otlp_protocol(&[Some("http/protobuf"), Some("grpc")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpProtobuf); + } + + #[test] + fn test_resolve_otlp_protocol_falls_back_to_later_candidate() { + let protocol = resolve_otlp_protocol(&[None, Some("http/protobuf")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpProtobuf); + } + + #[test] + fn test_resolve_otlp_protocol_grpc_explicit() { + let protocol = resolve_otlp_protocol(&[Some("grpc")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::Grpc); + } + + #[test] + fn test_resolve_otlp_protocol_http_json_explicit() { + let protocol = resolve_otlp_protocol(&[Some("http/json")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpJson); + } + + #[test] + fn test_resolve_otlp_protocol_rejects_unsupported_value() { + let result = resolve_otlp_protocol(&[Some("http/xml")]); + assert!(result.is_err()); + } + /// A shared buffer writer for capturing log output in tests. #[derive(Clone, Default)] struct TestMakeWriter(Arc>>);