From 9ff82bcd674fbc4143c674c7a26274543f466089 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Tue, 5 May 2026 16:58:51 +0800 Subject: [PATCH 1/2] [AURON #2234] Handle Hudi scan options case-insensitively Signed-off-by: weimingdiit --- .../sql/auron/hudi/HudiScanSupport.scala | 101 +++++++++++------- .../sql/auron/hudi/HudiScanSupportSuite.scala | 16 +++ 2 files changed, 80 insertions(+), 37 deletions(-) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 63488d30f..11cb5f63b 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.auron.hudi import java.net.URI import java.util.{Locale, Properties} +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -101,19 +103,15 @@ object HudiScanSupport extends Logging { } private def tableTypeFromOptions(options: Map[String, String]): Option[String] = { - hudiTableTypeKeys - .flatMap(key => options.get(key)) - .headOption + caseInsensitiveValue(options, hudiTableTypeKeys) } - private def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { - hudiBaseFileFormatKeys - .flatMap(key => options.get(key)) - .headOption + private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { + caseInsensitiveValue(options, hudiBaseFileFormatKeys) } private def tableTypeFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -124,15 +122,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.type")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.type")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -145,7 +144,7 @@ object HudiScanSupport extends Logging { } private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -156,15 +155,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.base.file.format")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -179,7 +179,7 @@ object HudiScanSupport extends Logging { private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiBaseFileFormatKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiBaseFileFormatKeys) } } @@ -205,7 +205,7 @@ object HudiScanSupport extends Logging { private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiTableTypeKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiTableTypeKeys) } } @@ -236,14 +236,13 @@ object HudiScanSupport extends Logging { } private def hasTimeTravel(options: Map[String, String]): Boolean = { - val keys = options.keys.map(_.toLowerCase(Locale.ROOT)) - keys.exists { - case "as.of.instant" => true - case "as.of.timestamp" => true - case "hoodie.datasource.read.as.of.instant" => true - case "hoodie.datasource.read.as.of.timestamp" => true - case _ => false - } + caseInsensitiveValue( + options, + Seq( + "as.of.instant", + "as.of.timestamp", + "hoodie.datasource.read.as.of.instant", + "hoodie.datasource.read.as.of.timestamp")).isDefined } private def normalizePath(rawPath: String): String = { @@ -254,4 +253,32 @@ object HudiScanSupport extends Logging { case _: Throwable => rawPath } } + + private def caseInsensitivePropertyValue( + props: Properties, + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + Option(props.getProperty(lookupKey)).orElse { + props.stringPropertyNames().asScala.iterator.collectFirst { + case key if key.equalsIgnoreCase(lookupKey) => props.getProperty(key) + } + } + } + .collectFirst { case Some(value) => value } + } + + private def caseInsensitiveValue( + values: Map[String, String], + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + values.get(lookupKey).orElse { + values.iterator.collectFirst { + case (key, value) if key.equalsIgnoreCase(lookupKey) => value + } + } + } + .collectFirst { case Some(value) => value } + } } diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index a388507cb..9e660debd 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -226,6 +226,22 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { options)) } + test("hudi scan options are case-insensitive") { + val options = Map( + "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ", + "Hoodie.Table.Base.File.Format" -> "ORC") + val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101") + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options)) + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + timeTravelOptions)) + assert(HudiScanSupport.baseFileFormatFromOptions(options).contains("ORC")) + } + test("hudi isSupported allows default COW") { assert( HudiScanSupport.isSupported( From 50588cccae430b8a95ad9a1864c5d46f6bae5d32 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Sat, 9 May 2026 11:16:13 +0800 Subject: [PATCH 2/2] [AURON #2246] Reuse Hudi table properties when detecting native scan support Signed-off-by: weimingdiit --- .../sql/auron/hudi/HudiConvertProvider.scala | 6 +- .../sql/auron/hudi/HudiScanSupport.scala | 87 +++++++++---------- 2 files changed, 44 insertions(+), 49 deletions(-) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala index da96d5644..5623b235a 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala @@ -44,15 +44,15 @@ class HudiConvertProvider extends AuronConvertProvider with Logging { exec match { case scan: FileSourceScanExec => // Only handle Hudi-backed file scans; other scans fall through. - HudiScanSupport.isSupported(scan) + HudiScanSupport.supportedFileFormat(scan).nonEmpty case _ => false } } override def convert(exec: SparkPlan): SparkPlan = { exec match { - case scan: FileSourceScanExec if HudiScanSupport.isSupported(scan) => - HudiScanSupport.fileFormat(scan) match { + case scan: FileSourceScanExec => + HudiScanSupport.supportedFileFormat(scan) match { case Some(HudiScanSupport.ParquetFormat) => if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) { return exec diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 11cb5f63b..9251de9dd 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -20,6 +20,7 @@ import java.net.URI import java.util.{Locale, Properties} import scala.collection.JavaConverters._ +import scala.util.Using import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -48,6 +49,15 @@ object HudiScanSupport extends Logging { "hoodie.datasource.write.storage.type") def fileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = { + val catalog = catalogTable(scan.relation) + lazy val tableProperties = hudiTablePropertiesFromMeta(scan.relation.options) + fileFormat(scan, catalog, tableProperties) + } + + private def fileFormat( + scan: FileSourceScanExec, + catalogTable: Option[CatalogTable], + tableProperties: => Option[Properties]): Option[HudiFileFormat] = { val fileFormatName = scan.relation.fileFormat.getClass.getName val fromClass = fileFormat(fileFormatName) if (fromClass.nonEmpty) { @@ -55,7 +65,7 @@ object HudiScanSupport extends Logging { } // Spark may report generic Orc/Parquet formats for Hudi; use metadata fallback // only when the underlying file index indicates a Hudi table. - fileFormatFromMeta(scan, catalogTable(scan.relation), fileFormatName) + fileFormatFromMeta(scan, catalogTable, tableProperties, fileFormatName) } private[hudi] def fileFormat(fileFormatName: String): Option[HudiFileFormat] = { @@ -74,16 +84,28 @@ object HudiScanSupport extends Logging { } def isSupported(scan: FileSourceScanExec): Boolean = - isSupported(fileFormat(scan), scan.relation.options, catalogTable(scan.relation)) + supportedFileFormat(scan).nonEmpty + + def supportedFileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = { + val catalog = catalogTable(scan.relation) + lazy val tableProperties = hudiTablePropertiesFromMeta(scan.relation.options) + val resolvedFileFormat = fileFormat(scan, catalog, tableProperties) + if (isSupported(resolvedFileFormat, scan.relation.options, catalog, tableProperties)) { + resolvedFileFormat + } else { + None + } + } private[hudi] def isSupported(fileFormatName: String, options: Map[String, String]): Boolean = { - isSupported(fileFormat(fileFormatName), options, None) + isSupported(fileFormat(fileFormatName), options, None, None) } private[hudi] def isSupported( fileFormat: Option[HudiFileFormat], options: Map[String, String], - catalogTable: Option[CatalogTable]): Boolean = { + catalogTable: Option[CatalogTable], + tableProperties: => Option[Properties]): Boolean = { if (fileFormat.isEmpty) { return false } @@ -93,7 +115,7 @@ object HudiScanSupport extends Logging { val tableType = tableTypeFromOptions(options) .orElse(tableTypeFromCatalog(catalogTable)) - .orElse(tableTypeFromMeta(options)) + .orElse(tableTypeFromMeta(tableProperties)) .map(_.toLowerCase(Locale.ROOT)) logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}") @@ -110,40 +132,15 @@ object HudiScanSupport extends Logging { caseInsensitiveValue(options, hudiBaseFileFormatKeys) } - private def tableTypeFromMeta(options: Map[String, String]): Option[String] = { - val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) - basePath.flatMap { path => - try { - val hadoopConf = SparkSession.active.sessionState.newHadoopConf() - val base = new Path(path) - val fs = base.getFileSystem(hadoopConf) - val propsPath = new Path(base, ".hoodie/hoodie.properties") - if (!fs.exists(propsPath)) { - if (log.isDebugEnabled()) { - logDebug(s"Hudi table properties not found at: $propsPath") - } - None - } else { - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - caseInsensitivePropertyValue(props, Seq("hoodie.table.type")) - } finally { - in.close() - } - } - } catch { - case t: Throwable => - if (log.isDebugEnabled()) { - logDebug(s"Failed to load hudi table type from $path", t) - } - None - } - } - } + private def tableTypeFromMeta(tableProperties: Option[Properties]): Option[String] = + tableProperties.flatMap(props => + caseInsensitivePropertyValue(props, Seq("hoodie.table.type"))) + + private def baseFileFormatFromMeta(tableProperties: Option[Properties]): Option[String] = + tableProperties.flatMap(props => + caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format"))) - private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = { + private def hudiTablePropertiesFromMeta(options: Map[String, String]): Option[Properties] = { val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { @@ -157,19 +154,16 @@ object HudiScanSupport extends Logging { } None } else { - val in = fs.open(propsPath) - try { - val props = new Properties() + val props = new Properties() + Using.resource(fs.open(propsPath)) { in => props.load(in) - caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format")) - } finally { - in.close() } + Some(props) } } catch { case t: Throwable => if (log.isDebugEnabled()) { - logDebug(s"Failed to load hudi base file format from $path", t) + logDebug(s"Failed to load Hudi table properties from $path", t) } None } @@ -186,6 +180,7 @@ object HudiScanSupport extends Logging { private def fileFormatFromMeta( scan: FileSourceScanExec, catalogTable: Option[CatalogTable], + tableProperties: => Option[Properties], fileFormatName: String): Option[HudiFileFormat] = { // Avoid treating non-Hudi tables as Hudi when Spark reports generic formats. if (!isHudiFileIndex(scan.relation.location)) { @@ -193,7 +188,7 @@ object HudiScanSupport extends Logging { } val baseFormat = baseFileFormatFromOptions(scan.relation.options) .orElse(baseFileFormatFromCatalog(catalogTable)) - .orElse(baseFileFormatFromMeta(scan.relation.options)) + .orElse(baseFileFormatFromMeta(tableProperties)) .map(_.toLowerCase(Locale.ROOT)) baseFormat.flatMap { case "orc" if fileFormatName.contains("OrcFileFormat") => Some(OrcFormat)