diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index e726a6b44d..abc3eaadca 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -17,8 +17,10 @@ use crate::error::Error; use crate::meta::buffer_rw_string::read_latin1_simd; +use crate::stream::ForyStreamBuf; use byteorder::{ByteOrder, LittleEndian}; use std::cmp::max; +use std::io::Read; /// Threshold for using SIMD optimizations in string operations. /// For buffers smaller than this, direct copy is faster than SIMD setup overhead. @@ -499,6 +501,13 @@ impl<'a> Writer<'a> { pub struct Reader<'a> { pub(crate) bf: &'a [u8], pub(crate) cursor: usize, + /// Optional stream backing for incremental deserialization. + /// When `Some`, `bf` points into the stream's buffer via unsafe reborrow. + /// When `None` (default), Reader operates on a borrowed slice with zero overhead. + /// + /// # Equivalent + /// C++ `Buffer::stream_` / `Buffer::stream_owner_` + stream: Option>, } #[allow(clippy::needless_lifetimes)] @@ -507,7 +516,158 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn new(bf: &[u8]) -> Reader<'_> { - Reader { bf, cursor: 0 } + Reader { + bf, + cursor: 0, + stream: None, + } + } + + /// Creates a stream-backed reader for incremental deserialization. + /// + /// Returns `Reader<'static>` because the reader owns its data through the stream + /// buffer. The `bf` field points into the stream's internal `Vec` via unsafe + /// reborrow, matching C++ where `Buffer::data_` points into `ForyInputStream`'s + /// buffer. + /// + /// # Safety invariants + /// + /// 1. `bf` is reborrowed from the stream after every `fill_to` call + /// 2. `cursor` is always a valid absolute position within the stream buffer + /// 3. `ensure_readable` is called before every read access + /// 4. 
The stream is owned by Reader, ensuring buffer validity + /// + /// # Equivalent + /// C++ `Buffer(ForyInputStream&)` constructor + pub fn from_stream(source: Box) -> Reader<'static> { + Self::from_stream_with_capacity(source, 4096) + } + + /// Creates a stream-backed reader with specified initial buffer capacity. + /// + /// # Equivalent + /// C++ `Buffer(shared_ptr)` constructor + pub fn from_stream_with_capacity(source: Box, capacity: usize) -> Reader<'static> { + let stream = Box::new(ForyStreamBuf::with_capacity(source, capacity)); + // SAFETY: bf starts as empty slice. First ensure_readable will fill and reborrow. + // The stream is owned by Reader, so the buffer lives as long as Reader. + let bf: &'static [u8] = &[]; + Reader { + bf, + cursor: 0, + stream: Some(stream), + } + } + + /// Returns true if this reader is backed by a stream. + /// + /// # Equivalent + /// C++ `Buffer::is_stream_backed()` + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + self.stream.is_some() + } + + /// Takes ownership of the stream out of this reader. + /// Used by `deserialize_from` to transfer stream state to the context reader. + pub(crate) fn take_stream(&mut self) -> Option> { + self.stream.take() + } + + /// Restores a stream and syncs bf/cursor from it. + /// Used by `deserialize_from` to return stream ownership after deserialization. + pub(crate) fn restore_stream(&mut self, stream: Box) { + // SAFETY: Reborrow bf from stream. Same invariants as from_stream. + self.bf = unsafe { std::slice::from_raw_parts(stream.data_ptr(), stream.size()) }; + self.cursor = stream.reader_index(); + self.stream = Some(stream); + } + + /// Creates a Reader from an existing ForyStreamBuf. + /// Used internally by `deserialize_from` when transferring stream state. + pub(crate) fn from_stream_buf(stream: Box) -> Reader<'static> { + // SAFETY: Same invariants as from_stream. 
+ let bf: &'static [u8] = + unsafe { std::slice::from_raw_parts(stream.data_ptr(), stream.size()) }; + let cursor = stream.reader_index(); + Reader { + bf, + cursor, + stream: Some(stream), + } + } + + /// Fills the stream buffer to ensure `target_size` total bytes are available. + /// After fill, reborrows `bf` from the stream's buffer. + /// + /// # Equivalent + /// C++ `Buffer::fill_to(uint32_t target_size, Error& error)` + #[inline(always)] + fn fill_to(&mut self, target_size: usize) -> Result<(), Error> { + if target_size <= self.bf.len() { + return Ok(()); + } + if let Some(stream) = &mut self.stream { + // Sync cursor to stream before filling + // C++: stream_->reader_index(reader_index_); + stream.set_reader_index(self.cursor); + + // Fill buffer: need (target_size - cursor) bytes from current position + let needed = target_size - self.cursor; + stream.fill_buffer(needed)?; + + // Reborrow bf from stream's (potentially reallocated) buffer + // C++: data_ = stream_->data(); size_ = stream_->size(); + // reader_index_ = stream_->reader_index(); + // SAFETY: stream owns the buffer, which lives as long as Reader. + // After fill_buffer, the buffer pointer may have changed due to + // Vec reallocation, so we must reborrow. + self.bf = unsafe { std::slice::from_raw_parts(stream.data_ptr(), stream.size()) }; + self.cursor = stream.reader_index(); + + Ok(()) + } else { + // Not stream-backed, cannot fill + Err(Error::buffer_out_of_bound( + self.cursor, + target_size - self.cursor, + self.bf.len(), + )) + } + } + + /// Ensures `len` bytes are readable from the current cursor position. + /// For slice-backed readers, simple bounds check. For stream-backed, + /// fills from source if needed. 
+ /// + /// # Equivalent + /// C++ `Buffer::ensure_readable(uint32_t length, Error& error)` + #[inline(always)] + fn ensure_readable(&mut self, len: usize) -> Result<(), Error> { + let target = self.cursor + len; + // Fast path: data already available (zero overhead for in-memory) + if target <= self.bf.len() { + return Ok(()); + } + // Stream path or error + self.fill_to(target)?; + // Verify after fill + if self.cursor + len > self.bf.len() { + return Err(Error::buffer_out_of_bound(self.cursor, len, self.bf.len())); + } + Ok(()) + } + + /// Syncs the stream's reader_index with our cursor after a read. + /// No-op when not stream-backed. + /// + /// # Equivalent + /// C++ `if (stream_ != nullptr) { stream_->reader_index(reader_index_); }` + #[inline(always)] + fn sync_stream_pos(&mut self) { + if let Some(stream) = &mut self.stream { + stream.set_reader_index(self.cursor); + } } #[inline(always)] @@ -545,24 +705,23 @@ impl<'a> Reader<'a> { } #[inline(always)] - fn value_at(&self, index: usize) -> Result { - match self.bf.get(index) { - None => Err(Error::buffer_out_of_bound( - index, - self.bf.len(), - self.bf.len(), - )), - Some(v) => Ok(*v), + fn value_at(&mut self, index: usize) -> Result { + if index < self.bf.len() { + return Ok(self.bf[index]); + } + // Stream path: try to fill + if self.stream.is_some() { + self.fill_to(index + 1)?; + if index < self.bf.len() { + return Ok(self.bf[index]); + } } + Err(Error::buffer_out_of_bound(index, 1, self.bf.len())) } #[inline(always)] - fn check_bound(&self, n: usize) -> Result<(), Error> { - if self.cursor + n > self.bf.len() { - Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())) - } else { - Ok(()) - } + fn check_bound(&mut self, n: usize) -> Result<(), Error> { + self.ensure_readable(n) } #[inline(always)] @@ -576,6 +735,7 @@ impl<'a> Reader<'a> { pub fn skip(&mut self, len: usize) -> Result<(), Error> { self.check_bound(len)?; self.move_next(len); + self.sync_stream_pos(); Ok(()) } @@ -584,6 +744,7 @@ 
impl<'a> Reader<'a> { self.check_bound(len)?; let result = &self.bf[self.cursor..self.cursor + len]; self.move_next(len); + self.sync_stream_pos(); Ok(result) } @@ -662,6 +823,7 @@ impl<'a> Reader<'a> { if (i & 0b1) != 0b1 { // Bit 0 is 0, small value encoded in 4 bytes self.cursor += 4; + self.sync_stream_pos(); Ok((i >> 1) as i64) // arithmetic right shift preserves sign } else { // Bit 0 is 1, big value: skip flag byte and read 8 bytes @@ -669,6 +831,7 @@ impl<'a> Reader<'a> { self.cursor += 1; let value = LittleEndian::read_i64(&self.bf[self.cursor..]); self.cursor += 8; + self.sync_stream_pos(); Ok(value) } } @@ -683,8 +846,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u8(&mut self) -> Result { - let result = self.value_at(self.cursor)?; - self.move_next(1); + self.ensure_readable(1)?; + let result = self.bf[self.cursor]; + self.cursor += 1; + self.sync_stream_pos(); Ok(result) } @@ -692,9 +857,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u16(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_u16(slice); + self.ensure_readable(2)?; + let result = LittleEndian::read_u16(&self.bf[self.cursor..]); self.cursor += 2; + self.sync_stream_pos(); Ok(result) } @@ -702,9 +868,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u32(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_u32(slice); + self.ensure_readable(4)?; + let result = LittleEndian::read_u32(&self.bf[self.cursor..]); self.cursor += 4; + self.sync_stream_pos(); Ok(result) } @@ -712,9 +879,17 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint32(&mut self) -> Result { + // Stream fallback: byte-at-a-time when not enough bytes for bulk read + // C++: if (stream_ != nullptr && size_ - reader_index_ < 5) + if self.stream.is_some() && (self.bf.len() - self.cursor) < 5 { + self.ensure_readable(1)?; + return self.read_varuint32_stream(); + } + let b0 = self.value_at(self.cursor)? 
as u32; if b0 < 0x80 { self.move_next(1); + self.sync_stream_pos(); return Ok(b0); } @@ -722,6 +897,7 @@ impl<'a> Reader<'a> { let mut encoded = (b0 & 0x7F) | ((b1 & 0x7F) << 7); if b1 < 0x80 { self.move_next(2); + self.sync_stream_pos(); return Ok(encoded); } @@ -729,6 +905,7 @@ impl<'a> Reader<'a> { encoded |= (b2 & 0x7F) << 14; if b2 < 0x80 { self.move_next(3); + self.sync_stream_pos(); return Ok(encoded); } @@ -736,12 +913,14 @@ impl<'a> Reader<'a> { encoded |= (b3 & 0x7F) << 21; if b3 < 0x80 { self.move_next(4); + self.sync_stream_pos(); return Ok(encoded); } let b4 = self.value_at(self.cursor + 4)? as u32; encoded |= b4 << 28; self.move_next(5); + self.sync_stream_pos(); Ok(encoded) } @@ -749,9 +928,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u64(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_u64(slice); + self.ensure_readable(8)?; + let result = LittleEndian::read_u64(&self.bf[self.cursor..]); self.cursor += 8; + self.sync_stream_pos(); Ok(result) } @@ -759,9 +939,17 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint64(&mut self) -> Result { + // Stream fallback: byte-at-a-time when not enough bytes for bulk read + // C++: if (stream_ != nullptr && size_ - reader_index_ < 9) + if self.stream.is_some() && (self.bf.len() - self.cursor) < 9 { + self.ensure_readable(1)?; + return self.read_varuint64_stream(); + } + let b0 = self.value_at(self.cursor)? 
as u64; if b0 < 0x80 { self.move_next(1); + self.sync_stream_pos(); return Ok(b0); } @@ -769,6 +957,7 @@ impl<'a> Reader<'a> { let mut result = (b0 & 0x7F) | ((b1 & 0x7F) << 7); if b1 < 0x80 { self.move_next(2); + self.sync_stream_pos(); return Ok(result); } @@ -776,6 +965,7 @@ impl<'a> Reader<'a> { result |= (b2 & 0x7F) << 14; if b2 < 0x80 { self.move_next(3); + self.sync_stream_pos(); return Ok(result); } @@ -783,6 +973,7 @@ impl<'a> Reader<'a> { result |= (b3 & 0x7F) << 21; if b3 < 0x80 { self.move_next(4); + self.sync_stream_pos(); return Ok(result); } @@ -790,6 +981,7 @@ impl<'a> Reader<'a> { result |= (b4 & 0x7F) << 28; if b4 < 0x80 { self.move_next(5); + self.sync_stream_pos(); return Ok(result); } @@ -797,6 +989,7 @@ impl<'a> Reader<'a> { result |= (b5 & 0x7F) << 35; if b5 < 0x80 { self.move_next(6); + self.sync_stream_pos(); return Ok(result); } @@ -804,6 +997,7 @@ impl<'a> Reader<'a> { result |= (b6 & 0x7F) << 42; if b6 < 0x80 { self.move_next(7); + self.sync_stream_pos(); return Ok(result); } @@ -811,12 +1005,14 @@ impl<'a> Reader<'a> { result |= (b7 & 0x7F) << 49; if b7 < 0x80 { self.move_next(8); + self.sync_stream_pos(); return Ok(result); } let b8 = self.value_at(self.cursor + 8)? 
as u64; result |= (b8 & 0xFF) << 56; self.move_next(9); + self.sync_stream_pos(); Ok(result) } @@ -832,6 +1028,7 @@ impl<'a> Reader<'a> { if (i & 0b1) != 0b1 { // Bit 0 is 0, small value encoded in 4 bytes self.cursor += 4; + self.sync_stream_pos(); Ok((i >> 1) as u64) } else { // Bit 0 is 1, big value: skip flag byte and read 8 bytes @@ -839,6 +1036,7 @@ impl<'a> Reader<'a> { self.cursor += 1; let value = LittleEndian::read_u64(&self.bf[self.cursor..]); self.cursor += 8; + self.sync_stream_pos(); Ok(value) } } @@ -847,9 +1045,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_f32(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_f32(slice); + self.ensure_readable(4)?; + let result = LittleEndian::read_f32(&self.bf[self.cursor..]); self.cursor += 4; + self.sync_stream_pos(); Ok(result) } @@ -857,9 +1056,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_f64(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_f64(slice); + self.ensure_readable(8)?; + let result = LittleEndian::read_f64(&self.bf[self.cursor..]); self.cursor += 8; + self.sync_stream_pos(); Ok(result) } @@ -868,6 +1068,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_latin1_string(&mut self, len: usize) -> Result { self.check_bound(len)?; + // sync_stream_pos is called explicitly after move_next in the paths below if len < SIMD_THRESHOLD { // Fast path for small buffers unsafe { @@ -883,6 +1084,7 @@ impl<'a> Reader<'a> { std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len); vec.set_len(len); self.move_next(len); + self.sync_stream_pos(); Ok(String::from_utf8_unchecked(vec)) } else { // Contains Latin1 bytes (0x80-0xFF): must convert to UTF-8 @@ -904,6 +1106,7 @@ impl<'a> Reader<'a> { out.set_len(out_len); self.move_next(len); + self.sync_stream_pos(); Ok(String::from_utf8_unchecked(out)) } } @@ -916,6 +1119,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_utf8_string(&mut self, len: 
usize) -> Result { self.check_bound(len)?; + // sync_stream_pos is called explicitly after move_next below // don't use simd for memory copy, copy_non_overlapping is faster unsafe { let mut vec = Vec::with_capacity(len); @@ -925,6 +1129,7 @@ impl<'a> Reader<'a> { std::ptr::copy_nonoverlapping(src, dst, len); vec.set_len(len); self.move_next(len); + self.sync_stream_pos(); // SAFETY: Assuming valid UTF-8 bytes (responsibility of serialization protocol) Ok(String::from_utf8_unchecked(vec)) } @@ -933,12 +1138,14 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_utf16_string(&mut self, len: usize) -> Result { self.check_bound(len)?; + // sync_stream_pos is called explicitly after move_next below let slice = self.sub_slice(self.cursor, self.cursor + len)?; let units: Vec = slice .chunks_exact(2) .map(|c| u16::from_le_bytes([c[0], c[1]])) .collect(); self.move_next(len); + self.sync_stream_pos(); Ok(String::from_utf16_lossy(&units)) } @@ -951,9 +1158,10 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u128(&mut self) -> Result { - let slice = self.slice_after_cursor(); - let result = LittleEndian::read_u128(slice); + self.ensure_readable(16)?; + let result = LittleEndian::read_u128(&self.bf[self.cursor..]); self.cursor += 16; + self.sync_stream_pos(); Ok(result) } @@ -983,12 +1191,19 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint36small(&mut self) -> Result { + // Stream fallback: byte-at-a-time when not enough bytes for bulk read + // C++: if (stream_ != nullptr && size_ - reader_index_ < 8) + if self.stream.is_some() && (self.bf.len() - self.cursor) < 8 { + self.ensure_readable(1)?; + return self.read_varuint36small_stream(); + } + let start = self.cursor; - let slice = self.slice_after_cursor(); + let remaining = self.bf.len() - self.cursor; - if slice.len() >= 8 { - // here already check bound - let bulk = self.read_u64()?; + if remaining >= 8 { + self.ensure_readable(8)?; + let bulk = LittleEndian::read_u64(&self.bf[self.cursor..]); let mut result = bulk & 0x7F; 
let mut read_idx = start; @@ -1009,9 +1224,11 @@ impl<'a> Reader<'a> { } } self.cursor = read_idx + 1; + self.sync_stream_pos(); return Ok(result); } + // Slow path: byte-by-byte read for in-memory buffers near end let mut result = 0u64; let mut shift = 0; while self.cursor < self.bf.len() { @@ -1025,8 +1242,77 @@ impl<'a> Reader<'a> { return Err(Error::encode_error("varuint36small overflow")); } } + self.sync_stream_pos(); Ok(result) } + + // ============ Stream fallback methods ============ + // Byte-at-a-time varint decoding for stream-backed readers. + // These match C++ read_var_uint32_stream, read_var_uint64_stream, + // and read_var_uint36_small_stream from buffer.h. + + /// Reads a varuint32 byte-by-byte from stream-backed buffer. + /// + /// # Equivalent + /// C++ `Buffer::read_var_uint32_stream(Error& error)` + fn read_varuint32_stream(&mut self) -> Result { + let mut result = 0u32; + for i in 0..5u32 { + self.ensure_readable(1)?; + let b = self.bf[self.cursor]; + self.cursor += 1; + self.sync_stream_pos(); + result |= ((b & 0x7F) as u32) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + Err(Error::encode_error("Invalid var_uint32 encoding")) + } + + /// Reads a varuint64 byte-by-byte from stream-backed buffer. + /// + /// # Equivalent + /// C++ `Buffer::read_var_uint64_stream(Error& error)` + fn read_varuint64_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..8u32 { + self.ensure_readable(1)?; + let b = self.bf[self.cursor]; + self.cursor += 1; + self.sync_stream_pos(); + result |= ((b & 0x7F) as u64) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + // 9th byte: bits 56-63 + self.ensure_readable(1)?; + let b = self.bf[self.cursor]; + self.cursor += 1; + self.sync_stream_pos(); + result |= (b as u64) << 56; + Ok(result) + } + + /// Reads a varuint36small byte-by-byte from stream-backed buffer. 
+ /// + /// # Equivalent + /// C++ `Buffer::read_var_uint36_small_stream(Error& error)` + fn read_varuint36small_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..5u32 { + self.ensure_readable(1)?; + let b = self.bf[self.cursor]; + self.cursor += 1; + self.sync_stream_pos(); + result |= ((b & 0x7F) as u64) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + Err(Error::encode_error("Invalid var_uint36_small encoding")) + } } #[allow(clippy::needless_lifetimes)] diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index 9d3f826941..7a128ebd80 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -959,14 +959,30 @@ impl Fory { reader: &mut Reader, ) -> Result { self.with_read_context(|context| { - let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; - let mut new_reader = Reader::new(outlive_buffer); - new_reader.set_cursor(reader.cursor); - context.attach_reader(new_reader); - let result = self.deserialize_with_context(context); - let end = context.detach_reader().get_cursor(); - reader.set_cursor(end); - result + if let Some(stream) = reader.take_stream() { + // Stream-backed reader: transfer stream ownership to context reader. + // This ensures the context reader can fill from the stream during + // deserialization. After deserialization, ownership is restored. 
+ let context_reader = Reader::from_stream_buf(stream); + context.attach_reader(context_reader); + let result = self.deserialize_with_context(context); + let mut detached = context.detach_reader(); + // Transfer stream back to caller's reader + if let Some(stream_back) = detached.take_stream() { + reader.restore_stream(stream_back); + } + result + } else { + // Slice-backed reader: original behavior + let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; + let mut new_reader = Reader::new(outlive_buffer); + new_reader.set_cursor(reader.cursor); + context.attach_reader(new_reader); + let result = self.deserialize_with_context(context); + let end = context.detach_reader().get_cursor(); + reader.set_cursor(end); + result + } }) } diff --git a/rust/fory-core/src/lib.rs b/rust/fory-core/src/lib.rs index 9666bacf01..8b70b6f086 100644 --- a/rust/fory-core/src/lib.rs +++ b/rust/fory-core/src/lib.rs @@ -184,6 +184,7 @@ pub mod meta; pub mod resolver; pub mod row; pub mod serializer; +pub mod stream; pub mod types; pub mod util; diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs new file mode 100644 index 0000000000..834437cbe3 --- /dev/null +++ b/rust/fory-core/src/stream.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Streaming buffer for incremental deserialization. +//! +//! Provides [`ForyStreamBuf`], which wraps any [`Read`] source +//! and maintains a growable internal buffer that data is appended into on +//! demand. This is the Rust equivalent of C++ `ForyInputStreamBuf` from +//! `cpp/fory/util/stream.h` (PR #3307). +//! +//! # Design +//! +//! The buffer grows monotonically (no compaction). Data is always appended at +//! the end, matching the C++ `ForyInputStreamBuf::fill_buffer` behavior where +//! `write_pos = cur_size` and new bytes are written after existing valid data. +//! +//! `reader_index` is an absolute position within this growing buffer, so the +//! [`Reader`](crate::buffer::Reader) cursor never needs adjustment after a fill. + +use crate::error::Error; +use std::io::{self, Read}; + +/// Default initial buffer capacity matching C++ `ForyInputStreamBuf` default. +const DEFAULT_CAPACITY: usize = 4096; + +/// Streaming buffer that reads from a source on demand. +/// +/// Wraps any `Read` source and maintains a growable buffer. Data is appended +/// at the end on each `fill_buffer` call, never compacted. This matches the +/// C++ `ForyInputStreamBuf` which uses a `std::vector` that only grows. +/// +/// # Equivalent +/// C++ `ForyInputStreamBuf` in `cpp/fory/util/stream.h` +pub struct ForyStreamBuf { + source: Box, + /// Buffer holding all fetched data. Only grows, never compacted. + buffer: Vec, + /// Number of valid (filled) bytes in buffer. Equivalent to C++ `size()` + /// which returns `egptr() - eback()`. + valid_len: usize, + /// Current read position. Equivalent to C++ `reader_index()` which + /// returns `gptr() - eback()`. + read_pos: usize, +} + +impl ForyStreamBuf { + /// Creates a new stream buffer with default capacity (4096 bytes). 
+ pub fn new(source: Box) -> Self { + Self::with_capacity(source, DEFAULT_CAPACITY) + } + + /// Creates a new stream buffer with specified initial capacity. + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf(std::istream&, uint32_t buffer_size)` + pub fn with_capacity(source: Box, capacity: usize) -> Self { + Self { + source, + buffer: Vec::with_capacity(capacity.max(1)), + valid_len: 0, + read_pos: 0, + } + } + + /// Ensures at least `min_bytes` are available to read beyond current + /// `read_pos`. Reads from source in a loop until enough data is available + /// or EOF/error is reached. + /// + /// Does NOT compact — new data is appended at `valid_len`, matching C++ + /// `ForyInputStreamBuf::fill_buffer` where `write_pos = cur_size`. + /// + /// # Errors + /// Returns `Error::buffer_out_of_bound` if EOF or a non-interrupt I/O error occurs before enough + /// bytes are available. + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::fill_buffer(uint32_t min_fill_size)` + pub fn fill_buffer(&mut self, min_bytes: usize) -> Result<(), Error> { + if min_bytes == 0 { + return Ok(()); + } + if self.remaining() >= min_bytes { + return Ok(()); + } + + // Calculate required total buffer size + // C++: required = cur_size + (min_fill_size - remaining_size()) + let required = self.valid_len + (min_bytes - self.remaining()); + + // Grow buffer capacity if needed (double or required, whichever is larger) + if required > self.buffer.len() { + let new_cap = (self.buffer.len() * 2).max(required); + self.buffer.resize(new_cap, 0); + } + + // Read from source until we have enough data + // C++: while (remaining_size() < min_fill_size) { ... sgetn(...) ... 
} + while self.remaining() < min_bytes { + let writable = self.buffer.len() - self.valid_len; + if writable == 0 { + // Need to grow more + let new_cap = self.buffer.len() * 2 + 1; + self.buffer.resize(new_cap, 0); + continue; + } + + match self.source.read(&mut self.buffer[self.valid_len..]) { + Ok(0) => { + // EOF before getting enough bytes + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_bytes, + self.remaining(), + )); + } + Ok(n) => { + self.valid_len += n; + } + Err(e) if e.kind() == io::ErrorKind::Interrupted => { + continue; + } + Err(_) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_bytes, + self.remaining(), + )); + } + } + } + + Ok(()) + } + + /// Returns pointer to the start of the buffer. + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::data()` which returns `eback()` (start of buffer) + #[inline(always)] + pub fn data_ptr(&self) -> *const u8 { + self.buffer.as_ptr() + } + + /// Returns total valid bytes in buffer (from start). + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::size()` which returns `egptr() - eback()` + #[inline(always)] + pub fn size(&self) -> usize { + self.valid_len + } + + /// Returns current read position (absolute, from buffer start). + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::reader_index()` which returns `gptr() - eback()` + #[inline(always)] + pub fn reader_index(&self) -> usize { + self.read_pos + } + + /// Sets the read position. + /// + /// # Panics + /// Panics if index exceeds valid data length. + /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::reader_index(uint32_t index)` + #[inline(always)] + pub fn set_reader_index(&mut self, index: usize) { + assert!( + index <= self.valid_len, + "reader index {} exceeds valid data length {}", + index, + self.valid_len + ); + self.read_pos = index; + } + + /// Returns number of unread bytes remaining. 
+ /// + /// # Equivalent + /// C++ `ForyInputStreamBuf::remaining_size()` which returns `egptr() - gptr()` + #[inline(always)] + pub fn remaining(&self) -> usize { + self.valid_len.saturating_sub(self.read_pos) + } +} diff --git a/rust/tests/tests/mod.rs b/rust/tests/tests/mod.rs index 2f83762a50..bd521da326 100644 --- a/rust/tests/tests/mod.rs +++ b/rust/tests/tests/mod.rs @@ -19,4 +19,5 @@ mod compatible; mod test_any; mod test_collection; mod test_max_dyn_depth; +mod test_stream; mod test_tuple; diff --git a/rust/tests/tests/test_stream.rs b/rust/tests/tests/test_stream.rs new file mode 100644 index 0000000000..d16c9b8c49 --- /dev/null +++ b/rust/tests/tests/test_stream.rs @@ -0,0 +1,277 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for stream-backed deserialization. +//! +//! Mirrors C++ `stream_test.cc` and `buffer_test.cc` stream tests from PR #3307. +//! Uses a `OneByteReader` that delivers data one byte at a time, which is the +//! worst-case scenario for streaming: every multi-byte read triggers a fill. 
+ +use fory_core::buffer::{Reader, Writer}; +use fory_core::fory::Fory; +use fory_derive::ForyObject; +use std::io::{self, Read}; + +/// A `Read` implementation that delivers exactly one byte per `read()` call. +/// This forces the stream buffer to fill repeatedly, exercising all ensure_readable paths. +/// Equivalent to C++ `OneByteStreamBuf` / `OneByteIStream` from stream_test.cc. +struct OneByteReader { + data: Vec, + pos: usize, +} + +impl OneByteReader { + fn new(data: Vec) -> Self { + Self { data, pos: 0 } + } +} + +impl Read for OneByteReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if self.pos >= self.data.len() || buf.is_empty() { + return Ok(0); + } + buf[0] = self.data[self.pos]; + self.pos += 1; + Ok(1) + } +} + +// ============ Buffer-level tests (matching C++ buffer_test.cc) ============ + +/// Tests reading various fixed-size and variable-length encoded types from +/// a one-byte stream. Equivalent to C++ `Buffer, StreamReadFromOneByteSource`. +#[test] +fn test_stream_buffer_read_primitives() { + // Write test data + let mut buf = Vec::new(); + let mut writer = Writer::from_buffer(&mut buf); + writer.write_u32(0x01020304); + writer.write_i64(-1234567890); + writer.write_var_uint32(300); + writer.write_varint64(-4567890123); + writer.write_tagged_u64(0x123456789); + writer.write_var_uint36_small(0x1FFFF); + + // Read from one-byte stream + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(buf)), 8); + + assert_eq!(reader.read_u32().unwrap(), 0x01020304); + assert_eq!(reader.read_i64().unwrap(), -1234567890); + assert_eq!(reader.read_varuint32().unwrap(), 300); + assert_eq!(reader.read_varint64().unwrap(), -4567890123); + assert_eq!(reader.read_tagged_u64().unwrap(), 0x123456789); + assert_eq!(reader.read_varuint36small().unwrap(), 0x1FFFF); +} + +/// Tests that stream-backed reader correctly handles short reads. +/// Equivalent to C++ `Buffer, StreamReadErrorWhenInsufficientData`. 
+#[test] +fn test_stream_short_read_error() { + let data = vec![0x01, 0x02, 0x03]; // only 3 bytes for a u32 read + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(data)), 2); + + let result = reader.read_u32(); + assert!(result.is_err(), "expected error reading u32 from 3 bytes"); +} + +/// Tests basic bool/u8 types and skip. +#[test] +fn test_stream_read_small_types() { + let mut buf = Vec::new(); + let mut writer = Writer::from_buffer(&mut buf); + writer.write_bool(true); + writer.write_i8(-42); + writer.write_u8(0xAB); + writer.write_i16(-1000); + writer.write_u16(60000); + + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(buf)), 4); + + assert!(reader.read_bool().unwrap()); + assert_eq!(reader.read_i8().unwrap(), -42); + assert_eq!(reader.read_u8().unwrap(), 0xAB); + assert_eq!(reader.read_i16().unwrap(), -1000); + assert_eq!(reader.read_u16().unwrap(), 60000); +} + +/// Tests float types through stream. +#[test] +fn test_stream_read_floats() { + let mut buf = Vec::new(); + let mut writer = Writer::from_buffer(&mut buf); + writer.write_f32(1.5f32); + writer.write_f64(123.456789f64); + + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(buf)), 4); + + assert!((reader.read_f32().unwrap() - 1.5f32).abs() < f32::EPSILON); + assert!((reader.read_f64().unwrap() - 123.456789f64).abs() < f64::EPSILON); +} + +/// Tests read_bytes and skip through stream. 
+#[test] +fn test_stream_read_bytes_and_skip() { + let mut buf = Vec::new(); + let mut writer = Writer::from_buffer(&mut buf); + writer.write_u32(0xDEADBEEF); + writer.write_bytes(&[1, 2, 3, 4, 5]); + writer.write_u8(0xFF); + + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(buf)), 4); + + // Skip past the u32 + reader.skip(4).unwrap(); + // Read 5 bytes + let bytes = reader.read_bytes(5).unwrap().to_vec(); + assert_eq!(bytes, vec![1, 2, 3, 4, 5]); + // Read trailing byte + assert_eq!(reader.read_u8().unwrap(), 0xFF); +} + +// ============ Fory-level tests (matching C++ stream_test.cc) ============ + +/// Equivalent to C++ `StreamSerializationTest, PrimitiveAndStringRoundTrip`. +#[test] +fn test_stream_fory_primitive_roundtrip() { + let fory = Fory::default(); + + // i64 roundtrip through stream + let original: i64 = -9876543212345; + let bytes = fory.serialize(&original).unwrap(); + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 8); + let result: i64 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(result, original); +} + +/// Tests string deserialization through stream. +#[test] +fn test_stream_fory_string_roundtrip() { + let fory = Fory::default(); + + let original = "stream-hello-δΈ–η•Œ".to_string(); + let bytes = fory.serialize(&original).unwrap(); + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 8); + let result: String = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(result, original); +} + +/// Tests custom struct roundtrip. Equivalent to C++ `StreamSerializationTest, StructRoundTrip`. 
+#[test] +fn test_stream_fory_struct_roundtrip() { + #[derive(ForyObject, Debug, PartialEq)] + struct StreamPoint { + x: i32, + y: i32, + } + + let mut fory = Fory::default(); + fory.register::(100).unwrap(); + + let original = StreamPoint { x: 42, y: -7 }; + let bytes = fory.serialize(&original).unwrap(); + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 4); + let result: StreamPoint = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(result, original); +} + +/// Tests sequential deserialization of multiple objects from a single stream. +/// Equivalent to C++ `StreamSerializationTest, SequentialDeserializeFromSingleStream`. +#[test] +fn test_stream_fory_sequential_deserialize() { + #[derive(ForyObject, Debug, PartialEq)] + struct SeqPoint { + x: i32, + y: i32, + } + + let mut fory = Fory::default(); + fory.register::(100).unwrap(); + + // Serialize multiple objects into one buffer + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &42i32).unwrap(); + fory.serialize_to(&mut bytes, &"next-value".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &SeqPoint { x: 9, y: 8 }) + .unwrap(); + + let total_len = bytes.len(); + + // Deserialize sequentially from one-byte stream + let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 3); + + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(first, 42); + + let second: String = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(second, "next-value"); + + let third: SeqPoint = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(third, SeqPoint { x: 9, y: 8 }); + + // Final cursor should match total serialized length + assert_eq!(reader.get_cursor(), total_len); +} + +/// Tests that truncated stream produces an error. +/// Equivalent to C++ `StreamSerializationTest, TruncatedStreamReturnsError`. 
#[test]
fn test_stream_fory_truncated_error() {
    #[derive(ForyObject, Debug, PartialEq)]
    struct TruncPoint {
        x: i32,
        y: i32,
    }

    let mut fory = Fory::default();
    fory.register::<TruncPoint>(100).unwrap();

    let original = TruncPoint { x: 7, y: 7 };
    let mut bytes = fory.serialize(&original).unwrap();
    // Guard: dropping one byte must still leave a non-empty payload.
    assert!(bytes.len() > 1);
    bytes.pop(); // Remove last byte

    let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 4);
    let result = fory.deserialize_from::<TruncPoint>(&mut reader);
    assert!(result.is_err(), "expected error from truncated stream");
}

/// Tests `Vec<i32>` roundtrip through stream.
#[test]
fn test_stream_fory_vec_roundtrip() {
    let fory = Fory::default();

    let original = vec![1i32, 2, 3, 5, 8, 13, 21];
    let bytes = fory.serialize(&original).unwrap();
    let mut reader = Reader::from_stream_with_capacity(Box::new(OneByteReader::new(bytes)), 4);
    let result: Vec<i32> = fory.deserialize_from(&mut reader).unwrap();
    assert_eq!(result, original);
}

/// Ensures that existing in-memory paths are not regressed.
/// No stream involved — just verifies `Reader::new` still works as expected.
#[test]
fn test_no_regression_in_memory_reader() {
    let fory = Fory::default();

    let original = "Hello, regression test!".to_string();
    let bytes = fory.serialize(&original).unwrap();
    // Slice-backed reader: borrows `bytes` directly rather than wrapping a stream source.
    let mut reader = Reader::new(&bytes);
    let result: String = fory.deserialize_from(&mut reader).unwrap();
    assert_eq!(result, original);
}