Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ struct PAIMON_EXPORT Options {
static const char DEFAULT_AGG_FUNCTION[];
/// IGNORE_RETRACT is "ignore-retract"
static const char IGNORE_RETRACT[];
/// "distinct" - Distinct option for aggregate functions like listagg. Default value is false.
/// Example: fields.f.distinct=true to deduplicate values during aggregation.
static const char DISTINCT[];
/// "list-agg-delimiter" - Delimiter for listagg aggregate function. Default value is ",".
/// Example: fields.f.list-agg-delimiter="-" to concatenate values with "-".
static const char LIST_AGG_DELIMITER[];
/// SEQUENCE_GROUP is "sequence-group"
static const char SEQUENCE_GROUP[];
/// @}
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/aggregate/field_ignore_retract_agg_test.cpp
core/mergetree/compact/aggregate/field_last_non_null_value_agg_test.cpp
core/mergetree/compact/aggregate/field_last_value_agg_test.cpp
core/mergetree/compact/aggregate/field_listagg_agg_test.cpp
core/mergetree/compact/aggregate/field_min_max_agg_test.cpp
core/mergetree/compact/aggregate/field_primary_key_agg_test.cpp
core/mergetree/compact/aggregate/field_sum_agg_test.cpp
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const char Options::FIELDS_PREFIX[] = "fields";
const char Options::AGG_FUNCTION[] = "aggregate-function";
const char Options::DEFAULT_AGG_FUNCTION[] = "default-aggregate-function";
const char Options::IGNORE_RETRACT[] = "ignore-retract";
const char Options::DISTINCT[] = "distinct";
const char Options::LIST_AGG_DELIMITER[] = "list-agg-delimiter";
const char Options::SEQUENCE_GROUP[] = "sequence-group";

const char Options::BUCKET[] = "bucket";
Expand Down
18 changes: 18 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,24 @@ Result<bool> CoreOptions::FieldAggIgnoreRetract(const std::string& field_name) c
return field_agg_ignore_retract;
}

Result<std::string> CoreOptions::FieldListAggDelimiter(const std::string& field_name) const {
ConfigParser parser(impl_->raw_options);
std::string delimiter = ",";
std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." +
std::string(Options::LIST_AGG_DELIMITER);
PAIMON_RETURN_NOT_OK(parser.ParseString(key, &delimiter));
return delimiter;
}

Result<bool> CoreOptions::FieldCollectAggDistinct(const std::string& field_name) const {
ConfigParser parser(impl_->raw_options);
bool distinct = false;
std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." +
std::string(Options::DISTINCT);
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(key, &distinct));
return distinct;
}

