Skip to content

Commit f1b5426

Browse files
author
zhongheng.gy
committed
[common][format] fix OOM when writing/compacting table with large records
1 parent 1e1c9f4 commit f1b5426

8 files changed

Lines changed: 405 additions & 11 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ public class RowHelper implements Serializable {
3636

3737
private static final long serialVersionUID = 1L;
3838

39+
/**
40+
* Threshold in bytes for releasing the internal reuse buffer. When big records are written, the
41+
* BinaryRowWriter's internal segment can grow very large via grow(). The {@link
42+
* #resetIfTooLarge()} method checks this threshold and releases the bloated
43+
* reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely.
44+
*/
45+
private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB
46+
3947
private final FieldGetter[] fieldGetters;
4048
private final ValueSetter[] valueSetters;
4149
private final boolean[] writeNulls;
@@ -81,6 +89,20 @@ public void copyInto(InternalRow row) {
8189
reuseWriter.complete();
8290
}
8391

92+
/**
93+
* Release the internal reuse buffer if the segment exceeds the threshold. This should be called
94+
* after the caller has finished using the reuseRow (e.g. after serialization), so that large
95+
* records don't linger in memory.
96+
*/
97+
public void resetIfTooLarge() {
98+
if (reuseWriter != null
99+
&& reuseWriter.getSegments() != null
100+
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD) {
101+
reuseRow = null;
102+
reuseWriter = null;
103+
}
104+
}
105+
84106
public BinaryRow reuseRow() {
85107
return reuseRow;
86108
}

paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,19 +106,27 @@ public void fill(byte[] value) {
106106
Arrays.fill(this.length, value.length);
107107
}
108108

109+
/** The maximum size of array to allocate. Some VMs reserve header words in an array. */
110+
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
111+
109112
private void reserveBytes(int newCapacity) {
110113
if (newCapacity > buffer.length) {
111-
int newBytesCapacity = newCapacity * 2;
112-
try {
113-
buffer = Arrays.copyOf(buffer, newBytesCapacity);
114-
} catch (NegativeArraySizeException e) {
114+
if (newCapacity > MAX_ARRAY_SIZE) {
115115
throw new RuntimeException(
116116
String.format(
117-
"The new claimed capacity %s is too large, will overflow the INTEGER.MAX after multiply by 2. "
118-
+ "Try reduce `read.batch-size` to avoid this exception.",
119-
newCapacity),
120-
e);
117+
"The required byte buffer capacity %s exceeds the maximum array size. "
118+
+ "Try reducing `read.batch-size` to avoid this exception.",
119+
newCapacity));
120+
}
121+
// Try to double the capacity for amortized growth. If doubling would overflow,
122+
// fall back to the exact required capacity (capped at MAX_ARRAY_SIZE).
123+
int newBytesCapacity;
124+
if (newCapacity <= (MAX_ARRAY_SIZE >> 1)) {
125+
newBytesCapacity = newCapacity << 1;
126+
} else {
127+
newBytesCapacity = MAX_ARRAY_SIZE;
121128
}
129+
buffer = Arrays.copyOf(buffer, newBytesCapacity);
122130
}
123131
}
124132

paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException {
8080
return row;
8181
}
8282

