Skip to content

Commit 7ccaec8

Browse files
author
duanyan.duan
committed
Support paimon Java config aggregation.remove-record-on-delete
1 parent 2d8da13 commit 7ccaec8

7 files changed

Lines changed: 145 additions & 4 deletions

File tree

include/paimon/defs.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,10 @@ struct PAIMON_EXPORT Options {
341341
/// "global-index.external-path" - Global index root directory, if not set, the global index
342342
/// files will be stored under the index directory.
343343
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
344+
/// "aggregation.remove-record-on-delete" - Whether to remove the whole row in the aggregation
345+
/// engine when a delete record is received. Default value is "false".
346+
static const char AGGREGATION_REMOVE_RECORD_ON_DELETE[];
347+
344348
/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
345349
static const char SCAN_TAG_NAME[];
346350
/// "write-only" - If set to "true", compactions and snapshot expiration will be skipped. This

src/paimon/common/defs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
8585
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
8686
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
8787
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
88+
// Key of the boolean option that makes the aggregation engine remove the whole row when a
// delete record is received; CoreOptions::FromMap parses it with Parse<bool>.
const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] =
89+
"aggregation.remove-record-on-delete";
8890
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
8991
const char Options::WRITE_ONLY[] = "write-only";
9092
const char Options::COMPACTION_MIN_FILE_NUM[] = "compaction.min.file-num";

src/paimon/core/core_options.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ struct CoreOptions::Impl {
399399
bool force_lookup = false;
400400
bool lookup_wait = true;
401401
bool partial_update_remove_record_on_delete = false;
402+
bool aggregation_remove_record_on_delete = false;
402403
bool file_index_read_enabled = true;
403404
bool enable_adaptive_prefetch_strategy = true;
404405
bool index_file_in_data_file_dir = false;
@@ -555,6 +556,10 @@ Result<CoreOptions> CoreOptions::FromMap(
555556
// Parse partial_update_remove_record_on_delete
556557
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE,
557558
&impl->partial_update_remove_record_on_delete));
559+
// Parse aggregation_remove_record_on_delete
560+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::AGGREGATION_REMOVE_RECORD_ON_DELETE,
561+
&impl->aggregation_remove_record_on_delete));
562+
558563
// Parse partial-update.remove-record-on-sequence-group
559564
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
560565
Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, Options::FIELDS_SEPARATOR,
@@ -1070,6 +1075,10 @@ bool CoreOptions::PartialUpdateRemoveRecordOnDelete() const {
10701075
return impl_->partial_update_remove_record_on_delete;
10711076
}
10721077

1078+
// Returns the flag that FromMap parses from Options::AGGREGATION_REMOVE_RECORD_ON_DELETE
// ("aggregation.remove-record-on-delete"). The backing field is initialized to false.
bool CoreOptions::AggregationRemoveRecordOnDelete() const {
1079+
return impl_->aggregation_remove_record_on_delete;
1080+
}
1081+
10731082
// Returns, by value, the remove_record_on_sequence_group list held by the Impl.
// NOTE(review): presumably filled from Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP
// in FromMap; the assignment target is not visible in this hunk -- confirm.
std::vector<std::string> CoreOptions::GetPartialUpdateRemoveRecordOnSequenceGroup() const {
10741083
return impl_->remove_record_on_sequence_group;
10751084
}

src/paimon/core/core_options.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ class PAIMON_EXPORT CoreOptions {
120120

121121
std::map<std::string, std::string> GetFieldsSequenceGroups() const;
122122
bool PartialUpdateRemoveRecordOnDelete() const;
123+
bool AggregationRemoveRecordOnDelete() const;
123124
std::vector<std::string> GetPartialUpdateRemoveRecordOnSequenceGroup() const;
124125

125126
std::optional<std::string> GetScanFallbackBranch() const;

src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,29 @@ Result<std::unique_ptr<AggregateMergeFunction>> AggregateMergeFunction::Create(
4949
aggregators.push_back(std::move(agg));
5050
}
5151

52+
bool remove_record_on_delete = options.AggregationRemoveRecordOnDelete();
53+
5254
PAIMON_ASSIGN_OR_RAISE(std::vector<InternalRow::FieldGetterFunc> getters,
5355
InternalRowUtils::CreateFieldGetters(value_schema, /*use_view=*/true));
54-
return std::unique_ptr<AggregateMergeFunction>(
55-
new AggregateMergeFunction(std::move(getters), std::move(aggregators)));
56+
return std::unique_ptr<AggregateMergeFunction>(new AggregateMergeFunction(
57+
std::move(getters), std::move(aggregators), remove_record_on_delete));
5658
}
5759

