Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ struct PAIMON_EXPORT Options {
/// "global-index.external-path" - Global index root directory, if not set, the global index
/// files will be stored under the index directory.
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
/// "aggregation.remove-record-on-delete" - Whether to remove the whole row in aggregation
/// engine when delete records are received. Default value is "false".
static const char AGGREGATION_REMOVE_RECORD_ON_DELETE[];

/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
static const char SCAN_TAG_NAME[];
/// "write-only" - If set to "true", compactions and snapshot expiration will be skipped. This
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete";
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
const char Options::WRITE_ONLY[] = "write-only";
const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num";
Expand Down
9 changes: 9 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ struct CoreOptions::Impl {
bool force_lookup = false;
bool lookup_wait = true;
bool partial_update_remove_record_on_delete = false;
bool aggregation_remove_record_on_delete = false;
bool file_index_read_enabled = true;
bool enable_adaptive_prefetch_strategy = true;
bool index_file_in_data_file_dir = false;
Expand Down Expand Up @@ -555,6 +556,10 @@ Result<CoreOptions> CoreOptions::FromMap(
// Parse partial_update_remove_record_on_delete
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE,
&impl->partial_update_remove_record_on_delete));
// Parse aggregation_remove_record_on_delete
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::AGGREGATION_REMOVE_RECORD_ON_DELETE,
&impl->aggregation_remove_record_on_delete));

// Parse partial-update.remove-record-on-sequence-group
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, Options::FIELDS_SEPARATOR,
Expand Down Expand Up @@ -1070,6 +1075,10 @@ bool CoreOptions::PartialUpdateRemoveRecordOnDelete() const {
return impl_->partial_update_remove_record_on_delete;
}

bool CoreOptions::AggregationRemoveRecordOnDelete() const {
return impl_->aggregation_remove_record_on_delete;
}

