diff --git a/include/paimon/defs.h b/include/paimon/defs.h index d3602cf34..8b77c0e4a 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -79,6 +79,12 @@ struct PAIMON_EXPORT Options { static const char DEFAULT_AGG_FUNCTION[]; /// IGNORE_RETRACT is "ignore-retract" static const char IGNORE_RETRACT[]; + /// "distinct" - Distinct option for aggregate functions like listagg. Default value is false. + /// Example: fields.f.distinct=true to deduplicate values during aggregation. + static const char DISTINCT[]; + /// "list-agg-delimiter" - Delimiter for listagg aggregate function. Default value is ",". + /// Example: fields.f.list-agg-delimiter="-" to concatenate values with "-". + static const char LIST_AGG_DELIMITER[]; /// SEQUENCE_GROUP is "sequence-group" static const char SEQUENCE_GROUP[]; /// @} diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index ecda6c051..5381f2afe 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -582,6 +582,7 @@ if(PAIMON_BUILD_TESTS) core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp core/mergetree/compact/aggregate/field_last_value_agg_test.cpp + core/mergetree/compact/aggregate/field_listagg_agg_test.cpp core/mergetree/compact/aggregate/field_min_max_agg_test.cpp core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp core/mergetree/compact/aggregate/field_sum_agg_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 176cdd7ae..3d1865ffe 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -23,6 +23,8 @@ const char Options::FIELDS_PREFIX[] = "fields"; const char Options::AGG_FUNCTION[] = "aggregate-function"; const char Options::DEFAULT_AGG_FUNCTION[] = "default-aggregate-function"; const char Options::IGNORE_RETRACT[] = "ignore-retract"; +const char Options::DISTINCT[] = "distinct"; +const char Options::LIST_AGG_DELIMITER[] = "list-agg-delimiter"; const char Options::SEQUENCE_GROUP[] = "sequence-group"; const char Options::BUCKET[] = "bucket"; diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index ca29cd855..6081c8b41 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -1114,6 +1114,24 @@ Result CoreOptions::FieldAggIgnoreRetract(const std::string& field_name) c return field_agg_ignore_retract; } +Result CoreOptions::FieldListAggDelimiter(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + std::string delimiter = ","; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::LIST_AGG_DELIMITER); + PAIMON_RETURN_NOT_OK(parser.ParseString(key, &delimiter)); + return delimiter; +} + +Result CoreOptions::FieldCollectAggDistinct(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + bool distinct = false; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::DISTINCT); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &distinct)); + return distinct; +} + bool CoreOptions::DeletionVectorsEnabled() const { return impl_->deletion_vectors_enabled; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 3f18fb3eb..9fb46ad4d 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -109,6 +109,8 @@ class PAIMON_EXPORT CoreOptions { std::optional GetFieldsDefaultFunc() const; Result> GetFieldAggFunc(const std::string& field_name) const; Result FieldAggIgnoreRetract(const std::string& field_name) const; + Result FieldListAggDelimiter(const std::string& field_name) const; + Result FieldCollectAggDistinct(const std::string& field_name) const; bool DeletionVectorsEnabled() const; bool DeletionVectorsBitmap64() const; int64_t DeletionVectorTargetFileSize() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 5aaa2c2f1..6ff1e530e 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -83,6 +83,8 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(std::nullopt, core_options.GetFieldsDefaultFunc()); ASSERT_EQ(std::nullopt, core_options.GetFieldAggFunc("f0").value()); ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(",", core_options.FieldListAggDelimiter("f1").value()); + ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value()); ASSERT_FALSE(core_options.DeletionVectorsEnabled()); ASSERT_FALSE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); @@ -171,6 +173,8 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}, {"fields.f0.aggregate-function", "min"}, {"fields.f1.ignore-retract", "true"}, + {"fields.f2.list-agg-delimiter", " | "}, + {"fields.f2.distinct", "true"}, {Options::DELETION_VECTORS_ENABLED, "true"}, {Options::DELETION_VECTOR_BITMAP64, "true"}, {Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE, "4MB"}, @@ -272,6 +276,8 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ("min", core_options.GetFieldAggFunc("f0").value().value()); ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(" | ", core_options.FieldListAggDelimiter("f2").value()); + ASSERT_TRUE(core_options.FieldCollectAggDistinct("f2").value()); ASSERT_TRUE(core_options.DeletionVectorsEnabled()); ASSERT_TRUE(core_options.DeletionVectorsBitmap64()); ASSERT_EQ(4 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); diff --git a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h index 2c0dc8707..e6462e28c 100644 --- a/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h +++ b/src/paimon/core/mergetree/compact/aggregate/field_aggregator_factory.h @@ -29,6 +29,7 @@ #include "paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h" +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_max_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_min_agg.h" #include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h" @@ -71,6 +72,9 @@ class FieldAggregatorFactory { PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolOrAgg::Create(field_type)); } else if (str_agg == FieldBoolAndAgg::NAME) { PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolAndAgg::Create(field_type)); + } else if (str_agg == FieldListaggAgg::NAME) { + PAIMON_ASSIGN_OR_RAISE(field_aggregator, + FieldListaggAgg::Create(field_type, options, field_name)); } else { return Status::Invalid(fmt::format( "Use unsupported aggregation {} or spell aggregate function incorrectly!", diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h new file mode 100644 index 000000000..d693c7334 --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h @@ -0,0 +1,134 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/data_define.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +/// listagg aggregate a field of a row. +/// Concatenates string values with a delimiter. +class FieldListaggAgg : public FieldAggregator { + public: + static constexpr char NAME[] = "listagg"; + + static Result> Create( + const std::shared_ptr& field_type, const CoreOptions& options, + const std::string& field_name) { + if (field_type->id() != arrow::Type::type::STRING) { + return Status::Invalid( + fmt::format("invalid field type {} for field '{}' of {}, supposed to be string", + field_type->ToString(), field_name, NAME)); + } + PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name)); + PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name)); + // When delimiter is empty and distinct is true, fall back to whitespace split. + if (distinct && delimiter.empty()) { + delimiter = " "; + } + return std::unique_ptr( + new FieldListaggAgg(field_type, std::move(delimiter), distinct)); + } + + VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override { + bool accumulator_null = DataDefine::IsVariantNull(accumulator); + bool input_null = DataDefine::IsVariantNull(input_field); + if (accumulator_null || input_null) { + return accumulator_null ? input_field : accumulator; + } + std::string_view acc_str = DataDefine::GetStringView(accumulator); + std::string_view in_str = DataDefine::GetStringView(input_field); + if (in_str.empty()) { + return accumulator; + } + if (acc_str.empty()) { + return input_field; + } + + if (distinct_) { + result_ = AggDistinctImpl(acc_str, in_str); + } else { + // Build into a local string to avoid aliasing when acc_str points into result_ + std::string new_result; + new_result.reserve(acc_str.size() + delimiter_.size() + in_str.size()); + new_result.append(acc_str); + new_result.append(delimiter_); + new_result.append(in_str); + result_ = std::move(new_result); + } + return std::string_view(result_); + } + + private: + std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) const { + // Split accumulator tokens into a set for dedup + std::unordered_set seen; + std::string_view remaining = acc_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); + if (!token.empty()) { + seen.insert(token); + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + + // Start with the full accumulator, then append delimiter + new distinct tokens from input + std::string result; + result.reserve(acc_str.size() + in_str.size()); + result.append(acc_str); + remaining = in_str; + while (true) { + size_t pos = remaining.find(delimiter_); + std::string_view token = + (pos == std::string_view::npos) ? remaining : remaining.substr(0, pos); + if (!token.empty() && seen.insert(token).second) { + result.append(delimiter_); + result.append(token); + } + if (pos == std::string_view::npos) { + break; + } + remaining = remaining.substr(pos + delimiter_.size()); + } + return result; + } + + explicit FieldListaggAgg(const std::shared_ptr& field_type, + std::string delimiter, bool distinct) + : FieldAggregator(std::string(NAME), field_type), + delimiter_(std::move(delimiter)), + distinct_(distinct) {} + + std::string delimiter_; + bool distinct_; + std::string result_; +}; +} // namespace paimon diff --git a/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp new file mode 100644 index 000000000..f8d02022f --- /dev/null +++ b/src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp @@ -0,0 +1,145 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h" + +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "gtest/gtest.h" +#include "paimon/core/core_options.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class FieldListaggAggTest : public testing::Test { + protected: + static Result> MakeAgg(const std::string& delimiter = ",", + bool distinct = false) { + std::map opts; + opts["fields.f.list-agg-delimiter"] = delimiter; + opts["fields.f.distinct"] = distinct ? "true" : "false"; + PAIMON_ASSIGN_OR_RAISE(auto options, CoreOptions::FromMap(opts)); + return FieldListaggAgg::Create(arrow::utf8(), std::move(options), "f"); + } +}; + +TEST_F(FieldListaggAggTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + auto ret = agg->Agg(std::string_view("hello"), std::string_view(" world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello, world"); +} + +TEST_F(FieldListaggAggTest, TestDelimiter) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg("-")); + auto ret = agg->Agg(std::string_view("user1"), std::string_view("user2")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "user1-user2"); +} + +TEST_F(FieldListaggAggTest, TestNull) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // input null -> return accumulator + { + auto ret = agg->Agg(std::string_view("hello"), NullType()); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); + } + // accumulator null -> return input + { + auto ret = agg->Agg(NullType(), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); + } + // both null -> return null + { + auto ret = agg->Agg(NullType(), NullType()); + ASSERT_TRUE(DataDefine::IsVariantNull(ret)); + } +} + +TEST_F(FieldListaggAggTest, TestEmptyString) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // empty input -> return accumulator + { + auto ret = agg->Agg(std::string_view("hello"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "hello"); + } + // empty accumulator -> return input + { + auto ret = agg->Agg(std::string_view(""), std::string_view("world")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "world"); + } + // both empty -> return input (which is empty) + { + auto ret = agg->Agg(std::string_view(""), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), ""); + } +} + +TEST_F(FieldListaggAggTest, TestMultipleAccumulation) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg()); + + // "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c" + auto ret = agg->Agg(std::string_view("a"), std::string_view("b")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b"); + ret = agg->Agg(std::move(ret), std::string_view("c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a,b,c"); +} + +TEST_F(FieldListaggAggTest, TestDistinct) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); + + // "a;b" + "b;c" -> "a;b;c" (deduplicate "b") + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;c"); +} + +TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(" ", true)); + + // "a b" + "c d" -> "a b c d" (no dups to remove) + auto ret = agg->Agg(std::string_view("a b"), std::string_view("c d")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a b c d"); +} + +TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true)); + + // empty input -> return accumulator + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b"); +} + +TEST_F(FieldListaggAggTest, TestDistinctFalse) { + ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", false)); + + // "a;b" + "b;c" -> "a;b;b;c" (no dedup) + auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c")); + ASSERT_EQ(DataDefine::GetVariantValue(ret), "a;b;b;c"); +} + +TEST_F(FieldListaggAggTest, TestInvalidType) { + EXPECT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap({})); + auto result = FieldListaggAgg::Create(arrow::int32(), options, "f"); + ASSERT_FALSE(result.ok()); + ASSERT_TRUE(result.status().ToString().find("supposed to be string") != std::string::npos) + << result.status().ToString(); +} + +} // namespace paimon::test