From 9ff82bcd674fbc4143c674c7a26274543f466089 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Tue, 5 May 2026 16:58:51 +0800 Subject: [PATCH 1/2] [AURON #2234] Handle Hudi scan options case-insensitively Signed-off-by: weimingdiit --- .../sql/auron/hudi/HudiScanSupport.scala | 101 +++++++++++------- .../sql/auron/hudi/HudiScanSupportSuite.scala | 16 +++ 2 files changed, 80 insertions(+), 37 deletions(-) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 63488d30f..11cb5f63b 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.auron.hudi import java.net.URI import java.util.{Locale, Properties} +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -101,19 +103,15 @@ object HudiScanSupport extends Logging { } private def tableTypeFromOptions(options: Map[String, String]): Option[String] = { - hudiTableTypeKeys - .flatMap(key => options.get(key)) - .headOption + caseInsensitiveValue(options, hudiTableTypeKeys) } - private def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { - hudiBaseFileFormatKeys - .flatMap(key => options.get(key)) - .headOption + private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { + caseInsensitiveValue(options, hudiBaseFileFormatKeys) } private def tableTypeFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -124,15 +122,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.type")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.type")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -145,7 +144,7 @@ object HudiScanSupport extends Logging { } private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -156,15 +155,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.base.file.format")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -179,7 +179,7 @@ object HudiScanSupport extends Logging { private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiBaseFileFormatKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiBaseFileFormatKeys) } } @@ -205,7 +205,7 @@ object HudiScanSupport extends Logging { private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiTableTypeKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiTableTypeKeys) } } @@ -236,14 +236,13 @@ object HudiScanSupport extends Logging { } private def hasTimeTravel(options: Map[String, String]): Boolean = { - val keys = options.keys.map(_.toLowerCase(Locale.ROOT)) - keys.exists { - case "as.of.instant" => true - case "as.of.timestamp" => true - case "hoodie.datasource.read.as.of.instant" => true - case "hoodie.datasource.read.as.of.timestamp" => true - case _ => false - } + caseInsensitiveValue( + options, + Seq( + "as.of.instant", + "as.of.timestamp", + "hoodie.datasource.read.as.of.instant", + "hoodie.datasource.read.as.of.timestamp")).isDefined } private def normalizePath(rawPath: String): String = { @@ -254,4 +253,32 @@ object HudiScanSupport extends Logging { case _: Throwable => rawPath } } + + private def caseInsensitivePropertyValue( + props: Properties, + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + Option(props.getProperty(lookupKey)).orElse { + props.stringPropertyNames().asScala.iterator.collectFirst { + case key if key.equalsIgnoreCase(lookupKey) => props.getProperty(key) + } + } + } + .collectFirst { case Some(value) => value } + } + + private def caseInsensitiveValue( + values: Map[String, String], + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + values.get(lookupKey).orElse { + values.iterator.collectFirst { + case (key, value) if key.equalsIgnoreCase(lookupKey) => value + } + } + } + .collectFirst { case Some(value) => value } + } } diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index a388507cb..9e660debd 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -226,6 +226,22 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { options)) } + test("hudi scan options are case-insensitive") { + val options = Map( + "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ", + "Hoodie.Table.Base.File.Format" -> "ORC") + val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101") + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options)) + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + timeTravelOptions)) + assert(HudiScanSupport.baseFileFormatFromOptions(options).contains("ORC")) + } + test("hudi isSupported allows default COW") { assert( HudiScanSupport.isSupported( From 7337bbcddd378eb8defc1bafe1a9cb8d7d5cdc5c Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Sat, 9 May 2026 11:55:58 +0800 Subject: [PATCH 2/2] [AURON #2248] Support native Hudi scan for MOR read optimized queries Signed-off-by: weimingdiit --- .../sql/auron/hudi/HudiScanSupport.scala | 20 ++++++- .../sql/auron/hudi/HudiScanSupportSuite.scala | 53 ++++++++++++++++++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 11cb5f63b..ef4bc9b06 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -38,10 +38,14 @@ object HudiScanSupport extends Logging { private val hudiOrcFileFormatSuffix = "HoodieOrcFileFormat" private val newHudiOrcFileFormatSuffix = "NewHoodieOrcFileFormat" private val morTableTypes = Set("merge_on_read", "mor") + private val readOptimizedQueryTypes = Set("read_optimized") + private val unsupportedQueryTypes = Set("snapshot", "incremental", "realtime") private val hudiTableTypeKeys = Seq( "hoodie.datasource.write.table.type", "hoodie.datasource.read.table.type", "hoodie.table.type") + private val hudiQueryTypeKeys = + Seq("hoodie.datasource.query.type", "hoodie.datasource.view.type") private val hudiBaseFileFormatKeys = Seq( "hoodie.table.base.file.format", "hoodie.datasource.write.base.file.format", @@ -95,17 +99,29 @@ object HudiScanSupport extends Logging { .orElse(tableTypeFromCatalog(catalogTable)) .orElse(tableTypeFromMeta(options)) .map(_.toLowerCase(Locale.ROOT)) + val queryType = queryTypeFromOptions(options).map(_.toLowerCase(Locale.ROOT)) logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}") - // Only support basic COW tables for the base version. - !tableType.exists(morTableTypes.contains) + queryType match { + case Some(query) if readOptimizedQueryTypes.contains(query) => true + case Some(query) if unsupportedQueryTypes.contains(query) => false + case Some(_) => false + case None => + // MOR snapshot reads may need log-file merging. Native scan is safe only + // when the query explicitly requests read-optimized base-file reads. + !tableType.exists(morTableTypes.contains) + } } private def tableTypeFromOptions(options: Map[String, String]): Option[String] = { caseInsensitiveValue(options, hudiTableTypeKeys) } + private def queryTypeFromOptions(options: Map[String, String]): Option[String] = { + caseInsensitiveValue(options, hudiQueryTypeKeys) + } + private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { caseInsensitiveValue(options, hudiBaseFileFormatKeys) } diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index 9e660debd..02ba6da44 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -186,6 +186,10 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { Option(props.getProperty("hoodie.table.base.file.format")) } + private def hudiTablePath(tableName: String): String = { + new File(warehouseDir, tableName).getAbsolutePath + } + private def assumeSparkAtLeast(version: String): Unit = { val current = SparkVersionUtil.SPARK_RUNTIME_VERSION assume(current >= version, s"Requires Spark >= $version, current Spark $current") @@ -218,23 +222,40 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { .isEmpty) } - test("hudi isSupported rejects MOR table types") { + test("hudi isSupported rejects MOR table types unless read optimized") { val options = Map("hoodie.datasource.write.table.type" -> "MERGE_ON_READ") assert( !HudiScanSupport.isSupported( "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", options)) + assert( + HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options + ("hoodie.datasource.query.type" -> "read_optimized"))) + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options + ("hoodie.datasource.query.type" -> "snapshot"))) + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options + ("hoodie.datasource.query.type" -> "incremental"))) } test("hudi scan options are case-insensitive") { val options = Map( "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ", "Hoodie.Table.Base.File.Format" -> "ORC") + val readOptimizedOptions = options + ("Hoodie.DataSource.Query.Type" -> "READ_OPTIMIZED") val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101") assert( !HudiScanSupport.isSupported( "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", options)) + assert( + HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + readOptimizedOptions)) assert( !HudiScanSupport.isSupported( "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", @@ -350,4 +371,34 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { assertHasNativeParquetScan(converted) } } + + test("hudi: MOR read-optimized table scan converts to native") { + withTable("hudi_mor_read_optimized") { + spark.sql("""create table hudi_mor_read_optimized (id int, name string) + |using hudi + |tblproperties ( + | 'hoodie.datasource.write.table.type' = 'MERGE_ON_READ' + |)""".stripMargin) + spark.sql("insert into hudi_mor_read_optimized values (1, 'v1'), (2, 'v2')") + + val df = spark.read + .format("hudi") + .option("hoodie.datasource.query.type", "read_optimized") + .load(hudiTablePath("hudi_mor_read_optimized")) + .select("id", "name") + .orderBy("id") + + logFileFormats(df) + val rows = df.collect().toSeq + assert(rows == Seq(Row(1, "v1"), Row(2, "v2"))) + val scan = df.queryExecution.sparkPlan.collectFirst { + case s: org.apache.spark.sql.execution.FileSourceScanExec => s + } + assert(scan.isDefined) + val provider = new HudiConvertProvider + assert(provider.isSupported(scan.get)) + val converted = provider.convert(scan.get) + assertHasNativeParquetScan(converted) + } + } }