Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 141 additions & 16 deletions rust/fory-core/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::error::Error;
use crate::meta::buffer_rw_string::read_latin1_simd;
use crate::stream::ForyStreamBuf;
use byteorder::{ByteOrder, LittleEndian};
use std::cmp::max;

Expand Down Expand Up @@ -499,6 +500,7 @@ impl<'a> Writer<'a> {
pub struct Reader<'a> {
pub(crate) bf: &'a [u8],
pub(crate) cursor: usize,
pub(crate) stream: Option<Box<ForyStreamBuf>>,
}

#[allow(clippy::needless_lifetimes)]
Expand All @@ -507,7 +509,26 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn new(bf: &[u8]) -> Reader<'_> {
Reader { bf, cursor: 0 }
Reader {
bf,
cursor: 0,
stream: None,
}
}

/// Construct a stream-backed `Reader`.
pub fn from_stream(stream: crate::stream::ForyStreamBuf) -> Reader<'static> {
let boxed = Box::new(stream);
Reader {
bf: b"",
cursor: 0,
stream: Some(boxed),
}
}

#[inline(always)]
pub fn is_stream_backed(&self) -> bool {
self.stream.is_some()
}

#[inline(always)]
Expand Down Expand Up @@ -544,25 +565,63 @@ impl<'a> Reader<'a> {
self.cursor
}

/// Fill stream buffer up to `target_size` total bytes, then re-pin `bf`.
/// Returns `false` if stream is None OR fill failed.
fn fill_to(&mut self, target_size: usize) -> bool {
let stream = match self.stream.as_mut() {
Some(s) => s,
None => return false,
};
// intentional: fill_buffer validates; set_reader_index only syncs read_pos
let _ = stream.set_reader_index(self.cursor);

let n = target_size.saturating_sub(self.cursor);
if n == 0 {
self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) };
return self.bf.len() >= target_size;
}
if stream.fill_buffer(n).is_err() {
return false;
}
self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) };
self.bf.len() >= target_size
}

/// Ensure `self.cursor + n` bytes are available.
/// fast path: target <= size_ → return true
/// stream path: call fill_to(target), check again.
#[inline(always)]
fn value_at(&self, index: usize) -> Result<u8, Error> {
match self.bf.get(index) {
None => Err(Error::buffer_out_of_bound(
index,
self.bf.len(),
self.bf.len(),
)),
Some(v) => Ok(*v),
fn ensure_readable(&mut self, n: usize) -> Result<(), Error> {
let target = self.cursor + n;
if target <= self.bf.len() {
return Ok(());
}
if !self.fill_to(target) {
return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len()));
}
if target > self.bf.len() {
return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len()));
}
Ok(())
}

#[inline(always)]
fn check_bound(&self, n: usize) -> Result<(), Error> {
if self.cursor + n > self.bf.len() {
Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len()))
} else {
Ok(())
fn value_at(&mut self, index: usize) -> Result<u8, Error> {
if index >= self.bf.len() {
// Need index+1 bytes total; fill to that target.
if !self.fill_to(index + 1) || index >= self.bf.len() {
return Err(Error::buffer_out_of_bound(index, 1, self.bf.len()));
}
}
Ok(unsafe { *self.bf.get_unchecked(index) })
}

/// stream fill on miss. Changing to `&mut self` is the single
/// change that gives ALL 27 existing read methods stream support
/// without touching them individually — they all call this.
#[inline(always)]
fn check_bound(&mut self, n: usize) -> Result<(), Error> {
self.ensure_readable(n)
}

#[inline(always)]
Expand Down Expand Up @@ -595,8 +654,12 @@ impl<'a> Reader<'a> {
}
}

/// `stream_->reader_index(reader_index_)` when stream-backed.
pub fn set_cursor(&mut self, cursor: usize) {
self.cursor = cursor;
if let Some(ref mut stream) = self.stream {
let _ = stream.set_reader_index(cursor);
}
}

