From 58d850424ca70372eb8450c0971c4a6d7bfba057 Mon Sep 17 00:00:00 2001 From: Andrii Ivanov Date: Thu, 28 May 2026 12:02:25 +0300 Subject: [PATCH 1/4] wip --- .markdownlint.yaml | 5 + .vscode/settings.json | 3 - CHANGELOG.md | 42 ++- README.md | 77 +++++- example/README.md | 45 ++++ example/basic/main.dart | 106 ++++++++ example/file_streaming/main.dart | 147 +++++++++++ example/main.dart | 182 ------------- example/network_streaming/main.dart | 81 ++++++ example/network_streaming/models.dart | 79 ++++++ example/network_streaming/transformer.dart | 45 ++++ lib/pro_binary.dart | 3 + lib/src/binary_reader.dart | 37 +++ lib/src/binary_writer.dart | 145 ++++++++--- lib/src/constants.dart | 14 + .../native.dart} | 0 .../web.dart} | 0 lib/src/stream/binary_stream_transformer.dart | 47 +--- lib/src/stream/stream_binary_reader.dart | 243 +++++++----------- lib/src/stream/transactional_reader.dart | 203 +++++++++++++++ .../transactional_stream_transformer.dart | 69 +++++ .../deserialization_bench.dart | 0 .../pool_bench.dart | 0 .../serialization_bench.dart | 0 .../strings_bench.dart | 26 ++ pubspec.yaml | 5 +- .../stream_binary_reader_coverage_test.dart | 2 +- test/stream/stream_binary_reader_test.dart | 15 ++ test/unit/binary_reader_string_test.dart | 74 ++++++ test/unit/binary_writer_string_test.dart | 32 +++ 30 files changed, 1298 insertions(+), 429 deletions(-) create mode 100644 .markdownlint.yaml delete mode 100644 .vscode/settings.json create mode 100644 example/README.md create mode 100644 example/basic/main.dart create mode 100644 example/file_streaming/main.dart delete mode 100644 example/main.dart create mode 100644 example/network_streaming/main.dart create mode 100644 example/network_streaming/models.dart create mode 100644 example/network_streaming/transformer.dart create mode 100644 lib/src/constants.dart rename lib/src/{constants_native.dart => constants/native.dart} (100%) rename lib/src/{constants_web.dart => constants/web.dart} (100%) create mode 100644 lib/src/stream/transactional_reader.dart create mode 100644 lib/src/stream/transactional_stream_transformer.dart rename {test/performance => performance}/deserialization_bench.dart (100%) rename {test/performance => performance}/pool_bench.dart (100%) rename {test/performance => performance}/serialization_bench.dart (100%) rename {test/performance => performance}/strings_bench.dart (84%) diff --git a/.markdownlint.yaml b/.markdownlint.yaml new file mode 100644 index 0000000..617646b --- /dev/null +++ b/.markdownlint.yaml @@ -0,0 +1,5 @@ +default: true +extends: null + +MD013: false + diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 4a203ff..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "cSpell.words": ["mocktail", "sublist"] -} diff --git a/CHANGELOG.md b/CHANGELOG.md index d6355d7..709974c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,22 @@ -## 4.0.0 + + +# 5.0.0 + +- **BREAKING CHANGES:** + - **Stream API**: rewritten architecture — `StreamBinaryReader` now `implements TransactionalReader>` with delegation to state + - **`NotEnoughDataException`**: message changed from `required $required bytes` to `required $required` + +- **New Features:** + - **Strings**: added `writeStringFixed` and `readStringFixed` for fixed-width length-prefixed strings. + - **Stream API**: added `TransactionalReader` interface for abstracting transactional read model + - **Stream API**: added `ChunkedTransactionalState` base class for managing chunk queue and bookmarks + - **Stream API**: added `TransactionalStreamTransformer` — generic `StreamTransformer` for parsing streams + - **Stream API**: `BinaryStreamTransformer` now extends `TransactionalStreamTransformer` + - **BinaryWriter**: improved `writeVarString` documentation with examples + - **BinaryReader**: `peekByte()` added bounds check — throws `RangeError` + - **Examples**: full restructuring — removed `example/main.dart`, added `example/basic/`, `example/network_streaming/`, `example/file_streaming/` + +# 4.0.0 **BREAKING CHANGES:** @@ -20,7 +38,7 @@ **Tests:** Improved all tests -## 3.2.0 +# 3.2.0 **BREAKING CHANGES:** @@ -50,7 +68,7 @@ - Added tests for pool statistics, edge cases takeBytes/reset/release -## 3.1.0 +# 3.1.0 - **feat**: Added `BinaryWriterPool.withWriter()` for safer and more concise object pool usage. - **feat**: Added modern API features for a more idiomatic experience: @@ -68,7 +86,7 @@ - **docs**: Fixed minor typos and improved documentation for `BinaryWriterPool`. - **docs**: Complete README overhaul with a focus on recipes and technical clarity. -## 3.0.0 +# 3.0.0 **Improvements:** @@ -97,12 +115,12 @@ - **fix**: Resolved known issues -## 2.2.0 +# 2.2.0 **test**: Added integration tests for new error handling features **deps**: Update internal dependencies to latest versions -## 2.1.0 +# 2.1.0 - **feat**: Added detailed error messages with context (offset, available bytes) - **feat**: Added `toBytes()` method in `BinaryWriter` (returns buffer without reset) @@ -114,16 +132,16 @@ - **test**: Added new tests for boundary checks and new methods - **docs**: Updated documentation with better examples and error handling -## 2.0.0 +# 2.0.0 - Update dependencies - sdk: ^3.6.0 -## 1.1.1 +# 1.1.1 - fix: warnings -## 1.1.0 +# 1.1.0 - fix: Increased test coverage, providing more comprehensive validation for edge cases. - performance: Optimized buffer management to reduce memory reallocations and improve efficiency. @@ -138,15 +156,15 @@ - feat: Added `bytesRead` property to monitor the total number of bytes read from the buffer. - feat: Introduced `reset` method, allowing users to reset the reading position to the start of the buffer for convenient re-reading. -## 1.0.2 +# 1.0.2 - docs: Updated documentation. -## 1.0.1 +# 1.0.1 - docs: Updated documentation. - feat: Added `example` directory with basic usage examples. -## 1.0.0 +# 1.0.0 - Initial release. diff --git a/README.md b/README.md index a79913f..f22570e 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ * **One-Pass Strings**: Optimized `writeVarString` with optimistic shift (30% faster). * **Smart Buffering**: Exponential growth (×1.5) and object pooling. * **Compact Encoding**: VarInt & ZigZag support -* **Stream Parsing**: `StreamBinaryReader` and `BinaryStreamTransformer` for async data. +* **Stream Parsing**: `StreamBinaryReader` and `BinaryStreamTransformer` for async data. Extensible via `TransactionalReader` and `TransactionalStreamTransformer`. * **Universal**: Supports Native & Web (WASM/JS) with consistent API. * **Modern API**: Leverages Dart Extension Types for zero-overhead abstractions. @@ -20,7 +20,7 @@ ```yaml dependencies: - pro_binary: ^4.0.0 + pro_binary: ^5.0.0 ``` ## Quick Start @@ -50,6 +50,7 @@ final reader2 = BinaryReader.fromList(bytesList); ## Recipes & Patterns ### 1. Efficient Object Serialization + ```dart class User { final int id; @@ -70,41 +71,55 @@ class User { ``` ### 2. High-Frequency writes (Pooling) + Avoid GC pressure by reusing writer instances. **Recommended (Safe & Concise):** + ```dart final data = BinaryWriterPool.withWriter((writer) { writer.writeUint32(1); writer.writeVarString('Dart Rocks!'); - return writer.toBytes(); // View of the buffer + + // toBytes(): returns a zero-copy VIEW. Use for immediate processing (e.g. socket.add). + // takeBytes(): detaches the buffer and RESETS the writer. Safe for returning data. + return writer.takeBytes(); }); ``` **Low-level API:** + ```dart final writer = BinaryWriterPool.acquire(); try { writer.writeUint32(1); writer.writeVarString('Dart Rocks!'); + final data = writer.toBytes(); + socket.add(data); // Process data BEFORE releasing back to the pool } finally { BinaryWriterPool.release(writer); } ``` ### 3. Stream Parsing (Async Binary Messages) + Process binary data arriving in chunks over a stream. **Custom Transformer:** + ```dart class MessageParser extends BinaryStreamTransformer { @override Message? parse(StreamBinaryReader reader) { // Return null when not enough data yet - if (!reader.hasBytes(4)) return null; + if (!reader.hasBytes(4)) { + return null; + } + final id = reader.readUint32(); final name = reader.readVarString(); + return Message(id, name); } } @@ -114,6 +129,7 @@ stream.transform(MessageParser()).listen((msg) => print(msg)); ``` **Manual Chunk Reading:** + ```dart final reader = StreamBinaryReader(); reader.addChunk(chunk1); @@ -130,6 +146,7 @@ try { ``` ### 4. Binary Packets (Manual navigation) + ```dart final reader = BinaryReader(bytes); final type = reader[0]; // Absolute peek via operator [] @@ -139,24 +156,56 @@ if (reader.hasBytes(4)) { } ``` +## Examples + +Explore the [example](example/) directory for complete, runnable projects: + +* [Basic Usage](example/basic/): Simple serialization and deserialization. +* [File Streaming](example/file_streaming/): Reading and writing large binary files using streams. +* [Network Streaming](example/network_streaming/): Implementing a custom protocol for TCP/Socket data. + ## API Overview -Full API documentation: https://pub.dev/documentation/pro_binary/latest/pro_binary/ +[Full API documentation](https://pub.dev/documentation/pro_binary/latest/pro_binary/) -| Class | Description | -| -------------------------------- | ------------------------------------------------------------------------------------------------------------------ | -| **BinaryWriter** | Encode data: fixed types, VarInt/ZigZag, strings, bytes. Supports `takeBytes()`, `toBytes()`, `reset()`, `seek()`. | -| **BinaryReader** | Decode data: all fixed/variable types, navigation (`skip`, `seek`, `rewind`, `peek`), `rebind()` for reuse. | -| **StreamBinaryReader** | Async streaming: chunk-based reading with `bookmark`/`rollback`/`commit` transactional model. | -| **BinaryStreamTransformer\** | Stream parser: extend and implement `parse()` to process binary streams. | -| **BinaryWriterPool** | Object pool: `acquire()`/`release()` or `withWriter()` for high-frequency writes. | -| **getUtf8Length** | Utility: calculate UTF-8 byte length without encoding. | +| Class | Description | +| ----- | ------------ | +| **BinaryWriter** | Encode data: fixed types, VarInt/ZigZag, strings, bytes. Supports `takeBytes()`, `toBytes()`, `reset()`, `seek()`. | +| **BinaryReader** | Decode data: all fixed/variable types, navigation (`skip`, `seek`, `rewind`, `peek`), `rebind()` for reuse. | +| **StreamBinaryReader** | Async streaming: chunk-based reading with `bookmark`/`rollback`/`commit` transactional model. | +| **BinaryStreamTransformer\** | Stream parser: extend and implement `parse()` to process binary streams. | +| **TransactionalStreamTransformer\** | Generic stream transformer: extend for custom chunk types and readers. | +| **BinaryWriterPool** | Object pool: `acquire()`/`release()` or `withWriter()` for high-frequency writes. | +| **getUtf8Length** | Utility: calculate UTF-8 byte length without encoding. | ## Performance Run benchmarks to see it in action: + ```bash -dart run benchmark_harness:bench --flavor aot --target test/performance/serialization_bench.dart +# Serialization (Writer) +dart run performance/serialization_bench.dart + +# Deserialization (Reader) +dart run performance/deserialization_bench.dart + +# String encoding (One-pass vs Two-pass vs Standard) +dart run performance/strings_bench.dart + +# Object Pooling (GC impact mitigation) +dart run performance/pool_bench.dart +``` + +## Testing + +The library is heavily tested with over 200+ unit and integration tests. + +```bash +# Run all tests +dart test + +# Run tests with coverage +dart test --coverage=coverage ``` ## License diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..3b1a197 --- /dev/null +++ b/example/README.md @@ -0,0 +1,45 @@ +# pro_binary Examples + +This directory contains examples demonstrating how to use `pro_binary` effectively in various scenarios. + +## 📁 Examples Structure + +### 1. [Basic Serialization & Streaming](basic/) +A self-contained example showing the core API: +* **Simple Serialization**: How to encode and decode a class. +* **Pool API**: Using `BinaryWriterPool` for high-performance applications. +* **Basic Streaming**: Implementing a `BinaryStreamTransformer` to parse objects from fragmented byte streams. + +### 2. [Advanced Network Streaming](network_streaming/) +A multi-file architectural example simulating a real-world IoT/Telemetry protocol: +* **Protocol Framing**: Searching for sync bytes (magic bytes). +* **Nested Models**: Encoding/decoding complex structures with lists. +* **Fragmentation Resilience**: Proving that the parser can reconstruct packets from tiny network chunks. + +### 3. [File Streaming (Big Data)](file_streaming/) +A high-performance example demonstrating how to process large binary files: +* **Incremental Processing**: Using `File.openRead()` to process data without loading the entire file into RAM. +* **Market Data Simulation**: Parsing 250,000+ trade records (Market Ticks) on-the-fly. +* **Memory Efficiency**: Maintaining a constant memory footprint regardless of file size. + +--- + +## 🚀 How to Run + +You can run any example directly using the Dart CLI: + +```bash +# Run the basic overview +dart example/basic/main.dart + +# Run the advanced telemetry simulation +dart example/network_streaming/main.dart + +# Run the big data file streaming example +dart example/file_streaming/main.dart +``` + +## 🛠 Best Practices Demonstrated +* **Model Design**: Using `factory Model.decode(BinaryReader r)` and `void encode(BinaryWriter w)`. +* **Efficiency**: Utilizing `takeBytes()` to reuse writers and `BinaryReader.rebind()` for readers. +* **Robustness**: Handling `NotEnoughDataException` for asynchronous data sources. diff --git a/example/basic/main.dart b/example/basic/main.dart new file mode 100644 index 0000000..bcadbef --- /dev/null +++ b/example/basic/main.dart @@ -0,0 +1,106 @@ +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:pro_binary/pro_binary.dart'; + +void main() { + // 1. Basic Serialization + final user = User( + id: 101, + name: 'Dart 🚀', + isActive: true, + level: 5, + balance: -150, // Demonstrate signed VarInt (ZigZag) + score: 99.5, + avatar: Uint8List.fromList([0xDE, 0xAD, 0xBE, 0xEF]), + ); + + final writer = BinaryWriter(); + user.encode(writer); + + // takeBytes() returns Uint8List and resets writer for reuse + final bytes = writer.takeBytes(); + _log('Serialization'); + _log(' Encoded: ${bytes.length} bytes'); + // 2. Basic Deserialization + final reader = BinaryReader(bytes); + final decodedUser = User.decode(reader); + + _log('\nDeserialization -'); + _log(' Decoded: $decodedUser'); + _log(' Avatar: ${decodedUser.avatar}'); + + // 3. Pool API (Recommended for high-frequency operations) + // Reuses internal buffers to minimize Garbage Collection pressure. + _log('\nPool API'); + final pooledBytes = BinaryWriterPool.withWriter((w) { + user.encode(w); + return w.toBytes(); // toBytes() returns a zero-copy view + }); + _log(' Pooled serialization done: ${pooledBytes.length} bytes'); + + // 4. Navigation & Peeking + _log('\nNavigation & Peeking'); + final navReader = BinaryReader(pooledBytes); + // Support for index operator [] (peeking at specific position) + _log(' Peek byte at index 0: 0x${navReader[0].toRadixString(16)}'); + navReader.skip(1); // Manually move cursor + _log(' Remaining bytes: ${navReader.availableBytes}'); +} + +/// A domain model demonstrating diverse data types and serialization patterns. +class User { + User({ + required this.id, + required this.name, + required this.isActive, + required this.level, + required this.balance, + required this.score, + required this.avatar, + }); + + /// Pattern: Factory for deserialization. + factory User.decode(BinaryReader r) => User( + id: r.readVarUint(), // Compact variable-length unsigned int + name: r.readVarString(), // Length-prefixed UTF-8 string + isActive: r.readBool(), // 1 byte boolean + level: r.readUint8(), // Fixed-size 1 byte unsigned int + balance: r.readVarInt(), // Signed variable-length int (ZigZag) + score: r.readFloat64(), // 8 byte floating point + avatar: r.readVarBytes(), // Length-prefixed byte array + ); + + final int id; + final String name; + final bool isActive; + final int level; + final int balance; + final double score; + final Uint8List avatar; + + /// Pattern: Instance method for serialization. + void encode(BinaryWriter w) { + w + ..writeVarUint(id) + ..writeVarString(name) + ..writeBool(isActive) + ..writeUint8(level) + ..writeVarInt(balance) + ..writeFloat64(score) + ..writeVarBytes(avatar); + } + + @override + String toString() => + 'User(' + 'id: $id, ' + 'name: "$name", ' + 'active: $isActive, ' + 'lvl: $level, ' + 'bal: $balance, ' + 'score: $score' + ')'; +} + +void _log([Object? object = '']) => stdout.writeln(object); diff --git a/example/file_streaming/main.dart b/example/file_streaming/main.dart new file mode 100644 index 0000000..9494f8f --- /dev/null +++ b/example/file_streaming/main.dart @@ -0,0 +1,147 @@ +import 'dart:io'; +import 'dart:math'; + +import 'package:pro_binary/pro_binary.dart'; + +void main() async { + const fileName = 'market_history.bin'; + const totalTicks = 500000; + + _log('- File Streaming Example: Market Tick Data -'); + + // 1. Generation Phase + _log('Generating $totalTicks ticks into "$fileName"...'); + + final file = File(fileName); + final ios = file.openWrite(); + + final random = Random(); + var lastPrice = 50000.0; + + final writeWatch = Stopwatch()..start(); + + final writer = BinaryWriter(initialBufferSize: 65536); + + for (var i = 0; i < totalTicks; i++) { + // Simulate price movement + lastPrice += (random.nextDouble() - 0.5) * 10; + + // final writer = BinaryWriterPool.acquire(); + TradeTick( + timestamp: DateTime.now().millisecondsSinceEpoch, + price: lastPrice, + volume: random.nextInt(100) + 1, + isBuy: random.nextBool(), + ).encode(writer); + + if (writer.bytesWritten >= 64000) { + ios.add(writer.toBytes()); + writer.seek(0); + } + } + + if (writer.bytesWritten > 0) { + ios.add(writer.toBytes()); + writer.seek(0); + } + + writeWatch.stop(); + + await ios.close(); + + _log( + 'File generated. Size: ' + '${(file.lengthSync() / 1024 / 1024).toStringAsFixed(2)} MB, ' + 'time: ${writeWatch.elapsedMilliseconds} ms\n', + ); + + // 2. Parsing Phase + _log('Reading and parsing file incrementally via File.openRead()...'); + + var tickCount = 0; + var totalVolume = 0; + var maxPrice = 0.0; + + final readWatch = Stopwatch()..start(); + + // Create the stream from file and pipe it through our transformer + final tickStream = file.openRead().transform(TickTransformer()); + + await for (final tick in tickStream) { + tickCount++; + totalVolume += tick.volume; + if (tick.price > maxPrice) { + maxPrice = tick.price; + } + + if (tickCount % 50000 == 0) { + _log(' Processed $tickCount ticks...'); + } + } + + readWatch.stop(); + + _log('\n✅ Parsing complete!'); + _log('Total Ticks: $tickCount'); + _log('Total Volume: $totalVolume'); + _log('Max Price: \$${maxPrice.toStringAsFixed(2)}'); + _log('Read time taken: ${readWatch.elapsedMilliseconds}ms'); + + // 3. Cleanup + if (file.existsSync()) { + await file.delete(); + _log('\nTemporary file deleted.'); + } +} + +/// Represents a single trade tick. +class TradeTick { + TradeTick({ + required this.timestamp, + required this.price, + required this.volume, + required this.isBuy, + }); + + factory TradeTick.decode(StreamBinaryReader r) => TradeTick( + timestamp: r.readUint64(), + price: r.readFloat64(), + volume: r.readUint32(), + isBuy: r.readBool(), + ); + + final int timestamp; + final double price; + final int volume; + final bool isBuy; + + void encode(BinaryWriter w) { + w + ..writeUint64(timestamp) + ..writeFloat64(price) + ..writeUint32(volume) + ..writeBool(isBuy); + } +} + +/// Transformer that parses [TradeTick]s from a stream. +/// Since the file is just a sequence of fixed-size records, +/// we can simply try to decode. +class TickTransformer extends BinaryStreamTransformer { + @override + TradeTick? parse(StreamBinaryReader reader) { + try { + // TradeTick is exactly 8 + 8 + 4 + 1 = 21 bytes. + // We can either check length explicitly or just try to decode. + if (!reader.hasBytes(21)) { + return null; + } + + return TradeTick.decode(reader); + } on NotEnoughDataException { + return null; + } + } +} + +void _log([Object? object = '']) => stdout.writeln(object); diff --git a/example/main.dart b/example/main.dart deleted file mode 100644 index 6811c77..0000000 --- a/example/main.dart +++ /dev/null @@ -1,182 +0,0 @@ -// Example demonstrating best practices for using `pro_binary` in Dart. -// ignore_for_file: unreachable_from_main -import 'dart:io'; -import 'dart:typed_data'; - -import 'package:pro_binary/pro_binary.dart'; - -void main() { - // 1. Pool API — recommended for high-frequency writes - log('1. Pool API'); - - final bytes = BinaryWriterPool.withWriter((writer) { - User(id: 101, name: 'Dart 🚀', isActive: true, score: 99.5).encode(writer); - return writer.toBytes(); - }); - - log('Serialized: ${bytes.length} bytes'); - - // 2. Deserialization with navigation - log('\n2. Navigation'); - final reader = BinaryReader(bytes); - - // Peek without consuming - log('First byte: 0x${reader[0].toRadixString(16).padLeft(2, '0')}'); - log('Peek 4 bytes: ${reader.peekBytes(4)}'); - - // Read and navigate - final user = User.decode(reader); - log('Decoded: $user'); - - // Rebind reader to new data (reuse without allocation) - final moreData = Uint8List.fromList([1, 0, 0, 0, 42]); - reader.rebind(moreData); - log('Rebound, read: ${reader.readUint8()}'); - - // 3. Writer reuse with reset - log('\n3. Writer Reuse'); - final writer = BinaryWriter() - ..writeVarUint(1) - ..writeVarString('first'); - final first = writer.takeBytes(); // Resets writer - log('First batch: ${first.length} bytes'); - - writer - ..writeVarUint(2) - ..writeVarString('second'); - final second = writer.takeBytes(); - log('Second batch: ${second.length} bytes'); - - // 4. Signed VarInt (ZigZag) — efficient for deltas - log('\n4. Signed VarInt (ZigZag)'); - final deltaWriter = BinaryWriter(); - for (final delta in [0, -1, 1, -42, 42, -1000, 1000]) { - deltaWriter.writeVarInt(delta); - } - final deltaBytes = deltaWriter.takeBytes(); - log('Encoded 7 deltas in ${deltaBytes.length} bytes'); - - final deltaReader = BinaryReader(deltaBytes); - log('Decoded: ${List.generate(7, (_) => deltaReader.readVarInt())}'); - - // 5. getUtf8Length utility - log('\n5. UTF-8 Length'); - const ascii = 'Hello'; - const unicode = 'Hello 世界 🌍'; - log('"$ascii" -> ${getUtf8Length(ascii)} bytes'); - log('"$unicode" -> ${getUtf8Length(unicode)} bytes'); - - // 6. Pool statistics - log('\n6. Pool Stats'); - final stats = BinaryWriterPool.stats; - log( - 'Pooled: ${stats.pooled}, Hits: ${stats.acquireHit}, ' - 'Misses: ${stats.acquireMiss}', - ); - log('Hit rate: ${(stats.hitRate * 100).toStringAsFixed(1)}%'); - - // 7. Stream parsing (requires actual Stream>) - log('\n7. Stream Parsing'); - // In real usage: - // stream.transform(MessageParser()).listen((msg) => print(msg)); - log('Use: stream.transform(MessageParser()).listen(...)'); - - // 8. Concise callable syntax - log('\n8. Callable Syntax'); - final cWriter = BinaryWriter(); - cWriter([0xAA, 0xBB, 0xCC, 0xDD]); // writeBytes shorthand - final cBytes = cWriter.takeBytes(); - - final cReader = BinaryReader(cBytes); - log('Callable read 2 bytes: ${cReader(2)}'); // readBytes shorthand - - // 9. fromList convenience - log('\n9. List Support'); - final listReader = BinaryReader.fromList([0x01, 0x02, 0x03, 0x04]); - log('From List: ${listReader.readBytes(4)}'); - - // 10. takeBytes vs toBytes - log('\n10. takeBytes vs toBytes'); - final w1 = BinaryWriter()..writeUint32(42); - final view = w1.toBytes(); // View, writer keeps state - w1.writeUint32(100); - log('toBytes() snapshot: ${view.length} bytes'); - log('After more writes: ${w1.takeBytes().length} bytes (writer reset)'); - - final w2 = BinaryWriter()..writeUint32(42); - final owned = w2.takeBytes(); // Resets writer - log('takeBytes() owns buffer: ${owned.length} bytes'); - - log('\nAll examples completed successfully!'); -} - -void log([Object? object = '']) => stdout.writeln(object); - -/// A simple domain model to demonstrate serialization best practices. -class User { - User({ - required this.id, - required this.name, - required this.isActive, - required this.score, - }); - - /// Recommended pattern: Factory for deserialization. - factory User.decode(BinaryReader r) => User( - id: r.readVarUint(), - name: r.readVarString(), - isActive: r.readBool(), - score: r.readFloat64(), - ); - - final int id; - final String name; - final bool isActive; - final double score; - - /// Recommended pattern: Instance method for serialization. - void encode(BinaryWriter w) { - w - ..writeVarUint(id) - ..writeVarString(name) - ..writeBool(isActive) - ..writeFloat64(score); - } - - @override - String toString() => - 'User(id: $id, name: "$name", active: $isActive, score: $score)'; -} - -/// Message for streaming example. -class Message { - Message({required this.type, required this.payload}); - - factory Message.decode(StreamBinaryReader r) => Message( - type: r.readUint8(), - payload: r.readVarString(), - ); - - final int type; - final String payload; - - void encode(BinaryWriter w) { - w - ..writeUint8(type) - ..writeVarString(payload); - } - - @override - String toString() => 'Message(type: $type, payload: "$payload")'; -} - -/// Stream parser for [Message]. -class MessageParser extends BinaryStreamTransformer { - @override - Message? parse(StreamBinaryReader reader) { - if (!reader.hasBytes(1)) { - return null; - } - return Message.decode(reader); - } -} diff --git a/example/network_streaming/main.dart b/example/network_streaming/main.dart new file mode 100644 index 0000000..af0c636 --- /dev/null +++ b/example/network_streaming/main.dart @@ -0,0 +1,81 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:pro_binary/pro_binary.dart'; + +import 'models.dart'; +import 'transformer.dart'; + +void main() async { + _log('Advanced Streaming Example'); + _log('Simulating a fragmented network stream...\n'); + + // 1. Prepare some test data + final packets = [ + TelemetryPacket( + packetId: 1, + readings: [ + SensorData(id: 'temp_1', value: 24.5, timestamp: 1622548800000), + SensorData(id: 'hum_1', value: 45.2, timestamp: 1622548800000), + ], + ), + TelemetryPacket( + packetId: 2, + readings: [ + SensorData(id: 'press_1', value: 1013.25, timestamp: 1622548805000), + ], + ), + ]; + + // 2. Encode packets into a single raw byte buffer with framing + final writer = BinaryWriter(); + for (final packet in packets) { + // Write framing: [Magic] [Length] + final payloadWriter = BinaryWriter(); + packet.encode(payloadWriter); + final payload = payloadWriter.takeBytes(); + + writer + ..writeUint8(TelemetryTransformer.magicByte) + ..writeVarBytes(payload); + } + + final allBytes = writer.takeBytes(); + + // 3. Create a stream and apply our TelemetryTransformer + final controller = StreamController>(); + final telemetryStream = controller.stream.transform(TelemetryTransformer()); + + // Listen for parsed packets + final subscription = telemetryStream.listen((packet) { + _log('✅ Received: $packet'); + for (final reading in packet.readings) { + _log(' -> $reading'); + } + }); + + // 4. Simulate network fragmentation (sending 7 bytes at a time) + _log('Streaming ${allBytes.length} bytes in 7-byte chunks...'); + const chunkSize = 7; + for (var i = 0; i < allBytes.length; i += chunkSize) { + final end = (i + chunkSize < allBytes.length) + ? i + chunkSize + : allBytes.length; + final chunk = allBytes.sublist(i, end); + + _log(' [$i]Sending chunk: ${chunk.length} bytes'); + controller.add(Uint8List.fromList(chunk)); + + // Small delay to simulate network latency + await Future.delayed(const Duration(milliseconds: 50)); + } + + await controller.close(); + await subscription.asFuture(); + await subscription.cancel(); + + _log('\nStream closed. All packets reconstructed successfully.'); +} + +void _log([Object? object = '']) => stdout.writeln(object); diff --git a/example/network_streaming/models.dart b/example/network_streaming/models.dart new file mode 100644 index 0000000..4fdc77c --- /dev/null +++ b/example/network_streaming/models.dart @@ -0,0 +1,79 @@ +import 'package:pro_binary/pro_binary.dart'; + +/// Represents data from a single sensor. +class SensorData { + SensorData({ + required this.id, + required this.value, + required this.timestamp, + }); + + /// Decodes [SensorData] from a [BinaryReader]. + factory SensorData.decode(BinaryReader r) => SensorData( + id: r.readVarString(), + value: r.readFloat64(), + timestamp: r.readUint64(), + ); + + /// Decodes [SensorData] from a [StreamBinaryReader]. + factory SensorData.decodeStream(StreamBinaryReader r) => SensorData( + id: r.readVarString(), + value: r.readFloat64(), + timestamp: r.readUint64(), + ); + + final String id; + final double value; + final int timestamp; + + /// Encodes [SensorData] using a standard [BinaryWriter]. + void encode(BinaryWriter w) { + w + ..writeVarString(id) + ..writeFloat64(value) + ..writeUint64(timestamp); + } + + @override + String toString() => 'Sensor(id: $id, val: ${value.toStringAsFixed(2)})'; +} + +/// A telemetry packet containing multiple sensor readings. +class TelemetryPacket { + TelemetryPacket({required this.packetId, required this.readings}); + + /// Decodes a full packet from a [BinaryReader]. + factory TelemetryPacket.decode(BinaryReader r) { + final id = r.readUint32(); + final count = r.readVarUint(); + final readings = List.generate(count, (_) => SensorData.decode(r)); + + return TelemetryPacket(packetId: id, readings: readings); + } + + /// Decodes a full packet from a [StreamBinaryReader]. + factory TelemetryPacket.decodeStream(StreamBinaryReader r) { + final id = r.readUint32(); + final count = r.readVarUint(); + final readings = List.generate(count, (_) => SensorData.decodeStream(r)); + + return TelemetryPacket(packetId: id, readings: readings); + } + + final int packetId; + final List readings; + + /// Encodes the packet. + void encode(BinaryWriter w) { + w + ..writeUint32(packetId) + ..writeVarUint(readings.length); + + for (final reading in readings) { + reading.encode(w); + } + } + + @override + String toString() => 'Packet #$packetId [${readings.length} readings]'; +} diff --git a/example/network_streaming/transformer.dart b/example/network_streaming/transformer.dart new file mode 100644 index 0000000..3471d37 --- /dev/null +++ b/example/network_streaming/transformer.dart @@ -0,0 +1,45 @@ +import 'package:pro_binary/pro_binary.dart'; +import 'models.dart'; + +/// A transformer that parses [TelemetryPacket]s from a raw byte stream. +/// +/// It uses a "Length-Prefixed" protocol format: +/// [1 byte: Magic (0xAA)] [VarUint: Length of Payload] [Payload: Packet] +/// +/// Thanks to [BinaryStreamTransformer] and its transactional model, +/// we don't need to manually buffer data or handle partial reads. +class TelemetryTransformer extends BinaryStreamTransformer { + static const magicByte = 0xAA; + + @override + TelemetryPacket? parse(StreamBinaryReader reader) { + // 1. Sync check: Look for the magic byte + // If not found, skip until we find it or run out of data. + while (reader.availableBytes > 0) { + if (reader.readUint8() == magicByte) { + break; + } + } + + // If we finished the loop without finding magicByte, it means we don't + // have enough data yet to even start a packet header. + if (reader.availableBytes == 0) { + return null; + } + + // 2. Transactional block: + // We use readVarBytes() which automatically: + // - Reads the VarUint length + // - Reads the payload bytes + // - Throws NotEnoughDataException if any part is missing + // + // BinaryStreamTransformer will catch the exception and perform + // an automatic rollback to the state BEFORE this parse() call. + final payload = reader.readVarBytes(); + + // 3. Success: All data is present. + // Since we now have a contiguous Uint8List payload, we can use + // the faster BinaryReader for the final decoding step. + return TelemetryPacket.decode(BinaryReader(payload)); + } +} diff --git a/lib/pro_binary.dart b/lib/pro_binary.dart index de5dc6f..d482e03 100644 --- a/lib/pro_binary.dart +++ b/lib/pro_binary.dart @@ -3,5 +3,8 @@ library; export 'src/binary_reader.dart'; export 'src/binary_writer.dart'; +export 'src/constants.dart'; export 'src/stream/binary_stream_transformer.dart'; export 'src/stream/stream_binary_reader.dart'; +export 'src/stream/transactional_reader.dart'; +export 'src/stream/transactional_stream_transformer.dart'; diff --git a/lib/src/binary_reader.dart b/lib/src/binary_reader.dart index 869a51b..ea214de 100644 --- a/lib/src/binary_reader.dart +++ b/lib/src/binary_reader.dart @@ -1,6 +1,8 @@ import 'dart:convert'; import 'dart:typed_data'; +import 'constants.dart'; + /// A high-performance binary reader for decoding data from a byte buffer. /// /// Provides methods for reading various data types including: @@ -529,6 +531,41 @@ extension type BinaryReader._(_ReaderState _rs) { return readString(length, allowMalformed: allowMalformed); } + /// Reads a UTF-8 encoded string prefixed with a fixed-width length. + /// + /// The length prefix size is determined by [lengthEncoding]. + /// + /// [lengthEncoding] specifies the size of the length prefix (defaults to + /// [LengthEncoding.u8]). + /// [allowMalformed] if true, malformed UTF-8 sequences will be replaced + /// with U+FFFD. + /// + /// This is the counterpart to `BinaryWriter.writeStringFixed`. + /// + /// Example: + /// ```dart + /// final text = reader.readStringFixed(lengthEncoding: LengthEncoding.u16); + /// ``` + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + String readStringFixed({ + LengthEncoding lengthEncoding = .u8, + bool allowMalformed = false, + }) { + final length = _readLength(lengthEncoding); + + return readString(length, allowMalformed: allowMalformed); + } + + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + int _readLength(LengthEncoding encoding) => switch (encoding) { + .u8 => readUint8(), + .u16 => readUint16(), + .u32 => readUint32(), + .u64 => readUint64(), + }; + /// Reads a boolean value (1 byte). /// /// A byte value of 0 is interpreted as `false`, any non-zero value as `true`. diff --git a/lib/src/binary_writer.dart b/lib/src/binary_writer.dart index c3aa517..00f24bb 100644 --- a/lib/src/binary_writer.dart +++ b/lib/src/binary_writer.dart @@ -1,8 +1,6 @@ import 'dart:typed_data'; -// See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER -// for explanation of max safe integer in JavaScript. -import 'constants_native.dart' if (dart.library.js_util) 'constants_web.dart'; +import 'constants.dart'; part 'binary_writer_pool.dart'; part 'string_utils.dart'; @@ -79,6 +77,7 @@ extension type BinaryWriter._(_WriterState _ws) { _ws.ensureOneByte(); _ws.list[offset++] = value; _ws.offset = offset; + return; } @@ -94,6 +93,7 @@ extension type BinaryWriter._(_WriterState _ws) { if (v < 0x80) { list[offset++] = v; _ws.offset = offset; + return; } @@ -105,6 +105,7 @@ extension type BinaryWriter._(_WriterState _ws) { if (v < 0x80) { list[offset++] = v; _ws.offset = offset; + return; } @@ -116,6 +117,7 @@ extension type BinaryWriter._(_WriterState _ws) { if (v < 0x80) { list[offset++] = v; _ws.offset = offset; + return; } @@ -442,13 +444,17 @@ extension type BinaryWriter._(_WriterState _ws) { /// /// Example: /// ```dart - /// // Length-prefixed string (recommended for most protocols) + /// // 1. Easy way (highly optimized) + /// writer.writeVarString('Hello, 世界! 🌍'); + /// + /// // 2. Manual length-prefixed string (if data is already encoded) /// final text = 'Hello, 世界! 🌍'; /// final utf8Bytes = utf8.encode(text); /// writer.writeVarUint(utf8Bytes.length); // Write byte length /// writer.writeBytes(utf8Bytes); // Write pre-encoded string data - /// // Or for simple fixed-length strings: - /// writer.writeString('MAGIC'); // No length prefix needed + /// + /// // 3. Fixed-length strings (no prefix) + /// writer.writeString('MAGIC'); /// ``` /// /// **Performance:** Highly optimized for ASCII-heavy strings. @@ -589,56 +595,69 @@ extension type BinaryWriter._(_WriterState _ws) { final len = value.length; if (len == 0) { writeVarUint(0); - return; } - // Step 1: Optimistic estimation of VarInt size based on string length. - // Most strings are ASCII, where byte length == character length. - final estimatedVarIntSize = _varIntSize(len); + // 1. Optimistically estimate the VarInt size based on string character + // length. For pure ASCII strings, byte length matches character length + // exactly. + final estimatedSize = _varIntSize(len); - // Ensure enough space for the worst-case scenario (3 bytes per UTF-16 unit) - _ws.ensureSize(estimatedVarIntSize + len * 3); + // Cache the initial offset locally to avoid redundant heap lookups. final startOffset = _ws.offset; - // Step 2: Skip space for the estimated VarInt length - _ws.offset += estimatedVarIntSize; + // Ensure enough space for the prefix and the worst-case UTF-8 scenario + // (3 bytes per unit). + _ws + ..ensureSize(estimatedSize + len * 3) + // 2. Reserve space for the estimated length prefix using a fast direct + // assignment. + ..offset = startOffset + estimatedSize; - // Step 3: Write the actual string data + // 3. Write the actual string data directly into the buffer. writeString(value, allowMalformed: allowMalformed); - final byteLength = _ws.offset - (startOffset + estimatedVarIntSize); + // Cache the offset immediately after writing to determine the exact byte + // length. + var currentOffset = _ws.offset; + final byteLength = currentOffset - (startOffset + estimatedSize); - // Step 4: Check if our estimate was correct for the actual byte length - final actualVarIntSize = _varIntSize(byteLength); + // 4. Determine the actual VarInt size required for the encoded byte length. + final actualSize = _varIntSize(byteLength); - // Step 5: If the estimate was wrong, shift the string data - if (actualVarIntSize != estimatedVarIntSize) { - final shift = actualVarIntSize - estimatedVarIntSize; + // 5. If the optimistic estimate was wrong (e.g., due to multi-byte UTF-8 + // characters), shift the written string data to accommodate the actual + //VarInt size. + if (actualSize != estimatedSize) { + final shift = actualSize - estimatedSize; if (shift > 0) { _ws.ensureSize(shift); } - // Fast native memory shift using setRange (memmove) + // Perform a fast native memory shift (memmove) using setRange. _ws.list.setRange( - startOffset + actualVarIntSize, - _ws.offset + shift, + startOffset + actualSize, + currentOffset + shift, _ws.list, - startOffset + estimatedVarIntSize, + startOffset + estimatedSize, ); - _ws.offset += shift; + + // Adjust the local tracker instead of modifying the heap property + // repeatedly. + currentOffset += shift; } - // Step 6: Backtrack and write the actual VarInt length - final finalOffset = _ws.offset; - seek(startOffset); + // 6. Backtrack to the start offset and write the authentic VarInt length + // prefix. + _ws.offset = startOffset; writeVarUint(byteLength); - _ws.offset = finalOffset; + + // 7. Advance the buffer's final offset to the absolute end of the payload. + _ws.offset = currentOffset; } - /// Returns the number of bytes needed to encode [value] as a VarInt. @pragma('vm:prefer-inline') - @pragma('dart2js:tryInline') + @pragma('vm:prefer-inline') int _varIntSize(int value) => switch (value) { < 0x80 => 1, < 0x4000 => 2, @@ -647,6 +666,68 @@ extension type BinaryWriter._(_WriterState _ws) { _ => 5, }; + /// Writes a UTF-8 encoded string prefixed with a fixed-width length. + /// + /// The length prefix size is determined by [lengthEncoding]. + /// + /// [value] is the string to write. + /// [lengthEncoding] specifies the size of the length prefix (defaults to + /// [LengthEncoding.u8]). + /// [allowMalformed] if true, malformed UTF-8 sequences will be replaced + /// with U+FFFD. + /// + /// This is useful when you want to avoid VarInt encoding for lengths or + /// when the format requires a specific fixed-width length prefix. + /// + /// Example: + /// ```dart + /// writer.writeStringFixed('Hello', lengthEncoding: LengthEncoding.u16); + /// ``` + @pragma('vm:prefer-inline') + @pragma('vm:prefer-inline') + void writeStringFixed( + String value, { + LengthEncoding lengthEncoding = .u8, + bool allowMalformed = true, + }) { + final len = value.length; + if (len == 0) { + _writeLength(0, lengthEncoding); + return; + } + + final sizeInBytes = lengthEncoding.sizeInBytes; + _ws.ensureSize(sizeInBytes + len * 3); + + final startOffset = _ws.offset; + + _ws.offset = startOffset + sizeInBytes; + + writeString(value, allowMalformed: allowMalformed); + + final finalOffset = _ws.offset; + final byteLength = finalOffset - (startOffset + sizeInBytes); + + _ws.offset = startOffset; + _writeLength(byteLength, lengthEncoding); + _ws.offset = finalOffset; + } + + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + void _writeLength(int length, LengthEncoding encoding) { + switch (encoding) { + case .u8: + writeUint8(length); + case .u16: + writeUint16(length); + case .u32: + writeUint32(length); + case .u64: + writeUint64(length); + } + } + /// Writes a boolean value as a single byte. /// /// `true` is written as `1` and `false` as `0`. diff --git a/lib/src/constants.dart b/lib/src/constants.dart new file mode 100644 index 0000000..71a1784 --- /dev/null +++ b/lib/src/constants.dart @@ -0,0 +1,14 @@ +// See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER +// for explanation of max safe integer in JavaScript. +export 'constants/native.dart' if (dart.library.js_util) 'constants/web.dart'; + +enum LengthEncoding { + u8(1), + u16(2), + u32(4), + u64(8); + + const LengthEncoding(this.sizeInBytes); + + final int sizeInBytes; +} diff --git a/lib/src/constants_native.dart b/lib/src/constants/native.dart similarity index 100% rename from lib/src/constants_native.dart rename to lib/src/constants/native.dart diff --git a/lib/src/constants_web.dart b/lib/src/constants/web.dart similarity index 100% rename from lib/src/constants_web.dart rename to lib/src/constants/web.dart diff --git a/lib/src/stream/binary_stream_transformer.dart b/lib/src/stream/binary_stream_transformer.dart index 12205cc..833e3f7 100644 --- a/lib/src/stream/binary_stream_transformer.dart +++ b/lib/src/stream/binary_stream_transformer.dart @@ -1,11 +1,13 @@ import 'dart:async'; import 'stream_binary_reader.dart'; +import 'transactional_reader.dart'; +import 'transactional_stream_transformer.dart'; /// A [StreamTransformer] that simplifies parsing binary messages from a stream. /// /// It manages an internal [StreamBinaryReader] and handles -/// [NotEnoughDataException] by automatically rolling back the reader state +/// [NotEnoughDataException] by automatically rolling back the reader state /// and waiting for more data /// from the stream. /// @@ -27,50 +29,12 @@ import 'stream_binary_reader.dart'; /// stream.transform(MyMessageTransformer()).listen((msg) => print(msg.name)); /// ``` abstract class BinaryStreamTransformer - extends StreamTransformerBase, T> { + extends TransactionalStreamTransformer, StreamBinaryReader> { /// Creates a new [BinaryStreamTransformer]. const BinaryStreamTransformer(); @override - Stream bind(Stream> stream) async* { - final reader = StreamBinaryReader(); - - await for (final chunk in stream) { - reader.addChunk(chunk); - yield* _parseLoop(reader); - } - - // Final attempt to parse remaining data after stream is closed - yield* _parseLoop(reader); - } - - Stream _parseLoop(StreamBinaryReader reader) async* { - while (reader.availableBytes > 0) { - reader.bookmark(); - final bytesBefore = reader.availableBytes; - try { - final result = parse(reader); - if (result == null) { - reader.rollback(); - break; // Wait for more data - } else { - reader.commit(); - yield result; - if (reader.availableBytes == bytesBefore) { - // parse() returned a result without consuming any data — - // break to avoid an infinite loop - break; - } - } - } on NotEnoughDataException { - reader.rollback(); - break; // Wait for more data - } catch (e) { - reader.rollback(); - rethrow; - } - } - } + StreamBinaryReader createReader() => StreamBinaryReader(); /// Parses a single message from the [reader]. /// @@ -80,5 +44,6 @@ abstract class BinaryStreamTransformer /// /// **Recommendation:** prefer throwing [NotEnoughDataException] for /// explicit control, or return `null` for simple "not yet ready" cases. + @override T? parse(StreamBinaryReader reader); } diff --git a/lib/src/stream/stream_binary_reader.dart b/lib/src/stream/stream_binary_reader.dart index 3c4d87f..c2e64b4 100644 --- a/lib/src/stream/stream_binary_reader.dart +++ b/lib/src/stream/stream_binary_reader.dart @@ -1,26 +1,11 @@ -import 'dart:collection'; import 'dart:convert'; import 'dart:typed_data'; -import '../binary_reader.dart'; - -/// Exception thrown when [StreamBinaryReader] does not have enough data -/// to complete a read operation. -class NotEnoughDataException implements Exception { - /// Creates a new [NotEnoughDataException]. - const NotEnoughDataException(this.required, this.available); - - /// The number of bytes required to complete the operation. - final int required; +import 'package:meta/meta.dart'; - /// The number of bytes currently available in the reader. - final int available; - - @override - String toString() => - 'NotEnoughDataException: required $required bytes, but only ' - '$available available.'; -} +import '../binary_reader.dart'; +import '../constants.dart'; +import 'transactional_reader.dart'; /// A reader designed for asynchronous streaming data that spans multiple /// chunks. @@ -32,30 +17,25 @@ class NotEnoughDataException implements Exception { /// /// It supports a transactional model ([bookmark], [rollback], [commit]) which /// is essential for stream parsing when a message might be incomplete. -extension type StreamBinaryReader._(_StreamReaderState _s) { +extension type StreamBinaryReader._(_StreamReaderState _s) + implements TransactionalReader> { /// Creates a new [StreamBinaryReader]. StreamBinaryReader() : this._(_StreamReaderState()); /// The total number of unread bytes currently available across all chunks. + @redeclare int get availableBytes => _s.availableBytes; /// Adds a new chunk of data to the reader. - void addChunk(List bytes) { - final chunk = bytes is Uint8List ? bytes : Uint8List.fromList(bytes); - if (chunk.isEmpty) { - return; - } - - _s.chunks.add(chunk); - _s.availableBytes += chunk.length; - - if (_s.currentReader == null) { - final relativeIndex = _s.currentAbsoluteIndex - _s.queueStartIndex; - if (relativeIndex >= 0 && relativeIndex < _s.chunks.length) { - _s.currentReader = BinaryReader(_s.chunks.elementAt(relativeIndex)); - } - } - } + /// + /// **Performance Tip:** For maximum performance, it is highly recommended to + /// pass a [Uint8List]. If a standard `List` is provided, it will be + /// copied into a new [Uint8List] internally, which incurs a performance cost. + /// + /// Most streams from `dart:io` (like `File.openRead` or `Socket`) yield + /// [Uint8List] even though they are typed as `Stream>`. + @redeclare + void addChunk(List bytes) => _s.addChunk(bytes); /// Creates a checkpoint of the current reader state. /// @@ -64,96 +44,28 @@ extension type StreamBinaryReader._(_StreamReaderState _s) { /// [rollback] to restore the state and wait for more data. @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') - void bookmark() { - if (_s.bookmarkCount >= _s.bookmarkAbsoluteIndex.length) { - _s._growBookmarks(); - } - - final count = _s.bookmarkCount; - _s.bookmarkAbsoluteIndex[count] = _s.currentAbsoluteIndex; - _s.bookmarkReaderOffset[count] = _s.currentReader?.offset ?? 0; - _s.bookmarkAvailableBytes[count] = _s.availableBytes; - _s.bookmarkCount++; - } + @redeclare + void bookmark() => _s.bookmark(); /// Restores the reader to the state of the last [bookmark]. /// /// This removes the last bookmark from the stack. @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') - void rollback() { - if (_s.bookmarkCount == 0) { - throw StateError('No bookmark to rollback to'); - } - - _s.bookmarkCount--; - final count = _s.bookmarkCount; - final absIndex = _s.bookmarkAbsoluteIndex[count]; - final readerOffset = _s.bookmarkReaderOffset[count]; - final availableBytes = _s.bookmarkAvailableBytes[count]; - - _s.currentAbsoluteIndex = absIndex; - _s.availableBytes = availableBytes; - - if (_s.chunks.isNotEmpty) { - final relativeIndex = absIndex - _s.queueStartIndex; - final targetChunk = _s.chunks.elementAt(relativeIndex); - final cr = _s.currentReader; - if (cr != null) { - cr - ..rebind(targetChunk) - ..seek(readerOffset); - } else { - _s.currentReader = BinaryReader(targetChunk)..seek(readerOffset); - } - } else { - _s.currentReader = null; - } - } + @redeclare + void rollback() => _s.rollback(); /// Removes the last [bookmark] without restoring the state. /// /// Call this when a message has been successfully and fully parsed. @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') - void commit() { - if (_s.bookmarkCount == 0) { - throw StateError('No bookmark to commit'); - } - - _s.bookmarkCount--; - _prune(); - } + @redeclare + void commit() => _s.commit(); @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') - void _advanceChunk() { - _s.currentAbsoluteIndex++; - if (_s.bookmarkCount == 0) { - _s.chunks.removeFirst(); - _s.queueStartIndex++; - } - - final relativeIndex = _s.currentAbsoluteIndex - _s.queueStartIndex; - if (relativeIndex < _s.chunks.length) { - _s.currentReader!.rebind(_s.chunks.elementAt(relativeIndex)); - } else { - _s.currentReader = null; - } - } - - @pragma('vm:prefer-inline') - @pragma('dart2js:tryInline') - void _prune() { - final minNeeded = _s.bookmarkCount == 0 - ? _s.currentAbsoluteIndex - : _s.bookmarkAbsoluteIndex[0]; - - while (_s.queueStartIndex < minNeeded) { - _s.chunks.removeFirst(); - _s.queueStartIndex++; - } - } + void _advanceChunk() => _s.advanceChunk(); @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') @@ -533,6 +445,7 @@ extension type StreamBinaryReader._(_StreamReaderState _s) { } final bytes = readBytes(length); + return utf8.decode(bytes, allowMalformed: allowMalformed); } @@ -547,6 +460,29 @@ extension type StreamBinaryReader._(_StreamReaderState _s) { return readString(length, allowMalformed: allowMalformed); } + /// Reads a UTF-8 encoded string prefixed with a fixed-width length. + /// + /// Throws [NotEnoughDataException] if fewer than the required bytes for the + /// length prefix are available. + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + String readStringFixed({ + LengthEncoding lengthEncoding = .u8, + bool allowMalformed = false, + }) { + final length = _readLength(lengthEncoding); + return readString(length, allowMalformed: allowMalformed); + } + + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + int _readLength(LengthEncoding encoding) => switch (encoding) { + .u8 => readUint8(), + .u16 => readUint16(), + .u32 => readUint32(), + .u64 => readUint64(), + }; + /// Advances the read position by the specified number of bytes. /// /// Throws [NotEnoughDataException] if fewer than [length] bytes are @@ -589,44 +525,65 @@ extension type StreamBinaryReader._(_StreamReaderState _s) { } /// Internal state holder for [StreamBinaryReader]. -final class _StreamReaderState { - _StreamReaderState() - : availableBytes = 0, - currentAbsoluteIndex = 0, - queueStartIndex = 0, - chunks = ListQueue(), - bookmarkAbsoluteIndex = Int32List(16), - bookmarkReaderOffset = Int32List(16), - bookmarkAvailableBytes = Int32List(16), - bookmarkCount = 0; - - final ListQueue chunks; - - int currentAbsoluteIndex; - int queueStartIndex; - int availableBytes; +final class _StreamReaderState extends ChunkedTransactionalState + implements TransactionalReader> { + _StreamReaderState() : bookmarkReaderOffset = Int32List(16), super(); + BinaryReader? currentReader; // Zero-allocation bookmarks using parallel arrays - Int32List bookmarkAbsoluteIndex; Int32List bookmarkReaderOffset; - Int32List bookmarkAvailableBytes; - int bookmarkCount; + + @override + void addChunk(List bytes) { + if (bytes.isEmpty) { + return; + } + + final chunk = bytes is Uint8List ? bytes : Uint8List.fromList(bytes); + super.addChunk(chunk); + } + + @override + int getChunkLength(Uint8List chunk) => chunk.length; + + @override + bool get hasCurrentReader => currentReader != null; + + @override + void onBindReader(Uint8List chunk, int offset) { + final cr = currentReader; + if (cr != null) { + cr + ..rebind(chunk) + ..seek(offset); + } else { + currentReader = BinaryReader(chunk)..seek(offset); + } + } + + @override + void onUnbindReader() { + currentReader = null; + } + + @override + void onSaveBookmark(int bookmarkIndex) { + bookmarkReaderOffset[bookmarkIndex] = currentReader?.offset ?? 0; + } + + @override + int onRestoreBookmark(int bookmarkIndex) => + bookmarkReaderOffset[bookmarkIndex]; @pragma('vm:never-inline') - void _growBookmarks() { - final newCapacity = bookmarkAbsoluteIndex.length * 2; - final newAbsIndex = Int32List(newCapacity); - final newOffset = Int32List(newCapacity); - final newAvail = Int32List(newCapacity); - - newAbsIndex.setRange(0, bookmarkCount, bookmarkAbsoluteIndex); - newOffset.setRange(0, bookmarkCount, bookmarkReaderOffset); - newAvail.setRange(0, bookmarkCount, bookmarkAvailableBytes); - - bookmarkAbsoluteIndex = newAbsIndex; - bookmarkReaderOffset = newOffset; - bookmarkAvailableBytes = newAvail; + @override + void growBookmarks() { + super.growBookmarks(); + + final newCapacity = bookmarkAbsoluteIndex.length; + bookmarkReaderOffset = Int32List(newCapacity) + ..setRange(0, bookmarkCount, bookmarkReaderOffset); } } diff --git a/lib/src/stream/transactional_reader.dart b/lib/src/stream/transactional_reader.dart new file mode 100644 index 0000000..bc94f59 --- /dev/null +++ b/lib/src/stream/transactional_reader.dart @@ -0,0 +1,203 @@ +import 'dart:collection'; +import 'dart:typed_data'; + +/// Exception thrown when a [TransactionalReader] does not have enough data +/// to complete a read operation. +class NotEnoughDataException implements Exception { + /// Creates a new [NotEnoughDataException]. + const NotEnoughDataException(this.required, this.available); + + /// The number of bytes/elements required to complete the operation. + final int required; + + /// The number of bytes/elements currently available in the reader. + final int available; + + @override + String toString() => + 'NotEnoughDataException: required $required, but only ' + '$available available.'; +} + +/// A reader that supports a transactional model for streaming data. +abstract interface class TransactionalReader { + /// The total number of unread elements currently available. + int get availableBytes; + + /// Adds a new chunk of data to the reader. + /// + /// **Performance Tip:** If the implementation supports it, passing a platform + /// optimized list type (like `Uint8List` for bytes) can enable zero-copy + /// operations. + void addChunk(TChunk chunk); + + /// Creates a checkpoint of the current reader state. + void bookmark(); + + /// Restores the reader to the state of the last [bookmark]. + void rollback(); + + /// Removes the last [bookmark] without restoring the state. + void commit(); +} + +/// A reusable base class for managing chunked transactional state. +/// +/// It provides the core logic for managing the chunk queue and bookmarks. +/// Subclasses should extend this class, implement the [TransactionalReader] +/// interface, and override the abstract hook methods to customize behavior. +abstract base class ChunkedTransactionalState { + /// Creates a new [ChunkedTransactionalState]. + ChunkedTransactionalState() + : availableBytes = 0, + currentAbsoluteIndex = 0, + queueStartIndex = 0, + chunks = ListQueue(), + bookmarkAbsoluteIndex = Int32List(16), + bookmarkAvailableBytes = Int32List(16), + bookmarkCount = 0; + + /// The queue of chunks currently being processed. + final ListQueue chunks; + + /// The absolute index of the current chunk being read. + int currentAbsoluteIndex; + + /// The absolute index of the first chunk in the queue. + int queueStartIndex; + + /// The total number of unread elements currently available. + int availableBytes; + + /// Parallel array for bookmarking [currentAbsoluteIndex]. + Int32List bookmarkAbsoluteIndex; + + /// Parallel array for bookmarking [availableBytes]. + Int32List bookmarkAvailableBytes; + + /// The current number of active bookmarks. + int bookmarkCount; + + /// Internal implementation of [TransactionalReader.addChunk]. + void addChunk(TChunk chunk) { + chunks.add(chunk); + availableBytes += getChunkLength(chunk); + + if (!hasCurrentReader) { + final relativeIndex = currentAbsoluteIndex - queueStartIndex; + if (relativeIndex >= 0 && relativeIndex < chunks.length) { + onBindReader(chunks.elementAt(relativeIndex), 0); + } + } + } + + /// Internal implementation of [TransactionalReader.bookmark]. + void bookmark() { + if (bookmarkCount >= bookmarkAbsoluteIndex.length) { + growBookmarks(); + } + + final count = bookmarkCount; + bookmarkAbsoluteIndex[count] = currentAbsoluteIndex; + bookmarkAvailableBytes[count] = availableBytes; + onSaveBookmark(count); + bookmarkCount++; + } + + /// Internal implementation of [TransactionalReader.rollback]. + void rollback() { + if (bookmarkCount == 0) { + throw StateError('No bookmark to rollback to'); + } + + bookmarkCount--; + final count = bookmarkCount; + currentAbsoluteIndex = bookmarkAbsoluteIndex[count]; + availableBytes = bookmarkAvailableBytes[count]; + + if (chunks.isNotEmpty) { + final relativeIndex = currentAbsoluteIndex - queueStartIndex; + final chunk = chunks.elementAt(relativeIndex); + onBindReader(chunk, onRestoreBookmark(count)); + } else { + onUnbindReader(); + } + } + + /// Internal implementation of [TransactionalReader.commit]. + void commit() { + if (bookmarkCount == 0) { + throw StateError('No bookmark to commit'); + } + + bookmarkCount--; + prune(); + } + + /// Advances the read position to the next chunk. + void advanceChunk() { + currentAbsoluteIndex++; + if (bookmarkCount == 0) { + chunks.removeFirst(); + queueStartIndex++; + } + + final relativeIndex = currentAbsoluteIndex - queueStartIndex; + if (relativeIndex < chunks.length) { + onBindReader(chunks.elementAt(relativeIndex), 0); + } else { + onUnbindReader(); + } + } + + /// Removes consumed chunks from the queue that are no longer needed for any + /// active bookmarks. + void prune() { + final minNeeded = bookmarkCount == 0 + ? currentAbsoluteIndex + : bookmarkAbsoluteIndex[0]; + + while (queueStartIndex < minNeeded) { + chunks.removeFirst(); + queueStartIndex++; + } + } + + /// Increases the capacity of the bookmark arrays. + /// Subclasses should override this if they have additional bookmark arrays, + /// calling `super.growBookmarks()` first. + void growBookmarks() { + final newCapacity = bookmarkAbsoluteIndex.length * 2; + final newAbsIndex = Int32List(newCapacity); + final newAvail = Int32List(newCapacity); + + newAbsIndex.setRange(0, bookmarkCount, bookmarkAbsoluteIndex); + newAvail.setRange(0, bookmarkCount, bookmarkAvailableBytes); + + bookmarkAbsoluteIndex = newAbsIndex; + bookmarkAvailableBytes = newAvail; + } + + // --- Abstract Hooks --- + + /// Returns the length of the given [chunk]. + int getChunkLength(TChunk chunk); + + /// Returns `true` if a reader is currently bound to a chunk. + bool get hasCurrentReader; + + /// Binds a reader to the given [chunk] at the specified [offset]. + void onBindReader(TChunk chunk, int offset); + + /// Unbinds the current reader. + void onUnbindReader(); + + /// Hook for subclasses to save their specific cursor state for a bookmark. + void onSaveBookmark(int bookmarkIndex); + + /// Hook for subclasses to restore their specific cursor state from a + /// bookmark. + /// Returns the offset within the chunk that should be passed to + /// [onBindReader]. + int onRestoreBookmark(int bookmarkIndex); +} diff --git a/lib/src/stream/transactional_stream_transformer.dart b/lib/src/stream/transactional_stream_transformer.dart new file mode 100644 index 0000000..eea1527 --- /dev/null +++ b/lib/src/stream/transactional_stream_transformer.dart @@ -0,0 +1,69 @@ +import 'dart:async'; +import 'transactional_reader.dart'; + +/// A [StreamTransformer] that simplifies parsing messages from a stream +/// using a transactional reader. +/// +/// It manages an internal [TransactionalReader] and handles +/// [NotEnoughDataException] by automatically rolling back the reader state +/// and waiting for more data from the stream. +abstract class TransactionalStreamTransformer< + TMessage, + TChunk, + TReader extends TransactionalReader +> + extends StreamTransformerBase { + /// Creates a new [TransactionalStreamTransformer]. + const TransactionalStreamTransformer(); + + /// Creates the reader instance to be used for this stream. + TReader createReader(); + + @override + Stream bind(Stream stream) async* { + final reader = createReader(); + + await for (final chunk in stream) { + reader.addChunk(chunk); + yield* _parseLoop(reader); + } + + // Final attempt to parse remaining data after stream is closed + yield* _parseLoop(reader); + } + + Stream _parseLoop(TReader reader) async* { + while (reader.availableBytes > 0) { + reader.bookmark(); + final bytesBefore = reader.availableBytes; + try { + final result = parse(reader); + if (result == null) { + reader.rollback(); + break; // Wait for more data + } else { + reader.commit(); + yield result; + if (reader.availableBytes == bytesBefore) { + // parse() returned a result without consuming any data — + // break to avoid an infinite loop + break; + } + } + } on NotEnoughDataException { + reader.rollback(); + break; // Wait for more data + } catch (e) { + reader.rollback(); + rethrow; + } + } + } + + /// Parses a single message from the [reader]. + /// + /// Return the parsed object, or `null` if there is not enough data. + /// Alternatively, throw [NotEnoughDataException] explicitly. + /// Both approaches trigger automatic rollback and wait for more data. + TMessage? parse(TReader reader); +} diff --git a/test/performance/deserialization_bench.dart b/performance/deserialization_bench.dart similarity index 100% rename from test/performance/deserialization_bench.dart rename to performance/deserialization_bench.dart diff --git a/test/performance/pool_bench.dart b/performance/pool_bench.dart similarity index 100% rename from test/performance/pool_bench.dart rename to performance/pool_bench.dart diff --git a/test/performance/serialization_bench.dart b/performance/serialization_bench.dart similarity index 100% rename from test/performance/serialization_bench.dart rename to performance/serialization_bench.dart diff --git a/test/performance/strings_bench.dart b/performance/strings_bench.dart similarity index 84% rename from test/performance/strings_bench.dart rename to performance/strings_bench.dart index f63e3b1..3cdf9f8 100644 --- a/test/performance/strings_bench.dart +++ b/performance/strings_bench.dart @@ -112,8 +112,34 @@ class StandardDartNaiveBench extends BenchmarkBase { } } +class FixedStringBench extends BenchmarkBase { + FixedStringBench(String name, this.payload, this.encoding) + : super('String [$name] (Fixed ${encoding.name})'); + + final String payload; + final LengthEncoding encoding; + late BinaryWriter writer; + + @override + void setup() { + writer = BinaryWriter(initialBufferSize: payload.length * 3 + 10); + } + + @override + void run() { + writer + ..reset() + ..writeStringFixed(payload, lengthEncoding: encoding); + + if (writer.bytesWritten == 0) { + throw Exception(); + } + } +} + void runComparison(String name, String payload) { OnePassStringBench(name, payload).report(); + FixedStringBench(name, payload, LengthEncoding.u32).report(); TwoPassStringBench(name, payload).report(); StandardDartCorrectBench(name, payload).report(); StandardDartNaiveBench(name, payload).report(); diff --git a/pubspec.yaml b/pubspec.yaml index fec9197..c9a08e5 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: pro_binary description: Efficient binary serialization library for Dart. Encodes and decodes various data types. -version: 4.0.0 +version: 5.0.0 repository: https://github.com/pro100andrey/pro_binary issue_tracker: https://github.com/pro100andrey/pro_binary/issues @@ -26,6 +26,9 @@ topics: environment: sdk: ^3.10.0 +dependencies: + meta: ^1.18.2 + dev_dependencies: benchmark_harness: ^2.4.0 pro_lints: ^6.0.0 diff --git a/test/stream/stream_binary_reader_coverage_test.dart b/test/stream/stream_binary_reader_coverage_test.dart index f76581e..e3d96f3 100644 --- a/test/stream/stream_binary_reader_coverage_test.dart +++ b/test/stream/stream_binary_reader_coverage_test.dart @@ -302,7 +302,7 @@ void main() { (e) => e.toString(), 'toString', allOf( - contains('required 4 bytes'), + contains('required 4'), contains('2 available'), ), ), diff --git a/test/stream/stream_binary_reader_test.dart b/test/stream/stream_binary_reader_test.dart index 37f5375..19874e6 100644 --- a/test/stream/stream_binary_reader_test.dart +++ b/test/stream/stream_binary_reader_test.dart @@ -75,5 +75,20 @@ void main() { expect(reader.readVarString(), equals('Stream')); }); + + test('readStringFixed handles chunk boundary', () { + final writer = BinaryWriter() + ..writeStringFixed('Streaming', lengthEncoding: LengthEncoding.u16); + final bytes = writer.takeBytes(); + + reader + ..addChunk(bytes.sublist(0, 5)) + ..addChunk(bytes.sublist(5)); + + expect( + reader.readStringFixed(lengthEncoding: LengthEncoding.u16), + equals('Streaming'), + ); + }); }); } diff --git a/test/unit/binary_reader_string_test.dart b/test/unit/binary_reader_string_test.dart index 5ca0c38..8067d76 100644 --- a/test/unit/binary_reader_string_test.dart +++ b/test/unit/binary_reader_string_test.dart @@ -49,5 +49,79 @@ void main() { expect(result, isNotEmpty); }); }); + + group('readStringFixed', () { + test('read with LengthEncoding.u8', () { + final buffer = Uint8List.fromList([3, 65, 66, 67]); + final reader = BinaryReader(buffer); + expect( + reader.readStringFixed(), + equals('ABC'), + ); + }); + + test('read with LengthEncoding.u16', () { + final buffer = Uint8List.fromList([0, 3, 65, 66, 67]); + final reader = BinaryReader(buffer); + expect( + reader.readStringFixed(lengthEncoding: .u16), + equals('ABC'), + ); + }); + + test('read empty string with LengthEncoding.u32', () { + final buffer = Uint8List.fromList([0, 0, 0, 0]); + final reader = BinaryReader(buffer); + expect( + reader.readStringFixed(lengthEncoding: LengthEncoding.u32), + equals(''), + ); + }); + + test('read multi-byte string with LengthEncoding.u8', () { + final buffer = Uint8List.fromList([ + 12, + 208, + 159, + 209, + 128, + 208, + 184, + 208, + 178, + 208, + 181, + 209, + 130, + ]); + final reader = BinaryReader(buffer); + expect( + reader.readStringFixed(), + equals('Привет'), + ); + }); + + test('read with LengthEncoding.u64', () { + final buffer = Uint8List.fromList([ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 4, + 68, + 65, + 82, + 84, + ]); + final reader = BinaryReader(buffer); + expect( + reader.readStringFixed(lengthEncoding: LengthEncoding.u64), + equals('DART'), + ); + }); + }); }); } diff --git a/test/unit/binary_writer_string_test.dart b/test/unit/binary_writer_string_test.dart index 37d95df..25b0a12 100644 --- a/test/unit/binary_writer_string_test.dart +++ b/test/unit/binary_writer_string_test.dart @@ -308,5 +308,37 @@ void main() { expect(bytes, equals([42, 43])); }); }); + + group('writeStringFixed', () { + test('write with LengthEncoding.u8', () { + writer.writeStringFixed('ABC'); + expect(writer.takeBytes(), equals([3, 65, 66, 67])); + }); + + test('write with LengthEncoding.u16', () { + writer.writeStringFixed('ABC', lengthEncoding: LengthEncoding.u16); + expect(writer.takeBytes(), equals([0, 3, 65, 66, 67])); + }); + + test('write empty string with LengthEncoding.u32', () { + writer.writeStringFixed('', lengthEncoding: LengthEncoding.u32); + expect(writer.takeBytes(), equals([0, 0, 0, 0])); + }); + + test('write multi-byte string with LengthEncoding.u8', () { + writer.writeStringFixed('Привет'); + final bytes = writer.takeBytes(); + expect(bytes[0], equals(12)); // 6 characters * 2 bytes + expect(utf8.decode(bytes.sublist(1)), equals('Привет')); + }); + + test('write with LengthEncoding.u64', () { + writer.writeStringFixed('DART', lengthEncoding: LengthEncoding.u64); + final bytes = writer.takeBytes(); + expect(bytes.length, equals(8 + 4)); + expect(bytes.sublist(0, 8), equals([0, 0, 0, 0, 0, 0, 0, 4])); + expect(utf8.decode(bytes.sublist(8)), equals('DART')); + }); + }); }); } From 091bbe8bb635d22b56a38b8dcc861aa0020836a3 Mon Sep 17 00:00:00 2001 From: Andrii Ivanov Date: Thu, 28 May 2026 12:06:20 +0300 Subject: [PATCH 2/4] upd: readme --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index f22570e..8f7146a 100644 --- a/README.md +++ b/README.md @@ -168,15 +168,15 @@ Explore the [example](example/) directory for complete, runnable projects: [Full API documentation](https://pub.dev/documentation/pro_binary/latest/pro_binary/) -| Class | Description | -| ----- | ------------ | -| **BinaryWriter** | Encode data: fixed types, VarInt/ZigZag, strings, bytes. Supports `takeBytes()`, `toBytes()`, `reset()`, `seek()`. | -| **BinaryReader** | Decode data: all fixed/variable types, navigation (`skip`, `seek`, `rewind`, `peek`), `rebind()` for reuse. | -| **StreamBinaryReader** | Async streaming: chunk-based reading with `bookmark`/`rollback`/`commit` transactional model. | -| **BinaryStreamTransformer\** | Stream parser: extend and implement `parse()` to process binary streams. | -| **TransactionalStreamTransformer\** | Generic stream transformer: extend for custom chunk types and readers. | -| **BinaryWriterPool** | Object pool: `acquire()`/`release()` or `withWriter()` for high-frequency writes. | -| **getUtf8Length** | Utility: calculate UTF-8 byte length without encoding. | +| Component | Description | +| --------- | ----------- | +| **BinaryWriter** | Fast encoder for fixed-width, VarInt/ZigZag, and one-pass strings. Features automatic expansion and pooling. | +| **BinaryReader** | Zero-copy decoder with advanced navigation (`seek`, `rewind`, `peek`). Optimized for performance. | +| **StreamBinaryReader** | Handles async data chunks seamlessly with a transactional `bookmark`/`rollback` model for partial data. | +| **BinaryStreamTransformer** | The easiest way to parse a `Stream>` into a stream of typed messages or objects. | +| **BinaryWriterPool** | Object pool for `BinaryWriter` to eliminate GC pressure during high-frequency write operations. | +| **getUtf8Length** | High-speed utility to calculate UTF-8 byte length without encoding (O(n) but heavily optimized). | +| **TransactionalReader** | Base interface for custom transactional readers. Used internally by `StreamBinaryReader`. | ## Performance From 1d07d983a18ba7de18ff8535eb3fdf723e42a1d2 Mon Sep 17 00:00:00 2001 From: Andrii Ivanov Date: Thu, 28 May 2026 12:20:10 +0300 Subject: [PATCH 3/4] update: README --- README.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 8f7146a..f171d18 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,17 @@ [![Tests](https://github.com/pro100andrey/pro_binary/workflows/Tests/badge.svg)](https://github.com/pro100andrey/pro_binary/actions) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) -**High-performance binary serialization for Dart.** Optimized for speed, zero-copy reads, and Protocol Buffers-compatible encoding. +**High-performance binary serialization and deserialization for Dart.** Optimized for high-frequency network protocols, real-time streaming, and fast local storage. Features zero-copy reads, object pooling, and transactional stream parsing. ## Key Features -* **Zero-Copy Reads**: Operations return `Uint8List` views without allocation. -* **One-Pass Strings**: Optimized `writeVarString` with optimistic shift (30% faster). -* **Smart Buffering**: Exponential growth (×1.5) and object pooling. -* **Compact Encoding**: VarInt & ZigZag support -* **Stream Parsing**: `StreamBinaryReader` and `BinaryStreamTransformer` for async data. Extensible via `TransactionalReader` and `TransactionalStreamTransformer`. -* **Universal**: Supports Native & Web (WASM/JS) with consistent API. -* **Modern API**: Leverages Dart Extension Types for zero-overhead abstractions. +* **Extreme Performance:** Built from the ground up for speed. Leverages Dart Extension Types for zero-overhead abstractions and direct memory manipulation. +* **Zero-Copy Reads:** Deserialization operations return `Uint8List` views instead of allocating new memory arrays, significantly reducing GC (Garbage Collector) pauses. +* **One-Pass String Encoding:** Features a highly optimized `writeVarString` with optimistic size estimation and native memory shifting. Up to **~30% faster** than standard `utf8.encode`. +* **Zero-Allocation Object Pooling:** Includes built-in `BinaryWriterPool` to reuse writer instances. Perfect for high-frequency network packets (e.g., game servers, WebSockets). +* **Compact Encoding:** Native support for VarInt and ZigZag encoding to shrink payload sizes for integers. +* **Transactional Stream Parsing:** Easily process fragmented asynchronous data chunks using `StreamBinaryReader` with `bookmark()` and `rollback()` capabilities. +* **Cross-Platform:** 100% pure Dart. Works seamlessly across Native (AOT/JIT) and Web (WASM/JS) with a consistent, predictable API. ## Installation From 5b29069a793e93d15dbfa8a5a2bd7f7746540779 Mon Sep 17 00:00:00 2001 From: Andrii Ivanov Date: Thu, 28 May 2026 12:21:33 +0300 Subject: [PATCH 4/4] dart format . --- lib/src/binary_writer.dart | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/src/binary_writer.dart b/lib/src/binary_writer.dart index 00f24bb..d98b7c1 100644 --- a/lib/src/binary_writer.dart +++ b/lib/src/binary_writer.dart @@ -617,7 +617,7 @@ extension type BinaryWriter._(_WriterState _ws) { // 3. Write the actual string data directly into the buffer. writeString(value, allowMalformed: allowMalformed); - // Cache the offset immediately after writing to determine the exact byte + // Cache the offset immediately after writing to determine the exact byte // length. var currentOffset = _ws.offset; final byteLength = currentOffset - (startOffset + estimatedSize); @@ -625,8 +625,8 @@ extension type BinaryWriter._(_WriterState _ws) { // 4. Determine the actual VarInt size required for the encoded byte length. final actualSize = _varIntSize(byteLength); - // 5. If the optimistic estimate was wrong (e.g., due to multi-byte UTF-8 - // characters), shift the written string data to accommodate the actual + // 5. If the optimistic estimate was wrong (e.g., due to multi-byte UTF-8 + // characters), shift the written string data to accommodate the actual //VarInt size. if (actualSize != estimatedSize) { final shift = actualSize - estimatedSize; @@ -642,7 +642,7 @@ extension type BinaryWriter._(_WriterState _ws) { startOffset + estimatedSize, ); - // Adjust the local tracker instead of modifying the heap property + // Adjust the local tracker instead of modifying the heap property // repeatedly. currentOffset += shift; } @@ -671,7 +671,7 @@ extension type BinaryWriter._(_WriterState _ws) { /// The length prefix size is determined by [lengthEncoding]. /// /// [value] is the string to write. - /// [lengthEncoding] specifies the size of the length prefix (defaults to + /// [lengthEncoding] specifies the size of the length prefix (defaults to /// [LengthEncoding.u8]). /// [allowMalformed] if true, malformed UTF-8 sequences will be replaced /// with U+FFFD.