Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.auron.hudi
import java.net.URI
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -101,19 +103,15 @@ object HudiScanSupport extends Logging {
}

/**
 * Resolves the Hudi table type (e.g. COPY_ON_WRITE / MERGE_ON_READ) from the
 * scan options, trying each known table-type key in priority order with a
 * case-insensitive match on the option names.
 *
 * @param options datasource options supplied for the scan
 * @return the first table-type value found, or None if no key is present
 */
private def tableTypeFromOptions(options: Map[String, String]): Option[String] = {
  caseInsensitiveValue(options, hudiTableTypeKeys)
}

/**
 * Resolves the Hudi base file format (e.g. PARQUET / ORC) from the scan
 * options, trying each known base-file-format key in priority order with a
 * case-insensitive match on the option names.
 *
 * Widened to `private[hudi]` so the behavior can be exercised from tests.
 *
 * @param options datasource options supplied for the scan
 * @return the first base-file-format value found, or None if no key is present
 */
private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = {
  caseInsensitiveValue(options, hudiBaseFileFormatKeys)
}

private def tableTypeFromMeta(options: Map[String, String]): Option[String] = {
val basePath = options.get("path").map(normalizePath)
val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath)
basePath.flatMap { path =>
try {
val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
Expand All @@ -124,15 +122,16 @@ object HudiScanSupport extends Logging {
if (log.isDebugEnabled()) {
logDebug(s"Hudi table properties not found at: $propsPath")
}
return None
}
val in = fs.open(propsPath)
try {
val props = new Properties()
props.load(in)
Option(props.getProperty("hoodie.table.type"))
} finally {
in.close()
None
} else {
val in = fs.open(propsPath)
try {
val props = new Properties()
props.load(in)
caseInsensitivePropertyValue(props, Seq("hoodie.table.type"))
} finally {
in.close()
}
}
} catch {
case t: Throwable =>
Expand All @@ -145,7 +144,7 @@ object HudiScanSupport extends Logging {
}

private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = {
val basePath = options.get("path").map(normalizePath)
val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath)
basePath.flatMap { path =>
try {
val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
Expand All @@ -156,15 +155,16 @@ object HudiScanSupport extends Logging {
if (log.isDebugEnabled()) {
logDebug(s"Hudi table properties not found at: $propsPath")
}
return None
}
val in = fs.open(propsPath)
try {
val props = new Properties()
props.load(in)
Option(props.getProperty("hoodie.table.base.file.format"))
} finally {
in.close()
None
} else {
val in = fs.open(propsPath)
try {
val props = new Properties()
props.load(in)
caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format"))
} finally {
in.close()
}
}
} catch {
case t: Throwable =>
Expand All @@ -179,7 +179,7 @@ object HudiScanSupport extends Logging {
/**
 * Resolves the Hudi base file format from catalog metadata. Table properties
 * and storage properties are merged (storage properties win on key clashes),
 * then the known base-file-format keys are probed case-insensitively.
 *
 * @param catalogTable the catalog entry for the scanned table, if any
 * @return the first base-file-format value found, or None
 */
private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
  catalogTable.flatMap { table =>
    val props = table.properties ++ table.storage.properties
    caseInsensitiveValue(props, hudiBaseFileFormatKeys)
  }
}

Expand All @@ -205,7 +205,7 @@ object HudiScanSupport extends Logging {
/**
 * Resolves the Hudi table type from catalog metadata. Table properties and
 * storage properties are merged (storage properties win on key clashes), then
 * the known table-type keys are probed case-insensitively.
 *
 * @param catalogTable the catalog entry for the scanned table, if any
 * @return the first table-type value found, or None
 */
private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
  catalogTable.flatMap { table =>
    val props = table.properties ++ table.storage.properties
    caseInsensitiveValue(props, hudiTableTypeKeys)
  }
}

Expand Down Expand Up @@ -236,14 +236,13 @@ object HudiScanSupport extends Logging {
}

/**
 * Returns true when the scan options request a time-travel read, i.e. any of
 * the Hudi "as of" instant/timestamp options is present (matched
 * case-insensitively against both short and fully-qualified key spellings).
 *
 * Only key presence matters here; the value itself is not validated.
 *
 * @param options datasource options supplied for the scan
 */
private def hasTimeTravel(options: Map[String, String]): Boolean = {
  caseInsensitiveValue(
    options,
    Seq(
      "as.of.instant",
      "as.of.timestamp",
      "hoodie.datasource.read.as.of.instant",
      "hoodie.datasource.read.as.of.timestamp")).isDefined
}

private def normalizePath(rawPath: String): String = {
Expand All @@ -254,4 +253,32 @@ object HudiScanSupport extends Logging {
case _: Throwable => rawPath
}
}

/**
 * Looks up the first defined value among `keys` in a [[Properties]] bag.
 * For each candidate key (in priority order) an exact-case lookup is tried
 * first; failing that, the property names are scanned case-insensitively.
 *
 * @param props loaded table properties
 * @param keys candidate property names, highest priority first
 * @return the first matching value, or None when no key matches
 */
private def caseInsensitivePropertyValue(
    props: Properties,
    keys: Seq[String]): Option[String] = {
  def resolve(candidate: String): Option[String] = {
    val exact = Option(props.getProperty(candidate))
    if (exact.isDefined) {
      exact
    } else {
      // Fall back to a linear scan over the declared property names,
      // comparing names without regard to case.
      props
        .stringPropertyNames()
        .asScala
        .find(_.equalsIgnoreCase(candidate))
        .map(props.getProperty)
    }
  }
  keys.view.flatMap(resolve(_)).headOption
}

/**
 * Looks up the first defined value among `keys` in an option map.
 * For each candidate key (in priority order) an exact-case lookup is tried
 * first; failing that, the map entries are scanned case-insensitively.
 *
 * @param values option/property map to search
 * @param keys candidate keys, highest priority first
 * @return the first matching value, or None when no key matches
 */
private def caseInsensitiveValue(
    values: Map[String, String],
    keys: Seq[String]): Option[String] = {
  def resolve(candidate: String): Option[String] =
    values.get(candidate) match {
      case hit @ Some(_) => hit
      case None =>
        // No exact hit: scan entries comparing keys without regard to case.
        values.collectFirst {
          case (key, value) if key.equalsIgnoreCase(candidate) => value
        }
    }
  keys.view.flatMap(resolve(_)).headOption
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,22 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
options))
}

test("hudi scan options are case-insensitive") {
  // Option keys written in mixed case must be recognized exactly like their
  // lower-case spellings.
  val options = Map(
    "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ",
    "Hoodie.Table.Base.File.Format" -> "ORC")
  val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101")
  // MERGE_ON_READ must be detected despite the mixed-case key and rejected.
  assert(
    !HudiScanSupport.isSupported(
      "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
      options))
  // Time-travel reads must be detected despite the mixed-case key and rejected.
  assert(
    !HudiScanSupport.isSupported(
      "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
      timeTravelOptions))
  // Base file format lookup must succeed on the mixed-case key as well.
  assert(HudiScanSupport.baseFileFormatFromOptions(options).contains("ORC"))
}

test("hudi isSupported allows default COW") {
assert(
HudiScanSupport.isSupported(
Expand Down
Loading