feat(agg): support paimon Java config aggregation.remove-record-on-delete #225
duanyyyyyyy wants to merge 1 commit into alibaba:main
7ccaec8 to a0bb9fb
a0bb9fb to 63b738f
Pull request overview
Adds support for the aggregation.remove-record-on-delete table option so the aggregation merge engine can output DELETE records (instead of treating them purely as retractions), aligning behavior with the Java configuration.
Changes:
- Introduce new option constant `aggregation.remove-record-on-delete` and parse/store it in `CoreOptions`.
- Plumb the option into `AggregateMergeFunction` and adjust merge/output logic to return `RowKind::Delete()` when applicable.
- Add unit tests covering delete behavior with and without the new option.
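The behavior summarized in the overview can be illustrated with a minimal, self-contained sketch. `Kind`, `Record`, `Result`, and `MergeSum` are hypothetical stand-ins rather than the paimon-cpp types, and a single sum aggregator is assumed. The sketch only models the case where the DELETE is the final record for a key; how records that follow a DELETE should be handled is deliberately not modeled.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins; not the actual paimon-cpp types.
enum class Kind { Insert, UpdateBefore, UpdateAfter, Delete };

struct Record {
    Kind kind;
    int64_t value;
};

struct Result {
    Kind kind;
    int64_t value;
};

// Merges the records of one key (assumed ordered by sequence number)
// using a sum aggregator.
Result MergeSum(const std::vector<Record>& records, bool remove_record_on_delete) {
    assert(!records.empty());
    int64_t sum = 0;
    bool current_delete_row = false;
    int64_t delete_value = 0;
    for (const Record& r : records) {
        current_delete_row = false;
        if (remove_record_on_delete && r.kind == Kind::Delete) {
            // Option enabled: surface the DELETE itself instead of retracting
            // its value from the sum. Records after such a DELETE are outside
            // the scope of this sketch.
            current_delete_row = true;
            delete_value = r.value;
            continue;
        }
        // Option disabled (or not a DELETE): retract records subtract.
        bool is_retract = (r.kind == Kind::Delete || r.kind == Kind::UpdateBefore);
        sum += is_retract ? -r.value : r.value;
    }
    if (current_delete_row) {
        return {Kind::Delete, delete_value};
    }
    return {Kind::Insert, sum};
}
```

With the flag set, `MergeSum` on an INSERT followed by a DELETE yields a DELETE result carrying the delete record's value; with the flag unset, the same input yields an INSERT whose value is the retracted sum.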
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp` | Implements delete-short-circuit behavior when the new option is enabled and outputs DELETE via `GetResult()`. |
| `src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h` | Adds/initializes new merge-function state (`remove_record_on_delete_`, `current_delete_row_`). |
| `src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp` | Adds tests for delete handling with/without `aggregation.remove-record-on-delete`. |
| `src/paimon/core/core_options.h` | Declares `CoreOptions::AggregationRemoveRecordOnDelete()`. |
| `src/paimon/core/core_options.cpp` | Parses and stores the new option in `CoreOptions`. |
| `include/paimon/defs.h` | Declares the new option key in `Options`. |
| `src/paimon/common/defs.cpp` | Defines the new option key string literal. |
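As a rough illustration of the "declare key, parse, store" steps listed in the table, the sketch below reads the option from a plain string map. Only the key string comes from this PR; `kAggregationRemoveRecordOnDelete` and `ReadRemoveRecordOnDelete` are hypothetical names, and treating a missing key as `false` is an assumption rather than something this PR states.

```cpp
#include <map>
#include <optional>
#include <string>

// The key string is the one named in this PR; the identifiers are hypothetical.
const char kAggregationRemoveRecordOnDelete[] = "aggregation.remove-record-on-delete";

// Reads the option from a string map.
// Assumption: a missing key means "false".
// Returns std::nullopt when the value is neither "true" nor "false".
std::optional<bool> ReadRemoveRecordOnDelete(
    const std::map<std::string, std::string>& options) {
    auto it = options.find(kAggregationRemoveRecordOnDelete);
    if (it == options.end()) {
        return false;
    }
    if (it->second == "true") {
        return true;
    }
    if (it->second == "false") {
        return false;
    }
    return std::nullopt;
}
```

Returning `std::nullopt` for an unparseable value keeps the sketch free of any particular error type; real code would more likely report the failure through the project's own status mechanism.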
    current_delete_row_ = false;
    bool is_retract = kv.value_kind->IsRetract();
When aggregation.remove-record-on-delete is enabled, receiving a DELETE row overwrites row_ with the delete record’s values and returns early. If a later INSERT/UPDATE_AFTER for the same key is processed (e.g., delete followed by re-insert with a higher sequence number), the aggregation will continue using the delete record as the accumulator, producing incorrect results. The merge state should be cleared when transitioning from a remove-record-on-delete DELETE to a subsequent add record (e.g., reset row_/aggregators_ or otherwise start a fresh accumulator) so re-inserts don’t incorporate deleted values.
Suggested change (original lines):

    current_delete_row_ = false;
    bool is_retract = kv.value_kind->IsRetract();

Suggested replacement:

    bool was_delete_row = current_delete_row_;
    current_delete_row_ = false;
    bool is_retract = kv.value_kind->IsRetract();
    // If the previous state was a delete row produced by removeRecordOnDelete,
    // a following add record should start a new accumulator instead of merging
    // with the deleted row's values.
    if (was_delete_row && !is_retract) {
        row_ = std::make_unique<GenericRow>(getters_.size());
        for (size_t i = 0; i < getters_.size(); i++) {
            row_->SetField(i, getters_[i](*(kv.value)));
        }
        row_->AddDataHolder(std::move(kv.value));
        latest_kv_ = std::move(kv);
        return Status::OK();
    }
I do not think this is a good suggestion
A delete followed by a re-insert will not happen.
            BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
            BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
    }
The new remove-record-on-delete tests cover DELETE as the final record, but they don’t cover the reinsert case (DELETE followed by a higher-sequence INSERT/UPDATE_AFTER for the same key). Given the current implementation, this ordering can yield incorrect aggregated values. Add a test case that exercises DELETE -> INSERT when AGGREGATION_REMOVE_RECORD_ON_DELETE is true to lock in the expected semantics (fresh accumulator after delete).
Suggested change (original line):

    }

Suggested replacement:

    }
    // Case 4: DELETE followed by higher-sequence INSERT should start from a fresh
    // accumulator when removeRecordOnDelete is enabled.
    {
        merge_func->Reset();
        KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
                     BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
        KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
                     BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
        KeyValue kv3(RowKind::Insert(), /*sequence_number=*/2, /*level=*/0,
                     BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
                     BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
        ASSERT_OK(merge_func->Add(std::move(kv1)));
        ASSERT_OK(merge_func->Add(std::move(kv2)));
        ASSERT_OK(merge_func->Add(std::move(kv3)));
        auto result_kv = std::move(merge_func->GetResult().value().value());
        KeyValue expected(RowKind::Insert(), /*sequence_number=*/2,
                          /*level=*/KeyValue::UNKNOWN_LEVEL,
                          BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
                          BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
        KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
    }
Also not correct.
Paimon has its own snapshot.
    std::map<std::string, std::string> GetFieldsSequenceGroups() const;
    bool PartialUpdateRemoveRecordOnDelete() const;
    bool AggregationRemoveRecordOnDelete() const;
Please add a case in core_options_test.cpp for the AggregationRemoveRecordOnDelete() API.
CoreOptionsTest, TestDefaultValue and CoreOptionsTest, TestFromMap
        aggregators.push_back(std::move(agg));
    }

    bool remove_record_on_delete = options.AggregationRemoveRecordOnDelete();
Please add a check in class FieldAggregatorFactory, as in Java Paimon:
removeRecordOnRetract and fieldIgnoreRetract have conflicting behavior, so they should not be enabled at the same time.
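The requested check could look roughly like the sketch below. It is an assumption about shape only: `CheckRetractOptions` is a hypothetical free function, not an existing member of `FieldAggregatorFactory`, and the per-field key spelling `fields.<name>.ignore-retract` used in the message is assumed rather than taken from this PR.

```cpp
#include <optional>
#include <string>

// Hypothetical helper: returns an error message when both settings are
// enabled for a field, and std::nullopt when the combination is acceptable.
std::optional<std::string> CheckRetractOptions(bool remove_record_on_retract,
                                               bool field_ignore_retract,
                                               const std::string& field_name) {
    if (remove_record_on_retract && field_ignore_retract) {
        // Assumed key spelling for the per-field option.
        return "aggregation.remove-record-on-delete and fields." + field_name +
               ".ignore-retract have conflicting behavior so should not be "
               "enabled at the same time.";
    }
    return std::nullopt;
}
```

A caller would run this once per field while building the aggregators and turn a returned message into whatever error type the factory already uses.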
1cb1e37 to efd534e
Purpose
Support the Java version's aggregation.remove-record-on-delete config.
Linked issue: close #xxx
Tests
API and Format
Support the aggregation.remove-record-on-delete option.
Documentation
In C++ version
for Java
Generative AI tooling