From b348b2ef946cbc80fe15dfd4faddccc483290d1d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 8 May 2026 14:58:16 -0700 Subject: [PATCH] Don't consume the source ByteBuf's readerIndex during serialization For ByteBuf-backed bytes fields, the generated serialize() called dst.writeBytes(src), which advances src's readerIndex. This had two bad consequences: 1. Re-serializing the same message produced wrong output. After the first writeTo, src had zero readable bytes; the second writeTo's _b.writeBytes(src) wrote nothing, while the cached _xLen still reserved space. The wire output was truncated and downstream parsers saw the next field's bytes where the bytes field's payload should be. 2. Two fields backed by the same ByteBuf reference clobbered each other. After the first field consumed src, the second field serialized as empty. (2) was hit by bookkeeper-stream's routing-header pattern, where the same ByteBuf is used as both the request key and the routing header rKey; (1) shows up under gRPC retry. The same offset-0 mistake also affected getXSlice() / getX() / serializeJson() / serializeTextFormat() and the equivalent paths for repeated bytes and map<*, bytes>: slice(0, len) and getBytes(0, ...) ignored a non-zero readerIndex that the caller may have set. Switch every read of a user-set ByteBuf to use buf.readerIndex() as the absolute starting offset: // serialize buf.getBytes(buf.readerIndex(), _b, len); // non-mutating // getXSlice buf.slice(buf.readerIndex(), len); // serializeJson / serializeTextFormat writeJsonBase64(_b, buf, buf.readerIndex(), len); writeTextFormatBytes(_sb, buf, buf.readerIndex(), len); The buffer itself is still stored as-is; only access is non-mutating. Tests cover (1) re-serialization, (2) two fields sharing a buffer, and a non-zero-readerIndex setter case. --- .../generator/LightProtoBytesField.java | 12 ++-- .../generator/LightProtoMapField.java | 14 ++-- .../LightProtoRepeatedBytesField.java | 8 +-- .../lightproto/tests/BytesTest.java | 70 +++++++++++++++++++ 4 files changed, 89 insertions(+), 15 deletions(-) diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java index dfe6315..ed4b878 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java @@ -98,7 +98,7 @@ public void getter(PrintWriter w) { w.format(" if (%s == null) {\n", ccName); w.format(" return _parsedBuffer.slice(_%sIdx, _%sLen);\n", ccName, ccName); w.format(" } else {\n"); - w.format(" return %s.slice(0, _%sLen);\n", ccName, ccName); + w.format(" return %s.slice(%s.readerIndex(), _%sLen);\n", ccName, ccName, ccName); w.format(" }\n"); w.format("}\n"); } @@ -124,7 +124,7 @@ public void serializedSize(PrintWriter w) { @Override public void serializeJson(PrintWriter w) { w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" LightProtoCodec.writeJsonBase64(_b, %s, 0, _%sLen);\n", ccName, ccName); + w.format(" LightProtoCodec.writeJsonBase64(_b, %s, %s.readerIndex(), _%sLen);\n", ccName, ccName, ccName); w.format("} else {\n"); w.format(" LightProtoCodec.writeJsonBase64(_b, _parsedBuffer, _%sIdx, _%sLen);\n", ccName, ccName); w.format("}\n"); @@ -140,7 +140,8 @@ public void serializeTextFormat(PrintWriter w) { w.format("LightProtoCodec.writeTextFormatIndent(_sb, _indent);\n"); w.format("_sb.append(\"%s: \");\n", field.getName()); w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" LightProtoCodec.writeTextFormatBytes(_sb, %s, 0, _%sLen);\n", ccName, ccName); + w.format(" LightProtoCodec.writeTextFormatBytes(_sb, %s, %s.readerIndex(), _%sLen);\n", + ccName, ccName, ccName); w.format("} else {\n"); w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _parsedBuffer, _%sIdx, _%sLen);\n", ccName, ccName); w.format("}\n"); @@ -159,7 +160,10 @@ public void serialize(PrintWriter w) { w.format("_addr = LightProtoCodec.writeRawVarInt(_base, _addr, _%sLen);\n", ccName); w.format("_b.writerIndex((int)(_addr - _baseOffset));\n"); w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" _b.writeBytes(%s);\n", ccName); + // Use the absolute-indexed copy so we don't mutate the source buffer's + // readerIndex; that allows the message to be re-serialized (e.g. on + // gRPC retry) and lets two fields safely alias the same backing buffer. + w.format(" %s.getBytes(%s.readerIndex(), _b, _%sLen);\n", ccName, ccName, ccName); w.format("} else {\n"); w.format(" _parsedBuffer.getBytes(_%sIdx, _b, _%sLen);\n", ccName, ccName); w.format("}\n"); diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java index f8795e8..cdb2d38 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java @@ -171,7 +171,7 @@ private void generateReturnValueAt(PrintWriter w, String idxVar) { w.format(" LightProtoCodec.BytesHolder _bh = _%sValues[%s];\n", ccName, idxVar); w.format(" if (_bh.idx == -1) {\n"); w.format(" byte[] _res = new byte[_bh.len];\n"); - w.format(" _bh.b.getBytes(0, _res);\n"); + w.format(" _bh.b.getBytes(_bh.b.readerIndex(), _res);\n"); w.format(" return _res;\n"); w.format(" } else {\n"); w.format(" byte[] _res = new byte[_bh.len];\n"); @@ -207,7 +207,7 @@ private void generateResolveValueExpr(PrintWriter w, String idxExpr, String varN w.format(" byte[] %s;\n", varName); w.format(" if (_vbh.idx == -1) {\n"); w.format(" %s = new byte[_vbh.len];\n", varName); - w.format(" _vbh.b.getBytes(0, %s);\n", varName); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), %s);\n", varName); w.format(" } else {\n"); w.format(" %s = new byte[_vbh.len];\n", varName); w.format(" _parsedBuffer.getBytes(_vbh.idx, %s);\n", varName); @@ -892,7 +892,7 @@ private void generateSerializeValueData(PrintWriter w, String idxVar) { w.format(" _addr = LightProtoCodec.writeRawVarInt(_base, _addr, _vbh.len);\n"); w.format(" _b.writerIndex((int)(_addr - _baseOffset));\n"); w.format(" if (_vbh.idx == -1) {\n"); - w.format(" _vbh.b.getBytes(0, _b, _vbh.len);\n"); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), _b, _vbh.len);\n"); w.format(" } else {\n"); w.format(" _parsedBuffer.getBytes(_vbh.idx, _b, _vbh.len);\n"); w.format(" }\n"); @@ -955,7 +955,7 @@ public void copy(PrintWriter w) { w.format(" byte[] _val;\n"); w.format(" if (_vbh.idx == -1) {\n"); w.format(" _val = new byte[_vbh.len];\n"); - w.format(" _vbh.b.getBytes(0, _val);\n"); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), _val);\n"); w.format(" } else {\n"); w.format(" _val = new byte[_vbh.len];\n"); w.format(" _other._parsedBuffer.getBytes(_vbh.idx, _val);\n"); @@ -1063,9 +1063,9 @@ public void equalsCode(PrintWriter w) { w.format(" if (!java.util.Objects.equals(_vsh1.s, _vsh2.s)) return false;\n"); } else if (isBytesValue()) { w.format(" LightProtoCodec.BytesHolder _vbh1 = _%sValues[_i];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs1 = _vbh1.b != null ? _vbh1.b.slice(0, _vbh1.len) : _parsedBuffer.slice(_vbh1.idx, _vbh1.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs1 = _vbh1.b != null ? _vbh1.b.slice(_vbh1.b.readerIndex(), _vbh1.len) : _parsedBuffer.slice(_vbh1.idx, _vbh1.len);\n"); w.format(" LightProtoCodec.BytesHolder _vbh2 = _other._%sValues[_oIdx];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs2 = _vbh2.b != null ? _vbh2.b.slice(0, _vbh2.len) : _other._parsedBuffer.slice(_vbh2.idx, _vbh2.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs2 = _vbh2.b != null ? _vbh2.b.slice(_vbh2.b.readerIndex(), _vbh2.len) : _other._parsedBuffer.slice(_vbh2.idx, _vbh2.len);\n"); w.format(" if (!io.netty.buffer.ByteBufUtil.equals(_bs1, _bs2)) return false;\n"); } else if (isMessageValue()) { w.format(" if (!_%sValues[_i].equals(_other._%sValues[_oIdx])) return false;\n", ccName, ccName); @@ -1117,7 +1117,7 @@ public void hashCodeCode(PrintWriter w) { w.format(" _eH = 31 * _eH + _vsh.s.hashCode();\n"); } else if (isBytesValue()) { w.format(" LightProtoCodec.BytesHolder _vbh = _%sValues[_i];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs = _vbh.b != null ? _vbh.b.slice(0, _vbh.len) : _parsedBuffer.slice(_vbh.idx, _vbh.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs = _vbh.b != null ? _vbh.b.slice(_vbh.b.readerIndex(), _vbh.len) : _parsedBuffer.slice(_vbh.idx, _vbh.len);\n"); w.format(" _eH = 31 * _eH + io.netty.buffer.ByteBufUtil.hashCode(_bs);\n"); } else if (isMessageValue()) { w.format(" _eH = 31 * _eH + _%sValues[_i].hashCode();\n", ccName); diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java index 5a19aa4..12bd124 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java @@ -75,7 +75,7 @@ public void getter(PrintWriter w) { w.format(" if (_bh.b == null) {\n"); w.format(" return _parsedBuffer.slice(_bh.idx, _bh.len);\n"); w.format(" } else {\n"); - w.format(" return _bh.b.slice(0, _bh.len);\n"); + w.format(" return _bh.b.slice(_bh.b.readerIndex(), _bh.len);\n"); w.format(" }\n"); w.format("}\n"); } @@ -88,7 +88,7 @@ public void serialize(PrintWriter w) { w.format(" _addr = LightProtoCodec.writeRawVarInt(_base, _addr, _bh.len);\n"); w.format(" _b.writerIndex((int)(_addr - _baseOffset));\n"); w.format(" if (_bh.idx == -1) {\n"); - w.format(" _bh.b.getBytes(0, _b, _bh.len);\n"); + w.format(" _bh.b.getBytes(_bh.b.readerIndex(), _b, _bh.len);\n"); w.format(" } else {\n"); w.format(" _parsedBuffer.getBytes(_bh.idx, _b, _bh.len);\n"); w.format(" }\n"); @@ -103,7 +103,7 @@ public void serializeJson(PrintWriter w) { w.format(" if (i > 0) { _b.writeByte(','); }\n"); w.format(" LightProtoCodec.BytesHolder _bh = %s[i];\n", pluralName); w.format(" if (_bh.idx == -1) {\n"); - w.format(" LightProtoCodec.writeJsonBase64(_b, _bh.b, 0, _bh.len);\n"); + w.format(" LightProtoCodec.writeJsonBase64(_b, _bh.b, _bh.b.readerIndex(), _bh.len);\n"); w.format(" } else {\n"); w.format(" LightProtoCodec.writeJsonBase64(_b, _parsedBuffer, _bh.idx, _bh.len);\n"); w.format(" }\n"); @@ -129,7 +129,7 @@ public void serializeTextFormat(PrintWriter w) { w.format(" LightProtoCodec.writeTextFormatIndent(_sb, _indent);\n"); w.format(" _sb.append(\"%s: \");\n", field.getName()); w.format(" if (_bh.idx == -1) {\n"); - w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _bh.b, 0, _bh.len);\n"); + w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _bh.b, _bh.b.readerIndex(), _bh.len);\n"); w.format(" } else {\n"); w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _parsedBuffer, _bh.idx, _bh.len);\n"); w.format(" }\n"); diff --git a/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java b/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java index ae307b9..22d49d8 100644 --- a/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java +++ b/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java @@ -136,6 +136,76 @@ public void testClearResetsOptionalBytesToDefault() { assertEquals(0, lpb.getSerializedSize()); } + @Test + public void testByteBufNotConsumedBySerialization() throws Exception { + // Setting a ByteBuf-typed bytes field must not advance the source + // buffer's readerIndex during serialization, otherwise: + // 1. the same message can't be serialized more than once (e.g. + // across gRPC retries), and + // 2. two fields backed by the same ByteBuf would clobber each other + // because the second field would read 0 bytes. + ByteBuf payload = Unpooled.wrappedBuffer(new byte[]{10, 20, 30, 40}); + int readerIdxBefore = payload.readerIndex(); + int readableBytesBefore = payload.readableBytes(); + + B lpb = new B().setPayload(payload); + + bb1.clear(); + lpb.writeTo(bb1); + + assertEquals(readerIdxBefore, payload.readerIndex(), + "setX(ByteBuf) + writeTo must not consume the source buffer's readerIndex"); + assertEquals(readableBytesBefore, payload.readableBytes()); + + // Serialize a second time — must produce the same wire bytes. + ByteBuf bb2nd = Unpooled.buffer(4096); + lpb.writeTo(bb2nd); + assertEquals(bb1.readableBytes(), bb2nd.readableBytes()); + assertEquals(bb1, bb2nd); + } + + @Test + public void testByteBufWithNonZeroReaderIndex() throws Exception { + // Setting a ByteBuf whose readerIndex > 0 should still serialize + // the readable region correctly. + ByteBuf raw = Unpooled.wrappedBuffer(new byte[]{99, 99, 99, 1, 2, 3}); + raw.readerIndex(3); // skip three padding bytes + assertEquals(3, raw.readableBytes()); + + B lpb = new B().setPayload(raw); + assertArrayEquals(new byte[]{1, 2, 3}, lpb.getPayload()); + + bb1.clear(); + lpb.writeTo(bb1); + + B parsed = new B(); + parsed.parseFrom(bb1, bb1.readableBytes()); + assertArrayEquals(new byte[]{1, 2, 3}, parsed.getPayload()); + } + + @Test + public void testTwoByteBufFieldsCanShareUnderlyingBuffer() throws Exception { + // Two bytes fields in the same message both backed by the same + // ByteBuf reference must each serialize the full content. This + // mirrors the bookkeeper stream-storage routing-header pattern + // where the same key is stored as both the request key and the + // routing header rKey. + ByteBuf shared = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5}); + + B lpb = new B().setPayload(shared); + lpb.addExtraItem(shared); + + bb1.clear(); + lpb.writeTo(bb1); + + B parsed = new B(); + parsed.parseFrom(bb1, bb1.readableBytes()); + + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, parsed.getPayload()); + assertEquals(1, parsed.getExtraItemsCount()); + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, parsed.getExtraItemAt(0)); + } + @Test public void testRepeatedBytes() throws Exception { B lpb = new B();