Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package org.apache.comet.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
Expand Down Expand Up @@ -139,35 +139,12 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
private def createColumnarToRowExec(child: SparkPlan): SparkPlan = {
val schema = child.schema
val useNative = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() &&
CometNativeColumnarToRowExec.supportsSchema(schema) &&
!hasScanUsingMutableBuffers(child)
CometNativeColumnarToRowExec.supportsSchema(schema)

if (useNative) {
CometNativeColumnarToRowExec(child)
} else {
CometColumnarToRowExec(child)
}
}

/**
* Checks if the plan contains a scan that uses mutable buffers. Native C2R is not compatible
* with such scans because the buffers may be modified after C2R reads them.
*
* This includes:
* - CometScanExec with native_iceberg_compat and partition columns - uses
* ConstantColumnReader
*/
private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
op match {
case c: QueryStageExec => hasScanUsingMutableBuffers(c.plan)
case c: ReusedExchangeExec => hasScanUsingMutableBuffers(c.child)
case _ =>
op.exists {
case scan: CometScanExec =>
scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
scan.relation.partitionSchema.nonEmpty
case _ => false
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometProject
+- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject
+- HashAggregate
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
Expand All @@ -11,7 +11,7 @@ TakeOrderedAndProject
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometColumnarToRow
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
Expand All @@ -30,7 +30,7 @@ TakeOrderedAndProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometColumnarToRow
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales
Expand All @@ -40,7 +40,7 @@ TakeOrderedAndProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometColumnarToRow
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometProject
+- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometSort
+- CometExchange
+- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand All @@ -8,7 +8,7 @@ CometColumnarToRow
:- CometProject
: +- CometFilter
: : +- Subquery
: : +- CometColumnarToRow
: : +- CometNativeColumnarToRow
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometBroadcastHashJoin
:- CometFilter
: : +- Subquery
: : +- CometColumnarToRow
: : +- CometNativeColumnarToRow
: : +- CometHashAggregate
: : +- CometExchange
: : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometHashAggregate
+- CometColumnarExchange
+- HashAggregate
+- HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometSort
+- CometExchange
+- CometProject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
TakeOrderedAndProject
+- Project
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometSort
+- CometExchange
+- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometFilter
+- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
Expand Down Expand Up @@ -47,7 +47,7 @@ CometColumnarToRow
: : +- CometProject
: : +- CometFilter
: : : +- Subquery
: : : +- CometColumnarToRow
: : : +- CometNativeColumnarToRow
: : : +- CometHashAggregate
: : : +- CometExchange
: : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometUnion
:- CometHashAggregate
Expand Down Expand Up @@ -50,7 +50,7 @@ CometColumnarToRow
: : : +- CometProject
: : : +- CometFilter
: : : : +- Subquery
: : : : +- CometColumnarToRow
: : : : +- CometNativeColumnarToRow
: : : : +- CometHashAggregate
: : : : +- CometExchange
: : : : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
Filter
: +- Subquery
: +- HashAggregate
: +- CometColumnarToRow
: +- CometNativeColumnarToRow
: +- CometColumnarExchange
: +- HashAggregate
: +- HashAggregate
: +- CometColumnarToRow
: +- CometNativeColumnarToRow
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: :- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometProject
Expand Down Expand Up @@ -47,16 +47,16 @@ Filter
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- HashAggregate
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- HashAggregate
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
:- CometNativeColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
Filter
: +- Subquery
: +- HashAggregate
: +- CometColumnarToRow
: +- CometNativeColumnarToRow
: +- CometColumnarExchange
: +- HashAggregate
: +- HashAggregate
: +- CometColumnarToRow
: +- CometNativeColumnarToRow
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: :- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometProject
Expand Down Expand Up @@ -47,16 +47,16 @@ Filter
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- HashAggregate
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- HashAggregate
+- CometColumnarToRow
+- CometNativeColumnarToRow
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
:- CometNativeColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CometColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
Expand Down
Loading
Loading