5860
Status AggregateMergeFunction::Add(KeyValue&& kv) {
61+
// When removeRecordOnDelete is enabled, if we receive a DELETE row,
62+
// mark the current row for deletion and initialize the row with input values.
63+
if (remove_record_on_delete_ && kv.value_kind == RowKind::Delete()) {
64+
current_delete_row_ = true;
65+
row_ = std::make_unique<GenericRow>(getters_.size());
66+
for (size_t i = 0; i < getters_.size(); i++) {
67+
row_->SetField(i, getters_[i](*(kv.value)));
68+
}
69+
row_->AddDataHolder(std::move(kv.value));
70+
latest_kv_ = std::move(kv);
71+
return Status::OK();
72+
}
73+
74+
current_delete_row_ = false;
5975
bool is_retract = kv.value_kind->IsRetract();
6076
for (size_t i = 0; i < getters_.size(); i++) {
6177
auto accumulator = getters_[i](*row_);
@@ -77,7 +93,7 @@ Status AggregateMergeFunction::Add(KeyValue&& kv) {
7793
// Finalizes the merge and hands back latest_kv_ with its value, kind and level overwritten.
// Precondition (asserted): latest_kv_ holds a value.
// row_ is moved out and is therefore null afterwards; Reset() re-creates it.
Result<std::optional<KeyValue>> AggregateMergeFunction::GetResult() {
7894
assert(latest_kv_);
7995
// The accumulated row becomes the value of the result record.
latest_kv_.value().value = std::move(row_);
80-
// Removed by this commit: the result kind used to be Insert unconditionally.
latest_kv_.value().value_kind = RowKind::Insert();
96+
// Added by this commit: report Delete when the last Add() took the remove-record-on-delete
// branch (current_delete_row_ == true), otherwise Insert as before.
latest_kv_.value().value_kind = current_delete_row_ ? RowKind::Delete() : RowKind::Insert();
8197
latest_kv_.value().level = KeyValue::UNKNOWN_LEVEL;
8298
// latest_kv_ is a member, so the explicit move is what transfers it into the returned Result.
return std::move(latest_kv_);
8399
}

src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class AggregateMergeFunction : public MergeFunction {
5151

5252
void Reset() override {
5353
latest_kv_ = std::nullopt;
54+
current_delete_row_ = false;
5455
row_ = std::make_unique<GenericRow>(getters_.size());
5556
for (const auto& agg : aggregators_) {
5657
agg->Reset();
@@ -63,9 +64,11 @@ class AggregateMergeFunction : public MergeFunction {
6364

6465
private:
6566
// Private constructor; Create() is the caller that builds instances.
// - getters / aggregators: moved into the object; they must have the same size (asserted).
// - remove_record_on_delete (added by this commit): when true, Add() handles a
//   RowKind::Delete record by re-initializing the row from that record instead of retracting.
AggregateMergeFunction(std::vector<InternalRow::FieldGetterFunc>&& getters,
66-
std::vector<std::unique_ptr<FieldAggregator>>&& aggregators)
67+
std::vector<std::unique_ptr<FieldAggregator>>&& aggregators,
68+
bool remove_record_on_delete)
6769
: getters_(std::move(getters)),
6870
aggregators_(std::move(aggregators)),
71+
remove_record_on_delete_(remove_record_on_delete),
6972
// getters_ is declared before row_, so it is already populated when its size is read here.
row_(std::make_unique<GenericRow>(getters_.size())) {
7073
assert(getters_.size() == aggregators_.size());
7174
}
@@ -76,6 +79,8 @@ class AggregateMergeFunction : public MergeFunction {
7679
private:
7780
std::vector<InternalRow::FieldGetterFunc> getters_;
7881
std::vector<std::unique_ptr<FieldAggregator>> aggregators_;
82+
bool remove_record_on_delete_;
83+
bool current_delete_row_ = false;
7984
std::optional<KeyValue> latest_kv_;
8085
std::unique_ptr<GenericRow> row_;
8186
};

src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function_test.cpp

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,108 @@ TEST(AggregateMergeFunctionTest, TestSequenceFields) {
191191
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/4);
192192
}
193193

194+
// Exercises AggregateMergeFunction with "aggregation.remove-record-on-delete" set to "true" and
// "sum" as the default aggregate function. Three scenarios share one merge_func, each starting
// with Reset(): a DELETE after two INSERTs, INSERTs only, and a single DELETE.
TEST(AggregateMergeFunctionTest, TestRemoveRecordOnDelete) {
195+
arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
196+
arrow::field("v0", arrow::int32())};
197+
auto value_schema = arrow::schema(fields);
198+
ASSERT_OK_AND_ASSIGN(
199+
CoreOptions core_options,
200+
CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
201+
{Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}}));
202+
ASSERT_OK_AND_ASSIGN(
203+
std::unique_ptr<AggregateMergeFunction> merge_func,
204+
AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
205+
206+
auto pool = GetDefaultPool();
207+
208+
// Case 1: INSERT + INSERT, then DELETE -> result should be RowKind::Delete
209+
{
210+
merge_func->Reset();
211+
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
212+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
213+
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
214+
KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
215+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
216+
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
217+
KeyValue kv3(RowKind::Delete(), /*sequence_number=*/2, /*level=*/0,
218+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
219+
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
220+
ASSERT_OK(merge_func->Add(std::move(kv1)));
221+
ASSERT_OK(merge_func->Add(std::move(kv2)));
222+
ASSERT_OK(merge_func->Add(std::move(kv3)));
223+
auto result_kv = std::move(merge_func->GetResult().value().value());
224+
// Should return DELETE row kind with the original values from the delete record
225+
KeyValue expected(RowKind::Delete(), /*sequence_number=*/2,
226+
/*level=*/KeyValue::UNKNOWN_LEVEL,
227+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
228+
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
229+
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
230+
}
231+
232+
// Case 2: Only INSERT rows, no DELETE -> result should be RowKind::Insert with aggregated values
233+
{
234+
merge_func->Reset();
235+
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
236+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
237+
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
238+
KeyValue kv2(RowKind::Insert(), /*sequence_number=*/1, /*level=*/0,
239+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
240+
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
241+
ASSERT_OK(merge_func->Add(std::move(kv1)));
242+
ASSERT_OK(merge_func->Add(std::move(kv2)));
243+
auto result_kv = std::move(merge_func->GetResult().value().value());
244+
// Should return INSERT with sum aggregation: 100 + 200 = 300
245+
KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
246+
/*level=*/KeyValue::UNKNOWN_LEVEL,
247+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
248+
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
249+
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
250+
}
251+
252+
// Case 3: DELETE only -> result should be RowKind::Delete
253+
{
254+
merge_func->Reset();
255+
KeyValue kv1(RowKind::Delete(), /*sequence_number=*/0, /*level=*/0,
256+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
257+
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
258+
ASSERT_OK(merge_func->Add(std::move(kv1)));
259+
auto result_kv = std::move(merge_func->GetResult().value().value());
260+
KeyValue expected(RowKind::Delete(), /*sequence_number=*/0,
261+
/*level=*/KeyValue::UNKNOWN_LEVEL,
262+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
263+
BinaryRowGenerator::GenerateRowPtr({10, 100}, pool.get()));
264+
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
265+
}
266+
}
267+
268+
TEST(AggregateMergeFunctionTest, TestDeleteWithoutRemoveRecordOnDelete) {
269+
// Without removeRecordOnDelete, DELETE row should be treated as retract (subtract)
270+
arrow::FieldVector fields = {arrow::field("k0", arrow::int32()),
271+
arrow::field("v0", arrow::int32())};
272+
auto value_schema = arrow::schema(fields);
273+
// Options::AGGREGATION_REMOVE_RECORD_ON_DELETE is deliberately not set here, so the option
// keeps its initial value of false.
ASSERT_OK_AND_ASSIGN(CoreOptions core_options,
274+
CoreOptions::FromMap({{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}}));
275+
ASSERT_OK_AND_ASSIGN(
276+
std::unique_ptr<AggregateMergeFunction> merge_func,
277+
AggregateMergeFunction::Create(value_schema, /*primary_keys=*/{"k0"}, core_options));
278+
279+
auto pool = GetDefaultPool();
280+
merge_func->Reset();
281+
KeyValue kv1(RowKind::Insert(), /*sequence_number=*/0, /*level=*/0,
282+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
283+
BinaryRowGenerator::GenerateRowPtr({10, 200}, pool.get()));
284+
KeyValue kv2(RowKind::Delete(), /*sequence_number=*/1, /*level=*/0,
285+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
286+
BinaryRowGenerator::GenerateRowPtr({10, 300}, pool.get()));
287+
ASSERT_OK(merge_func->Add(std::move(kv1)));
288+
ASSERT_OK(merge_func->Add(std::move(kv2)));
289+
auto result_kv = std::move(merge_func->GetResult().value().value());
290+
// Without removeRecordOnDelete, DELETE is retract: 200 - 300 = -100, result is INSERT
291+
KeyValue expected(RowKind::Insert(), /*sequence_number=*/1,
292+
/*level=*/KeyValue::UNKNOWN_LEVEL,
293+
BinaryRowGenerator::GenerateRowPtr({10}, pool.get()),
294+
BinaryRowGenerator::GenerateRowPtr({10, -100}, pool.get()));
295+
KeyValueChecker::CheckResult(expected, result_kv, /*key_arity=*/1, /*value_arity=*/2);
296+
}
297+
194298
} // namespace paimon::test

0 commit comments

Comments
 (0)