Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
export AWS_LC_FIPS_SYS_NO_ASM=1
fi
# shellcheck disable=SC2046
cargo clippy --workspace --all-targets --all-features -- -D warnings
cargo clippy --workspace --all-targets --all-features -- -D warnings $([ ${{ matrix.rust_version }} = 1.84.1 ] || echo -Aclippy::manual_is_multiple_of)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion but this should probably be discussed in #libdatadog

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've done that in the past already for cases where our MSRV did not support APIs which became the only solution to satisfy a clippy lint in future. It misses a comment though, when we'll be able to drop this.

licensecheck:
runs-on: ubuntu-latest
Expand Down
27 changes: 24 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datadog-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false

[dependencies]
anyhow = { version = "1.0" }
zwohash = "0.1.2"
bincode = { version = "1" }
futures = { version = "0.3", default-features = false }
io-lifetimes = { version = "1.0" }
Expand All @@ -19,6 +20,9 @@ libdd-tinybytes = { path = "../libdd-tinybytes", optional = true }


libdd-common = { path = "../libdd-common" }
libdd-ddsketch = { path = "../libdd-ddsketch" }
libdd-trace-protobuf = { path = "../libdd-trace-protobuf" }
libdd-trace-stats = { path = "../libdd-trace-stats" }
datadog-ipc-macros = { path = "../datadog-ipc-macros" }
tracing = { version = "0.1", default-features = false }

Expand Down
123 changes: 123 additions & 0 deletions datadog-ipc/src/atomic_option.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Lock-free `Option<T>` with atomic take, valid for any `T` where
//! `size_of::<Option<T>>() <= 8`.

use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering};

/// An `Option<T>` that supports lock-free atomic take.
///
/// # Constraints
/// `size_of::<Option<T>>()` must be `<= 8`. Enforced by a `debug_assert` in
/// `From<Option<T>>`). This holds for niche-optimised types (`NonNull<T>`,
/// `Box<T>`, …) and for any `Option<T>` that fits in a single machine word.
///
/// # Storage
/// The option is stored in a `UnsafeCell<Option<T>>`, giving it exactly the size
/// and alignment of `Option<T>` itself. `take()` picks the narrowest atomic that
/// covers `size_of::<Option<T>>()` bytes — `AtomicU8` for 1-byte options up to
/// `AtomicU64` for 5–8 byte options. The atomic cast is valid because
/// `align_of::<AtomicUN>() == align_of::<uN>() <= align_of::<Option<T>>()`.
///
/// # None sentinel
/// The "none" bit-pattern is computed by value (`Option::<T>::None`) rather than
/// assumed to be zero, so the implementation is correct for both niche-optimised
/// types and discriminant-based options.
///
/// `UnsafeCell` provides the interior-mutability aliasing permission required by
/// Rust's memory model when mutating through a shared reference.
pub struct AtomicOption<T>(UnsafeCell<Option<T>>);

impl<T> AtomicOption<T> {
/// Encode `val` as a `u64`, transferring ownership into the bit representation.
const fn encode(val: Option<T>) -> u64 {
let mut bits = 0u64;
unsafe {
ptr::copy_nonoverlapping(
ptr::from_ref(&val).cast::<u8>(),
ptr::from_mut(&mut bits).cast::<u8>(),
size_of::<Option<T>>(),
);
mem::forget(val);
}
bits
}

/// Atomically swap the storage with `new_bits`, returning the old bits.
#[inline]
fn atomic_swap(&self, new_bits: u64) -> u64 {
unsafe {
let ptr = self.0.get();
match size_of::<Option<T>>() {
1 => (*(ptr as *const AtomicU8)).swap(new_bits as u8, Ordering::AcqRel) as u64,
2 => (*(ptr as *const AtomicU16)).swap(new_bits as u16, Ordering::AcqRel) as u64,
3 | 4 => {
(*(ptr as *const AtomicU32)).swap(new_bits as u32, Ordering::AcqRel) as u64
}
_ => (*(ptr as *const AtomicU64)).swap(new_bits, Ordering::AcqRel),
}
}
}

/// Reconstruct an `Option<T>` from its `u64` bit representation.
///
/// # Safety
/// `bits` must hold a valid `Option<T>` bit-pattern in its low
/// `size_of::<Option<T>>()` bytes, as produced by a previous `encode`.
const unsafe fn decode(bits: u64) -> Option<T> {
let mut result = MaybeUninit::<Option<T>>::uninit();
ptr::copy_nonoverlapping(
ptr::from_ref(&bits).cast::<u8>(),
result.as_mut_ptr().cast::<u8>(),
size_of::<Option<T>>(),
);
result.assume_init()
}

/// Atomically replace the stored value with `None` and return what was there.
/// Returns `None` if the value was already taken.
pub fn take(&self) -> Option<T> {
let old = self.atomic_swap(Self::encode(None));
// SAFETY: `old` holds a valid `Option<T>` bit-pattern.
unsafe { Self::decode(old) }
}

/// Atomically store `val`, dropping any previous value.
pub fn set(&self, val: Option<T>) -> Option<T> {
let old = self.atomic_swap(Self::encode(val));
unsafe { Self::decode(old) }
}

/// Atomically store `Some(val)`, returning the previous value.
pub fn replace(&self, val: T) -> Option<T> {
self.set(Some(val))
}

/// Borrow the current value without taking it.
///
/// # Safety
/// Must not be called concurrently with [`take`], [`set`], or [`replace`].
pub unsafe fn as_option(&self) -> &Option<T> {
&*self.0.get()
}
}

