From 3fd43ac31fd7036726eef27df8a3271d0d042ec8 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Sat, 11 Apr 2026 17:05:09 +0800 Subject: [PATCH 1/6] feat(compact): add FieldListaggAgg aggregate functions --- include/paimon/defs.h | 4 + src/paimon/CMakeLists.txt | 1 + src/paimon/common/defs.cpp | 2 + src/paimon/core/core_options.cpp | 18 +++ src/paimon/core/core_options.h | 2 + .../aggregate/field_aggregator_factory.h | 4 + .../compact/aggregate/field_listagg_agg.h | 128 +++++++++++++++ .../aggregate/field_listagg_agg_test.cpp | 153 ++++++++++++++++++ 8 files changed, 312 insertions(+) create mode 100644 src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h create mode 100644 src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 3ea0da43c..dbb112143 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -79,6 +79,10 @@ struct PAIMON_EXPORT Options { static const char DEFAULT_AGG_FUNCTION[]; /// IGNORE_RETRACT is "ignore-retract" static const char IGNORE_RETRACT[]; + /// DISTINCT is "distinct" + static const char DISTINCT[]; + /// LIST_AGG_DELIMITER is "list-agg-delimiter" + static const char LIST_AGG_DELIMITER[]; /// SEQUENCE_GROUP is "sequence-group" static const char SEQUENCE_GROUP[]; /// @} diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 56044ab86..c3b33575c 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -574,6 +574,7 @@ if(PAIMON_BUILD_TESTS) core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp core/mergetree/compact/aggregate/field_last_value_agg_test.cpp + core/mergetree/compact/aggregate/field_listagg_agg_test.cpp core/mergetree/compact/aggregate/field_min_max_agg_test.cpp core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp core/mergetree/compact/aggregate/field_sum_agg_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 113d1327b..e0eaa520f 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -23,6 +23,8 @@ const char Options::FIELDS_PREFIX[] = "fields"; const char Options::AGG_FUNCTION[] = "aggregate-function"; const char Options::DEFAULT_AGG_FUNCTION[] = "default-aggregate-function"; const char Options::IGNORE_RETRACT[] = "ignore-retract"; +const char Options::DISTINCT[] = "distinct"; +const char Options::LIST_AGG_DELIMITER[] = "list-agg-delimiter"; const char Options::SEQUENCE_GROUP[] = "sequence-group"; const char Options::BUCKET[] = "bucket"; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 3e406998a..400cbcac2 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -1004,6 +1004,24 @@ Result CoreOptions::FieldAggIgnoreRetract(const std::string& field_name) c return field_agg_ignore_retract; } +Result CoreOptions::FieldListAggDelimiter(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + std::string delimiter = ","; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::LIST_AGG_DELIMITER); + PAIMON_RETURN_NOT_OK(parser.ParseString(key, &delimiter)); + return delimiter; +} + +Result CoreOptions::FieldCollectAggDistinct(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + bool distinct = false; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::DISTINCT); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &distinct)); + return distinct; +} + bool CoreOptions::DeletionVectorsEnabled() const { return impl_->deletion_vectors_enabled; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index d29b7fca0..2eb3dc691 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -108,6 +108,8 @@ class PAIMON_EXPORT CoreOptions { std::optional GetFieldsDefaultFunc() const; Result> GetFieldAggFunc(const std::string& field_name) const; Result FieldAggIgnoreRetract(const std::string& field_name) const; + Result FieldListAggDelimiter(const std::string& field_name) const; + Result FieldCollectAggDistinct(const std::string& field_name) const; bool DeletionVectorsEnabled() const; bool DeletionVectorsBitmap64() const; int64_t DeletionVectorTargetFileSize() const; diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h index 2c0dc8707..e6462e28c 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h @@ -29,6 +29,7 @@ #include "paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_max_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_min_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h" @@ -71,6 +72,9 @@ class FieldAggregatorFactory { PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolOrAgg::Create(field_type)); } else if (str_agg == FieldBoolAndAgg::NAME) { PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolAndAgg::Create(field_type)); + } else if (str_agg == FieldListaggAgg::NAME) { + PAIMON_ASSIGN_OR_RAISE(field_aggregator, + FieldListaggAgg::Create(field_type, options, field_name)); } else { return Status::Invalid(fmt::format( "Use unsupported aggregation {} or spell aggregate function incorrectly!", diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h new file mode 100644 index 000000000..486b68b59 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -0,0 +1,128 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// listagg aggregate a field of a row. +/// Concatenates string values with a delimiter. +class FieldListaggAgg : public FieldAggregator { + public: + static constexpr char NAME[] = "listagg"; + + static Result> Create( + const std::shared_ptr& field_type, const CoreOptions& options, + const std::string& field_name) { + if (field_type->id() != arrow::Type::type::STRING) { + return Status::Invalid( + fmt::format("invalid field type {} for {}, supposed to be string", + field_type->ToString(), NAME)); + } + PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); + PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); + return std::unique_ptr( + new FieldListaggAgg(field_type, std::move(delimiter), distinct)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + std::string_view acc_str = DataDefine::GetStringView(accumulator); + std::string_view in_str = DataDefine::GetStringView(input_field); + if (in_str.empty()) { + return accumulator; + } + if (acc_str.empty()) { + return input_field; + } + + if (distinct_) { + result_ = AggDistinctImpl(acc_str, in_str); + } else { + // Build into a local string to avoid aliasing when acc_str points into result_ + std::string new_result; + new_result.reserve(acc_str.size() + delimiter_.size() + in_str.size()); + new_result.append(acc_str); + new_result.append(delimiter_); + new_result.append(in_str); + result_ = std::move(new_result); + } + return std::string_view(result_); + } + + private: + std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) { + // Split accumulator tokens into a set for dedup + std::unordered_set seen; + std::string_view remaining = acc_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); + if (!token.empty()) { + seen.insert(token); + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + + // Start with the full accumulator, then append delimiter + new distinct tokens from input + std::string result(acc_str); + remaining = in_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); + if (!token.empty() && seen.insert(token).second) { + result += delimiter_; + result += token; + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + return result; + } + + explicit FieldListaggAgg(const std::shared_ptr& field_type, + std::string delimiter, bool distinct) + : FieldAggregator(std::string(NAME), field_type), + delimiter_(std::move(delimiter)), + distinct_(distinct) {} + + std::string delimiter_; + bool distinct_; + std::string result_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp new file mode 100644 index 000000000..65d6efdd9 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -0,0 +1,153 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" + +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/core/core_options.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +static CoreOptions CreateOptions(const std::map& opts = {}) { + auto result = CoreOptions::FromMap(opts); + assert(result.ok()); + return std::move(result.value()); +} + +TEST(FieldListaggAggTest, TestSimple) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), std::string_view(" world")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello, world"); +} + +TEST(FieldListaggAggTest, TestDelimiter) { + auto options = CreateOptions( + {{"fields.f.aggregate-function", "listagg"}, {"fields.f.list-agg-delimiter", "-"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + auto agg_ret = field_listagg_agg->Agg(std::string_view("user1"), std::string_view("user2")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "user1-user2"); +} + +TEST(FieldListaggAggTest, TestNull) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + // input null -> return accumulator + { + auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello"); + } + // accumulator null -> return input + { + auto agg_ret = field_listagg_agg->Agg(NullType(), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "world"); + } + // both null -> return null + { + auto agg_ret = field_listagg_agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + } +} + +TEST(FieldListaggAggTest, TestEmptyString) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + // empty input -> return accumulator + { + auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello"); + } + // empty accumulator -> return input + { + auto agg_ret = field_listagg_agg->Agg(std::string_view(""), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "world"); + } + // both empty -> return input (which is empty) + { + auto agg_ret = field_listagg_agg->Agg(std::string_view(""), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), ""); + } +} + +TEST(FieldListaggAggTest, TestMultipleAccumulation) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + // simulate iteratively accumulating with default delimiter ",": + // "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c" + auto ret = field_listagg_agg->Agg(std::string_view("a"), std::string_view("b")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b"); + ret = field_listagg_agg->Agg(std::move(ret), std::string_view("c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b,c"); +} + +TEST(FieldListaggAggTest, TestDistinct) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, + {"fields.f.list-agg-delimiter", ";"}, + {"fields.f.distinct", "true"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + + // accumulator="a;b", input="b;c" -> result="a;b;c" (deduplicate "b") + auto agg_ret = field_listagg_agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a;b;c"); +} + +TEST(FieldListaggAggTest, TestDistinctNoDuplicates) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, + {"fields.f.list-agg-delimiter", " "}, + {"fields.f.distinct", "true"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + + // accumulator="a b", input="c d" -> result="a b c d" (no dups to remove) + auto agg_ret = field_listagg_agg->Agg(std::string_view("a b"), std::string_view("c d")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a b c d"); +} + +TEST(FieldListaggAggTest, TestDistinctEmptyInput) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, + {"fields.f.list-agg-delimiter", ";"}, + {"fields.f.distinct", "true"}}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, + FieldListaggAgg::Create(arrow::utf8(), options, "f")); + + // empty input -> return accumulator + auto agg_ret = field_listagg_agg->Agg(std::string_view("a;b"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a;b"); +} + +TEST(FieldListaggAggTest, TestInvalidType) { + auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); + auto result = FieldListaggAgg::Create(arrow::int32(), options, "f"); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().ToString().find("supposed to be string") != std::string::npos) + << result.status().ToString(); +} + +} // namespace paimon::test From c0b5ed5e737f469f76352b174ac880d268b6b108 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 13 Apr 2026 20:59:12 +0800 Subject: [PATCH 2/6] fix(compact): address code review comments on FieldListaggAgg - Reject empty delimiter when distinct is enabled to prevent infinite loop in AggDistinctImpl (find("") always returns 0) - Pre-reserve string capacity and use append() instead of += in AggDistinctImpl to avoid repeated reallocations - Replace assert() with gtest ADD_FAILURE() in test CreateOptions() to ensure failures are reported properly even under NDEBUG Co-Authored-By: Claude Opus 4.6 --- .../mergetree/compact/aggregate/field_listagg_agg.h | 12 +++++++++--- .../compact/aggregate/field_listagg_agg_test.cpp | 5 ++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h index 486b68b59..bc1e81eab 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -45,6 +45,10 @@ class FieldListaggAgg : public FieldAggregator { } PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); + if (distinct && delimiter.empty()) { + return Status::Invalid( + fmt::format("invalid empty delimiter for {} when distinct is enabled", NAME)); + } return std::unique_ptr( new FieldListaggAgg(field_type, std::move(delimiter), distinct)); } @@ -97,15 +101,17 @@ class FieldListaggAgg : public FieldAggregator { } // Start with the full accumulator, then append delimiter + new distinct tokens from input - std::string result(acc_str); + std::string result; + result.reserve(acc_str.size() + in_str.size()); + result.append(acc_str); remaining = in_str; while (true) { size_t pos = remaining.find(delimiter_); std::string_view token = (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); if (!token.empty() && seen.insert(token).second) { - result += delimiter_; - result += token; + result.append(delimiter_); + result.append(token); } if (pos == std::string_view::npos) { break; diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp index 65d6efdd9..0d70d1dab 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -31,7 +31,10 @@ namespace paimon::test { static CoreOptions CreateOptions(const std::map& opts = {}) { auto result = CoreOptions::FromMap(opts); - assert(result.ok()); + if (!result.ok()) { + ADD_FAILURE() << result.status().ToString(); + return CoreOptions(); + } return std::move(result.value()); } From 06c875a4dc7f34cc35b92885fe5b133028d39bcb Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 14 Apr 2026 21:58:51 +0800 Subject: [PATCH 3/6] fix(compact): address reviewer comments on FieldListaggAgg PR Co-Authored-By: Claude Opus 4.6 --- include/paimon/defs.h | 3 +- src/paimon/core/core_options_test.cpp | 6 + .../compact/aggregate/field_listagg_agg.h | 14 +- .../aggregate/field_listagg_agg_test.cpp | 153 +++++++++--------- 4 files changed, 91 insertions(+), 85 deletions(-) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index dbb112143..596b156fa 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -79,7 +79,8 @@ struct PAIMON_EXPORT Options { static const char DEFAULT_AGG_FUNCTION[]; /// IGNORE_RETRACT is "ignore-retract" static const char IGNORE_RETRACT[]; - /// DISTINCT is "distinct" + /// "distinct" - Distinct option for aggregate functions like listagg. Default value is false. + /// Example: fields.f.distinct=true to deduplicate values during aggregation. static const char DISTINCT[]; /// LIST_AGG_DELIMITER is "list-agg-delimiter" static const char LIST_AGG_DELIMITER[]; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 6a8a86855..1d7b2e25c 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -82,6 +82,8 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(std::nullopt, core_options.GetFieldsDefaultFunc()); ASSERT_EQ(std::nullopt, core_options.GetFieldAggFunc("f0").value()); ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(",", core_options.FieldListAggDelimiter("f1").value()); + ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value()); ASSERT_FALSE(core_options.DeletionVectorsEnabled()); ASSERT_FALSE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); @@ -167,6 +169,8 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}, {"fields.f0.aggregate-function", "min"}, {"fields.f1.ignore-retract", "true"}, + {"fields.f2.list-agg-delimiter", " | "}, + {"fields.f2.distinct", "true"}, {Options::DELETION_VECTORS_ENABLED, "true"}, {Options::DELETION_VECTOR_BITMAP64, "true"}, {Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE, "4MB"}, @@ -265,6 +269,8 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ("min", core_options.GetFieldAggFunc("f0").value().value()); ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(" | ", core_options.FieldListAggDelimiter("f2").value()); + ASSERT_TRUE(core_options.FieldCollectAggDistinct("f2").value()); ASSERT_TRUE(core_options.DeletionVectorsEnabled()); ASSERT_TRUE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(4 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h index bc1e81eab..163ca5b10 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,14 +40,14 @@ class FieldListaggAgg : public FieldAggregator { const std::string& field_name) { if (field_type->id() != arrow::Type::type::STRING) { return Status::Invalid( - fmt::format("invalid field type {} for {}, supposed to be string", - field_type->ToString(), NAME)); + fmt::format("invalid field type {} for field '{}' of {}, supposed to be string", + field_type->ToString(), field_name, NAME)); } PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); - if (distinct && delimiter.empty()) { - return Status::Invalid( - fmt::format("invalid empty delimiter for {} when distinct is enabled", NAME)); + // When delimiter is empty, fall back to whitespace split + if (delimiter.empty()) { + delimiter = " "; } return std::unique_ptr( new FieldListaggAgg(field_type, std::move(delimiter), distinct)); @@ -83,7 +83,7 @@ class FieldListaggAgg : public FieldAggregator { } private: - std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) { + std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) const { // Split accumulator tokens into a set for dedup std::unordered_set seen; std::string_view remaining = acc_str; diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp index 0d70d1dab..d3e5b683a 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ #include #include -#include "arrow/type.h" #include "arrow/type_fwd.h" #include "gtest/gtest.h" #include "paimon/core/core_options.h" @@ -29,124 +28,124 @@ namespace paimon::test { -static CoreOptions CreateOptions(const std::map& opts = {}) { - auto result = CoreOptions::FromMap(opts); - if (!result.ok()) { - ADD_FAILURE() << result.status().ToString(); - return CoreOptions(); +class FieldListaggAggTest : public testing::Test { + protected: + static std::unique_ptr MakeAgg(const std::string& delimiter = ",", + const bool distinct = false) { + std::map opts; + opts["fields.f.list-agg-delimiter"] = delimiter; + opts["fields.f.distinct"] = distinct ? "true" : "false"; + auto options_result = CoreOptions::FromMap(opts); + if (!options_result.ok()) { + ADD_FAILURE() << "Failed to create CoreOptions: " << options_result.status().ToString(); + return nullptr; + } + auto result = + FieldListaggAgg::Create(arrow::utf8(), std::move(options_result).value(), "f"); + if (!result.ok()) { + ADD_FAILURE() << "Failed to create FieldListaggAgg: " << result.status().ToString(); + return nullptr; + } + return std::move(result).value(); } - return std::move(result.value()); -} +}; -TEST(FieldListaggAggTest, TestSimple) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); - auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), std::string_view(" world")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello, world"); +TEST_F(FieldListaggAggTest, TestSimple) { + auto agg = MakeAgg(); + auto ret = agg->Agg(std::string_view("hello"), std::string_view(" world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello, world"); } -TEST(FieldListaggAggTest, TestDelimiter) { - auto options = CreateOptions( - {{"fields.f.aggregate-function", "listagg"}, {"fields.f.list-agg-delimiter", "-"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); - auto agg_ret = field_listagg_agg->Agg(std::string_view("user1"), std::string_view("user2")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "user1-user2"); +TEST_F(FieldListaggAggTest, TestDelimiter) { + auto agg = MakeAgg("-"); + auto ret = agg->Agg(std::string_view("user1"), std::string_view("user2")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "user1-user2"); } -TEST(FieldListaggAggTest, TestNull) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); +TEST_F(FieldListaggAggTest, TestNull) { + auto agg = MakeAgg(); + // input null -> return accumulator { - auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), NullType()); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello"); + auto ret = agg->Agg(std::string_view("hello"), NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); } // accumulator null -> return input { - auto agg_ret = field_listagg_agg->Agg(NullType(), std::string_view("world")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "world"); + auto ret = agg->Agg(NullType(), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); } // both null -> return null { - auto agg_ret = field_listagg_agg->Agg(NullType(), NullType()); - ASSERT_TRUE(DataDefine::IsVariantNull(agg_ret)); + auto ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(ret)); } } -TEST(FieldListaggAggTest, TestEmptyString) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); +TEST_F(FieldListaggAggTest, TestEmptyString) { + auto agg = MakeAgg(); + // empty input -> return accumulator { - auto agg_ret = field_listagg_agg->Agg(std::string_view("hello"), std::string_view("")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "hello"); + auto ret = agg->Agg(std::string_view("hello"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); } // empty accumulator -> return input { - auto agg_ret = field_listagg_agg->Agg(std::string_view(""), std::string_view("world")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "world"); + auto ret = agg->Agg(std::string_view(""), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); } // both empty -> return input (which is empty) { - auto agg_ret = field_listagg_agg->Agg(std::string_view(""), std::string_view("")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), ""); + auto ret = agg->Agg(std::string_view(""), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), ""); } } -TEST(FieldListaggAggTest, TestMultipleAccumulation) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); - // simulate iteratively accumulating with default delimiter ",": +TEST_F(FieldListaggAggTest, TestMultipleAccumulation) { + auto agg = MakeAgg(); + // "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c" - auto ret = field_listagg_agg->Agg(std::string_view("a"), std::string_view("b")); + auto ret = agg->Agg(std::string_view("a"), std::string_view("b")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b"); - ret = field_listagg_agg->Agg(std::move(ret), std::string_view("c")); + ret = agg->Agg(ret, std::string_view("c")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b,c"); } -TEST(FieldListaggAggTest, TestDistinct) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, - {"fields.f.list-agg-delimiter", ";"}, - {"fields.f.distinct", "true"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); +TEST_F(FieldListaggAggTest, TestDistinct) { + auto agg = MakeAgg(";", true); - // accumulator="a;b", input="b;c" -> result="a;b;c" (deduplicate "b") - auto agg_ret = field_listagg_agg->Agg(std::string_view("a;b"), std::string_view("b;c")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a;b;c"); + // "a;b" + "b;c" -> "a;b;c" (deduplicate "b") + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;c"); } -TEST(FieldListaggAggTest, TestDistinctNoDuplicates) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, - {"fields.f.list-agg-delimiter", " "}, - {"fields.f.distinct", "true"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); +TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) { + auto agg = MakeAgg(" ", true); - // accumulator="a b", input="c d" -> result="a b c d" (no dups to remove) - auto agg_ret = field_listagg_agg->Agg(std::string_view("a b"), std::string_view("c d")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a b c d"); + // "a b" + "c d" -> "a b c d" (no dups to remove) + auto ret = agg->Agg(std::string_view("a b"), std::string_view("c d")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a b c d"); } -TEST(FieldListaggAggTest, TestDistinctEmptyInput) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}, - {"fields.f.list-agg-delimiter", ";"}, - {"fields.f.distinct", "true"}}); - ASSERT_OK_AND_ASSIGN(std::unique_ptr field_listagg_agg, - FieldListaggAgg::Create(arrow::utf8(), options, "f")); +TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) { + auto agg = MakeAgg(";", true); // empty input -> return accumulator - auto agg_ret = field_listagg_agg->Agg(std::string_view("a;b"), std::string_view("")); - ASSERT_EQ(DataDefine::GetVariantValue(agg_ret), "a;b"); + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b"); +} + +TEST_F(FieldListaggAggTest, TestDistinctFalse) { + auto agg = MakeAgg(";", false); + + // "a;b" + "b;c" -> "a;b;b;c" (no dedup) + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;b;c"); } -TEST(FieldListaggAggTest, TestInvalidType) { - auto options = CreateOptions({{"fields.f.aggregate-function", "listagg"}}); +TEST_F(FieldListaggAggTest, TestInvalidType) { + EXPECT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap({})); auto result = FieldListaggAgg::Create(arrow::int32(), options, "f"); ASSERT_FALSE(result.ok()); ASSERT_TRUE(result.status().ToString().find("supposed to be string") != std::string::npos) From 358cb3f8852d3aaa96779ca4498d9ee7088c8772 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 14 Apr 2026 23:02:36 +0800 Subject: [PATCH 4/6] 1 --- include/paimon/defs.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 596b156fa..f54e279da 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -82,7 +82,8 @@ struct PAIMON_EXPORT Options { /// "distinct" - Distinct option for aggregate functions like listagg. Default value is false. /// Example: fields.f.distinct=true to deduplicate values during aggregation. static const char DISTINCT[]; - /// LIST_AGG_DELIMITER is "list-agg-delimiter" + /// "list-agg-delimiter" - Delimiter for listagg aggregate function. Default value is ",". + /// Example: fields.f.list-agg-delimiter="-" to concatenate values with "-". static const char LIST_AGG_DELIMITER[]; /// SEQUENCE_GROUP is "sequence-group" static const char SEQUENCE_GROUP[]; From 7ab3d7a88fa14d76e3b24ac94226a0211fc821fb Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 14 Apr 2026 23:08:59 +0800 Subject: [PATCH 5/6] 1 --- .../aggregate/field_listagg_agg_test.cpp | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp index d3e5b683a..f8d02022f 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -30,40 +30,30 @@ namespace paimon::test { class FieldListaggAggTest : public testing::Test { protected: - static std::unique_ptr MakeAgg(const std::string& delimiter = ",", - const bool distinct = false) { + static Result> MakeAgg(const std::string& delimiter = ",", + bool distinct = false) { std::map opts; opts["fields.f.list-agg-delimiter"] = delimiter; opts["fields.f.distinct"] = distinct ? "true" : "false"; - auto options_result = CoreOptions::FromMap(opts); - if (!options_result.ok()) { - ADD_FAILURE() << "Failed to create CoreOptions: " << options_result.status().ToString(); - return nullptr; - } - auto result = - FieldListaggAgg::Create(arrow::utf8(), std::move(options_result).value(), "f"); - if (!result.ok()) { - ADD_FAILURE() << "Failed to create FieldListaggAgg: " << result.status().ToString(); - return nullptr; - } - return std::move(result).value(); + PAIMON_ASSIGN_OR_RAISE(auto options, CoreOptions::FromMap(opts)); + return FieldListaggAgg::Create(arrow::utf8(), std::move(options), "f"); } }; TEST_F(FieldListaggAggTest, TestSimple) { - auto agg = MakeAgg(); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); auto ret = agg->Agg(std::string_view("hello"), std::string_view(" world")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello, world"); } TEST_F(FieldListaggAggTest, TestDelimiter) { - auto agg = MakeAgg("-"); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg("-")); auto ret = agg->Agg(std::string_view("user1"), std::string_view("user2")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "user1-user2"); } TEST_F(FieldListaggAggTest, TestNull) { - auto agg = MakeAgg(); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); // input null -> return accumulator { @@ -83,7 +73,7 @@ TEST_F(FieldListaggAggTest, TestNull) { } TEST_F(FieldListaggAggTest, TestEmptyString) { - auto agg = MakeAgg(); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); // empty input -> return accumulator { @@ -103,17 +93,17 @@ TEST_F(FieldListaggAggTest, TestEmptyString) { } TEST_F(FieldListaggAggTest, TestMultipleAccumulation) { - auto agg = MakeAgg(); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); // "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c" auto ret = agg->Agg(std::string_view("a"), std::string_view("b")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b"); - ret = agg->Agg(ret, std::string_view("c")); + ret = agg->Agg(std::move(ret), std::string_view("c")); ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b,c"); } TEST_F(FieldListaggAggTest, TestDistinct) { - auto agg = MakeAgg(";", true); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); // "a;b" + "b;c" -> "a;b;c" (deduplicate "b") auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); @@ -121,7 +111,7 @@ TEST_F(FieldListaggAggTest, TestDistinct) { } TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) { - auto agg = MakeAgg(" ", true); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(" ", true)); // "a b" + "c d" -> "a b c d" (no dups to remove) auto ret = agg->Agg(std::string_view("a b"), std::string_view("c d")); @@ -129,7 +119,7 @@ TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) { } TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) { - auto agg = MakeAgg(";", true); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); // empty input -> return accumulator auto ret = agg->Agg(std::string_view("a;b"), std::string_view("")); @@ -137,7 +127,7 @@ TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) { } TEST_F(FieldListaggAggTest, TestDistinctFalse) { - auto agg = MakeAgg(";", false); + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", false)); // "a;b" + "b;c" -> "a;b;b;c" (no dedup) auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); From fbf509489d6de7466ae83529ef05ad773d73d1d5 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 15 Apr 2026 10:50:25 +0800 Subject: [PATCH 6/6] update --- .../core/mergetree/compact/aggregate/field_listagg_agg.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h index 163ca5b10..d693c7334 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -45,8 +45,8 @@ class FieldListaggAgg : public FieldAggregator { } PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); - // When delimiter is empty, fall back to whitespace split - if (delimiter.empty()) { + // When delimiter is empty and distinct is true, fall back to whitespace split. + if (distinct && delimiter.empty()) { delimiter = " "; } return std::unique_ptr(