@@ -19,6 +19,8 @@ package org.apache.spark.sql.auron.hudi
 import java.net.URI
 import java.util.{Locale, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -36,10 +38,14 @@ object HudiScanSupport extends Logging {
   private val hudiOrcFileFormatSuffix = "HoodieOrcFileFormat"
   private val newHudiOrcFileFormatSuffix = "NewHoodieOrcFileFormat"
   private val morTableTypes = Set("merge_on_read", "mor")
+  private val readOptimizedQueryTypes = Set("read_optimized")
+  private val unsupportedQueryTypes = Set("snapshot", "incremental", "realtime")
   private val hudiTableTypeKeys = Seq(
     "hoodie.datasource.write.table.type",
     "hoodie.datasource.read.table.type",
     "hoodie.table.type")
+  private val hudiQueryTypeKeys =
+    Seq("hoodie.datasource.query.type", "hoodie.datasource.view.type")
   private val hudiBaseFileFormatKeys = Seq(
     "hoodie.table.base.file.format",
     "hoodie.datasource.write.base.file.format",
@@ -93,27 +99,35 @@ object HudiScanSupport extends Logging {
       .orElse(tableTypeFromCatalog(catalogTable))
       .orElse(tableTypeFromMeta(options))
       .map(_.toLowerCase(Locale.ROOT))
+    val queryType = queryTypeFromOptions(options).map(_.toLowerCase(Locale.ROOT))
 
     logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}")
 
-    // Only support basic COW tables for the base version.
-    !tableType.exists(morTableTypes.contains)
+    queryType match {
+      case Some(query) if readOptimizedQueryTypes.contains(query) => true
+      case Some(query) if unsupportedQueryTypes.contains(query) => false
+      case Some(_) => false
+      case None =>
+        // MOR snapshot reads may need log-file merging. Native scan is safe only
+        // when the query explicitly requests read-optimized base-file reads.
+        !tableType.exists(morTableTypes.contains)
+    }
   }
 
   private def tableTypeFromOptions(options: Map[String, String]): Option[String] = {
-    hudiTableTypeKeys
-      .flatMap(key => options.get(key))
-      .headOption
+    caseInsensitiveValue(options, hudiTableTypeKeys)
   }
 
+  private def queryTypeFromOptions(options: Map[String, String]): Option[String] = {
+    caseInsensitiveValue(options, hudiQueryTypeKeys)
+  }
+
-  private def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = {
-    hudiBaseFileFormatKeys
-      .flatMap(key => options.get(key))
-      .headOption
+  private[hudi] def baseFileFormatFromOptions(options: Map[String, String]): Option[String] = {
+    caseInsensitiveValue(options, hudiBaseFileFormatKeys)
   }
 
   private def tableTypeFromMeta(options: Map[String, String]): Option[String] = {
-    val basePath = options.get("path").map(normalizePath)
+    val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath)
     basePath.flatMap { path =>
       try {
         val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
@@ -124,15 +138,16 @@
           if (log.isDebugEnabled()) {
             logDebug(s"Hudi table properties not found at: $propsPath")
           }
-          return None
-        }
-        val in = fs.open(propsPath)
-        try {
-          val props = new Properties()
-          props.load(in)
-          Option(props.getProperty("hoodie.table.type"))
-        } finally {
-          in.close()
+          None
+        } else {
+          val in = fs.open(propsPath)
+          try {
+            val props = new Properties()
+            props.load(in)
+            caseInsensitivePropertyValue(props, Seq("hoodie.table.type"))
+          } finally {
+            in.close()
+          }
         }
       } catch {
         case t: Throwable =>
@@ -145,7 +160,7 @@
   }
 
   private def baseFileFormatFromMeta(options: Map[String, String]): Option[String] = {
-    val basePath = options.get("path").map(normalizePath)
+    val basePath = caseInsensitiveValue(options, Seq("path")).map(normalizePath)
     basePath.flatMap { path =>
       try {
         val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
@@ -156,15 +171,16 @@
           if (log.isDebugEnabled()) {
             logDebug(s"Hudi table properties not found at: $propsPath")
           }
-          return None
-        }
-        val in = fs.open(propsPath)
-        try {
-          val props = new Properties()
-          props.load(in)
-          Option(props.getProperty("hoodie.table.base.file.format"))
-        } finally {
-          in.close()
+          None
+        } else {
+          val in = fs.open(propsPath)
+          try {
+            val props = new Properties()
+            props.load(in)
+            caseInsensitivePropertyValue(props, Seq("hoodie.table.base.file.format"))
+          } finally {
+            in.close()
+          }
         }
       } catch {
         case t: Throwable =>
@@ -179,7 +195,7 @@
   private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
     catalogTable.flatMap { table =>
       val props = table.properties ++ table.storage.properties
-      hudiBaseFileFormatKeys.flatMap(props.get).headOption
+      caseInsensitiveValue(props, hudiBaseFileFormatKeys)
     }
   }
 
@@ -205,7 +221,7 @@
   private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): Option[String] = {
     catalogTable.flatMap { table =>
       val props = table.properties ++ table.storage.properties
-      hudiTableTypeKeys.flatMap(props.get).headOption
+      caseInsensitiveValue(props, hudiTableTypeKeys)
     }
   }
 
@@ -236,14 +252,13 @@ object HudiScanSupport extends Logging {
   }
 
   private def hasTimeTravel(options: Map[String, String]): Boolean = {
-    val keys = options.keys.map(_.toLowerCase(Locale.ROOT))
-    keys.exists {
-      case "as.of.instant" => true
-      case "as.of.timestamp" => true
-      case "hoodie.datasource.read.as.of.instant" => true
-      case "hoodie.datasource.read.as.of.timestamp" => true
-      case _ => false
-    }
+    caseInsensitiveValue(
+      options,
+      Seq(
+        "as.of.instant",
+        "as.of.timestamp",
+        "hoodie.datasource.read.as.of.instant",
+        "hoodie.datasource.read.as.of.timestamp")).isDefined
   }
 
   private def normalizePath(rawPath: String): String = {
@@ -254,4 +269,32 @@
       case _: Throwable => rawPath
     }
   }
+
+  private def caseInsensitivePropertyValue(
+      props: Properties,
+      keys: Seq[String]): Option[String] = {
+    keys.iterator
+      .map { lookupKey =>
+        Option(props.getProperty(lookupKey)).orElse {
+          props.stringPropertyNames().asScala.iterator.collectFirst {
+            case key if key.equalsIgnoreCase(lookupKey) => props.getProperty(key)
+          }
+        }
+      }
+      .collectFirst { case Some(value) => value }
+  }
+
+  private def caseInsensitiveValue(
+      values: Map[String, String],
+      keys: Seq[String]): Option[String] = {
+    keys.iterator
+      .map { lookupKey =>
+        values.get(lookupKey).orElse {
+          values.iterator.collectFirst {
+            case (key, value) if key.equalsIgnoreCase(lookupKey) => value
+          }
+        }
+      }
+      .collectFirst { case Some(value) => value }
+  }
 }
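Both new helpers implement the same lookup contract: candidate keys are tried in priority order, and each key is matched exactly before falling back to a case-insensitive scan, so an earlier key that matches only case-insensitively still beats a later key that matches exactly. A minimal self-contained sketch of that contract, assuming nothing beyond the Scala standard library (the CaseInsensitiveLookupSketch and resolve names and the sample maps are illustrative, not part of this patch):

object CaseInsensitiveLookupSketch {
  // Mirrors caseInsensitiveValue: walk keys in priority order; for each key,
  // prefer an exact-case hit, then fall back to a case-insensitive scan.
  def resolve(values: Map[String, String], keys: Seq[String]): Option[String] =
    keys.iterator
      .map { lookupKey =>
        values.get(lookupKey).orElse(values.collectFirst {
          case (key, value) if key.equalsIgnoreCase(lookupKey) => value
        })
      }
      .collectFirst { case Some(value) => value }

  def main(args: Array[String]): Unit = {
    // A differently-cased key is still found via the fallback scan.
    assert(resolve(Map("Hoodie.Table.Type" -> "cow"), Seq("hoodie.table.type")).contains("cow"))
    // Key priority wins over exactness: the first key matches only
    // case-insensitively, yet it shadows the later exact match.
    val opts = Map("Hoodie.DataSource.Write.Table.Type" -> "mor", "hoodie.table.type" -> "cow")
    assert(resolve(opts, Seq("hoodie.datasource.write.table.type", "hoodie.table.type")).contains("mor"))
  }
}

Trying the exact key first keeps the common path cheap; the linear case-insensitive scan only runs for unusually-cased options.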
@@ -186,6 +186,10 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
     Option(props.getProperty("hoodie.table.base.file.format"))
   }
 
+  private def hudiTablePath(tableName: String): String = {
+    new File(warehouseDir, tableName).getAbsolutePath
+  }
+
   private def assumeSparkAtLeast(version: String): Unit = {
     val current = SparkVersionUtil.SPARK_RUNTIME_VERSION
     assume(current >= version, s"Requires Spark >= $version, current Spark $current")
@@ -218,12 +222,45 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
         .isEmpty)
   }
 
-  test("hudi isSupported rejects MOR table types") {
+  test("hudi isSupported rejects MOR table types unless read optimized") {
     val options = Map("hoodie.datasource.write.table.type" -> "MERGE_ON_READ")
     assert(
       !HudiScanSupport.isSupported(
         "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
         options))
+    assert(
+      HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        options + ("hoodie.datasource.query.type" -> "read_optimized")))
+    assert(
+      !HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        options + ("hoodie.datasource.query.type" -> "snapshot")))
+    assert(
+      !HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        options + ("hoodie.datasource.query.type" -> "incremental")))
   }
 
+  test("hudi scan options are case-insensitive") {
+    val options = Map(
+      "Hoodie.DataSource.Write.Table.Type" -> "MERGE_ON_READ",
+      "Hoodie.Table.Base.File.Format" -> "ORC")
+    val readOptimizedOptions = options + ("Hoodie.DataSource.Query.Type" -> "READ_OPTIMIZED")
+    val timeTravelOptions = Map("Hoodie.DataSource.Read.As.Of.Instant" -> "20240101010101")
+    assert(
+      !HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        options))
+    assert(
+      HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        readOptimizedOptions))
+    assert(
+      !HudiScanSupport.isSupported(
+        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
+        timeTravelOptions))
+    assert(HudiScanSupport.baseFileFormatFromOptions(options).contains("ORC"))
+  }
+
   test("hudi isSupported allows default COW") {
@@ -334,4 +371,34 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
       assertHasNativeParquetScan(converted)
     }
   }
+
+  test("hudi: MOR read-optimized table scan converts to native") {
+    withTable("hudi_mor_read_optimized") {
+      spark.sql("""create table hudi_mor_read_optimized (id int, name string)
+                  |using hudi
+                  |tblproperties (
+                  |  'hoodie.datasource.write.table.type' = 'MERGE_ON_READ'
+                  |)""".stripMargin)
+      spark.sql("insert into hudi_mor_read_optimized values (1, 'v1'), (2, 'v2')")
+
+      val df = spark.read
+        .format("hudi")
+        .option("hoodie.datasource.query.type", "read_optimized")
+        .load(hudiTablePath("hudi_mor_read_optimized"))
+        .select("id", "name")
+        .orderBy("id")
+
+      logFileFormats(df)
+      val rows = df.collect().toSeq
+      assert(rows == Seq(Row(1, "v1"), Row(2, "v2")))
+      val scan = df.queryExecution.sparkPlan.collectFirst {
+        case s: org.apache.spark.sql.execution.FileSourceScanExec => s
+      }
+      assert(scan.isDefined)
+      val provider = new HudiConvertProvider
+      assert(provider.isSupported(scan.get))
+      val converted = provider.convert(scan.get)
+      assertHasNativeParquetScan(converted)
+    }
+  }
 }