83+
/**
84+
* Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This
85+
* prevents accumulation of large byte arrays when a few large records inflate the reuse buffer
86+
* and subsequent small records never trigger reallocation.
87+
*/
88+
private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB
89+
8390
public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
8491
MemorySegment[] segments = reuse.getSegments();
8592
checkArgument(
@@ -88,6 +95,12 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc
8895

8996
int length = source.readInt();
// Reallocate when the reuse buffer is absent or too small, and ALSO when it is
// oversized (> REUSE_SHRINK_THRESHOLD): shrinking back to the record size prevents
// many merge channels from each pinning a bloated byte array indefinitely (OOM risk).
// The small/missing and oversized cases performed the same allocation in two separate
// branches; they are merged into a single condition here.
if (segments == null
        || segments[0].size() < length
        || segments[0].size() > REUSE_SHRINK_THRESHOLD) {
    segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
}
source.readFully(segments[0].getArray(), 0, length);

paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() {
5959

6060
@Override
6161
public void serialize(InternalRow row, DataOutputView target) throws IOException {
62-
binarySerializer.serialize(toBinaryRow(row), target);
62+
try {
63+
binarySerializer.serialize(toBinaryRow(row), target);
64+
} finally {
65+
// Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer
66+
// for large records (e.g. 100MB+). The serialization can exit via EOFException
67+
// thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is
68+
// full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal.
69+
// Without finally, the bloated buffer would never be released on that path.
70+
rowHelper.resetIfTooLarge();
71+
}
6372
}
6473

6574
@Override
@@ -132,7 +141,13 @@ public InternalRow createReuseInstance() {
132141
@Override
133142
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
134143
throws IOException {
135-
return binarySerializer.serializeToPages(toBinaryRow(row), target);
144+
try {
145+
return binarySerializer.serializeToPages(toBinaryRow(row), target);
146+
} finally {
147+
// Same as serialize(): must use finally because EOFException may bypass normal
148+
// return when the sort buffer is full.
149+
rowHelper.resetIfTooLarge();
150+
}
136151
}
137152

138153
@Override
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data;
20+
21+
import org.apache.paimon.types.DataTypes;
22+
import org.apache.paimon.types.RowKind;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.util.Arrays;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. */
31+
class RowHelperTest {
32+
33+
@Test
34+
void testResetIfTooLargeReleasesOversizedBuffer() {
35+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
36+
37+
// Write a large record (> 4MB) to inflate the internal buffer
38+
byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB
39+
Arrays.fill(largePayload, (byte) 'x');
40+
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
41+
largeRow.setRowKind(RowKind.INSERT);
42+
helper.copyInto(largeRow);
43+
44+
assertThat(helper.reuseRow()).isNotNull();
45+
46+
// resetIfTooLarge() should release the bloated buffer
47+
helper.resetIfTooLarge();
48+
assertThat(helper.reuseRow()).isNull();
49+
}
50+
51+
@Test
52+
void testResetIfTooLargeKeepsSmallBuffer() {
53+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));
54+
55+
// Write a small record (< 4MB)
56+
GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42);
57+
smallRow.setRowKind(RowKind.INSERT);
58+
helper.copyInto(smallRow);
59+
60+
assertThat(helper.reuseRow()).isNotNull();
61+
62+
// resetIfTooLarge() should NOT release the small buffer
63+
helper.resetIfTooLarge();
64+
assertThat(helper.reuseRow()).isNotNull();
65+
}
66+
67+
@Test
68+
void testResetIfTooLargeBeforeCopyInto() {
69+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING()));
70+
71+
// reuseRow is null before any copyInto
72+
assertThat(helper.reuseRow()).isNull();
73+
74+
// resetIfTooLarge() should be safe to call when reuseRow is null
75+
helper.resetIfTooLarge();
76+
assertThat(helper.reuseRow()).isNull();
77+
}
78+
79+
@Test
80+
void testReuseIsRecreatedAfterRelease() {
81+
RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));
82+
83+
// Write a large record to inflate the buffer
84+
byte[] largePayload = new byte[5 * 1024 * 1024];
85+
GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload);
86+
largeRow.setRowKind(RowKind.INSERT);
87+
helper.copyInto(largeRow);
88+
helper.resetIfTooLarge();
89+
assertThat(helper.reuseRow()).isNull();
90+
91+
// Write a small record — reuseRow should be recreated
92+
GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
93+
smallRow.setRowKind(RowKind.INSERT);
94+
helper.copyInto(smallRow);
95+
assertThat(helper.reuseRow()).isNotNull();
96+
97+
// Small buffer should survive resetIfTooLarge()
98+
helper.resetIfTooLarge();
99+
assertThat(helper.reuseRow()).isNotNull();
100+
}
101+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.data.columnar.heap;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
/** Tests for {@link HeapBytesVector#putByteArray}, focusing on reserveBytes() overflow safety. */
26+
class HeapBytesVectorReserveBytesTest {
27+
28+
@Test
29+
void testNormalGrowthDoublesCapacity() {
30+
HeapBytesVector vector = new HeapBytesVector(4);
31+
int initialBufferSize = vector.buffer.length;
32+
33+
// Write enough data to trigger growth
34+
byte[] data = new byte[initialBufferSize + 1];
35+
vector.putByteArray(0, data, 0, data.length);
36+
37+
// Buffer should have doubled
38+
assertThat(vector.buffer.length).isEqualTo((initialBufferSize + 1) * 2);
39+
}
40+
41+
@Test
42+
void testPutByteArrayStoresDataCorrectly() {
43+
HeapBytesVector vector = new HeapBytesVector(4);
44+
45+
byte[] data1 = new byte[] {1, 2, 3};
46+
byte[] data2 = new byte[] {4, 5, 6, 7};
47+
vector.putByteArray(0, data1, 0, data1.length);
48+
vector.putByteArray(1, data2, 0, data2.length);
49+
50+
HeapBytesVector.Bytes bytes0 = vector.getBytes(0);
51+
assertThat(bytes0.len).isEqualTo(3);
52+
assertThat(vector.buffer[bytes0.offset]).isEqualTo((byte) 1);
53+
assertThat(vector.buffer[bytes0.offset + 2]).isEqualTo((byte) 3);
54+
55+
HeapBytesVector.Bytes bytes1 = vector.getBytes(1);
56+
assertThat(bytes1.len).isEqualTo(4);
57+
assertThat(vector.buffer[bytes1.offset]).isEqualTo((byte) 4);
58+
}
59+
60+
@Test
61+
void testLargeCapacityDoesNotOverflow() {
62+
HeapBytesVector vector = new HeapBytesVector(2);
63+
64+
// Simulate a scenario where the required capacity is large but still within
65+
// MAX_ARRAY_SIZE. We can't actually allocate Integer.MAX_VALUE bytes in a test,
66+
// but we can verify the logic by checking that a moderately large allocation works.
67+
int largeSize = 64 * 1024 * 1024; // 64MB
68+
byte[] largeData = new byte[largeSize];
69+
vector.putByteArray(0, largeData, 0, largeData.length);
70+
71+
assertThat(vector.buffer.length).isGreaterThanOrEqualTo(largeSize);
72+
assertThat(vector.getBytes(0).len).isEqualTo(largeSize);
73+
}
74+
75+
@Test
76+
void testResetClearsBytesAppended() {
77+
HeapBytesVector vector = new HeapBytesVector(4);
78+
79+
byte[] data = new byte[] {1, 2, 3};
80+
vector.putByteArray(0, data, 0, data.length);
81+
82+
vector.reset();
83+
84+
// After reset, we should be able to write again from the beginning
85+
byte[] data2 = new byte[] {10, 20};
86+
vector.putByteArray(0, data2, 0, data2.length);
87+
88+
HeapBytesVector.Bytes bytes = vector.getBytes(0);
89+
assertThat(bytes.offset).isEqualTo(0);
90+
assertThat(bytes.len).isEqualTo(2);
91+
assertThat(vector.buffer[0]).isEqualTo((byte) 10);
92+
}
93+
}

0 commit comments

Comments
 (0)