Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static ColumnarBatchOutIterator create(
int minOutputBatchSize,
int maxOutputBatchSize,
long preferredBatchBytes,
boolean enableCopyRanges,
Iterator<ColumnarBatch> in) {
final Runtime runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName(), "VeloxBatchResizer");
Expand All @@ -40,6 +41,7 @@ public static ColumnarBatchOutIterator create(
minOutputBatchSize,
maxOutputBatchSize,
preferredBatchBytes,
enableCopyRanges,
new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ public native long create(
int minOutputBatchSize,
int maxOutputBatchSize,
long preferredBatchBytes,
boolean enableCopyRanges,
ColumnarBatchInIterator itr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxResizeBatchesShuffleOutput: Boolean =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)

def enableVeloxResizeBatchesCopyRanges: Boolean =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED)

case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
assert(min > 0, "Min batch size should be larger than 0")
Expand Down Expand Up @@ -322,6 +325,24 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled")
.doc(
"Enables a VeloxResizeBatchesExec fast path that combines eligible batches using " +
"Velox vector copyRanges instead of generic RowVector append. When possible, it " +
"collects the small input batches for one VeloxResizeBatchesExec output, allocates " +
"the output RowVector once, and bulk-copies child vector ranges. This is most useful " +
"for shuffle-read outputs where plain hash shuffle payloads are materialized as " +
"dense flat vectors. Complex vectors can also use copyRanges, but ARRAY and MAP " +
"still rebuild nested offsets and sizes while bulk-copying child ranges. Unsupported " +
"encodings such as dictionary and constant vectors fall back to the generic copy " +
"path. This option is enabled by default and complements the reader-side raw " +
"payload merge fast path: that path avoids materializing small plain payload " +
"batches, while this option optimizes VeloxResizeBatchesExec when that operator " +
"is enabled.")
.booleanConf
.createWithDefault(true)

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer
Expand All @@ -41,7 +42,12 @@ case class VeloxResizeBatchesExec(

override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
VeloxBatchResizer
.create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, in.asJava)
.create(
minOutputBatchSize,
maxOutputBatchSize,
preferredBatchBytes,
VeloxConfig.get.enableVeloxResizeBatchesCopyRanges,
in.asJava)
.asScala
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ class AllVeloxConfiguration extends AnyFunSuite {
builder.toMarkdown,
"dev/gen-all-config-docs.sh")
}

test("Velox resize batches copyRanges is enabled by default") {
assert(
VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED.defaultValue.contains(true))
}
}
2 changes: 2 additions & 0 deletions cpp/velox/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ add_velox_benchmark(generic_benchmark GenericBenchmark.cc)
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)

add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)

add_velox_benchmark(velox_batch_resizer_benchmark VeloxBatchResizerBenchmark.cc)
Loading
Loading