From c5e04c4662cc927951a1ca43d7376159fb8d64c4 Mon Sep 17 00:00:00 2001 From: Yann Date: Fri, 10 Apr 2026 21:00:07 +0800 Subject: [PATCH 1/3] [spark] Support column aliases and comments for Paimon views Previously, creating a view with column aliases or comments threw UnsupportedOperationException. This change applies column aliases and comments to the view schema before persisting, enabling SQL like: CREATE VIEW v (col1 COMMENT 'desc') AS SELECT ... Co-Authored-By: Claude Opus 4.6 --- .../spark/execution/PaimonViewExec.scala | 43 +++++++- .../paimon/spark/sql/PaimonViewTestBase.scala | 104 ++++++++++++++++++ 2 files changed, 141 insertions(+), 6 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala index 6e807d63ead2..d67b6cf45bed 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, StringUtils} import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ @@ -49,10 +49,8 @@ case class CreatePaimonViewExec( override def output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - if (columnAliases.nonEmpty || columnComments.nonEmpty || queryColumnNames.nonEmpty) { - throw new UnsupportedOperationException( - "columnAliases, columnComments and queryColumnNames are not supported now") - } + // Apply column aliases and comments to the view schema + val finalSchema = applyColumnAliasesAndComments(viewSchema, columnAliases, columnComments) // Note: for replace just drop then create ,this operation is non-atomic. if (replace) { @@ -61,7 +59,7 @@ case class CreatePaimonViewExec( catalog.createView( ident, - viewSchema, + finalSchema, queryText, comment.orNull, properties.asJava, @@ -70,6 +68,39 @@ case class CreatePaimonViewExec( Nil } + /** + * Apply column aliases and comments to the view schema. If columnAliases is empty, the original + * column names are used. If columnComments is empty or a specific comment is None, no comment is + * added. + */ + private def applyColumnAliasesAndComments( + schema: StructType, + aliases: Seq[String], + comments: Seq[Option[String]]): StructType = { + if (aliases.isEmpty && comments.isEmpty) { + return schema + } + + val fields = schema.fields.zipWithIndex.map { + case (field, index) => + val newName = if (index < aliases.length) aliases(index) else field.name + val newComment = if (index < comments.length) comments(index) else None + + // Create a new field with the new name + var newField = StructField(newName, field.dataType, field.nullable, field.metadata) + + // Apply comment if present + newComment match { + case Some(comment) => newField = newField.withComment(comment) + case None => // Keep existing comment if any + } + + newField + } + + StructType(fields) + } + override def simpleString(maxFields: Int): String = { s"CreatePaimonViewExec: $ident" } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala index a0c29bfcbca6..2780332ce95e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala @@ -290,4 +290,108 @@ abstract class PaimonViewTestBase extends PaimonHiveTestBase { } } } + + test("Paimon View: create view with column comments") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column comments + sql(""" + |CREATE VIEW v1 ( + | id COMMENT 'the user id', + | name COMMENT 'the user name' + |) AS SELECT * FROM t + |""".stripMargin) + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column comments via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("id")) + assert(descRows.get(0).get(2).equals("the user id")) + assert(descRows.get(1).get(0).equals("name")) + assert(descRows.get(1).get(2).equals("the user name")) + + // Verify column comments via SHOW CREATE TABLE + val showCreateRows = sql("SHOW CREATE TABLE v1").collectAsList() + val showCreateStr = showCreateRows.get(0).get(0).toString + assert(showCreateStr.contains("id COMMENT 'the user id'")) + assert(showCreateStr.contains("name COMMENT 'the user name'")) + } + } + } + } + } + + test("Paimon View: create view with column aliases") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column aliases (without comments) + sql("CREATE VIEW v1 (user_id, user_name) AS SELECT * FROM t") + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column names via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("user_id")) + assert(descRows.get(1).get(0).equals("user_name")) + } + } + } + } + } + + test("Paimon View: create view with column aliases and comments") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column aliases and comments + sql(""" + |CREATE VIEW v1 ( + | user_id COMMENT 'the user id', + | user_name COMMENT 'the user name' + |) AS SELECT * FROM t + |""".stripMargin) + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column names and comments via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("user_id")) + assert(descRows.get(0).get(2).equals("the user id")) + assert(descRows.get(1).get(0).equals("user_name")) + assert(descRows.get(1).get(2).equals("the user name")) + } + } + } + } + } } From 7d1087658f3645823d1110ab969654bddcd850fb Mon Sep 17 00:00:00 2001 From: Yann Date: Fri, 10 Apr 2026 22:13:28 +0800 Subject: [PATCH 2/3] [spark] Add validation for queryColumnNames and column aliases length; fix CI - Reject non-empty queryColumnNames with UnsupportedOperationException - Validate that column aliases length matches schema fields count - Fix SHOW CREATE TABLE test assertion to be backtick-agnostic across Spark versions (Spark 3.2 quotes column names with backticks) Co-Authored-By: Claude Opus 4.6 --- .../spark/execution/PaimonViewExec.scala | 25 ++++++++++++------- .../paimon/spark/sql/PaimonViewTestBase.scala | 4 +-- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala index d67b6cf45bed..86c053f07c44 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala @@ -22,6 +22,7 @@ import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec import org.apache.paimon.view.View +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, StringUtils} @@ -49,6 +50,16 @@ case class CreatePaimonViewExec( override def output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { + if (queryColumnNames.nonEmpty) { + throw new UnsupportedOperationException("queryColumnNames is not supported now") + } + + if (columnAliases.nonEmpty && columnAliases.length != viewSchema.fields.length) { + throw new AnalysisException( + s"The number of column aliases (${columnAliases.length}) " + + s"must match the number of columns (${viewSchema.fields.length})") + } + // Apply column aliases and comments to the view schema val finalSchema = applyColumnAliasesAndComments(viewSchema, columnAliases, columnComments) @@ -71,7 +82,7 @@ case class CreatePaimonViewExec( /** * Apply column aliases and comments to the view schema. If columnAliases is empty, the original * column names are used. If columnComments is empty or a specific comment is None, no comment is - * added. + * added. The length of aliases (if non-empty) is validated before calling this method. */ private def applyColumnAliasesAndComments( schema: StructType, @@ -83,19 +94,15 @@ case class CreatePaimonViewExec( val fields = schema.fields.zipWithIndex.map { case (field, index) => - val newName = if (index < aliases.length) aliases(index) else field.name + val newName = if (aliases.nonEmpty) aliases(index) else field.name val newComment = if (index < comments.length) comments(index) else None - // Create a new field with the new name - var newField = StructField(newName, field.dataType, field.nullable, field.metadata) + val newField = StructField(newName, field.dataType, field.nullable, field.metadata) - // Apply comment if present newComment match { - case Some(comment) => newField = newField.withComment(comment) - case None => // Keep existing comment if any + case Some(c) => newField.withComment(c) + case None => newField } - - newField } StructType(fields) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala index 2780332ce95e..f4e48f318f47 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala @@ -324,8 +324,8 @@ abstract class PaimonViewTestBase extends PaimonHiveTestBase { // Verify column comments via SHOW CREATE TABLE val showCreateRows = sql("SHOW CREATE TABLE v1").collectAsList() val showCreateStr = showCreateRows.get(0).get(0).toString - assert(showCreateStr.contains("id COMMENT 'the user id'")) - assert(showCreateStr.contains("name COMMENT 'the user name'")) + assert(showCreateStr.contains("COMMENT 'the user id'")) + assert(showCreateStr.contains("COMMENT 'the user name'")) } } } From 656f4e619654dfda8d6c0005d7ef280acf51e585 Mon Sep 17 00:00:00 2001 From: Yann Date: Fri, 10 Apr 2026 22:42:41 +0800 Subject: [PATCH 3/3] [spark] Fix AnalysisException compatibility across Spark versions Use UnsupportedOperationException instead of AnalysisException for column aliases length validation, as AnalysisException constructor signature varies across Spark versions (Spark 4.0 requires errorClass). Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/spark/execution/PaimonViewExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala index 86c053f07c44..e7ddaf463d8b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala @@ -22,7 +22,6 @@ import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec import org.apache.paimon.view.View -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, StringUtils} @@ -55,7 +54,7 @@ case class CreatePaimonViewExec( } if (columnAliases.nonEmpty && columnAliases.length != viewSchema.fields.length) { - throw new AnalysisException( + throw new UnsupportedOperationException( s"The number of column aliases (${columnAliases.length}) " + s"must match the number of columns (${viewSchema.fields.length})") }