Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/velox_backend_enhanced.yml
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ jobs:
java -version
$MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Piceberg \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
uses: actions/upload-artifact@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ case class IcebergColumnarBatchDataWriter(
}

override def write(batch: ColumnarBatch): Unit = {
  // Hand the original batch straight to the native writer. The native side
  // was initialized with the target write schema, so it decides which columns
  // to persist and effectively drops any metadata columns (e.g.
  // __row_operation, _file, _pos) that Spark 4.0 may attach to the batch.
  jniWrapper.write(
    writer,
    ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,23 @@ import org.apache.spark.sql.types.StructType
import org.apache.iceberg.spark.source.IcebergWriteUtil
import org.apache.iceberg.types.TypeUtil

import scala.collection.JavaConverters._

abstract class AbstractIcebergWriteExec extends IcebergWriteExec {

// the writer factory works for both batch and streaming
private def createIcebergDataWriteFactory(schema: StructType): IcebergDataWriteFactory = {
val writeSchema = IcebergWriteUtil.getWriteSchema(write)
val nestedField = TypeUtil.visit(writeSchema, new IcebergNestedFieldVisitor)
// Filter out metadata columns from the Spark output schema.
// Spark 4.0 may include metadata columns in the output schema during UPDATE operations,
// but these should not be written to the Iceberg table. Note: the surviving
// fields keep Spark's order; this does not reorder them to match the Iceberg schema.
val writeFieldNames = writeSchema.columns().asScala.map(_.name()).toSet
val filteredSchema = StructType(
schema.fields.filter(field => writeFieldNames.contains(field.name))
)
IcebergDataWriteFactory(
schema,
filteredSchema,
getFileFormat(IcebergWriteUtil.getFileFormat(write)),
IcebergWriteUtil.getDirectory(write),
getCodec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,59 @@ class VeloxIcebergSuite extends IcebergSuite {
_.values.exists(_.contains("Not support write table with sort order"))))
}
}

test("iceberg read cow table - update after schema evolution") {
  // Regression test: a copy-on-write UPDATE on a table whose schema was
  // evolved (a column added after creation) must plan through the Velox
  // native replace-data exec and must not corrupt or drop data columns.
  withTable("iceberg_cow_update_evolved_tb") {
    spark.sql("""
                |create table iceberg_cow_update_evolved_tb (
                |  id int,
                |  name string,
                |  age int
                |) using iceberg
                |tblproperties (
                |  'format-version' = '2',
                |  'write.delete.mode' = 'copy-on-write',
                |  'write.update.mode' = 'copy-on-write',
                |  'write.merge.mode' = 'copy-on-write'
                |)
                |""".stripMargin)

    // Evolve the schema before inserting so the data files carry the
    // post-evolution column set.
    spark.sql("""
                |alter table iceberg_cow_update_evolved_tb
                |add columns (salary decimal(10, 2))
                |""".stripMargin)

    spark.sql("""
                |insert into table iceberg_cow_update_evolved_tb values
                | (1, 'Name1', 23, 3400.00),
                | (2, 'Name2', 30, 5500.00),
                | (3, 'Name3', 35, 6500.00)
                |""".stripMargin)

    val df = spark.sql("""
                |update iceberg_cow_update_evolved_tb
                |set name = 'Name4'
                |where id = 1
                |""".stripMargin)

    // The copy-on-write update must be executed by the Velox-native plan.
    assert(
      df.queryExecution.executedPlan
        .asInstanceOf[CommandResultExec]
        .commandPhysicalPlan
        .isInstanceOf[VeloxIcebergReplaceDataExec])

    // Only the row with id = 1 is updated; rows 2 and 3 keep their inserted
    // values. (Fixed: the expectation for row 3 previously asserted "Name1",
    // but that row was inserted as 'Name3' and is untouched by the update.)
    checkAnswer(
      spark.sql("""
                |select id, name, age, salary
                |from iceberg_cow_update_evolved_tb
                |order by id
                |""".stripMargin),
      Seq(
        Row(1, "Name4", 23, new java.math.BigDecimal("3400.00")),
        Row(2, "Name2", 30, new java.math.BigDecimal("5500.00")),
        Row(3, "Name3", 35, new java.math.BigDecimal("6500.00"))
      )
    )
  }
}
}
27 changes: 24 additions & 3 deletions cpp/velox/compute/iceberg/IcebergWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
nestedField.children[i]));
}
}

auto fileNameGenerator = std::make_shared<const GlutenIcebergFileNameGenerator>(
partitionId, taskId, operationId, fileFormat);

std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
Expand Down Expand Up @@ -212,7 +212,28 @@ IcebergWriter::IcebergWriter(
}

void IcebergWriter::write(const VeloxColumnarBatch& batch) {
dataSink_->appendData(batch.getRowVector());
auto inputRowVector = batch.getRowVector();
auto inputRowType = asRowType(inputRowVector->type());

if (inputRowType->size() != rowType_->size()) {
std::vector<VectorPtr> dataColumns;
dataColumns.reserve(rowType_->size());

for (size_t i = 0; i < rowType_->size(); ++i) {
dataColumns.push_back(inputRowVector->childAt(i + 1));
}

auto filteredRowVector = std::make_shared<RowVector>(
pool_.get(),
rowType_,
inputRowVector->nulls(),
inputRowVector->size(),
std::move(dataColumns));

dataSink_->appendData(filteredRowVector);
} else {
dataSink_->appendData(inputRowVector);
}
}

std::vector<std::string> IcebergWriter::commit() {
Expand Down
Loading