Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 33 additions & 7 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel;

use datadog_trace_agent::{
aggregator::TraceAggregator,
config, env_verifier, mini_agent, proxy_flusher, stats_flusher, stats_processor,
config, env_verifier, mini_agent, proxy_flusher, stats_concentrator_service, stats_flusher,
stats_generator, stats_processor,
trace_flusher::{self, TraceFlusher},
trace_processor,
};
Expand Down Expand Up @@ -119,6 +120,12 @@ pub async fn main() {
.ok()
.and_then(|v| v.parse::<u16>().ok())
.unwrap_or(DEFAULT_LOG_INTAKE_PORT);

let dd_serverless_stats_computation_enabled =
env::var("DD_SERVERLESS_STATS_COMPUTATION_ENABLED")
.map(|val| val.to_lowercase() != "false")
.unwrap_or(true);

debug!("Starting serverless trace mini agent");

let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level);
Expand All @@ -144,11 +151,6 @@ pub async fn main() {

let env_verifier = Arc::new(env_verifier::ServerlessEnvVerifier::default());

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {});

let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {});
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

let config = match config::Config::new() {
Ok(c) => Arc::new(c),
Err(e) => {
Expand All @@ -157,6 +159,29 @@ pub async fn main() {
}
};

let (stats_concentrator_handle, stats_generator) = if dd_serverless_stats_computation_enabled {
info!("serverless stats computation enabled");
let (service, handle) =
stats_concentrator_service::StatsConcentratorService::new(config.clone());
tokio::spawn(service.run());
(
Some(handle.clone()),
Some(Arc::new(stats_generator::StatsGenerator::new(handle))),
)
} else {
info!("serverless stats computation disabled");
(None, None)
};

let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor {
stats_generator: stats_generator.clone(),
});

let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher {
stats_concentrator: stats_concentrator_handle.clone(),
});
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});

let trace_aggregator = Arc::new(TokioMutex::new(TraceAggregator::default()));
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
trace_aggregator,
Expand All @@ -175,8 +200,9 @@ pub async fn main() {
proxy_flusher,
});

let (_shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
let res = mini_agent.start_mini_agent().await;
let res = mini_agent.start_mini_agent(shutdown_rx).await;
if let Err(e) = res {
error!("Error when starting serverless trace mini agent: {e:?}");
}
Expand Down
7 changes: 6 additions & 1 deletion crates/datadog-trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@ async-trait = "0.1.64"
tracing = { version = "0.1", default-features = false }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0"
thiserror = { version = "1.0.58", default-features = false }
libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad", features = [
"mini_agent",
] }
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8c88979985154d6d97c0fc2ca9039682981eacad" }
datadog-fips = { path = "../datadog-fips" }
reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false }
reqwest = { version = "0.12.23", features = [
"json",
"http2",
], default-features = false }
bytes = "1.10.1"

[dev-dependencies]
Expand Down
6 changes: 6 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ pub struct Config {
/// timeout for environment verification, in milliseconds
pub verify_env_timeout_ms: u64,
pub proxy_url: Option<String>,
pub service: Option<String>,
pub env: Option<String>,
}

impl Config {
Expand Down Expand Up @@ -251,6 +253,8 @@ impl Config {
.or_else(|_| env::var("HTTPS_PROXY"))
.ok(),
tags,
service: env::var("DD_SERVICE").ok(),
env: env::var("DD_ENV").ok(),
})
}
}
Expand Down Expand Up @@ -721,6 +725,8 @@ pub mod test_helpers {
proxy_request_retry_backoff_base_ms: 100,
verify_env_timeout_ms: 1000,
proxy_url: None,
service: None,
env: None,
}
}
}
2 changes: 2 additions & 0 deletions crates/datadog-trace-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ pub mod env_verifier;
pub mod http_utils;
pub mod mini_agent;
pub mod proxy_flusher;
pub mod stats_concentrator_service;
pub mod stats_flusher;
pub mod stats_generator;
pub mod stats_processor;
pub mod trace_flusher;
pub mod trace_processor;
25 changes: 14 additions & 11 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error, warn};
use tracing::{debug, error};

use crate::http_utils::{log_and_create_http_response, verify_request_content_length};
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest};
Expand Down Expand Up @@ -50,7 +50,10 @@ pub struct MiniAgent {
}

impl MiniAgent {
pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> {
pub async fn start_mini_agent(
&self,
shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
let now = Instant::now();

// verify we are in a serverless function environment. if not, shut down the mini agent.
Expand Down Expand Up @@ -93,7 +96,7 @@ impl MiniAgent {
let stats_config = self.config.clone();
let stats_flusher_handle = tokio::spawn(async move {
stats_flusher
.start_stats_flusher(stats_config, stats_rx)
.start_stats_flusher(stats_config, stats_rx, shutdown_rx)
.await;
});

Expand Down Expand Up @@ -191,14 +194,14 @@ impl MiniAgent {
let sentinel = std::path::Path::new(LAMBDA_LITE_SENTINEL_PATH);
// SAFETY: LAMBDA_LITE_SENTINEL_PATH is a hard-coded absolute path,
// so .parent() always returns Some.
if let Some(parent) = sentinel.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
error!(
"Could not create parent directory for Lambda Lite sentinel \
if let Some(parent) = sentinel.parent()
&& let Err(e) = tokio::fs::create_dir_all(parent).await
{
error!(
"Could not create parent directory for Lambda Lite sentinel \
file at {}: {}.",
LAMBDA_LITE_SENTINEL_PATH, e
);
}
LAMBDA_LITE_SENTINEL_PATH, e
);
}
if let Err(e) = tokio::fs::write(sentinel, b"").await {
error!(
Expand Down Expand Up @@ -521,7 +524,7 @@ impl MiniAgent {
INFO_ENDPOINT_PATH,
PROFILING_ENDPOINT_PATH
],
"client_drop_p0s": true,
"client_drop_p0s": false,
"config": config_json
}
);
Expand Down
Loading
Loading