diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java index dfe6315..ed4b878 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoBytesField.java @@ -98,7 +98,7 @@ public void getter(PrintWriter w) { w.format(" if (%s == null) {\n", ccName); w.format(" return _parsedBuffer.slice(_%sIdx, _%sLen);\n", ccName, ccName); w.format(" } else {\n"); - w.format(" return %s.slice(0, _%sLen);\n", ccName, ccName); + w.format(" return %s.slice(%s.readerIndex(), _%sLen);\n", ccName, ccName, ccName); w.format(" }\n"); w.format("}\n"); } @@ -124,7 +124,7 @@ public void serializedSize(PrintWriter w) { @Override public void serializeJson(PrintWriter w) { w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" LightProtoCodec.writeJsonBase64(_b, %s, 0, _%sLen);\n", ccName, ccName); + w.format(" LightProtoCodec.writeJsonBase64(_b, %s, %s.readerIndex(), _%sLen);\n", ccName, ccName, ccName); w.format("} else {\n"); w.format(" LightProtoCodec.writeJsonBase64(_b, _parsedBuffer, _%sIdx, _%sLen);\n", ccName, ccName); w.format("}\n"); @@ -140,7 +140,8 @@ public void serializeTextFormat(PrintWriter w) { w.format("LightProtoCodec.writeTextFormatIndent(_sb, _indent);\n"); w.format("_sb.append(\"%s: \");\n", field.getName()); w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" LightProtoCodec.writeTextFormatBytes(_sb, %s, 0, _%sLen);\n", ccName, ccName); + w.format(" LightProtoCodec.writeTextFormatBytes(_sb, %s, %s.readerIndex(), _%sLen);\n", + ccName, ccName, ccName); w.format("} else {\n"); w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _parsedBuffer, _%sIdx, _%sLen);\n", ccName, ccName); w.format("}\n"); @@ -159,7 +160,10 @@ public void serialize(PrintWriter w) { w.format("_addr = LightProtoCodec.writeRawVarInt(_base, _addr, _%sLen);\n", ccName); w.format("_b.writerIndex((int)(_addr - _baseOffset));\n"); w.format("if (_%sIdx == -1) {\n", ccName); - w.format(" _b.writeBytes(%s);\n", ccName); + // Use the absolute-indexed copy so we don't mutate the source buffer's + // readerIndex; that allows the message to be re-serialized (e.g. on + // gRPC retry) and lets two fields safely alias the same backing buffer. + w.format(" %s.getBytes(%s.readerIndex(), _b, _%sLen);\n", ccName, ccName, ccName); w.format("} else {\n"); w.format(" _parsedBuffer.getBytes(_%sIdx, _b, _%sLen);\n", ccName, ccName); w.format("}\n"); diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java index f8795e8..cdb2d38 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoMapField.java @@ -171,7 +171,7 @@ private void generateReturnValueAt(PrintWriter w, String idxVar) { w.format(" LightProtoCodec.BytesHolder _bh = _%sValues[%s];\n", ccName, idxVar); w.format(" if (_bh.idx == -1) {\n"); w.format(" byte[] _res = new byte[_bh.len];\n"); - w.format(" _bh.b.getBytes(0, _res);\n"); + w.format(" _bh.b.getBytes(_bh.b.readerIndex(), _res);\n"); w.format(" return _res;\n"); w.format(" } else {\n"); w.format(" byte[] _res = new byte[_bh.len];\n"); @@ -207,7 +207,7 @@ private void generateResolveValueExpr(PrintWriter w, String idxExpr, String varN w.format(" byte[] %s;\n", varName); w.format(" if (_vbh.idx == -1) {\n"); w.format(" %s = new byte[_vbh.len];\n", varName); - w.format(" _vbh.b.getBytes(0, %s);\n", varName); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), %s);\n", varName); w.format(" } else {\n"); w.format(" %s = new byte[_vbh.len];\n", varName); w.format(" _parsedBuffer.getBytes(_vbh.idx, %s);\n", varName); @@ -892,7 +892,7 @@ private void generateSerializeValueData(PrintWriter w, String idxVar) { w.format(" _addr = LightProtoCodec.writeRawVarInt(_base, _addr, _vbh.len);\n"); w.format(" _b.writerIndex((int)(_addr - _baseOffset));\n"); w.format(" if (_vbh.idx == -1) {\n"); - w.format(" _vbh.b.getBytes(0, _b, _vbh.len);\n"); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), _b, _vbh.len);\n"); w.format(" } else {\n"); w.format(" _parsedBuffer.getBytes(_vbh.idx, _b, _vbh.len);\n"); w.format(" }\n"); @@ -955,7 +955,7 @@ public void copy(PrintWriter w) { w.format(" byte[] _val;\n"); w.format(" if (_vbh.idx == -1) {\n"); w.format(" _val = new byte[_vbh.len];\n"); - w.format(" _vbh.b.getBytes(0, _val);\n"); + w.format(" _vbh.b.getBytes(_vbh.b.readerIndex(), _val);\n"); w.format(" } else {\n"); w.format(" _val = new byte[_vbh.len];\n"); w.format(" _other._parsedBuffer.getBytes(_vbh.idx, _val);\n"); @@ -1063,9 +1063,9 @@ public void equalsCode(PrintWriter w) { w.format(" if (!java.util.Objects.equals(_vsh1.s, _vsh2.s)) return false;\n"); } else if (isBytesValue()) { w.format(" LightProtoCodec.BytesHolder _vbh1 = _%sValues[_i];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs1 = _vbh1.b != null ? _vbh1.b.slice(0, _vbh1.len) : _parsedBuffer.slice(_vbh1.idx, _vbh1.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs1 = _vbh1.b != null ? _vbh1.b.slice(_vbh1.b.readerIndex(), _vbh1.len) : _parsedBuffer.slice(_vbh1.idx, _vbh1.len);\n"); w.format(" LightProtoCodec.BytesHolder _vbh2 = _other._%sValues[_oIdx];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs2 = _vbh2.b != null ? _vbh2.b.slice(0, _vbh2.len) : _other._parsedBuffer.slice(_vbh2.idx, _vbh2.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs2 = _vbh2.b != null ? _vbh2.b.slice(_vbh2.b.readerIndex(), _vbh2.len) : _other._parsedBuffer.slice(_vbh2.idx, _vbh2.len);\n"); w.format(" if (!io.netty.buffer.ByteBufUtil.equals(_bs1, _bs2)) return false;\n"); } else if (isMessageValue()) { w.format(" if (!_%sValues[_i].equals(_other._%sValues[_oIdx])) return false;\n", ccName, ccName); @@ -1117,7 +1117,7 @@ public void hashCodeCode(PrintWriter w) { w.format(" _eH = 31 * _eH + _vsh.s.hashCode();\n"); } else if (isBytesValue()) { w.format(" LightProtoCodec.BytesHolder _vbh = _%sValues[_i];\n", ccName); - w.format(" io.netty.buffer.ByteBuf _bs = _vbh.b != null ? _vbh.b.slice(0, _vbh.len) : _parsedBuffer.slice(_vbh.idx, _vbh.len);\n"); + w.format(" io.netty.buffer.ByteBuf _bs = _vbh.b != null ? _vbh.b.slice(_vbh.b.readerIndex(), _vbh.len) : _parsedBuffer.slice(_vbh.idx, _vbh.len);\n"); w.format(" _eH = 31 * _eH + io.netty.buffer.ByteBufUtil.hashCode(_bs);\n"); } else if (isMessageValue()) { w.format(" _eH = 31 * _eH + _%sValues[_i].hashCode();\n", ccName); diff --git a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java index 5a19aa4..12bd124 100644 --- a/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java +++ b/code-generator/src/main/java/io/streamnative/lightproto/generator/LightProtoRepeatedBytesField.java @@ -75,7 +75,7 @@ public void getter(PrintWriter w) { w.format(" if (_bh.b == null) {\n"); w.format(" return _parsedBuffer.slice(_bh.idx, _bh.len);\n"); w.format(" } else {\n"); - w.format(" return _bh.b.slice(0, _bh.len);\n"); + w.format(" return _bh.b.slice(_bh.b.readerIndex(), _bh.len);\n"); w.format(" }\n"); w.format("}\n"); } @@ -88,7 +88,7 @@ public void serialize(PrintWriter w) { w.format(" _addr = LightProtoCodec.writeRawVarInt(_base, _addr, _bh.len);\n"); w.format(" _b.writerIndex((int)(_addr - _baseOffset));\n"); w.format(" if (_bh.idx == -1) {\n"); - w.format(" _bh.b.getBytes(0, _b, _bh.len);\n"); + w.format(" _bh.b.getBytes(_bh.b.readerIndex(), _b, _bh.len);\n"); w.format(" } else {\n"); w.format(" _parsedBuffer.getBytes(_bh.idx, _b, _bh.len);\n"); w.format(" }\n"); @@ -103,7 +103,7 @@ public void serializeJson(PrintWriter w) { w.format(" if (i > 0) { _b.writeByte(','); }\n"); w.format(" LightProtoCodec.BytesHolder _bh = %s[i];\n", pluralName); w.format(" if (_bh.idx == -1) {\n"); - w.format(" LightProtoCodec.writeJsonBase64(_b, _bh.b, 0, _bh.len);\n"); + w.format(" LightProtoCodec.writeJsonBase64(_b, _bh.b, _bh.b.readerIndex(), _bh.len);\n"); w.format(" } else {\n"); w.format(" LightProtoCodec.writeJsonBase64(_b, _parsedBuffer, _bh.idx, _bh.len);\n"); w.format(" }\n"); @@ -129,7 +129,7 @@ public void serializeTextFormat(PrintWriter w) { w.format(" LightProtoCodec.writeTextFormatIndent(_sb, _indent);\n"); w.format(" _sb.append(\"%s: \");\n", field.getName()); w.format(" if (_bh.idx == -1) {\n"); - w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _bh.b, 0, _bh.len);\n"); + w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _bh.b, _bh.b.readerIndex(), _bh.len);\n"); w.format(" } else {\n"); w.format(" LightProtoCodec.writeTextFormatBytes(_sb, _parsedBuffer, _bh.idx, _bh.len);\n"); w.format(" }\n"); diff --git a/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java b/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java index ae307b9..22d49d8 100644 --- a/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java +++ b/tests/src/test/java/io/streamnative/lightproto/tests/BytesTest.java @@ -136,6 +136,76 @@ public void testClearResetsOptionalBytesToDefault() { assertEquals(0, lpb.getSerializedSize()); } + @Test + public void testByteBufNotConsumedBySerialization() throws Exception { + // Setting a ByteBuf-typed bytes field must not advance the source + // buffer's readerIndex during serialization, otherwise: + // 1. the same message can't be serialized more than once (e.g. + // across gRPC retries), and + // 2. two fields backed by the same ByteBuf would clobber each other + // because the second field would read 0 bytes. + ByteBuf payload = Unpooled.wrappedBuffer(new byte[]{10, 20, 30, 40}); + int readerIdxBefore = payload.readerIndex(); + int readableBytesBefore = payload.readableBytes(); + + B lpb = new B().setPayload(payload); + + bb1.clear(); + lpb.writeTo(bb1); + + assertEquals(readerIdxBefore, payload.readerIndex(), + "setX(ByteBuf) + writeTo must not consume the source buffer's readerIndex"); + assertEquals(readableBytesBefore, payload.readableBytes()); + + // Serialize a second time — must produce the same wire bytes. + ByteBuf bb2nd = Unpooled.buffer(4096); + lpb.writeTo(bb2nd); + assertEquals(bb1.readableBytes(), bb2nd.readableBytes()); + assertEquals(bb1, bb2nd); + } + + @Test + public void testByteBufWithNonZeroReaderIndex() throws Exception { + // Setting a ByteBuf whose readerIndex > 0 should still serialize + // the readable region correctly. + ByteBuf raw = Unpooled.wrappedBuffer(new byte[]{99, 99, 99, 1, 2, 3}); + raw.readerIndex(3); // skip three padding bytes + assertEquals(3, raw.readableBytes()); + + B lpb = new B().setPayload(raw); + assertArrayEquals(new byte[]{1, 2, 3}, lpb.getPayload()); + + bb1.clear(); + lpb.writeTo(bb1); + + B parsed = new B(); + parsed.parseFrom(bb1, bb1.readableBytes()); + assertArrayEquals(new byte[]{1, 2, 3}, parsed.getPayload()); + } + + @Test + public void testTwoByteBufFieldsCanShareUnderlyingBuffer() throws Exception { + // Two bytes fields in the same message both backed by the same + // ByteBuf reference must each serialize the full content. This + // mirrors the bookkeeper stream-storage routing-header pattern + // where the same key is stored as both the request key and the + // routing header rKey. + ByteBuf shared = Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5}); + + B lpb = new B().setPayload(shared); + lpb.addExtraItem(shared); + + bb1.clear(); + lpb.writeTo(bb1); + + B parsed = new B(); + parsed.parseFrom(bb1, bb1.readableBytes()); + + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, parsed.getPayload()); + assertEquals(1, parsed.getExtraItemsCount()); + assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, parsed.getExtraItemAt(0)); + } + @Test public void testRepeatedBytes() throws Exception { B lpb = new B();