From b6f9dc7a467fb88d5f67b471f349c57eab14e43e Mon Sep 17 00:00:00 2001 From: Zakir Date: Thu, 19 Feb 2026 15:41:22 +0530 Subject: [PATCH 1/2] feat(rust): add streaming deserialization support Introduces ForyStreamBuf and Reader::from_stream for incremental stream-backed deserialization from any Read source. Preserves existing in-memory fast path with zero overhead. Closes #3300 --- rust/fory-core/src/buffer.rs | 157 ++++++++++++++-- rust/fory-core/src/fory.rs | 74 +++++++- rust/fory-core/src/lib.rs | 2 + rust/fory-core/src/stream.rs | 305 ++++++++++++++++++++++++++++++++ rust/tests/tests/stream_test.rs | 68 +++++++ 5 files changed, 584 insertions(+), 22 deletions(-) create mode 100644 rust/fory-core/src/stream.rs create mode 100644 rust/tests/tests/stream_test.rs diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index e726a6b44d..4e03389583 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -17,6 +17,7 @@ use crate::error::Error; use crate::meta::buffer_rw_string::read_latin1_simd; +use crate::stream::ForyStreamBuf; use byteorder::{ByteOrder, LittleEndian}; use std::cmp::max; @@ -499,6 +500,7 @@ impl<'a> Writer<'a> { pub struct Reader<'a> { pub(crate) bf: &'a [u8], pub(crate) cursor: usize, + pub(crate) stream: Option>, } #[allow(clippy::needless_lifetimes)] @@ -507,7 +509,26 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn new(bf: &[u8]) -> Reader<'_> { - Reader { bf, cursor: 0 } + Reader { + bf, + cursor: 0, + stream: None, + } + } + + /// Construct a stream-backed `Reader`. + pub fn from_stream(stream: crate::stream::ForyStreamBuf) -> Reader<'static> { + let boxed = Box::new(stream); + Reader { + bf: b"", + cursor: 0, + stream: Some(boxed), + } + } + + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + self.stream.is_some() } #[inline(always)] @@ -544,25 +565,63 @@ impl<'a> Reader<'a> { self.cursor } + /// Fill stream buffer up to `target_size` total bytes, then re-pin `bf`. + /// Returns `false` if stream is None OR fill failed. + fn fill_to(&mut self, target_size: usize) -> bool { + let stream = match self.stream.as_mut() { + Some(s) => s, + None => return false, + }; + // intentional: fill_buffer validates; set_reader_index only syncs read_pos + let _ = stream.set_reader_index(self.cursor); + + let n = target_size.saturating_sub(self.cursor); + if n == 0 { + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + return self.bf.len() >= target_size; + } + if stream.fill_buffer(n).is_err() { + return false; + } + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + self.bf.len() >= target_size + } + + /// Ensure `self.cursor + n` bytes are available. + /// fast path: target <= size_ → return true + /// stream path: call fill_to(target), check again. #[inline(always)] - fn value_at(&self, index: usize) -> Result { - match self.bf.get(index) { - None => Err(Error::buffer_out_of_bound( - index, - self.bf.len(), - self.bf.len(), - )), - Some(v) => Ok(*v), + fn ensure_readable(&mut self, n: usize) -> Result<(), Error> { + let target = self.cursor + n; + if target <= self.bf.len() { + return Ok(()); } + if !self.fill_to(target) { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); + } + if target > self.bf.len() { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); + } + Ok(()) } #[inline(always)] - fn check_bound(&self, n: usize) -> Result<(), Error> { - if self.cursor + n > self.bf.len() { - Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())) - } else { - Ok(()) + fn value_at(&mut self, index: usize) -> Result { + if index >= self.bf.len() { + // Need index+1 bytes total; fill to that target. + if !self.fill_to(index + 1) || index >= self.bf.len() { + return Err(Error::buffer_out_of_bound(index, 1, self.bf.len())); + } } + Ok(unsafe { *self.bf.get_unchecked(index) }) + } + + /// stream fill on miss. Changing to `&mut self` is the single + /// change that gives ALL 27 existing read methods stream support + /// without touching them individually — they all call this. + #[inline(always)] + fn check_bound(&mut self, n: usize) -> Result<(), Error> { + self.ensure_readable(n) } #[inline(always)] @@ -595,8 +654,12 @@ impl<'a> Reader<'a> { } } + /// `stream_->reader_index(reader_index_)` when stream-backed. pub fn set_cursor(&mut self, cursor: usize) { self.cursor = cursor; + if let Some(ref mut stream) = self.stream { + let _ = stream.set_reader_index(cursor); + } } // ============ BOOL (TypeId = 1) ============ @@ -692,6 +755,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u16(&mut self) -> Result { + self.check_bound(2)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_u16(slice); self.cursor += 2; @@ -702,6 +766,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u32(&mut self) -> Result { + self.check_bound(4)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_u32(slice); self.cursor += 4; @@ -712,6 +777,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint32(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 5 { + return self.read_varuint32_stream(); + } let b0 = self.value_at(self.cursor)? as u32; if b0 < 0x80 { self.move_next(1); @@ -749,6 +817,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u64(&mut self) -> Result { + self.check_bound(8)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_u64(slice); self.cursor += 8; @@ -759,6 +828,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint64(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 9 { + return self.read_varuint64_stream(); + } let b0 = self.value_at(self.cursor)? as u64; if b0 < 0x80 { self.move_next(1); @@ -847,6 +919,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_f32(&mut self) -> Result { + self.check_bound(4)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_f32(slice); self.cursor += 4; @@ -857,6 +930,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_f64(&mut self) -> Result { + self.check_bound(8)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_f64(slice); self.cursor += 8; @@ -951,6 +1025,7 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_u128(&mut self) -> Result { + self.check_bound(16)?; let slice = self.slice_after_cursor(); let result = LittleEndian::read_u128(slice); self.cursor += 16; @@ -983,6 +1058,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint36small(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 8 { + return self.read_varuint36small_stream(); + } let start = self.cursor; let slice = self.slice_after_cursor(); @@ -1027,9 +1105,56 @@ impl<'a> Reader<'a> { } Ok(result) } + + /// Byte-by-byte varuint32 decode for stream-backed path. + fn read_varuint32_stream(&mut self) -> Result { + let mut result = 0u32; + for i in 0..5 { + let b = self.value_at(self.cursor)? as u32; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + Err(Error::encode_error("Invalid var_uint32 encoding")) + } + + /// Byte-by-byte varuint64 decode for stream-backed path. + fn read_varuint64_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..8u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + // 9th byte — full 8 bits + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= b << 56; + Ok(result) + } + + /// Byte-by-byte varuint36small decode for stream-backed path. + fn read_varuint36small_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..4u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= b << 28; + Ok(result) + } } #[allow(clippy::needless_lifetimes)] unsafe impl<'a> Send for Reader<'a> {} -#[allow(clippy::needless_lifetimes)] -unsafe impl<'a> Sync for Reader<'a> {} diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index 9d3f826941..0f2e1bfa7d 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -26,6 +26,7 @@ use crate::serializer::{Serializer, StructSerializer}; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; use std::cell::UnsafeCell; +use std::io::Read; use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; @@ -959,13 +960,74 @@ impl Fory { reader: &mut Reader, ) -> Result { self.with_read_context(|context| { - let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; - let mut new_reader = Reader::new(outlive_buffer); - new_reader.set_cursor(reader.cursor); - context.attach_reader(new_reader); + if reader.is_stream_backed() { + // Stream-backed path: move the owned stream out of the caller's reader, + // construct a fresh stream-backed reader at the current cursor, hand it + // to the context, then restore all state from the returned reader. + // This is the sequential-read case: caller creates Reader::from_stream(...) + // once and calls deserialize_from repeatedly. + let stream = mem::take(&mut reader.stream) + .expect("is_stream_backed was true but stream is None"); + let cursor = reader.cursor; + let mut stream_reader = Reader::from_stream(*stream); + // Sync cursor: the stream already consumed [0..cursor], re-position. + stream_reader.set_cursor(cursor); + context.attach_reader(stream_reader); + let result = self.deserialize_with_context(context); + let returned = context.detach_reader(); + // Restore state back to caller's reader. + reader.cursor = returned.cursor; + reader.stream = returned.stream; + // Re-pin bf from the (possibly grown after fill_to) stream buffer. + // SAFETY: same invariant as Reader::from_stream and fill_to: + // bf points into Box-owned stream buffer, owned by reader.stream, + // which lives as long as reader. + if let Some(ref s) = reader.stream { + reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; + } + result + } else { + // In-memory path: unchanged from original. + let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; + let mut new_reader = Reader::new(outlive_buffer); + new_reader.set_cursor(reader.cursor); + context.attach_reader(new_reader); + let result = self.deserialize_with_context(context); + let end = context.detach_reader().get_cursor(); + reader.set_cursor(end); + result + } + }) + } + + /// Deserializes a single value of type `T` from any `Read` source. + /// + /// Equivalent of C++ `fory.deserialize(Buffer(ForyInputStream(source)))`. + /// Internally wraps the source in a [`crate::stream::ForyStreamBuf`] and calls + /// [`deserialize_from`](Self::deserialize_from). + /// + /// For deserializing **multiple values sequentially** from one stream + /// (e.g. a network socket or pipe), create the reader once and reuse it: + /// + /// ```rust,ignore + /// use fory_core::{Fory, Reader}; + /// use fory_core::stream::ForyStreamBuf; + /// + /// let fory = Fory::default(); + /// let mut reader = Reader::from_stream(ForyStreamBuf::new(my_socket)); + /// let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + /// let second: String = fory.deserialize_from(&mut reader).unwrap(); + /// ``` + pub fn deserialize_from_stream( + &self, + source: impl Read + Send + 'static, + ) -> Result { + self.with_read_context(|context| { + let stream = crate::stream::ForyStreamBuf::new(source); + let reader = Reader::from_stream(stream); + context.attach_reader(reader); let result = self.deserialize_with_context(context); - let end = context.detach_reader().get_cursor(); - reader.set_cursor(end); + context.detach_reader(); result }) } diff --git a/rust/fory-core/src/lib.rs b/rust/fory-core/src/lib.rs index 9666bacf01..897be2aff2 100644 --- a/rust/fory-core/src/lib.rs +++ b/rust/fory-core/src/lib.rs @@ -184,6 +184,7 @@ pub mod meta; pub mod resolver; pub mod row; pub mod serializer; +pub mod stream; pub mod types; pub mod util; @@ -199,3 +200,4 @@ pub use crate::resolver::type_resolver::{TypeInfo, TypeResolver}; pub use crate::serializer::weak::{ArcWeak, RcWeak}; pub use crate::serializer::{read_data, write_data, ForyDefault, Serializer, StructSerializer}; pub use crate::types::{RefFlag, RefMode, TypeId}; +pub use stream::ForyStreamBuf; diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs new file mode 100644 index 0000000000..85a62e3b0f --- /dev/null +++ b/rust/fory-core/src/stream.rs @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Streaming buffer for incremental deserialization. + +use crate::error::Error; +use std::io::{self, Read}; + +const DEFAULT_BUFFER_SIZE: usize = 4096; + +/// Single internal `Vec` window. `valid_len` = `egptr()-eback()`. +/// `read_pos` = `gptr()-eback()`. [`fill_buffer`] grows on demand. +/// +/// [`fill_buffer`]: ForyStreamBuf::fill_buffer +pub struct ForyStreamBuf { + source: Box, + /// Backing window — equivalent of C++ `buffer_` (`std::vector`) + buffer: Vec, + /// Bytes fetched from source — equivalent of `egptr() - eback()` + valid_len: usize, + /// Current read cursor — equivalent of `gptr() - eback()` + read_pos: usize, +} + +impl ForyStreamBuf { + pub fn new(source: impl Read + Send + 'static) -> Self { + Self::with_capacity(source, DEFAULT_BUFFER_SIZE) + } + + /// Allocates and zero-initialises the backing window immediately, + /// `std::vector(buffer_size)` in the constructor. + pub fn with_capacity(source: impl Read + Send + 'static, buffer_size: usize) -> Self { + let cap = buffer_size.max(1); + let buffer = vec![0u8; cap]; + Self { + source: Box::new(source), + buffer, + valid_len: 0, + read_pos: 0, + } + } + + /// Pull bytes from source until `remaining() >= min_fill_size`. + pub fn fill_buffer(&mut self, min_fill_size: usize) -> Result<(), Error> { + if min_fill_size == 0 || self.remaining() >= min_fill_size { + return Ok(()); + } + + let need = min_fill_size - self.remaining(); + + let required = self + .valid_len + .checked_add(need) + .filter(|&r| r <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + + // Grow if required > current buffer length + if required > self.buffer.len() { + let new_cap = (self.buffer.len() * 2).max(required); + self.buffer.resize(new_cap, 0); + } + + while self.remaining() < min_fill_size { + let writable = self.buffer.len() - self.valid_len; + if writable == 0 { + // Inner double `buffer_.size() * 2 + 1` with u32 overflow guard + let new_cap = self + .buffer + .len() + .checked_mul(2) + .and_then(|n| n.checked_add(1)) + .filter(|&n| n <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + self.buffer.resize(new_cap, 0); + // fall through — self.buffer[self.valid_len..] is now non-empty + } + match self.source.read(&mut self.buffer[self.valid_len..]) { + Ok(0) => { + // `read_bytes <= 0` → buffer_out_of_bound + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + Ok(n) => self.valid_len += n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(_) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + } + } + Ok(()) + } + + /// Move cursor backward by `size` bytes. + /// + /// `setg(eback(), gptr() - size, egptr())` + /// + /// Panics if `size > read_pos`. + pub fn rewind(&mut self, size: usize) { + assert!( + size <= self.read_pos, + "rewind size {} exceeds consumed bytes {}", + size, + self.read_pos + ); + self.read_pos -= size; + } + + /// Advance cursor forward by `size` bytes without pulling from source. + /// + /// `gbump(static_cast(size))` + /// + /// Panics if `size > remaining()`. + pub fn consume(&mut self, size: usize) { + assert!( + size <= self.remaining(), + "consume size {} exceeds available bytes {}", + size, + self.remaining() + ); + self.read_pos += size; + } + + /// Raw pointer to byte 0 of the internal window. + /// + /// Re-read by `Reader` (buffer.rs) after every `fill_buffer` call that + /// may reallocate + /// `data_ = stream_->data()`. + /// + /// `uint8_t* data()` → `reinterpret_cast(eback())`. + /// + /// # Safety + /// Valid until the next `fill_buffer` call that causes reallocation. + /// `Reader` always re-reads this pointer after every `fill_buffer`. + #[inline(always)] + pub(crate) fn data(&self) -> *const u8 { + self.buffer.as_ptr() + } + + /// Total fetched bytes + #[inline(always)] + pub fn size(&self) -> usize { + self.valid_len + } + + /// Current read cursor + #[inline(always)] + pub fn reader_index(&self) -> usize { + self.read_pos + } + + /// Set cursor to absolute `index`. + /// + /// Called by `Reader` (buffer.rs) after every cursor advance, mirroring + /// + /// Returns `Err` if `index > valid_len` + #[inline(always)] + pub(crate) fn set_reader_index(&mut self, index: usize) -> Result<(), Error> { + if index > self.valid_len { + return Err(Error::buffer_out_of_bound(index, 0, self.valid_len)); + } + self.read_pos = index; + Ok(()) + } + + /// Unread bytes in window + #[inline(always)] + pub fn remaining(&self) -> usize { + self.valid_len.saturating_sub(self.read_pos) + } + + /// Always `true` — used by `Reader` (buffer.rs) to branch into the stream path. + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + /// Reads exactly 1 byte at a time. + struct OneByteCursor(Cursor>); + impl Read for OneByteCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8; 1]; + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_rewind() { + let data = vec![0x01u8, 0x02, 0x03, 0x04, 0x05]; + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(4).unwrap(); + assert_eq!(s.size(), 4); + assert_eq!(s.reader_index(), 0); + s.consume(3); + assert_eq!(s.reader_index(), 3); + s.rewind(2); + assert_eq!(s.reader_index(), 1); + s.consume(1); + assert_eq!(s.reader_index(), 2); + } + + #[test] + fn test_short_read_error() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + assert!(s.fill_buffer(4).is_err()); + } + + // Sequential fills with tiny-chunk reader + #[test] + fn test_sequential_fill() { + let data: Vec = (0u8..=9).collect(); + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + s.consume(3); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + } + + #[test] + fn test_overflow_guard() { + // valid_len near usize::MAX would overflow without the u32 guard. + // We can't actually allocate that — just verify the guard logic + // via a saturating check on a real (tiny) stream. + let mut s = ForyStreamBuf::new(Cursor::new(vec![0u8; 8])); + // Requesting more than the source has should error, not panic/overflow + assert!(s.fill_buffer(16).is_err()); + } + + #[test] + fn test_consume_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8])); + s.fill_buffer(1).unwrap(); + s.consume(2); // only 1 byte available + }); + assert!(result.is_err()); + } + + #[test] + fn test_rewind_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02])); + s.fill_buffer(2).unwrap(); + s.consume(1); + s.rewind(2); // only consumed 1 + }); + assert!(result.is_err()); + } + + #[test] + fn test_set_reader_index() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + s.fill_buffer(3).unwrap(); + assert!(s.set_reader_index(2).is_ok()); + assert_eq!(s.reader_index(), 2); + assert_eq!(s.remaining(), 1); + assert!(s.set_reader_index(4).is_err()); // beyond valid_len + } + + #[test] + fn test_is_stream_backed() { + let s = ForyStreamBuf::new(Cursor::new(vec![])); + assert!(s.is_stream_backed()); + } +} diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs new file mode 100644 index 0000000000..f3e2b5b0bd --- /dev/null +++ b/rust/tests/tests/stream_test.rs @@ -0,0 +1,68 @@ +#[cfg(test)] +mod stream_tests { + use fory_core::buffer::Reader; + use fory_core::stream::ForyStreamBuf; + use fory_core::Fory; + use std::io::Cursor; + + struct OneByte(Cursor>); + impl std::io::Read for OneByte { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8]; + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_primitive_stream_roundtrip() { + let fory = Fory::default(); + let bytes = fory.serialize(&-9876543212345i64).unwrap(); + let result: i64 = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes))) + .unwrap(); + assert_eq!(result, -9876543212345i64); + + let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap(); + let result: String = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes))) + .unwrap(); + assert_eq!(result, "stream-hello-世界"); + } + + #[test] + fn test_sequential_stream_reads() { + let fory = Fory::default(); + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &12345i32).unwrap(); + fory.serialize_to(&mut bytes, &"next-value".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &99i64).unwrap(); + + let mut reader = Reader::from_stream(ForyStreamBuf::new(OneByte(Cursor::new(bytes)))); + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + let second: String = fory.deserialize_from(&mut reader).unwrap(); + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + + assert_eq!(first, 12345); + assert_eq!(second, "next-value"); + assert_eq!(third, 99); + } + + #[test] + fn test_truncated_stream_returns_error() { + let fory = Fory::default(); + let mut bytes = fory.serialize(&"hello world".to_string()).unwrap(); + bytes.pop(); + let result: Result = fory.deserialize_from_stream(Cursor::new(bytes)); + assert!(result.is_err()); + } +} From 18de0597aca5850c423ad833611b94092e92d0a3 Mon Sep 17 00:00:00 2001 From: Zakir Date: Thu, 19 Feb 2026 15:47:24 +0530 Subject: [PATCH 2/2] chore: add license header to stream_test.rs --- rust/tests/tests/stream_test.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs index f3e2b5b0bd..cc54f2c8d8 100644 --- a/rust/tests/tests/stream_test.rs +++ b/rust/tests/tests/stream_test.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #[cfg(test)] mod stream_tests { use fory_core::buffer::Reader;