From 93ae842eda8215275964cc33d171f3de82d927c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 09:42:46 -0600 Subject: [PATCH 1/9] run Spark 3.4 tests with native_datafusion scan --- .github/workflows/spark_sql_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 4d777cda87..3a763d3219 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -126,6 +126,7 @@ jobs: # - native_iceberg_compat: Spark 3.5 only config: - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'auto'} + - {spark-short: '3.4', spark-full: '3.4.3', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} From 7fd965560b193e6b2ed7cc577d807a13b3763d23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Mar 2026 11:36:37 -0600 Subject: [PATCH 2/9] fix: update 3.4.3 diff to ignore failing native_datafusion tests Tag tests with IgnoreCometNativeDataFusion and IgnoreCometNativeScan to match tags already applied in the 3.5.8 diff, plus 3 tests that are specific to Spark 3.4. --- dev/diffs/3.4.3.diff | 228 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 187 insertions(+), 41 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8738b3813a..6ef58a750a 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..9c16099090c 100644 +index d3544881af1..377683b10c5 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -417,18 +417,19 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..1925aac8d97 100644 +index f33432ddb6f..881eab7417c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen +@@ -22,6 +22,8 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -262,6 +264,9 @@ abstract class DynamicPartitionPruningSuiteBase case s: BatchScanExec => s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child } @@ -438,7 +439,7 @@ index f33432ddb6f..1925aac8d97 100644 case _ => Nil } } -@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -755,7 +760,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -448,7 +449,7 @@ index f33432ddb6f..1925aac8d97 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1033,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -458,7 +459,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -468,7 +469,7 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -478,7 +479,17 @@ index f33432ddb6f..1925aac8d97 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1707,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1729,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -513,18 +524,29 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..52438178a0e 100644 +index 2796b1cf154..f5e2fdfe450 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} +@@ -33,6 +33,8 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -815,6 +816,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -499,7 +501,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -815,6 +818,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -532,7 +554,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +877,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 +879,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -540,7 +562,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +919,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +921,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -548,7 +570,7 @@ index 2796b1cf154..52438178a0e 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1106,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -981,7 +1003,7 @@ index 48ad10992c5..51d1ee65422 100644 extensions.injectColumnar(session => MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala -index 18123a4d6ec..fbe4c766eee 100644 +index 18123a4d6ec..0fe185baa33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -17,6 +17,8 @@ @@ -1983,10 +2005,18 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..37ea65081e4 100644 +index 104b4e416cd..8e618094d91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1096,7 +1096,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1096,7 +1097,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -1999,7 +2029,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1499,7 +1503,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1499,7 +1504,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2009,7 +2039,7 @@ index 104b4e416cd..37ea65081e4 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1581,7 +1586,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1581,7 +1587,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2022,7 +2052,7 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1608,7 +1617,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1608,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2035,7 +2065,16 @@ index 104b4e416cd..37ea65081e4 100644 } } } -@@ -1744,7 +1757,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1700,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + (attr, value) => sources.StringContains(attr, value)) + } + +- test("filter pushdown - StringPredicate") { ++ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) { + import testImplicits._ + // keep() should take effect on StartsWith/EndsWith/Contains + Seq( +@@ -1744,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2045,7 +2084,17 @@ index 104b4e416cd..37ea65081e4 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1985,7 +1999,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1934,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1985,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2055,7 +2104,7 @@ index 104b4e416cd..37ea65081e4 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2045,7 +2060,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2045,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2065,7 +2114,7 @@ index 104b4e416cd..37ea65081e4 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2277,7 +2293,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2277,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2078,7 +2127,7 @@ index 104b4e416cd..37ea65081e4 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2337,7 +2357,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2337,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2092,10 +2141,38 @@ index 104b4e416cd..37ea65081e4 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..b624c3811dd 100644 +index 8670d95c65e..45669c0b664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1064,7 +1065,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: int as long should throw schema incompatible error") { ++ test("SPARK-35640: int as long should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) + +@@ -1335,7 +1338,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2106,10 +2183,28 @@ index 8670d95c65e..b624c3811dd 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..44837aa953b 100644 +index 29cb224c878..d16c22b73a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -978,7 +978,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 1000).map { i => + val ts = new java.sql.Timestamp(i) + Row(ts) +@@ -978,7 +980,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2119,7 +2214,17 @@ index 29cb224c878..44837aa953b 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. -@@ -1047,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1031,7 +1034,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1047,7 +1051,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2129,7 +2234,7 @@ index 29cb224c878..44837aa953b 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1069,7 +1071,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1069,7 +1074,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2139,7 +2244,17 @@ index 29cb224c878..44837aa953b 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1128,7 +1131,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + // The vectorized and non-vectorized readers will produce different exceptions, we don't need +@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } @@ -2244,14 +2359,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..ca22370ca3b 100644 +index bf5c51b89bb..6ff4d1ba18b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.IgnoreComet ++import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2265,6 +2380,26 @@ index bf5c51b89bb..ca22370ca3b 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SparkException]) +@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -2314,18 +2449,29 @@ index 26e61c6b58d..cb09d7e116a 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2333,7 +2479,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2352,7 +2498,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2360,7 +2506,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan From f5f321680d95de1edc8cb154ef49427832d9ab5d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 06:15:56 -0600 Subject: [PATCH 3/9] fix: remove redundant IgnoreCometNativeDataFusion imports from 3.4.3 diff Remove imports of IgnoreCometNativeDataFusion in DynamicPartitionPruningSuite and FileBasedDataSourceSuite since both files are in the org.apache.spark.sql package where IgnoreCometNativeDataFusion is defined, making the import redundant and causing "permanently hidden" compilation errors. --- dev/diffs/3.4.3.diff | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 6ef58a750a..b0f4f1e4fb 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -417,19 +417,18 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..881eab7417c 100644 +index f33432ddb6f..81b83a7acd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -@@ -22,6 +22,8 @@ import org.scalatest.GivenWhenThen +@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.plans.ExistenceJoin -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.CometScanExec import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ -@@ -262,6 +264,9 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase case s: BatchScanExec => s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child } @@ -439,7 +438,7 @@ index f33432ddb6f..881eab7417c 100644 case _ => Nil } } -@@ -755,7 +760,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -755,7 +759,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -449,7 +448,7 @@ index f33432ddb6f..881eab7417c 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1027,7 +1033,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -459,7 +458,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -469,7 +468,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1423,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -479,7 +478,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1698,7 +1707,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat * Check the static scan metrics with and without DPP */ test("static scan metrics", @@ -489,7 +488,7 @@ index f33432ddb6f..881eab7417c 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { -@@ -1729,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -524,19 +523,18 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..f5e2fdfe450 100644 +index 2796b1cf154..db2d16798a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,8 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} +@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -499,7 +501,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -546,7 +544,7 @@ index 2796b1cf154..f5e2fdfe450 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -815,6 +818,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -554,7 +552,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(smJoinExec.nonEmpty) } -@@ -875,6 +879,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -562,7 +560,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -916,6 +921,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f @@ -570,7 +568,7 @@ index 2796b1cf154..f5e2fdfe450 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1100,6 +1106,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters From cd360733abdcc27e0778e04bc92e73bfbce22770 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 11:05:12 -0600 Subject: [PATCH 4/9] fix: check Comet ignore tags before DisableAdaptiveExecution in 3.4.3 diff The test override in SQLTestUtils checked DisableAdaptiveExecution first, so tests with both DisableAdaptiveExecution and IgnoreCometNativeDataFusion tags (like "static scan metrics") would enter the DAE branch and never check the ignore tag. Reorder to match the 3.5.8 diff: check Comet skip tags first with early returns, then handle DisableAdaptiveExecution. --- dev/diffs/3.4.3.diff | 70 +++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index b0f4f1e4fb..e90287aa7f 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2948,7 +2948,7 @@ index abe606ad9c1..2d930b64cca 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..a1d390c93d0 100644 +index dd55fcfe42c..99bc018008a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ @@ -2967,46 +2967,42 @@ index dd55fcfe42c..a1d390c93d0 100644 import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -118,7 +120,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } +@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -- (implicit pos: Position): Unit = { -+ (implicit pos: Position): Unit = { + (implicit pos: Position): Unit = { ++ // Check Comet skip tags first, before DisableAdaptiveExecution handling ++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ return ++ } ++ if (isCometEnabled) { ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ return ++ } ++ if (isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ return ++ } ++ if ((isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ return ++ } ++ } if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { super.test(testName, testTags: _*) { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { -@@ -126,7 +128,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } - } - } else { -- super.test(testName, testTags: _*)(testFun) -+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) -+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || -+ cometScanImpl == CometConf.SCAN_AUTO -+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || -+ cometScanImpl == CometConf.SCAN_AUTO -+ if (isCometEnabled && isNativeIcebergCompat && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { -+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) -+ } else if (isCometEnabled && isNativeDataFusion && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) -+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", -+ testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } - } - } - -@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase +@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase protected override def _sqlContext: SQLContext = self.spark.sqlContext } @@ -3036,7 +3032,7 @@ index dd55fcfe42c..a1d390c93d0 100644 protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { SparkSession.setActiveSession(spark) super.withSQLConf(pairs: _*)(f) -@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase +@@ -434,6 +487,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child From 9292ff93022516219584b62a849922f8671de3e1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 08:21:13 -0600 Subject: [PATCH 5/9] address feedback --- dev/diffs/3.4.3.diff | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index e90287aa7f..68dc92790b 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2003,7 +2003,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..8e618094d91 100644 +index 104b4e416cd..116f003a3a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2063,16 +2063,17 @@ index 104b4e416cd..8e618094d91 100644 } } } -@@ -1700,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1700,7 +1714,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } - test("filter pushdown - StringPredicate") { -+ test("filter pushdown - StringPredicate", IgnoreCometNativeScan("cannot be pushed down")) { ++ test("filter pushdown - StringPredicate", ++ IgnoreCometNativeDataFusionScan("cannot be pushed down")) { import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1744,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1744,7 +1759,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2082,7 +2083,7 @@ index 104b4e416cd..8e618094d91 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1934,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2092,7 +2093,7 @@ index 104b4e416cd..8e618094d91 100644 withTempPath { dir => val count = 10 val tableName = "spark_25207" -@@ -1985,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2102,7 +2103,7 @@ index 104b4e416cd..8e618094d91 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2045,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2112,7 +2113,7 @@ index 104b4e416cd..8e618094d91 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2277,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2125,7 +2126,7 @@ index 104b4e416cd..8e618094d91 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2337,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") From 2e92eb5f61500817f6704f6dad12077c2d6fd1dc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 08:45:17 -0600 Subject: [PATCH 6/9] fix --- dev/diffs/3.4.3.diff | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 68dc92790b..d709f08b77 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2003,7 +2003,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..116f003a3a4 100644 +index 104b4e416cd..8cc41d15051 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2069,7 +2069,7 @@ index 104b4e416cd..116f003a3a4 100644 - test("filter pushdown - StringPredicate") { + test("filter pushdown - StringPredicate", -+ IgnoreCometNativeDataFusionScan("cannot be pushed down")) { ++ IgnoreCometNativeDataFusion("cannot be pushed down")) { import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( From e9502bdd36e0b0a8650da667ac3fc61e042f31c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:34:43 -0600 Subject: [PATCH 7/9] fix: retag schema mismatch tests from #3311 to #3720 in 3.4.3 diff --- dev/diffs/3.4.3.diff | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index d709f08b77..6eb23f759e 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2140,7 +2140,7 @@ index 104b4e416cd..8cc41d15051 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..45669c0b664 100644 +index 8670d95c65e..c7ba51f770f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -2157,7 +2157,7 @@ index 8670d95c65e..45669c0b664 100644 - test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2167,7 +2167,7 @@ index 8670d95c65e..45669c0b664 100644 - test("SPARK-35640: int as long should throw schema incompatible error") { + test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i)) val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) @@ -2182,7 +2182,7 @@ index 8670d95c65e..45669c0b664 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..d16c22b73a9 100644 +index 29cb224c878..ee5a87fa200 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2199,7 +2199,7 @@ index 29cb224c878..d16c22b73a9 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2219,7 +2219,7 @@ index 29cb224c878..d16c22b73a9 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2249,7 +2249,7 @@ index 29cb224c878..d16c22b73a9 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2358,7 +2358,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..6ff4d1ba18b 100644 +index bf5c51b89bb..4e2f0bdb389 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2385,7 +2385,7 @@ index bf5c51b89bb..6ff4d1ba18b 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2395,7 +2395,7 @@ index bf5c51b89bb..6ff4d1ba18b 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { import testImplicits._ withTempPath { dir => From 725159bc598046b1b113a2221c9123fb4b5ec461 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:41:31 -0600 Subject: [PATCH 8/9] fix: replace all #3311 references with specific issues in 3.4.3 diff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - static scan metrics → #3442 (DPP not supported in native_datafusion) - caseSensitive and SPARK-25207 duplicate fields → #3760 (new issue) - Python UDF filter pushdown → fixed by adding CometNativeScanExec pattern matches, removed IgnoreCometNativeDataFusion tag --- dev/diffs/3.4.3.diff | 35 ++-- dev/diffs/4.0.1.diff | 456 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 389 insertions(+), 102 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 6eb23f759e..53807e4ce7 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..81b83a7acd1 100644 +index f33432ddb6f..7d758d2481f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -484,7 +484,7 @@ index f33432ddb6f..81b83a7acd1 100644 test("static scan metrics", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -523,7 +523,7 @@ index a6b295578d6..91acca4306f 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 2796b1cf154..db2d16798a3 100644 +index 2796b1cf154..d628f44e4ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} @@ -540,7 +540,7 @@ index 2796b1cf154..db2d16798a3 100644 Seq("parquet", "orc").foreach { format => - test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { + test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" @@ -2003,7 +2003,7 @@ index 07e2849ce6f..3e73645b638 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 104b4e416cd..8cc41d15051 100644 +index 104b4e416cd..d865077684f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2089,7 +2089,7 @@ index 104b4e416cd..8cc41d15051 100644 - test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { + test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { withTempPath { dir => val count = 10 val tableName = "spark_25207" @@ -2448,7 +2448,7 @@ index 26e61c6b58d..cb09d7e116a 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..f1c4b3d92b1 100644 +index 0ab8691801d..a681cb0aa52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala @@ -18,6 +18,8 @@ @@ -2460,29 +2460,21 @@ index 0ab8691801d..f1c4b3d92b1 100644 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { - assert(arrowEvalNodes.size == 2) - } - -- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { -+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { f => - spark.range(10).select($"id".as("a"), $"id".as("b")) -@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +110,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 @@ -2491,13 +2483,14 @@ index 0ab8691801d..f1c4b3d92b1 100644 + val dataFilters = scanNodes.head match { + case scan: FileSourceScanExec => scan.dataFilters + case scan: CometScanExec => scan.dataFilters ++ case scan: CometNativeScanExec => scan.dataFilters + } + assert(dataFilters.length == 2) + assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } -@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +156,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2505,7 +2498,7 @@ index 0ab8691801d..f1c4b3d92b1 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +169,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index a41ff3bbd3..407807cac5 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644 withSpark(sc) { sc => TestUtils.waitUntilExecutorsUp(sc, 2, 60000) diff --git a/pom.xml b/pom.xml -index 22922143fc3..7c56e5e8641 100644 +index 22922143fc3..97332f7e6ac 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..21d36ebc6f5 100644 +index 2c24cc7d570..3311e6e3773 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -615,7 +615,17 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1151,7 +1157,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("join key with multiple references on the filtering plan") { ++ test("join key with multiple references on the filtering plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support "String + String" +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -625,7 +635,15 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1330,6 +1338,7 @@ abstract class DynamicPartitionPruningSuiteBase + } + + test("Subquery reuse across the whole plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"), + DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", +@@ -1424,7 +1433,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -635,7 +653,7 @@ index 2c24cc7d570..21d36ebc6f5 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1455,7 +1465,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -645,7 +663,17 @@ index 2c24cc7d570..21d36ebc6f5 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1699,7 +1710,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1730,6 +1742,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -680,18 +708,51 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..2f1bc3880fd 100644 +index 9c529d14221..6cfd87ad864 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec} ++import org.apache.spark.sql.catalyst.util.quietly ++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -967,6 +968,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest + } + + allFileBasedDataSources.foreach { format => +- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") { ++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty ++ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + } +- } ++ }} + } + + Seq("json", "orc").foreach { format => +@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -699,7 +760,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1029,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1035,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -707,7 +768,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1071,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -715,11 +776,22 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1256,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + +- test("SPARK-41017: filter pushdown with nondeterministic predicates") { ++ test("SPARK-41017: filter pushdown with nondeterministic predicates", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTempPath { path => + val pathStr = path.getCanonicalPath + spark.range(10).write.parquet(pathStr) +@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters + case b: CometScanExec => b.dataFilters ++ case b: CometNativeScanExec => b.dataFilters + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) @@ -1253,10 +1325,10 @@ index 0df7f806272..92390bd819f 100644 test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -index 2e33f6505ab..47fa031add5 100644 +index 2e33f6505ab..d0f84e8c44d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException +@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union} @@ -1268,8 +1340,11 @@ index 2e33f6505ab..47fa031add5 100644 +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.internal.SQLConf ++import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.test.SharedSparkSession -@@ -1529,6 +1530,12 @@ class SubquerySuite extends QueryTest + + class SubquerySuite extends QueryTest +@@ -1529,6 +1531,12 @@ class SubquerySuite extends QueryTest fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( _.files.forall(_.urlEncodedPath.contains("p=0")))) @@ -1282,7 +1357,17 @@ index 2e33f6505ab..47fa031add5 100644 case _ => false }) } -@@ -2094,7 +2101,7 @@ class SubquerySuite extends QueryTest +@@ -2035,7 +2043,8 @@ class SubquerySuite extends QueryTest + } + } + +- test("Subquery reuse across the whole plan") { ++ test("Subquery reuse across the whole plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY.key -> "false") { + val df = sql( +@@ -2094,7 +2103,7 @@ class SubquerySuite extends QueryTest df.collect() val exchanges = collect(df.queryExecution.executedPlan) { @@ -1291,7 +1376,13 @@ index 2e33f6505ab..47fa031add5 100644 } assert(exchanges.size === 1) } -@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest +@@ -2674,22 +2683,31 @@ class SubquerySuite extends QueryTest + } + } + +- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") { ++ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = { val df = sql(query) checkAnswer(df, answer) @@ -1821,6 +1912,20 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +index 77a988f340e..1acc534064e 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { + } + } + +- test("alter temporary view should follow current storeAnalyzedPlanForView config") { ++ test("alter temporary view should follow current storeAnalyzedPlanForView config", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTable("t") { + Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") + withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index aed11badb71..1a365b5aacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -2512,22 +2617,23 @@ index 272be70f9fe..06957694002 100644 assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala -index 0a0b23d1e60..5685926250f 100644 +index 0a0b23d1e60..dcc9c141315 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.Expand import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.CometScanExec ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ -@@ -868,6 +869,7 @@ abstract class SchemaPruningSuite +@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite val fileSourceScanSchemata = collect(df.queryExecution.executedPlan) { case scan: FileSourceScanExec => scan.requiredSchema + case scan: CometScanExec => scan.requiredSchema ++ case scan: CometNativeScanExec => scan.requiredSchema } assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + @@ -2621,10 +2727,18 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..9aa8f49a62b 100644 +index 6080a5e8e4b..dc64436164f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1102,7 +1102,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1102,7 +1103,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -2637,7 +2751,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1505,7 +1509,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1505,7 +1510,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2647,7 +2761,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1587,7 +1592,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1587,7 +1593,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2660,7 +2774,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1614,7 +1623,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1614,7 +1624,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2673,7 +2787,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1706,7 +1719,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1706,7 +1720,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } @@ -2682,7 +2796,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1750,7 +1763,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1750,7 +1764,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2692,7 +2806,17 @@ index 6080a5e8e4b..9aa8f49a62b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1993,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2702,7 +2826,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2053,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2712,7 +2836,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2305,7 +2321,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2725,7 +2849,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2368,7 +2388,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2739,10 +2863,28 @@ index 6080a5e8e4b..9aa8f49a62b 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 4474ec1fd42..97910c4fc3a 100644 +index 4474ec1fd42..05fa0257c82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2753,10 +2895,38 @@ index 4474ec1fd42..97910c4fc3a 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..38c60ee2584 100644 +index bba71f1c48d..0b52574e0f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) + + Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => +@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("Enabling/disabling ignoreCorruptFiles") { ++ test("Enabling/disabling ignoreCorruptFiles", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2769,7 +2939,17 @@ index bba71f1c48d..38c60ee2584 100644 } } } -@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2779,7 +2959,7 @@ index bba71f1c48d..38c60ee2584 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2789,6 +2969,16 @@ index bba71f1c48d..38c60ee2584 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) +@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2909,7 +3099,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 0acb21f3e6f..3a7bb73f03c 100644 +index 0acb21f3e6f..1f9c3fd13fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2917,7 +3107,7 @@ index 0acb21f3e6f..3a7bb73f03c 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row} import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2931,10 +3121,39 @@ index 0acb21f3e6f..3a7bb73f03c 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) +@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..236a4e99824 100644 +index 09ed6955a51..6f9174c9545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter + import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + + import org.apache.spark.SparkException +-import org.apache.spark.sql.{DataFrame, QueryTest, Row} ++import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -2955,7 +3174,7 @@ index 09ed6955a51..236a4e99824 100644 } } -@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite +@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite (Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType), (Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType), (Seq("1.23", "10.34"), FloatType, DoubleType), @@ -2963,8 +3182,67 @@ index 09ed6955a51..236a4e99824 100644 + // TODO: Comet cannot handle older than "1582-10-15" + (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, TimestampNTZType) ) ++ wideningTags: Seq[org.scalatest.Tag] = ++ if (fromType == DateType && toType == TimestampNTZType) { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty } - test(s"parquet widening conversion $fromType -> $toType") { +- test(s"parquet widening conversion $fromType -> $toType") { ++ test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*) { + checkAllParquetReaders(values, fromType, toType, expectError = false) + } + +@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) + ) + } +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + +@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) + } +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, + expectError = + // parquet-mr allows reading decimals into a smaller precision decimal type without +@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) + outputTimestampType <- ParquetOutputTimestampType.values + } +- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { ++ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf( + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString +@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + test( +- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { ++ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), +@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) + } + test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + +- s"Decimal($toPrecision, $toScale)" ++ s"Decimal($toPrecision, $toScale)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728") + ) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 458b5dfc0f4..d209f3c85bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala @@ -3038,18 +3316,29 @@ index 0dd90925d3c..7d53ec845ef 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3057,7 +3346,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3076,7 +3365,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3084,7 +3373,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3495,7 +3784,7 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..c9d0ecfec41 100644 +index f0f3f94b811..f77b54dcef9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ @@ -3522,37 +3811,42 @@ index f0f3f94b811..c9d0ecfec41 100644 import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } - } - } else { -- super.test(testName, testTags: _*)(testFun) -+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) -+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || -+ cometScanImpl == CometConf.SCAN_AUTO -+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || -+ cometScanImpl == CometConf.SCAN_AUTO -+ if (isCometEnabled && isNativeIcebergCompat && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { -+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) -+ } else if (isCometEnabled && isNativeDataFusion && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) -+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", -+ testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } - } - } +@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with -@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { ++ // Check Comet skip tags first, before DisableAdaptiveExecution handling ++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ return ++ } ++ if (isCometEnabled) { ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ return ++ } ++ if (isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ return ++ } ++ if ((isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ return ++ } ++ } + if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { +@@ -248,8 +278,24 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3577,7 +3871,7 @@ index f0f3f94b811..c9d0ecfec41 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +497,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child From 6e3ce77b514ff59a3063fa58d619a222ab75c9a8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:56:14 -0600 Subject: [PATCH 9/9] fix: remove unused IgnoreCometNativeDataFusion import in ExtractPythonUDFsSuite --- dev/diffs/3.4.3.diff | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 53807e4ce7..e403285369 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2448,19 +2448,18 @@ index 26e61c6b58d..cb09d7e116a 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..a681cb0aa52 100644 +index 0ab8691801d..b18a5bea944 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,8 @@ +@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +110,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2469,7 +2468,7 @@ index 0ab8691801d..a681cb0aa52 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -2490,7 +2489,7 @@ index 0ab8691801d..a681cb0aa52 100644 } } } -@@ -145,6 +156,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2498,7 +2497,7 @@ index 0ab8691801d..a681cb0aa52 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +169,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan