Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ sha2 = "0.10"
either = "1.13.0"
tokio = { version = "1.23", features = [
"rt",
"sync",
"time",
], default-features = false }
uuid = { version = "1.10.0", features = ["v4"] }
tokio-util = "0.7.11"
Expand Down Expand Up @@ -53,9 +55,14 @@ uuid = { version = "1", features = ["js"] }
[lib]
bench = false

[[bench]]
name = "trace_buffer"
harness = false

[dev-dependencies]
libdd-capabilities-impl = { version = "0.1.0", path = "../libdd-capabilities-impl" }
libdd-log = { path = "../libdd-log" }
libdd-shared-runtime = { version = "0.1.0", path = "../libdd-shared-runtime" }
clap = { version = "4.0", features = ["derive"] }
criterion = "0.5.1"
libdd-trace-utils = { path = "../libdd-trace-utils", features = [
Expand Down
99 changes: 99 additions & 0 deletions libdd-data-pipeline/benches/trace_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk};
use libdd_data_pipeline::trace_exporter::{
agent_response::AgentResponse, error::TraceExporterError,
};
use libdd_shared_runtime::SharedRuntime;

type Span = [u8; 100];

// Number of chunks each sender thread sends per benchmark iteration.
const CHUNKS_PER_SENDER: usize = 90_000;

// Simulates async IO by sleeping 2ms per export batch.
#[derive(Debug)]
struct SleepExport;

impl Export<Span> for SleepExport {
fn export_trace_chunks(
&mut self,
_trace_chunks: Vec<TraceChunk<Span>>,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<AgentResponse, TraceExporterError>> + Send + '_,
>,
> {
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(AgentResponse::Unchanged)
})
}
}

fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<Span>>) {
let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new"));
let cfg = TraceBufferConfig::new()
.max_buffered_spans(400_000)
.span_flush_threshold(50_000)
.max_flush_interval(Duration::from_secs(2));
let (buf, worker) = TraceBuffer::new(cfg, Box::new(|_| {}), Box::new(SleepExport));
rt.spawn_worker(worker).expect("spawn_worker");
(rt, Arc::new(buf))
}

fn bench_trace_buffer(c: &mut Criterion) {
let mut group = c.benchmark_group("trace_buffer");

// (label, inter-send delay)
let workloads: &[(&str, Option<Duration>)] = &[
("no_delay", None),
("1us_delay", Some(Duration::from_micros(1))),
("10us_delay", Some(Duration::from_micros(100))),
];

for &(delay_label, delay) in workloads {
for num_senders in [1_usize, 2, 4, 8] {
let (rt, sender) = setup_buffer();

group.throughput(Throughput::Elements(
(num_senders * CHUNKS_PER_SENDER) as u64,
));

group.bench_function(
BenchmarkId::new(format!("{}_senders", num_senders), delay_label),
|b| {
b.iter(|| {
std::thread::scope(|s| {
for _ in 0..num_senders {
let sender = sender.clone();
s.spawn(move || {
for _ in 0..CHUNKS_PER_SENDER {
// BatchFull errors are expected under high load.
let _ = sender.send_chunk(vec![[0u8; 100]]);
if let Some(d) = delay {
std::thread::sleep(d);
}
}
});
}
});
});
},
);

rt.shutdown(None).expect("runtime shutdown");
}
}

group.finish();
}

criterion_group!(benches, bench_trace_buffer);
criterion_main!(benches);
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ mod health_metrics;
pub(crate) mod otlp;
#[cfg(feature = "telemetry")]
pub(crate) mod telemetry;
pub mod trace_buffer;
#[allow(missing_docs)]
pub mod trace_exporter;
Loading
Loading