Skip to content

Commit 7652f6f

Browse files
- add CLAUDE.md to .gitignore
- disable log agent by default — require explicit DD_LOGS_ENABLED=true
- log the actual error when reqwest client build() fails in start_log_agent
- fail fast in start_log_agent when OPW URL is empty
- apply rustfmt to integration test formatting
1 parent eab3991 commit 7652f6f

File tree

5 files changed

+73
-127
lines changed

5 files changed

+73
-127
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/target
22
/.idea
3+
/CLAUDE.md
34

CLAUDE.md

Lines changed: 0 additions & 102 deletions
This file was deleted.

crates/datadog-log-agent/src/constants.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,5 @@ pub const DEFAULT_SITE: &str = "datadoghq.com";
1616
/// Default flush timeout in seconds.
1717
pub const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 5;
1818

19-
/// Default zstd compression level. Valid range: 1 (fastest) to 21 (best ratio).
2019
/// Negative values enable ultra-fast modes. Level 3 is the zstd library default.
2120
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 3;

crates/datadog-log-agent/tests/integration_test.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
1313
#![allow(clippy::disallowed_methods)] // plain reqwest::Client is fine against local mock server
1414

15-
use datadog_log_agent::{
16-
AggregatorService, FlusherMode, LogEntry, LogFlusher, LogFlusherConfig,
17-
};
15+
use datadog_log_agent::{AggregatorService, FlusherMode, LogEntry, LogFlusher, LogFlusherConfig};
1816
use mockito::{Matcher, Server};
1917
use std::time::Duration;
2018