impl<T> From<Option<T>> for AtomicOption<T> {
fn from(val: Option<T>) -> Self {
// we may raise this to 16 once AtomicU128 becomes stable
debug_assert!(
size_of::<Option<T>>() <= size_of::<u64>(),
"AtomicOption requires size_of::<Option<T>>() <= 8, got {}",
size_of::<Option<T>>()
);
Self(UnsafeCell::new(val))
}
}

// `AtomicOption<T>` is `Send`/`Sync` when `T: Send` — same contract as `Mutex<Option<T>>`.
unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}
3 changes: 3 additions & 0 deletions datadog-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ pub mod handles;

pub mod platform;
pub mod rate_limiter;
pub mod shm_stats;

mod atomic_option;
pub mod client;
pub mod codec;
pub use atomic_option::AtomicOption;

pub use client::IpcClientConn;
#[cfg(target_os = "linux")]
Expand Down
50 changes: 38 additions & 12 deletions datadog-ipc/src/platform/mem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

use crate::handles::{HandlesTransport, TransferHandles};
use crate::platform::{mmap_handle, munmap_handle, OwnedFileHandle, PlatformHandle};
use crate::AtomicOption;
#[cfg(feature = "tiny-bytes")]
use libdd_tinybytes::UnderlyingBytes;
use serde::{Deserialize, Serialize};
#[cfg(target_os = "linux")]
use std::os::fd::AsRawFd;
use std::{ffi::CString, io, ptr::NonNull};

#[derive(Clone, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -37,15 +40,16 @@ pub(crate) struct ShmPath {

pub struct NamedShmHandle {
pub(crate) inner: ShmHandle,
pub(crate) path: Option<ShmPath>,
pub(crate) path: AtomicOption<Box<ShmPath>>,
}

impl NamedShmHandle {
pub fn get_path(&self) -> &[u8] {
if let Some(ref shm_path) = &self.path {
shm_path.name.as_bytes()
} else {
b""
/// # Safety
/// Must not be called concurrently with `unlink()`.
pub unsafe fn get_path(&self) -> &[u8] {
match self.path.as_option() {
Some(shm_path) => shm_path.name.to_bytes(),
None => b"",
}
}
}
Expand Down Expand Up @@ -87,10 +91,19 @@ where
unsafe {
self.set_mapping_size(size)?;
}
nix::unistd::ftruncate(
self.get_shm().handle.as_owned_fd()?,
self.get_shm().size as libc::off_t,
let new_size = self.get_shm().size as libc::off_t;
let fd = self.get_shm().handle.as_owned_fd()?;
// Use fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time is
// recoverable; a later SIGBUS mid-execution is not.
#[cfg(target_os = "linux")]
nix::fcntl::fallocate(
fd.as_raw_fd(),
nix::fcntl::FallocateFlags::empty(),
0,
new_size,
)?;
#[cfg(not(target_os = "linux"))]
nix::unistd::ftruncate(&fd, new_size)?;
Ok(())
}
/// # Safety
Expand Down Expand Up @@ -131,6 +144,16 @@ impl FileBackedHandle for NamedShmHandle {
}
}

impl MappedMem<NamedShmHandle> {
/// Unlink the backing SHM file from the filesystem so new openers get `ENOENT`.
/// Existing mappings remain valid. On Windows the mapping is managed by the OS
/// via handle reference counts and there is no filesystem entry to remove.
#[cfg(unix)]
pub fn unlink(&self) {
self.mem.unlink();
}
}

impl<T: MemoryHandle> MappedMem<T> {
pub fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast(), self.mem.get_size()) }
Expand All @@ -152,7 +175,9 @@ impl<T: MemoryHandle> AsRef<[u8]> for MappedMem<T> {
}

impl MappedMem<NamedShmHandle> {
pub fn get_path(&self) -> &[u8] {
/// # Safety
/// Must not be called concurrently with `unlink()`.
pub unsafe fn get_path(&self) -> &[u8] {
self.mem.get_path()
}
}
Expand All @@ -167,9 +192,10 @@ impl<T: FileBackedHandle> From<MappedMem<T>> for ShmHandle {
}

impl From<MappedMem<NamedShmHandle>> for NamedShmHandle {
fn from(mut handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
fn from(handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
let path = handle.mem.path.take().into();
NamedShmHandle {
path: handle.mem.path.take(),
path,
inner: handle.into(),
}
}
Expand Down
Loading
Loading