// ============ BOOL (TypeId = 1) ============
Expand Down Expand Up @@ -692,6 +755,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_u16(&mut self) -> Result<u16, Error> {
self.check_bound(2)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_u16(slice);
self.cursor += 2;
Expand All @@ -702,6 +766,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_u32(&mut self) -> Result<u32, Error> {
self.check_bound(4)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_u32(slice);
self.cursor += 4;
Expand All @@ -712,6 +777,9 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_varuint32(&mut self) -> Result<u32, Error> {
if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 5 {
return self.read_varuint32_stream();
}
let b0 = self.value_at(self.cursor)? as u32;
if b0 < 0x80 {
self.move_next(1);
Expand Down Expand Up @@ -749,6 +817,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_u64(&mut self) -> Result<u64, Error> {
self.check_bound(8)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_u64(slice);
self.cursor += 8;
Expand All @@ -759,6 +828,9 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_varuint64(&mut self) -> Result<u64, Error> {
if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 9 {
return self.read_varuint64_stream();
}
let b0 = self.value_at(self.cursor)? as u64;
if b0 < 0x80 {
self.move_next(1);
Expand Down Expand Up @@ -847,6 +919,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_f32(&mut self) -> Result<f32, Error> {
self.check_bound(4)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_f32(slice);
self.cursor += 4;
Expand All @@ -857,6 +930,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_f64(&mut self) -> Result<f64, Error> {
self.check_bound(8)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_f64(slice);
self.cursor += 8;
Expand Down Expand Up @@ -951,6 +1025,7 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_u128(&mut self) -> Result<u128, Error> {
self.check_bound(16)?;
let slice = self.slice_after_cursor();
let result = LittleEndian::read_u128(slice);
self.cursor += 16;
Expand Down Expand Up @@ -983,6 +1058,9 @@ impl<'a> Reader<'a> {

#[inline(always)]
pub fn read_varuint36small(&mut self) -> Result<u64, Error> {
if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 8 {
return self.read_varuint36small_stream();
}
let start = self.cursor;
let slice = self.slice_after_cursor();

Expand Down Expand Up @@ -1027,9 +1105,56 @@ impl<'a> Reader<'a> {
}
Ok(result)
}

/// Byte-by-byte varuint32 decode for stream-backed path.
fn read_varuint32_stream(&mut self) -> Result<u32, Error> {
let mut result = 0u32;
for i in 0..5 {
let b = self.value_at(self.cursor)? as u32;
self.cursor += 1;
result |= (b & 0x7F) << (i * 7);
if (b & 0x80) == 0 {
return Ok(result);
}
}
Err(Error::encode_error("Invalid var_uint32 encoding"))
}

/// Byte-by-byte varuint64 decode for stream-backed path.
fn read_varuint64_stream(&mut self) -> Result<u64, Error> {
let mut result = 0u64;
for i in 0..8u64 {
let b = self.value_at(self.cursor)? as u64;
self.cursor += 1;
result |= (b & 0x7F) << (i * 7);
if (b & 0x80) == 0 {
return Ok(result);
}
}
// 9th byte — full 8 bits
let b = self.value_at(self.cursor)? as u64;
self.cursor += 1;
result |= b << 56;
Ok(result)
}

/// Byte-by-byte varuint36small decode for stream-backed path.
fn read_varuint36small_stream(&mut self) -> Result<u64, Error> {
let mut result = 0u64;
for i in 0..4u64 {
let b = self.value_at(self.cursor)? as u64;
self.cursor += 1;
result |= (b & 0x7F) << (i * 7);
if (b & 0x80) == 0 {
return Ok(result);
}
}
let b = self.value_at(self.cursor)? as u64;
self.cursor += 1;
result |= b << 28;
Ok(result)
}
}

#[allow(clippy::needless_lifetimes)]
unsafe impl<'a> Send for Reader<'a> {}
#[allow(clippy::needless_lifetimes)]
unsafe impl<'a> Sync for Reader<'a> {}
74 changes: 68 additions & 6 deletions rust/fory-core/src/fory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::serializer::{Serializer, StructSerializer};
use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG};
use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE};
use std::cell::UnsafeCell;
use std::io::Read;
use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
Expand Down Expand Up @@ -959,13 +960,74 @@ impl Fory {
reader: &mut Reader,
) -> Result<T, Error> {
self.with_read_context(|context| {
let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) };
let mut new_reader = Reader::new(outlive_buffer);
new_reader.set_cursor(reader.cursor);
context.attach_reader(new_reader);
if reader.is_stream_backed() {
// Stream-backed path: move the owned stream out of the caller's reader,
// construct a fresh stream-backed reader at the current cursor, hand it
// to the context, then restore all state from the returned reader.
// This is the sequential-read case: caller creates Reader::from_stream(...)
// once and calls deserialize_from repeatedly.
let stream = mem::take(&mut reader.stream)
.expect("is_stream_backed was true but stream is None");
let cursor = reader.cursor;
let mut stream_reader = Reader::from_stream(*stream);
// Sync cursor: the stream already consumed [0..cursor], re-position.
stream_reader.set_cursor(cursor);
context.attach_reader(stream_reader);
let result = self.deserialize_with_context(context);
let returned = context.detach_reader();
// Restore state back to caller's reader.
reader.cursor = returned.cursor;
reader.stream = returned.stream;
// Re-pin bf from the (possibly grown after fill_to) stream buffer.
// SAFETY: same invariant as Reader::from_stream and fill_to:
// bf points into Box-owned stream buffer, owned by reader.stream,
// which lives as long as reader.
if let Some(ref s) = reader.stream {
reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) };
}
result
} else {
// In-memory path: unchanged from original.
let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) };
let mut new_reader = Reader::new(outlive_buffer);
new_reader.set_cursor(reader.cursor);
context.attach_reader(new_reader);
let result = self.deserialize_with_context(context);
let end = context.detach_reader().get_cursor();
reader.set_cursor(end);
result
}
})
}

/// Deserializes a single value of type `T` from any `Read` source.
///
/// Equivalent of C++ `fory.deserialize<T>(Buffer(ForyInputStream(source)))`.
/// Internally wraps the source in a [`crate::stream::ForyStreamBuf`] and calls
/// [`deserialize_from`](Self::deserialize_from).
///
/// For deserializing **multiple values sequentially** from one stream
/// (e.g. a network socket or pipe), create the reader once and reuse it:
///
/// ```rust,ignore
/// use fory_core::{Fory, Reader};
/// use fory_core::stream::ForyStreamBuf;
///
/// let fory = Fory::default();
/// let mut reader = Reader::from_stream(ForyStreamBuf::new(my_socket));
/// let first: i32 = fory.deserialize_from(&mut reader).unwrap();
/// let second: String = fory.deserialize_from(&mut reader).unwrap();
/// ```
pub fn deserialize_from_stream<T: Serializer + ForyDefault>(
&self,
source: impl Read + Send + 'static,
) -> Result<T, Error> {
self.with_read_context(|context| {
let stream = crate::stream::ForyStreamBuf::new(source);
let reader = Reader::from_stream(stream);
context.attach_reader(reader);
let result = self.deserialize_with_context(context);
let end = context.detach_reader().get_cursor();
reader.set_cursor(end);
context.detach_reader();
result
})
}
Expand Down
2 changes: 2 additions & 0 deletions rust/fory-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ pub mod meta;
pub mod resolver;
pub mod row;
pub mod serializer;
pub mod stream;
pub mod types;
pub mod util;

Expand All @@ -199,3 +200,4 @@ pub use crate::resolver::type_resolver::{TypeInfo, TypeResolver};
pub use crate::serializer::weak::{ArcWeak, RcWeak};
pub use crate::serializer::{read_data, write_data, ForyDefault, Serializer, StructSerializer};
pub use crate::types::{RefFlag, RefMode, TypeId};
pub use stream::ForyStreamBuf;
Loading
Loading