diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 63488d30f..11cb5f63b 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.auron.hudi import java.net.URI import java.util.{Locale, Properties} +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -101,19 +103,15 @@ object HudiScanSupport extends Logging { } private def tableTypeFromOptions(options: Map[String, String]): Option[String] = { - hudiTableTypeKeys - .flatMap(key => options.get(key)) - .headOption + caseInsensitiveValue(options, hudiTableTypeKeys) } - private def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { - hudiBaseFileFormatKeys - .flatMap(key => options.get(key)) - .headOption + private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = { + caseInsensitiveValue(options, hudiBaseFileFormatKeys) } private def tableTypeFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -124,15 +122,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.type")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.type")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -145,7 +144,7 @@ object HudiScanSupport extends Logging { } private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = { - val basePath = options.get("path").map(normalizePath) + val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath) basePath.flatMap { path => try { val hadoopConf = SparkSession.active.sessionState.newHadoopConf() @@ -156,15 +155,16 @@ object HudiScanSupport extends Logging { if (log.isDebugEnabled()) { logDebug(s"Hudi table properties not found at: $propsPath") } - return None - } - val in = fs.open(propsPath) - try { - val props = new Properties() - props.load(in) - Option(props.getProperty("hoodie.table.base.file.format")) - } finally { - in.close() + None + } else { + val in = fs.open(propsPath) + try { + val props = new Properties() + props.load(in) + caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format")) + } finally { + in.close() + } } } catch { case t: Throwable => @@ -179,7 +179,7 @@ object HudiScanSupport extends Logging { private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiBaseFileFormatKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiBaseFileFormatKeys) } } @@ -205,7 +205,7 @@ object HudiScanSupport extends Logging { private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = { catalogTable.flatMap { table => val props = table.properties ++ table.storage.properties - hudiTableTypeKeys.flatMap(props.get).headOption + caseInsensitiveValue(props, hudiTableTypeKeys) } } @@ -236,14 +236,13 @@ object HudiScanSupport extends Logging { } private def hasTimeTravel(options: Map[String, String]): Boolean = { - val keys = options.keys.map(_.toLowerCase(Locale.ROOT)) - keys.exists { - case "as.of.instant" => true - case "as.of.timestamp" => true - case "hoodie.datasource.read.as.of.instant" => true - case "hoodie.datasource.read.as.of.timestamp" => true - case _ => false - } + caseInsensitiveValue( + options, + Seq( + "as.of.instant", + "as.of.timestamp", + "hoodie.datasource.read.as.of.instant", + "hoodie.datasource.read.as.of.timestamp")).isDefined } private def normalizePath(rawPath: String): String = { @@ -254,4 +253,32 @@ object HudiScanSupport extends Logging { case _: Throwable => rawPath } } + + private def caseInsensitivePropertyValue( + props: Properties, + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + Option(props.getProperty(lookupKey)).orElse { + props.stringPropertyNames().asScala.iterator.collectFirst { + case key if key.equalsIgnoreCase(lookupKey) => props.getProperty(key) + } + } + } + .collectFirst { case Some(value) => value } + } + + private def caseInsensitiveValue( + values: Map[String, String], + keys: Seq[String]): Option[String] = { + keys.iterator + .map { lookupKey => + values.get(lookupKey).orElse { + values.iterator.collectFirst { + case (key, value) if key.equalsIgnoreCase(lookupKey) => value + } + } + } + .collectFirst { case Some(value) => value } + } } diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index a388507cb..9e660debd 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -226,6 +226,22 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { options)) } + test("hudi scan options are case-insensitive") { + val options = Map( + "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ", + "Hoodie.Table.Base.File.Format" -> "ORC") + val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101") + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + options)) + assert( + !HudiScanSupport.isSupported( + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat", + timeTravelOptions)) + assert(HudiScanSupport.baseFileFormatFromOptions(options).contains("ORC")) + } + test("hudi isSupported allows default COW") { assert( HudiScanSupport.isSupported(