Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,62 @@ public long readLong() {
throw new UnsupportedOperationException();
}

// ---- Batch read methods ----
// Default implementations loop over the per-value methods.
// Subclasses should override with bulk/memcpy-style implementations.

/**
* Reads {@code count} integers into {@code dest} starting at {@code offset}.
*
* @param dest destination array
* @param offset start index in dest
* @param count number of values to read
*/
public void readIntegers(int[] dest, int offset, int count) {
for (int i = 0; i < count; i++) {
dest[offset + i] = readInteger();
}
}

/**
* Reads {@code count} longs into {@code dest} starting at {@code offset}.
*
* @param dest destination array
* @param offset start index in dest
* @param count number of values to read
*/
public void readLongs(long[] dest, int offset, int count) {
for (int i = 0; i < count; i++) {
dest[offset + i] = readLong();
}
}

/**
* Reads {@code count} floats into {@code dest} starting at {@code offset}.
*
* @param dest destination array
* @param offset start index in dest
* @param count number of values to read
*/
public void readFloats(float[] dest, int offset, int count) {
for (int i = 0; i < count; i++) {
dest[offset + i] = readFloat();
}
}

/**
* Reads {@code count} doubles into {@code dest} starting at {@code offset}.
*
* @param dest destination array
* @param offset start index in dest
* @param count number of values to read
*/
public void readDoubles(double[] dest, int offset, int count) {
for (int i = 0; i < count; i++) {
dest[offset + i] = readDouble();
}
}

/**
* Skips the next value in the page
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ protected int nextElementByteOffset() {
return offset;
}

/**
* Advances the stream position by {@code count} elements and returns the byte offset
* of the first element. Used by batch read methods in subclasses.
*/
protected int advanceByteOffset(int count) {
if (indexInStream + count > valuesCount) {
throw new ParquetDecodingException("Byte-stream data was already exhausted.");
}
int offset = indexInStream * elementSizeInBytes;
indexInStream += count;
return offset;
}

// Decode an entire data page
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
assert encoded.limit() == valuesCount * elementSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForDouble() {
public double readDouble() {
return decodedDataBuffer.getDouble(nextElementByteOffset());
}

@Override
public void readDoubles(double[] dest, int offset, int count) {
int byteOffset = advanceByteOffset(count);
for (int i = 0; i < count; i++) {
dest[offset + i] = decodedDataBuffer.getDouble(byteOffset + i * 8);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForFloat() {
public float readFloat() {
return decodedDataBuffer.getFloat(nextElementByteOffset());
}

@Override
public void readFloats(float[] dest, int offset, int count) {
int byteOffset = advanceByteOffset(count);
for (int i = 0; i < count; i++) {
dest[offset + i] = decodedDataBuffer.getFloat(byteOffset + i * 4);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForInteger() {
public int readInteger() {
return decodedDataBuffer.getInt(nextElementByteOffset());
}

@Override
public void readIntegers(int[] dest, int offset, int count) {
int byteOffset = advanceByteOffset(count);
for (int i = 0; i < count; i++) {
dest[offset + i] = decodedDataBuffer.getInt(byteOffset + i * 4);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public ByteStreamSplitValuesReaderForLong() {
public long readLong() {
return decodedDataBuffer.getLong(nextElementByteOffset());
}

@Override
public void readLongs(long[] dest, int offset, int count) {
int byteOffset = advanceByteOffset(count);
for (int i = 0; i < count; i++) {
dest[offset + i] = decodedDataBuffer.getLong(byteOffset + i * 8);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,22 @@ public long readLong() {
return valuesBuffer[valuesRead++];
}

@Override
public void readIntegers(int[] dest, int offset, int count) {
checkRead();
for (int i = 0; i < count; i++) {
dest[offset + i] = (int) valuesBuffer[valuesRead + i];
}
valuesRead += count;
}

@Override
public void readLongs(long[] dest, int offset, int count) {
checkRead();
System.arraycopy(valuesBuffer, valuesRead, dest, offset, count);
valuesRead += count;
}

private void checkRead() {
if (valuesRead >= totalValueCount) {
throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,59 @@ public long readLong() {
}
}

@Override
public void readIntegers(int[] dest, int offset, int count) {
try {
// Batch-decode dictionary IDs, then batch-lookup
int[] ids = new int[count];
decoder.readInts(ids, 0, count);
for (int i = 0; i < count; i++) {
dest[offset + i] = dictionary.decodeToInt(ids[i]);
}
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
}

@Override
public void readLongs(long[] dest, int offset, int count) {
try {
int[] ids = new int[count];
decoder.readInts(ids, 0, count);
for (int i = 0; i < count; i++) {
dest[offset + i] = dictionary.decodeToLong(ids[i]);
}
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
}

@Override
public void readFloats(float[] dest, int offset, int count) {
try {
int[] ids = new int[count];
decoder.readInts(ids, 0, count);
for (int i = 0; i < count; i++) {
dest[offset + i] = dictionary.decodeToFloat(ids[i]);
}
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
}

@Override
public void readDoubles(double[] dest, int offset, int count) {
try {
int[] ids = new int[count];
decoder.readInts(ids, 0, count);
for (int i = 0; i < count; i++) {
dest[offset + i] = dictionary.decodeToDouble(ids[i]);
}
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
}

@Override
public void skip() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public double readDouble() {
throw new ParquetDecodingException("could not read double", e);
}
}

@Override
public void readDoubles(double[] dest, int offset, int count) {
try {
for (int i = 0; i < count; i++) {
dest[offset + i] = in.readDouble();
}
} catch (IOException e) {
throw new ParquetDecodingException("could not read doubles", e);
}
}
}

public static class FloatPlainValuesReader extends PlainValuesReader {
Expand All @@ -92,6 +103,17 @@ public float readFloat() {
throw new ParquetDecodingException("could not read float", e);
}
}

@Override
public void readFloats(float[] dest, int offset, int count) {
try {
for (int i = 0; i < count; i++) {
dest[offset + i] = in.readFloat();
}
} catch (IOException e) {
throw new ParquetDecodingException("could not read floats", e);
}
}
}

public static class IntegerPlainValuesReader extends PlainValuesReader {
Expand All @@ -113,6 +135,17 @@ public int readInteger() {
throw new ParquetDecodingException("could not read int", e);
}
}

@Override
public void readIntegers(int[] dest, int offset, int count) {
try {
for (int i = 0; i < count; i++) {
dest[offset + i] = in.readInt();
}
} catch (IOException e) {
throw new ParquetDecodingException("could not read ints", e);
}
}
}

public static class LongPlainValuesReader extends PlainValuesReader {
Expand All @@ -134,5 +167,16 @@ public long readLong() {
throw new ParquetDecodingException("could not read long", e);
}
}

@Override
public void readLongs(long[] dest, int offset, int count) {
try {
for (int i = 0; i < count; i++) {
dest[offset + i] = in.readLong();
}
} catch (IOException e) {
throw new ParquetDecodingException("could not read longs", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,40 @@ public int readInt() throws IOException {
return result;
}

/**
* Reads {@code count} int values into {@code dest} starting at {@code offset}.
* This avoids per-value virtual dispatch overhead by batching across RLE runs
* and packed groups.
*
* @param dest destination array
* @param offset start index in dest
* @param count number of values to read
*/
public void readInts(int[] dest, int offset, int count) throws IOException {
int remaining = count;
int pos = offset;
while (remaining > 0) {
if (currentCount == 0) {
readNext();
}
int batchSize = Math.min(remaining, currentCount);
switch (mode) {
case RLE:
java.util.Arrays.fill(dest, pos, pos + batchSize, currentValue);
break;
case PACKED:
int startIdx = currentBuffer.length - currentCount;
System.arraycopy(currentBuffer, startIdx, dest, pos, batchSize);
break;
default:
throw new ParquetDecodingException("not a valid mode " + mode);
}
currentCount -= batchSize;
remaining -= batchSize;
pos += batchSize;
}
}

private void readNext() throws IOException {
Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(in);
Expand Down