
feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support#234

Merged
JingsongLi merged 4 commits into apache:main from JingsongLi:writer
Apr 12, 2026

Conversation

@JingsongLi
Contributor

Purpose

Subtask of #232

Add TableWrite for writing Arrow RecordBatches to Paimon append-only tables. Each (partition, bucket) pair gets its own DataFileWriter with direct writes (matching delta-rs DeltaWriter pattern). File rolling uses tokio::spawn for background close, and prepare_commit uses try_join_all for parallel finalization across partition writers.

Key components:

  • TableWrite: routes batches by partition/bucket, holds DataFileWriters
  • DataFileWriter: manages parquet file lifecycle with rolling support
  • WriteBuilder: creates TableWrite and TableCommit instances
  • PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
  • FormatFileWriter: extended with flush() and in_progress_size()
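The per-(partition, bucket) routing described above can be sketched in plain Rust. This is an illustrative model only (`Row`, `TableWriteSketch`, and the buffer-per-key layout are hypothetical stand-ins, not the PR's actual `TableWrite`/`DataFileWriter` types):

```rust
use std::collections::HashMap;

// Hypothetical simplified row: a partition value plus a bucket id.
struct Row {
    partition: String,
    bucket: u32,
    payload: i64,
}

// Sketch of the TableWrite routing idea: one buffer (standing in for a
// DataFileWriter) per (partition, bucket) key, with rows appended directly.
struct TableWriteSketch {
    writers: HashMap<(String, u32), Vec<Row>>,
}

impl TableWriteSketch {
    fn new() -> Self {
        Self { writers: HashMap::new() }
    }

    // Route each row to the writer owned by its (partition, bucket) pair,
    // creating the writer lazily on first use.
    fn write(&mut self, rows: Vec<Row>) {
        for row in rows {
            let key = (row.partition.clone(), row.bucket);
            self.writers.entry(key).or_default().push(row);
        }
    }

    // Finalize: report how many rows each writer would flush. In the real
    // pipeline this step runs the writers' close in parallel.
    fn prepare_commit(&self) -> HashMap<(String, u32), usize> {
        self.writers.iter().map(|(k, v)| (k.clone(), v.len())).collect()
    }
}
```

The real implementation additionally rolls files in the background and finalizes writers concurrently; the sketch only shows the routing invariant that each (partition, bucket) pair owns exactly one writer.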

Configurable options via CoreOptions:

  • file.compression (default: zstd)
  • target-file-size (default: 256MB)
  • write.parquet-buffer-size (default: 256MB)

Includes E2E integration tests for unpartitioned, partitioned, fixed-bucket, multi-commit, column projection, and bucket filtering.

Brief change log

Tests

API and Format

Documentation

let row = BinaryRow::from_serialized_bytes(&msg.partition)?;
let mut spec = HashMap::new();
for (i, key) in partition_keys.iter().enumerate() {
    if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
Contributor

@littlecoder04 Apr 11, 2026


This will drop NULL partition keys from the overwrite predicate. I reproduced a case where overwriting the NULL partition also deletes other partitions.

Contributor Author


Good catch.
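One direction for the fix is to record NULL partition values explicitly instead of skipping them, so an overwrite of the NULL partition still yields an "IS NULL" predicate rather than an empty, match-everything one. A minimal stdlib sketch (the `partition_spec` function and its signature are illustrative, not the PR's API):

```rust
use std::collections::HashMap;

// Keep NULL partition values in the overwrite spec as explicit None entries
// instead of dropping them. Every partition key gets exactly one entry.
fn partition_spec(
    partition_keys: &[&str],
    values: &[Option<String>], // one decoded datum per partition key
) -> HashMap<String, Option<String>> {
    let mut spec = HashMap::new();
    for (key, value) in partition_keys.iter().zip(values) {
        // NULL is recorded (as None), not skipped.
        spec.insert((*key).to_string(), value.clone());
    }
    spec
}
```

Downstream predicate construction can then translate `None` into an IS NULL filter instead of omitting the key entirely.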

feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support

Add TableWrite for writing Arrow RecordBatches to Paimon append-only
tables. Each (partition, bucket) pair gets its own DataFileWriter with
direct writes (matching delta-rs DeltaWriter pattern). File rolling
uses tokio::spawn for background close, and prepare_commit uses
try_join_all for parallel finalization across partition writers.

Key components:
- TableWrite: routes batches by partition/bucket, holds DataFileWriters
- DataFileWriter: manages parquet file lifecycle with rolling support
- WriteBuilder: creates TableWrite and TableCommit instances
- PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
- FormatFileWriter: extended with flush() and in_progress_size()

Configurable options via CoreOptions:
- file.compression (default: zstd)
- target-file-size (default: 256MB)
- write.parquet-buffer-size (default: 256MB)

Includes E2E integration tests for unpartitioned, partitioned,
fixed-bucket, multi-commit, column projection, and bucket filtering.
let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?;
if let Some(d) = datum {
    datums.push((d, field.data_type().clone()));
}
Contributor


This will drop NULL bucket-key fields before hashing. Java preserves NULL positions here; see FixedBucketRowKeyExtractorTest.testUnCompactDecimalAndTimestampNullValueBucketNumber.
https://github.com/apache/paimon/blob/master/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java

Contributor Author


Good point! Also fixed NULL bucket-key handling in TableScan.
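The fix direction can be sketched with stdlib hashing: every bucket-key position contributes to the hash, with NULL hashed as a distinct marker rather than dropped, so rows differing only in which field is NULL can land in different buckets. Illustrative only; Paimon's actual bucket hash function differs:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// NULL-preserving bucket hashing sketch: hash Option values directly so
// None contributes its own marker instead of being skipped.
fn bucket(datums: &[Option<i64>], num_buckets: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    for datum in datums {
        // Option<i64> hashes None distinctly from any Some value.
        datum.hash(&mut hasher);
    }
    (hasher.finish() % num_buckets as u64) as u32
}
```

The key property is positional: `[Some(1), None]` and `[None, Some(1)]` feed different byte sequences to the hasher, matching the Java extractor's behavior of preserving NULL positions.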

@littlecoder04
Contributor

+1


@leaves12138 left a comment


Solid write pipeline implementation. The architecture mirrors the paimon-python design well and the delta-rs style direct-write pattern is a good fit.

Highlights:

  1. TableWrite + DataFileWriter: Clean per (partition, bucket) writer model. The divide_by_partition_bucket routing via arrow_select::take is correct for now. Background file close via JoinSet in roll_file() and parallel prepare_commit with try_join_all are well thought out.

  2. PaimonDataSink: Proper DataSink implementation with write_all for INSERT and overwrite support. The dynamic partition predicate extraction from commit messages for OVERWRITE is the right approach.

  3. TableCommit refactoring: Splitting into explicit commit() (APPEND) and overwrite() (dynamic partition overwrite) methods is cleaner than the implicit overwrite_partition constructor arg.

  4. Integration tests: Comprehensive E2E coverage — unpartitioned, partitioned, fixed bucket, multi-commit, column projection, bucket filtering.

  5. CoreOptions additions: file.compression, target-file-size, write.parquet-buffer-size are the right knobs to expose.

Minor notes (non-blocking):

  • divide_by_partition_bucket creates one UInt32Array of indices per row. For large batches this could be optimized with batch-level partition extraction (e.g., sort-by-partition then slice), but the current approach is correct and simple for a first pass.
  • DataFileMeta uses EMPTY_SERIALIZED_ROW for min_key/max_key and zero sequence numbers — this is fine for append-only but worth a TODO note if PK/compaction support is planned later.
  • The NULL datum handling fix (from the review thread) is correct — dropping NULL from bucket key datums and partition predicate specs was a real bug.
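The sort-then-slice optimization mentioned in the first note could look like the following stdlib sketch: sort row indices by partition key and emit one contiguous run per partition, instead of building a take-index per row (function name and shape are illustrative, not the PR's code):

```rust
// Group row indices into per-partition runs: sort indices by key, then
// collect consecutive equal keys into one (key, indices) entry. The
// resulting index runs could feed a single take/slice per partition.
fn partition_runs(keys: &[&str]) -> Vec<(String, Vec<usize>)> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by_key(|&i| keys[i]); // stable sort keeps row order within a key
    let mut runs: Vec<(String, Vec<usize>)> = Vec::new();
    for i in idx {
        match runs.last_mut() {
            Some((k, indices)) if k == keys[i] => indices.push(i),
            _ => runs.push((keys[i].to_string(), vec![i])),
        }
    }
    runs
}
```

This trades one sort per batch for far fewer allocations than a per-row index array, which is the usual win once batches are large.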

+1, good to merge.

@JingsongLi JingsongLi merged commit ecbf458 into apache:main Apr 12, 2026
8 checks passed