std::vector<std::string> CoreOptions::GetPartialUpdateRemoveRecordOnSequenceGroup() const {
return impl_->remove_record_on_sequence_group;
}
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class PAIMON_EXPORT CoreOptions {

std::map<std::string, std::string> GetFieldsSequenceGroups() const;
bool PartialUpdateRemoveRecordOnDelete() const;
bool AggregationRemoveRecordOnDelete() const;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add case in core_options_test.cpp for AggregationRemoveRecordOnDelete() api

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CoreOptionsTest, TestDefaultValue and CoreOptionsTest, TestFromMap

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

std::vector<std::string> GetPartialUpdateRemoveRecordOnSequenceGroup() const;

std::optional<std::string> GetScanFallbackBranch() const;
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
/*deletion_vector=*/false, /*force_lookup=*/false};
ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy());
ASSERT_TRUE(core_options.GetFieldsSequenceGroups().empty());
ASSERT_FALSE(core_options.AggregationRemoveRecordOnDelete());
ASSERT_FALSE(core_options.PartialUpdateRemoveRecordOnDelete());
ASSERT_TRUE(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup().empty());
ASSERT_EQ(std::nullopt, core_options.GetScanFallbackBranch());
Expand Down Expand Up @@ -173,6 +174,7 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::CHANGELOG_PRODUCER, "full-compaction"},
{Options::FORCE_LOOKUP, "true"},
{"fields.g_1,g_3.sequence-group", "c,d"},
{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"},
{Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE, "true"},
{Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, "a,b"},
{Options::SCAN_FALLBACK_BRANCH, "fallback"},
Expand Down Expand Up @@ -279,6 +281,7 @@ TEST(CoreOptionsTest, TestFromMap) {
std::map<std::string, std::string> seq_grp;
seq_grp["g_1,g_3"] = "c,d";
ASSERT_EQ(core_options.GetFieldsSequenceGroups(), seq_grp);
ASSERT_TRUE(core_options.AggregationRemoveRecordOnDelete());
ASSERT_TRUE(core_options.PartialUpdateRemoveRecordOnDelete());
ASSERT_EQ(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup(),
std::vector<std::string>({"a", "b"}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,29 @@ Result<std::unique_ptr<AggregateMergeFunction>> AggregateMergeFunction::Create(
aggregators.push_back(std::move(agg));
}

bool remove_record_on_delete = options.AggregationRemoveRecordOnDelete();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add check in class FieldAggregatorFactory like Java Paimon,
removeRecordOnRetract and fieldIgnoreRetract have conflicting behavior so should not be enabled at the same time.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated


PAIMON_ASSIGN_OR_RAISE(std::vector<InternalRow::FieldGetterFunc> getters,
InternalRowUtils::CreateFieldGetters(value_schema, /*use_view=*/true));
return std::unique_ptr<AggregateMergeFunction>(
new AggregateMergeFunction(std::move(getters), std::move(aggregators)));
return std::unique_ptr<AggregateMergeFunction>(new AggregateMergeFunction(
std::move(getters), std::move(aggregators), remove_record_on_delete));
}

Status AggregateMergeFunction::Add(KeyValue&& kv) {
// When removeRecordOnDelete is enabled, if we receive a DELETE row,
// mark the current row for deletion and initialize the row with input values.
if (remove_record_on_delete_ && kv.value_kind == RowKind::Delete()) {
current_delete_row_ = true;
row_ = std::make_unique<GenericRow>(getters_.size());
for (size_t i = 0; i < getters_.size(); i++) {
row_->SetField(i, getters_[i](*(kv.value)));
}
row_->AddDataHolder(std::move(kv.value));
latest_kv_ = std::move(kv);
return Status::OK();
}

current_delete_row_ = false;
bool is_retract = kv.value_kind->IsRetract();
Comment on lines +73 to 75
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When aggregation.remove-record-on-delete is enabled, receiving a DELETE row overwrites row_ with the delete record’s values and returns early. If a later INSERT/UPDATE_AFTER for the same key is processed (e.g., delete followed by re-insert with a higher sequence number), the aggregation will continue using the delete record as the accumulator, producing incorrect results. The merge state should be cleared when transitioning from a delete-record-on-delete DELETE to a subsequent add record (e.g., reset row_/aggregators_ or otherwise start a fresh accumulator) so re-inserts don’t incorporate deleted values.

Suggested change
current_delete_row_ = false;
bool is_retract = kv.value_kind->IsRetract();
bool was_delete_row = current_delete_row_;
current_delete_row_ = false;
bool is_retract = kv.value_kind->IsRetract();
// If the previous state was a delete row produced by removeRecordOnDelete,
// a following add record should start a new accumulator instead of merging
// with the deleted row's values.
if (was_delete_row && !is_retract) {
row_ = std::make_unique<GenericRow>(getters_.size());
for (size_t i = 0; i < getters_.size(); i++) {
row_->SetField(i, getters_[i](*(kv.value)));
}
row_->AddDataHolder(std::move(kv.value));
latest_kv_ = std::move(kv);
return Status::OK();
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this is a good suggestion

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not happen delete followed by re-insert

for (size_t i = 0; i < getters_.size(); i++) {
auto accumulator = getters_[i](*row_);
Expand All @@ -77,7 +93,7 @@ Status AggregateMergeFunction::Add(KeyValue&& kv) {
Result<std::optional<KeyValue>> AggregateMergeFunction::GetResult() {
assert(latest_kv_);
latest_kv_.value().value = std::move(row_);
latest_kv_.value().value_kind = RowKind::Insert();
latest_kv_.value().value_kind = current_delete_row_ ? RowKind::Delete() : RowKind::Insert();
latest_kv_.value().level = KeyValue::UNKNOWN_LEVEL;
return std::move(latest_kv_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class AggregateMergeFunction : public MergeFunction {

void Reset() override {
latest_kv_ = std::nullopt;
current_delete_row_ = false;
row_ = std::make_unique<GenericRow>(getters_.size());
for (const auto& agg : aggregators_) {
agg->Reset();
Expand All @@ -63,9 +64,11 @@ class AggregateMergeFunction : public MergeFunction {

private:
AggregateMergeFunction(std::vector<InternalRow::FieldGetterFunc>&& getters,
std::vector<std::unique_ptr<FieldAggregator>>&& aggregators)
std::vector<std::unique_ptr<FieldAggregator>>&& aggregators,
bool remove_record_on_delete)
: getters_(std::move(getters)),
aggregators_(std::move(aggregators)),
remove_record_on_delete_(remove_record_on_delete),
row_(std::make_unique<GenericRow>(getters_.size())) {
assert(getters_.size() == aggregators_.size());
}
Expand All @@ -76,6 +79,8 @@ class AggregateMergeFunction : public MergeFunction {
private:
std::vector<InternalRow::FieldGetterFunc> getters_;
std::vector<std::unique_ptr<FieldAggregator>> aggregators_;
bool remove_record_on_delete_;
bool current_delete_row_ = false;
std::optional<KeyValue> latest_kv_;
std::unique_ptr<GenericRow> row_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,109 @@ TEST(AggregateMergeFunctionTest, TestSequenceFields) {
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/4);
}

TEST(AggregateMergeFunctionTest, TestRemoveRecordOnDelete) {
arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
arrow::field("v0", arrow::int32())};
auto value_schema = arrow::schema(fields);
ASSERT_OK_AND_ASSIGN(
CoreOptions core_options,
CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}}));
ASSERT_OK_AND_ASSIGN(
std::unique_ptr<AggregateMergeFunction> merge_func,
AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));

auto pool = GetDefaultPool();

// Case 1: INSERT + INSERT, then DELETE -> result should be RowKind::Delete
{
merge_func->Reset();
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
KeyValue kv3(RowKind::Delete(), /*sequence_number=*/2, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
ASSERT_OK(merge_func->Add(std::move(kv1)));
ASSERT_OK(merge_func->Add(std::move(kv2)));
ASSERT_OK(merge_func->Add(std::move(kv3)));
auto result_kv = std::move(merge_func->GetResult().value().value());
// Should return DELETE row kind with the original values from the delete record
KeyValue expected(RowKind::Delete(), /*sequence_number=*/2,
/*level=*/KeyValue::UNKNOWN_LEVEL,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
}

// Case 2: Only INSERT rows, no DELETE -> result should be RowKind::Insert with aggregated
// values
{
merge_func->Reset();
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
ASSERT_OK(merge_func->Add(std::move(kv1)));
ASSERT_OK(merge_func->Add(std::move(kv2)));
auto result_kv = std::move(merge_func->GetResult().value().value());
// Should return INSERT with sum aggregation: 100 + 200 = 300
KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
/*level=*/KeyValue::UNKNOWN_LEVEL,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
}

// Case 3: DELETE only -> result should be RowKind::Delete
{
merge_func->Reset();
KeyValue kv1(RowKind::Delete(), /*sequence_number=*/0, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
ASSERT_OK(merge_func->Add(std::move(kv1)));
auto result_kv = std::move(merge_func->GetResult().value().value());
KeyValue expected(RowKind::Delete(), /*sequence_number=*/0,
/*level=*/KeyValue::UNKNOWN_LEVEL,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
}
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new remove-record-on-delete tests cover DELETE as the final record, but they don’t cover the reinsert case (DELETE followed by a higher-sequence INSERT/UPDATE_AFTER for the same key). Given the current implementation, this ordering can yield incorrect aggregated values. Add a test case that exercises DELETE -> INSERT when AGGREGATION_REMOVE_RECORD_ON_DELETE is true to lock in the expected semantics (fresh accumulator after delete).

Suggested change
}
}
// Case 4: DELETE followed by higher-sequence INSERT should start from a fresh
// accumulator when removeRecordOnDelete is enabled.
{
merge_func->Reset();
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
KeyValue kv3(RowKind::Insert(), /*sequence_number=*/2, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
ASSERT_OK(merge_func->Add(std::move(kv1)));
ASSERT_OK(merge_func->Add(std::move(kv2)));
ASSERT_OK(merge_func->Add(std::move(kv3)));
auto result_kv = std::move(merge_func->GetResult().value().value());
KeyValue expected(RowKind::Insert(), /*sequence_number=*/2,
/*level=*/KeyValue::UNKNOWN_LEVEL,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also not correct.
Paimon has its own snapshot

}

TEST(AggregateMergeFunctionTest, TestDeleteWithoutRemoveRecordOnDelete) {
// Without removeRecordOnDelete, DELETE row should be treated as retract (subtract)
arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
arrow::field("v0", arrow::int32())};
auto value_schema = arrow::schema(fields);
ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
ASSERT_OK_AND_ASSIGN(
std::unique_ptr<AggregateMergeFunction> merge_func,
AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));

auto pool = GetDefaultPool();
merge_func->Reset();
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
ASSERT_OK(merge_func->Add(std::move(kv1)));
ASSERT_OK(merge_func->Add(std::move(kv2)));
auto result_kv = std::move(merge_func->GetResult().value().value());
// Without removeRecordOnDelete, DELETE is retract: 200 - 300 = -100, result is INSERT
KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
/*level=*/KeyValue::UNKNOWN_LEVEL,
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
BinaryRowGenerator::GenerateRowPtr({10, -100}, pool.get()));
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
}

} // namespace paimon::test
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ class FieldAggregatorFactory {
"Use unsupported aggregation {} or spell aggregate function incorrectly!",
str_agg));
}
bool remove_record_on_retract = options.AggregationRemoveRecordOnDelete();
PAIMON_ASSIGN_OR_RAISE(bool ignore_retract, options.FieldAggIgnoreRetract(field_name));
if (remove_record_on_retract && ignore_retract) {
return Status::Invalid(fmt::format(
"{} and {}.{}.{} have conflicting behavior so should not be enabled at the same "
"time.",
Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, Options::FIELDS_PREFIX, field_name,
Options::IGNORE_RETRACT));
}
if (ignore_retract) {
field_aggregator = std::make_unique<FieldIgnoreRetractAgg>(std::move(field_aggregator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,14 @@ TEST(FieldAggregatorFactoryTest, TestSimple) {
}
}

TEST(FieldAggregatorFactoryTest, TestRemoveRecordOnDeleteConflictsWithIgnoreRetract) {
ASSERT_OK_AND_ASSIGN(
CoreOptions options,
CoreOptions::FromMap({{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"},
{"fields.f0.ignore-retract", "true"}}));
auto result = FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "sum", options);
ASSERT_FALSE(result.ok());
ASSERT_TRUE(result.status().message().find("conflicting behavior") != std::string::npos);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider use ASSERT_NOK_WITH_MSG.


} // namespace paimon::test
Loading