@@ -217,7 +217,9 @@ public static long estimateRgEndOffset(boolean isCompressed,
     // Stretch the slop by a factor to safely accommodate following compression blocks.
     // We need to calculate the maximum number of blocks (stretchFactor) by bufferSize accordingly.
     if (isCompressed) {
-      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      // RLEv2 DIRECT runs can need a 2-byte header in addition to their value payload.
+      int maxRleDirectRunSize = MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + 2;
+      int stretchFactor = 2 + (maxRleDirectRunSize - 1) / bufferSize;
       slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
     }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
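To see why the extra two bytes matter, it helps to plug in the numbers the new test uses. The following is an illustrative sketch, not part of the patch: it assumes ORC's RLEv2 limits (at most 512 values per run, at most 8 bytes per value, matching MAX_VALUES_LENGTH and MAX_BYTE_WIDTH) and OutStream's 3-byte compression-chunk header.

```java
// Illustrative arithmetic only; constants mirror RecordReaderUtils and OutStream.
public class SlopArithmetic {
  public static void main(String[] args) {
    final int maxValuesLength = 512; // longest RLEv2 run, in values
    final int maxByteWidth = 8;      // widest value, in bytes (64 bits)
    final int headerSize = 3;        // compression-chunk header, in bytes
    final int bufferSize = 1024;

    // Old estimate covers the value payload only: 2 + 4095 / 1024 = 5 blocks.
    int oldStretch = 2 + (maxValuesLength * maxByteWidth - 1) / bufferSize;
    // New estimate adds the 2-byte DIRECT run header: 2 + 4097 / 1024 = 6 blocks.
    int newStretch = 2 + (maxValuesLength * maxByteWidth + 2 - 1) / bufferSize;

    System.out.println(oldStretch * (headerSize + bufferSize)); // 5135 bytes of slop
    System.out.println(newStretch * (headerSize + bufferSize)); // 6162 bytes of slop
  }
}
```

A worst-case 4098-byte run that starts right at a row-group boundary therefore spills into a sixth compression block, which the old five-block slop truncated.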
@@ -300,6 +302,8 @@ public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
   // the maximum byte width for each value
   static final int MAX_BYTE_WIDTH =
       SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
+  // RLEv2 DIRECT run header size in bytes
+  public static final int RLE_V2_HEADER_SIZE = 2;
 
   /**
    * Is this stream part of a dictionary?
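The new constant's value follows from the RLEv2 wire format: a DIRECT run header packs a 2-bit encoding tag, a 5-bit width code, and a 9-bit (length - 1) field into exactly two bytes. A hedged sketch of that layout, with illustrative values:

```java
// Sketch of the 2-byte RLEv2 DIRECT header (per the ORC spec); not patch code.
int encoding = 1;    // 2 bits: DIRECT
int widthCode = 31;  // 5 bits: FixedBitSizes code for 64-bit values
int runLength = 512; // stored as length - 1 in 9 bits
int firstByte = (encoding << 6) | (widthCode << 1) | ((runLength - 1) >>> 8);
int secondByte = (runLength - 1) & 0xFF;
// firstByte = 0x7F, secondByte = 0xFF: two bytes, hence RLE_V2_HEADER_SIZE = 2.
```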
java/core/src/test/org/apache/orc/impl/TestInStream.java: 68 additions, 0 deletions
@@ -35,11 +35,16 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestInStream {
@@ -1000,4 +1005,67 @@ public void testStreamResetWithoutIncreasedLength() throws IOException {
     byte[] inBuffer = new byte[5];
     assertEquals(5, inStream.read(inBuffer));
   }
+
+  /**
+   * Demonstrates that the old estimateRgEndOffset slop calculation is insufficient.
+   * When a compressed stream is truncated at the old estimated end offset,
+   * reading a full RLE v2 DIRECT run fails because the estimated slop doesn't
+   * account for enough compressed blocks.
+   */
+  @Test
+  public void testTruncatedRleV2DirectRunAtEstimatedEndFails() throws Exception {
+    final int bufferSize = 1024;
+    final int chunkSize = OutStream.HEADER_SIZE + bufferSize;
+    final int nextGroupOffset = bufferSize;
+    final int oldStretchFactor =
+        2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+    final int oldEstimatedEnd = nextGroupOffset + oldStretchFactor * chunkSize;
+
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    StreamOptions streamOptions = new StreamOptions(bufferSize)
+        .withCodec(codec, codec.getDefaultOptions());
+    byte[] data = new byte[bufferSize * 6];
+    new Random(42).nextBytes(data);
+    try (OutStream out = new OutStream("test", streamOptions, receiver)) {
+      out.write(data);
+      out.flush();
+    }
+
+    byte[] encoded = receiver.buffer.get();
+    assertEquals(nextGroupOffset + 5 * chunkSize, oldEstimatedEnd);
+    assertTrue(encoded.length > oldEstimatedEnd);
+
+    InStream stream = InStream.create("test",
+        new BufferChunk(ByteBuffer.wrap(encoded, 0, oldEstimatedEnd), 0),
+        0, oldEstimatedEnd,
+        InStream.options().withCodec(codec).withBufferSize(bufferSize));
+    byte[] rleDirectRun = new byte[MAX_VALUES_LENGTH * MAX_BYTE_WIDTH
+        + RecordReaderUtils.RLE_V2_HEADER_SIZE];
+
+    stream.seek(new SimplePositionProvider(nextGroupOffset, 0));
+    IllegalArgumentException error = assertThrows(
+        IllegalArgumentException.class, () -> {
+          int offset = 0;
+          while (offset < rleDirectRun.length) {
+            offset += stream.read(
+                rleDirectRun, offset, rleDirectRun.length - offset);
+          }
+        });
+    assertTrue(error.getMessage().contains("Buffer size too small"));
+  }
+
+  private static class SimplePositionProvider implements PositionProvider {
+    private final long[] positions;
+    private int index = 0;
+
+    SimplePositionProvider(long... positions) {
+      this.positions = positions;
+    }
+
+    @Override
+    public long getNext() {
+      return positions[index++];
+    }
+  }
 }
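A note on the helper above: for a compressed stream, InStream's seek consumes positions in pairs, the byte offset of a compression chunk within the stream followed by the uncompressed offset inside that chunk, which is why the test seeks with SimplePositionProvider(nextGroupOffset, 0) to land at the start of the second chunk.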
java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java: 81 additions, 0 deletions
@@ -22,7 +22,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
@@ -159,6 +163,83 @@ public void testConfigMaxChunkLimit() throws IOException {
     assertEquals(1000, ((RecordReaderImpl) recordReader).getMaxDiskRangeChunkLimit());
   }
 
+  @Test
+  public void testRleV2DirectSeekAtBufferBoundaryWithSkippedEndRowGroup()
+      throws IOException {
+    final int bufferSize = 1024;
+    final int rowIndexStride = 512 * 512 + 1;
+    final int rowGroupCount = 3;
+    final int selectedRowGroup = 1;
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("rg", TypeDescription.createLong())
+        .addField("value", TypeDescription.createLong());
+
+    writeRleV2BoundaryFile(schema, bufferSize, rowIndexStride, rowGroupCount);
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.ZLIB, reader.getCompressionKind());
+    assertEquals(bufferSize, reader.getCompressionSize());
+    assertEquals(1, reader.getStripes().size());
+
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .equals("rg", PredicateLeaf.Type.LONG, (long) selectedRowGroup)
+        .build();
+    Reader.Options options = reader.options()
+        .searchArgument(sarg, new String[] {"rg"})
+        .useSelected(true)
+        .allowSARGToFilter(true);
+
+    long rowsRead = 0;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    try (RecordReader rows = reader.rows(options)) {
+      while (rows.nextBatch(batch)) {
+        rowsRead += validateSelectedRowGroup(batch, selectedRowGroup);
+      }
+    }
+    assertEquals(rowIndexStride, rowsRead);
+  }
+
+  private void writeRleV2BoundaryFile(TypeDescription schema, int bufferSize, int rowIndexStride, int rowGroupCount)
+      throws IOException {
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fs)
+            .setSchema(schema)
+            .compress(CompressionKind.ZLIB)
+            .enforceBufferSize()
+            .bufferSize(bufferSize)
+            .stripeSize(100_000_000)
+            .rowIndexStride(rowIndexStride))) {
+      VectorizedRowBatch batch = schema.createRowBatch();
+      LongColumnVector rg = (LongColumnVector) batch.cols[0];
+      LongColumnVector value = (LongColumnVector) batch.cols[1];
+      Random random = new Random(42);
+      for (int row = 0; row < rowIndexStride * rowGroupCount; ++row) {
+        rg.vector[batch.size] = row / rowIndexStride;
+        value.vector[batch.size] = random.nextLong();
+        batch.size += 1;
+        if (batch.size == batch.getMaxSize()) {
+          writer.addRowBatch(batch);
+          batch.reset();
+        }
+      }
+      if (batch.size != 0) {
+        writer.addRowBatch(batch);
+      }
+    }
+  }
+
+  private long validateSelectedRowGroup(VectorizedRowBatch batch, int selectedRowGroup) {
+    LongColumnVector rg = (LongColumnVector) batch.cols[0];
+    long selectedRows = 0;
+    for (int i = 0; i < batch.size; ++i) {
+      int row = batch.selectedInUse ? batch.selected[i] : i;
+      assertEquals(selectedRowGroup, rg.vector[row]);
+      selectedRows += 1;
+    }
+    return selectedRows;
+  }
+
   @Test
   public void testStringDirectGreaterThan2GB() throws IOException {
     final Runtime rt = Runtime.getRuntime();
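A hedged reading of the magic numbers in this test: with incompressible random longs, the writer presumably emits maximal RLEv2 DIRECT runs of 512 values at 64-bit width, so each run occupies 512 * 8 + 2 = 4098 bytes and overhangs four 1 KB compression buffers by exactly the 2-byte header the old slop ignored; the stride of 512 * 512 + 1 rows and the SArg selecting only the middle row group force the reader to plan the read from the estimated row-group end rather than the stripe end.

```java
// Illustrative arithmetic for the test parameters above (assumptions noted inline).
int runBytes = 512 * 8 + 2;               // maximal DIRECT run: payload + header = 4098
int bufferSize = 1024;                    // the test's compression buffer
int overhang = runBytes - 4 * bufferSize; // = 2: the bytes the old slop missed
```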
@@ -85,9 +85,8 @@
 import java.util.List;
 import java.util.TimeZone;
 
-import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
-import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.apache.orc.OrcFile.CURRENT_WRITER;
+import static org.apache.orc.impl.RecordReaderUtils.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1544,7 +1543,7 @@ public void testPartialPlanCompressed() throws Exception {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH + RLE_V2_HEADER_SIZE - 1) / options.getBufferSize();
     final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
         .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,