From 0eaa760348fa4c05d7a05f579cb9a38e1486b22c Mon Sep 17 00:00:00 2001 From: zhli1142015 Date: Fri, 15 May 2026 13:51:24 +0800 Subject: [PATCH 1/2] [VL] Restore hash shuffle reader payload merging After #10499, the hash shuffle reader changed from potentially merging multiple payloads into larger batches to returning one batch per payload. That kept shuffle-read output batches small and increased downstream overhead. Restore reader-side coalescing for mergeable plain hash shuffle payloads, but flush at Spark shuffle stream boundaries so payloads from different input streams are never combined. Keep dictionary and complex-type payloads unmerged, reset dictionary state per stream, and carry over a payload that would exceed the configured batch size. Add stream-local merge tests covering multi-column primitive/bool/string/nullable data, per-stream merge boundaries, carry-over, dictionary and complex-type paths, and invalid batch sizes. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cpp/velox/shuffle/VeloxShuffleReader.cc | 160 ++++++++++-- cpp/velox/shuffle/VeloxShuffleReader.h | 10 + cpp/velox/tests/VeloxShuffleWriterTest.cc | 289 ++++++++++++++++++++++ 3 files changed, 438 insertions(+), 21 deletions(-) diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index bb2010378ed..2e5f1f625a7 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -56,6 +56,11 @@ arrow::Result readBlockType(arrow::io::InputStream* inputStream) { return type; } +uint32_t validateHashShuffleReaderBatchSize(int32_t batchSize) { + GLUTEN_CHECK(batchSize > 0, fmt::format("Hash shuffle reader batch size must be positive, but got {}", batchSize)); + return static_cast(batchSize); +} + struct BufferViewReleaser { BufferViewReleaser() : BufferViewReleaser(nullptr) {} @@ -300,6 +305,23 @@ std::shared_ptr makeColumnarBatch( return std::make_shared(std::move(rowVector)); } +std::shared_ptr makeColumnarBatch( + RowTypePtr type, + std::unique_ptr payload, + memory::MemoryPool* pool, + int64_t& deserializeTime) { + ScopedTimer timer(&deserializeTime); + std::vector veloxBuffers; + auto numBuffers = payload->numBuffers(); + veloxBuffers.reserve(numBuffers); + for (size_t i = 0; i < numBuffers; ++i) { + GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i)); + veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer))); + } + auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {}, pool); + return std::make_shared(std::move(rowVector)); +} + arrow::Result readDictionaryBuffer(arrow::io::InputStream* in, facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) { size_t bufferSize; @@ -444,23 +466,39 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer( const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::RowTypePtr& rowType, + int32_t batchSize, int64_t readerBufferSize, VeloxMemoryManager* memoryManager, + std::vector isValidityBuffer, + bool hasComplexType, int64_t& deserializeTime, int64_t& decompressTime) : streamReader_(streamReader), schema_(schema), codec_(codec), rowType_(rowType), + batchSize_(validateHashShuffleReaderBatchSize(batchSize)), readerBufferSize_(readerBufferSize), memoryManager_(memoryManager), + isValidityBuffer_(std::move(isValidityBuffer)), + hasComplexType_(hasComplexType), deserializeTime_(deserializeTime), decompressTime_(decompressTime) {} +bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const { + // Complex type or dictionary encodings do not support merging. + return hasComplexType_ || !dictionaryFields_.empty(); +} + bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() { + if (blockTypeResolved_) { + return true; + } + GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get())); switch (blockType) { case BlockType::kEndOfStream: + in_ = nullptr; return false; case BlockType::kDictionary: { VeloxDictionaryReader reader(rowType_, memoryManager_->getLeafMemoryPool().get(), codec_.get()); @@ -485,6 +523,7 @@ bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() { default: throw GlutenException(fmt::format("Unsupported block type: {}", static_cast(blockType))); } + blockTypeResolved_ = true; return true; } @@ -499,6 +538,12 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() { return; } + if (!dictionaryFields_.empty() || !dictionaries_.empty()) { + dictionaryFields_.clear(); + dictionaries_.clear(); + } + blockTypeResolved_ = false; + if (readerBufferSize_ > 0) { GLUTEN_ASSIGN_OR_THROW( in_, @@ -510,36 +555,106 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() { } std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { - if (in_ == nullptr) { - loadNextStream(); + while (true) { + if (in_ == nullptr) { + if (merged_) { + return makeColumnarBatch( + rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_); + } - if (reachedEos_) { - return nullptr; + loadNextStream(); + + if (reachedEos_) { + return nullptr; + } + } + if (resolveNextBlockType()) { + break; } } - while (!resolveNextBlockType()) { - loadNextStream(); - - if (reachedEos_) { - return nullptr; + if (shouldSkipMerge()) { + if (merged_) { + return makeColumnarBatch( + rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_); } + + uint32_t numRows = 0; + GLUTEN_ASSIGN_OR_THROW( + auto arrowBuffers, + BlockPayload::deserialize( + in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_)); + + blockTypeResolved_ = false; + + return makeColumnarBatch( + rowType_, + numRows, + std::move(arrowBuffers), + dictionaryFields_, + dictionaries_, + memoryManager_->getLeafMemoryPool().get(), + deserializeTime_); } + std::vector> arrowBuffers{}; uint32_t numRows = 0; - GLUTEN_ASSIGN_OR_THROW( - auto arrowBuffers, - BlockPayload::deserialize( - in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_)); + while (!merged_ || merged_->numRows() < batchSize_) { + if (in_ == nullptr) { + if (merged_) { + break; + } + + loadNextStream(); + if (reachedEos_) { + break; + } + } + if (!resolveNextBlockType()) { + continue; + } + + if (shouldSkipMerge()) { + break; + } + + GLUTEN_ASSIGN_OR_THROW( + arrowBuffers, + BlockPayload::deserialize( + in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_)); - return makeColumnarBatch( - rowType_, - numRows, - std::move(arrowBuffers), - dictionaryFields_, - dictionaries_, - memoryManager_->getLeafMemoryPool().get(), - deserializeTime_); + blockTypeResolved_ = false; + + if (!merged_) { + merged_ = std::make_unique(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers)); + arrowBuffers.clear(); + continue; + } + + auto mergedRows = merged_->numRows() + numRows; + if (mergedRows > batchSize_) { + break; + } + + auto append = std::make_unique(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers)); + GLUTEN_ASSIGN_OR_THROW( + merged_, + InMemoryPayload::merge(std::move(merged_), std::move(append), memoryManager_->defaultArrowMemoryPool())); + arrowBuffers.clear(); + } + + if (!merged_) { + return nullptr; + } + + auto columnarBatch = + makeColumnarBatch(rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_); + + if (!arrowBuffers.empty()) { + merged_ = std::make_unique(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers)); + } + + return columnarBatch; } VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer( @@ -832,8 +947,11 @@ std::unique_ptr VeloxShuffleReaderDeserializerFactory::cr schema_, codec_, rowType_, + batchSize_, readerBufferSize_, memoryManager_, + isValidityBuffer_, + hasComplexType_, deserializeTime_, decompressTime_); case ShuffleWriterType::kSortShuffle: diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index f30595dde45..0b08fe675aa 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -34,14 +34,19 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::RowTypePtr& rowType, + int32_t batchSize, int64_t readerBufferSize, VeloxMemoryManager* memoryManager, + std::vector isValidityBuffer, + bool hasComplexType, int64_t& deserializeTime, int64_t& decompressTime); std::shared_ptr next() override; private: + bool shouldSkipMerge() const; + bool resolveNextBlockType(); void loadNextStream(); @@ -50,15 +55,20 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { std::shared_ptr schema_; std::shared_ptr codec_; facebook::velox::RowTypePtr rowType_; + uint32_t batchSize_; int64_t readerBufferSize_; VeloxMemoryManager* memoryManager_; + std::vector isValidityBuffer_; + bool hasComplexType_; int64_t& deserializeTime_; int64_t& decompressTime_; std::shared_ptr in_{nullptr}; + std::unique_ptr merged_{nullptr}; bool reachedEos_{false}; + bool blockTypeResolved_{false}; std::vector dictionaryFields_{}; std::vector dictionaries_{}; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index bfb783f4be7..e0d900b2b19 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -18,6 +18,9 @@ #include #include +#include + +#include "shuffle/Payload.h" #include "shuffle/VeloxHashShuffleWriter.h" #include "shuffle/VeloxRssSortShuffleWriter.h" #include "shuffle/VeloxSortShuffleWriter.h" @@ -191,6 +194,23 @@ std::shared_ptr createPartitionWriter( } } +class MultiStreamReader : public StreamReader { + public: + explicit MultiStreamReader(std::vector> streams) + : streams_(std::move(streams)) {} + + std::shared_ptr readNextStream(arrow::MemoryPool*) override { + if (index_ >= streams_.size()) { + return nullptr; + } + return std::move(streams_[index_++]); + } + + private: + std::vector> streams_; + size_t index_{0}; +}; + } // namespace class VeloxShuffleWriterTestEnvironment : public ::testing::Environment { @@ -441,6 +461,275 @@ class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest { } }; +class VeloxShuffleReaderStreamMergeTest : public ::testing::Test, public VeloxShuffleWriterTestBase { + protected: + void SetUp() override { + VeloxShuffleWriterTestBase::setUpTestData(); + } + + std::shared_ptr writeSinglePartitionStream( + const RowVectorPtr& vector, + bool enableDictionary = false) { + return writeSinglePartitionStream(std::vector{vector}, enableDictionary); + } + + std::shared_ptr writeSinglePartitionStream( + const std::vector& vectors, + bool enableDictionary = false) { + GLUTEN_ASSIGN_OR_THROW(auto dataFile, createTempShuffleFile(localDirs_[0])); + + auto shuffleWriterOptions = std::make_shared(); + shuffleWriterOptions->partitioning = Partitioning::kSingle; + shuffleWriterOptions->splitBufferSize = 1024; + + auto partitionWriter = createPartitionWriter( + PartitionWriterType::kLocal, 1, dataFile, localDirs_, arrow::Compression::UNCOMPRESSED, 0, 0, enableDictionary); + GLUTEN_ASSIGN_OR_THROW( + auto shuffleWriter, + VeloxShuffleWriter::create( + ShuffleWriterType::kHashShuffle, 1, partitionWriter, shuffleWriterOptions, getDefaultMemoryManager())); + + for (const auto& vector : vectors) { + GLUTEN_THROW_NOT_OK( + shuffleWriter->write(std::make_shared(vector), ShuffleWriter::kMinMemLimit)); + } + GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); + + const auto& lengths = shuffleWriter->partitionLengths(); + VELOX_CHECK_EQ(lengths.size(), 1); + + std::shared_ptr file; + GLUTEN_ASSIGN_OR_THROW(file, arrow::io::ReadableFile::Open(dataFile)); + readableFiles_.push_back(file); + + GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::RandomAccessFile::GetStream(file, 0, lengths[0])); + return in; + } + + std::shared_ptr makeDictionaryPayloadOnlyStream(uint32_t numRows) { + GLUTEN_ASSIGN_OR_THROW( + auto indices, arrow::AllocateResizableBuffer(sizeof(int32_t) * numRows, arrow::default_memory_pool())); + std::vector rawIndices(numRows, 0); + std::memcpy(indices->mutable_data(), rawIndices.data(), sizeof(int32_t) * numRows); + std::shared_ptr indicesBuffer = std::move(indices); + + std::vector> buffers{std::shared_ptr{}, indicesBuffer}; + static const std::vector kStringDictionaryPayloadValidity = {true, false}; + GLUTEN_ASSIGN_OR_THROW( + auto payload, + BlockPayload::fromBuffers( + Payload::kUncompressed, + numRows, + std::move(buffers), + &kStringDictionaryPayloadValidity, + arrow::default_memory_pool(), + nullptr)); + + GLUTEN_ASSIGN_OR_THROW(auto os, arrow::io::BufferOutputStream::Create(1024, arrow::default_memory_pool())); + static constexpr uint8_t kDictionaryPayload = static_cast(BlockType::kDictionaryPayload); + GLUTEN_THROW_NOT_OK(os->Write(&kDictionaryPayload, sizeof(kDictionaryPayload))); + GLUTEN_THROW_NOT_OK(payload->serialize(os.get())); + + GLUTEN_ASSIGN_OR_THROW(auto buffer, os->Finish()); + return std::make_shared(buffer); + } + + std::vector readStreams( + const RowTypePtr& rowType, + int32_t batchSize, + std::vector> streams) { + const auto schema = toArrowSchema(rowType, getDefaultMemoryManager()->getLeafMemoryPool().get()); + std::shared_ptr codec = + createCompressionCodec(arrow::Compression::UNCOMPRESSED, CodecBackend::NONE); + auto deserializerFactory = std::make_unique( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + batchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle); + + auto reader = std::make_shared(std::move(deserializerFactory)); + const auto iter = reader->read(std::make_shared(std::move(streams))); + + std::vector output; + while (iter->hasNext()) { + output.push_back(std::dynamic_pointer_cast(iter->next())->getRowVector()); + } + return output; + } + + std::vector> readableFiles_; +}; + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) { + constexpr int32_t kBatchSize = 6; + std::vector inputs = { + makeRowVector({ + makeFlatVector({1, 2}), + makeFlatVector({true, false}), + makeFlatVector({"a", "bb"}), + makeNullableFlatVector({10, std::nullopt}), + }), + makeRowVector({ + makeFlatVector({3, 4}), + makeFlatVector({false, true}), + makeFlatVector({"ccc", "dddd"}), + makeNullableFlatVector({std::nullopt, 40}), + }), + makeRowVector({ + makeFlatVector({5, 6}), + makeFlatVector({true, true}), + makeFlatVector({"eeeee", "ffffff"}), + makeNullableFlatVector({50, 60}), + }), + makeRowVector({ + makeFlatVector({7, 8}), + makeFlatVector({false, false}), + makeFlatVector({"ggggggg", "hhhhhhhh"}), + makeNullableFlatVector({std::nullopt, std::nullopt}), + })}; + + std::vector> streams = {writeSinglePartitionStream(inputs)}; + + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), 2); + ASSERT_EQ(output[0]->size(), kBatchSize); + ASSERT_EQ(output[1]->size(), inputs[3]->size()); + facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], inputs[1], inputs[2]}), output[0]); + facebook::velox::test::assertEqualVectors(inputs[3], output[1]); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinEachStreamOnly) { + constexpr int32_t kBatchSize = 100; + std::vector inputs = { + makeRowVector({makeFlatVector({1, 2})}), + makeRowVector({makeFlatVector({3, 4})}), + makeRowVector({makeFlatVector({5, 6})}), + makeRowVector({makeFlatVector({7, 8})})}; + + std::vector> streams = { + writeSinglePartitionStream({inputs[0], inputs[1]}), writeSinglePartitionStream({inputs[2], inputs[3]})}; + + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), 2); + facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], inputs[1]}), output[0]); + facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[2], inputs[3]}), output[1]); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeAcrossStreams) { + constexpr int32_t kBatchSize = 6; + std::vector inputs = { + makeRowVector({makeFlatVector({1, 2})}), + makeRowVector({makeFlatVector({3, 4})}), + makeRowVector({makeFlatVector({5, 6})}), + makeRowVector({makeFlatVector({7, 8})})}; + + std::vector> streams; + streams.reserve(inputs.size()); + for (const auto& input : inputs) { + streams.push_back(writeSinglePartitionStream(input)); + } + + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), inputs.size()); + for (size_t i = 0; i < inputs.size(); ++i) { + facebook::velox::test::assertEqualVectors(inputs[i], output[i]); + } +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderRejectsNonPositiveBatchSize) { + auto input = makeRowVector({makeFlatVector({1})}); + const auto rowType = facebook::velox::asRowType(input->type()); + + EXPECT_THROW((void)readStreams(rowType, 0, {}), GlutenException); + EXPECT_THROW((void)readStreams(rowType, -1, {}), GlutenException); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderCarriesOverPayloadThatWouldExceedBatchSize) { + constexpr int32_t kBatchSize = 6; + std::vector inputs = { + makeRowVector({makeFlatVector({1, 2, 3, 4})}), + makeRowVector({makeFlatVector({5, 6, 7, 8})}), + makeRowVector({makeFlatVector({9})})}; + + std::vector> streams = {writeSinglePartitionStream(inputs)}; + + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), 2); + facebook::velox::test::assertEqualVectors(inputs[0], output[0]); + facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[1], inputs[2]}), output[1]); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderFlushesMergedRowsBeforeDictionaryStream) { + constexpr int32_t kBatchSize = 100; + auto plainInput = makeRowVector({makeFlatVector({"plain-1", "plain-2"})}); + auto dictionaryInput = makeRowVector({makeFlatVector({"same", "same", "same", "same"})}); + std::vector> streams = { + writeSinglePartitionStream(plainInput), writeSinglePartitionStream(dictionaryInput, true)}; + + auto output = readStreams(facebook::velox::asRowType(plainInput->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), 2); + facebook::velox::test::assertEqualVectors(plainInput, output[0]); + facebook::velox::test::assertEqualVectors(dictionaryInput, output[1]); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeComplexTypeStreams) { + constexpr int32_t kBatchSize = 100; + std::vector inputs = { + makeRowVector({makeArrayVector({{1, 2}, {3}})}), + makeRowVector({makeArrayVector({{4}, {5, 6}})})}; + + std::vector> streams; + streams.reserve(inputs.size()); + for (const auto& input : inputs) { + streams.push_back(writeSinglePartitionStream(input)); + } + + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + + ASSERT_EQ(output.size(), inputs.size()); + facebook::velox::test::assertEqualVectors(inputs[0], output[0]); + facebook::velox::test::assertEqualVectors(inputs[1], output[1]); +} + +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotReuseDictionaryAcrossStreams) { + auto dictionaryInput = makeRowVector({makeFlatVector({"same", "same", "same", "same"})}); + std::vector> streams = { + writeSinglePartitionStream(dictionaryInput, true), makeDictionaryPayloadOnlyStream(2)}; + + const auto rowType = facebook::velox::asRowType(dictionaryInput->type()); + const auto schema = toArrowSchema(rowType, getDefaultMemoryManager()->getLeafMemoryPool().get()); + std::shared_ptr codec = + createCompressionCodec(arrow::Compression::UNCOMPRESSED, CodecBackend::NONE); + auto deserializerFactory = std::make_unique( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + kDefaultBatchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle); + + auto reader = std::make_shared(std::move(deserializerFactory)); + const auto iter = reader->read(std::make_shared(std::move(streams))); + + ASSERT_TRUE(iter->hasNext()); + facebook::velox::test::assertEqualVectors( + dictionaryInput, std::dynamic_pointer_cast(iter->next())->getRowVector()); + EXPECT_THROW((void)iter->hasNext(), GlutenException); +} + TEST_P(SinglePartitioningShuffleWriterTest, single) { if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) { return; From 418503b6c8a764caefb87a5ee7aeeb9ad61f704b Mon Sep 17 00:00:00 2001 From: zhli1142015 Date: Fri, 15 May 2026 23:13:47 +0800 Subject: [PATCH 2/2] [VL] Add hash shuffle reader stream merge config Gate the reader-side raw payload merge fast path behind a Velox config and document how it complements VeloxResizeBatchesExec. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...VeloxCelebornColumnarBatchSerializer.scala | 6 +- .../apache/gluten/config/VeloxConfig.scala | 17 +++ .../vectorized/ColumnarBatchSerializer.scala | 7 +- cpp/core/jni/JniWrapper.cc | 4 +- cpp/core/shuffle/Options.h | 4 + cpp/velox/compute/VeloxRuntime.cc | 3 +- cpp/velox/shuffle/VeloxShuffleReader.cc | 17 ++- cpp/velox/shuffle/VeloxShuffleReader.h | 6 +- cpp/velox/tests/VeloxShuffleWriterTest.cc | 67 +++++--- docs/velox-configuration.md | 143 +++++++++--------- .../vectorized/ShuffleReaderJniWrapper.java | 3 +- 11 files changed, 177 insertions(+), 100 deletions(-) diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index f377ea99f66..2c36c773f48 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -17,7 +17,7 @@ package org.apache.spark.shuffle import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType} +import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig} import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.runtime.Runtimes import org.apache.gluten.utils.ArrowAbiUtil @@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance( val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize + val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge val handle = jniWrapper .make( cSchema.memoryAddress(), @@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance( batchSize, readerBufferSize, deserializerBufferSize, - shuffleWriterType.name + shuffleWriterType.name, + enableHashShuffleReaderStreamMerge ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index dbc833f046a..9ea203d42be 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def veloxResizeBatchesShuffleOutput: Boolean = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT) + def enableHashShuffleReaderStreamMerge: Boolean = + getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED) + case class ResizeRange(min: Int, max: Int) { assert(max >= min) assert(min > 0, "Min batch size should be larger than 0") @@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled") + .doc( + "Enables a reader-side raw payload merge fast path for plain hash shuffle payloads " + + "within each shuffle input stream. This path merges payload buffers before Velox " + + "vectors are materialized, so it has lower per-batch overhead than generic " + + "VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types " + + "and dictionary-encoded payloads are not merged by this path. " + + "VeloxResizeBatchesExec can still be enabled separately as a generic complement " + + "for types and encodings not covered by this fast path. If false, each hash " + + "shuffle payload is returned as its own columnar batch.") + .booleanConf + .createWithDefault(false) + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 3b5fce63f8c..284d931ecc8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -17,7 +17,7 @@ package org.apache.gluten.vectorized import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType} +import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig} import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.runtime.Runtimes @@ -104,6 +104,7 @@ private class ColumnarBatchSerializerInstanceImpl( val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize + val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge val shuffleReaderHandle = jniWrapper.make( cSchema.memoryAddress(), compressionCodec, @@ -111,7 +112,9 @@ private class ColumnarBatchSerializerInstanceImpl( batchSize, readerBufferSize, deserializerBufferSize, - shuffleWriterType.name) + shuffleWriterType.name, + enableHashShuffleReaderStreamMerge + ) // Close shuffle reader instance as lately as the end of task processing, // since the native reader could hold a reference to memory pool that // was used to create all buffers read from shuffle reader. The pool diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea8..70689c950aa 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe jint batchSize, jlong readerBufferSize, jlong deserializerBufferSize, - jstring shuffleWriterType) { + jstring shuffleWriterType, + jboolean enableHashShuffleReaderStreamMerge) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); @@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe options.batchSize = batchSize; options.readerBufferSize = readerBufferSize; options.deserializerBufferSize = deserializerBufferSize; + options.enableHashShuffleReaderStreamMerge = enableHashShuffleReaderStreamMerge; options.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType)); std::shared_ptr schema = diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 1d7f9ad9f9c..ea3aff10cf7 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -63,6 +63,10 @@ struct ShuffleReaderOptions { // Buffer size when deserializing rows into columnar batches. Only used for sort-based shuffle. int64_t deserializerBufferSize = kDefaultDeserializerBufferSize; + + // Whether to enable the reader-side raw payload merge fast path for plain hash shuffle payloads within one input + // stream. + bool enableHashShuffleReaderStreamMerge = false; }; struct ShuffleWriterOptions { diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 1e2cc6f3082..e3eac17a22a 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -585,7 +585,8 @@ std::shared_ptr VeloxRuntime::createShuffleReader( options.readerBufferSize, options.deserializerBufferSize, memoryManager(), - options.shuffleWriterType); + options.shuffleWriterType, + options.enableHashShuffleReaderStreamMerge); return std::make_shared(std::move(deserializerFactory)); } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 2e5f1f625a7..a469d5c7702 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -471,6 +471,7 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer( VeloxMemoryManager* memoryManager, std::vector isValidityBuffer, bool hasComplexType, + bool enableStreamMerge, int64_t& deserializeTime, int64_t& decompressTime) : streamReader_(streamReader), @@ -482,12 +483,17 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer( memoryManager_(memoryManager), isValidityBuffer_(std::move(isValidityBuffer)), hasComplexType_(hasComplexType), + enableStreamMerge_(enableStreamMerge), deserializeTime_(deserializeTime), decompressTime_(decompressTime) {} bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const { - // Complex type or dictionary encodings do not support merging. - return hasComplexType_ || !dictionaryFields_.empty(); + // Stream merge is a reader-side raw payload fast path: for plain payloads it + // concatenates buffers before Velox vectors are materialized, avoiding the generic + // RowVector append cost paid by VeloxResizeBatchesExec. Keep complex and dictionary + // payloads on the existing per-payload path; VeloxResizeBatchesExec can be enabled + // separately as the generic complement for those cases. + return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty(); } bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() { @@ -912,7 +918,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( int64_t readerBufferSize, int64_t deserializerBufferSize, VeloxMemoryManager* memoryManager, - ShuffleWriterType shuffleWriterType) + ShuffleWriterType shuffleWriterType, + bool enableHashShuffleReaderStreamMerge) : schema_(schema), codec_(codec), veloxCompressionType_(veloxCompressionType), @@ -921,7 +928,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( readerBufferSize_(readerBufferSize), deserializerBufferSize_(deserializerBufferSize), memoryManager_(memoryManager), - shuffleWriterType_(shuffleWriterType) { + shuffleWriterType_(shuffleWriterType), + enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) { initFromSchema(); } @@ -952,6 +960,7 @@ std::unique_ptr VeloxShuffleReaderDeserializerFactory::cr memoryManager_, isValidityBuffer_, hasComplexType_, + enableHashShuffleReaderStreamMerge_, deserializeTime_, decompressTime_); case ShuffleWriterType::kSortShuffle: diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 0b08fe675aa..f92f0a2cc32 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -39,6 +39,7 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { VeloxMemoryManager* memoryManager, std::vector isValidityBuffer, bool hasComplexType, + bool enableStreamMerge, int64_t& deserializeTime, int64_t& decompressTime); @@ -60,6 +61,7 @@ class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { VeloxMemoryManager* memoryManager_; std::vector isValidityBuffer_; bool hasComplexType_; + bool enableStreamMerge_; int64_t& deserializeTime_; int64_t& decompressTime_; @@ -171,7 +173,8 @@ class VeloxShuffleReaderDeserializerFactory { int64_t readerBufferSize, int64_t deserializerBufferSize, VeloxMemoryManager* memoryManager, - ShuffleWriterType shuffleWriterType); + ShuffleWriterType shuffleWriterType, + bool enableHashShuffleReaderStreamMerge = false); std::unique_ptr createDeserializer(const std::shared_ptr& streamReader); @@ -195,6 +198,7 @@ class VeloxShuffleReaderDeserializerFactory { bool hasComplexType_{false}; ShuffleWriterType shuffleWriterType_; + bool enableHashShuffleReaderStreamMerge_; int64_t deserializeTime_{0}; int64_t decompressTime_{0}; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index e0d900b2b19..18046629d48 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -19,6 +19,7 @@ #include #include +#include #include "shuffle/Payload.h" #include "shuffle/VeloxHashShuffleWriter.h" @@ -537,20 +538,36 @@ class VeloxShuffleReaderStreamMergeTest : public ::testing::Test, public VeloxSh std::vector readStreams( const RowTypePtr& rowType, int32_t batchSize, - std::vector> streams) { + std::vector> streams, + std::optional enableStreamMerge = std::nullopt) { const auto schema = toArrowSchema(rowType, getDefaultMemoryManager()->getLeafMemoryPool().get()); std::shared_ptr codec = createCompressionCodec(arrow::Compression::UNCOMPRESSED, CodecBackend::NONE); - auto deserializerFactory = std::make_unique( - schema, - codec, - arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), - rowType, - batchSize, - kDefaultReadBufferSize, - kDefaultDeserializerBufferSize, - getDefaultMemoryManager(), - ShuffleWriterType::kHashShuffle); + std::unique_ptr deserializerFactory; + if (enableStreamMerge.has_value()) { + deserializerFactory = std::make_unique( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + batchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle, + enableStreamMerge.value()); + } else { + deserializerFactory = std::make_unique( + schema, + codec, + arrowCompressionTypeToVelox(arrow::Compression::UNCOMPRESSED), + rowType, + batchSize, + kDefaultReadBufferSize, + kDefaultDeserializerBufferSize, + getDefaultMemoryManager(), + ShuffleWriterType::kHashShuffle); + } auto reader = std::make_shared(std::move(deserializerFactory)); const auto iter = reader->read(std::make_shared(std::move(streams))); @@ -595,7 +612,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) { std::vector> streams = {writeSinglePartitionStream(inputs)}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); ASSERT_EQ(output[0]->size(), kBatchSize); @@ -604,6 +621,22 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinStream) { facebook::velox::test::assertEqualVectors(inputs[3], output[1]); } +TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeByDefault) { + constexpr int32_t kBatchSize = 100; + std::vector inputs = { + makeRowVector({makeFlatVector({1, 2})}), + makeRowVector({makeFlatVector({3, 4})}), + makeRowVector({makeFlatVector({5, 6})})}; + + const auto rowType = facebook::velox::asRowType(inputs[0]->type()); + auto output = readStreams(rowType, kBatchSize, {writeSinglePartitionStream(inputs)}); + + ASSERT_EQ(output.size(), inputs.size()); + for (size_t i = 0; i < inputs.size(); ++i) { + facebook::velox::test::assertEqualVectors(inputs[i], output[i]); + } +} + TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinEachStreamOnly) { constexpr int32_t kBatchSize = 100; std::vector inputs = { @@ -615,7 +648,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderMergesWithinEachStreamOnly) std::vector> streams = { writeSinglePartitionStream({inputs[0], inputs[1]}), writeSinglePartitionStream({inputs[2], inputs[3]})}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(mergeRowVectors({inputs[0], inputs[1]}), output[0]); @@ -636,7 +669,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeAcrossStreams) { streams.push_back(writeSinglePartitionStream(input)); } - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), inputs.size()); for (size_t i = 0; i < inputs.size(); ++i) { @@ -661,7 +694,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderCarriesOverPayloadThatWouldE std::vector> streams = {writeSinglePartitionStream(inputs)}; - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(inputs[0], output[0]); @@ -675,7 +708,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderFlushesMergedRowsBeforeDicti std::vector> streams = { writeSinglePartitionStream(plainInput), writeSinglePartitionStream(dictionaryInput, true)}; - auto output = readStreams(facebook::velox::asRowType(plainInput->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(plainInput->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), 2); facebook::velox::test::assertEqualVectors(plainInput, output[0]); @@ -694,7 +727,7 @@ TEST_F(VeloxShuffleReaderStreamMergeTest, hashReaderDoesNotMergeComplexTypeStrea streams.push_back(writeSinglePartitionStream(input)); } - auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams)); + auto output = readStreams(facebook::velox::asRowType(inputs[0]->type()), kBatchSize, std::move(streams), true); ASSERT_EQ(output.size(), inputs.size()); facebook::velox::test::assertEqualVectors(inputs[0], output[0]); diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index a608dfbc450..a0c0691e2fb 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -9,77 +9,78 @@ nav_order: 16 ## Gluten Velox backend configurations -| Key | Default | Description | -|----------------------------------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. | -| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | -| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | -| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | -| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | -| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader | -| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan | -| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU | -| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | -| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. | -| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. | -| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | -| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. | -| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. | -| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold | -| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. | -| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. | -| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size | -| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be too larger than the CPU cache size on the host. | -| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. | -| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. | -| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan | -| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan | -| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. | -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. | -| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query | -| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created | -| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level | -| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run | -| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. | -| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size | -| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. | -| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | -| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | -| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | -| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. | -| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. | -| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. | -| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | -| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | -| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is set to true. Default value: 0.25 * | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | -| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. | -| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. | -| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling | -| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting | -| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD | -| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards | -| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 | -| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing. | -| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. | -| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | -| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | -| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | -| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | -| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | -| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | -| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | -| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | +| Key | Default | Description | +|----------------------------------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. | +| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | +| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | +| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | +| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | +| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader | +| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan | +| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU | +| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | +| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. | +| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. | +| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | +| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. | +| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. | +| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold | +| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. | +| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. | +| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size | +| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be too larger than the CPU cache size on the host. | +| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. | +| spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled | false | Enables a reader-side raw payload merge fast path for plain hash shuffle payloads within each shuffle input stream. This path merges payload buffers before Velox vectors are materialized, so it has lower per-batch overhead than generic VeloxResizeBatchesExec resizing, but it only covers plain payloads. Complex types and dictionary-encoded payloads are not merged by this path. VeloxResizeBatchesExec can still be enabled separately as a generic complement for types and encodings not covered by this fast path. If false, each hash shuffle payload is returned as its own columnar batch. | +| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. | +| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan | +| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan | +| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. | +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. | +| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query | +| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created | +| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level | +| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run | +| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. | +| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size | +| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. | +| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | +| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | +| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | +| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. | +| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. | +| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. | +| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | +| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | +| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is set to true. Default value: 0.25 * | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. | +| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. | +| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling | +| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting | +| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD | +| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards | +| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 | +| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing. | +| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. | +| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | +| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | +| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | +| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | +| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | +| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | +| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | +| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | ## Gluten Velox backend *experimental* configurations diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java index 6a0f2130d79..449bc865581 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java @@ -42,7 +42,8 @@ public native long make( int batchSize, long readerBufferSize, long deserializerBufferSize, - String shuffleWriterType); + String shuffleWriterType, + boolean enableHashShuffleReaderStreamMerge); public native long read(long shuffleReaderHandle, ShuffleStreamReader streamReader);