Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ class HudiConvertProvider extends AuronConvertProvider with Logging {
exec match {
case scan: FileSourceScanExec =>
// Only handle Hudi-backed file scans; other scans fall through.
HudiScanSupport.isSupported(scan)
HudiScanSupport.supportedFileFormat(scan).nonEmpty
case _ => false
}
}

override def convert(exec: SparkPlan): SparkPlan = {
exec match {
case scan: FileSourceScanExec if HudiScanSupport.isSupported(scan) =>
HudiScanSupport.fileFormat(scan) match {
case scan: FileSourceScanExec =>
HudiScanSupport.supportedFileFormat(scan) match {
case Some(HudiScanSupport.ParquetFormat) =>
if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) {
return exec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package org.apache.spark.sql.auron.hudi
import java.net.URI
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._
import scala.util.Using

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -46,14 +49,23 @@ object HudiScanSupport extends Logging {
"hoodie.datasource.write.storage.type")

/**
 * Resolves the Hudi file format for `scan`.
 *
 * Delegates to the private overload, handing it the catalog table for the
 * scan's relation and a table-properties lookup that is only performed if
 * the overload actually needs it.
 *
 * @param scan the file source scan to inspect
 * @return the resolved format, or `None` when no format could be determined
 */
def fileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = {
  // Kept lazy so the properties lookup is skipped unless it is actually needed.
  lazy val tableProperties = hudiTablePropertiesFromMeta(scan.relation.options)
  fileFormat(scan, catalogTable(scan.relation), tableProperties)
}

private def fileFormat(
scan: FileSourceScanExec,
catalogTable: Option[CatalogTable],
tableProperties: => Option[Properties]): Option[HudiFileFormat] = {
val fileFormatName = scan.relation.fileFormat.getClass.getName
val fromClass = fileFormat(fileFormatName)
if (fromClass.nonEmpty) {
return fromClass
}
// Spark may report generic Orc/Parquet formats for Hudi; use metadata fallback
// only when the underlying file index indicates a Hudi table.
fileFormatFromMeta(scan, catalogTable(scan.relation), fileFormatName)
fileFormatFromMeta(scan, catalogTable, tableProperties, fileFormatName)
}

private[hudi] def fileFormat(fileFormatName: String): Option[HudiFileFormat] = {
Expand All @@ -72,16 +84,28 @@ object HudiScanSupport extends Logging {
}

def isSupported(scan: FileSourceScanExec): Boolean =
isSupported(fileFormat(scan), scan.relation.options, catalogTable(scan.relation))
supportedFileFormat(scan).nonEmpty

/**
 * Resolves the Hudi file format for `scan` and returns it only when the
 * scan passes the `isSupported` check.
 *
 * The catalog table is looked up once and the table properties are a shared
 * lazy value, so format resolution and the support check reuse the same
 * inputs.
 *
 * @param scan the file source scan to inspect
 * @return the resolved format when supported, otherwise `None`
 */
def supportedFileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = {
  val catalog = catalogTable(scan.relation)
  // Lazy: both callees take this by name, so it is loaded at most once and
  // only if one of them actually needs it.
  lazy val tableProperties = hudiTablePropertiesFromMeta(scan.relation.options)
  val candidate = fileFormat(scan, catalog, tableProperties)
  val accepted = isSupported(candidate, scan.relation.options, catalog, tableProperties)
  if (accepted) candidate else None
}

private[hudi] def isSupported(fileFormatName: String, options: Map[String, String]): Boolean = {
isSupported(fileFormat(fileFormatName), options, None)
isSupported(fileFormat(fileFormatName), options, None, None)
}

private[hudi] def isSupported(
fileFormat: Option[HudiFileFormat],
options: Map[String, String],
catalogTable: Option[CatalogTable]): Boolean = {
catalogTable: Option[CatalogTable],
tableProperties: => Option[Properties]): Boolean = {
if (fileFormat.isEmpty) {
return false
}
Expand All @@ -91,7 +115,7 @@ object HudiScanSupport extends Logging {

val tableType = tableTypeFromOptions(options)
.orElse(tableTypeFromCatalog(catalogTable))
.orElse(tableTypeFromMeta(options))
.orElse(tableTypeFromMeta(tableProperties))
.map(_.toLowerCase(Locale.ROOT))

logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}")
Expand All @@ -101,51 +125,23 @@ object HudiScanSupport extends Logging {
}

private def tableTypeFromOptions(options: Map[String, String]): Option[String] = {
hudiTableTypeKeys
.flatMap(key => options.get(key))
.headOption
caseInsensitiveValue(options, hudiTableTypeKeys)
}

private def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = {
hudiBaseFileFormatKeys
.flatMap(key => options.get(key))
.headOption
private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = {
caseInsensitiveValue(options, hudiBaseFileFormatKeys)
}

private def tableTypeFromMeta(options: Map[String, String]): Option[String] = {
val basePath = options.get("path").map(normalizePath)
basePath.flatMap { path =>
try {
val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
val base = new Path(path)
val fs = base.getFileSystem(hadoopConf)
val propsPath = new Path(base, ".hoodie/hoodie.properties")
if (!fs.exists(propsPath)) {
if (log.isDebugEnabled()) {
logDebug(s"Hudi table properties not found at: $propsPath")
}
return None
}
val in = fs.open(propsPath)
try {
val props = new Properties()
props.load(in)
Option(props.getProperty("hoodie.table.type"))
} finally {
in.close()
}
} catch {
case t: Throwable =>
if (log.isDebugEnabled()) {
logDebug(s"Failed to load hudi table type from $path", t)
}
None
}
}
}
/**
 * Reads the `hoodie.table.type` entry from already-loaded table properties,
 * matching the key case-insensitively.
 *
 * @param tableProperties the loaded table properties, if any
 * @return the table type value, or `None` when the properties or the key are absent
 */
private def tableTypeFromMeta(tableProperties: Option[Properties]): Option[String] = {
  val tableTypeKey = Seq("hoodie.table.type")
  tableProperties.flatMap(caseInsensitivePropertyValue(_, tableTypeKey))
}

/**
 * Reads the `hoodie.table.base.file.format` entry from already-loaded table
 * properties, matching the key case-insensitively.
 *
 * @param tableProperties the loaded table properties, if any
 * @return the base file format value, or `None` when the properties or the key are absent
 */
private def baseFileFormatFromMeta(tableProperties: Option[Properties]): Option[String] = {
  val baseFormatKey = Seq("hoodie.table.base.file.format")
  tableProperties.flatMap(caseInsensitivePropertyValue(_, baseFormatKey))
}

private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = {
val basePath = options.get("path").map(normalizePath)
private def hudiTablePropertiesFromMeta(options: Map[String, String]): Option[Properties] = {
val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath)
basePath.flatMap { path =>
try {
val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
Expand All @@ -156,20 +152,18 @@ object HudiScanSupport extends Logging {
if (log.isDebugEnabled()) {
logDebug(s"Hudi table properties not found at: $propsPath")
}
return None
}
val in = fs.open(propsPath)
try {
None
} else {
val props = new Properties()
props.load(in)
Option(props.getProperty("hoodie.table.base.file.format"))
} finally {
in.close()
Using.resource(fs.open(propsPath)) { in =>
props.load(in)
}
Some(props)
}
} catch {
case t: Throwable =>
if (log.isDebugEnabled()) {
logDebug(s"Failed to load hudi base file format from $path", t)
logDebug(s"Failed to load Hudi table properties from $path", t)
}
None
}
Expand All @@ -179,21 +173,22 @@ object HudiScanSupport extends Logging {
private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
catalogTable.flatMap { table =>
val props = table.properties ++ table.storage.properties
hudiBaseFileFormatKeys.flatMap(props.get).headOption
caseInsensitiveValue(props, hudiBaseFileFormatKeys)
}
}

private def fileFormatFromMeta(
scan: FileSourceScanExec,
catalogTable: Option[CatalogTable],
tableProperties: => Option[Properties],
fileFormatName: String): Option[HudiFileFormat] = {
// Avoid treating non-Hudi tables as Hudi when Spark reports generic formats.
if (!isHudiFileIndex(scan.relation.location)) {
return None
}
val baseFormat = baseFileFormatFromOptions(scan.relation.options)
.orElse(baseFileFormatFromCatalog(catalogTable))
.orElse(baseFileFormatFromMeta(scan.relation.options))
.orElse(baseFileFormatFromMeta(tableProperties))
.map(_.toLowerCase(Locale.ROOT))
baseFormat.flatMap {
case "orc" if fileFormatName.contains("OrcFileFormat") => Some(OrcFormat)
Expand All @@ -205,7 +200,7 @@ object HudiScanSupport extends Logging {
private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
catalogTable.flatMap { table =>
val props = table.properties ++ table.storage.properties
hudiTableTypeKeys.flatMap(props.get).headOption
caseInsensitiveValue(props, hudiTableTypeKeys)
}
}

Expand Down Expand Up @@ -236,14 +231,13 @@ object HudiScanSupport extends Logging {
}

private def hasTimeTravel(options: Map[String, String]): Boolean = {
val keys = options.keys.map(_.toLowerCase(Locale.ROOT))
keys.exists {
case "as.of.instant" => true
case "as.of.timestamp" => true
case "hoodie.datasource.read.as.of.instant" => true
case "hoodie.datasource.read.as.of.timestamp" => true
case _ => false
}
caseInsensitiveValue(
options,
Seq(
"as.of.instant",
"as.of.timestamp",
"hoodie.datasource.read.as.of.instant",
"hoodie.datasource.read.as.of.timestamp")).isDefined
}

private def normalizePath(rawPath: String): String = {
Expand All @@ -254,4 +248,32 @@ object HudiScanSupport extends Logging {
case _: Throwable => rawPath
}
}

/**
 * Returns the value of the first of `keys` that is present in `props`.
 *
 * Keys are tried in the order given. Each key is looked up exactly first;
 * if that misses, the property names are scanned for one that equals the
 * key ignoring case.
 *
 * @param props the properties to search
 * @param keys  candidate keys, in priority order
 * @return the first value found, or `None` when no key matches
 */
private def caseInsensitivePropertyValue(
    props: Properties,
    keys: Seq[String]): Option[String] = {
  def resolve(wanted: String): Option[String] = {
    val exactHit = Option(props.getProperty(wanted))
    exactHit.orElse {
      props.stringPropertyNames().asScala.iterator
        .find(_.equalsIgnoreCase(wanted))
        .map(name => props.getProperty(name))
    }
  }

  // `orElse` takes its argument by name, so later keys are not resolved once
  // an earlier one has produced a value.
  keys.foldLeft(Option.empty[String]) { (found, wanted) =>
    found.orElse(resolve(wanted))
  }
}

/**
 * Returns the value of the first of `keys` that is present in `values`.
 *
 * Keys are tried in the order given. Each key is looked up exactly first;
 * if that misses, the map entries are scanned for a key that equals it
 * ignoring case.
 *
 * @param values the option map to search
 * @param keys   candidate keys, in priority order
 * @return the first value found, or `None` when no key matches
 */
private def caseInsensitiveValue(
    values: Map[String, String],
    keys: Seq[String]): Option[String] = {
  def resolve(wanted: String): Option[String] =
    values.get(wanted).orElse {
      values.iterator
        .find { case (candidate, _) => candidate.equalsIgnoreCase(wanted) }
        .map { case (_, value) => value }
    }

  // `orElse` takes its argument by name, so later keys are not resolved once
  // an earlier one has produced a value.
  keys.foldLeft(Option.empty[String]) { (found, wanted) =>
    found.orElse(resolve(wanted))
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,22 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
options))
}

test("hudi scan options are case-insensitive") {
  val hoodieParquetFormat =
    "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat"

  // Option keys deliberately use mixed case instead of the usual lower-case form.
  val mixedCaseTableOptions = Map(
    "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ",
    "Hoodie.Table.Base.File.Format" -> "ORC")
  val mixedCaseTimeTravelOptions =
    Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101")

  // Both option sets must be rejected even though their keys are mixed-case.
  assert(!HudiScanSupport.isSupported(hoodieParquetFormat, mixedCaseTableOptions))
  assert(!HudiScanSupport.isSupported(hoodieParquetFormat, mixedCaseTimeTravelOptions))

  // The base file format value is returned as written, found via a mixed-case key.
  assert(HudiScanSupport.baseFileFormatFromOptions(mixedCaseTableOptions).contains("ORC"))
}

test("hudi isSupported allows default COW") {
assert(
HudiScanSupport.isSupported(
Expand Down
Loading