diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala index 6e807d63ead2..e7ddaf463d8b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, StringUtils} import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ @@ -49,11 +49,19 @@ case class CreatePaimonViewExec( override def output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - if (columnAliases.nonEmpty || columnComments.nonEmpty || queryColumnNames.nonEmpty) { + if (queryColumnNames.nonEmpty) { + throw new UnsupportedOperationException("queryColumnNames is not supported now") + } + + if (columnAliases.nonEmpty && columnAliases.length != viewSchema.fields.length) { throw new UnsupportedOperationException( - "columnAliases, columnComments and queryColumnNames are not supported now") + s"The number of column aliases (${columnAliases.length}) " + + s"must match the number of columns (${viewSchema.fields.length})") } + // Apply column aliases and comments to the view schema + val finalSchema = applyColumnAliasesAndComments(viewSchema, columnAliases, columnComments) + // Note: for replace just drop then create ,this operation is non-atomic. if (replace) { catalog.dropView(ident, true) @@ -61,7 +69,7 @@ case class CreatePaimonViewExec( catalog.createView( ident, - viewSchema, + finalSchema, queryText, comment.orNull, properties.asJava, @@ -70,6 +78,35 @@ case class CreatePaimonViewExec( Nil } + /** + * Apply column aliases and comments to the view schema. If columnAliases is empty, the original + * column names are used. If columnComments is empty or a specific comment is None, no comment is + * added. The length of aliases (if non-empty) is validated before calling this method. + */ + private def applyColumnAliasesAndComments( + schema: StructType, + aliases: Seq[String], + comments: Seq[Option[String]]): StructType = { + if (aliases.isEmpty && comments.isEmpty) { + return schema + } + + val fields = schema.fields.zipWithIndex.map { + case (field, index) => + val newName = if (aliases.nonEmpty) aliases(index) else field.name + val newComment = if (index < comments.length) comments(index) else None + + val newField = StructField(newName, field.dataType, field.nullable, field.metadata) + + newComment match { + case Some(c) => newField.withComment(c) + case None => newField + } + } + + StructType(fields) + } + override def simpleString(maxFields: Int): String = { s"CreatePaimonViewExec: $ident" } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala index a0c29bfcbca6..f4e48f318f47 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala @@ -290,4 +290,108 @@ abstract class PaimonViewTestBase extends PaimonHiveTestBase { } } } + + test("Paimon View: create view with column comments") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column comments + sql(""" + |CREATE VIEW v1 ( + | id COMMENT 'the user id', + | name COMMENT 'the user name' + |) AS SELECT * FROM t + |""".stripMargin) + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column comments via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("id")) + assert(descRows.get(0).get(2).equals("the user id")) + assert(descRows.get(1).get(0).equals("name")) + assert(descRows.get(1).get(2).equals("the user name")) + + // Verify column comments via SHOW CREATE TABLE + val showCreateRows = sql("SHOW CREATE TABLE v1").collectAsList() + val showCreateStr = showCreateRows.get(0).get(0).toString + assert(showCreateStr.contains("COMMENT 'the user id'")) + assert(showCreateStr.contains("COMMENT 'the user name'")) + } + } + } + } + } + + test("Paimon View: create view with column aliases") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column aliases (without comments) + sql("CREATE VIEW v1 (user_id, user_name) AS SELECT * FROM t") + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column names via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("user_id")) + assert(descRows.get(1).get(0).equals("user_name")) + } + } + } + } + } + + test("Paimon View: create view with column aliases and comments") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + sql(s"USE $catalogName") + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("t") { + withView("v1") { + sql("CREATE TABLE t (id INT, name STRING) USING paimon") + sql("INSERT INTO t VALUES (1, 'alice'), (2, 'bob')") + + // Create view with column aliases and comments + sql(""" + |CREATE VIEW v1 ( + | user_id COMMENT 'the user id', + | user_name COMMENT 'the user name' + |) AS SELECT * FROM t + |""".stripMargin) + + // Verify view works + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1, "alice"), Row(2, "bob"))) + + // Verify column names and comments via DESCRIBE + val descRows = sql("DESC TABLE v1").collectAsList() + assert(descRows.get(0).get(0).equals("user_id")) + assert(descRows.get(0).get(2).equals("the user id")) + assert(descRows.get(1).get(0).equals("user_name")) + assert(descRows.get(1).get(2).equals("the user name")) + } + } + } + } + } }