@@ -254,7 +252,11 @@ async fn test_max_entries_fits_in_one_batch() {
254252
handle.insert_batch(entries).expect("insert");
255253

256254
let batches = handle.get_batches().await.expect("get_batches");
257-
assert_eq!(batches.len(), 1, "exactly MAX_BATCH_ENTRIES fits in one batch");
255+
assert_eq!(
256+
batches.len(),
257+
1,
258+
"exactly MAX_BATCH_ENTRIES fits in one batch"
259+
);
258260

259261
let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON");
260262
assert_eq!(arr.as_array().unwrap().len(), MAX);
@@ -442,7 +444,9 @@ async fn test_opw_mode_disables_compression_regardless_of_config() {
442444

443445
let (svc, handle) = AggregatorService::new();
444446
let _task = tokio::spawn(svc.run());
445-
handle.insert_batch(vec![entry("not compressed in OPW")]).expect("insert");
447+
handle
448+
.insert_batch(vec![entry("not compressed in OPW")])
449+
.expect("insert");
446450

447451
// use_compression: true — but OPW mode overrides this to false
448452
let config = LogFlusherConfig {
@@ -457,7 +461,9 @@ async fn test_opw_mode_disables_compression_regardless_of_config() {
457461
flush_timeout: Duration::from_secs(5),
458462
};
459463

460-
let result = LogFlusher::new(config, build_client(), handle).flush().await;
464+
let result = LogFlusher::new(config, build_client(), handle)
465+
.flush()
466+
.await;
461467

462468
assert!(result);
463469
mock.assert_async().await;
@@ -485,7 +491,9 @@ async fn test_retry_on_500_succeeds_on_second_attempt() {
485491

486492
let (svc, handle) = AggregatorService::new();
487493
let _task = tokio::spawn(svc.run());
488-
handle.insert_batch(vec![entry("retry me")]).expect("insert");
494+
handle
495+
.insert_batch(vec![entry("retry me")])
496+
.expect("insert");
489497

490498
let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle)
491499
.flush()
@@ -507,7 +515,9 @@ async fn test_permanent_error_on_403_no_retry() {
507515

508516
let (svc, handle) = AggregatorService::new();
509517
let _task = tokio::spawn(svc.run());
510-
handle.insert_batch(vec![entry("forbidden")]).expect("insert");
518+
handle
519+
.insert_batch(vec![entry("forbidden")])
520+
.expect("insert");
511521

512522
let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle)
513523
.flush()
@@ -530,7 +540,9 @@ async fn test_exhausted_retries_returns_false() {
530540

531541
let (svc, handle) = AggregatorService::new();
532542
let _task = tokio::spawn(svc.run());
533-
handle.insert_batch(vec![entry("keep failing")]).expect("insert");
543+
handle
544+
.insert_batch(vec![entry("keep failing")])
545+
.expect("insert");
534546

535547
let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle)
536548
.flush()
@@ -564,7 +576,9 @@ async fn test_additional_endpoints_receive_same_batch() {
564576

565577
let (svc, handle) = AggregatorService::new();
566578
let _task = tokio::spawn(svc.run());
567-
handle.insert_batch(vec![entry("multi-endpoint")]).expect("insert");
579+
handle
580+
.insert_batch(vec![entry("multi-endpoint")])
581+
.expect("insert");
568582

569583
let config = LogFlusherConfig {
570584
api_key: "test-api-key".to_string(),
@@ -578,7 +592,9 @@ async fn test_additional_endpoints_receive_same_batch() {
578592
flush_timeout: Duration::from_secs(5),
579593
};
580594

581-
let result = LogFlusher::new(config, build_client(), handle).flush().await;
595+
let result = LogFlusher::new(config, build_client(), handle)
596+
.flush()
597+
.await;
582598

583599
assert!(result);
584600
primary_mock.assert_async().await;
@@ -620,7 +636,9 @@ async fn test_additional_endpoint_failure_does_not_affect_return_value() {
620636
flush_timeout: Duration::from_secs(5),
621637
};
622638

623-
let result = LogFlusher::new(config, build_client(), handle).flush().await;
639+
let result = LogFlusher::new(config, build_client(), handle)
640+
.flush()
641+
.await;
624642

625643
assert!(
626644
result,

crates/datadog-serverless-compat/src/main.rs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentTy
2727

2828
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
2929
use datadog_log_agent::{
30-
AggregatorHandle as LogAggregatorHandle, AggregatorService as LogAggregatorService, LogFlusher,
31-
LogFlusherConfig,
30+
AggregatorHandle as LogAggregatorHandle, AggregatorService as LogAggregatorService,
31+
FlusherMode as LogFlusherMode, LogFlusher, LogFlusherConfig,
3232
};
3333
use dogstatsd::{
3434
aggregator::{AggregatorHandle, AggregatorService},
@@ -112,8 +112,8 @@ pub async fn main() {
112112
.or_else(|_| env::var("HTTPS_PROXY"))
113113
.ok();
114114
let dd_logs_enabled = env::var("DD_LOGS_ENABLED")
115-
.map(|val| val.to_lowercase() != "false")
116-
.unwrap_or(true);
115+
.map(|val| val.to_lowercase() == "true")
116+
.unwrap_or(false);
117117
debug!("Starting serverless trace mini agent");
118118

119119
let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level);
@@ -360,21 +360,30 @@ fn start_log_agent(dd_api_key: Option<String>, https_proxy: Option<String>) -> O
360360
Err(e) => error!("invalid HTTPS proxy for log agent: {e}"),
361361
}
362362
}
363-
builder.build().ok()
363+
match builder.build() {
364+
Ok(c) => Some(c),
365+
Err(e) => {
366+
error!("failed to build HTTP client for log agent: {e}");
367+
None
368+
}
369+
}
364370
});
365371

366-
let client = match client {
367-
Some(c) => c,
368-
None => {
369-
error!("failed to build HTTP client for log agent, log flushing disabled");
370-
return None;
371-
}
372-
};
372+
let client = client?; // error already logged above
373373

374374
let config = LogFlusherConfig {
375375
api_key,
376376
..LogFlusherConfig::from_env()
377377
};
378+
379+
// Fail fast: OPW mode with an empty URL will always produce a network error at flush time.
380+
if let LogFlusherMode::ObservabilityPipelinesWorker { url } = &config.mode {
381+
if url.is_empty() {
382+
error!("OPW mode enabled but DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL is empty — log agent disabled");
383+
return None;
384+
}
385+
}
386+
378387
Some(LogFlusher::new(config, client, handle))
379388
}
380389

@@ -412,6 +421,27 @@ mod log_agent_integration_tests {
412421
handle.shutdown().expect("shutdown");
413422
}
414423

424+
/// start_log_agent must reject OPW mode with an empty URL.
425+
#[test]
426+
fn test_opw_empty_url_is_detected() {
427+
let config = LogFlusherConfig {
428+
api_key: "key".to_string(),
429+
site: "datadoghq.com".to_string(),
430+
mode: FlusherMode::ObservabilityPipelinesWorker { url: String::new() },
431+
additional_endpoints: Vec::new(),
432+
use_compression: false,
433+
compression_level: 3,
434+
flush_timeout: std::time::Duration::from_secs(5),
435+
};
436+
assert!(
437+
matches!(
438+
&config.mode,
439+
FlusherMode::ObservabilityPipelinesWorker { url } if url.is_empty()
440+
),
441+
"should detect empty OPW URL"
442+
);
443+
}
444+
415445
/// Verify the LogFlusher struct is constructible from within this crate — compile-time test.
416446
#[allow(dead_code)]
417447
fn _assert_log_flusher_constructible() {

0 commit comments

Comments (0)