From efd534eee2d39241ce7769c0200224bc387eb25d Mon Sep 17 00:00:00 2001 From: duanyyyyyy Date: Tue, 14 Apr 2026 13:33:05 +0800 Subject: [PATCH] Support paimon Java config aggregation.remove-record-on-delete --- include/paimon/defs.h | 4 + src/paimon/common/defs.cpp | 1 + src/paimon/core/core_options.cpp | 9 ++ src/paimon/core/core_options.h | 1 + src/paimon/core/core_options_test.cpp | 3 + .../aggregate/aggregate_merge_function.cpp | 22 +++- .../aggregate/aggregate_merge_function.h | 7 +- .../aggregate_merge_function_test.cpp | 105 ++++++++++++++++++ .../aggregate/field_aggregator_factory.h | 8 ++ .../field_aggregator_factory_test.cpp | 10 ++ 10 files changed, 166 insertions(+), 4 deletions(-) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 3ea0da43c..81daa27fc 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -341,6 +341,10 @@ struct PAIMON_EXPORT Options { /// "global-index.external-path" - Global index root directory, if not set, the global index /// files will be stored under the index directory. static const char GLOBAL_INDEX_EXTERNAL_PATH[]; + /// "aggregation.remove-record-on-delete" - Whether to remove the whole row in the aggregation + /// engine when delete records are received. Default value is "false". + static const char AGGREGATION_REMOVE_RECORD_ON_DELETE[]; + /// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode. static const char SCAN_TAG_NAME[]; /// "write-only" - If set to "true", compactions and snapshot expiration will be skipped. 
This diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 113d1327b..40241cbe7 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -85,6 +85,7 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled"; const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path"; +const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete"; const char Options::SCAN_TAG_NAME[] = "scan.tag-name"; const char Options::WRITE_ONLY[] = "write-only"; const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num"; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 3e406998a..a6ac2908f 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -399,6 +399,7 @@ struct CoreOptions::Impl { bool force_lookup = false; bool lookup_wait = true; bool partial_update_remove_record_on_delete = false; + bool aggregation_remove_record_on_delete = false; bool file_index_read_enabled = true; bool enable_adaptive_prefetch_strategy = true; bool index_file_in_data_file_dir = false; @@ -555,6 +556,10 @@ Result CoreOptions::FromMap( // Parse partial_update_remove_record_on_delete PAIMON_RETURN_NOT_OK(parser.Parse(Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE, &impl->partial_update_remove_record_on_delete)); + // Parse aggregation_remove_record_on_delete + PAIMON_RETURN_NOT_OK(parser.Parse(Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, + &impl->aggregation_remove_record_on_delete)); + // Parse partial-update.remove-record-on-sequence-group PAIMON_RETURN_NOT_OK(parser.ParseList( Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, Options::FIELDS_SEPARATOR, @@ -1070,6 +1075,10 @@ bool CoreOptions::PartialUpdateRemoveRecordOnDelete() const { return 
impl_->partial_update_remove_record_on_delete; } +bool CoreOptions::AggregationRemoveRecordOnDelete() const { + return impl_->aggregation_remove_record_on_delete; +} + std::vector CoreOptions::GetPartialUpdateRemoveRecordOnSequenceGroup() const { return impl_->remove_record_on_sequence_group; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index d29b7fca0..b672f24c7 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -120,6 +120,7 @@ class PAIMON_EXPORT CoreOptions { std::map GetFieldsSequenceGroups() const; bool PartialUpdateRemoveRecordOnDelete() const; + bool AggregationRemoveRecordOnDelete() const; std::vector GetPartialUpdateRemoveRecordOnSequenceGroup() const; std::optional GetScanFallbackBranch() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 6a8a86855..5124d57ba 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -93,6 +93,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { /*deletion_vector=*/false, /*force_lookup=*/false}; ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy()); ASSERT_TRUE(core_options.GetFieldsSequenceGroups().empty()); + ASSERT_FALSE(core_options.AggregationRemoveRecordOnDelete()); ASSERT_FALSE(core_options.PartialUpdateRemoveRecordOnDelete()); ASSERT_TRUE(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup().empty()); ASSERT_EQ(std::nullopt, core_options.GetScanFallbackBranch()); @@ -173,6 +174,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::CHANGELOG_PRODUCER, "full-compaction"}, {Options::FORCE_LOOKUP, "true"}, {"fields.g_1,g_3.sequence-group", "c,d"}, + {Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}, {Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE, "true"}, {Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, "a,b"}, {Options::SCAN_FALLBACK_BRANCH, "fallback"}, @@ -279,6 +281,7 @@ TEST(CoreOptionsTest, TestFromMap) { std::map seq_grp; 
seq_grp["g_1,g_3"] = "c,d"; ASSERT_EQ(core_options.GetFieldsSequenceGroups(), seq_grp); + ASSERT_TRUE(core_options.AggregationRemoveRecordOnDelete()); ASSERT_TRUE(core_options.PartialUpdateRemoveRecordOnDelete()); ASSERT_EQ(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup(), std::vector({"a", "b"})); diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp index 6d45ee150..7ac8c023b 100644 --- a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp @@ -49,13 +49,29 @@ Result> AggregateMergeFunction::Create( aggregators.push_back(std::move(agg)); } + bool remove_record_on_delete = options.AggregationRemoveRecordOnDelete(); + PAIMON_ASSIGN_OR_RAISE(std::vector getters, InternalRowUtils::CreateFieldGetters(value_schema, /*use_view=*/true)); - return std::unique_ptr( - new AggregateMergeFunction(std::move(getters), std::move(aggregators))); + return std::unique_ptr(new AggregateMergeFunction( + std::move(getters), std::move(aggregators), remove_record_on_delete)); } Status AggregateMergeFunction::Add(KeyValue&& kv) { + // When removeRecordOnDelete is enabled, if we receive a DELETE row, + // mark the current row for deletion and initialize the row with input values. 
+ if (remove_record_on_delete_ && kv.value_kind == RowKind::Delete()) { + current_delete_row_ = true; + row_ = std::make_unique(getters_.size()); + for (size_t i = 0; i < getters_.size(); i++) { + row_->SetField(i, getters_[i](*(kv.value))); + } + row_->AddDataHolder(std::move(kv.value)); + latest_kv_ = std::move(kv); + return Status::OK(); + } + + current_delete_row_ = false; bool is_retract = kv.value_kind->IsRetract(); for (size_t i = 0; i < getters_.size(); i++) { auto accumulator = getters_[i](*row_); @@ -77,7 +93,7 @@ Status AggregateMergeFunction::Add(KeyValue&& kv) { Result> AggregateMergeFunction::GetResult() { assert(latest_kv_); latest_kv_.value().value = std::move(row_); - latest_kv_.value().value_kind = RowKind::Insert(); + latest_kv_.value().value_kind = current_delete_row_ ? RowKind::Delete() : RowKind::Insert(); latest_kv_.value().level = KeyValue::UNKNOWN_LEVEL; return std::move(latest_kv_); } diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h index 5a2cd5182..f54b57e27 100644 --- a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h +++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h @@ -51,6 +51,7 @@ class AggregateMergeFunction : public MergeFunction { void Reset() override { latest_kv_ = std::nullopt; + current_delete_row_ = false; row_ = std::make_unique(getters_.size()); for (const auto& agg : aggregators_) { agg->Reset(); @@ -63,9 +64,11 @@ class AggregateMergeFunction : public MergeFunction { private: AggregateMergeFunction(std::vector&& getters, - std::vector>&& aggregators) + std::vector>&& aggregators, + bool remove_record_on_delete) : getters_(std::move(getters)), aggregators_(std::move(aggregators)), + remove_record_on_delete_(remove_record_on_delete), row_(std::make_unique(getters_.size())) { assert(getters_.size() == aggregators_.size()); } @@ -76,6 +79,8 @@ class AggregateMergeFunction 
: public MergeFunction { private: std::vector getters_; std::vector> aggregators_; + bool remove_record_on_delete_; + bool current_delete_row_ = false; std::optional latest_kv_; std::unique_ptr row_; }; diff --git a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp index 3554f9061..b6c5da675 100644 --- a/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp @@ -191,4 +191,109 @@ TEST(AggregateMergeFunctionTest, TestSequenceFields) { KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/4); } +TEST(AggregateMergeFunctionTest, TestRemoveRecordOnDelete) { + arrow::FieldVector fields = {arrow::field("k0", arrow::int32()), + arrow::field("v0", arrow::int32())}; + auto value_schema = arrow::schema(fields); + ASSERT_OK_AND_ASSIGN( + CoreOptions core_options, + CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}, + {Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr merge_func, + AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options)); + + auto pool = GetDefaultPool(); + + // Case 1: INSERT + INSERT, then DELETE -> result should be RowKind::Delete + { + merge_func->Reset(); + KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get())); + KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get())); + KeyValue kv3(RowKind::Delete(), /*sequence_number=*/2, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get())); + 
ASSERT_OK(merge_func->Add(std::move(kv1))); + ASSERT_OK(merge_func->Add(std::move(kv2))); + ASSERT_OK(merge_func->Add(std::move(kv3))); + auto result_kv = std::move(merge_func->GetResult().value().value()); + // Should return DELETE row kind with the original values from the delete record + KeyValue expected(RowKind::Delete(), /*sequence_number=*/2, + /*level=*/KeyValue::UNKNOWN_LEVEL, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get())); + KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2); + } + + // Case 2: Only INSERT rows, no DELETE -> result should be RowKind::Insert with aggregated + // values + { + merge_func->Reset(); + KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get())); + KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get())); + ASSERT_OK(merge_func->Add(std::move(kv1))); + ASSERT_OK(merge_func->Add(std::move(kv2))); + auto result_kv = std::move(merge_func->GetResult().value().value()); + // Should return INSERT with sum aggregation: 100 + 200 = 300 + KeyValue expected(RowKind::Insert(), /*sequence_number=*/1, + /*level=*/KeyValue::UNKNOWN_LEVEL, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get())); + KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2); + } + + // Case 3: DELETE only -> result should be RowKind::Delete + { + merge_func->Reset(); + KeyValue kv1(RowKind::Delete(), /*sequence_number=*/0, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get())); + ASSERT_OK(merge_func->Add(std::move(kv1))); + auto result_kv = 
std::move(merge_func->GetResult().value().value()); + KeyValue expected(RowKind::Delete(), /*sequence_number=*/0, + /*level=*/KeyValue::UNKNOWN_LEVEL, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get())); + KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2); + } +} + +TEST(AggregateMergeFunctionTest, TestDeleteWithoutRemoveRecordOnDelete) { + // Without removeRecordOnDelete, DELETE row should be treated as retract (subtract) + arrow::FieldVector fields = {arrow::field("k0", arrow::int32()), + arrow::field("v0", arrow::int32())}; + auto value_schema = arrow::schema(fields); + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}})); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr merge_func, + AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options)); + + auto pool = GetDefaultPool(); + merge_func->Reset(); + KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get())); + KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get())); + ASSERT_OK(merge_func->Add(std::move(kv1))); + ASSERT_OK(merge_func->Add(std::move(kv2))); + auto result_kv = std::move(merge_func->GetResult().value().value()); + // Without removeRecordOnDelete, DELETE is retract: 200 - 300 = -100, result is INSERT + KeyValue expected(RowKind::Insert(), /*sequence_number=*/1, + /*level=*/KeyValue::UNKNOWN_LEVEL, + BinaryRowGenerator::GenerateRowPtr({10}, pool.get()), + BinaryRowGenerator::GenerateRowPtr({10, -100}, pool.get())); + KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2); +} + } // namespace paimon::test diff --git 
a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h index 2c0dc8707..40584d17b 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h @@ -76,7 +76,15 @@ class FieldAggregatorFactory { "Use unsupported aggregation {} or spell aggregate function incorrectly!", str_agg)); } + bool remove_record_on_retract = options.AggregationRemoveRecordOnDelete(); PAIMON_ASSIGN_OR_RAISE(bool ignore_retract, options.FieldAggIgnoreRetract(field_name)); + if (remove_record_on_retract && ignore_retract) { + return Status::Invalid(fmt::format( + "{} and {}.{}.{} have conflicting behavior so should not be enabled at the same " + "time.", + Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, Options::FIELDS_PREFIX, field_name, + Options::IGNORE_RETRACT)); + } if (ignore_retract) { field_aggregator = std::make_unique(std::move(field_aggregator)); } diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp index 2349d821d..b62e10924 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory_test.cpp @@ -114,4 +114,14 @@ TEST(FieldAggregatorFactoryTest, TestSimple) { } } +TEST(FieldAggregatorFactoryTest, TestRemoveRecordOnDeleteConflictsWithIgnoreRetract) { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}, + {"fields.f0.ignore-retract", "true"}})); + auto result = FieldAggregatorFactory::CreateFieldAggregator("f0", arrow::int32(), "sum", options); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().message().find("conflicting behavior") != std::string::npos); +} + } // namespace paimon::test