From 6bdaec5209a070061934360f281be828321d3327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 22:48:15 +0200 Subject: [PATCH] GH-3522: Reduce peak memory during row group flush by eagerly releasing column buffers During flushToFileWriter(), each column's compressed page buffers are now released immediately after being written to disk, rather than held in memory until the entire row group flush completes. For a schema with N columns, this reduces peak flush memory from ~N columns' worth of compressed pages to ~1 column's worth. Changes: - Add writeAllToAndRelease() to ConcatenatingByteBufferCollector for progressive slab-by-slab memory release during write - Make close() idempotent (safe to call after eager release or multiple times) - Call pageWriter.close() after each column in flushToFileWriter() - Add tests for eager release, double-close safety, and output equivalence --- .../ConcatenatingByteBufferCollector.java | 29 +++++++ .../TestConcatenatingByteBufferCollector.java | 80 +++++++++++++++++++ .../hadoop/ColumnChunkPageWriteStore.java | 5 ++ 3 files changed, 114 insertions(+) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java index 7a616e9b9d..9b5cd16be3 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java @@ -26,6 +26,7 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; /** @@ -64,10 +65,14 @@ public void collect(BytesInput bytesInput) { @Override public void close() { + if (slabs.isEmpty()) { + return; + } for (ByteBuffer slab : slabs) { allocator.release(slab); } slabs.clear(); + size = 0; } @Override @@ -78,6 +83,30 @@ public void writeAllTo(OutputStream out) throws IOException { } } + /** + * Writes all collected slabs to the given output stream, releasing each slab's + * {@link ByteBuffer} back to the allocator immediately after it has been written. + * This progressively frees memory during the write rather than holding all slabs + * until {@link #close()} is called. + * + *
+   * <p>After this method returns, the collector is empty and {@link #size()} returns 0.
+   * Calling {@link #close()} afterwards is safe but has no additional effect.
+   *
+   * @param out the output stream to write to
+   * @throws IOException if an I/O error occurs
+   */
+  public void writeAllToAndRelease(OutputStream out) throws IOException {
+    WritableByteChannel channel = Channels.newChannel(out);
+    Iterator<ByteBuffer> it = slabs.iterator();
+    while (it.hasNext()) {
+      ByteBuffer slab = it.next();
+      channel.write(slab.duplicate());
+      allocator.release(slab);
+      it.remove();
+    }
+    size = 0;
+  }
+
   @Override
   public void writeInto(ByteBuffer buffer) {
     for (ByteBuffer slab : slabs) {
diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java
index d973a7c96a..ef51159c17 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java
@@ -109,4 +109,84 @@ private static CapacityByteArrayOutputStream capacityByteArrayOutputStream(Strin
     }
     return cbaos;
   }
+
+  @Test
+  public void testWriteAllToAndRelease() throws IOException {
+    byte[] result;
+    ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
+    collector.collect(BytesInput.from(bytes("Hello")));
+    collector.collect(BytesInput.from(bytes(" ")));
+    collector.collect(BytesInput.from(bytes("World")));
+
+    Assert.assertEquals(11, collector.size());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    collector.writeAllToAndRelease(baos);
+    result = baos.toByteArray();
+
+    // After writeAllToAndRelease, the collector should be empty
+    Assert.assertEquals(0, collector.size());
+
+    // Verify the data was written correctly
+    Assert.assertEquals("Hello World", new String(result, StandardCharsets.UTF_8));
+
+    // close() after writeAllToAndRelease() should be a safe no-op
+    collector.close();
+  }
+
+  @Test
+  public void testDoubleCloseIsSafe() throws IOException {
+    ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
+    collector.collect(BytesInput.from(bytes("test data")));
+
+    Assert.assertEquals(9, collector.size());
+
+    // First close releases the buffers
+    collector.close();
+    Assert.assertEquals(0, collector.size());
+
+    // Second close should be a no-op and not throw
+    collector.close();
+  }
+
+  @Test
+  public void testCloseOnEmpty() {
+    // Close on an empty collector should not throw
+    ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
+    collector.close();
+    collector.close(); // double close on empty
+  }
+
+  @Test
+  public void testWriteAllToAndReleaseProducesIdenticalOutput() throws IOException {
+    // Verify that writeAllToAndRelease produces identical output to writeAllTo
+    byte[] regularResult;
+    byte[] progressiveResult;
+
+    // Use writeAllTo (non-destructive)
+    try (ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator)) {
+      collector.collect(BytesInput.fromInt(42));
+      collector.collect(BytesInput.from(bytes("parquet")));
+      collector.collect(BytesInput.fromInt(99));
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      collector.writeAllTo(baos);
+      regularResult = baos.toByteArray();
+    }
+
+    // Use writeAllToAndRelease (progressive)
+    ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
+    collector.collect(BytesInput.fromInt(42));
+    collector.collect(BytesInput.from(bytes("parquet")));
+    collector.collect(BytesInput.fromInt(99));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    collector.writeAllToAndRelease(baos);
+    progressiveResult = baos.toByteArray();
+
+    Assert.assertArrayEquals(regularResult, progressiveResult);
+
+    // Already released by writeAllToAndRelease, close is a no-op
+    collector.close();
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index d9e6ea0990..9b517f8059 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -692,6 +692,11 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     for (ColumnDescriptor path : schema.getColumns()) {
       ColumnChunkPageWriter pageWriter = writers.get(path);
       pageWriter.writeToFileWriter(writer);
+      // Eagerly release this column's page buffers now that they've been
+      // written to the file writer. This reduces peak memory during flush
+      // from the entire compressed row group down to roughly one column's
+      // worth of compressed pages at a time.
+      pageWriter.close();
     }
   }
 }
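--

Reviewer aside (not part of the patch): a minimal sketch of the call pattern
writeAllToAndRelease() enables, shown against a plain ByteArrayOutputStream as
a stand-in sink. It assumes parquet-common's HeapByteBufferAllocator with its
no-arg constructor; the class name EagerReleaseSketch is invented for
illustration.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.bytes.ConcatenatingByteBufferCollector;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;

    public class EagerReleaseSketch {
      public static void main(String[] args) throws IOException {
        ConcatenatingByteBufferCollector collector =
            new ConcatenatingByteBufferCollector(new HeapByteBufferAllocator());
        collector.collect(BytesInput.from("page-1".getBytes(StandardCharsets.UTF_8)));
        collector.collect(BytesInput.from("page-2".getBytes(StandardCharsets.UTF_8)));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Each slab is written and then immediately released back to the
        // allocator, so memory shrinks progressively during the write instead
        // of all at once when close() eventually runs.
        collector.writeAllToAndRelease(out);

        // The collector is now empty; close() is an idempotent no-op here.
        collector.close();
      }
    }

flushToFileWriter() gets the same effect one level up: pageWriter.close() runs
right after each column's writeToFileWriter(), and the now-idempotent close()
keeps the store's normal close path safe afterwards.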