diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 783d68a00cd..03208adbcf3 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId), nativeBufferSize, GlutenConfig.get.columnarShuffleReallocThreshold, + GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold, partitionWriterHandle ) case SortShuffleWriterType => diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index ef35818c7b5..e01f97ba3ea 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -185,6 +185,7 @@ protected void writeImpl(Iterator> records) { columnarDep.nativePartitioning(), partitionId), nativeBufferSize, reallocThreshold, + GlutenConfig.get().columnarShufflePartitionBufferEvictThreshold(), partitionWriterHandle); } diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 01f4bd06bab..ff1c66e0373 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V]( taskContext.partitionId), nativeBufferSize, reallocThreshold, + GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold, partitionWriterHandle ) } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index f109865840e..46b9d7603ce 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint startPartitionId, jint splitBufferSize, jdouble splitBufferReallocThreshold, + jint partitionBufferEvictThreshold, jlong partitionWriterHandle) { JNI_METHOD_START const auto ctx = getRuntime(env, wrapper); @@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe toPartitioning(jStringToCString(env, partitioningNameJstr)), startPartitionId, splitBufferSize, - splitBufferReallocThreshold); + splitBufferReallocThreshold, + partitionBufferEvictThreshold); return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions)); JNI_METHOD_END(kInvalidObjectHandle) diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index ea3aff10cf7..649a1647749 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -27,6 +27,7 @@ namespace gluten { static constexpr int16_t kDefaultBatchSize = 4096; +static constexpr int32_t kDefaultPartitionBufferEvictThreshold = -1; static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096; static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20; static constexpr int64_t kDefaultPushMemoryThreshold = 4096; @@ -85,6 +86,7 @@ struct ShuffleWriterOptions { struct HashShuffleWriterOptions : ShuffleWriterOptions { int32_t splitBufferSize = kDefaultShuffleWriterBufferSize; double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold; + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold; HashShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {} @@ -92,10 +94,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { Partitioning partitioning, int32_t startPartitionId, int32_t partitionBufferSize, - double partitionBufferReallocThreshold) + double partitionBufferReallocThreshold, + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold) : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), - splitBufferReallocThreshold(partitionBufferReallocThreshold) {} + splitBufferReallocThreshold(partitionBufferReallocThreshold), + partitionBufferEvictThreshold(partitionBufferEvictThreshold) {} protected: HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {} @@ -105,10 +109,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { Partitioning partitioning, int32_t startPartitionId, int32_t partitionBufferSize, - double partitionBufferReallocThreshold) + double partitionBufferReallocThreshold, + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold) : ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), - splitBufferReallocThreshold(partitionBufferReallocThreshold) {} + splitBufferReallocThreshold(partitionBufferReallocThreshold), + partitionBufferEvictThreshold(partitionBufferEvictThreshold) {} }; struct SortShuffleWriterOptions : ShuffleWriterOptions { diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 230806cd2f2..5d02a844833 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -60,7 +60,7 @@ arrow::Result readPayloadType(arrow::io::InputStream* is) { } arrow::Result compressBuffer( - const std::shared_ptr& buffer, + const std::shared_ptr buffer, uint8_t* output, int64_t outputLength, arrow::util::Codec* codec) { diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index e071e8a1c3e..89f1dbcda62 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -441,9 +441,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector& printPartitionBuffer(); setSplitState(SplitState::kInit); + if (partitionBufferEvictThreshold_ > 0) { + // After split, evict large partition buffers to free up memory for the next input RowVector. + const auto partitionBytes = estimatePartitionBufferBytes(); + for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) { + if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= partitionBufferEvictThreshold_) { + RETURN_NOT_OK(evictPartitionBuffers(pid, false)); + } + } + } return arrow::Status::OK(); } +std::vector VeloxHashShuffleWriter::estimatePartitionBufferBytes() const { + std::vector partitionBytes(numPartitions_, 0); + + for (const auto& columnBuffers : partitionBuffers_) { + for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) { + for (const auto& buffer : columnBuffers[pid]) { + if (buffer) { + partitionBytes[pid] += buffer->capacity(); + } + } + } + } + + for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) { + const auto& buffer = complexTypeFlushBuffer_[pid]; + if (buffer) { + partitionBytes[pid] += buffer->capacity(); + } + } + + return partitionBytes; +} + arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]); diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h index 899c2be2692..d2901019b7f 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h @@ -278,7 +278,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { MemoryManager* memoryManager) : VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager), splitBufferSize_(options->splitBufferSize), - splitBufferReallocThreshold_(options->splitBufferReallocThreshold) { + splitBufferReallocThreshold_(options->splitBufferReallocThreshold), + partitionBufferEvictThreshold_(options->partitionBufferEvictThreshold) { arenas_.resize(numPartitions); } @@ -287,6 +288,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { arrow::Status initColumnTypes(const facebook::velox::RowVector& rv); + std::vector estimatePartitionBufferBytes() const; + arrow::Status splitRowVector(const facebook::velox::RowVector& rv); arrow::Status initFromRowVector(const facebook::velox::RowVector& rv); @@ -396,6 +399,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { protected: int32_t splitBufferSize_; double splitBufferReallocThreshold_; + int32_t partitionBufferEvictThreshold_; std::shared_ptr schema_; diff --git a/docs/Configuration.md b/docs/Configuration.md index 9a7cc8af8c7..1bd4dce5b47 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -95,6 +95,7 @@ nav_order: 15 | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | | spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold | -1 | For Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. Use non-positive value to disable this feature. | | spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. | | spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | | spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index a389da68603..87685f8505c 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -43,6 +43,7 @@ public native long createHashShuffleWriter( int startPartitionId, int splitBufferSize, double splitBufferReallocThreshold, + int partitionBufferEvictThreshold, long partitionWriterHandle); public native long createSortShuffleWriter( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 0bbbcead63b..f7ac297dec0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -225,6 +225,9 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def columnarShuffleReallocThreshold: Double = getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD) + def columnarShufflePartitionBufferEvictThreshold: Int = + getConf(COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD) + def columnarShuffleMergeThreshold: Double = getConf(SHUFFLE_WRITER_MERGE_THRESHOLD) def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC) @@ -1074,6 +1077,14 @@ object GlutenConfig extends ConfigRegistry { .checkValue(v => v >= 0 && v <= 1, "Buffer reallocation threshold must between [0, 1]") .createWithDefault(0.25) + val COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD = + buildConf("spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold") + .doc( + "For Velox hash shuffle writer, evict partition buffers larger than this threshold " + + "after splitting an input batch. Use non-positive value to disable this feature.") + .intConf + .createWithDefault(-1) + val COLUMNAR_SHUFFLE_CODEC = buildConf("spark.gluten.sql.columnar.shuffle.codec") .doc(