-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[spark] Support column aliases and comments for Paimon views #7625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow | |
| import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} | ||
| import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, StringUtils} | ||
| import org.apache.spark.sql.connector.catalog.Identifier | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.sql.types.{StructField, StructType} | ||
| import org.apache.spark.unsafe.types.UTF8String | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
@@ -49,19 +49,27 @@ case class CreatePaimonViewExec( | |
| override def output: Seq[Attribute] = Nil | ||
|
|
||
| override protected def run(): Seq[InternalRow] = { | ||
| if (columnAliases.nonEmpty || columnComments.nonEmpty || queryColumnNames.nonEmpty) { | ||
| if (queryColumnNames.nonEmpty) { | ||
| throw new UnsupportedOperationException("queryColumnNames is not supported now") | ||
| } | ||
|
|
||
| if (columnAliases.nonEmpty && columnAliases.length != viewSchema.fields.length) { | ||
| throw new UnsupportedOperationException( | ||
| "columnAliases, columnComments and queryColumnNames are not supported now") | ||
| s"The number of column aliases (${columnAliases.length}) " + | ||
| s"must match the number of columns (${viewSchema.fields.length})") | ||
| } | ||
|
|
||
| // Apply column aliases and comments to the view schema | ||
| val finalSchema = applyColumnAliasesAndComments(viewSchema, columnAliases, columnComments) | ||
|
|
||
| // Note: for replace, we just drop then create; this operation is non-atomic. | ||
| if (replace) { | ||
| catalog.dropView(ident, true) | ||
| } | ||
|
|
||
| catalog.createView( | ||
| ident, | ||
| viewSchema, | ||
| finalSchema, | ||
| queryText, | ||
| comment.orNull, | ||
| properties.asJava, | ||
|
|
@@ -70,6 +78,35 @@ case class CreatePaimonViewExec( | |
| Nil | ||
| } | ||
|
|
||
| /** | ||
| * Apply column aliases and comments to the view schema. If columnAliases is empty, the original | ||
| * column names are used. If columnComments is empty or a specific comment is None, no comment is | ||
| * added. The length of aliases (if non-empty) is validated before calling this method. | ||
| */ | ||
| private def applyColumnAliasesAndComments( | ||
| schema: StructType, | ||
| aliases: Seq[String], | ||
| comments: Seq[Option[String]]): StructType = { | ||
| if (aliases.isEmpty && comments.isEmpty) { | ||
| return schema | ||
| } | ||
|
|
||
| val fields = schema.fields.zipWithIndex.map { | ||
| case (field, index) => | ||
| val newName = if (aliases.nonEmpty) aliases(index) else field.name | ||
| val newComment = if (index < comments.length) comments(index) else None | ||
|
|
||
|
Comment on lines
+94
to
+98
|
||
| val newField = StructField(newName, field.dataType, field.nullable, field.metadata) | ||
|
|
||
| newComment match { | ||
| case Some(c) => newField.withComment(c) | ||
| case None => newField | ||
| } | ||
| } | ||
|
|
||
| StructType(fields) | ||
| } | ||
|
|
||
| override def simpleString(maxFields: Int): String = { | ||
| s"CreatePaimonViewExec: $ident" | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`queryColumnNames` is still accepted by `CreatePaimonViewExec` but is now silently ignored (previously it triggered an `UnsupportedOperationException`). Since this rule runs in the parser and bypasses Spark's built-in `CREATE VIEW` validation/semantics, this can lead to incorrect behavior for Spark versions/paths that populate `queryColumnNames` (or future compatibility work). Either implement the intended semantics for `queryColumnNames`, or explicitly reject non-empty values with a clear `AnalysisException`/`UnsupportedOperationException` so we don't create views with partially applied metadata.