diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java index fec8e059789a..5617c4fc475e 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java @@ -31,6 +31,7 @@ public static ColumnarBatchOutIterator create( int minOutputBatchSize, int maxOutputBatchSize, long preferredBatchBytes, + boolean enableCopyRanges, Iterator in) { final Runtime runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName(), "VeloxBatchResizer"); @@ -40,6 +41,7 @@ public static ColumnarBatchOutIterator create( minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, + enableCopyRanges, new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in)); return new ColumnarBatchOutIterator(runtime, outHandle); } diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java index e5b558e97d3a..908b6a2445b1 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java @@ -40,5 +40,6 @@ public native long create( int minOutputBatchSize, int maxOutputBatchSize, long preferredBatchBytes, + boolean enableCopyRanges, ColumnarBatchInIterator itr); } diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index dbc833f046aa..3586d81dbbb3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def 
veloxResizeBatchesShuffleOutput: Boolean = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT) + def enableVeloxResizeBatchesCopyRanges: Boolean = + getConf(COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED) + case class ResizeRange(min: Int, max: Int) { assert(max >= min) assert(min > 0, "Min batch size should be larger than 0") @@ -322,6 +325,24 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled") + .doc( + "Enables a VeloxResizeBatchesExec fast path that combines eligible batches using " + + "Velox vector copyRanges instead of generic RowVector append. When possible, it " + + "collects the small input batches for one VeloxResizeBatchesExec output, allocates " + + "the output RowVector once, and bulk-copies child vector ranges. This is most useful " + + "for shuffle-read outputs where plain hash shuffle payloads are materialized as " + + "dense flat vectors. Complex vectors can also use copyRanges, but ARRAY and MAP " + + "still rebuild nested offsets and sizes while bulk-copying child ranges. Unsupported " + + "encodings such as dictionary and constant vectors fall back to the generic copy " + + "path. 
This option is enabled by default and complements the reader-side raw " + + "payload merge fast path: that path avoids materializing small plain payload " + + "batches, while this option optimizes VeloxResizeBatchesExec when that operator " + + "is enabled.") + .booleanConf + .createWithDefault(true) + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala index 3b2c9490e75f..0ca76bd97a07 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.utils.VeloxBatchResizer @@ -41,7 +42,12 @@ case class VeloxResizeBatchesExec( override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { VeloxBatchResizer - .create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, in.asJava) + .create( + minOutputBatchSize, + maxOutputBatchSize, + preferredBatchBytes, + VeloxConfig.get.enableVeloxResizeBatchesCopyRanges, + in.asJava) .asScala } diff --git a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala index 65059972b97f..057a3124e7a9 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala @@ 
-81,4 +81,9 @@ class AllVeloxConfiguration extends AnyFunSuite { builder.toMarkdown, "dev/gen-all-config-docs.sh") } + + test("Velox resize batches copyRanges is enabled by default") { + assert( + VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED.defaultValue.contains(true)) + } } diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index e56627466c91..fdfd7bda25f0 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -35,3 +35,5 @@ add_velox_benchmark(generic_benchmark GenericBenchmark.cc) add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc) + +add_velox_benchmark(velox_batch_resizer_benchmark VeloxBatchResizerBenchmark.cc) diff --git a/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc new file mode 100644 index 000000000000..4584dbe1c84a --- /dev/null +++ b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "memory/ColumnarBatchIterator.h" +#include "memory/VeloxColumnarBatch.h" +#include "shuffle/Payload.h" +#include "utils/Exception.h" +#include "utils/VeloxBatchResizer.h" +#include "velox/common/memory/Memory.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" + +using namespace facebook::velox; + +namespace gluten { +namespace { + +constexpr int32_t kInputBatches = 64; +constexpr int32_t kRowsPerBatch = 64; +constexpr int32_t kTotalRows = kInputBatches * kRowsPerBatch; +constexpr int64_t kPreferredBatchBytes = std::numeric_limits::max(); + +enum class DenseVectorKind { + kMixed, + kFixedWidth, + kStringOnly, + kBoolHeavy, +}; + +struct DenseBenchmarkScenario { + int32_t inputBatches; + int32_t rowsPerBatch; + DenseVectorKind kind; + int32_t fixedWidthColumns; + int32_t stringBytes; + int32_t boolColumns; + bool nullable; +}; + +constexpr DenseBenchmarkScenario kMixed64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario kMixed16x256{16, 256, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario kMixed256x16{256, 16, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario + kFixed2_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth, 2, 0, 0, false}; +constexpr DenseBenchmarkScenario + kFixed16_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth, 16, 0, 0, false}; +constexpr DenseBenchmarkScenario + kLongString64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kStringOnly, 0, 64, 0, false}; +constexpr DenseBenchmarkScenario + kBoolHeavy64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kBoolHeavy, 0, 0, 8, false}; + +class ColumnarBatchArray : public ColumnarBatchIterator { + public: + explicit ColumnarBatchArray(std::vector> batches) : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if 
(cursor_ >= batches_.size()) { + return nullptr; + } + return batches_[cursor_++]; + } + + private: + std::vector> batches_; + size_t cursor_{0}; +}; + +std::string makeStringValue(int32_t value, int32_t bytes) { + auto stringValue = std::to_string(value); + if (stringValue.size() < bytes) { + stringValue.append(bytes - stringValue.size(), 'x'); + } + return stringValue; +} + +RowVectorPtr makeMixedVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + auto i32 = BaseVector::create>(INTEGER(), rows, pool); + auto i64 = BaseVector::create>(BIGINT(), rows, pool); + auto flag = BaseVector::create>(BOOLEAN(), rows, pool); + auto str = BaseVector::create>(VARCHAR(), rows, pool); + + for (auto row = 0; row < rows; ++row) { + const auto value = start + row; + i32->set(row, value); + if (scenario.nullable && row % 7 == 0) { + i64->setNull(row, true); + } else { + i64->set(row, value); + } + flag->set(row, row % 2 == 0); + const auto stringValue = makeStringValue(value, scenario.stringBytes); + str->set(row, StringView(stringValue)); + } + + return std::make_shared( + pool, + ROW({INTEGER(), BIGINT(), BOOLEAN(), VARCHAR()}), + nullptr, + rows, + std::vector{i32, i64, flag, str}); +} + +RowVectorPtr makeFixedWidthVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + std::vector children; + std::vector types; + children.reserve(scenario.fixedWidthColumns); + types.reserve(scenario.fixedWidthColumns); + for (auto channel = 0; channel < scenario.fixedWidthColumns; ++channel) { + auto vector = BaseVector::create>(BIGINT(), rows, pool); + for (auto row = 0; row < rows; ++row) { + vector->set(row, static_cast(start + row + channel)); + } + children.push_back(std::move(vector)); + types.push_back(BIGINT()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr 
makeStringVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + auto str = BaseVector::create>(VARCHAR(), rows, pool); + for (auto row = 0; row < rows; ++row) { + const auto value = start + row; + const auto stringValue = makeStringValue(value, scenario.stringBytes); + str->set(row, StringView(stringValue)); + } + + return std::make_shared(pool, ROW({VARCHAR()}), nullptr, rows, std::vector{str}); +} + +RowVectorPtr makeBoolHeavyVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + std::vector children; + std::vector types; + children.reserve(scenario.boolColumns); + types.reserve(scenario.boolColumns); + for (auto channel = 0; channel < scenario.boolColumns; ++channel) { + auto vector = BaseVector::create>(BOOLEAN(), rows, pool); + for (auto row = 0; row < rows; ++row) { + vector->set(row, (start + row + channel) % 2 == 0); + } + children.push_back(std::move(vector)); + types.push_back(BOOLEAN()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr makeDenseVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + switch (scenario.kind) { + case DenseVectorKind::kMixed: + return makeMixedVector(pool, scenario, start); + case DenseVectorKind::kFixedWidth: + return makeFixedWidthVector(pool, scenario, start); + case DenseVectorKind::kStringOnly: + return makeStringVector(pool, scenario, start); + case DenseVectorKind::kBoolHeavy: + return makeBoolHeavyVector(pool, scenario, start); + } + VELOX_UNREACHABLE(); +} + +std::vector makeSmallVectors(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario) { + std::vector vectors; + vectors.reserve(scenario.inputBatches); + for (auto batch = 0; batch < scenario.inputBatches; ++batch) { + vectors.push_back(makeDenseVector(pool, scenario, batch * 
scenario.rowsPerBatch)); + } + return vectors; +} + +std::unique_ptr makeIterator(const std::vector& vectors) { + std::vector> batches; + batches.reserve(vectors.size()); + for (const auto& vector : vectors) { + batches.push_back(std::make_shared(vector)); + } + return std::make_unique(std::move(batches)); +} + +int64_t totalRows(const DenseBenchmarkScenario& scenario) { + return static_cast(scenario.inputBatches) * scenario.rowsPerBatch; +} + +VeloxBatchResizer makeResizeBenchmarkResizer( + memory::MemoryPool* pool, + int64_t outputBatchSize, + std::unique_ptr iterator, + std::optional enableCopyRanges) { + if (enableCopyRanges.has_value()) { + return VeloxBatchResizer( + pool, + outputBatchSize, + std::numeric_limits::max(), + kPreferredBatchBytes, + std::move(iterator), + enableCopyRanges.value()); + } + return VeloxBatchResizer( + pool, outputBatchSize, std::numeric_limits::max(), kPreferredBatchBytes, std::move(iterator)); +} + +void runResizeBenchmark( + benchmark::State& state, + const DenseBenchmarkScenario& scenario, + std::optional enableCopyRanges) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmark"); + const auto vectors = makeSmallVectors(pool.get(), scenario); + int64_t rows = 0; + + for (auto _ : state) { + auto resizer = makeResizeBenchmarkResizer(pool.get(), totalRows(scenario), makeIterator(vectors), enableCopyRanges); + while (auto out = resizer.next()) { + rows += out->numRows(); + } + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +void runDirectChildCopyRangesBenchmark(benchmark::State& state, const DenseBenchmarkScenario& scenario) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkDirectCopy"); + const auto vectors = makeSmallVectors(pool.get(), scenario); + int64_t rows = 0; + + for (auto _ : state) { + auto output = RowVector::createEmpty(vectors[0]->type(), pool.get()); + 
output->resize(totalRows(scenario)); + vector_size_t offset = 0; + for (const auto& input : vectors) { + const BaseVector::CopyRange range{0, offset, input->size()}; + for (auto channel = 0; channel < input->children().size(); ++channel) { + output->childAt(channel)->copyRanges(input->childAt(channel)->loadedVector(), folly::Range(&range, 1)); + } + offset += input->size(); + } + rows += output->size(); + benchmark::DoNotOptimize(output); + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +std::shared_ptr allocatePayloadBuffer(arrow::MemoryPool* pool, int64_t size) { + std::shared_ptr buffer; + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool)); + memset(buffer->mutable_data(), 0x5A, size); + return buffer; +} + +std::shared_ptr allocateEmptyPayloadBuffer(arrow::MemoryPool* pool, int64_t size) { + std::shared_ptr buffer; + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool)); + return buffer; +} + +void addFixedWidthRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t columns, + int32_t valueBytes, + std::vector& validityBuffers, + std::vector>& buffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + buffers.push_back(nullptr); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * valueBytes)); + } +} + +void addFixedWidthRawLayout(int32_t columns, std::vector& validityBuffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + validityBuffers.push_back(false); + } +} + +void addStringRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t stringBytes, + bool nullable, + std::vector& validityBuffers, + std::vector>& buffers) { + validityBuffers.push_back(true); + buffers.push_back(nullable ? 
allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows)) : nullptr); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int32_t))); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * stringBytes)); +} + +void addStringRawLayout(std::vector& validityBuffers) { + validityBuffers.push_back(true); + validityBuffers.push_back(false); + validityBuffers.push_back(false); +} + +void addBoolRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t columns, + std::vector& validityBuffers, + std::vector>& buffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + buffers.push_back(nullptr); + validityBuffers.push_back(true); + buffers.push_back(allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows))); + } +} + +void addBoolRawLayout(int32_t columns, std::vector& validityBuffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + validityBuffers.push_back(true); + } +} + +std::vector makeRawPayloadValidityBuffers(const DenseBenchmarkScenario& scenario) { + std::vector validityBuffers; + switch (scenario.kind) { + case DenseVectorKind::kMixed: + addFixedWidthRawLayout(1, validityBuffers); + validityBuffers.push_back(true); + validityBuffers.push_back(false); + addBoolRawLayout(scenario.boolColumns, validityBuffers); + addStringRawLayout(validityBuffers); + break; + case DenseVectorKind::kFixedWidth: + addFixedWidthRawLayout(scenario.fixedWidthColumns, validityBuffers); + break; + case DenseVectorKind::kStringOnly: + addStringRawLayout(validityBuffers); + break; + case DenseVectorKind::kBoolHeavy: + addBoolRawLayout(scenario.boolColumns, validityBuffers); + break; + } + return validityBuffers; +} + +std::unique_ptr makeRawPayload( + arrow::MemoryPool* pool, + const DenseBenchmarkScenario& scenario, + const std::vector& validityBuffers) { + const auto rows = scenario.rowsPerBatch; 
+ std::vector> buffers; + buffers.reserve(validityBuffers.size()); + std::vector generatedValidityBuffers; + switch (scenario.kind) { + case DenseVectorKind::kMixed: + addFixedWidthRawBuffers(pool, rows, 1, sizeof(int32_t), generatedValidityBuffers, buffers); + generatedValidityBuffers.push_back(true); + buffers.push_back(scenario.nullable ? allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows)) : nullptr); + generatedValidityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int64_t))); + addBoolRawBuffers(pool, rows, scenario.boolColumns, generatedValidityBuffers, buffers); + addStringRawBuffers(pool, rows, scenario.stringBytes, false, generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kFixedWidth: + addFixedWidthRawBuffers( + pool, rows, scenario.fixedWidthColumns, sizeof(int64_t), generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kStringOnly: + addStringRawBuffers(pool, rows, scenario.stringBytes, scenario.nullable, generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kBoolHeavy: + addBoolRawBuffers(pool, rows, scenario.boolColumns, generatedValidityBuffers, buffers); + break; + } + GLUTEN_CHECK(generatedValidityBuffers == validityBuffers, "Invalid raw payload buffer layout"); + return std::make_unique(rows, &validityBuffers, nullptr, std::move(buffers)); +} + +std::vector> makeRawPayloads( + arrow::MemoryPool* pool, + const DenseBenchmarkScenario& scenario, + const std::vector& validityBuffers) { + std::vector> payloads; + payloads.reserve(scenario.inputBatches); + for (auto batch = 0; batch < scenario.inputBatches; ++batch) { + payloads.push_back(makeRawPayload(pool, scenario, validityBuffers)); + } + return payloads; +} + +std::unique_ptr mergeRawPayloadsBulkCopy( + std::vector> payloads, + const std::vector& validityBuffers, + arrow::MemoryPool* pool) { + GLUTEN_CHECK(!payloads.empty(), "Cannot merge empty payloads"); + + const auto numBuffers = 
payloads[0]->numBuffers(); + std::vector payloadRows; + payloadRows.reserve(payloads.size()); + uint32_t totalRows = 0; + std::vector>> inputBuffers(payloads.size()); + std::vector outputSizes(numBuffers, 0); + std::vector hasBuffer(numBuffers, false); + + for (auto payloadIdx = 0; payloadIdx < payloads.size(); ++payloadIdx) { + const auto rows = payloads[payloadIdx]->numRows(); + payloadRows.push_back(rows); + totalRows += rows; + inputBuffers[payloadIdx].reserve(numBuffers); + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + GLUTEN_ASSIGN_OR_THROW(auto buffer, payloads[payloadIdx]->readBufferAt(bufferIdx)); + if (buffer != nullptr) { + hasBuffer[bufferIdx] = true; + if (validityBuffers[bufferIdx]) { + outputSizes[bufferIdx] = arrow::bit_util::BytesForBits(totalRows); + } else { + outputSizes[bufferIdx] += buffer->size(); + } + } + inputBuffers[payloadIdx].push_back(std::move(buffer)); + } + } + + std::vector> outputBuffers(numBuffers); + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + if (hasBuffer[bufferIdx]) { + outputBuffers[bufferIdx] = allocateEmptyPayloadBuffer(pool, outputSizes[bufferIdx]); + } + } + + std::vector byteOffsets(numBuffers, 0); + uint32_t rowOffset = 0; + for (auto payloadIdx = 0; payloadIdx < inputBuffers.size(); ++payloadIdx) { + const auto rows = payloadRows[payloadIdx]; + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + auto& output = outputBuffers[bufferIdx]; + if (output == nullptr) { + continue; + } + + const auto& input = inputBuffers[payloadIdx][bufferIdx]; + if (validityBuffers[bufferIdx]) { + if (input == nullptr) { + arrow::bit_util::SetBitsTo(output->mutable_data(), rowOffset, rows, true); + } else { + arrow::internal::CopyBitmap(input->data(), 0, rows, output->mutable_data(), rowOffset); + } + } else if (input != nullptr) { + memcpy(output->mutable_data() + byteOffsets[bufferIdx], input->data(), input->size()); + byteOffsets[bufferIdx] += input->size(); + } + } + rowOffset += 
rows; + } + + return std::make_unique(totalRows, &validityBuffers, nullptr, std::move(outputBuffers)); +} + +void BM_VeloxBatchResizerAppendOptOutBaseline(benchmark::State& state, DenseBenchmarkScenario scenario) { + runResizeBenchmark(state, scenario, false); +} + +void BM_VeloxBatchResizerDefaultCopyRanges(benchmark::State& state, DenseBenchmarkScenario scenario) { + runResizeBenchmark(state, scenario, std::nullopt); +} + +void BM_DirectChildCopyRanges(benchmark::State& state, DenseBenchmarkScenario scenario) { + runDirectChildCopyRangesBenchmark(state, scenario); +} + +void BM_ReaderSideRawPayloadBulkCopyModel(benchmark::State& state, DenseBenchmarkScenario scenario) { + auto* pool = arrow::default_memory_pool(); + const auto validityBuffers = makeRawPayloadValidityBuffers(scenario); + int64_t rows = 0; + + for (auto _ : state) { + state.PauseTiming(); + auto payloads = makeRawPayloads(pool, scenario, validityBuffers); + state.ResumeTiming(); + + auto merged = mergeRawPayloadsBulkCopy(std::move(payloads), validityBuffers, pool); + rows += merged->numRows(); + benchmark::DoNotOptimize(merged); + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +void BM_ReaderSidePreMergedBatchModel(benchmark::State& state, DenseBenchmarkScenario scenario) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkRawMergeModel"); + auto mergedScenario = scenario; + mergedScenario.inputBatches = 1; + mergedScenario.rowsPerBatch = totalRows(scenario); + const std::vector mergedVector{makeDenseVector(pool.get(), mergedScenario, 0)}; + int64_t rows = 0; + + for (auto _ : state) { + VeloxBatchResizer resizer( + pool.get(), + totalRows(scenario), + std::numeric_limits::max(), + kPreferredBatchBytes, + makeIterator(mergedVector), + false); + while (auto out = resizer.next()) { + rows += out->numRows(); + } + } + + benchmark::DoNotOptimize(rows); + 
state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +#define REGISTER_DENSE_SCENARIO_BENCHMARKS(name, scenario) \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerAppendOptOutBaseline, name, scenario); \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerDefaultCopyRanges, name, scenario); \ + BENCHMARK_CAPTURE(BM_DirectChildCopyRanges, name, scenario); \ + BENCHMARK_CAPTURE(BM_ReaderSideRawPayloadBulkCopyModel, name, scenario); \ + BENCHMARK_CAPTURE(BM_ReaderSidePreMergedBatchModel, name, scenario) + +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_64x64, kMixed64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_16x256, kMixed16x256); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_256x16, kMixed256x16); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed2_64x64, kFixed2_64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed16_64x64, kFixed16_64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(LongString_64x64, kLongString64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(BoolHeavy_64x64, kBoolHeavy64x64); + +#undef REGISTER_DENSE_SCENARIO_BENCHMARKS + +} // namespace +} // namespace gluten + +int main(int argc, char** argv) { + facebook::velox::memory::MemoryManager::initialize(facebook::velox::memory::MemoryManager::Options{}); + ::benchmark::Initialize(&argc, argv); + ::benchmark::RunSpecifiedBenchmarks(); + ::benchmark::Shutdown(); + return 0; +} diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 72640b21af7d..aa4d95994355 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -464,13 +464,19 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper jint minOutputBatchSize, jint maxOutputBatchSize, jlong preferredBatchBytes, + jboolean enableCopyRanges, jobject jIter) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto pool = dynamic_cast(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); auto appender = std::make_shared(std::make_unique( 
- pool.get(), minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, std::move(iter))); + pool.get(), + minOutputBatchSize, + maxOutputBatchSize, + preferredBatchBytes, + std::move(iter), + enableCopyRanges == JNI_TRUE)); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc index b23606b1c867..715857052b27 100644 --- a/cpp/velox/tests/VeloxBatchResizerTest.cc +++ b/cpp/velox/tests/VeloxBatchResizerTest.cc @@ -16,6 +16,9 @@ */ #include +#include +#include +#include #include "utils/VeloxBatchResizer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -53,6 +56,105 @@ class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBas return out; } + RowVectorPtr newDenseFlatVector(size_t numRows, int32_t start = 0) { + std::vector> nullableValues; + nullableValues.reserve(numRows); + std::vector strings; + strings.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + nullableValues.emplace_back(i % 3 == 0 ? 
std::nullopt : std::optional(start + i)); + strings.emplace_back("long-string-value-" + std::to_string(start + i)); + } + return makeRowVector( + {"i32", "i64", "flag", "str"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableFlatVector(nullableValues), + makeFlatVector(numRows, [](auto row) { return row % 2 == 0; }), + makeFlatVector(numRows, [&strings](auto row) { return StringView(strings[row]); })}); + } + + RowVectorPtr newFlatIntVector(size_t numRows, int32_t start = 0) { + return makeRowVector({"i32"}, {makeFlatVector(numRows, [start](auto row) { return start + row; })}); + } + + RowVectorPtr newComplexVector(size_t numRows, int32_t start = 0) { + std::vector> arrays; + arrays.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + arrays.push_back({start + static_cast(i), start + static_cast(i) + 1}); + } + return makeRowVector( + {"i32", "arr"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeArrayVector(arrays)}); + } + + RowVectorPtr newNullableComplexVector(size_t numRows, int32_t start = 0) { + std::vector>>> arrays; + arrays.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if (i % 5 == 0) { + arrays.emplace_back(std::nullopt); + } else { + arrays.emplace_back(std::vector>{ + start + static_cast(i), std::nullopt, start + static_cast(i) + 1}); + } + } + return makeRowVector( + {"i32", "arr"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableArrayVector(arrays)}); + } + + RowVectorPtr newMapVector(size_t numRows, int32_t start = 0) { + std::vector>>> maps; + maps.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + const auto value = start + static_cast(i); + maps.push_back({{value, value + 1}, {value + 2, value + 3}}); + } + return makeRowVector( + {"i32", "map"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeMapVector(maps)}); + } + + RowVectorPtr newNullableMapVector(size_t numRows, int32_t start = 0) { 
+ std::vector>>>> maps; + maps.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if (i % 5 == 0) { + maps.emplace_back(std::nullopt); + } else { + const auto value = start + static_cast(i); + maps.emplace_back( + std::vector>>{{value, value + 1}, {value + 2, std::nullopt}}); + } + } + return makeRowVector( + {"i32", "map"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableMapVector(maps)}); + } + + RowVectorPtr newDictionaryVector(size_t numRows, int32_t start = 0) { + auto base = makeFlatVector(numRows, [start](auto row) { return start + row; }); + auto indices = makeIndices(numRows, [](auto row) { return row; }); + return makeRowVector({"dict"}, {wrapInDictionary(indices, numRows, base)}); + } + + RowVectorPtr newTopLevelNullVector(size_t numRows, int32_t start = 0) { + auto nulls = allocateNulls(numRows, pool()); + bits::setNull(nulls->asMutable(), 0, true); + return std::make_shared( + pool(), + ROW({"i32"}, {INTEGER()}), + nulls, + numRows, + std::vector{makeFlatVector(numRows, [start](auto row) { return start + row; })}, + 1); + } + void checkResize( int32_t min, int32_t max, @@ -76,6 +178,42 @@ class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBas } ASSERT_EQ(actualOutSizes, outSizes); } + + RowVectorPtr + resizeOnce(const std::vector& vectors, bool enableDenseFlatCopy, VeloxBatchResizeStats* stats) { + auto out = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), enableDenseFlatCopy, stats); + EXPECT_EQ(out.size(), 1); + return out[0]; + } + + std::vector resizeAll( + const std::vector& vectors, + int32_t minOutputBatchSize, + int32_t maxOutputBatchSize, + int64_t preferredBatchBytes, + bool enableDenseFlatCopy, + VeloxBatchResizeStats* stats) { + std::vector> inBatches; + inBatches.reserve(vectors.size()); + for (const auto& vector : vectors) { + inBatches.push_back(std::make_shared(vector)); + } + VeloxBatchResizer resizer( + pool(), + minOutputBatchSize, + 
maxOutputBatchSize, + preferredBatchBytes, + std::make_unique(std::move(inBatches)), + enableDenseFlatCopy, + stats); + std::vector out; + while (auto next = resizer.next()) { + auto veloxBatch = std::dynamic_pointer_cast(next); + EXPECT_NE(veloxBatch, nullptr); + out.push_back(veloxBatch->getRowVector()); + } + return out; + } }; TEST_F(VeloxBatchResizerTest, sanity) { @@ -100,4 +238,185 @@ TEST_F(VeloxBatchResizerTest, preferredBatchBytesTest) { ASSERT_ANY_THROW(checkResize(0, 0, 0, {}, {})); } +TEST_F(VeloxBatchResizerTest, denseFlatCopyDisabledUsesAppendPath) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDenseFlatVector(30, 0), newDenseFlatVector(40, 100)}; + auto actual = resizeOnce(vectors, false, &stats); + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, denseFlatCopyEnabledUsesCopyRangesForFixedWidthAndString) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDenseFlatVector(30, 0), newDenseFlatVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsComplexType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newComplexVector(30, 0), newComplexVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + 
EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableComplexType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newNullableComplexVector(30, 0), newNullableComplexVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsMapType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newMapVector(30, 0), newMapVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableMapType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newNullableMapVector(30, 0), newNullableMapVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForConstantEncoding) { + VeloxBatchResizeStats stats; + 
auto vectors = std::vector{newVector(30), newVector(40)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForTopLevelNulls) { /* Inputs come from newTopLevelNullVector. Expects no copyRanges batches or outputs; both inputs are counted as fallback and generic copies. */ + VeloxBatchResizeStats stats; + auto vectors = std::vector{newTopLevelNullVector(30, 0), newTopLevelNullVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.copyRangesOutputBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledCanMixSmallDenseSparseAndDenseBatches) { /* Three inputs, the middle one from newVector and the outer two from newFlatIntVector, merged into one 90-row output. Expects two copyRanges batches and one fallback batch within a single copyRanges output. */ + VeloxBatchResizeStats stats; + auto vectors = std::vector{newFlatIntVector(30, 0), newVector(40), newFlatIntVector(20, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + ASSERT_EQ(actual->size(), expected->size()); + EXPECT_EQ(actual->size(), 90); + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 1); + EXPECT_EQ(stats.copyRangesFallbackBatches, 1); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFlushesCollectedInputsBeforeSplit) { /* Expects resizeAll to return three outputs equal to the three inputs, with two counted in copyRangesBatches and two in copyRangesOutputBatches, and no generic or fallback copies. NOTE(review): 100 and 50 are presumably the min and max output batch sizes; confirm against resizeAll. */ + VeloxBatchResizeStats stats; + auto vectors = + std::vector{newFlatIntVector(40, 0), newFlatIntVector(40, 100), newFlatIntVector(40, 200)}; + + auto actual = resizeAll(vectors, 100, 50, (10L << 20), true, &stats); + + 
ASSERT_EQ(actual.size(), 3); + test::assertEqualVectors(vectors[0], actual[0]); + test::assertEqualVectors(vectors[1], actual[1]); + test::assertEqualVectors(vectors[2], actual[2]); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 2); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFlushesCollectedInputsAtEndOfInput) { /* Expects a single 90-row output equal to the run with the flag set to false, with all three inputs counted in copyRangesBatches, one copyRangesOutputBatches, and no generic or fallback copies. */ + VeloxBatchResizeStats stats; + auto vectors = + std::vector{newFlatIntVector(30, 0), newFlatIntVector(40, 100), newFlatIntVector(20, 200)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), false, &appendStats); + + auto actual = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), true, &stats); + + ASSERT_EQ(actual.size(), 1); + ASSERT_EQ(expected.size(), 1); + test::assertEqualVectors(expected[0], actual[0]); + EXPECT_EQ(actual[0]->size(), 90); + EXPECT_EQ(stats.copyRangesBatches, 3); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForDictionaryEncoding) { /* Inputs come from newDictionaryVector. Expects no copyRangesBatches; both inputs are counted as fallback and generic copies. */ + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDictionaryVector(30, 0), newDictionaryVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + } // namespace gluten diff --git a/cpp/velox/utils/VeloxBatchResizer.cc b/cpp/velox/utils/VeloxBatchResizer.cc index a9196a467194..c67b1c751110 100644 --- a/cpp/velox/utils/VeloxBatchResizer.cc +++ b/cpp/velox/utils/VeloxBatchResizer.cc @@ -20,6 +20,52 @@ namespace gluten { 
namespace { +bool supportsCopyRanges(const facebook::velox::VectorPtr& vector) { /* Decides whether a vector may be merged through copyRanges: a null pointer is rejected, a flat-encoded vector is accepted, and a ROW, ARRAY or MAP encoded vector is accepted only if all of its children (row fields, array elements, map keys and map values) are accepted recursively. Every other encoding is rejected. */ + if (vector == nullptr) { + return false; + } + + if (vector->isFlatEncoding()) { + return true; + } + + if (vector->encoding() != facebook::velox::VectorEncoding::Simple::ROW && + vector->encoding() != facebook::velox::VectorEncoding::Simple::ARRAY && + vector->encoding() != facebook::velox::VectorEncoding::Simple::MAP) { + return false; + } + + switch (vector->typeKind()) { + case facebook::velox::TypeKind::ROW: { + const auto* row = vector->as(); + for (const auto& child : row->children()) { + if (!supportsCopyRanges(child)) { + return false; + } + } + return true; + } + case facebook::velox::TypeKind::ARRAY: { + const auto* array = vector->as(); + return supportsCopyRanges(array->elements()); + } + case facebook::velox::TypeKind::MAP: { + const auto* map = vector->as(); + return supportsCopyRanges(map->mapKeys()) && supportsCopyRanges(map->mapValues()); + } + default: /* type kind is none of ROW, ARRAY or MAP: reject */ + return false; + } +} + +bool supportsCopyRanges(const facebook::velox::RowVectorPtr& rowVector) { /* Batch-level check: rejects a null pointer, a row vector whose encoding is not ROW, or one that reports mayHaveNulls(), then applies the per-vector overload to the row vector itself. */ + if (rowVector == nullptr || rowVector->encoding() != facebook::velox::VectorEncoding::Simple::ROW || + rowVector->mayHaveNulls()) { + return false; + } + return supportsCopyRanges(std::static_pointer_cast(rowVector)); +} + class SliceRowVector : public ColumnarBatchIterator { public: SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in) @@ -50,17 +96,100 @@ gluten::VeloxBatchResizer::VeloxBatchResizer( int32_t minOutputBatchSize, int32_t maxOutputBatchSize, int64_t preferredBatchBytes, - std::unique_ptr in) + std::unique_ptr in, + bool enableCopyRanges, + VeloxBatchResizeStats* stats) : pool_(pool), minOutputBatchSize_(minOutputBatchSize), maxOutputBatchSize_(maxOutputBatchSize), preferredBatchBytes_(static_cast(preferredBatchBytes)), - in_(std::move(in)) { + enableCopyRanges_(enableCopyRanges), + in_(std::move(in)), + stats_(stats) { /* stats is stored as given; every counter update in this file is guarded by a stats_ != nullptr check */ GLUTEN_CHECK( minOutputBatchSize_ > 0 && maxOutputBatchSize_ 
> 0, "Either minOutputBatchSize or maxOutputBatchSize should be larger than 0"); } +void VeloxBatchResizer::appendToBuffer( + facebook::velox::RowVectorPtr& buffer, + const facebook::velox::RowVectorPtr& input) { /* Generic path: appends input to buffer with RowVector::append and, if stats_ is set, counts one appendCopyBatches. */ + buffer->append(input.get()); + if (stats_ != nullptr) { + ++stats_->appendCopyBatches; + } +} + +facebook::velox::RowVectorPtr VeloxBatchResizer::copyBufferedInputs( + const std::vector& inputs) { /* Merges inputs (must be non-empty) into one new RowVector of type inputs[0]->type(), resized up front to the summed row count. Each input is written at its running row offset: through per-child copyRanges when supportsCopyRanges accepts it, otherwise through RowVector::copy. */ + GLUTEN_CHECK(!inputs.empty(), "Cannot copy empty inputs"); + + facebook::velox::vector_size_t totalRows = 0; + for (const auto& input : inputs) { + totalRows += input->size(); + } + + auto buffer = facebook::velox::RowVector::createEmpty(inputs[0]->type(), pool_); + buffer->resize(totalRows); + + bool usedCopyRanges = false; + facebook::velox::vector_size_t offset = 0; + for (const auto& input : inputs) { + if (supportsCopyRanges(input)) { + const facebook::velox::BaseVector::CopyRange range{0, offset, input->size()}; /* NOTE(review): presumably {sourceIndex, targetIndex, count}; confirm against BaseVector::CopyRange */ + for (auto channel = 0; channel < input->children().size(); ++channel) { + buffer->childAt(channel)->copyRanges(input->childAt(channel)->loadedVector(), folly::Range(&range, 1)); + } + usedCopyRanges = true; + if (stats_ != nullptr) { + ++stats_->copyRangesBatches; + } + } else { /* a fallback input is counted in both copyRangesFallbackBatches and appendCopyBatches */ + buffer->copy(input.get(), offset, 0, input->size()); + if (stats_ != nullptr) { + ++stats_->copyRangesFallbackBatches; + ++stats_->appendCopyBatches; + } + } + offset += input->size(); + } + + if (usedCopyRanges && stats_ != nullptr) { /* counted once per merged output, and only when at least one input used copyRanges */ + ++stats_->copyRangesOutputBatches; + } + return buffer; +} + +std::shared_ptr VeloxBatchResizer::collectAndCopy( + facebook::velox::RowVectorPtr firstInput, + uint64_t numBytes) { /* Collects firstInput and further batches pulled from in_ until bufferedRows reaches minOutputBatchSize_ or in_ is exhausted, then returns the copyBufferedInputs result. numBytes is the running byte total passed in by next() and grows by numBytes() of each accepted batch. */ + std::vector inputs; + inputs.push_back(std::move(firstInput)); + facebook::velox::vector_size_t bufferedRows = inputs.back()->size(); + + std::shared_ptr cb; + for (cb = in_->next(); cb != nullptr; cb = in_->next()) { + auto vb = VeloxColumnarBatch::from(pool_, cb); + auto rv = vb->getRowVector(); + uint64_t addedBytes = cb->numBytes(); + if (bufferedRows 
+ rv->size() > maxOutputBatchSize_ || + numBytes + addedBytes > static_cast(preferredBatchBytes_)) { /* adding rv would exceed the row or byte limit: park it in next_ as a SliceRowVector and return what has been collected so far */ + GLUTEN_CHECK(next_ == nullptr, "Invalid state"); + next_ = std::make_unique(maxOutputBatchSize_, rv); + return std::make_shared(copyBufferedInputs(inputs)); + } + + numBytes += addedBytes; + bufferedRows += rv->size(); + inputs.push_back(std::move(rv)); + if (bufferedRows >= minOutputBatchSize_) { + break; + } + } + + return std::make_shared(copyBufferedInputs(inputs)); +} + std::shared_ptr VeloxBatchResizer::next() { if (next_) { auto next = next_->next(); @@ -81,8 +210,12 @@ std::shared_ptr VeloxBatchResizer::next() { if (cb->numRows() < minOutputBatchSize_ && numBytes <= preferredBatchBytes_) { auto vb = VeloxColumnarBatch::from(pool_, cb); auto rv = vb->getRowVector(); + if (enableCopyRanges_) { /* copyRanges mode: collectAndCopy does the buffering and merging instead of the append loop that follows */ + return collectAndCopy(std::move(rv), numBytes); + } + auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_); - buffer->append(rv.get()); + appendToBuffer(buffer, rv); for (cb = in_->next(); cb != nullptr; cb = in_->next()) { vb = VeloxColumnarBatch::from(pool_, cb); @@ -95,7 +228,7 @@ std::shared_ptr VeloxBatchResizer::next() { return std::make_shared(buffer); } numBytes += addedBytes; - buffer->append(rv.get()); + appendToBuffer(buffer, rv); if (buffer->size() >= minOutputBatchSize_) { // Buffer is full. break; diff --git a/cpp/velox/utils/VeloxBatchResizer.h b/cpp/velox/utils/VeloxBatchResizer.h index 8afd191dca4f..5a6b71e931bf 100644 --- a/cpp/velox/utils/VeloxBatchResizer.h +++ b/cpp/velox/utils/VeloxBatchResizer.h @@ -23,6 +23,15 @@ namespace gluten { +struct VeloxBatchResizeStats { /* Optional counters; VeloxBatchResizer increments them only when it holds a non-null stats pointer. */ + int64_t copyRangesBatches{0}; /* input batches copied with copyRanges */ + int64_t copyRangesOutputBatches{0}; /* merged outputs in which at least one input used copyRanges */ + // Counts generic copies: RowVector::append when copyRanges is disabled and + // RowVector::copy fallbacks when copyRanges is enabled. 
+ int64_t appendCopyBatches{0}; + int64_t copyRangesFallbackBatches{0}; +}; + class VeloxBatchResizer : public ColumnarBatchIterator { public: VeloxBatchResizer( @@ -30,7 +39,9 @@ class VeloxBatchResizer : public ColumnarBatchIterator { int32_t minOutputBatchSize, int32_t maxOutputBatchSize, int64_t preferredBatchBytes, - std::unique_ptr in); + std::unique_ptr in, + bool enableCopyRanges = true, + VeloxBatchResizeStats* stats = nullptr); std::shared_ptr next() override; @@ -41,9 +52,17 @@ class VeloxBatchResizer : public ColumnarBatchIterator { const int32_t minOutputBatchSize_; const int32_t maxOutputBatchSize_; const uint64_t preferredBatchBytes_; + const bool enableCopyRanges_; std::unique_ptr in_; + VeloxBatchResizeStats* stats_; std::unique_ptr next_ = nullptr; + + void appendToBuffer(facebook::velox::RowVectorPtr& buffer, const facebook::velox::RowVectorPtr& input); + + facebook::velox::RowVectorPtr copyBufferedInputs(const std::vector& inputs); + + std::shared_ptr collectAndCopy(facebook::velox::RowVectorPtr firstInput, uint64_t numBytes); }; } // namespace gluten diff --git a/docs/Configuration.md b/docs/Configuration.md index 294e6c010ff0..a68f400f173d 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -20,132 +20,132 @@ nav_order: 15 ## Gluten configurations -| Key | Default | Description | -|--------------------------------------------------------------------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.costModel | legacy | The class name of user-defined cost model that will be used by Gluten's transition planner. If not specified, a legacy built-in cost model will be used. 
The legacy cost model helps RAS planner exhaustively offload computations, and helps transition planner choose columnar-to-columnar transition over others. | -| spark.gluten.enabled | true | Whether to enable gluten. Default value is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for spark.plugins. | -| spark.gluten.execution.resource.expired.time | 86400 | Expired time of execution with resource relation has cached. | -| spark.gluten.expression.blacklist | <undefined> | A black list of expression to skip transform, multiple values separated by commas. | -| spark.gluten.loadLibFromJar | false | Whether to load shared libraries from jars. | -| spark.gluten.loadLibOS | <undefined> | The shared library loader's OS name. | -| spark.gluten.loadLibOSVersion | <undefined> | The shared library loader's OS version. | -| spark.gluten.memory.isolation | false | Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. | -| spark.gluten.memory.overAcquiredMemoryRatio | 0.3 | If larger than 0, Velox backend will try over-acquire this ratio of the total allocated memory as backup to avoid OOM. | -| spark.gluten.memory.reservationBlockSize | 8MB | Block size of native reservation listener reserve memory from Spark. | -| spark.gluten.numTaskSlotsPerExecutor | -1 | Must provide default value since non-execution operations (e.g. 
org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated | -| spark.gluten.shuffleWriter.bufferSize | <undefined> | -| spark.gluten.soft-affinity.duplicateReading.maxCacheItems | 10000 | Enable Soft Affinity duplicate reading detection | -| spark.gluten.soft-affinity.duplicateReadingDetect.enabled | false | If true, Enable Soft Affinity duplicate reading detection | -| spark.gluten.soft-affinity.enabled | false | Whether to enable Soft Affinity scheduling. | -| spark.gluten.soft-affinity.min.target-hosts | 1 | For on HDFS, if there are already target hosts, and then prefer to use the original target hosts to schedule | -| spark.gluten.soft-affinity.replications.num | 2 | Calculate the number of the replications for scheduling to the target executors per file | -| spark.gluten.sql.adaptive.costEvaluator.enabled | true | If true, use org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost evaluator class, else follow the configuration spark.sql.adaptive.customCostEvaluatorClass. | -| spark.gluten.sql.ansiFallback.enabled | true | When true (default), Gluten will fall back to Spark when ANSI mode is enabled. When false, Gluten will attempt to execute in ANSI mode. | -| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | true | Config to enable BroadcastNestedLoopJoinExecTransformer. | -| spark.gluten.sql.cacheWholeStageTransformerContext | false | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. It is used to get substrait plan node and native plan string. | -| spark.gluten.sql.cartesianProductTransformerEnabled | true | Config to enable CartesianProductExecTransformer. | -| spark.gluten.sql.collapseGetJsonObject.enabled | false | Collapse nested get_json_object functions as one for optimization. | -| spark.gluten.sql.columnar.appendData | true | Enable or disable columnar v2 command append data. 
| -| spark.gluten.sql.columnar.arrowUdf | true | Enable or disable columnar arrow udf. | -| spark.gluten.sql.columnar.batchscan | true | Enable or disable columnar batchscan. | -| spark.gluten.sql.columnar.broadcastExchange | true | Enable or disable columnar broadcastExchange. | -| spark.gluten.sql.columnar.broadcastJoin | true | Enable or disable columnar broadcastJoin. | -| spark.gluten.sql.columnar.cast.avg | true | -| spark.gluten.sql.columnar.coalesce | true | Enable or disable columnar coalesce. | -| spark.gluten.sql.columnar.collectLimit | true | Enable or disable columnar collectLimit. | -| spark.gluten.sql.columnar.collectTail | true | Enable or disable columnar collectTail. | -| spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true | Enable or disable nested column pruning in hivetablescan. | -| spark.gluten.sql.columnar.enableVanillaVectorizedReaders | true | Enable or disable vanilla vectorized scan. | -| spark.gluten.sql.columnar.executor.libpath || The gluten executor library path. | -| spark.gluten.sql.columnar.expand | true | Enable or disable columnar expand. | -| spark.gluten.sql.columnar.fallback.expressions.threshold | 50 | Fall back filter/project if number of nested expressions reaches this threshold, considering Spark codegen can bring better performance for such case. | -| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | true | When true, the fallback policy ignores the RowToColumnar when counting fallback number. | -| spark.gluten.sql.columnar.fallback.preferColumnar | true | When true, the fallback policy prefers to use Gluten plan rather than vanilla Spark plan if the both of them contains ColumnarToRow and the vanilla Spark plan ColumnarToRow number is not smaller than Gluten plan. | -| spark.gluten.sql.columnar.filescan | true | Enable or disable columnar filescan. | -| spark.gluten.sql.columnar.filter | true | Enable or disable columnar filter. 
| -| spark.gluten.sql.columnar.force.hashagg | true | Whether to force to use gluten's hash agg for replacing vanilla spark's sort agg. | -| spark.gluten.sql.columnar.forceShuffledHashJoin | true | -| spark.gluten.sql.columnar.generate | true | -| spark.gluten.sql.columnar.hashagg | true | Enable or disable columnar hashagg. | -| spark.gluten.sql.columnar.hivetablescan | true | Enable or disable columnar hivetablescan. | -| spark.gluten.sql.columnar.libname | gluten | The gluten library name. | -| spark.gluten.sql.columnar.libpath || The gluten library path. | -| spark.gluten.sql.columnar.limit | true | -| spark.gluten.sql.columnar.maxBatchSize | 4096 | -| spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | -| spark.gluten.sql.columnar.overwritePartitionsDynamic | true | Enable or disable columnar v2 command overwrite partitions dynamic. | -| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | -| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | -| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | -| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | -| spark.gluten.sql.columnar.physicalJoinOptimizationOutputSize | 52 | Fallback to row operators if there are several continuous joins and matched output size. | -| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. 
| -| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | -| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | -| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | -| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | -| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | -| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | -| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | -| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | -| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. 
| -| spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | -| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | -| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | -| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | -| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. | -| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | -| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | -| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | 1MB | Buffer size in bytes for sort-based shuffle reader deserializing raw input to columnar batch. | -| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. | -| spark.gluten.sql.columnar.shuffle.typeAwareCompress.enabled | false | Enable type-aware compression (e.g. FFor for 64-bit integers) in shuffle. Not compatible with dictionary encoding; if both are enabled, type-aware compression is automatically disabled. | -| spark.gluten.sql.columnar.shuffledHashJoin | true | Enable or disable columnar shuffledHashJoin. | -| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true | Whether to allow Gluten to choose an optimal build side for shuffled hash join. 
| -| spark.gluten.sql.columnar.smallFileThreshold | 0.5 | The total size threshold of small files in table scan.To avoid small files being placed into the same partition, Gluten will try to distribute small files into different partitions when the total size of small files is below this threshold. | -| spark.gluten.sql.columnar.sort | true | Enable or disable columnar sort. | -| spark.gluten.sql.columnar.sortMergeJoin | true | Enable or disable columnar sortMergeJoin. This should be set with preferSortMergeJoin=false. | -| spark.gluten.sql.columnar.tableCache | false | Enable or disable columnar table cache. | -| spark.gluten.sql.columnar.takeOrderedAndProject | true | -| spark.gluten.sql.columnar.union | true | Enable or disable columnar union. | -| spark.gluten.sql.columnar.wholeStage.fallback.threshold | -1 | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.window | true | Enable or disable columnar window. | -| spark.gluten.sql.columnar.window.group.limit | true | Enable or disable columnar window group limit. | -| spark.gluten.sql.columnar.writeToDataSourceV2 | true | Enable or disable columnar v2 command write to data source v2. | -| spark.gluten.sql.columnarSampleEnabled | false | Disable or enable columnar sample. | -| spark.gluten.sql.columnarToRowMemoryThreshold | 64MB | -| spark.gluten.sql.countDistinctWithoutExpand | false | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! | -| spark.gluten.sql.extendedColumnPruning.enabled | true | Do extended nested column pruning for cases ignored by vanilla Spark. | -| spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). 
User should enable this property if their incompatibility is intolerable. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. | -| spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. | -| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. | -| spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader | -| spark.gluten.sql.native.bloomFilter | true | -| spark.gluten.sql.native.hive.writer.enabled | true | This is config to specify whether to enable the native columnar writer for HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output file type. | -| spark.gluten.sql.native.hyperLogLog.Aggregate | true | -| spark.gluten.sql.native.parquet.write.blockRows | 100000000 | -| spark.gluten.sql.native.union | false | Enable or disable native union where computation is completely offloaded to backend. | -| spark.gluten.sql.native.writeColumnMetadataExclusionList | comment | Native write files does not support column metadata. Metadata in list would be removed to support native write files. Multiple values separated by commas. 
| -| spark.gluten.sql.native.writer.enabled | <undefined> | This is config to specify whether to enable the native columnar parquet/orc writer | -| spark.gluten.sql.orc.charType.scan.fallback.enabled | true | Force fallback for orc char type scan. | -| spark.gluten.sql.pushAggregateThroughJoin.enabled | false | Enables the push-aggregate-through-join optimization in Gluten. When enabled, aggregate operators may be pushed below joins during logical optimization and corresponding physical plans may be rewritten to execute the aggregation earlier. | -| spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. | -| spark.gluten.sql.removeNativeWriteFilesSortAndProject | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. | -| spark.gluten.sql.rewrite.dateTimestampComparison | true | Rewrite the comparision between date and timestamp to timestamp comparison.For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` | -| spark.gluten.sql.scan.fileSchemeValidation.enabled | true | When true, enable file path scheme validation for scan. Validation will fail if file scheme is not supported by registered file systems, which will cause scan operator fall back. | -| spark.gluten.sql.supported.flattenNestedFunctions | and,or | Flatten nested functions as one for optimization. | -| spark.gluten.sql.text.input.empty.as.default | false | treat empty fields in CSV input as default values. 
| -| spark.gluten.sql.text.input.max.block.size | 8KB | the max block size for text input rows | -| spark.gluten.sql.validation.printStackOnFailure | false | -| spark.gluten.storage.hdfsViewfs.enabled | false | If enabled, gluten will convert the viewfs path to hdfs path in scala side | -| spark.gluten.supported.hive.udfs || Supported hive udf names. | -| spark.gluten.supported.python.udfs || Supported python udf names. | -| spark.gluten.supported.scala.udfs || Supported scala udf names. | -| spark.gluten.ui.enabled | true | Whether to enable the gluten web UI, If true, attach the gluten UI page to the Spark web UI. | +| Key | Default | Description | +|---------------------------------------------------------------------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.costModel | legacy | The class name of user-defined cost model that will be used by Gluten's transition planner. If not specified, a legacy built-in cost model will be used. The legacy cost model helps RAS planner exhaustively offload computations, and helps transition planner choose columnar-to-columnar transition over others. | +| spark.gluten.enabled | true | Whether to enable gluten. Default value is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for spark.plugins. | +| spark.gluten.execution.resource.expired.time | 86400 | Expired time of execution with resource relation has cached. | +| spark.gluten.expression.blacklist | <undefined> | A black list of expression to skip transform, multiple values separated by commas. 
| +| spark.gluten.loadLibFromJar | false | Whether to load shared libraries from jars. | +| spark.gluten.loadLibOS | <undefined> | The shared library loader's OS name. | +| spark.gluten.loadLibOSVersion | <undefined> | The shared library loader's OS version. | +| spark.gluten.memory.isolation | false | Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. | +| spark.gluten.memory.overAcquiredMemoryRatio | 0.3 | If larger than 0, Velox backend will try over-acquire this ratio of the total allocated memory as backup to avoid OOM. | +| spark.gluten.memory.reservationBlockSize | 8MB | Block size of native reservation listener reserve memory from Spark. | +| spark.gluten.numTaskSlotsPerExecutor | -1 | Must provide default value since non-execution operations (e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated | +| spark.gluten.shuffleWriter.bufferSize | <undefined> | +| spark.gluten.soft-affinity.duplicateReading.maxCacheItems | 10000 | Enable Soft Affinity duplicate reading detection | +| spark.gluten.soft-affinity.duplicateReadingDetect.enabled | false | If true, Enable Soft Affinity duplicate reading detection | +| spark.gluten.soft-affinity.enabled | false | Whether to enable Soft Affinity scheduling. 
| +| spark.gluten.soft-affinity.min.target-hosts | 1 | For on HDFS, if there are already target hosts, and then prefer to use the original target hosts to schedule | +| spark.gluten.soft-affinity.replications.num | 2 | Calculate the number of the replications for scheduling to the target executors per file | +| spark.gluten.sql.adaptive.costEvaluator.enabled | true | If true, use org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost evaluator class, else follow the configuration spark.sql.adaptive.customCostEvaluatorClass. | +| spark.gluten.sql.ansiFallback.enabled | true | When true (default), Gluten will fall back to Spark when ANSI mode is enabled. When false, Gluten will attempt to execute in ANSI mode. | +| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | true | Config to enable BroadcastNestedLoopJoinExecTransformer. | +| spark.gluten.sql.cacheWholeStageTransformerContext | false | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. It is used to get substrait plan node and native plan string. | +| spark.gluten.sql.cartesianProductTransformerEnabled | true | Config to enable CartesianProductExecTransformer. | +| spark.gluten.sql.collapseGetJsonObject.enabled | false | Collapse nested get_json_object functions as one for optimization. | +| spark.gluten.sql.columnar.appendData | true | Enable or disable columnar v2 command append data. | +| spark.gluten.sql.columnar.arrowUdf | true | Enable or disable columnar arrow udf. | +| spark.gluten.sql.columnar.batchscan | true | Enable or disable columnar batchscan. | +| spark.gluten.sql.columnar.broadcastExchange | true | Enable or disable columnar broadcastExchange. | +| spark.gluten.sql.columnar.broadcastJoin | true | Enable or disable columnar broadcastJoin. | +| spark.gluten.sql.columnar.cast.avg | true | +| spark.gluten.sql.columnar.coalesce | true | Enable or disable columnar coalesce. 
| +| spark.gluten.sql.columnar.collectLimit | true | Enable or disable columnar collectLimit. | +| spark.gluten.sql.columnar.collectTail | true | Enable or disable columnar collectTail. | +| spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true | Enable or disable nested column pruning in hivetablescan. | +| spark.gluten.sql.columnar.enableVanillaVectorizedReaders | true | Enable or disable vanilla vectorized scan. | +| spark.gluten.sql.columnar.executor.libpath || The gluten executor library path. | +| spark.gluten.sql.columnar.expand | true | Enable or disable columnar expand. | +| spark.gluten.sql.columnar.fallback.expressions.threshold | 50 | Fall back filter/project if number of nested expressions reaches this threshold, considering Spark codegen can bring better performance for such case. | +| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | true | When true, the fallback policy ignores the RowToColumnar when counting fallback number. | +| spark.gluten.sql.columnar.fallback.preferColumnar | true | When true, the fallback policy prefers to use Gluten plan rather than vanilla Spark plan if the both of them contains ColumnarToRow and the vanilla Spark plan ColumnarToRow number is not smaller than Gluten plan. | +| spark.gluten.sql.columnar.filescan | true | Enable or disable columnar filescan. | +| spark.gluten.sql.columnar.filter | true | Enable or disable columnar filter. | +| spark.gluten.sql.columnar.force.hashagg | true | Whether to force to use gluten's hash agg for replacing vanilla spark's sort agg. | +| spark.gluten.sql.columnar.forceShuffledHashJoin | true | +| spark.gluten.sql.columnar.generate | true | +| spark.gluten.sql.columnar.hashagg | true | Enable or disable columnar hashagg. | +| spark.gluten.sql.columnar.hivetablescan | true | Enable or disable columnar hivetablescan. | +| spark.gluten.sql.columnar.libname | gluten | The gluten library name. | +| spark.gluten.sql.columnar.libpath || The gluten library path. 
| +| spark.gluten.sql.columnar.limit | true | +| spark.gluten.sql.columnar.maxBatchSize | 4096 | +| spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | +| spark.gluten.sql.columnar.overwritePartitionsDynamic | true | Enable or disable columnar v2 command overwrite partitions dynamic. | +| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | +| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | +| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | +| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | +| spark.gluten.sql.columnar.physicalJoinOptimizationOutputSize | 52 | Fallback to row operators if there are several continuous joins and matched output size. | +| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | +| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | +| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. 
| +| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and performs alias substitution | +| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | +| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | +| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | +| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | +| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable. Otherwise, throw an exception. | +| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle. If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat, the supported codecs are gzip and zstd. | +| spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | +| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | +| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | +| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote.
| +| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | +| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | 1MB | Buffer size in bytes for sort-based shuffle reader deserializing raw input to columnar batch. | +| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. | +| spark.gluten.sql.columnar.shuffle.typeAwareCompress.enabled | false | Enable type-aware compression (e.g. FFor for 64-bit integers) in shuffle. Not compatible with dictionary encoding; if both are enabled, type-aware compression is automatically disabled. | +| spark.gluten.sql.columnar.shuffledHashJoin | true | Enable or disable columnar shuffledHashJoin. | +| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true | Whether to allow Gluten to choose an optimal build side for shuffled hash join. | +| spark.gluten.sql.columnar.smallFileThreshold | 0.5 | The total size threshold of small files in table scan. To avoid small files being placed into the same partition, Gluten will try to distribute small files into different partitions when the total size of small files is below this threshold. | +| spark.gluten.sql.columnar.sort | true | Enable or disable columnar sort. | +| spark.gluten.sql.columnar.sortMergeJoin | true | Enable or disable columnar sortMergeJoin. This should be set with preferSortMergeJoin=false. | +| spark.gluten.sql.columnar.tableCache | false | Enable or disable columnar table cache. | +| spark.gluten.sql.columnar.takeOrderedAndProject | true | +| spark.gluten.sql.columnar.union | true | Enable or disable columnar union.
| +| spark.gluten.sql.columnar.wholeStage.fallback.threshold | -1 | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.window | true | Enable or disable columnar window. | +| spark.gluten.sql.columnar.window.group.limit | true | Enable or disable columnar window group limit. | +| spark.gluten.sql.columnar.writeToDataSourceV2 | true | Enable or disable columnar v2 command write to data source v2. | +| spark.gluten.sql.columnarSampleEnabled | false | Disable or enable columnar sample. | +| spark.gluten.sql.columnarToRowMemoryThreshold | 64MB | +| spark.gluten.sql.countDistinctWithoutExpand | false | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! | +| spark.gluten.sql.extendedColumnPruning.enabled | true | Do extended nested column pruning for cases ignored by vanilla Spark. | +| spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). User should enable this property if their incompatibility is intolerable. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. 
| +| spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. | +| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. | +| spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader | +| spark.gluten.sql.native.bloomFilter | true | +| spark.gluten.sql.native.hive.writer.enabled | true | This is config to specify whether to enable the native columnar writer for HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output file type. | +| spark.gluten.sql.native.hyperLogLog.Aggregate | true | +| spark.gluten.sql.native.parquet.write.blockRows | 100000000 | +| spark.gluten.sql.native.union | false | Enable or disable native union where computation is completely offloaded to backend. | +| spark.gluten.sql.native.writeColumnMetadataExclusionList | comment | Native write files does not support column metadata. Metadata in list would be removed to support native write files. Multiple values separated by commas. | +| spark.gluten.sql.native.writer.enabled | <undefined> | This is config to specify whether to enable the native columnar parquet/orc writer | +| spark.gluten.sql.orc.charType.scan.fallback.enabled | true | Force fallback for orc char type scan. | +| spark.gluten.sql.pushAggregateThroughJoin.enabled | false | Enables the push-aggregate-through-join optimization in Gluten. When enabled, aggregate operators may be pushed below joins during logical optimization and corresponding physical plans may be rewritten to execute the aggregation earlier. | +| spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. 
A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. | +| spark.gluten.sql.removeNativeWriteFilesSortAndProject | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. | +| spark.gluten.sql.rewrite.dateTimestampComparison | true | Rewrite the comparison between date and timestamp to timestamp comparison. For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` | +| spark.gluten.sql.scan.fileSchemeValidation.enabled | true | When true, enable file path scheme validation for scan. Validation will fail if file scheme is not supported by registered file systems, which will cause the scan operator to fall back. | +| spark.gluten.sql.supported.flattenNestedFunctions | and,or | Flatten nested functions as one for optimization. | +| spark.gluten.sql.text.input.empty.as.default | false | treat empty fields in CSV input as default values. | +| spark.gluten.sql.text.input.max.block.size | 8KB | the max block size for text input rows | +| spark.gluten.sql.validation.printStackOnFailure | false | +| spark.gluten.storage.hdfsViewfs.enabled | false | If enabled, gluten will convert the viewfs path to hdfs path in scala side | +| spark.gluten.supported.hive.udfs || Supported hive udf names. | +| spark.gluten.supported.python.udfs || Supported python udf names. | +| spark.gluten.supported.scala.udfs || Supported scala udf names. | +| spark.gluten.ui.enabled | true | Whether to enable the gluten web UI. If true, attach the gluten UI page to the Spark web UI.
| ## Gluten *experimental* configurations diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index a608dfbc450b..b428188a888c 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -9,77 +9,78 @@ nav_order: 16 ## Gluten Velox backend configurations -| Key | Default | Description | -|----------------------------------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. | -| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
| -| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | -| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | -| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | -| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | -| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader | -| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan | -| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU | -| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | -| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. | -| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. | -| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | -| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. 
| -| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. | -| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold | -| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. | -| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. | -| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size | -| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be too larger than the CPU cache size on the host. | -| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. | -| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. 
| -| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan | -| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan | -| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. | -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. 
Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. | -| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query | -| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created | -| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level | -| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run | -| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. | -| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size | -| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. | -| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | -| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | -| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | -| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. | -| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. 
| -| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. | -| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | -| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | -| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is set to true. Default value: 0.25 * | -| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. 
The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | -| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. | -| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. | -| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling | -| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting | -| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD | -| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards | -| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 | -| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing. | -| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. | -| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | -| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | -| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | -| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. 
| -| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | -| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | -| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | -| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | +| Key | Default | Description | +|----------------------------------------------------------------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.sql.columnar.backend.velox.IOThreads | <undefined> | The Size of the IO thread pool in the Connector. 
This thread pool is used for split preloading and DirectBufferedInput. By default, the value is the same as the maximum task slots per Spark executor. | +| spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | +| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enable soft-affinity as well when enabling Velox cache. | +| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | +| spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak.
| +| spark.gluten.sql.columnar.backend.velox.cudf.batchSize | 2147483647 | Cudf input batch size after shuffle reader | +| spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan | false | Enable cudf table scan | +| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation | true | Heuristics you can apply to validate a cuDF/GPU plan and only offload when the entire stage can be fully and profitably executed on GPU | +| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent | 50 | The initial percent of GPU memory to allocate for memory resource for one thread. | +| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. | +| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for the GPU to become available. | +| spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated; renamed to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | +| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. | +| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same. | +| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold | +| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double) and avg(float/double).
When set to loose, flushing will be enabled. | +| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate results at any time when memory is full / data reduction ratio is low. | +| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size | +| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be much larger than the CPU cache size on the host. | +| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. | +| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. | +| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan | +| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan | +| spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes | 100 | Controls maximum number of compiled regular expression patterns per function instance per thread of execution. | +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemory | <undefined> | Set the max extended memory of partial aggregation in bytes.
When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio | 0.15 | Set the max extended memory of partial aggregation as maxExtendedPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory | <undefined> | Set the max memory of partial aggregation in bytes. When this option is set to a value greater than 0, it will override spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio | 0.1 | Set the max memory of partial aggregation as maxPartialAggregationMemoryRatio of offheap size. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | +| spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession | 10000 | Maximum number of partitions per a single table writer instance. 
| +| spark.gluten.sql.columnar.backend.velox.maxSpillBytes | 100G | The maximum file size of a query | +| spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created | +| spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level | +| spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run | +| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. | +| spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size | +| spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. | +| spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. | +| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | +| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | +| spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. | +| spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. | +| spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. 
| +| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | +| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | +| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled | true | Enables a VeloxResizeBatchesExec fast path that combines eligible batches using Velox vector copyRanges instead of generic RowVector append. When possible, it collects the small input batches for one VeloxResizeBatchesExec output, allocates the output RowVector once, and bulk-copies child vector ranges. This is most useful for shuffle-read outputs where plain hash shuffle payloads are materialized as dense flat vectors. Complex vectors can also use copyRanges, but ARRAY and MAP still rebuild nested offsets and sizes while bulk-copying child ranges. Unsupported encodings such as dictionary and constant vectors fall back to the generic copy path. This option is enabled by default and complements the reader-side raw payload merge fast path: that path avoids materializing small plain payload batches, while this option optimizes VeloxResizeBatchesExec when that operator is enabled. | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. 
Default value: 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is set to true. Default value: 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput | false | If true, combine small columnar batches together right after shuffle read. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | +| spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished | false | Show velox full task metrics when finished. | +| spark.gluten.sql.columnar.backend.velox.spillFileSystem | local | The filesystem used to store spill data. local: The local file system. heap-over-local: Write file to JVM heap if having extra heap space. Otherwise write to local file system. | +| spark.gluten.sql.columnar.backend.velox.spillStrategy | auto | none: Disable spill on Velox backend; auto: Let Spark memory manager manage Velox's spilling | +| spark.gluten.sql.columnar.backend.velox.ssdCacheIOThreads | 1 | The IO threads for cache promoting | +| spark.gluten.sql.columnar.backend.velox.ssdCachePath | /tmp | The folder to store the cache files, better on SSD | +| spark.gluten.sql.columnar.backend.velox.ssdCacheShards | 1 | The cache shards | +| spark.gluten.sql.columnar.backend.velox.ssdCacheSize | 1GB | The SSD cache size, will do memory caching only if this value = 0 | +| spark.gluten.sql.columnar.backend.velox.ssdCheckpointIntervalBytes | 0 | Checkpoint after every 'checkpointIntervalBytes' for SSD cache. 0 means no checkpointing.
| +| spark.gluten.sql.columnar.backend.velox.ssdChecksumEnabled | false | If true, checksum write to SSD is enabled. | +| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | +| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | +| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | +| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. | +| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | +| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | +| spark.gluten.velox.broadcast.build.targetBytesPerThread | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. | +| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantics as vanilla Spark to CAST-from-varchar. Otherwise, do nothing. | ## Gluten Velox backend *experimental* configurations