From 076e78714d592c153a5c401523601ba006c05ed4 Mon Sep 17 00:00:00 2001
From: thexia
Date: Thu, 7 May 2026 23:55:57 +0800
Subject: [PATCH] ORC-2619: Fix estimateRgEndOffset slop calculation for
 incompressible data

The stretchFactor calculation in estimateRgEndOffset did not account for the
2-byte header of an RLEv2 DIRECT run. When the data is incompressible, the
estimated read range could therefore end one compression chunk too early,
leading to 'Buffer size too small' errors.

Fix: include RLE_V2_HEADER_SIZE in the worst-case payload calculation.
Add tests that demonstrate the failure with the old formula.
---
 .../apache/orc/impl/RecordReaderUtils.java    |  6 +-
 .../org/apache/orc/impl/TestInStream.java     | 68 ++++++++++++++++
 .../apache/orc/impl/TestOrcLargeStripe.java   | 81 +++++++++++++++++++
 .../apache/orc/impl/TestRecordReaderImpl.java |  5 +-
 4 files changed, 156 insertions(+), 4 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 2c73970688..d1f6f69c6a 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -217,7 +217,9 @@ public static long estimateRgEndOffset(boolean isCompressed,
     // Stretch the slop by a factor to safely accommodate following compression blocks.
     // We need to calculate the maximum number of blocks(stretchFactor) by bufferSize accordingly.
     if (isCompressed) {
-      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      // RLEv2 DIRECT runs carry a run header (RLE_V2_HEADER_SIZE bytes) in addition to their value payload.
+      int maxRleDirectRunSize = MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE;
+      int stretchFactor = 2 + (maxRleDirectRunSize - 1) / bufferSize;
       slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
     }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
@@ -300,6 +302,8 @@ public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
   // the maximum byte width for each value
   static final int MAX_BYTE_WIDTH =
       SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
+  // RLEv2 DIRECT run header size in bytes
+  public static final int RLE_V2_HEADER_SIZE = 2;
 
   /**
    * Is this stream part of a dictionary?
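For reference, a standalone sketch of the arithmetic behind the new stretchFactor. The concrete values are assumptions taken from the current code and the new tests (MAX_VALUES_LENGTH = 512, MAX_BYTE_WIDTH = 8 for 64-bit values, RLE_V2_HEADER_SIZE = 2, OutStream.HEADER_SIZE = 3, and a 1024-byte compression buffer), not something this patch changes.

// Standalone sketch: old vs. new slop for a 1024-byte compression buffer.
// Assumed values: MAX_VALUES_LENGTH = 512, MAX_BYTE_WIDTH = 8 (64-bit values),
// RLE_V2_HEADER_SIZE = 2, OutStream.HEADER_SIZE = 3.
public class SlopArithmeticSketch {
  public static void main(String[] args) {
    final int maxValuesLength = 512;
    final int maxByteWidth = 8;
    final int rleV2HeaderSize = 2;
    final int bufferSize = 1024;
    final int headerSize = 3;

    // Old: ignores the run header -> 2 + (4096 - 1) / 1024 = 5 chunks of slop.
    int oldStretchFactor = 2 + (maxValuesLength * maxByteWidth - 1) / bufferSize;
    // New: includes the run header -> 2 + (4098 - 1) / 1024 = 6 chunks of slop.
    int newStretchFactor = 2 + (maxValuesLength * maxByteWidth + rleV2HeaderSize - 1) / bufferSize;

    System.out.println("old slop = " + oldStretchFactor * (headerSize + bufferSize)); // 5135
    System.out.println("new slop = " + newStretchFactor * (headerSize + bufferSize)); // 6162
  }
}

With those values the old formula reserves 5 chunks of slop, while a worst-case 4098-byte incompressible DIRECT run that starts near the end of a chunk can span 6; that one-chunk gap is what the TestInStream case below exercises.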
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index 596afcf304..5d02f494d5 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -35,11 +35,16 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestInStream {
@@ -1000,4 +1005,67 @@ public void testStreamResetWithoutIncreasedLength() throws IOException {
     byte[] inBuffer = new byte[5];
     assertEquals(5, inStream.read(inBuffer));
   }
+
+  /**
+   * Demonstrates that the old estimateRgEndOffset slop calculation is insufficient.
+   * When a compressed stream is truncated at the old estimated end offset,
+   * reading a full RLE v2 DIRECT run fails because the estimated slop doesn't
+   * account for enough compressed blocks.
+   */
+  @Test
+  public void testTruncatedRleV2DirectRunAtEstimatedEndFails() throws Exception {
+    final int bufferSize = 1024;
+    final int chunkSize = OutStream.HEADER_SIZE + bufferSize;
+    final int nextGroupOffset = bufferSize;
+    final int oldStretchFactor =
+        2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+    final int oldEstimatedEnd = nextGroupOffset + oldStretchFactor * chunkSize;
+
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    StreamOptions streamOptions = new StreamOptions(bufferSize)
+        .withCodec(codec, codec.getDefaultOptions());
+    byte[] data = new byte[bufferSize * 6];
+    new Random(42).nextBytes(data);
+    try (OutStream out = new OutStream("test", streamOptions, receiver)) {
+      out.write(data);
+      out.flush();
+    }
+
+    byte[] encoded = receiver.buffer.get();
+    assertEquals(nextGroupOffset + 5 * chunkSize, oldEstimatedEnd);
+    assertTrue(encoded.length > oldEstimatedEnd);
+
+    InStream stream = InStream.create("test",
+        new BufferChunk(ByteBuffer.wrap(encoded, 0, oldEstimatedEnd), 0),
+        0, oldEstimatedEnd,
+        InStream.options().withCodec(codec).withBufferSize(bufferSize));
+    byte[] rleDirectRun = new byte[MAX_VALUES_LENGTH * MAX_BYTE_WIDTH
+        + RecordReaderUtils.RLE_V2_HEADER_SIZE];
+
+    stream.seek(new SimplePositionProvider(nextGroupOffset, 0));
+    IllegalArgumentException error = assertThrows(
+        IllegalArgumentException.class, () -> {
+          int offset = 0;
+          while (offset < rleDirectRun.length) {
+            offset += stream.read(
+                rleDirectRun, offset, rleDirectRun.length - offset);
+          }
+        });
+    assertTrue(error.getMessage().contains("Buffer size too small"));
+  }
+
+  private static class SimplePositionProvider implements PositionProvider {
+    private final long[] positions;
+    private int index = 0;
+
+    SimplePositionProvider(long... positions) {
+      this.positions = positions;
+    }
+
+    @Override
+    public long getNext() {
+      return positions[index++];
+    }
+  }
 }
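A byte-level sketch of why the truncation in testTruncatedRleV2DirectRunAtEstimatedEndFails above is observable. It assumes OutStream.HEADER_SIZE is 3 and that Zlib stores each incompressible 1024-byte block as an "original" chunk of header plus full block; both are assumptions about the current implementation, not part of the patch.

// Byte-level sketch of the truncation set up by the test, assuming
// OutStream.HEADER_SIZE == 3 and that Zlib keeps each incompressible
// 1024-byte block as an "original" chunk (header + full block).
public class TruncationArithmeticSketch {
  public static void main(String[] args) {
    final int bufferSize = 1024;
    final int headerSize = 3;
    final int chunkSize = headerSize + bufferSize;          // 1027

    int encodedLength = 6 * chunkSize;                      // 6 random blocks -> 6162 bytes
    int nextGroupOffset = bufferSize;                       // 1024, as in the test
    int oldStretchFactor = 5;                               // 2 + (4096 - 1) / 1024
    int oldEstimatedEnd = nextGroupOffset + oldStretchFactor * chunkSize; // 6159

    // The range handed to InStream stops 3 bytes short of the encoded stream.
    System.out.println(encodedLength - oldEstimatedEnd);    // 3
  }
}

Under those assumptions the range handed to InStream ends 3 bytes short of the full encoded stream, which is the condition the test's assertTrue(encoded.length > oldEstimatedEnd) checks before attempting the read.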
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
index 6f8dfb478b..91fb3ed5b7 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
@@ -22,7 +22,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
@@ -159,6 +163,83 @@ public void testConfigMaxChunkLimit() throws IOException {
     assertEquals(1000, ((RecordReaderImpl) recordReader).getMaxDiskRangeChunkLimit());
   }
 
+  @Test
+  public void testRleV2DirectSeekAtBufferBoundaryWithSkippedEndRowGroup()
+      throws IOException {
+    final int bufferSize = 1024;
+    final int rowIndexStride = 512 * 512 + 1;
+    final int rowGroupCount = 3;
+    final int selectedRowGroup = 1;
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("rg", TypeDescription.createLong())
+        .addField("value", TypeDescription.createLong());
+
+    writeRleV2BoundaryFile(schema, bufferSize, rowIndexStride, rowGroupCount);
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.ZLIB, reader.getCompressionKind());
+    assertEquals(bufferSize, reader.getCompressionSize());
+    assertEquals(1, reader.getStripes().size());
+
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .equals("rg", PredicateLeaf.Type.LONG, (long) selectedRowGroup)
+        .build();
+    Reader.Options options = reader.options()
+        .searchArgument(sarg, new String[] {"rg"})
+        .useSelected(true)
+        .allowSARGToFilter(true);
+
+    long rowsRead = 0;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    try (RecordReader rows = reader.rows(options)) {
+      while (rows.nextBatch(batch)) {
+        rowsRead += validateSelectedRowGroup(batch, selectedRowGroup);
+      }
+    }
+    assertEquals(rowIndexStride, rowsRead);
+  }
+
+  private void writeRleV2BoundaryFile(TypeDescription schema, int bufferSize, int rowIndexStride, int rowGroupCount)
+      throws IOException {
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fs)
+            .setSchema(schema)
+            .compress(CompressionKind.ZLIB)
+            .enforceBufferSize()
+            .bufferSize(bufferSize)
+            .stripeSize(100_000_000)
+            .rowIndexStride(rowIndexStride))) {
+      VectorizedRowBatch batch = schema.createRowBatch();
+      LongColumnVector rg = (LongColumnVector) batch.cols[0];
+      LongColumnVector value = (LongColumnVector) batch.cols[1];
+      Random random = new Random(42);
+      for (int row = 0; row < rowIndexStride * rowGroupCount; ++row) {
+        rg.vector[batch.size] = row / rowIndexStride;
+        value.vector[batch.size] = random.nextLong();
+        batch.size += 1;
+        if (batch.size == batch.getMaxSize()) {
+          writer.addRowBatch(batch);
+          batch.reset();
+        }
+      }
+      if (batch.size != 0) {
+        writer.addRowBatch(batch);
+      }
+    }
+  }
+
+  private long validateSelectedRowGroup(VectorizedRowBatch batch, int selectedRowGroup) {
+    LongColumnVector rg = (LongColumnVector) batch.cols[0];
+    long selectedRows = 0;
+    for (int i = 0; i < batch.size; ++i) {
+      int row = batch.selectedInUse ? batch.selected[i] : i;
+      assertEquals(selectedRowGroup, rg.vector[row]);
+      selectedRows += 1;
+    }
+    return selectedRows;
+  }
+
   @Test
   public void testStringDirectGreaterThan2GB() throws IOException {
     final Runtime rt = Runtime.getRuntime();
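A rough sizing sketch for the "value" column written by the end-to-end test above. It assumes the writer picks RLEv2 DIRECT for the random longs, with at most 512 values per run, a 2-byte run header, and 8 bytes per value; those encoding details are assumptions about the writer, not something the test asserts.

// Rough sizing of the "value" column per row group, assuming RLEv2 DIRECT runs
// of at most 512 random 64-bit values with a 2-byte run header.
public class RowGroupSizingSketch {
  public static void main(String[] args) {
    final int maxRunValues = 512;
    final int bytesPerValue = 8;
    final int runHeaderBytes = 2;
    final int rowIndexStride = 512 * 512 + 1;                      // as in the test

    int runBytes = maxRunValues * bytesPerValue + runHeaderBytes;  // 4098: the worst case
    int fullRunsPerRowGroup = rowIndexStride / maxRunValues;       // 512 full runs' worth of values
    long rowGroupBytes = (long) fullRunsPerRowGroup * runBytes;    // ~2 MB of incompressible data

    System.out.println(runBytes + " bytes per full run, ~" + rowGroupBytes + " bytes per row group");
  }
}

Under those assumptions every full run is the worst-case 4098-byte payload and the random data does not compress, so the selected row group is read through 1024-byte ZLIB chunks; the odd stride (512 * 512 + 1 rather than a round number) appears intended to shift each row-group boundary slightly relative to the run and chunk boundaries.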
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index 3c70b7284a..5e1de781f7 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -85,9 +85,8 @@
 import java.util.List;
 import java.util.TimeZone;
 
-import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
-import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.apache.orc.OrcFile.CURRENT_WRITER;
+import static org.apache.orc.impl.RecordReaderUtils.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1544,7 +1543,7 @@ public void testPartialPlanCompressed() throws Exception {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE - 1) / options.getBufferSize();
     final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
         .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,