Skip to content

Commit 628bd2c

Browse files
committed
feat(capablities): sleep & spawn capabilities
1 parent d5fe204 commit 628bd2c

33 files changed

Lines changed: 722 additions & 480 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datadog-sidecar/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ anyhow = { version = "1.0" }
1818
arrayref = "0.3.7"
1919
priority-queue = "2.1.1"
2020
libdd-common = { path = "../libdd-common" }
21+
libdd-capabilities-impl = { path = "../libdd-capabilities-impl" }
2122
datadog-sidecar-macros = { path = "../datadog-sidecar-macros" }
2223

2324
libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] }

datadog-sidecar/src/service/agent_info.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use datadog_ipc::platform::NamedShmHandle;
1616
use futures::future::Shared;
1717
use futures::FutureExt;
1818
use http::uri::PathAndQuery;
19-
use libdd_common::DefaultHttpClient;
19+
use libdd_capabilities_impl::NativeCapabilities;
2020
use libdd_common::{Endpoint, MutexExt};
2121
use libdd_data_pipeline::agent_info::schema::AgentInfoStruct;
2222
use libdd_data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
@@ -103,7 +103,7 @@ impl AgentInfoFetcher {
103103
fetch_endpoint.url = http::Uri::from_parts(parts).unwrap();
104104
loop {
105105
let fetched =
106-
fetch_info_with_state::<DefaultHttpClient>(&fetch_endpoint, state.as_deref())
106+
fetch_info_with_state::<NativeCapabilities>(&fetch_endpoint, state.as_deref())
107107
.await;
108108
let mut complete_fut = None;
109109
{

datadog-sidecar/src/service/tracing/trace_flusher.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use super::TraceSendData;
55
use crate::agent_remote_config::AgentRemoteConfigWriter;
66
use datadog_ipc::platform::NamedShmHandle;
77
use futures::future::join_all;
8-
use libdd_common::capabilities::HttpClientTrait;
9-
use libdd_common::{DefaultHttpClient, Endpoint, MutexExt};
8+
use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities};
9+
use libdd_common::{Endpoint, MutexExt};
1010
use libdd_trace_utils::trace_utils;
1111
use libdd_trace_utils::trace_utils::SendData;
1212
use libdd_trace_utils::trace_utils::SendDataResult;
@@ -95,7 +95,7 @@ pub(crate) struct TraceFlusher {
9595
pub(crate) min_force_drop_size_bytes: AtomicU32, // put a limit on memory usage
9696
remote_config: Mutex<AgentRemoteConfigs>,
9797
pub metrics: Mutex<TraceFlusherMetrics>,
98-
client: DefaultHttpClient,
98+
capabilities: NativeCapabilities,
9999
}
100100
impl Default for TraceFlusher {
101101
fn default() -> Self {
@@ -106,7 +106,7 @@ impl Default for TraceFlusher {
106106
min_force_drop_size_bytes: AtomicU32::new(trace_utils::MAX_PAYLOAD_SIZE as u32),
107107
remote_config: Mutex::new(Default::default()),
108108
metrics: Mutex::new(Default::default()),
109-
client: DefaultHttpClient::new_client(),
109+
capabilities: NativeCapabilities::new_client(),
110110
}
111111
}
112112
}
@@ -248,7 +248,7 @@ impl TraceFlusher {
248248

249249
async fn send_and_handle_trace(&self, send_data: SendData) {
250250
let endpoint = send_data.get_target().clone();
251-
let response = send_data.send(&self.client).await;
251+
let response = send_data.send(&self.capabilities).await;
252252
self.metrics.lock_or_panic().update(&response);
253253
match response.last_result {
254254
Ok(response) => {

libdd-capabilities-impl/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ bytes = "1"
2020
http = "1"
2121
libdd-capabilities = { path = "../libdd-capabilities", version = "0.1.0" }
2222
libdd-common = { path = "../libdd-common", version = "3.0.2" }
23+
tokio = { version = "1", features = ["time", "rt"] }

libdd-capabilities-impl/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ Native implementations of `libdd-capabilities` traits.
88

99
## Capabilities
1010

11-
- **`DefaultHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes).
11+
- **`NativeHttpClient`**: HTTP client backed by hyper and the `libdd-common` connector infrastructure (supports Unix sockets, HTTPS with rustls, Windows named pipes).
12+
- **`NativeSleepCapability`**: Sleep backed by `tokio::time::sleep`.
13+
- **`NativeSpawnCapability`**: Task spawning backed by `tokio::runtime::Handle::spawn`.
1214

1315
## Types
1416

15-
- **`NativeCapabilities`**: Bundle type alias that implements all capability traits using native backends. Currently delegates to `DefaultHttpClient`; as more capability traits are added (spawn, sleep, etc.), this type will implement all of them.
17+
- **`NativeCapabilities`**: Bundle struct that implements all capability traits using native backends. Delegates to `NativeHttpClient`, `NativeSleepCapability`, and `NativeSpawnCapability`.
1618

1719
## Usage
1820

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
//! Re-exports `DefaultHttpClient` from `libdd-common`, where it lives alongside
4+
//! Re-exports `NativeHttpClient` from `libdd-common`, where it lives alongside
55
//! the hyper infrastructure it wraps.
66
7-
pub use libdd_common::DefaultHttpClient;
7+
pub use libdd_common::NativeHttpClient;

libdd-capabilities-impl/src/lib.rs

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,58 @@
88
//! etc.). Leaf crates (FFI, benchmarks) pin this type as the generic parameter.
99
1010
mod http;
11+
pub mod sleep;
12+
pub mod spawn;
1113

1214
use core::future::Future;
15+
use std::time::Duration;
1316

14-
pub use http::DefaultHttpClient;
17+
pub use http::NativeHttpClient;
1518
use libdd_capabilities::http::HttpError;
16-
pub use libdd_capabilities::HttpClientTrait;
19+
pub use libdd_capabilities::HttpClientCapability;
1720
use libdd_capabilities::MaybeSend;
21+
pub use libdd_capabilities::SleepCapability;
22+
pub use libdd_capabilities::SpawnCapability;
23+
pub use sleep::NativeSleepCapability;
24+
pub use spawn::{NativeJoinHandle, NativeSpawnCapability};
1825

1926
/// Bundle struct for native platform capabilities.
2027
///
21-
/// Delegates to [`DefaultHttpClient`] for HTTP. As more capability traits are
22-
/// added (spawn, sleep, etc.), additional fields and impls are added here
23-
/// without changing the type identity — consumers see the same
24-
/// `NativeCapabilities` throughout.
28+
/// Delegates to [`NativeHttpClient`] for HTTP, [`NativeSleepCapability`] for
29+
/// sleep, and [`NativeSpawnCapability`] for task spawning.
2530
///
2631
/// Individual capability traits keep minimal per-function bounds (e.g.
27-
/// functions that only need HTTP require just `H: HttpClientTrait`, not the
32+
/// functions that only need HTTP require just `H: HttpClientCapability`, not the
2833
/// full bundle) so that native callers like the sidecar can use
29-
/// `DefaultHttpClient` directly without pulling in this bundle.
34+
/// `NativeHttpClient` directly without pulling in this bundle.
3035
#[derive(Clone, Debug)]
3136
pub struct NativeCapabilities {
32-
http: DefaultHttpClient,
37+
http: NativeHttpClient,
38+
sleep: NativeSleepCapability,
39+
spawn: NativeSpawnCapability,
3340
}
3441

35-
impl HttpClientTrait for NativeCapabilities {
42+
impl NativeCapabilities {
43+
/// Create a bundle with an explicit tokio runtime handle for spawning.
44+
///
45+
/// Prefer `new_client()` (via `HttpClientCapability`) when already inside
46+
/// a tokio context. This constructor exists for test code that owns a
47+
/// `SharedRuntime` and needs to pass its handle explicitly.
48+
pub fn new(handle: tokio::runtime::Handle) -> Self {
49+
Self {
50+
http: NativeHttpClient::new_client(),
51+
sleep: NativeSleepCapability,
52+
spawn: NativeSpawnCapability::new(handle),
53+
}
54+
}
55+
}
56+
57+
impl HttpClientCapability for NativeCapabilities {
3658
fn new_client() -> Self {
3759
Self {
38-
http: DefaultHttpClient::new_client(),
60+
http: NativeHttpClient::new_client(),
61+
sleep: NativeSleepCapability,
62+
spawn: NativeSpawnCapability::from_current(),
3963
}
4064
}
4165

@@ -46,3 +70,21 @@ impl HttpClientTrait for NativeCapabilities {
4670
self.http.request(req)
4771
}
4872
}
73+
74+
impl SleepCapability for NativeCapabilities {
75+
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
76+
self.sleep.sleep(duration)
77+
}
78+
}
79+
80+
impl SpawnCapability for NativeCapabilities {
81+
type JoinHandle<T: MaybeSend + 'static> = NativeJoinHandle<T>;
82+
83+
fn spawn<F, T>(&self, future: F) -> NativeJoinHandle<T>
84+
where
85+
F: Future<Output = T> + MaybeSend + 'static,
86+
T: MaybeSend + 'static,
87+
{
88+
self.spawn.spawn(future)
89+
}
90+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Native sleep implementation backed by `tokio::time::sleep`.
5+
6+
use core::future::Future;
7+
use std::time::Duration;
8+
9+
use libdd_capabilities::maybe_send::MaybeSend;
10+
use libdd_capabilities::sleep::SleepCapability;
11+
12+
#[derive(Clone, Debug)]
13+
pub struct NativeSleepCapability;
14+
15+
impl SleepCapability for NativeSleepCapability {
16+
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + MaybeSend {
17+
tokio::time::sleep(duration)
18+
}
19+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Native spawn implementation backed by `tokio::runtime::Handle::spawn`.
5+
6+
use core::future::Future;
7+
use core::pin::Pin;
8+
use core::task::{Context, Poll};
9+
10+
use libdd_capabilities::maybe_send::MaybeSend;
11+
use libdd_capabilities::spawn::SpawnCapability;
12+
use tokio::task::JoinHandle;
13+
14+
#[derive(Clone, Debug)]
15+
pub struct NativeSpawnCapability {
16+
handle: tokio::runtime::Handle,
17+
}
18+
19+
impl NativeSpawnCapability {
20+
pub fn new(handle: tokio::runtime::Handle) -> Self {
21+
Self { handle }
22+
}
23+
24+
pub fn from_current() -> Self {
25+
Self {
26+
handle: tokio::runtime::Handle::current(),
27+
}
28+
}
29+
}
30+
31+
impl SpawnCapability for NativeSpawnCapability {
32+
type JoinHandle<T: MaybeSend + 'static> = NativeJoinHandle<T>;
33+
34+
fn spawn<F, T>(&self, future: F) -> NativeJoinHandle<T>
35+
where
36+
F: Future<Output = T> + MaybeSend + 'static,
37+
T: MaybeSend + 'static,
38+
{
39+
NativeJoinHandle(self.handle.spawn(future))
40+
}
41+
}
42+
43+
/// Newtype wrapping `tokio::task::JoinHandle<T>` that normalises the output to
44+
/// `T` instead of `Result<T, JoinError>`.
45+
///
46+
/// A `JoinError` means the spawned task panicked or was aborted. Workers use
47+
/// `CancellationToken` for graceful shutdown, so `JoinError` indicates a bug.
48+
pub struct NativeJoinHandle<T>(JoinHandle<T>);
49+
50+
impl<T> Future for NativeJoinHandle<T> {
51+
type Output = T;
52+
53+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
54+
// JoinHandle<T>: Unpin, so Pin::new is safe.
55+
match Pin::new(&mut self.get_mut().0).poll(cx) {
56+
Poll::Ready(Ok(val)) => Poll::Ready(val),
57+
Poll::Ready(Err(e)) => panic!("spawned task failed: {e}"),
58+
Poll::Pending => Poll::Pending,
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)