From ec7c384e450bd42243b745d4f6e7a310a0bb9e25 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 06:33:23 -0600 Subject: [PATCH 1/8] feat: run Spark 4.0 SQL tests with native_datafusion scan Add native_datafusion scan-impl matrix entry for Spark 4.0 in spark_sql_test.yml and update 4.0.1.diff to ignore tests that fail with native_datafusion scan (same tests as Spark 3.4). --- .github/workflows/spark_sql_test.yml | 3 + dev/diffs/4.0.1.diff | 191 ++++++++++++++++++++++----- 2 files changed, 161 insertions(+), 33 deletions(-) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 4d777cda87..4123fe1755 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -129,10 +129,13 @@ jobs: - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'} - {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'} - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} + - {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'} # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 exclude: - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'} module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} + - config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }} runs-on: ${{ matrix.os }} diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index a41ff3bbd3..1dfbeaf343 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -39,7 +39,7 @@ index 6c51bd4ff2e..e72ec1d26e2 100644 withSpark(sc) { sc => TestUtils.waitUntilExecutorsUp(sc, 2, 60000) diff --git a/pom.xml b/pom.xml -index 22922143fc3..7c56e5e8641 100644 +index 22922143fc3..97332f7e6ac 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..21d36ebc6f5 100644 +index 2c24cc7d570..6738395bdae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -645,7 +645,17 @@ index 2c24cc7d570..21d36ebc6f5 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1699,7 +1708,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat + * Check the static scan metrics with and without DPP + */ + test("static scan metrics", +- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { ++ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { +@@ -1730,6 +1740,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -680,7 +690,7 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..2f1bc3880fd 100644 +index 9c529d14221..85de73e4b96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -691,7 +701,17 @@ index 9c529d14221..2f1bc3880fd 100644 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -967,6 +968,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -651,7 +652,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + + Seq("parquet", "orc").foreach { format => +- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { ++ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempDir { dir => + val tableName = s"spark_25132_${format}_native" + val tableDir = dir.getCanonicalPath + s"/$tableName" +@@ -967,6 +969,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -699,7 +719,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1029,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1030,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -707,7 +727,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1071,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1072,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -715,7 +735,7 @@ index 9c529d14221..2f1bc3880fd 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1256,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1252,6 +1257,8 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters @@ -2621,10 +2641,18 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..9aa8f49a62b 100644 +index 6080a5e8e4b..dc64436164f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -@@ -1102,7 +1102,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType + + import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan} + import org.apache.spark.sql.catalyst.dsl.expressions._ + import org.apache.spark.sql.catalyst.expressions._ + import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints +@@ -1102,7 +1103,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. @@ -2637,7 +2665,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1505,7 +1509,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1505,7 +1510,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2647,7 +2675,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", -@@ -1587,7 +1592,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1587,7 +1593,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2660,7 +2688,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1614,7 +1623,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1614,7 +1624,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared // than the total length but should not be a single record. // Note that, if record level filtering is enabled, it should be a single record. // If no filter is pushed down to Parquet, it should be the total length of data. @@ -2673,7 +2701,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } } } -@@ -1706,7 +1719,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1706,7 +1720,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared (attr, value) => sources.StringContains(attr, value)) } @@ -2682,7 +2710,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 import testImplicits._ // keep() should take effect on StartsWith/EndsWith/Contains Seq( -@@ -1750,7 +1763,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1750,7 +1764,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2692,7 +2720,17 @@ index 6080a5e8e4b..9aa8f49a62b 100644 val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) -@@ -1993,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -1940,7 +1955,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared + } + } + +- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { ++ test("SPARK-25207: exception when duplicate fields in case-insensitive mode", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val count = 10 + val tableName = "spark_25207" +@@ -1993,7 +2009,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2702,7 +2740,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 // block 1: // null count min max // page-0 0 0 99 -@@ -2053,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared +@@ -2053,7 +2070,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } @@ -2712,7 +2750,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 withTempPath { dir => val path = dir.getCanonicalPath spark.range(100).selectExpr("id * 2 AS id") -@@ -2305,7 +2321,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { +@@ -2305,7 +2323,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2725,7 +2763,7 @@ index 6080a5e8e4b..9aa8f49a62b 100644 } else { assert(selectedFilters.isEmpty, "There is filter pushed down") } -@@ -2368,7 +2388,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { +@@ -2368,7 +2390,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { assert(pushedParquetFilters.exists(_.getClass === filterClass), s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.") @@ -2739,10 +2777,28 @@ index 6080a5e8e4b..9aa8f49a62b 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 4474ec1fd42..97910c4fc3a 100644 +index 4474ec1fd42..533d4c28dab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} + + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.InternalRow + import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} + import org.apache.spark.sql.catalyst.util.DateTimeUtils +@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession + } + } + +- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { ++ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) + +@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2753,10 +2809,28 @@ index 4474ec1fd42..97910c4fc3a 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..38c60ee2584 100644 +index bba71f1c48d..59e4d003966 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -996,7 +996,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat + + import org.apache.spark.{DebugFilesystem, SparkConf, SparkException} + import org.apache.spark.sql._ ++import org.apache.spark.sql.IgnoreCometNativeDataFusion + import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} + import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow + import org.apache.spark.sql.catalyst.util.ArrayData +@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { ++ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) + + Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => +@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2769,7 +2843,17 @@ index bba71f1c48d..38c60ee2584 100644 } } } -@@ -1060,7 +1064,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") + } + +- test("SPARK-34212 Parquet should read decimals correctly") { ++ test("SPARK-34212 Parquet should read decimals correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } +@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2779,7 +2863,7 @@ index bba71f1c48d..38c60ee2584 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1089,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2789,6 +2873,16 @@ index bba71f1c48d..38c60ee2584 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) +@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("row group skipping doesn't overflow when reading into larger type") { ++ test("row group skipping doesn't overflow when reading into larger type", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { path => + Seq(0).toDF("a").write.parquet(path.toString) + withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2909,7 +3003,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 0acb21f3e6f..3a7bb73f03c 100644 +index 0acb21f3e6f..0192c6b48f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2917,7 +3011,7 @@ index 0acb21f3e6f..3a7bb73f03c 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row} import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2931,6 +3025,26 @@ index 0acb21f3e6f..3a7bb73f03c 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" +@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("schema mismatch failure error message for parquet vectorized reader") { ++ test("schema mismatch failure error message for parquet vectorized reader", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withTempPath { dir => + val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) + assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) +@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + } + } + +- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { ++ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + import testImplicits._ + + withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala index 09ed6955a51..236a4e99824 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -3038,18 +3152,29 @@ index 0dd90925d3c..7d53ec845ef 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..d9125f658ad 100644 +index 0ab8691801d..f1c4b3d92b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,7 @@ +@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} ++import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +109,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { + assert(arrowEvalNodes.size == 2) + } + +- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { ++ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { f => + spark.range(10).select($"id".as("a"), $"id".as("b")) +@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3057,7 +3182,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +122,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3076,7 +3201,7 @@ index 0ab8691801d..d9125f658ad 100644 } } } -@@ -145,6 +152,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3084,7 +3209,7 @@ index 0ab8691801d..d9125f658ad 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +165,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan From 14d65e0105447f981e6539259881a81a8c222d5d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 10:48:08 -0600 Subject: [PATCH 2/8] fix: add CometNativeScanExec support and ignore failing tests in 4.0.1 diff Add missing CometNativeScanExec pattern matches to SchemaPruningSuite and FileBasedDataSourceSuite, fixing all 183 ParquetV1SchemaPruningSuite failures. Tag remaining incompatible tests with IgnoreCometNativeDataFusion. --- dev/diffs/4.0.1.diff | 215 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 185 insertions(+), 30 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 1dfbeaf343..2556372454 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..6738395bdae 100644 +index 2c24cc7d570..638f946353a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -615,7 +615,17 @@ index 2c24cc7d570..6738395bdae 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { -@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1151,7 +1157,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("join key with multiple references on the filtering plan") { ++ test("join key with multiple references on the filtering plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.ANSI_ENABLED.key -> "false" // ANSI mode doesn't support "String + String" +@@ -1215,7 +1222,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + @@ -625,7 +635,7 @@ index 2c24cc7d570..6738395bdae 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1432,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -635,7 +645,7 @@ index 2c24cc7d570..6738395bdae 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1455,7 +1464,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -645,7 +655,7 @@ index 2c24cc7d570..6738395bdae 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1699,7 +1708,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1699,7 +1709,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat * Check the static scan metrics with and without DPP */ test("static scan metrics", @@ -655,7 +665,7 @@ index 2c24cc7d570..6738395bdae 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { -@@ -1730,6 +1740,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1741,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -690,18 +700,40 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..85de73e4b96 100644 +index 9c529d14221..d2fe6b14ac2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec} ++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -651,7 +652,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +204,11 @@ class FileBasedDataSourceSuite extends QueryTest + } + + allFileBasedDataSources.foreach { format => +- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") { ++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty ++ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly { + def testIgnoreMissingFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -263,7 +268,7 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + } +- } ++ }} + } + + Seq("json", "orc").foreach { format => +@@ -651,7 +656,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -711,7 +743,7 @@ index 9c529d14221..85de73e4b96 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -967,6 +969,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -967,6 +973,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -719,7 +751,7 @@ index 9c529d14221..85de73e4b96 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1030,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1034,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -727,7 +759,7 @@ index 9c529d14221..85de73e4b96 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1072,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1076,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -735,11 +767,22 @@ index 9c529d14221..85de73e4b96 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1252,6 +1257,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1241,7 +1250,8 @@ class FileBasedDataSourceSuite extends QueryTest + } + } + +- test("SPARK-41017: filter pushdown with nondeterministic predicates") { ++ test("SPARK-41017: filter pushdown with nondeterministic predicates", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTempPath { path => + val pathStr = path.getCanonicalPath + spark.range(10).write.parquet(pathStr) +@@ -1252,6 +1262,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters + case b: CometScanExec => b.dataFilters ++ case b: CometNativeScanExec => b.dataFilters + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) @@ -1273,10 +1316,10 @@ index 0df7f806272..92390bd819f 100644 test("non-matching optional group") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -index 2e33f6505ab..47fa031add5 100644 +index 2e33f6505ab..d0f84e8c44d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException +@@ -23,12 +23,14 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union} @@ -1288,8 +1331,11 @@ index 2e33f6505ab..47fa031add5 100644 +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.internal.SQLConf ++import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.test.SharedSparkSession -@@ -1529,6 +1530,12 @@ class SubquerySuite extends QueryTest + + class SubquerySuite extends QueryTest +@@ -1529,6 +1531,12 @@ class SubquerySuite extends QueryTest fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( _.files.forall(_.urlEncodedPath.contains("p=0")))) @@ -1302,7 +1348,17 @@ index 2e33f6505ab..47fa031add5 100644 case _ => false }) } -@@ -2094,7 +2101,7 @@ class SubquerySuite extends QueryTest +@@ -2035,7 +2043,8 @@ class SubquerySuite extends QueryTest + } + } + +- test("Subquery reuse across the whole plan") { ++ test("Subquery reuse across the whole plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY.key -> "false") { + val df = sql( +@@ -2094,7 +2103,7 @@ class SubquerySuite extends QueryTest df.collect() val exchanges = collect(df.queryExecution.executedPlan) { @@ -1311,7 +1367,13 @@ index 2e33f6505ab..47fa031add5 100644 } assert(exchanges.size === 1) } -@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest +@@ -2674,22 +2683,31 @@ class SubquerySuite extends QueryTest + } + } + +- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") { ++ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = { val df = sql(query) checkAnswer(df, answer) @@ -1841,6 +1903,20 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +index 77a988f340e..1acc534064e 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { + } + } + +- test("alter temporary view should follow current storeAnalyzedPlanForView config") { ++ test("alter temporary view should follow current storeAnalyzedPlanForView config", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withTable("t") { + Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") + withView("v1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index aed11badb71..1a365b5aacf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -2532,22 +2608,23 @@ index 272be70f9fe..06957694002 100644 assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala -index 0a0b23d1e60..5685926250f 100644 +index 0a0b23d1e60..dcc9c141315 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Concat import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.Expand import org.apache.spark.sql.catalyst.types.DataTypeUtils -+import org.apache.spark.sql.comet.CometScanExec ++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ -@@ -868,6 +869,7 @@ abstract class SchemaPruningSuite +@@ -868,6 +869,8 @@ abstract class SchemaPruningSuite val fileSourceScanSchemata = collect(df.queryExecution.executedPlan) { case scan: FileSourceScanExec => scan.requiredSchema + case scan: CometScanExec => scan.requiredSchema ++ case scan: CometNativeScanExec => scan.requiredSchema } assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + @@ -2809,7 +2886,7 @@ index 4474ec1fd42..533d4c28dab 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..59e4d003966 100644 +index bba71f1c48d..eb0238b8d2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2830,7 +2907,17 @@ index bba71f1c48d..59e4d003966 100644 val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS + } + } + +- test("Enabling/disabling ignoreCorruptFiles") { ++ test("Enabling/disabling ignoreCorruptFiles", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath +@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2843,7 +2930,7 @@ index bba71f1c48d..59e4d003966 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2853,7 +2940,7 @@ index bba71f1c48d..59e4d003966 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2863,7 +2950,7 @@ index bba71f1c48d..59e4d003966 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2873,7 +2960,7 @@ index bba71f1c48d..59e4d003966 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -3046,9 +3133,18 @@ index 0acb21f3e6f..0192c6b48f5 100644 withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..236a4e99824 100644 +index 09ed6955a51..6f9174c9545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter + import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat} + + import org.apache.spark.SparkException +-import org.apache.spark.sql.{DataFrame, QueryTest, Row} ++import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row} + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + import org.apache.spark.sql.functions.col @@ -65,7 +65,9 @@ class ParquetTypeWideningSuite withClue( s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " + @@ -3069,7 +3165,7 @@ index 09ed6955a51..236a4e99824 100644 } } -@@ -190,7 +192,8 @@ class ParquetTypeWideningSuite +@@ -190,10 +192,16 @@ class ParquetTypeWideningSuite (Seq("1", "2", Short.MinValue.toString), ShortType, DoubleType), (Seq("1", "2", Int.MinValue.toString), IntegerType, DoubleType), (Seq("1.23", "10.34"), FloatType, DoubleType), @@ -3077,8 +3173,67 @@ index 09ed6955a51..236a4e99824 100644 + // TODO: Comet cannot handle older than "1582-10-15" + (Seq("2020-01-01", "2020-01-02"/* , "1312-02-27" */), DateType, TimestampNTZType) ) ++ wideningTags: Seq[org.scalatest.Tag] = ++ if (fromType == DateType && toType == TimestampNTZType) { ++ Seq(IgnoreCometNativeDataFusion( ++ "https://github.com/apache/datafusion-comet/issues/3728")) ++ } else Seq.empty + } +- test(s"parquet widening conversion $fromType -> $toType") { ++ test(s"parquet widening conversion $fromType -> $toType", wideningTags: _*) { + checkAllParquetReaders(values, fromType, toType, expectError = false) + } + +@@ -231,7 +239,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType) + ) + } +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, expectError = true) + } + +@@ -257,7 +266,8 @@ class ParquetTypeWideningSuite + (Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1)) + ) } - test(s"parquet widening conversion $fromType -> $toType") { +- test(s"unsupported parquet conversion $fromType -> $toType") { ++ test(s"unsupported parquet conversion $fromType -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders(values, fromType, toType, + expectError = + // parquet-mr allows reading decimals into a smaller precision decimal type without +@@ -271,7 +281,8 @@ class ParquetTypeWideningSuite + (Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType)) + outputTimestampType <- ParquetOutputTimestampType.values + } +- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") { ++ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + withSQLConf( + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, + SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString +@@ -291,7 +302,8 @@ class ParquetTypeWideningSuite + Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20) + } + test( +- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") { ++ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728")) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, 2), +@@ -322,7 +334,8 @@ class ParquetTypeWideningSuite + Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) + } + test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + +- s"Decimal($toPrecision, $toScale)" ++ s"Decimal($toPrecision, $toScale)", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728") + ) { + checkAllParquetReaders( + values = Seq("1.23", "10.34"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 458b5dfc0f4..d209f3c85bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala From 6f0cd2bb7b0949ead849f0f371cdb52ddde18ad0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 14:12:44 -0600 Subject: [PATCH 3/8] fix: add missing quietly import in 4.0.1 diff FileBasedDataSourceSuite --- dev/diffs/4.0.1.diff | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 2556372454..92319ff7c0 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -700,18 +700,19 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..d2fe6b14ac2 100644 +index 9c529d14221..6cfd87ad864 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha +@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.types.DataTypeUtils ++import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition -@@ -203,7 +204,11 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest } allFileBasedDataSources.foreach { format => @@ -724,7 +725,7 @@ index 9c529d14221..d2fe6b14ac2 100644 def testIgnoreMissingFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath -@@ -263,7 +268,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest } } } @@ -733,7 +734,7 @@ index 9c529d14221..d2fe6b14ac2 100644 } Seq("json", "orc").foreach { format => -@@ -651,7 +656,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -651,7 +657,8 @@ class FileBasedDataSourceSuite extends QueryTest } Seq("parquet", "orc").foreach { format => @@ -743,7 +744,7 @@ index 9c529d14221..d2fe6b14ac2 100644 withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" -@@ -967,6 +973,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -967,6 +974,7 @@ class FileBasedDataSourceSuite extends QueryTest assert(bJoinExec.isEmpty) val smJoinExec = collect(joinedDF.queryExecution.executedPlan) { case smJoin: SortMergeJoinExec => smJoin @@ -751,7 +752,7 @@ index 9c529d14221..d2fe6b14ac2 100644 } assert(smJoinExec.nonEmpty) } -@@ -1027,6 +1034,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1027,6 +1035,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -759,7 +760,7 @@ index 9c529d14221..d2fe6b14ac2 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.nonEmpty) -@@ -1068,6 +1076,7 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1068,6 +1077,7 @@ class FileBasedDataSourceSuite extends QueryTest val fileScan = df.queryExecution.executedPlan collectFirst { case BatchScanExec(_, f: FileScan, _, _, _, _) => f @@ -767,7 +768,7 @@ index 9c529d14221..d2fe6b14ac2 100644 } assert(fileScan.nonEmpty) assert(fileScan.get.partitionFilters.isEmpty) -@@ -1241,7 +1250,8 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1241,7 +1251,8 @@ class FileBasedDataSourceSuite extends QueryTest } } @@ -777,7 +778,7 @@ index 9c529d14221..d2fe6b14ac2 100644 withTempPath { path => val pathStr = path.getCanonicalPath spark.range(10).write.parquet(pathStr) -@@ -1252,6 +1262,9 @@ class FileBasedDataSourceSuite extends QueryTest +@@ -1252,6 +1263,9 @@ class FileBasedDataSourceSuite extends QueryTest val filters = df.queryExecution.executedPlan.collect { case f: FileSourceScanLike => f.dataFilters case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters From 63afbd60118237b3d38a0272d4adb7d918ab6ec4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:26:50 -0600 Subject: [PATCH 4/8] fix: check Comet ignore tags before DisableAdaptiveExecution in 4.0.1 diff The test override in SQLTestUtils checked DisableAdaptiveExecution first, so tests with both DisableAdaptiveExecution and IgnoreCometNativeDataFusion tags (like "static scan metrics") would enter the DAE branch and never check the ignore tag. Reorder to check Comet skip tags first with early returns, then handle DisableAdaptiveExecution. Also add IgnoreCometNativeDataFusion to "Subquery reuse across the whole plan" in DynamicPartitionPruningSuite which was missing the annotation. --- dev/diffs/4.0.1.diff | 87 +++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 37 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 92319ff7c0..b8cc4331b9 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..638f946353a 100644 +index 2c24cc7d570..3311e6e3773 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -635,7 +635,15 @@ index 2c24cc7d570..638f946353a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1424,7 +1432,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1330,6 +1338,7 @@ abstract class DynamicPartitionPruningSuiteBase + } + + test("Subquery reuse across the whole plan", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3728"), + DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", + SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", +@@ -1424,7 +1433,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -645,7 +653,7 @@ index 2c24cc7d570..638f946353a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( -@@ -1455,7 +1464,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1455,7 +1465,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -655,7 +663,7 @@ index 2c24cc7d570..638f946353a 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1699,7 +1709,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1699,7 +1710,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat * Check the static scan metrics with and without DPP */ test("static scan metrics", @@ -665,7 +673,7 @@ index 2c24cc7d570..638f946353a 100644 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { -@@ -1730,6 +1741,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1742,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -3776,7 +3784,7 @@ index 86c4e49f6f6..2e639e5f38d 100644 val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index f0f3f94b811..c9d0ecfec41 100644 +index f0f3f94b811..f77b54dcef9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._ @@ -3803,37 +3811,42 @@ index f0f3f94b811..c9d0ecfec41 100644 import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -@@ -128,7 +130,28 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with - } - } - } else { -- super.test(testName, testTags: _*)(testFun) -+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) -+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || -+ cometScanImpl == CometConf.SCAN_AUTO -+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || -+ cometScanImpl == CometConf.SCAN_AUTO -+ if (isCometEnabled && isNativeIcebergCompat && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { -+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) -+ } else if (isCometEnabled && isNativeDataFusion && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) -+ } else if (isCometEnabled && (isNativeDataFusion || isNativeIcebergCompat) && -+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { -+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", -+ testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } - } - } +@@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with -@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { ++ // Check Comet skip tags first, before DisableAdaptiveExecution handling ++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ return ++ } ++ if (isCometEnabled) { ++ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) ++ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT || ++ cometScanImpl == CometConf.SCAN_AUTO ++ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION || ++ cometScanImpl == CometConf.SCAN_AUTO ++ if (isNativeIcebergCompat && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) { ++ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun) ++ return ++ } ++ if (isNativeDataFusion && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun) ++ return ++ } ++ if ((isNativeDataFusion || isNativeIcebergCompat) && ++ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) { ++ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)", ++ testTags: _*)(testFun) ++ return ++ } ++ } + if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { +@@ -248,8 +278,24 @@ private[sql] trait SQLTestUtilsBase override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter } @@ -3858,7 +3871,7 @@ index f0f3f94b811..c9d0ecfec41 100644 super.withSQLConf(pairs: _*)(f) } -@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase +@@ -451,6 +497,8 @@ private[sql] trait SQLTestUtilsBase val schema = df.schema val withoutFilters = df.queryExecution.executedPlan.transform { case FilterExec(_, child) => child From 5ccfc3bd30944f07384681d485a9715b213a04bd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:29:22 -0600 Subject: [PATCH 5/8] fix: retag schema mismatch tests from #3311 to #3720 Retag 6 tests in 4.0.1 diff and 3 tests in 3.5.8 diff that are specifically about DataFusion not throwing errors for Parquet schema mismatches (the scope of #3720), rather than the broader schema incompatibility issue (#3311). --- dev/diffs/3.5.8.diff | 10 +++++----- dev/diffs/4.0.1.diff | 18 +++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index db495f1e23..121043e6d6 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2189,7 +2189,7 @@ index 8ed9ef1630e..a865928c1b2 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..7f00caf5063 100644 +index f6472ba3d9d..5ea2d938664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2218,7 +2218,7 @@ index f6472ba3d9d..7f00caf5063 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2357,7 +2357,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 3f47c5e506f..92a5eafec84 100644 +index 3f47c5e506f..f1ce3194279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2384,7 +2384,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2394,7 +2394,7 @@ index 3f47c5e506f..92a5eafec84 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { import testImplicits._ withTempPath { dir => diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index b8cc4331b9..407807cac5 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -2863,7 +2863,7 @@ index 6080a5e8e4b..dc64436164f 100644 case _ => assert(false, "Can not match ParquetTable in the query.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 4474ec1fd42..533d4c28dab 100644 +index 4474ec1fd42..05fa0257c82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -2880,7 +2880,7 @@ index 4474ec1fd42..533d4c28dab 100644 - test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2895,7 +2895,7 @@ index 4474ec1fd42..533d4c28dab 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..eb0238b8d2a 100644 +index bba71f1c48d..0b52574e0f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2912,7 +2912,7 @@ index bba71f1c48d..eb0238b8d2a 100644 - test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { + test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => @@ -2945,7 +2945,7 @@ index bba71f1c48d..eb0238b8d2a 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2975,7 +2975,7 @@ index bba71f1c48d..eb0238b8d2a 100644 - test("row group skipping doesn't overflow when reading into larger type") { + test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) withAllParquetReaders { @@ -3099,7 +3099,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 0acb21f3e6f..0192c6b48f5 100644 +index 0acb21f3e6f..1f9c3fd13fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -3127,7 +3127,7 @@ index 0acb21f3e6f..0192c6b48f5 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) @@ -3137,7 +3137,7 @@ index 0acb21f3e6f..0192c6b48f5 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { import testImplicits._ withTempPath { dir => From 6ed96701e983679dcfcd108a2e354ff5dd81038c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:29:59 -0600 Subject: [PATCH 6/8] fix: revert unintended 3.5.8 diff changes --- dev/diffs/3.5.8.diff | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 121043e6d6..db495f1e23 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2189,7 +2189,7 @@ index 8ed9ef1630e..a865928c1b2 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..5ea2d938664 100644 +index f6472ba3d9d..7f00caf5063 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2218,7 +2218,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-34212 Parquet should read decimals correctly") { + test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2357,7 +2357,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 3f47c5e506f..f1ce3194279 100644 +index 3f47c5e506f..92a5eafec84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -2384,7 +2384,7 @@ index 3f47c5e506f..f1ce3194279 100644 - test("schema mismatch failure error message for parquet vectorized reader") { + test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SparkException]) @@ -2394,7 +2394,7 @@ index 3f47c5e506f..f1ce3194279 100644 - test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { + test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { import testImplicits._ withTempPath { dir => From c8afc9055bdd26a829f61178201d00c8ea12a6c6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 11:43:01 -0600 Subject: [PATCH 7/8] fix: replace all #3311 references with specific issues in 4.0.1 diff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - static scan metrics → #3442 (DPP not supported in native_datafusion) - caseSensitive and SPARK-25207 duplicate fields → #3760 (new issue) - Python UDF filter pushdown → fixed by adding CometNativeScanExec pattern matches, removed IgnoreCometNativeDataFusion tag --- dev/diffs/4.0.1.diff | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 407807cac5..9a4b74776a 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..3311e6e3773 100644 +index 2c24cc7d570..12096ea361e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -669,7 +669,7 @@ index 2c24cc7d570..3311e6e3773 100644 test("static scan metrics", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + DisableAdaptiveExecution("DPP in AQE must reuse broadcast"), -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -708,7 +708,7 @@ index 9c90e0105a4..fadf2f0f698 100644 test("SPARK-35884: Explain Formatted") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala -index 9c529d14221..6cfd87ad864 100644 +index 9c529d14221..5c4e370dfff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha @@ -748,7 +748,7 @@ index 9c529d14221..6cfd87ad864 100644 Seq("parquet", "orc").foreach { format => - test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") { + test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { withTempDir { dir => val tableName = s"spark_25132_${format}_native" val tableDir = dir.getCanonicalPath + s"/$tableName" @@ -2727,7 +2727,7 @@ index cd6f41b4ef4..4b6a17344bc 100644 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 6080a5e8e4b..dc64436164f 100644 +index 6080a5e8e4b..cef477c8b4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType @@ -2812,7 +2812,7 @@ index 6080a5e8e4b..dc64436164f 100644 - test("SPARK-25207: exception when duplicate fields in case-insensitive mode") { + test("SPARK-25207: exception when duplicate fields in case-insensitive mode", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) { withTempPath { dir => val count = 10 val tableName = "spark_25207" @@ -3316,7 +3316,7 @@ index 0dd90925d3c..7d53ec845ef 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..f1c4b3d92b1 100644 +index 0ab8691801d..a681cb0aa52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala @@ -18,6 +18,8 @@ @@ -3328,29 +3328,21 @@ index 0ab8691801d..f1c4b3d92b1 100644 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { - assert(arrowEvalNodes.size == 2) - } - -- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { -+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { f => - spark.range(10).select($"id".as("a"), $"id".as("b")) -@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +110,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +124,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 @@ -3359,13 +3351,14 @@ index 0ab8691801d..f1c4b3d92b1 100644 + val dataFilters = scanNodes.head match { + case scan: FileSourceScanExec => scan.dataFilters + case scan: CometScanExec => scan.dataFilters ++ case scan: CometNativeScanExec => scan.dataFilters + } + assert(dataFilters.length == 2) + assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } -@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +156,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3373,7 +3366,7 @@ index 0ab8691801d..f1c4b3d92b1 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +169,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan From b2907c3360b7e9679264964e850a4a53a271590e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 22 Mar 2026 12:02:36 -0600 Subject: [PATCH 8/8] fix: remove unused IgnoreCometNativeDataFusion import in ExtractPythonUDFsSuite --- dev/diffs/4.0.1.diff | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 9a4b74776a..a0b1e81d0d 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -3316,19 +3316,18 @@ index 0dd90925d3c..7d53ec845ef 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..a681cb0aa52 100644 +index 0ab8691801d..b18a5bea944 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -18,6 +18,8 @@ +@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} -+import org.apache.spark.sql.IgnoreCometNativeDataFusion +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -@@ -108,6 +110,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3337,7 +3336,7 @@ index 0ab8691801d..a681cb0aa52 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan @@ -3358,7 +3357,7 @@ index 0ab8691801d..a681cb0aa52 100644 } } } -@@ -145,6 +156,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3366,7 +3365,7 @@ index 0ab8691801d..a681cb0aa52 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +169,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan