Skip to content

Commit bde0cea

Browse files
committed
feat: message writter with O_DSYNC | O_DIRECT
1 parent 50fe8ec commit bde0cea

29 files changed

Lines changed: 1804 additions & 59 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/bench/src/actors/producer/benchmark_producer.rs

100644100755
File mode changed.

core/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,4 @@ nix = { workspace = true }
7575

7676
[dev-dependencies]
7777
serial_test = { workspace = true }
78+
tempfile = { workspace = true }

core/common/src/alloc/buffer.rs

Lines changed: 161 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use compio::buf::{IoBuf, IoBufMut, SetLen};
2424
use std::{
2525
mem::MaybeUninit,
2626
ops::{Deref, DerefMut},
27+
sync::Arc,
2728
};
2829

2930
/// A buffer wrapper that participates in memory pooling.
@@ -265,10 +266,9 @@ impl PooledBuffer {
265266
/// After calling this method, the PooledBuffer becomes empty and will not
266267
/// return memory to the pool on drop (the frozen Bytes owns the allocation).
267268
/// The returned `Bytes` is Arc-backed, allowing cheap clones.
268-
pub fn freeze(&mut self) -> Bytes {
269+
pub fn freeze_to_bytes(&mut self) -> Bytes {
269270
let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));
270271

271-
// Update pool accounting
272272
if self.from_pool
273273
&& let Some(bucket_idx) = self.original_bucket_idx
274274
{
@@ -278,10 +278,37 @@ impl PooledBuffer {
278278
self.original_capacity = 0;
279279
self.original_bucket_idx = None;
280280

281-
// Zero copy: Bytes takes ownership of the AlignedBuffer
282-
// and will drop it when refcount reaches zero
283281
Bytes::from_owner(buf)
284282
}
283+
284+
pub fn freeze(&mut self) -> FrozenPooledBuffer {
285+
let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT));
286+
let len = buf.len();
287+
288+
// Transfer pool metadata to frozen buffer
289+
let pool_meta = if self.from_pool {
290+
Some(PoolMeta {
291+
original_capacity: self.original_capacity,
292+
original_bucket_idx: self.original_bucket_idx,
293+
})
294+
} else {
295+
None
296+
};
297+
298+
// Reset self, pool accounting now lives in FrozenPooledBuffer
299+
self.from_pool = false;
300+
self.original_capacity = 0;
301+
self.original_bucket_idx = None;
302+
303+
FrozenPooledBuffer {
304+
inner: Arc::new(FrozenInner {
305+
buffer: buf,
306+
pool_meta,
307+
}),
308+
offset: 0,
309+
len,
310+
}
311+
}
285312
}
286313

287314
impl Deref for PooledBuffer {
@@ -346,3 +373,133 @@ impl IoBufMut for PooledBuffer {
346373
unsafe { std::slice::from_raw_parts_mut(ptr, cap) }
347374
}
348375
}
376+
377+
#[derive(Debug, Clone)]
378+
struct PoolMeta {
379+
original_capacity: usize,
380+
original_bucket_idx: Option<usize>,
381+
}
382+
383+
#[derive(Debug)]
384+
struct FrozenInner {
385+
buffer: AlignedBuffer,
386+
pool_meta: Option<PoolMeta>,
387+
}
388+
389+
impl Drop for FrozenInner {
390+
fn drop(&mut self) {
391+
if let Some(ref meta) = self.pool_meta {
392+
let buf = std::mem::replace(&mut self.buffer, AlignedBuffer::new(ALIGNMENT));
393+
buf.return_to_pool(meta.original_capacity, true);
394+
}
395+
}
396+
}
397+
398+
#[derive(Clone, Debug)]
399+
pub struct FrozenPooledBuffer {
400+
inner: Arc<FrozenInner>,
401+
offset: usize,
402+
len: usize,
403+
}
404+
405+
impl FrozenPooledBuffer {
406+
/// Try to reclaim the underlying `PooledBuffer` without copying.
407+
///
408+
/// Success when:
409+
/// 1. This is the sole `Arc` reference (refcount == 1)
410+
/// 2. This view covers the entire buffer (not a sub-slice)
411+
///
412+
///
413+
/// On success, pool accounting is transferred back to the returned `PooledBuffer`.
414+
/// On failure, `self` is return unchanged
415+
pub fn thaw(self) -> Result<PooledBuffer, FrozenPooledBuffer> {
416+
// Sub-slice views can't reclaim the whole buffer
417+
if self.offset != 0 || self.len != self.inner.buffer.len() {
418+
return Err(self);
419+
}
420+
421+
match Arc::try_unwrap(self.inner) {
422+
Ok(mut frozen_inner) => {
423+
let buffer =
424+
std::mem::replace(&mut frozen_inner.buffer, AlignedBuffer::new(ALIGNMENT));
425+
426+
// Extract pool metadata and prevent FrozenInner::drop from returning the buffer to
427+
// the pool -> we are taking ownership
428+
let pool_meta = frozen_inner.pool_meta.take();
429+
let (from_pool, original_capacity, original_bucket_idx) = match pool_meta {
430+
Some(meta) => (true, meta.original_capacity, meta.original_bucket_idx),
431+
None => (false, buffer.capacity(), None),
432+
};
433+
434+
Ok(PooledBuffer {
435+
from_pool,
436+
original_capacity,
437+
original_bucket_idx,
438+
inner: buffer,
439+
})
440+
}
441+
Err(arc) => Err(FrozenPooledBuffer {
442+
inner: arc,
443+
offset: self.offset,
444+
len: self.len,
445+
}),
446+
}
447+
}
448+
449+
/// Create a subslice view. Try to be cheap.
450+
/// Panics if the range is out of bounds
451+
pub fn slice(&self, range: std::ops::Range<usize>) -> FrozenPooledBuffer {
452+
assert!(
453+
range.end <= self.len,
454+
"slice out of bounds: {}..{} but len is {}",
455+
range.start,
456+
range.end,
457+
self.len
458+
);
459+
460+
FrozenPooledBuffer {
461+
inner: Arc::clone(&self.inner),
462+
offset: self.offset + range.start,
463+
len: range.end - range.start,
464+
}
465+
}
466+
467+
pub fn len(&self) -> usize {
468+
self.len
469+
}
470+
471+
pub fn is_empty(&self) -> bool {
472+
self.len == 0
473+
}
474+
475+
pub fn is_aligned(&self) -> bool {
476+
(self.as_ref().as_ptr() as usize).is_multiple_of(ALIGNMENT)
477+
}
478+
}
479+
480+
impl AsRef<[u8]> for FrozenPooledBuffer {
481+
fn as_ref(&self) -> &[u8] {
482+
&self.inner.buffer[self.offset..self.offset + self.len]
483+
}
484+
}
485+
486+
impl Deref for FrozenPooledBuffer {
487+
type Target = [u8];
488+
489+
fn deref(&self) -> &Self::Target {
490+
self.as_ref()
491+
}
492+
}
493+
494+
impl PartialEq for FrozenPooledBuffer {
495+
fn eq(&self, other: &Self) -> bool {
496+
self.as_ref() == other.as_ref()
497+
}
498+
}
499+
500+
/// Allow passing FrozenPooledBuffer directly to DirectFile's write methods without any copy
501+
impl IoBuf for FrozenPooledBuffer {
502+
fn as_init(&self) -> &[u8] {
503+
self.as_ref()
504+
}
505+
}

core/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod traits;
2929
mod types;
3030
mod utils;
3131

32+
pub use alloc::memory_pool::ALIGNMENT;
3233
pub use error::client_error::ClientError;
3334
pub use error::iggy_error::{IggyError, IggyErrorDiscriminants};
3435
// Locking is feature gated, thus only mod level re-export.

core/common/src/types/message/indexes_mut.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl IggyIndexesMut {
6161
/// The returned `IggyIndexes` uses Arc-backed `Bytes`, allowing cheap clones.
6262
pub fn freeze(&mut self) -> IggyIndexes {
6363
let base_position = self.base_position;
64-
let buffer = self.buffer.freeze();
64+
let buffer = self.buffer.freeze_to_bytes();
6565
self.saved_count = 0;
6666
self.base_position = 0;
6767
IggyIndexes::new(buffer, base_position)

core/common/src/types/message/message_view.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,25 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
225225

226226
#[cfg(test)]
227227
mod tests {
228+
use std::{str::FromStr, sync::Once};
229+
228230
use super::*;
229-
use crate::IggyMessage;
231+
use crate::{IggyMessage, MemoryPool, MemoryPoolConfigOther};
230232
use bytes::Bytes;
231233

234+
static TEST_INIT: Once = Once::new();
235+
236+
fn initialize_pool_for_tests() {
237+
TEST_INIT.call_once(|| {
238+
let config = MemoryPoolConfigOther {
239+
enabled: true,
240+
size: IggyByteSize::from_str("4GiB").unwrap(),
241+
bucket_capacity: 8192,
242+
};
243+
MemoryPool::init_pool(&config);
244+
});
245+
}
246+
232247
fn build_batch() -> crate::IggyMessagesBatch {
233248
let messages = vec![
234249
IggyMessage::builder()
@@ -249,6 +264,7 @@ mod tests {
249264

250265
#[test]
251266
fn should_return_tail_for_indexed_last_after_next() {
267+
initialize_pool_for_tests();
252268
let batch = build_batch();
253269
let mut iter = IggyMessageViewIterator::new_with_boundaries(
254270
batch.buffer(),
@@ -266,6 +282,7 @@ mod tests {
266282

267283
#[test]
268284
fn should_return_last_message_for_raw_last() {
285+
initialize_pool_for_tests();
269286
let batch = build_batch();
270287
let last = IggyMessageViewIterator::new(batch.buffer()).last().unwrap();
271288
assert_eq!(last.payload(), b"three");

core/common/src/types/message/messages_batch_mut.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ impl IggyMessagesBatchMut {
281281
pub fn freeze(&mut self) -> IggyMessagesBatch {
282282
let count = self.count();
283283
let indexes = self.indexes.freeze();
284-
let messages = self.messages.freeze();
284+
let messages = self.messages.freeze_to_bytes();
285285
IggyMessagesBatch::new(indexes, messages, count)
286286
}
287287

0 commit comments

Comments
 (0)