bool CoreOptions::DeletionVectorsEnabled() const {
return impl_->deletion_vectors_enabled;
}
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class PAIMON_EXPORT CoreOptions {
std::optional<std::string> GetFieldsDefaultFunc() const;
Result<std::optional<std::string>> GetFieldAggFunc(const std::string& field_name) const;
Result<bool> FieldAggIgnoreRetract(const std::string& field_name) const;
Result<std::string> FieldListAggDelimiter(const std::string& field_name) const;
Result<bool> FieldCollectAggDistinct(const std::string& field_name) const;
bool DeletionVectorsEnabled() const;
bool DeletionVectorsBitmap64() const;
int64_t DeletionVectorTargetFileSize() const;
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_EQ(std::nullopt, core_options.GetFieldsDefaultFunc());
ASSERT_EQ(std::nullopt, core_options.GetFieldAggFunc("f0").value());
ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value());
ASSERT_EQ(",", core_options.FieldListAggDelimiter("f1").value());
ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value());
ASSERT_FALSE(core_options.DeletionVectorsEnabled());
ASSERT_FALSE(core_options.DeletionVectorsBitmap64());
ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize());
Expand Down Expand Up @@ -169,6 +171,8 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::FIELDS_DEFAULT_AGG_FUNC, "sum"},
{"fields.f0.aggregate-function", "min"},
{"fields.f1.ignore-retract", "true"},
{"fields.f2.list-agg-delimiter", " | "},
{"fields.f2.distinct", "true"},
{Options::DELETION_VECTORS_ENABLED, "true"},
{Options::DELETION_VECTOR_BITMAP64, "true"},
{Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE, "4MB"},
Expand Down Expand Up @@ -269,6 +273,8 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ("min", core_options.GetFieldAggFunc("f0").value().value());
ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value());
ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value());
ASSERT_EQ(" | ", core_options.FieldListAggDelimiter("f2").value());
ASSERT_TRUE(core_options.FieldCollectAggDistinct("f2").value());
ASSERT_TRUE(core_options.DeletionVectorsEnabled());
ASSERT_TRUE(core_options.DeletionVectorsBitmap64());
ASSERT_EQ(4 * 1024 * 1024, core_options.DeletionVectorTargetFileSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "paimon/core/mergetree/compact/aggregate/field_ignore_retract_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_last_non_null_value_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_last_value_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_max_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_min_agg.h"
#include "paimon/core/mergetree/compact/aggregate/field_primary_key_agg.h"
Expand Down Expand Up @@ -71,6 +72,9 @@ class FieldAggregatorFactory {
PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolOrAgg::Create(field_type));
} else if (str_agg == FieldBoolAndAgg::NAME) {
PAIMON_ASSIGN_OR_RAISE(field_aggregator, FieldBoolAndAgg::Create(field_type));
} else if (str_agg == FieldListaggAgg::NAME) {
PAIMON_ASSIGN_OR_RAISE(field_aggregator,
FieldListaggAgg::Create(field_type, options, field_name));
} else {
return Status::Invalid(fmt::format(
"Use unsupported aggregation {} or spell aggregate function incorrectly!",
Expand Down
134 changes: 134 additions & 0 deletions src/paimon/core/mergetree/compact/aggregate/field_listagg_agg.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <unordered_set>

#include "paimon/common/data/data_define.h"
#include "paimon/core/core_options.h"
#include "paimon/core/mergetree/compact/aggregate/field_aggregator.h"

namespace arrow {
class DataType;
} // namespace arrow

namespace paimon {
/// listagg aggregate a field of a row.
/// Concatenates string values with a delimiter.
class FieldListaggAgg : public FieldAggregator {
public:
static constexpr char NAME[] = "listagg";

static Result<std::unique_ptr<FieldListaggAgg>> Create(
const std::shared_ptr<arrow::DataType>& field_type, const CoreOptions& options,
const std::string& field_name) {
if (field_type->id() != arrow::Type::type::STRING) {
return Status::Invalid(
fmt::format("invalid field type {} for field '{}' of {}, supposed to be string",
field_type->ToString(), field_name, NAME));
}
PAIMON_ASSIGN_OR_RAISE(std::string delimiter, options.FieldListAggDelimiter(field_name));
PAIMON_ASSIGN_OR_RAISE(bool distinct, options.FieldCollectAggDistinct(field_name));
// When delimiter is empty and distinct is true, fall back to whitespace split.
if (distinct && delimiter.empty()) {
delimiter = " ";
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the Java logic is: if distinct is true, it applies a fallback mechanism (e.g. deduplication), otherwise it directly concatenates the values.

return std::unique_ptr<FieldListaggAgg>(
new FieldListaggAgg(field_type, std::move(delimiter), distinct));
}

VariantType Agg(const VariantType& accumulator, const VariantType& input_field) override {
bool accumulator_null = DataDefine::IsVariantNull(accumulator);
bool input_null = DataDefine::IsVariantNull(input_field);
if (accumulator_null || input_null) {
return accumulator_null ? input_field : accumulator;
}
std::string_view acc_str = DataDefine::GetStringView(accumulator);
std::string_view in_str = DataDefine::GetStringView(input_field);
if (in_str.empty()) {
return accumulator;
}
if (acc_str.empty()) {
return input_field;
}

if (distinct_) {
result_ = AggDistinctImpl(acc_str, in_str);
} else {
// Build into a local string to avoid aliasing when acc_str points into result_
std::string new_result;
new_result.reserve(acc_str.size() + delimiter_.size() + in_str.size());
new_result.append(acc_str);
new_result.append(delimiter_);
new_result.append(in_str);
result_ = std::move(new_result);
}
return std::string_view(result_);
}

private:
std::string AggDistinctImpl(std::string_view acc_str, std::string_view in_str) const {
// Split accumulator tokens into a set for dedup
std::unordered_set<std::string_view> seen;
std::string_view remaining = acc_str;
while (true) {
size_t pos = remaining.find(delimiter_);
std::string_view token =
(pos == std::string_view::npos) ? remaining : remaining.substr(0, pos);
if (!token.empty()) {
seen.insert(token);
}
if (pos == std::string_view::npos) {
break;
}
remaining = remaining.substr(pos + delimiter_.size());
}

// Start with the full accumulator, then append delimiter + new distinct tokens from input
std::string result;
result.reserve(acc_str.size() + in_str.size());
result.append(acc_str);
remaining = in_str;
while (true) {
size_t pos = remaining.find(delimiter_);
std::string_view token =
(pos == std::string_view::npos) ? remaining : remaining.substr(0, pos);
if (!token.empty() && seen.insert(token).second) {
result.append(delimiter_);
result.append(token);
}
if (pos == std::string_view::npos) {
break;
}
remaining = remaining.substr(pos + delimiter_.size());
}
return result;
}

explicit FieldListaggAgg(const std::shared_ptr<arrow::DataType>& field_type,
std::string delimiter, bool distinct)
: FieldAggregator(std::string(NAME), field_type),
delimiter_(std::move(delimiter)),
distinct_(distinct) {}

std::string delimiter_;
bool distinct_;
std::string result_;
};
} // namespace paimon
145 changes: 145 additions & 0 deletions src/paimon/core/mergetree/compact/aggregate/field_listagg_agg_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/mergetree/compact/aggregate/field_listagg_agg.h"

#include <map>
#include <string>
#include <string_view>

#include "arrow/type_fwd.h"
#include "gtest/gtest.h"
#include "paimon/core/core_options.h"
#include "paimon/status.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

class FieldListaggAggTest : public testing::Test {
protected:
static Result<std::unique_ptr<FieldListaggAgg>> MakeAgg(const std::string& delimiter = ",",
bool distinct = false) {
std::map<std::string, std::string> opts;
opts["fields.f.list-agg-delimiter"] = delimiter;
opts["fields.f.distinct"] = distinct ? "true" : "false";
PAIMON_ASSIGN_OR_RAISE(auto options, CoreOptions::FromMap(opts));
return FieldListaggAgg::Create(arrow::utf8(), std::move(options), "f");
}
};

TEST_F(FieldListaggAggTest, TestSimple) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg());
auto ret = agg->Agg(std::string_view("hello"), std::string_view(" world"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "hello, world");
}

TEST_F(FieldListaggAggTest, TestDelimiter) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg("-"));
auto ret = agg->Agg(std::string_view("user1"), std::string_view("user2"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "user1-user2");
}

TEST_F(FieldListaggAggTest, TestNull) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg());

// input null -> return accumulator
{
auto ret = agg->Agg(std::string_view("hello"), NullType());
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "hello");
}
// accumulator null -> return input
{
auto ret = agg->Agg(NullType(), std::string_view("world"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "world");
}
// both null -> return null
{
auto ret = agg->Agg(NullType(), NullType());
ASSERT_TRUE(DataDefine::IsVariantNull(ret));
}
}

TEST_F(FieldListaggAggTest, TestEmptyString) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg());

// empty input -> return accumulator
{
auto ret = agg->Agg(std::string_view("hello"), std::string_view(""));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "hello");
}
// empty accumulator -> return input
{
auto ret = agg->Agg(std::string_view(""), std::string_view("world"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "world");
}
// both empty -> return input (which is empty)
{
auto ret = agg->Agg(std::string_view(""), std::string_view(""));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "");
}
}

TEST_F(FieldListaggAggTest, TestMultipleAccumulation) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg());

// "a" + "," + "b" = "a,b", then "a,b" + "," + "c" = "a,b,c"
auto ret = agg->Agg(std::string_view("a"), std::string_view("b"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a,b");
ret = agg->Agg(std::move(ret), std::string_view("c"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a,b,c");
}

TEST_F(FieldListaggAggTest, TestDistinct) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true));

// "a;b" + "b;c" -> "a;b;c" (deduplicate "b")
auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a;b;c");
}

TEST_F(FieldListaggAggTest, TestDistinctNoDuplicates) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(" ", true));

// "a b" + "c d" -> "a b c d" (no dups to remove)
auto ret = agg->Agg(std::string_view("a b"), std::string_view("c d"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a b c d");
}

TEST_F(FieldListaggAggTest, TestDistinctEmptyInput) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", true));

// empty input -> return accumulator
auto ret = agg->Agg(std::string_view("a;b"), std::string_view(""));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a;b");
}

TEST_F(FieldListaggAggTest, TestDistinctFalse) {
ASSERT_OK_AND_ASSIGN(auto agg, MakeAgg(";", false));

// "a;b" + "b;c" -> "a;b;b;c" (no dedup)
auto ret = agg->Agg(std::string_view("a;b"), std::string_view("b;c"));
ASSERT_EQ(DataDefine::GetVariantValue<std::string_view>(ret), "a;b;b;c");
}

TEST_F(FieldListaggAggTest, TestInvalidType) {
EXPECT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap({}));
auto result = FieldListaggAgg::Create(arrow::int32(), options, "f");
ASSERT_FALSE(result.ok());
ASSERT_TRUE(result.status().ToString().find("supposed to be string") != std::string::npos)
<< result.status().ToString();
}

} // namespace paimon::test