From 24874fd92d9e4bcd6d862dfa98e889d4f8dfbaa4 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 10:56:07 +0800 Subject: [PATCH 01/10] feat(core): add bucket function implementation - Add BucketFunction interface and BucketFunctionType enum - Implement ModBucketFunction for hash-based bucket assignment - Implement DefaultBucketFunction for default bucket behavior - Add bucket_num option to CoreOptions - Extend BucketIdCalculator to support bucket function - Add unit tests for bucket functions and related components --- include/paimon/defs.h | 4 + include/paimon/utils/bucket_id_calculator.h | 23 +++- src/paimon/CMakeLists.txt | 3 + src/paimon/common/defs.cpp | 1 + .../common/utils/bucket_id_calculator.cpp | 24 +++- .../utils/bucket_id_calculator_test.cpp | 62 ++++++++++ src/paimon/core/bucket/bucket_function.h | 38 ++++++ src/paimon/core/bucket/bucket_function_type.h | 31 +++++ .../core/bucket/default_bucket_function.h | 36 ++++++ .../bucket/default_bucket_function_test.cpp | 82 +++++++++++++ .../core/bucket/mod_bucket_function.cpp | 75 ++++++++++++ src/paimon/core/bucket/mod_bucket_function.h | 46 ++++++++ .../core/bucket/mod_bucket_function_test.cpp | 111 ++++++++++++++++++ src/paimon/core/core_options.cpp | 26 ++++ src/paimon/core/core_options.h | 3 + src/paimon/core/core_options_test.cpp | 10 +- 16 files changed, 567 insertions(+), 8 deletions(-) create mode 100644 src/paimon/core/bucket/bucket_function.h create mode 100644 src/paimon/core/bucket/bucket_function_type.h create mode 100644 src/paimon/core/bucket/default_bucket_function.h create mode 100644 src/paimon/core/bucket/default_bucket_function_test.cpp create mode 100644 src/paimon/core/bucket/mod_bucket_function.cpp create mode 100644 src/paimon/core/bucket/mod_bucket_function.h create mode 100644 src/paimon/core/bucket/mod_bucket_function_test.cpp diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 3ea0da43c..86fbded9a 100644 --- a/include/paimon/defs.h +++ 
b/include/paimon/defs.h @@ -414,6 +414,10 @@ struct PAIMON_EXPORT Options { /// "lookup.cache.high-priority-pool-ratio" - The fraction of cache memory that is reserved for /// high-priority data like index, filter. Default value is 0.25. static const char LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[]; + + /// "bucket-function.type" - The bucket function type for paimon bucket. + /// Values can be: "default", "mod", "hive". Default value is "default". + static const char BUCKET_FUNCTION_TYPE[]; }; static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max(); diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 8a938b623..f7004edb4 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -27,6 +27,7 @@ struct ArrowSchema; struct ArrowArray; namespace paimon { +class BucketFunction; class MemoryPool; /// Calculator for determining bucket ids based on the given bucket keys. @@ -47,6 +48,22 @@ class PAIMON_EXPORT BucketIdCalculator { /// @param num_buckets Number of buckets. static Result<std::unique_ptr<BucketIdCalculator>> Create(bool is_pk_table, int32_t num_buckets); + + /// Create `BucketIdCalculator` with a custom bucket function and memory pool. + /// @param is_pk_table Whether this is for a primary key table. + /// @param num_buckets Number of buckets. + /// @param bucket_function The bucket function to use for bucket assignment. + /// @param pool Memory pool for memory allocation. + static Result<std::unique_ptr<BucketIdCalculator>> Create( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function, + const std::shared_ptr<MemoryPool>& pool); + + /// Create `BucketIdCalculator` with a custom bucket function and default memory pool. + /// @param is_pk_table Whether this is for a primary key table. + /// @param num_buckets Number of buckets. + /// @param bucket_function The bucket function to use for bucket assignment. 
+ static Result<std::unique_ptr<BucketIdCalculator>> Create( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function); /// Calculate bucket ids for the given bucket keys. /// @param bucket_keys Arrow struct array containing the bucket key values. /// @param bucket_schema Arrow schema describing the structure of bucket_keys. @@ -58,11 +75,13 @@ int32_t* bucket_ids) const; private: - BucketIdCalculator(int32_t num_buckets, const std::shared_ptr<MemoryPool>& pool) - : num_buckets_(num_buckets), pool_(pool) {} + BucketIdCalculator(int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function, + const std::shared_ptr<MemoryPool>& pool) + : num_buckets_(num_buckets), bucket_function_(std::move(bucket_function)), pool_(pool) {} private: int32_t num_buckets_; + std::unique_ptr<BucketFunction> bucket_function_; std::shared_ptr<MemoryPool> pool_; }; } // namespace paimon diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 56044ab86..c8b072f51 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -149,6 +149,7 @@ set(PAIMON_CORE_SRCS core/disk/io_manager.cpp core/append/append_only_writer.cpp core/append/bucketed_append_compact_manager.cpp + core/bucket/mod_bucket_function.cpp core/casting/binary_to_string_cast_executor.cpp core/casting/boolean_to_decimal_cast_executor.cpp core/casting/boolean_to_numeric_cast_executor.cpp @@ -511,6 +512,8 @@ if(PAIMON_BUILD_TESTS) SOURCES core/append/append_only_writer_test.cpp core/append/bucketed_append_compact_manager_test.cpp + core/bucket/default_bucket_function_test.cpp + core/bucket/mod_bucket_function_test.cpp core/casting/cast_executor_factory_test.cpp core/casting/cast_executor_test.cpp core/casting/casted_row_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 113d1327b..2c1e9f3e8 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -117,5 +117,6 @@ const char Options::LOOKUP_COMPACT[] = "lookup-compact"; const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = 
"lookup-compact.max-interval"; const char Options::LOOKUP_CACHE_MAX_MEMORY_SIZE[] = "lookup.cache-max-memory-size"; const char Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO[] = "lookup.cache.high-priority-pool-ratio"; +const char Options::BUCKET_FUNCTION_TYPE[] = "bucket-function.type"; } // namespace paimon diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index 3f1ae900c..339286cd5 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -44,6 +44,8 @@ #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/scope_guard.h" +#include "paimon/core/bucket/bucket_function.h" +#include "paimon/core/bucket/default_bucket_function.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" #include "paimon/memory/memory_pool.h" @@ -238,6 +240,17 @@ static Result WriteBucketRow(int32_t col_id, Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create( bool is_pk_table, int32_t num_buckets, const std::shared_ptr<MemoryPool>& pool) { + return Create(is_pk_table, num_buckets, std::make_unique<DefaultBucketFunction>(), pool); +} + +Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create(bool is_pk_table, + int32_t num_buckets) { + return Create(is_pk_table, num_buckets, GetDefaultPool()); +} + +Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function, + const std::shared_ptr<MemoryPool>& pool) { if (num_buckets == 0 || num_buckets < -2) { return Status::Invalid("num buckets must be -1 or -2 or greater than 0"); } @@ -249,12 +262,13 @@ Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create( if (!is_pk_table && num_buckets == -2) { return Status::Invalid("Append table not support PostponeBucketMode"); } - return std::unique_ptr<BucketIdCalculator>(new BucketIdCalculator(num_buckets, pool)); + return std::unique_ptr<BucketIdCalculator>( + new BucketIdCalculator(num_buckets, std::move(bucket_function), pool)); } -Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create(bool is_pk_table, - 
int32_t num_buckets) { - return Create(is_pk_table, num_buckets, GetDefaultPool()); +Result<std::unique_ptr<BucketIdCalculator>> BucketIdCalculator::Create( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function) { + return Create(is_pk_table, num_buckets, std::move(bucket_function), GetDefaultPool()); } Status BucketIdCalculator::CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* bucket_schema, @@ -298,7 +312,7 @@ Status BucketIdCalculator::CalculateBucketIds(ArrowArray* bucket_keys, ArrowSche write_functions[col](row, &row_writer); } row_writer.Complete(); - bucket_ids[row] = std::abs(bucket_row.HashCode() % num_buckets_); + bucket_ids[row] = bucket_function_->Bucket(bucket_row, num_buckets_); } guard.Release(); return Status::OK(); diff --git a/src/paimon/common/utils/bucket_id_calculator_test.cpp b/src/paimon/common/utils/bucket_id_calculator_test.cpp index 4739d69a3..8b20b6c48 100644 --- a/src/paimon/common/utils/bucket_id_calculator_test.cpp +++ b/src/paimon/common/utils/bucket_id_calculator_test.cpp @@ -31,6 +31,8 @@ #include "gtest/gtest.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" +#include "paimon/core/bucket/default_bucket_function.h" +#include "paimon/core/bucket/mod_bucket_function.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/testing/utils/testharness.h" @@ -63,6 +65,34 @@ class BucketIdCalculatorTest : public ::testing::Test { arrow::struct_(bucket_schema->fields()), data_str)); return CalculateBucketIds(is_pk_table, num_buckets, bucket_schema, bucket_array); } + + Result<std::vector<int32_t>> CalculateBucketIds( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function, + const std::shared_ptr<arrow::Schema>& bucket_schema, + const std::shared_ptr<arrow::Array>& bucket_array) const { + ::ArrowArray c_bucket_array; + EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_bucket_array).ok()); + ::ArrowSchema c_bucket_schema; + EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_bucket_schema).ok()); + std::vector<int32_t> 
bucket_ids(bucket_array->length()); + EXPECT_OK_AND_ASSIGN( + auto bucket_id_cal, + BucketIdCalculator::Create(is_pk_table, num_buckets, std::move(bucket_function))); + PAIMON_RETURN_NOT_OK(bucket_id_cal->CalculateBucketIds( + /*bucket_keys=*/&c_bucket_array, /*bucket_schema=*/&c_bucket_schema, + /*bucket_ids=*/bucket_ids.data())); + return bucket_ids; + } + + Result<std::vector<int32_t>> CalculateBucketIds( + bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function, + const std::shared_ptr<arrow::Schema>& bucket_schema, const std::string& data_str) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto bucket_array, + arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_(bucket_schema->fields()), data_str)); + return CalculateBucketIds(is_pk_table, num_buckets, std::move(bucket_function), + bucket_schema, bucket_array); + } }; TEST_F(BucketIdCalculatorTest, TestCompatibleWithJava) { @@ -304,4 +334,36 @@ TEST_F(BucketIdCalculatorTest, TestVariantType) { CalculateBucketIds(/*is_pk_table=*/true, 12345, bucket_schema, bucket_array)); ASSERT_EQ(expected, result2); } + +TEST_F(BucketIdCalculatorTest, TestWithModBucketFunction) { + auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); + ASSERT_OK_AND_ASSIGN(auto mod_func, ModBucketFunction::Create(FieldType::INT)); + ASSERT_OK_AND_ASSIGN( + std::vector<int32_t> result, + CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, std::move(mod_func), + bucket_schema, "[[10], [-1], [50], [-13], [0]]")); + // Java Math.floorMod semantics: + // floorMod(10, 10) = 0 + // floorMod(-1, 10) = 9 + // floorMod(50, 10) = 0 + // floorMod(-13, 10) = 7 + // floorMod(0, 10) = 0 + std::vector<int32_t> expected = {0, 9, 0, 7, 0}; + ASSERT_EQ(expected, result); +} + +TEST_F(BucketIdCalculatorTest, TestWithDefaultBucketFunctionExplicit) { + auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); + // Calculate with explicit DefaultBucketFunction + auto default_func = std::make_unique<DefaultBucketFunction>(); + 
ASSERT_OK_AND_ASSIGN( + std::vector result_explicit, + CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, std::move(default_func), + bucket_schema, "[[10], [-1], [50], [-13], [0]]")); + // Calculate with default (no BucketFunction passed) + ASSERT_OK_AND_ASSIGN(std::vector result_default, + CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, bucket_schema, + "[[10], [-1], [50], [-13], [0]]")); + ASSERT_EQ(result_default, result_explicit); +} } // namespace paimon::test diff --git a/src/paimon/core/bucket/bucket_function.h b/src/paimon/core/bucket/bucket_function.h new file mode 100644 index 000000000..77d6e4c3a --- /dev/null +++ b/src/paimon/core/bucket/bucket_function.h @@ -0,0 +1,38 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace paimon { + +class BinaryRow; + +/// Abstract interface for bucket functions. +/// A bucket function determines which bucket a row should be assigned to. +class BucketFunction { + public: + virtual ~BucketFunction() = default; + + /// Compute the bucket for the given row. + /// @param row The binary row to compute the bucket for. + /// @param num_buckets The total number of buckets. + /// @return The bucket index (0-based). 
+ virtual int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/bucket/bucket_function_type.h b/src/paimon/core/bucket/bucket_function_type.h new file mode 100644 index 000000000..151107174 --- /dev/null +++ b/src/paimon/core/bucket/bucket_function_type.h @@ -0,0 +1,31 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace paimon { + +/// Specifies the bucket function type for paimon bucket. +enum class BucketFunctionType { + // Default bucket function using hash code. + DEFAULT = 1, + // Mod bucket function using modulo operation on bucket key. + MOD = 2, + // Hive bucket function (not yet implemented). + HIVE = 3 +}; + +} // namespace paimon diff --git a/src/paimon/core/bucket/default_bucket_function.h b/src/paimon/core/bucket/default_bucket_function.h new file mode 100644 index 000000000..a9475c096 --- /dev/null +++ b/src/paimon/core/bucket/default_bucket_function.h @@ -0,0 +1,36 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/core/bucket/bucket_function.h" + +namespace paimon { + +/// Default bucket function that uses the hash code of the row to determine the bucket. +/// This is consistent with the logic in BucketIdCalculator. +class DefaultBucketFunction : public BucketFunction { + public: + int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override { + return std::abs(row.HashCode() % num_buckets); + } +}; + +} // namespace paimon diff --git a/src/paimon/core/bucket/default_bucket_function_test.cpp b/src/paimon/core/bucket/default_bucket_function_test.cpp new file mode 100644 index 000000000..37c6868b5 --- /dev/null +++ b/src/paimon/core/bucket/default_bucket_function_test.cpp @@ -0,0 +1,82 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/bucket/default_bucket_function.h" + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(DefaultBucketFunctionTest, TestBasicHashMod) { + auto pool = GetDefaultPool(); + DefaultBucketFunction func; + + // Create a row with a single INT field + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, 42); + writer.Complete(); + + int32_t num_buckets = 5; + int32_t bucket = func.Bucket(row, num_buckets); + ASSERT_GE(bucket, 0); + ASSERT_LT(bucket, num_buckets); + + // Verify it matches the expected formula: abs(hashCode % numBuckets) + int32_t expected = std::abs(row.HashCode() % num_buckets); + ASSERT_EQ(expected, bucket); +} + +TEST(DefaultBucketFunctionTest, TestDifferentNumBuckets) { + auto pool = GetDefaultPool(); + DefaultBucketFunction func; + + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, 100); + writer.Complete(); + + for (int32_t num_buckets = 1; num_buckets <= 10; num_buckets++) { + int32_t bucket = func.Bucket(row, num_buckets); + ASSERT_GE(bucket, 0); + ASSERT_LT(bucket, num_buckets); + ASSERT_EQ(std::abs(row.HashCode() % num_buckets), bucket); + } +} + +TEST(DefaultBucketFunctionTest, TestMultiFieldRow) { + auto pool = GetDefaultPool(); + DefaultBucketFunction func; + + BinaryRow row(3); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, 1); + writer.WriteLong(1, 2); + writer.WriteInt(2, 3); + writer.Complete(); + + int32_t num_buckets = 7; + int32_t bucket = func.Bucket(row, num_buckets); + ASSERT_GE(bucket, 0); + ASSERT_LT(bucket, num_buckets); + ASSERT_EQ(std::abs(row.HashCode() % num_buckets), bucket); +} + +} // namespace paimon::test diff --git a/src/paimon/core/bucket/mod_bucket_function.cpp b/src/paimon/core/bucket/mod_bucket_function.cpp new 
file mode 100644 index 000000000..dead2a29c --- /dev/null +++ b/src/paimon/core/bucket/mod_bucket_function.cpp @@ -0,0 +1,75 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/bucket/mod_bucket_function.h" + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/status.h" + +namespace paimon { + +namespace { + +/// Equivalent to Java's Math.floorMod(int, int). +/// The result always has the same sign as the divisor (y), or is zero. +inline int32_t FloorMod(int32_t x, int32_t y) { + int32_t mod = x % y; + // If the signs of mod and y differ and mod is not zero, adjust. + if ((mod ^ y) < 0 && mod != 0) { + mod += y; + } + return mod; +} + +/// Equivalent to Java's Math.floorMod(long, int). +/// The result always has the same sign as the divisor (y), or is zero. +inline int32_t FloorMod(int64_t x, int32_t y) { + int64_t mod = x % static_cast<int64_t>(y); + // If the signs of mod and y differ and mod is not zero, adjust. 
+ if ((mod ^ static_cast<int64_t>(y)) < 0 && mod != 0) { + mod += y; + } + return static_cast<int32_t>(mod); +} + +} // namespace + +ModBucketFunction::ModBucketFunction(FieldType bucket_key_type) + : bucket_key_type_(bucket_key_type) {} + +Result<std::unique_ptr<ModBucketFunction>> ModBucketFunction::Create(FieldType bucket_key_type) { + if (bucket_key_type != FieldType::INT && bucket_key_type != FieldType::BIGINT) { + return Status::Invalid( + fmt::format("ModBucketFunction only supports INT or BIGINT bucket key type, but got {}", + static_cast<int>(bucket_key_type))); + } + return std::unique_ptr<ModBucketFunction>(new ModBucketFunction(bucket_key_type)); +} + +int32_t ModBucketFunction::Bucket(const BinaryRow& row, int32_t num_buckets) const { + switch (bucket_key_type_) { + case FieldType::INT: + return FloorMod(row.GetInt(0), num_buckets); + case FieldType::BIGINT: + return FloorMod(row.GetLong(0), num_buckets); + default: + // This should never happen since Create() validates the type. + return 0; + } +} + +} // namespace paimon diff --git a/src/paimon/core/bucket/mod_bucket_function.h b/src/paimon/core/bucket/mod_bucket_function.h new file mode 100644 index 000000000..23f144606 --- /dev/null +++ b/src/paimon/core/bucket/mod_bucket_function.h @@ -0,0 +1,46 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/core/bucket/bucket_function.h" +#include "paimon/defs.h" +#include "paimon/result.h" + +namespace paimon { + +/// Mod bucket function that uses modulo operation on the bucket key value. +/// The bucket key must be a single field of INT or BIGINT type. +/// This implements Java's Math.floorMod semantics for negative numbers. +class ModBucketFunction : public BucketFunction { + public: + /// Create a ModBucketFunction with the given bucket key type. + /// @param bucket_key_type The type of the single bucket key field. Must be INT or BIGINT. + /// @return A Result containing the ModBucketFunction or an error status. + static Result> Create(FieldType bucket_key_type); + + int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override; + + private: + explicit ModBucketFunction(FieldType bucket_key_type); + + FieldType bucket_key_type_; +}; + +} // namespace paimon diff --git a/src/paimon/core/bucket/mod_bucket_function_test.cpp b/src/paimon/core/bucket/mod_bucket_function_test.cpp new file mode 100644 index 000000000..66bea19a3 --- /dev/null +++ b/src/paimon/core/bucket/mod_bucket_function_test.cpp @@ -0,0 +1,111 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/bucket/mod_bucket_function.h" + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +namespace { + +BinaryRow CreateIntRow(int32_t value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, value); + writer.Complete(); + return row; +} + +BinaryRow CreateLongRow(int64_t value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteLong(0, value); + writer.Complete(); + return row; +} + +} // namespace + +TEST(ModBucketFunctionTest, TestIntType) { + ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::INT)); + + // 1 % 5 = 1 + ASSERT_EQ(1, func->Bucket(CreateIntRow(1), 5)); + // 7 % 5 = 2 + ASSERT_EQ(2, func->Bucket(CreateIntRow(7), 5)); + // -2 floorMod 5 = 3 (Java Math.floorMod(-2, 5) = 3) + ASSERT_EQ(3, func->Bucket(CreateIntRow(-2), 5)); +} + +TEST(ModBucketFunctionTest, TestBigintType) { + ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::BIGINT)); + + // 8 % 5 = 3 + ASSERT_EQ(3, func->Bucket(CreateLongRow(8), 5)); + // 0 % 5 = 0 + ASSERT_EQ(0, func->Bucket(CreateLongRow(0), 5)); + // -3 floorMod 5 = 2 (Java Math.floorMod(-3L, 5) = 2) + ASSERT_EQ(2, func->Bucket(CreateLongRow(-3), 5)); +} + +TEST(ModBucketFunctionTest, TestUnsupportedType) { + // STRING type should fail + auto result = ModBucketFunction::Create(FieldType::STRING); + ASSERT_NOK(result.status()); +} + +TEST(ModBucketFunctionTest, TestUnsupportedFloatType) { + // FLOAT type should fail + auto result = ModBucketFunction::Create(FieldType::FLOAT); + ASSERT_NOK(result.status()); +} + +TEST(ModBucketFunctionTest, TestUnsupportedDoubleType) { + // DOUBLE type should fail + auto result = 
ModBucketFunction::Create(FieldType::DOUBLE); + ASSERT_NOK(result.status()); +} + +TEST(ModBucketFunctionTest, TestIntEdgeCases) { + ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::INT)); + + // 0 % 5 = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(0), 5)); + // 5 % 5 = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(5), 5)); + // -5 floorMod 5 = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(-5), 5)); + // 1 % 1 = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(1), 1)); +} + +TEST(ModBucketFunctionTest, TestBigintEdgeCases) { + ASSERT_OK_AND_ASSIGN(auto func, ModBucketFunction::Create(FieldType::BIGINT)); + + // Large value + ASSERT_EQ(3, func->Bucket(CreateLongRow(1000000003L), 5)); + // Negative large value: -1000000003 floorMod 5 = 2 + ASSERT_EQ(2, func->Bucket(CreateLongRow(-1000000003L), 5)); +} + +} // namespace paimon::test diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 3e406998a..97fe3bb48 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -255,6 +255,24 @@ class ConfigParser { return Status::OK(); } + // Parse BucketFunctionType + Status ParseBucketFunctionType(BucketFunctionType* bucket_function_type) const { + auto iter = config_map_.find(Options::BUCKET_FUNCTION_TYPE); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "default") { + *bucket_function_type = BucketFunctionType::DEFAULT; + } else if (str == "mod") { + *bucket_function_type = BucketFunctionType::MOD; + } else if (str == "hive") { + *bucket_function_type = BucketFunctionType::HIVE; + } else { + return Status::Invalid(fmt::format("invalid bucket function type: {}", str)); + } + } + return Status::OK(); + } + // Parse StartupMode Status ParseStartupMode(StartupMode* startup_mode) const { auto iter = config_map_.find(Options::SCAN_MODE); @@ -389,6 +407,7 @@ struct CoreOptions::Impl { ExternalPathStrategy external_path_strategy = ExternalPathStrategy::NONE; 
LookupCompactMode lookup_compact_mode = LookupCompactMode::RADICAL; std::optional lookup_compact_max_interval; + BucketFunctionType bucket_function_type = BucketFunctionType::DEFAULT; int32_t file_compression_zstd_level = 1; @@ -733,6 +752,9 @@ Result CoreOptions::FromMap( "The high priority pool ratio should in the range [0, 1), while input is {}", impl->lookup_cache_high_prio_pool_ratio)); } + + // Parse bucket function type + PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&impl->bucket_function_type)); return options; } @@ -1217,4 +1239,8 @@ double CoreOptions::GetLookupCacheHighPrioPoolRatio() const { return impl_->lookup_cache_high_prio_pool_ratio; } +BucketFunctionType CoreOptions::GetBucketFunctionType() const { + return impl_->bucket_function_type; +} + } // namespace paimon diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index d29b7fca0..9afa5de11 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -24,6 +24,7 @@ #include #include +#include "paimon/core/bucket/bucket_function_type.h" #include "paimon/core/options/changelog_producer.h" #include "paimon/core/options/compress_options.h" #include "paimon/core/options/external_path_strategy.h" @@ -165,6 +166,8 @@ class PAIMON_EXPORT CoreOptions { const std::map& ToMap() const; + BucketFunctionType GetBucketFunctionType() const; + private: std::optional GetDataFileExternalPaths() const; std::optional GetGlobalIndexExternalPath() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 6a8a86855..f633d9611 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -21,6 +21,7 @@ #include "gtest/gtest.h" #include "paimon/common/fs/resolving_file_system.h" +#include "paimon/core/bucket/bucket_function_type.h" #include "paimon/core/options/expire_config.h" #include "paimon/defs.h" #include "paimon/format/file_format.h" @@ -130,6 +131,7 @@ TEST(CoreOptionsTest, 
TestDefaultValue) { ASSERT_EQ(0.25, core_options.GetLookupCacheHighPrioPoolRatio()); ASSERT_FALSE(core_options.LookupRemoteFileEnabled()); ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), INT32_MIN); + ASSERT_EQ(BucketFunctionType::DEFAULT, core_options.GetBucketFunctionType()); } TEST(CoreOptionsTest, TestFromMap) { @@ -218,7 +220,8 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::LOOKUP_CACHE_MAX_MEMORY_SIZE, "1MB"}, {Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "0.35"}, {Options::LOOKUP_REMOTE_FILE_ENABLED, "True"}, - {Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, "2"}}; + {Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, "2"}, + {Options::BUCKET_FUNCTION_TYPE, "mod"}}; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); auto fs = core_options.GetFileSystem(); @@ -333,6 +336,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(0.35, core_options.GetLookupCacheHighPrioPoolRatio()); ASSERT_TRUE(core_options.LookupRemoteFileEnabled()); ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), 2); + ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType()); } TEST(CoreOptionsTest, TestInvalidCase) { @@ -355,6 +359,8 @@ TEST(CoreOptionsTest, TestInvalidCase) { ASSERT_NOK_WITH_MSG( CoreOptions::FromMap({{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "1.1"}}), "The high priority pool ratio should in the range [0, 1), while input is 1.1"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET_FUNCTION_TYPE, "invalid"}}), + "invalid bucket function type: invalid"); } TEST(CoreOptionsTest, TestLookupCompactMaxIntervalComputedValue) { @@ -564,6 +570,7 @@ TEST(CoreOptionsTest, TestNormalizeValueInCoreOption) { {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "ROUND-ROBIN"}, {Options::LOOKUP_COMPACT, "GENTLE"}, {Options::SCAN_MODE, "DEFAULT"}, + {Options::BUCKET_FUNCTION_TYPE, "MOD"}, }; ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); @@ -574,5 +581,6 @@ TEST(CoreOptionsTest, TestNormalizeValueInCoreOption) { 
ASSERT_EQ(SortEngine::MIN_HEAP, core_options.GetSortEngine()); ASSERT_EQ(LookupCompactMode::GENTLE, core_options.GetLookupCompactMode()); ASSERT_TRUE(core_options.SequenceFieldSortOrderIsAscending()); + ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType()); } } // namespace paimon::test From ccc5b4e44203c358def4457f98da3d7f9f1fc389 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 11:31:54 +0800 Subject: [PATCH 02/10] feat(core): add Hive-compatible bucket function - Add HiveHasher utility with Hive-compatible hash functions for int, long, bytes, and decimal types - Implement HiveBucketFunction aligned with Java HiveBucketFunction, supporting BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, BINARY, DECIMAL, and DATE field types - Handle special cases: -0.0f/0.0 normalization, decimal trailing zero stripping, and null field hashing - Add comprehensive unit tests covering mixed types, nulls, negative zero, and edge cases --- src/paimon/CMakeLists.txt | 2 + .../core/bucket/hive_bucket_function.cpp | 148 ++++++++++ src/paimon/core/bucket/hive_bucket_function.h | 74 +++++ .../core/bucket/hive_bucket_function_test.cpp | 252 ++++++++++++++++++ src/paimon/core/bucket/hive_hasher.h | 159 +++++++++++ 5 files changed, 635 insertions(+) create mode 100644 src/paimon/core/bucket/hive_bucket_function.cpp create mode 100644 src/paimon/core/bucket/hive_bucket_function.h create mode 100644 src/paimon/core/bucket/hive_bucket_function_test.cpp create mode 100644 src/paimon/core/bucket/hive_hasher.h diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index c8b072f51..aa2662238 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -149,6 +149,7 @@ set(PAIMON_CORE_SRCS core/disk/io_manager.cpp core/append/append_only_writer.cpp core/append/bucketed_append_compact_manager.cpp + core/bucket/hive_bucket_function.cpp core/bucket/mod_bucket_function.cpp core/casting/binary_to_string_cast_executor.cpp 
core/casting/boolean_to_decimal_cast_executor.cpp @@ -513,6 +514,7 @@ if(PAIMON_BUILD_TESTS) core/append/append_only_writer_test.cpp core/append/bucketed_append_compact_manager_test.cpp core/bucket/default_bucket_function_test.cpp + core/bucket/hive_bucket_function_test.cpp core/bucket/mod_bucket_function_test.cpp core/casting/cast_executor_factory_test.cpp core/casting/cast_executor_test.cpp diff --git a/src/paimon/core/bucket/hive_bucket_function.cpp b/src/paimon/core/bucket/hive_bucket_function.cpp new file mode 100644 index 000000000..f4f3543e5 --- /dev/null +++ b/src/paimon/core/bucket/hive_bucket_function.cpp @@ -0,0 +1,148 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/bucket/hive_bucket_function.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/core/bucket/hive_hasher.h" +#include "paimon/status.h" + +namespace paimon { + +namespace { + +constexpr int32_t SEED = 0; + +} // namespace + +HiveBucketFunction::HiveBucketFunction(std::vector field_infos) + : field_infos_(std::move(field_infos)) {} + +Result> HiveBucketFunction::Create( + const std::vector& field_types) { + std::vector field_infos; + field_infos.reserve(field_types.size()); + for (const auto& type : field_types) { + field_infos.emplace_back(type); + } + return Create(field_infos); +} + +Result> HiveBucketFunction::Create( + const std::vector& field_infos) { + if (field_infos.empty()) { + return Status::Invalid("HiveBucketFunction requires at least one field"); + } + for (const auto& info : field_infos) { + switch (info.type) { + case FieldType::BOOLEAN: + case FieldType::TINYINT: + case FieldType::SMALLINT: + case FieldType::INT: + case FieldType::BIGINT: + case FieldType::FLOAT: + case FieldType::DOUBLE: + case FieldType::STRING: + case FieldType::BINARY: + case FieldType::DECIMAL: + case FieldType::DATE: + break; + default: + return Status::Invalid(fmt::format("Unsupported type as Hive bucket key type: {}", + static_cast(info.type))); + } + } + return std::unique_ptr(new HiveBucketFunction(field_infos)); +} + +int32_t HiveBucketFunction::Bucket(const BinaryRow& row, int32_t num_buckets) const { + int32_t hash = SEED; + for (int32_t i = 0; i < row.GetFieldCount(); i++) { + hash = (31 * hash) + ComputeHash(row, i); + } + return Mod(hash & std::numeric_limits::max(), num_buckets); +} + +int32_t HiveBucketFunction::ComputeHash(const BinaryRow& row, int32_t field_index) const { + if (row.IsNullAt(field_index)) { + return 0; + } + + const auto& info = field_infos_[field_index]; + switch (info.type) { + case FieldType::BOOLEAN: + return 
HiveHasher::HashInt(row.GetBoolean(field_index) ? 1 : 0); + case FieldType::TINYINT: + return HiveHasher::HashInt(static_cast(row.GetByte(field_index))); + case FieldType::SMALLINT: + return HiveHasher::HashInt(static_cast(row.GetShort(field_index))); + case FieldType::INT: + case FieldType::DATE: + return HiveHasher::HashInt(row.GetInt(field_index)); + case FieldType::BIGINT: + return HiveHasher::HashLong(row.GetLong(field_index)); + case FieldType::FLOAT: { + float float_value = row.GetFloat(field_index); + int32_t bits; + if (float_value == -0.0f) { + bits = 0; + } else { + std::memcpy(&bits, &float_value, sizeof(bits)); + } + return HiveHasher::HashInt(bits); + } + case FieldType::DOUBLE: { + double double_value = row.GetDouble(field_index); + int64_t bits; + if (double_value == -0.0) { + bits = 0L; + } else { + std::memcpy(&bits, &double_value, sizeof(bits)); + } + return HiveHasher::HashLong(bits); + } + case FieldType::STRING: { + std::string_view sv = row.GetStringView(field_index); + return HiveHasher::HashBytes(sv.data(), static_cast(sv.size())); + } + case FieldType::BINARY: { + auto bytes = row.GetBinary(field_index); + return HiveHasher::HashBytes(bytes->data(), static_cast(bytes->size())); + } + case FieldType::DECIMAL: { + Decimal decimal = row.GetDecimal(field_index, info.precision, info.scale); + return HiveHasher::HashDecimal(decimal); + } + default: + // This should never happen since Create() validates the types. + return 0; + } +} + +int32_t HiveBucketFunction::Mod(int32_t value, int32_t divisor) { + int32_t remainder = value % divisor; + if (remainder < 0) { + return (remainder + divisor) % divisor; + } + return remainder; +} + +} // namespace paimon diff --git a/src/paimon/core/bucket/hive_bucket_function.h b/src/paimon/core/bucket/hive_bucket_function.h new file mode 100644 index 000000000..06e7e68db --- /dev/null +++ b/src/paimon/core/bucket/hive_bucket_function.h @@ -0,0 +1,74 @@ +/* + * Copyright 2024-present Alibaba Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/bucket/bucket_function.h" +#include "paimon/defs.h" +#include "paimon/result.h" + +namespace paimon { + +/// Describes a field's type information needed for Hive hashing. +struct HiveFieldInfo { + FieldType type; + int32_t precision = 0; // Used for DECIMAL type + int32_t scale = 0; // Used for DECIMAL type + + explicit HiveFieldInfo(FieldType t) : type(t) {} + HiveFieldInfo(FieldType t, int32_t p, int32_t s) : type(t), precision(p), scale(s) {} +}; + +/// Hive-compatible bucket function. +/// This implements the same bucket assignment logic as Hive, using Hive's hash functions +/// to ensure compatibility between Paimon and Hive bucketed tables. +/// +/// The hash is computed by iterating over all fields in the row: +/// hash = (31 * hash) + computeHash(field_value) +/// Then the bucket is: (hash & INT32_MAX) % numBuckets +class HiveBucketFunction : public BucketFunction { + public: + /// Create a HiveBucketFunction with the given field types. + /// @param field_types The types of all fields in the bucket key row. + /// @return A Result containing the HiveBucketFunction or an error status. + static Result> Create( + const std::vector& field_types); + + /// Create a HiveBucketFunction with detailed field info (including decimal precision/scale). + /// @param field_infos The detailed type info of all fields in the bucket key row. 
+ /// @return A Result containing the HiveBucketFunction or an error status. + static Result> Create( + const std::vector& field_infos); + + int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override; + + private: + explicit HiveBucketFunction(std::vector field_infos); + + /// Compute the Hive hash for a single field value. + int32_t ComputeHash(const BinaryRow& row, int32_t field_index) const; + + /// Mod operation that always returns non-negative result. + static int32_t Mod(int32_t value, int32_t divisor); + + std::vector field_infos_; +}; + +} // namespace paimon diff --git a/src/paimon/core/bucket/hive_bucket_function_test.cpp b/src/paimon/core/bucket/hive_bucket_function_test.cpp new file mode 100644 index 000000000..4627474ca --- /dev/null +++ b/src/paimon/core/bucket/hive_bucket_function_test.cpp @@ -0,0 +1,252 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/bucket/hive_bucket_function.h" + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/core/bucket/hive_hasher.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +namespace { + +/// Helper to create a BinaryRow with INT, STRING, BINARY, DECIMAL(10,4) fields. 
+/// Matches the Java test: toBinaryRow(rowType, 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4)) +BinaryRow CreateMixedRow(int32_t int_val, const std::string& str_val, + const std::vector& binary_val, int64_t decimal_unscaled, + int32_t decimal_precision, int32_t decimal_scale) { + auto pool = GetDefaultPool(); + BinaryRow row(4); + BinaryRowWriter writer(&row, 0, pool.get()); + + // Field 0: INT + writer.WriteInt(0, int_val); + + // Field 1: STRING + writer.WriteStringView(1, std::string_view(str_val)); + + // Field 2: BINARY + writer.WriteStringView(2, std::string_view(binary_val.data(), binary_val.size())); + + // Field 3: DECIMAL (compact, precision <= 18) + writer.WriteDecimal( + 3, Decimal::FromUnscaledLong(decimal_unscaled, decimal_precision, decimal_scale), + decimal_precision); + + writer.Complete(); + return row; +} + +/// Helper to create a BinaryRow with all null fields. +BinaryRow CreateNullRow(int32_t num_fields) { + auto pool = GetDefaultPool(); + BinaryRow row(num_fields); + BinaryRowWriter writer(&row, 0, pool.get()); + for (int32_t i = 0; i < num_fields; i++) { + writer.SetNullAt(i); + } + writer.Complete(); + return row; +} + +BinaryRow CreateIntRow(int32_t value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteInt(0, value); + writer.Complete(); + return row; +} + +BinaryRow CreateBooleanRow(bool value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteBoolean(0, value); + writer.Complete(); + return row; +} + +BinaryRow CreateLongRow(int64_t value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteLong(0, value); + writer.Complete(); + return row; +} + +BinaryRow CreateFloatRow(float value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteFloat(0, value); + writer.Complete(); + return 
row; +} + +BinaryRow CreateDoubleRow(double value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteDouble(0, value); + writer.Complete(); + return row; +} + +BinaryRow CreateStringRow(const std::string& value) { + auto pool = GetDefaultPool(); + BinaryRow row(1); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteStringView(0, std::string_view(value)); + writer.Complete(); + return row; +} + +} // namespace + +/// Test matching Java: testHiveBucketFunction +/// RowType: INT, STRING, BYTES, DECIMAL(10,4) +/// Values: 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4) +TEST(HiveBucketFunctionTest, TestHiveBucketFunction) { + std::vector field_infos = { + HiveFieldInfo(FieldType::INT), + HiveFieldInfo(FieldType::STRING), + HiveFieldInfo(FieldType::BINARY), + HiveFieldInfo(FieldType::DECIMAL, 10, 4), + }; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos)); + + // Decimal("12.3400", 10, 4) => unscaled = 123400 + BinaryRow row = CreateMixedRow(7, "hello", {1, 2, 3}, 123400, 10, 4); + + // Verify individual hash components: + // HiveHasher.hashBytes("hello") = 99162322 + ASSERT_EQ(99162322, HiveHasher::HashBytes("hello", 5)); + // HiveHasher.hashBytes({1,2,3}) = 1026 + ASSERT_EQ(1026, HiveHasher::HashBytes("\x01\x02\x03", 3)); + // BigDecimal("12.34").hashCode() = 1234 * 31 + 2 = 38256 + // (After normalizing "12.3400" -> "12.34", unscaled=1234, scale=2) + ASSERT_EQ(38256, HiveHasher::HashDecimal(Decimal::FromUnscaledLong(123400, 10, 4))); + + // expectedHash = 31*(31*(31*7 + 99162322) + 1026) + 38256 = 805989529 (with int32 overflow) + // bucket = (805989529 & INT32_MAX) % 8 = 1 + ASSERT_EQ(1, func->Bucket(row, 8)); +} + +/// Test matching Java: testHiveBucketFunctionWithNulls +TEST(HiveBucketFunctionTest, TestHiveBucketFunctionWithNulls) { + std::vector field_types = {FieldType::INT, FieldType::STRING}; + ASSERT_OK_AND_ASSIGN(auto func, 
HiveBucketFunction::Create(field_types)); + + BinaryRow row = CreateNullRow(2); + + // All nulls => hash = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(row, 4)); +} + +/// Test unsupported type returns error on Create +TEST(HiveBucketFunctionTest, TestUnsupportedType) { + // TIMESTAMP type should fail + std::vector field_types = {FieldType::TIMESTAMP}; + auto result = HiveBucketFunction::Create(field_types); + ASSERT_NOK(result.status()); +} + +/// Test empty field types returns error +TEST(HiveBucketFunctionTest, TestEmptyFieldTypes) { + std::vector field_types = {}; + auto result = HiveBucketFunction::Create(field_types); + ASSERT_NOK(result.status()); +} + +/// Test single INT field +TEST(HiveBucketFunctionTest, TestSingleIntField) { + std::vector field_types = {FieldType::INT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // hash = 31*0 + 42 = 42, bucket = (42 & INT32_MAX) % 5 = 2 + ASSERT_EQ(2, func->Bucket(CreateIntRow(42), 5)); + // hash = 31*0 + 0 = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(0), 5)); +} + +/// Test BOOLEAN field +TEST(HiveBucketFunctionTest, TestBooleanField) { + std::vector field_types = {FieldType::BOOLEAN}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // true => hashInt(1) = 1, bucket = 1 % 4 = 1 + ASSERT_EQ(1, func->Bucket(CreateBooleanRow(true), 4)); + // false => hashInt(0) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateBooleanRow(false), 4)); +} + +/// Test BIGINT field +TEST(HiveBucketFunctionTest, TestBigintField) { + std::vector field_types = {FieldType::BIGINT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // Java Long.hashCode(100L) = (int)(100 ^ (100 >>> 32)) = 100 + // bucket = 100 % 7 = 2 + ASSERT_EQ(2, func->Bucket(CreateLongRow(100L), 7)); +} + +/// Test FLOAT field with -0.0f +TEST(HiveBucketFunctionTest, TestFloatNegativeZero) { + std::vector field_types = {FieldType::FLOAT}; + ASSERT_OK_AND_ASSIGN(auto 
func, HiveBucketFunction::Create(field_types)); + + // -0.0f should be treated as 0 => hashInt(0) = 0 + ASSERT_EQ(func->Bucket(CreateFloatRow(0.0f), 5), func->Bucket(CreateFloatRow(-0.0f), 5)); +} + +/// Test DOUBLE field with -0.0 +TEST(HiveBucketFunctionTest, TestDoubleNegativeZero) { + std::vector field_types = {FieldType::DOUBLE}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // -0.0 should be treated as 0L => hashLong(0) = 0 + ASSERT_EQ(func->Bucket(CreateDoubleRow(0.0), 5), func->Bucket(CreateDoubleRow(-0.0), 5)); +} + +/// Test STRING field +TEST(HiveBucketFunctionTest, TestStringField) { + std::vector field_types = {FieldType::STRING}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // hashBytes("hello") = 99162322 + // bucket = (99162322 & INT32_MAX) % 10 = 99162322 % 10 = 2 + ASSERT_EQ(2, func->Bucket(CreateStringRow("hello"), 10)); +} + +/// Test different num_buckets produce valid results +TEST(HiveBucketFunctionTest, TestDifferentNumBuckets) { + std::vector field_types = {FieldType::INT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + for (int32_t num_buckets = 1; num_buckets <= 20; num_buckets++) { + int32_t bucket = func->Bucket(CreateIntRow(12345), num_buckets); + ASSERT_GE(bucket, 0); + ASSERT_LT(bucket, num_buckets); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/bucket/hive_hasher.h b/src/paimon/core/bucket/hive_hasher.h new file mode 100644 index 000000000..e496c59c5 --- /dev/null +++ b/src/paimon/core/bucket/hive_hasher.h @@ -0,0 +1,159 @@ +/* + * Copyright 2024-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/data/decimal.h" + +namespace paimon { + +/// Hive-compatible hash utility functions. +/// This class provides hash functions that are compatible with Hive's ObjectInspectorUtils +/// hash implementation, ensuring consistent bucket assignment between Paimon C++ and Java. +class HiveHasher { + public: + /// Hash an int value (identity function, same as Hive). + static int32_t HashInt(int32_t input) { + return input; + } + + /// Hash a long value (same as Java's Long.hashCode). + static int32_t HashLong(int64_t input) { + return static_cast(input ^ (static_cast(input) >> 32)); + } + + /// Hash a byte array. + static int32_t HashBytes(const char* bytes, int32_t length) { + int32_t result = 0; + for (int32_t i = 0; i < length; i++) { + result = (result * 31) + static_cast(bytes[i]); + } + return result; + } + + /// Normalize a Decimal value for Hive-compatible hashing. + /// This implements the same logic as HiveHasher.normalizeDecimal in Java. + /// + /// The normalization process: + /// 1. Strip trailing zeros + /// 2. Check if integer digits exceed max precision (38) + /// 3. Limit scale to min(38, min(38 - intDigits, currentScale)) + /// 4. Round if necessary using HALF_UP + /// + /// @param decimal The decimal value to normalize. + /// @return The hash code of the normalized decimal, computed as Java BigDecimal.hashCode(). 
+ static int32_t HashDecimal(const Decimal& decimal) { + // Java BigDecimal.hashCode() = unscaledValue.intValue() * 31 + scale + // For compact decimals (precision <= 18), we can use the long value directly. + // For non-compact decimals, we need to handle the 128-bit value. + + // First normalize: strip trailing zeros and limit scale + int32_t precision = decimal.Precision(); + int32_t scale = decimal.Scale(); + auto value = decimal.Value(); + + // Strip trailing zeros + if (value == 0) { + // BigDecimal.ZERO.hashCode() = 0 * 31 + 0 = 0 + return 0; + } + + // Strip trailing zeros by dividing by 10 while remainder is 0 + while (scale > 0 && value != 0) { + auto quotient = value / 10; + auto remainder = value - quotient * 10; + if (remainder != 0) { + break; + } + value = quotient; + scale--; + } + + // After stripping, check if value is zero + if (value == 0) { + return 0; + } + + // Count integer digits + auto abs_value = value < 0 ? -value : value; + int32_t total_digits = 0; + auto temp = abs_value; + while (temp > 0) { + temp /= 10; + total_digits++; + } + int32_t int_digits = total_digits - scale; + + if (int_digits > HIVE_DECIMAL_MAX_PRECISION) { + // Overflow, return 0 (null equivalent) + return 0; + } + + int32_t max_scale = HIVE_DECIMAL_MAX_SCALE; + if (HIVE_DECIMAL_MAX_PRECISION - int_digits < max_scale) { + max_scale = HIVE_DECIMAL_MAX_PRECISION - int_digits; + } + if (scale < max_scale) { + max_scale = scale; + } + + if (scale > max_scale) { + // Need to round: scale down with HALF_UP rounding + int32_t scale_diff = scale - max_scale; + for (int32_t i = 0; i < scale_diff; i++) { + auto quotient = value / 10; + auto remainder = value - quotient * 10; + if (remainder < 0) remainder = -remainder; + if (remainder >= 5) { + value = quotient + (value < 0 ? 
-1 : 1); + } else { + value = quotient; + } + } + scale = max_scale; + + // Strip trailing zeros again after rounding + while (scale > 0 && value != 0) { + auto quotient = value / 10; + auto remainder = value - quotient * 10; + if (remainder != 0) { + break; + } + value = quotient; + scale--; + } + + if (value == 0) { + return 0; + } + } + + // Compute Java BigDecimal.hashCode(): + // hashCode = intValue(unscaledValue) * 31 + scale + // intValue() returns the low 32 bits of the value + int32_t int_value = static_cast(static_cast(value)); + return int_value * 31 + scale; + } + + private: + static constexpr int32_t HIVE_DECIMAL_MAX_PRECISION = 38; + static constexpr int32_t HIVE_DECIMAL_MAX_SCALE = 38; +}; + +} // namespace paimon From a9cc1f850518964c00ba7c064212cee208704e37 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 11:54:04 +0800 Subject: [PATCH 03/10] minor fix --- include/paimon/utils/bucket_id_calculator.h | 1 + src/paimon/core/bucket/bucket_function.h | 2 +- src/paimon/core/bucket/bucket_function_type.h | 4 ++-- src/paimon/core/bucket/default_bucket_function.h | 2 +- src/paimon/core/bucket/default_bucket_function_test.cpp | 2 +- src/paimon/core/bucket/hive_bucket_function.cpp | 2 +- src/paimon/core/bucket/hive_bucket_function.h | 2 +- src/paimon/core/bucket/hive_bucket_function_test.cpp | 2 +- src/paimon/core/bucket/hive_hasher.h | 2 +- src/paimon/core/bucket/mod_bucket_function.cpp | 2 +- src/paimon/core/bucket/mod_bucket_function.h | 2 +- src/paimon/core/bucket/mod_bucket_function_test.cpp | 2 +- 12 files changed, 13 insertions(+), 12 deletions(-) diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index f7004edb4..78773b56b 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -64,6 +64,7 @@ class PAIMON_EXPORT BucketIdCalculator { /// @param bucket_function The bucket function to use for bucket assignment. 
static Result> Create( bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function); + /// Calculate bucket ids for the given bucket keys. /// @param bucket_keys Arrow struct array containing the bucket key values. /// @param bucket_schema Arrow schema describing the structure of bucket_keys. diff --git a/src/paimon/core/bucket/bucket_function.h b/src/paimon/core/bucket/bucket_function.h index 77d6e4c3a..999f8a9a5 100644 --- a/src/paimon/core/bucket/bucket_function.h +++ b/src/paimon/core/bucket/bucket_function.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/bucket_function_type.h b/src/paimon/core/bucket/bucket_function_type.h index 151107174..cd5de098b 100644 --- a/src/paimon/core/bucket/bucket_function_type.h +++ b/src/paimon/core/bucket/bucket_function_type.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ enum class BucketFunctionType { DEFAULT = 1, // Mod bucket function using modulo operation on bucket key. MOD = 2, - // Hive bucket function (not yet implemented). + // Hive bucket function HIVE = 3 }; diff --git a/src/paimon/core/bucket/default_bucket_function.h b/src/paimon/core/bucket/default_bucket_function.h index a9475c096..cbcd219fa 100644 --- a/src/paimon/core/bucket/default_bucket_function.h +++ b/src/paimon/core/bucket/default_bucket_function.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/src/paimon/core/bucket/default_bucket_function_test.cpp b/src/paimon/core/bucket/default_bucket_function_test.cpp index 37c6868b5..3894fbc94 100644 --- a/src/paimon/core/bucket/default_bucket_function_test.cpp +++ b/src/paimon/core/bucket/default_bucket_function_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/hive_bucket_function.cpp b/src/paimon/core/bucket/hive_bucket_function.cpp index f4f3543e5..a5d49b016 100644 --- a/src/paimon/core/bucket/hive_bucket_function.cpp +++ b/src/paimon/core/bucket/hive_bucket_function.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/hive_bucket_function.h b/src/paimon/core/bucket/hive_bucket_function.h index 06e7e68db..0805d8b21 100644 --- a/src/paimon/core/bucket/hive_bucket_function.h +++ b/src/paimon/core/bucket/hive_bucket_function.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/hive_bucket_function_test.cpp b/src/paimon/core/bucket/hive_bucket_function_test.cpp index 4627474ca..f6ba2a4ab 100644 --- a/src/paimon/core/bucket/hive_bucket_function_test.cpp +++ b/src/paimon/core/bucket/hive_bucket_function_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/src/paimon/core/bucket/hive_hasher.h b/src/paimon/core/bucket/hive_hasher.h index e496c59c5..27196a660 100644 --- a/src/paimon/core/bucket/hive_hasher.h +++ b/src/paimon/core/bucket/hive_hasher.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/mod_bucket_function.cpp b/src/paimon/core/bucket/mod_bucket_function.cpp index dead2a29c..2176a2173 100644 --- a/src/paimon/core/bucket/mod_bucket_function.cpp +++ b/src/paimon/core/bucket/mod_bucket_function.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/mod_bucket_function.h b/src/paimon/core/bucket/mod_bucket_function.h index 23f144606..6d46a7a1e 100644 --- a/src/paimon/core/bucket/mod_bucket_function.h +++ b/src/paimon/core/bucket/mod_bucket_function.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/paimon/core/bucket/mod_bucket_function_test.cpp b/src/paimon/core/bucket/mod_bucket_function_test.cpp index 66bea19a3..5c38aba37 100644 --- a/src/paimon/core/bucket/mod_bucket_function_test.cpp +++ b/src/paimon/core/bucket/mod_bucket_function_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
From 5bd8cbcfbe02d97920e5373bf3fe8f1273aab9d0 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 12:13:22 +0800 Subject: [PATCH 04/10] minor fix --- src/paimon/core/bucket/hive_hasher.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/paimon/core/bucket/hive_hasher.h b/src/paimon/core/bucket/hive_hasher.h index 27196a660..0358b7538 100644 --- a/src/paimon/core/bucket/hive_hasher.h +++ b/src/paimon/core/bucket/hive_hasher.h @@ -64,7 +64,6 @@ class HiveHasher { // For non-compact decimals, we need to handle the 128-bit value. // First normalize: strip trailing zeros and limit scale - int32_t precision = decimal.Precision(); int32_t scale = decimal.Scale(); auto value = decimal.Value(); From 5d0312feacd9837c2e021bc2a65bfb2a04bd5640 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 13:24:32 +0800 Subject: [PATCH 05/10] minor fix --- include/paimon/utils/bucket_id_calculator.h | 3 +++ src/paimon/common/utils/bucket_id_calculator.cpp | 2 ++ src/paimon/core/bucket/hive_hasher.h | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 78773b56b..19524b0db 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -75,6 +75,9 @@ class PAIMON_EXPORT BucketIdCalculator { Status CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* bucket_schema, int32_t* bucket_ids) const; + /// Destructor + ~BucketIdCalculator(); + private: BucketIdCalculator(int32_t num_buckets, std::unique_ptr bucket_function, const std::shared_ptr& pool) diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index 339286cd5..e587ff304 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -238,6 +238,8 @@ static Result WriteBucketRow(int32_t col_id, } } // namespace 
+BucketIdCalculator::~BucketIdCalculator() = default; + Result> BucketIdCalculator::Create( bool is_pk_table, int32_t num_buckets, const std::shared_ptr& pool) { return Create(is_pk_table, num_buckets, std::make_unique(), pool); diff --git a/src/paimon/core/bucket/hive_hasher.h b/src/paimon/core/bucket/hive_hasher.h index 0358b7538..489fd722e 100644 --- a/src/paimon/core/bucket/hive_hasher.h +++ b/src/paimon/core/bucket/hive_hasher.h @@ -146,7 +146,7 @@ class HiveHasher { // Compute Java BigDecimal.hashCode(): // hashCode = intValue(unscaledValue) * 31 + scale // intValue() returns the low 32 bits of the value - int32_t int_value = static_cast(static_cast(value)); + auto int_value = static_cast(static_cast(value)); return int_value * 31 + scale; } From da310fbf12a810a73d8cfc5738ead87d63a933db Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Fri, 10 Apr 2026 14:18:29 +0800 Subject: [PATCH 06/10] fix --- include/paimon/utils/bucket_id_calculator.h | 3 +-- src/paimon/common/utils/bucket_id_calculator.cpp | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 19524b0db..3588f5fbb 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -80,8 +80,7 @@ class PAIMON_EXPORT BucketIdCalculator { private: BucketIdCalculator(int32_t num_buckets, std::unique_ptr bucket_function, - const std::shared_ptr& pool) - : num_buckets_(num_buckets), bucket_function_(std::move(bucket_function)), pool_(pool) {} + const std::shared_ptr& pool); private: int32_t num_buckets_; diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index e587ff304..85149c2d1 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -238,6 +238,11 @@ static Result WriteBucketRow(int32_t col_id, } } // namespace 
+BucketIdCalculator::BucketIdCalculator(int32_t num_buckets, + std::unique_ptr bucket_function, + const std::shared_ptr& pool) + : num_buckets_(num_buckets), bucket_function_(std::move(bucket_function)), pool_(pool) {} + BucketIdCalculator::~BucketIdCalculator() = default; Result> BucketIdCalculator::Create( From 83238b3a709370441cf29add7c50c42ee09d52ec Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Mon, 13 Apr 2026 13:48:40 +0800 Subject: [PATCH 07/10] address --- include/paimon/utils/bucket_id_calculator.h | 7 - .../common/utils/bucket_id_calculator.cpp | 5 - .../utils/bucket_id_calculator_test.cpp | 6 +- .../core/bucket/default_bucket_function.h | 1 - .../core/bucket/hive_bucket_function.cpp | 15 +- src/paimon/core/bucket/hive_bucket_function.h | 2 +- .../core/bucket/hive_bucket_function_test.cpp | 178 ++++++++---------- .../core/bucket/mod_bucket_function.cpp | 25 +-- .../core/bucket/mod_bucket_function_test.cpp | 126 ++++++++++--- src/paimon/core/core_options.h | 2 +- src/paimon/core/core_options_test.cpp | 2 +- .../bucket_function_type.h | 13 +- 12 files changed, 210 insertions(+), 172 deletions(-) rename src/paimon/core/{bucket => options}/bucket_function_type.h (55%) diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 3588f5fbb..29284ea88 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -58,13 +58,6 @@ class PAIMON_EXPORT BucketIdCalculator { bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function, const std::shared_ptr& pool); - /// Create `BucketIdCalculator` with a custom bucket function and default memory pool. - /// @param is_pk_table Whether this is for a primary key table. - /// @param num_buckets Number of buckets. - /// @param bucket_function The bucket function to use for bucket assignment. 
- static Result> Create( - bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function); - /// Calculate bucket ids for the given bucket keys. /// @param bucket_keys Arrow struct array containing the bucket key values. /// @param bucket_schema Arrow schema describing the structure of bucket_keys. diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index 85149c2d1..913e8f68a 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -273,11 +273,6 @@ Result> BucketIdCalculator::Create( new BucketIdCalculator(num_buckets, std::move(bucket_function), pool)); } -Result> BucketIdCalculator::Create( - bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function) { - return Create(is_pk_table, num_buckets, std::move(bucket_function), GetDefaultPool()); -} - Status BucketIdCalculator::CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* bucket_schema, int32_t* bucket_ids) const { ScopeGuard guard([bucket_keys, bucket_schema]() { diff --git a/src/paimon/common/utils/bucket_id_calculator_test.cpp b/src/paimon/common/utils/bucket_id_calculator_test.cpp index 8b20b6c48..a13d39f45 100644 --- a/src/paimon/common/utils/bucket_id_calculator_test.cpp +++ b/src/paimon/common/utils/bucket_id_calculator_test.cpp @@ -75,9 +75,9 @@ class BucketIdCalculatorTest : public ::testing::Test { ::ArrowSchema c_bucket_schema; EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_bucket_schema).ok()); std::vector bucket_ids(bucket_array->length()); - EXPECT_OK_AND_ASSIGN( - auto bucket_id_cal, - BucketIdCalculator::Create(is_pk_table, num_buckets, std::move(bucket_function))); + EXPECT_OK_AND_ASSIGN(auto bucket_id_cal, BucketIdCalculator::Create( + is_pk_table, num_buckets, + std::move(bucket_function), GetDefaultPool())); PAIMON_RETURN_NOT_OK(bucket_id_cal->CalculateBucketIds( /*bucket_keys=*/&c_bucket_array, /*bucket_schema=*/&c_bucket_schema, 
/*bucket_ids=*/bucket_ids.data())); diff --git a/src/paimon/core/bucket/default_bucket_function.h b/src/paimon/core/bucket/default_bucket_function.h index cbcd219fa..d90d81674 100644 --- a/src/paimon/core/bucket/default_bucket_function.h +++ b/src/paimon/core/bucket/default_bucket_function.h @@ -25,7 +25,6 @@ namespace paimon { /// Default bucket function that uses the hash code of the row to determine the bucket. -/// This is consistent with the logic in BucketIdCalculator. class DefaultBucketFunction : public BucketFunction { public: int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override { diff --git a/src/paimon/core/bucket/hive_bucket_function.cpp b/src/paimon/core/bucket/hive_bucket_function.cpp index a5d49b016..d0e50cceb 100644 --- a/src/paimon/core/bucket/hive_bucket_function.cpp +++ b/src/paimon/core/bucket/hive_bucket_function.cpp @@ -16,12 +16,14 @@ #include "paimon/core/bucket/hive_bucket_function.h" +#include #include #include #include #include "fmt/format.h" #include "paimon/common/data/binary_row.h" +#include "paimon/common/utils/field_type_utils.h" #include "paimon/core/bucket/hive_hasher.h" #include "paimon/status.h" @@ -29,12 +31,12 @@ namespace paimon { namespace { -constexpr int32_t SEED = 0; +static constexpr int32_t SEED = 0; } // namespace -HiveBucketFunction::HiveBucketFunction(std::vector field_infos) - : field_infos_(std::move(field_infos)) {} +HiveBucketFunction::HiveBucketFunction(const std::vector& field_infos) + : field_infos_(field_infos) {} Result> HiveBucketFunction::Create( const std::vector& field_types) { @@ -67,7 +69,7 @@ Result> HiveBucketFunction::Create( break; default: return Status::Invalid(fmt::format("Unsupported type as Hive bucket key type: {}", - static_cast(info.type))); + FieldTypeUtils::FieldTypeToString(info.type))); } } return std::unique_ptr(new HiveBucketFunction(field_infos)); @@ -124,8 +126,8 @@ int32_t HiveBucketFunction::ComputeHash(const BinaryRow& row, int32_t field_inde return 
HiveHasher::HashBytes(sv.data(), static_cast(sv.size())); } case FieldType::BINARY: { - auto bytes = row.GetBinary(field_index); - return HiveHasher::HashBytes(bytes->data(), static_cast(bytes->size())); + std::string_view sv = row.GetStringView(field_index); + return HiveHasher::HashBytes(sv.data(), static_cast(sv.size())); } case FieldType::DECIMAL: { Decimal decimal = row.GetDecimal(field_index, info.precision, info.scale); @@ -133,6 +135,7 @@ int32_t HiveBucketFunction::ComputeHash(const BinaryRow& row, int32_t field_inde } default: // This should never happen since Create() validates the types. + assert(false); return 0; } } diff --git a/src/paimon/core/bucket/hive_bucket_function.h b/src/paimon/core/bucket/hive_bucket_function.h index 0805d8b21..2a3969af6 100644 --- a/src/paimon/core/bucket/hive_bucket_function.h +++ b/src/paimon/core/bucket/hive_bucket_function.h @@ -60,7 +60,7 @@ class HiveBucketFunction : public BucketFunction { int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const override; private: - explicit HiveBucketFunction(std::vector field_infos); + explicit HiveBucketFunction(const std::vector& field_infos); /// Compute the Hive hash for a single field value. int32_t ComputeHash(const BinaryRow& row, int32_t field_index) const; diff --git a/src/paimon/core/bucket/hive_bucket_function_test.cpp b/src/paimon/core/bucket/hive_bucket_function_test.cpp index f6ba2a4ab..d711d35ed 100644 --- a/src/paimon/core/bucket/hive_bucket_function_test.cpp +++ b/src/paimon/core/bucket/hive_bucket_function_test.cpp @@ -21,111 +21,87 @@ #include "paimon/common/data/binary_row_writer.h" #include "paimon/core/bucket/hive_hasher.h" #include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { -namespace { - -/// Helper to create a BinaryRow with INT, STRING, BINARY, DECIMAL(10,4) fields. 
-/// Matches the Java test: toBinaryRow(rowType, 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4)) -BinaryRow CreateMixedRow(int32_t int_val, const std::string& str_val, - const std::vector& binary_val, int64_t decimal_unscaled, - int32_t decimal_precision, int32_t decimal_scale) { - auto pool = GetDefaultPool(); - BinaryRow row(4); - BinaryRowWriter writer(&row, 0, pool.get()); - - // Field 0: INT - writer.WriteInt(0, int_val); - - // Field 1: STRING - writer.WriteStringView(1, std::string_view(str_val)); - - // Field 2: BINARY - writer.WriteStringView(2, std::string_view(binary_val.data(), binary_val.size())); - - // Field 3: DECIMAL (compact, precision <= 18) - writer.WriteDecimal( - 3, Decimal::FromUnscaledLong(decimal_unscaled, decimal_precision, decimal_scale), - decimal_precision); - - writer.Complete(); - return row; -} - -/// Helper to create a BinaryRow with all null fields. -BinaryRow CreateNullRow(int32_t num_fields) { - auto pool = GetDefaultPool(); - BinaryRow row(num_fields); - BinaryRowWriter writer(&row, 0, pool.get()); - for (int32_t i = 0; i < num_fields; i++) { - writer.SetNullAt(i); +class HiveBucketFunctionTest : public ::testing::Test { + protected: + /// Helper to create a BinaryRow with INT, STRING, BINARY, DECIMAL(10,4) fields. 
+ /// Matches the Java test: toBinaryRow(rowType, 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4)) + BinaryRow CreateMixedRow(int32_t int_val, const std::string& str_val, + const std::vector& binary_val, int64_t decimal_unscaled, + int32_t decimal_precision, int32_t decimal_scale) { + auto pool = GetDefaultPool(); + BinaryRow row(4); + BinaryRowWriter writer(&row, 0, pool.get()); + + // Field 0: INT + writer.WriteInt(0, int_val); + + // Field 1: STRING + writer.WriteStringView(1, std::string_view(str_val)); + + // Field 2: BINARY + writer.WriteStringView(2, std::string_view(binary_val.data(), binary_val.size())); + + // Field 3: DECIMAL (compact, precision <= 18) + writer.WriteDecimal( + 3, Decimal::FromUnscaledLong(decimal_unscaled, decimal_precision, decimal_scale), + decimal_precision); + + writer.Complete(); + return row; } - writer.Complete(); - return row; -} -BinaryRow CreateIntRow(int32_t value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteInt(0, value); - writer.Complete(); - return row; -} + /// Helper to create a BinaryRow with all null fields. 
+ BinaryRow CreateNullRow(int32_t num_fields) { + auto pool = GetDefaultPool(); + BinaryRow row(num_fields); + BinaryRowWriter writer(&row, 0, pool.get()); + for (int32_t i = 0; i < num_fields; i++) { + writer.SetNullAt(i); + } + writer.Complete(); + return row; + } -BinaryRow CreateBooleanRow(bool value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteBoolean(0, value); - writer.Complete(); - return row; -} + BinaryRow CreateIntRow(int32_t value) { + auto pool = GetDefaultPool(); + return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } -BinaryRow CreateLongRow(int64_t value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteLong(0, value); - writer.Complete(); - return row; -} + BinaryRow CreateBooleanRow(bool value) { + auto pool = GetDefaultPool(); + return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } -BinaryRow CreateFloatRow(float value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteFloat(0, value); - writer.Complete(); - return row; -} + BinaryRow CreateLongRow(int64_t value) { + auto pool = GetDefaultPool(); + return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } -BinaryRow CreateDoubleRow(double value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteDouble(0, value); - writer.Complete(); - return row; -} + BinaryRow CreateFloatRow(float value) { + auto pool = GetDefaultPool(); + return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } -BinaryRow CreateStringRow(const std::string& value) { - auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteStringView(0, std::string_view(value)); - writer.Complete(); - return row; -} + BinaryRow CreateDoubleRow(double value) { + auto pool = GetDefaultPool(); + 
return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } -} // namespace + BinaryRow CreateStringRow(const std::string& value) { + auto pool = GetDefaultPool(); + return BinaryRowGenerator::GenerateRow({value}, pool.get()); + } +}; /// Test matching Java: testHiveBucketFunction /// RowType: INT, STRING, BYTES, DECIMAL(10,4) /// Values: 7, "hello", {1,2,3}, Decimal("12.3400", 10, 4) -TEST(HiveBucketFunctionTest, TestHiveBucketFunction) { +TEST_F(HiveBucketFunctionTest, TestHiveBucketFunction) { std::vector field_infos = { HiveFieldInfo(FieldType::INT), HiveFieldInfo(FieldType::STRING), @@ -152,7 +128,7 @@ TEST(HiveBucketFunctionTest, TestHiveBucketFunction) { } /// Test matching Java: testHiveBucketFunctionWithNulls -TEST(HiveBucketFunctionTest, TestHiveBucketFunctionWithNulls) { +TEST_F(HiveBucketFunctionTest, TestHiveBucketFunctionWithNulls) { std::vector field_types = {FieldType::INT, FieldType::STRING}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -163,22 +139,22 @@ TEST(HiveBucketFunctionTest, TestHiveBucketFunctionWithNulls) { } /// Test unsupported type returns error on Create -TEST(HiveBucketFunctionTest, TestUnsupportedType) { +TEST_F(HiveBucketFunctionTest, TestUnsupportedType) { // TIMESTAMP type should fail std::vector field_types = {FieldType::TIMESTAMP}; auto result = HiveBucketFunction::Create(field_types); - ASSERT_NOK(result.status()); + ASSERT_NOK_WITH_MSG(result.status(), "Unsupported type"); } /// Test empty field types returns error -TEST(HiveBucketFunctionTest, TestEmptyFieldTypes) { +TEST_F(HiveBucketFunctionTest, TestEmptyFieldTypes) { std::vector field_types = {}; auto result = HiveBucketFunction::Create(field_types); - ASSERT_NOK(result.status()); + ASSERT_NOK_WITH_MSG(result.status(), "at least one field"); } /// Test single INT field -TEST(HiveBucketFunctionTest, TestSingleIntField) { +TEST_F(HiveBucketFunctionTest, TestSingleIntField) { std::vector field_types = {FieldType::INT}; 
ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -189,7 +165,7 @@ TEST(HiveBucketFunctionTest, TestSingleIntField) { } /// Test BOOLEAN field -TEST(HiveBucketFunctionTest, TestBooleanField) { +TEST_F(HiveBucketFunctionTest, TestBooleanField) { std::vector field_types = {FieldType::BOOLEAN}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -200,7 +176,7 @@ TEST(HiveBucketFunctionTest, TestBooleanField) { } /// Test BIGINT field -TEST(HiveBucketFunctionTest, TestBigintField) { +TEST_F(HiveBucketFunctionTest, TestBigintField) { std::vector field_types = {FieldType::BIGINT}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -210,7 +186,7 @@ TEST(HiveBucketFunctionTest, TestBigintField) { } /// Test FLOAT field with -0.0f -TEST(HiveBucketFunctionTest, TestFloatNegativeZero) { +TEST_F(HiveBucketFunctionTest, TestFloatNegativeZero) { std::vector field_types = {FieldType::FLOAT}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -219,7 +195,7 @@ TEST(HiveBucketFunctionTest, TestFloatNegativeZero) { } /// Test DOUBLE field with -0.0 -TEST(HiveBucketFunctionTest, TestDoubleNegativeZero) { +TEST_F(HiveBucketFunctionTest, TestDoubleNegativeZero) { std::vector field_types = {FieldType::DOUBLE}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -228,7 +204,7 @@ TEST(HiveBucketFunctionTest, TestDoubleNegativeZero) { } /// Test STRING field -TEST(HiveBucketFunctionTest, TestStringField) { +TEST_F(HiveBucketFunctionTest, TestStringField) { std::vector field_types = {FieldType::STRING}; ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); @@ -238,7 +214,7 @@ TEST(HiveBucketFunctionTest, TestStringField) { } /// Test different num_buckets produce valid results -TEST(HiveBucketFunctionTest, TestDifferentNumBuckets) { +TEST_F(HiveBucketFunctionTest, TestDifferentNumBuckets) { std::vector field_types = {FieldType::INT}; 
ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); diff --git a/src/paimon/core/bucket/mod_bucket_function.cpp b/src/paimon/core/bucket/mod_bucket_function.cpp index 2176a2173..9df5dd812 100644 --- a/src/paimon/core/bucket/mod_bucket_function.cpp +++ b/src/paimon/core/bucket/mod_bucket_function.cpp @@ -16,29 +16,23 @@ #include "paimon/core/bucket/mod_bucket_function.h" +#include + #include "fmt/format.h" #include "paimon/common/data/binary_row.h" +#include "paimon/common/utils/field_type_utils.h" #include "paimon/status.h" namespace paimon { namespace { -/// Equivalent to Java's Math.floorMod(int, int). -/// The result always has the same sign as the divisor (y), or is zero. -inline int32_t FloorMod(int32_t x, int32_t y) { - int32_t mod = x % y; - // If the signs of mod and y differ and mod is not zero, adjust. - if ((mod ^ y) < 0 && mod != 0) { - mod += y; - } - return mod; -} - -/// Equivalent to Java's Math.floorMod(long, int). +/// Equivalent to Java's Math.floorMod semantics. /// The result always has the same sign as the divisor (y), or is zero. -inline int32_t FloorMod(int64_t x, int32_t y) { - int64_t mod = x % static_cast(y); +/// Works for both int32_t and int64_t as T. +template +inline int32_t FloorMod(T x, int32_t y) { + auto mod = static_cast(x) % static_cast(y); // If the signs of mod and y differ and mod is not zero, adjust. 
if ((mod ^ static_cast(y)) < 0 && mod != 0) { mod += y; @@ -55,7 +49,7 @@ Result> ModBucketFunction::Create(FieldType b if (bucket_key_type != FieldType::INT && bucket_key_type != FieldType::BIGINT) { return Status::Invalid( fmt::format("ModBucketFunction only supports INT or BIGINT bucket key type, but got {}", - static_cast(bucket_key_type))); + FieldTypeUtils::FieldTypeToString(bucket_key_type))); } return std::unique_ptr(new ModBucketFunction(bucket_key_type)); } @@ -68,6 +62,7 @@ int32_t ModBucketFunction::Bucket(const BinaryRow& row, int32_t num_buckets) con return FloorMod(row.GetLong(0), num_buckets); default: // This should never happen since Create() validates the type. + assert(false); return 0; } } diff --git a/src/paimon/core/bucket/mod_bucket_function_test.cpp b/src/paimon/core/bucket/mod_bucket_function_test.cpp index 5c38aba37..574c6c4e2 100644 --- a/src/paimon/core/bucket/mod_bucket_function_test.cpp +++ b/src/paimon/core/bucket/mod_bucket_function_test.cpp @@ -17,9 +17,8 @@ #include "paimon/core/bucket/mod_bucket_function.h" #include "gtest/gtest.h" -#include "paimon/common/data/binary_row.h" -#include "paimon/common/data/binary_row_writer.h" #include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -28,20 +27,12 @@ namespace { BinaryRow CreateIntRow(int32_t value) { auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteInt(0, value); - writer.Complete(); - return row; + return BinaryRowGenerator::GenerateRow({value}, pool.get()); } BinaryRow CreateLongRow(int64_t value) { auto pool = GetDefaultPool(); - BinaryRow row(1); - BinaryRowWriter writer(&row, 0, pool.get()); - writer.WriteLong(0, value); - writer.Complete(); - return row; + return BinaryRowGenerator::GenerateRow({value}, pool.get()); } } // namespace @@ -68,22 +59,22 @@ TEST(ModBucketFunctionTest, TestBigintType) { 
ASSERT_EQ(2, func->Bucket(CreateLongRow(-3), 5)); } -TEST(ModBucketFunctionTest, TestUnsupportedType) { - // STRING type should fail - auto result = ModBucketFunction::Create(FieldType::STRING); - ASSERT_NOK(result.status()); -} - -TEST(ModBucketFunctionTest, TestUnsupportedFloatType) { - // FLOAT type should fail - auto result = ModBucketFunction::Create(FieldType::FLOAT); - ASSERT_NOK(result.status()); -} - -TEST(ModBucketFunctionTest, TestUnsupportedDoubleType) { - // DOUBLE type should fail - auto result = ModBucketFunction::Create(FieldType::DOUBLE); - ASSERT_NOK(result.status()); +TEST(ModBucketFunctionTest, TestUnsupportedTypes) { + { + // STRING type should fail + auto result = ModBucketFunction::Create(FieldType::STRING); + ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT"); + } + { + // FLOAT type should fail + auto result = ModBucketFunction::Create(FieldType::FLOAT); + ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT"); + } + { + // DOUBLE type should fail + auto result = ModBucketFunction::Create(FieldType::DOUBLE); + ASSERT_NOK_WITH_MSG(result.status(), "only supports INT or BIGINT"); + } } TEST(ModBucketFunctionTest, TestIntEdgeCases) { @@ -108,4 +99,83 @@ TEST(ModBucketFunctionTest, TestBigintEdgeCases) { ASSERT_EQ(2, func->Bucket(CreateLongRow(-1000000003L), 5)); } +/// Large random compatibility test to ensure alignment with Java's Math.floorMod behavior. +/// The expected values are pre-computed using Java's Math.floorMod. +TEST(ModBucketFunctionTest, TestCompatibleWithJava) { + ASSERT_OK_AND_ASSIGN(auto int_func, ModBucketFunction::Create(FieldType::INT)); + ASSERT_OK_AND_ASSIGN(auto long_func, ModBucketFunction::Create(FieldType::BIGINT)); + + // Test INT type: pairs of (value, num_buckets) -> expected bucket (Java Math.floorMod) + // These values cover positive, negative, zero, edge cases, and large values. 
+ struct IntTestCase { + int32_t value; + int32_t num_buckets; + int32_t expected; + }; + std::vector int_cases = { + {0, 10, 0}, + {1, 10, 1}, + {-1, 10, 9}, + {10, 10, 0}, + {-10, 10, 0}, + {11, 10, 1}, + {-11, 10, 9}, + {2147483647, 100, 47}, // INT32_MAX + {-2147483647, 100, 53}, // -(INT32_MAX) + {2147483647, 7, 1}, + {-2147483647, 7, 6}, + {123456789, 1000, 789}, + {-123456789, 1000, 211}, + {999, 1, 0}, + {-999, 1, 0}, + {42, 3, 0}, + {-42, 3, 0}, + {43, 3, 1}, + {-43, 3, 2}, + {100, 7, 2}, + {-100, 7, 5}, + }; + for (const auto& tc : int_cases) { + ASSERT_EQ(tc.expected, int_func->Bucket(CreateIntRow(tc.value), tc.num_buckets)) + << "INT floorMod(" << tc.value << ", " << tc.num_buckets << ")"; + } + + // Test BIGINT type: pairs of (value, num_buckets) -> expected bucket (Java Math.floorMod) + struct LongTestCase { + int64_t value; + int32_t num_buckets; + int32_t expected; + }; + std::vector long_cases = { + {0L, 10, 0}, + {1L, 10, 1}, + {-1L, 10, 9}, + {10L, 10, 0}, + {-10L, 10, 0}, + {9223372036854775807L, 100, 7}, // INT64_MAX + {-9223372036854775807L, 100, 93}, // -(INT64_MAX) + {9223372036854775807L, 7, 0}, + {-9223372036854775807L, 7, 0}, + {1234567890123456789L, 1000, 789}, + {-1234567890123456789L, 1000, 211}, + {100L, 7, 2}, + {-100L, 7, 5}, + {999999999999L, 13, 0}, + {-999999999999L, 13, 0}, + }; + for (const auto& tc : long_cases) { + ASSERT_EQ(tc.expected, long_func->Bucket(CreateLongRow(tc.value), tc.num_buckets)) + << "BIGINT floorMod(" << tc.value << ", " << tc.num_buckets << ")"; + } + + // Verify that all bucket results are in valid range [0, num_buckets) + for (int32_t num_buckets = 1; num_buckets <= 50; num_buckets++) { + for (int32_t v = -100; v <= 100; v++) { + int32_t bucket = int_func->Bucket(CreateIntRow(v), num_buckets); + ASSERT_GE(bucket, 0); + ASSERT_LT(bucket, num_buckets); + } + } +} + } // namespace paimon::test diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 9afa5de11..ddaab99b7 100644 --- 
a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -24,7 +24,7 @@ #include #include -#include "paimon/core/bucket/bucket_function_type.h" +#include "paimon/core/options/bucket_function_type.h" #include "paimon/core/options/changelog_producer.h" #include "paimon/core/options/compress_options.h" #include "paimon/core/options/external_path_strategy.h" diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index f633d9611..56cab4848 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -21,7 +21,7 @@ #include "gtest/gtest.h" #include "paimon/common/fs/resolving_file_system.h" -#include "paimon/core/bucket/bucket_function_type.h" +#include "paimon/core/options/bucket_function_type.h" #include "paimon/core/options/expire_config.h" #include "paimon/defs.h" #include "paimon/format/file_format.h" diff --git a/src/paimon/core/bucket/bucket_function_type.h b/src/paimon/core/options/bucket_function_type.h similarity index 55% rename from src/paimon/core/bucket/bucket_function_type.h rename to src/paimon/core/options/bucket_function_type.h index cd5de098b..9efa4d69f 100644 --- a/src/paimon/core/bucket/bucket_function_type.h +++ b/src/paimon/core/options/bucket_function_type.h @@ -19,12 +19,19 @@ namespace paimon { /// Specifies the bucket function type for paimon bucket. +/// This determines how rows are assigned to buckets during data writing. enum class BucketFunctionType { - // Default bucket function using hash code. + /// Default bucket function using hash code. + /// Computes the hash code of the bucket key row and assigns the bucket + /// based on the absolute value of (hashCode % numBuckets). DEFAULT = 1, - // Mod bucket function using modulo operation on bucket key. + /// Mod bucket function using modulo operation on bucket key. + /// Applies Java's Math.floorMod semantics on the bucket key value (INT or BIGINT) + /// to determine the bucket assignment. 
MOD = 2, - // Hive bucket function + /// Hive-compatible bucket function. + /// Uses Hive's ObjectInspectorUtils hash implementation to ensure + /// consistent bucket assignment between Paimon and Hive bucketed tables. HIVE = 3 }; From a93b068d9172517e3d8e8710e575db4ab961df34 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Mon, 13 Apr 2026 19:37:30 +0800 Subject: [PATCH 08/10] address --- include/paimon/utils/bucket_function_type.h | 50 +++++++ include/paimon/utils/bucket_id_calculator.h | 48 +++++-- .../common/utils/bucket_id_calculator.cpp | 52 +++++++- .../utils/bucket_id_calculator_test.cpp | 124 ++++++++++++++++++ src/paimon/core/bucket/hive_bucket_function.h | 12 +- .../core/options/bucket_function_type.h | 22 +--- 6 files changed, 260 insertions(+), 48 deletions(-) create mode 100644 include/paimon/utils/bucket_function_type.h diff --git a/include/paimon/utils/bucket_function_type.h b/include/paimon/utils/bucket_function_type.h new file mode 100644 index 000000000..7506255d3 --- /dev/null +++ b/include/paimon/utils/bucket_function_type.h @@ -0,0 +1,50 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/defs.h" +#include "paimon/visibility.h" + +namespace paimon { + +/// Specifies the bucket function type for paimon bucket. +/// This determines how rows are assigned to buckets during data writing. 
+enum class BucketFunctionType { + /// The default bucket function which will use arithmetic: + /// bucket_id = abs(hash_bucket_binary_row % numBuckets) to get bucket. + DEFAULT = 1, + /// The modulus bucket function which will use modulus arithmetic: + /// bucket_id = floorMod(bucket_key_value, numBuckets) to get bucket. + /// Note: the bucket key must be a single field of INT or BIGINT datatype. + MOD = 2, + /// The hive bucket function which will use hive-compatible hash arithmetic to get bucket. + HIVE = 3 +}; + +/// Describes a field's type information needed for Hive hashing. +struct PAIMON_EXPORT HiveFieldInfo { + FieldType type; + int32_t precision = 0; // Used for DECIMAL type + int32_t scale = 0; // Used for DECIMAL type + + explicit HiveFieldInfo(FieldType t) : type(t) {} + HiveFieldInfo(FieldType t, int32_t p, int32_t s) : type(t), precision(p), scale(s) {} +}; + +} // namespace paimon diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 29284ea88..5aa4b95bb 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -17,10 +17,12 @@ #pragma once #include #include +#include #include "paimon/memory/memory_pool.h" #include "paimon/result.h" #include "paimon/status.h" +#include "paimon/utils/bucket_function_type.h" #include "paimon/visibility.h" struct ArrowSchema; @@ -36,27 +38,51 @@ class MemoryPool; /// hash-based distribution to ensure even data distribution across buckets. class PAIMON_EXPORT BucketIdCalculator { public: - /// Create `BucketIdCalculator` with custom memory pool. + /// Create `BucketIdCalculator` with default bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - /// @param pool Memory pool for memory allocation. + /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. 
static Result> Create( - bool is_pk_table, int32_t num_buckets, const std::shared_ptr& pool); + bool is_pk_table, int32_t num_buckets, const std::shared_ptr& pool = nullptr); - /// Create `BucketIdCalculator` with default memory pool. + /// Create `BucketIdCalculator` with a custom bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - static Result> Create(bool is_pk_table, - int32_t num_buckets); + /// @param bucket_function The bucket function to use for bucket assignment. + /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. + static Result> Create( + bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function, + const std::shared_ptr& pool = nullptr); - /// Create `BucketIdCalculator` with a custom bucket function and memory pool. + /// Create `BucketIdCalculator` with a bucket function type. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - /// @param bucket_function The bucket function to use for bucket assignment. - /// @param pool Memory pool for memory allocation. + /// @param type The bucket function type. Must be BucketFunctionType::DEFAULT. + /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. static Result> Create( - bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function, - const std::shared_ptr& pool); + bool is_pk_table, int32_t num_buckets, BucketFunctionType type, + const std::shared_ptr& pool = nullptr); + + /// Create `BucketIdCalculator` with MOD bucket function type. + /// @param is_pk_table Whether this is for a primary key table. + /// @param num_buckets Number of buckets. + /// @param type The bucket function type. Must be BucketFunctionType::MOD. + /// @param bucket_key_type The type of the single bucket key field. Must be INT or BIGINT. + /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. 
+ static Result> Create( + bool is_pk_table, int32_t num_buckets, BucketFunctionType type, FieldType bucket_key_type, + const std::shared_ptr& pool = nullptr); + + /// Create `BucketIdCalculator` with HIVE bucket function type. + /// @param is_pk_table Whether this is for a primary key table. + /// @param num_buckets Number of buckets. + /// @param type The bucket function type. Must be BucketFunctionType::HIVE. + /// @param field_infos The detailed type info of all fields in the bucket key row. + /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. + static Result> Create( + bool is_pk_table, int32_t num_buckets, BucketFunctionType type, + const std::vector& field_infos, + const std::shared_ptr& pool = nullptr); /// Calculate bucket ids for the given bucket keys. /// @param bucket_keys Arrow struct array containing the bucket key values. diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index 913e8f68a..d60dcc0d0 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -46,6 +46,8 @@ #include "paimon/common/utils/scope_guard.h" #include "paimon/core/bucket/bucket_function.h" #include "paimon/core/bucket/default_bucket_function.h" +#include "paimon/core/bucket/hive_bucket_function.h" +#include "paimon/core/bucket/mod_bucket_function.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" #include "paimon/memory/memory_pool.h" @@ -238,6 +240,12 @@ static Result WriteBucketRow(int32_t col_id, } } // namespace +namespace { +std::shared_ptr GetPoolOrDefault(const std::shared_ptr& pool) { + return pool ? 
pool : GetDefaultPool(); +} +} // namespace + BucketIdCalculator::BucketIdCalculator(int32_t num_buckets, std::unique_ptr bucket_function, const std::shared_ptr& pool) @@ -250,11 +258,6 @@ Result> BucketIdCalculator::Create( return Create(is_pk_table, num_buckets, std::make_unique(), pool); } -Result> BucketIdCalculator::Create(bool is_pk_table, - int32_t num_buckets) { - return Create(is_pk_table, num_buckets, GetDefaultPool()); -} - Result> BucketIdCalculator::Create( bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function, const std::shared_ptr& pool) { @@ -270,7 +273,44 @@ Result> BucketIdCalculator::Create( return Status::Invalid("Append table not support PostponeBucketMode"); } return std::unique_ptr( - new BucketIdCalculator(num_buckets, std::move(bucket_function), pool)); + new BucketIdCalculator(num_buckets, std::move(bucket_function), GetPoolOrDefault(pool))); +} + +Result> BucketIdCalculator::Create( + bool is_pk_table, int32_t num_buckets, BucketFunctionType type, + const std::shared_ptr& pool) { + switch (type) { + case BucketFunctionType::DEFAULT: + return Create(is_pk_table, num_buckets, std::make_unique(), + pool); + case BucketFunctionType::MOD: + return Status::Invalid("MOD bucket function type requires a bucket_key_type parameter"); + case BucketFunctionType::HIVE: + return Status::Invalid("HIVE bucket function type requires a field_infos parameter"); + default: + return Status::Invalid("Unknown bucket function type"); + } +} + +Result> BucketIdCalculator::Create( + bool is_pk_table, int32_t num_buckets, BucketFunctionType type, FieldType bucket_key_type, + const std::shared_ptr& pool) { + if (type != BucketFunctionType::MOD) { + return Status::Invalid( + "bucket_key_type parameter is only valid for MOD bucket function type"); + } + PAIMON_ASSIGN_OR_RAISE(auto mod_func, ModBucketFunction::Create(bucket_key_type)); + return Create(is_pk_table, num_buckets, std::move(mod_func), pool); +} + +Result> BucketIdCalculator::Create( + bool 
is_pk_table, int32_t num_buckets, BucketFunctionType type, + const std::vector& field_infos, const std::shared_ptr& pool) { + if (type != BucketFunctionType::HIVE) { + return Status::Invalid("field_infos parameter is only valid for HIVE bucket function type"); + } + PAIMON_ASSIGN_OR_RAISE(auto hive_func, HiveBucketFunction::Create(field_infos)); + return Create(is_pk_table, num_buckets, std::move(hive_func), pool); } Status BucketIdCalculator::CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* bucket_schema, diff --git a/src/paimon/common/utils/bucket_id_calculator_test.cpp b/src/paimon/common/utils/bucket_id_calculator_test.cpp index a13d39f45..160c95058 100644 --- a/src/paimon/common/utils/bucket_id_calculator_test.cpp +++ b/src/paimon/common/utils/bucket_id_calculator_test.cpp @@ -35,6 +35,7 @@ #include "paimon/core/bucket/mod_bucket_function.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/utils/bucket_function_type.h" namespace paimon::test { class BucketIdCalculatorTest : public ::testing::Test { @@ -366,4 +367,127 @@ TEST_F(BucketIdCalculatorTest, TestWithDefaultBucketFunctionExplicit) { "[[10], [-1], [50], [-13], [0]]")); ASSERT_EQ(result_default, result_explicit); } + +TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeDefault) { + auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); + std::string data_str = "[[10], [-1], [50], [-13], [0]]"; + + // Calculate with BucketFunctionType::DEFAULT + ASSERT_OK_AND_ASSIGN(auto calc_typed, + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::DEFAULT)); + + // Calculate with the original default Create (no type) + ASSERT_OK_AND_ASSIGN(auto calc_default, + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10)); + + auto bucket_array1 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) + 
.ValueOrDie(); + ::ArrowArray c_array1; + EXPECT_TRUE(arrow::ExportArray(*bucket_array1, &c_array1).ok()); + ::ArrowSchema c_schema1; + EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema1).ok()); + std::vector result_typed(bucket_array1->length()); + ASSERT_OK(calc_typed->CalculateBucketIds(&c_array1, &c_schema1, result_typed.data())); + + auto bucket_array2 = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) + .ValueOrDie(); + ::ArrowArray c_array2; + EXPECT_TRUE(arrow::ExportArray(*bucket_array2, &c_array2).ok()); + ::ArrowSchema c_schema2; + EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema2).ok()); + std::vector result_default(bucket_array2->length()); + ASSERT_OK(calc_default->CalculateBucketIds(&c_array2, &c_schema2, result_default.data())); + + ASSERT_EQ(result_default, result_typed); +} + +TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeMod) { + auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); + std::string data_str = "[[10], [-1], [50], [-13], [0]]"; + + // Calculate with BucketFunctionType::MOD + ASSERT_OK_AND_ASSIGN(auto calc_typed, + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::MOD, FieldType::INT)); + + // Calculate with explicit ModBucketFunction + ASSERT_OK_AND_ASSIGN(auto mod_func, ModBucketFunction::Create(FieldType::INT)); + ASSERT_OK_AND_ASSIGN(std::vector result_explicit, + CalculateBucketIds(/*is_pk_table=*/true, /*num_buckets=*/10, + std::move(mod_func), bucket_schema, data_str)); + + auto bucket_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) + .ValueOrDie(); + ::ArrowArray c_array; + EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_array).ok()); + ::ArrowSchema c_schema; + EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok()); + std::vector result_typed(bucket_array->length()); + 
ASSERT_OK(calc_typed->CalculateBucketIds(&c_array, &c_schema, result_typed.data())); + + ASSERT_EQ(result_explicit, result_typed); + // Verify expected values (Java Math.floorMod semantics) + std::vector expected = {0, 9, 0, 7, 0}; + ASSERT_EQ(expected, result_typed); +} + +TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeHive) { + auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); + std::string data_str = "[[42], [0], [100]]"; + + std::vector field_infos = {HiveFieldInfo(FieldType::INT)}; + + // Calculate with BucketFunctionType::HIVE + ASSERT_OK_AND_ASSIGN(auto calc_typed, + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/5, + BucketFunctionType::HIVE, field_infos)); + + auto bucket_array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) + .ValueOrDie(); + ::ArrowArray c_array; + EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_array).ok()); + ::ArrowSchema c_schema; + EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok()); + std::vector result(bucket_array->length()); + ASSERT_OK(calc_typed->CalculateBucketIds(&c_array, &c_schema, result.data())); + + // Verify all bucket ids are in valid range + for (auto bucket_id : result) { + ASSERT_GE(bucket_id, 0); + ASSERT_LT(bucket_id, 5); + } +} + +TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeErrors) { + { + // MOD type without bucket_key_type should fail + ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::MOD, GetDefaultPool()), + "MOD bucket function type requires a bucket_key_type parameter"); + } + { + // HIVE type without field_infos should fail + ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::HIVE, GetDefaultPool()), + "HIVE bucket function type requires a field_infos parameter"); + } + { + // bucket_key_type with non-MOD type should fail + 
ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::DEFAULT, FieldType::INT), + "bucket_key_type parameter is only valid for MOD bucket function type"); + } + { + // field_infos with non-HIVE type should fail + std::vector field_infos = {HiveFieldInfo(FieldType::INT)}; + ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, + BucketFunctionType::DEFAULT, field_infos), + "field_infos parameter is only valid for HIVE bucket function type"); + } +} } // namespace paimon::test diff --git a/src/paimon/core/bucket/hive_bucket_function.h b/src/paimon/core/bucket/hive_bucket_function.h index 2a3969af6..1c76ac618 100644 --- a/src/paimon/core/bucket/hive_bucket_function.h +++ b/src/paimon/core/bucket/hive_bucket_function.h @@ -21,21 +21,11 @@ #include #include "paimon/core/bucket/bucket_function.h" -#include "paimon/defs.h" #include "paimon/result.h" +#include "paimon/utils/bucket_function_type.h" namespace paimon { -/// Describes a field's type information needed for Hive hashing. -struct HiveFieldInfo { - FieldType type; - int32_t precision = 0; // Used for DECIMAL type - int32_t scale = 0; // Used for DECIMAL type - - explicit HiveFieldInfo(FieldType t) : type(t) {} - HiveFieldInfo(FieldType t, int32_t p, int32_t s) : type(t), precision(p), scale(s) {} -}; - /// Hive-compatible bucket function. /// This implements the same bucket assignment logic as Hive, using Hive's hash functions /// to ensure compatibility between Paimon and Hive bucketed tables. diff --git a/src/paimon/core/options/bucket_function_type.h b/src/paimon/core/options/bucket_function_type.h index 9efa4d69f..4ca5a7dc3 100644 --- a/src/paimon/core/options/bucket_function_type.h +++ b/src/paimon/core/options/bucket_function_type.h @@ -16,23 +16,5 @@ #pragma once -namespace paimon { - -/// Specifies the bucket function type for paimon bucket. -/// This determines how rows are assigned to buckets during data writing. 
-enum class BucketFunctionType { - /// Default bucket function using hash code. - /// Computes the hash code of the bucket key row and assigns the bucket - /// based on the absolute value of (hashCode % numBuckets). - DEFAULT = 1, - /// Mod bucket function using modulo operation on bucket key. - /// Applies Java's Math.floorMod semantics on the bucket key value (INT or BIGINT) - /// to determine the bucket assignment. - MOD = 2, - /// Hive-compatible bucket function. - /// Uses Hive's ObjectInspectorUtils hash implementation to ensure - /// consistent bucket assignment between Paimon and Hive bucketed tables. - HIVE = 3 -}; - -} // namespace paimon +// This header now delegates to the public header to avoid duplicate definitions. +#include "paimon/utils/bucket_function_type.h" From c1c565941b4dd1ef9a75d8a78cf2427bf4400946 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Tue, 14 Apr 2026 10:28:32 +0800 Subject: [PATCH 09/10] fix --- include/paimon/utils/bucket_id_calculator.h | 40 ++-- .../common/utils/bucket_id_calculator.cpp | 41 +--- .../utils/bucket_id_calculator_test.cpp | 96 +++----- .../core/bucket/hive_bucket_function_test.cpp | 223 ++++++++++++++++++ 4 files changed, 279 insertions(+), 121 deletions(-) diff --git a/include/paimon/utils/bucket_id_calculator.h b/include/paimon/utils/bucket_id_calculator.h index 5aa4b95bb..29dff8c12 100644 --- a/include/paimon/utils/bucket_id_calculator.h +++ b/include/paimon/utils/bucket_id_calculator.h @@ -41,48 +41,36 @@ class PAIMON_EXPORT BucketIdCalculator { /// Create `BucketIdCalculator` with default bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. + /// @param pool Memory pool for memory allocation. 
static Result> Create( - bool is_pk_table, int32_t num_buckets, const std::shared_ptr& pool = nullptr); + bool is_pk_table, int32_t num_buckets, const std::shared_ptr& pool); /// Create `BucketIdCalculator` with a custom bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. /// @param bucket_function The bucket function to use for bucket assignment. - /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. + /// @param pool Memory pool for memory allocation. static Result> Create( bool is_pk_table, int32_t num_buckets, std::unique_ptr bucket_function, - const std::shared_ptr& pool = nullptr); + const std::shared_ptr& pool); - /// Create `BucketIdCalculator` with a bucket function type. + /// Create `BucketIdCalculator` with MOD bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - /// @param type The bucket function type. Must be BucketFunctionType::DEFAULT. - /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. - static Result> Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, - const std::shared_ptr& pool = nullptr); - - /// Create `BucketIdCalculator` with MOD bucket function type. - /// @param is_pk_table Whether this is for a primary key table. - /// @param num_buckets Number of buckets. - /// @param type The bucket function type. Must be BucketFunctionType::MOD. /// @param bucket_key_type The type of the single bucket key field. Must be INT or BIGINT. - /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. - static Result> Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, FieldType bucket_key_type, - const std::shared_ptr& pool = nullptr); + /// @param pool Memory pool for memory allocation. 
+ static Result> CreateMod( + bool is_pk_table, int32_t num_buckets, FieldType bucket_key_type, + const std::shared_ptr& pool); - /// Create `BucketIdCalculator` with HIVE bucket function type. + /// Create `BucketIdCalculator` with HIVE bucket function. /// @param is_pk_table Whether this is for a primary key table. /// @param num_buckets Number of buckets. - /// @param type The bucket function type. Must be BucketFunctionType::HIVE. /// @param field_infos The detailed type info of all fields in the bucket key row. - /// @param pool Memory pool for memory allocation. If nullptr, uses default pool. - static Result> Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, - const std::vector& field_infos, - const std::shared_ptr& pool = nullptr); + /// @param pool Memory pool for memory allocation. + static Result> CreateHive( + bool is_pk_table, int32_t num_buckets, const std::vector& field_infos, + const std::shared_ptr& pool); /// Calculate bucket ids for the given bucket keys. /// @param bucket_keys Arrow struct array containing the bucket key values. diff --git a/src/paimon/common/utils/bucket_id_calculator.cpp b/src/paimon/common/utils/bucket_id_calculator.cpp index d60dcc0d0..58ca938a0 100644 --- a/src/paimon/common/utils/bucket_id_calculator.cpp +++ b/src/paimon/common/utils/bucket_id_calculator.cpp @@ -240,12 +240,6 @@ static Result WriteBucketRow(int32_t col_id, } } // namespace -namespace { -std::shared_ptr GetPoolOrDefault(const std::shared_ptr& pool) { - return pool ? 
pool : GetDefaultPool(); -} -} // namespace - BucketIdCalculator::BucketIdCalculator(int32_t num_buckets, std::unique_ptr bucket_function, const std::shared_ptr& pool) @@ -273,42 +267,19 @@ Result> BucketIdCalculator::Create( return Status::Invalid("Append table not support PostponeBucketMode"); } return std::unique_ptr( - new BucketIdCalculator(num_buckets, std::move(bucket_function), GetPoolOrDefault(pool))); + new BucketIdCalculator(num_buckets, std::move(bucket_function), pool)); } -Result> BucketIdCalculator::Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, +Result> BucketIdCalculator::CreateMod( + bool is_pk_table, int32_t num_buckets, FieldType bucket_key_type, const std::shared_ptr& pool) { - switch (type) { - case BucketFunctionType::DEFAULT: - return Create(is_pk_table, num_buckets, std::make_unique(), - pool); - case BucketFunctionType::MOD: - return Status::Invalid("MOD bucket function type requires a bucket_key_type parameter"); - case BucketFunctionType::HIVE: - return Status::Invalid("HIVE bucket function type requires a field_infos parameter"); - default: - return Status::Invalid("Unknown bucket function type"); - } -} - -Result> BucketIdCalculator::Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, FieldType bucket_key_type, - const std::shared_ptr& pool) { - if (type != BucketFunctionType::MOD) { - return Status::Invalid( - "bucket_key_type parameter is only valid for MOD bucket function type"); - } PAIMON_ASSIGN_OR_RAISE(auto mod_func, ModBucketFunction::Create(bucket_key_type)); return Create(is_pk_table, num_buckets, std::move(mod_func), pool); } -Result> BucketIdCalculator::Create( - bool is_pk_table, int32_t num_buckets, BucketFunctionType type, - const std::vector& field_infos, const std::shared_ptr& pool) { - if (type != BucketFunctionType::HIVE) { - return Status::Invalid("field_infos parameter is only valid for HIVE bucket function type"); - } +Result> BucketIdCalculator::CreateHive( + bool 
is_pk_table, int32_t num_buckets, const std::vector& field_infos, + const std::shared_ptr& pool) { PAIMON_ASSIGN_OR_RAISE(auto hive_func, HiveBucketFunction::Create(field_infos)); return Create(is_pk_table, num_buckets, std::move(hive_func), pool); } diff --git a/src/paimon/common/utils/bucket_id_calculator_test.cpp b/src/paimon/common/utils/bucket_id_calculator_test.cpp index 160c95058..37813fbf2 100644 --- a/src/paimon/common/utils/bucket_id_calculator_test.cpp +++ b/src/paimon/common/utils/bucket_id_calculator_test.cpp @@ -35,7 +35,6 @@ #include "paimon/core/bucket/mod_bucket_function.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/testing/utils/testharness.h" -#include "paimon/utils/bucket_function_type.h" namespace paimon::test { class BucketIdCalculatorTest : public ::testing::Test { @@ -50,8 +49,8 @@ class BucketIdCalculatorTest : public ::testing::Test { ::ArrowSchema c_bucket_schema; EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_bucket_schema).ok()); std::vector bucket_ids(bucket_array->length()); - EXPECT_OK_AND_ASSIGN(auto bucket_id_cal, - BucketIdCalculator::Create(is_pk_table, num_buckets)); + EXPECT_OK_AND_ASSIGN(auto bucket_id_cal, BucketIdCalculator::Create( + is_pk_table, num_buckets, GetDefaultPool())); PAIMON_RETURN_NOT_OK(bucket_id_cal->CalculateBucketIds( /*bucket_keys=*/&c_bucket_array, /*bucket_schema=*/&c_bucket_schema, /*bucket_ids=*/bucket_ids.data())); @@ -247,19 +246,21 @@ TEST_F(BucketIdCalculatorTest, TestCompatibleWithJavaWithTimestamp) { TEST_F(BucketIdCalculatorTest, TestInvalidCase) { { // test invalid bucket id - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/0), - "num buckets must be -1 or -2 or greater than 0"); + ASSERT_NOK_WITH_MSG( + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/0, GetDefaultPool()), + "num buckets must be -1 or -2 or greater than 0"); } { // test invalid bucket mode with pk table ASSERT_NOK_WITH_MSG( - 
BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/-1), + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/-1, GetDefaultPool()), "DynamicBucketMode or CrossPartitionBucketMode cannot calculate bucket id"); } { // test invalid bucket mode with append table - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/false, /*num_buckets=*/-2), - "Append table not support PostponeBucketMode"); + ASSERT_NOK_WITH_MSG( + BucketIdCalculator::Create(/*is_pk_table=*/false, /*num_buckets=*/-2, GetDefaultPool()), + "Append table not support PostponeBucketMode"); } { // test invalid bucket_keys @@ -368,18 +369,20 @@ TEST_F(BucketIdCalculatorTest, TestWithDefaultBucketFunctionExplicit) { ASSERT_EQ(result_default, result_explicit); } -TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeDefault) { +TEST_F(BucketIdCalculatorTest, TestCreateWithDefaultBucketFunction) { auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); std::string data_str = "[[10], [-1], [50], [-13], [0]]"; - // Calculate with BucketFunctionType::DEFAULT - ASSERT_OK_AND_ASSIGN(auto calc_typed, + // Calculate with explicit DefaultBucketFunction via Create + auto default_func = std::make_unique(); + ASSERT_OK_AND_ASSIGN(auto calc_explicit, BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::DEFAULT)); + std::move(default_func), GetDefaultPool())); - // Calculate with the original default Create (no type) - ASSERT_OK_AND_ASSIGN(auto calc_default, - BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10)); + // Calculate with the default Create (no BucketFunction) + ASSERT_OK_AND_ASSIGN( + auto calc_default, + BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, GetDefaultPool())); auto bucket_array1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) @@ -388,8 +391,8 @@ TEST_F(BucketIdCalculatorTest, 
TestCreateWithBucketFunctionTypeDefault) { EXPECT_TRUE(arrow::ExportArray(*bucket_array1, &c_array1).ok()); ::ArrowSchema c_schema1; EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema1).ok()); - std::vector result_typed(bucket_array1->length()); - ASSERT_OK(calc_typed->CalculateBucketIds(&c_array1, &c_schema1, result_typed.data())); + std::vector result_explicit(bucket_array1->length()); + ASSERT_OK(calc_explicit->CalculateBucketIds(&c_array1, &c_schema1, result_explicit.data())); auto bucket_array2 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) @@ -401,17 +404,17 @@ TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeDefault) { std::vector result_default(bucket_array2->length()); ASSERT_OK(calc_default->CalculateBucketIds(&c_array2, &c_schema2, result_default.data())); - ASSERT_EQ(result_default, result_typed); + ASSERT_EQ(result_default, result_explicit); } -TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeMod) { +TEST_F(BucketIdCalculatorTest, TestCreateWithModBucketFunction) { auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); std::string data_str = "[[10], [-1], [50], [-13], [0]]"; - // Calculate with BucketFunctionType::MOD - ASSERT_OK_AND_ASSIGN(auto calc_typed, - BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::MOD, FieldType::INT)); + // Calculate with CreateMod + ASSERT_OK_AND_ASSIGN(auto calc_mod, + BucketIdCalculator::CreateMod(/*is_pk_table=*/true, /*num_buckets=*/10, + FieldType::INT, GetDefaultPool())); // Calculate with explicit ModBucketFunction ASSERT_OK_AND_ASSIGN(auto mod_func, ModBucketFunction::Create(FieldType::INT)); @@ -426,25 +429,25 @@ TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeMod) { EXPECT_TRUE(arrow::ExportArray(*bucket_array, &c_array).ok()); ::ArrowSchema c_schema; EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok()); - std::vector 
result_typed(bucket_array->length()); - ASSERT_OK(calc_typed->CalculateBucketIds(&c_array, &c_schema, result_typed.data())); + std::vector result_mod(bucket_array->length()); + ASSERT_OK(calc_mod->CalculateBucketIds(&c_array, &c_schema, result_mod.data())); - ASSERT_EQ(result_explicit, result_typed); + ASSERT_EQ(result_explicit, result_mod); // Verify expected values (Java Math.floorMod semantics) std::vector expected = {0, 9, 0, 7, 0}; - ASSERT_EQ(expected, result_typed); + ASSERT_EQ(expected, result_mod); } -TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeHive) { +TEST_F(BucketIdCalculatorTest, TestCreateWithHiveBucketFunction) { auto bucket_schema = arrow::schema(arrow::FieldVector({arrow::field("b0", arrow::int32())})); std::string data_str = "[[42], [0], [100]]"; std::vector field_infos = {HiveFieldInfo(FieldType::INT)}; - // Calculate with BucketFunctionType::HIVE - ASSERT_OK_AND_ASSIGN(auto calc_typed, - BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/5, - BucketFunctionType::HIVE, field_infos)); + // Calculate with CreateHive + ASSERT_OK_AND_ASSIGN(auto calc_hive, + BucketIdCalculator::CreateHive(/*is_pk_table=*/true, /*num_buckets=*/5, + field_infos, GetDefaultPool())); auto bucket_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(bucket_schema->fields()), data_str) @@ -454,7 +457,7 @@ TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeHive) { ::ArrowSchema c_schema; EXPECT_TRUE(arrow::ExportSchema(*bucket_schema, &c_schema).ok()); std::vector result(bucket_array->length()); - ASSERT_OK(calc_typed->CalculateBucketIds(&c_array, &c_schema, result.data())); + ASSERT_OK(calc_hive->CalculateBucketIds(&c_array, &c_schema, result.data())); // Verify all bucket ids are in valid range for (auto bucket_id : result) { @@ -463,31 +466,4 @@ TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeHive) { } } -TEST_F(BucketIdCalculatorTest, TestCreateWithBucketFunctionTypeErrors) { - { - // MOD type 
without bucket_key_type should fail - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::MOD, GetDefaultPool()), - "MOD bucket function type requires a bucket_key_type parameter"); - } - { - // HIVE type without field_infos should fail - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::HIVE, GetDefaultPool()), - "HIVE bucket function type requires a field_infos parameter"); - } - { - // bucket_key_type with non-MOD type should fail - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::DEFAULT, FieldType::INT), - "bucket_key_type parameter is only valid for MOD bucket function type"); - } - { - // field_infos with non-HIVE type should fail - std::vector field_infos = {HiveFieldInfo(FieldType::INT)}; - ASSERT_NOK_WITH_MSG(BucketIdCalculator::Create(/*is_pk_table=*/true, /*num_buckets=*/10, - BucketFunctionType::DEFAULT, field_infos), - "field_infos parameter is only valid for HIVE bucket function type"); - } -} } // namespace paimon::test diff --git a/src/paimon/core/bucket/hive_bucket_function_test.cpp b/src/paimon/core/bucket/hive_bucket_function_test.cpp index d711d35ed..74aa3aec5 100644 --- a/src/paimon/core/bucket/hive_bucket_function_test.cpp +++ b/src/paimon/core/bucket/hive_bucket_function_test.cpp @@ -16,6 +16,8 @@ #include "paimon/core/bucket/hive_bucket_function.h" +#include + #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/data/binary_row_writer.h" @@ -225,4 +227,225 @@ TEST_F(HiveBucketFunctionTest, TestDifferentNumBuckets) { } } +/// Test compatibility with Java HiveBucketFunction across multiple data types. 
+/// Expected values are computed from the Java implementation: +/// hash = 0 (seed) +/// for each field: hash = 31 * hash + computeHash(field) +/// bucket = (hash & INT32_MAX) % numBuckets +/// +/// Java computeHash per type: +/// BOOLEAN: hashInt(value ? 1 : 0) +/// INT/DATE: hashInt(value) [identity] +/// BIGINT: hashLong(value) = (int)(value ^ (value >>> 32)) +/// FLOAT: hashInt(Float.floatToIntBits(value)), -0.0f treated as 0 +/// DOUBLE: hashLong(Double.doubleToLongBits(value)), -0.0 treated as 0L +/// STRING/BINARY: hashBytes(bytes) +/// DECIMAL: BigDecimal.hashCode() after normalization +TEST_F(HiveBucketFunctionTest, TestCompatibleWithJava) { + auto pool = GetDefaultPool(); + const int32_t num_buckets = 128; + + // Case 1: Single INT field with various values + // Java: hash = 31*0 + hashInt(v) = v + // bucket = (v & INT32_MAX) % 128 + { + std::vector field_types = {FieldType::INT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // hashInt(0) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(0), num_buckets)); + // hashInt(1) = 1, bucket = 1 + ASSERT_EQ(1, func->Bucket(CreateIntRow(1), num_buckets)); + // hashInt(127) = 127, bucket = 127 + ASSERT_EQ(127, func->Bucket(CreateIntRow(127), num_buckets)); + // hashInt(128) = 128, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(128), num_buckets)); + // hashInt(-1) = -1, (-1 & INT32_MAX) = 2147483647, 2147483647 % 128 = 127 + ASSERT_EQ(127, func->Bucket(CreateIntRow(-1), num_buckets)); + // hashInt(INT32_MIN) = -2147483648, (-2147483648 & INT32_MAX) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateIntRow(std::numeric_limits::min()), num_buckets)); + // hashInt(INT32_MAX) = 2147483647, (2147483647 & INT32_MAX) = 2147483647, % 128 = 127 + ASSERT_EQ(127, + func->Bucket(CreateIntRow(std::numeric_limits::max()), num_buckets)); + } + + // Case 2: Single BOOLEAN field + // Java: hashInt(true ? 
1 : 0) + { + std::vector field_types = {FieldType::BOOLEAN}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // true => hashInt(1) = 1, bucket = 1 % 128 = 1 + ASSERT_EQ(1, func->Bucket(CreateBooleanRow(true), num_buckets)); + // false => hashInt(0) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateBooleanRow(false), num_buckets)); + } + + // Case 3: Single BIGINT field + // Java: hashLong(v) = (int)(v ^ (v >>> 32)) + { + std::vector field_types = {FieldType::BIGINT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // hashLong(0) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateLongRow(0L), num_buckets)); + // hashLong(100) = (int)(100 ^ 0) = 100, bucket = 100 % 128 = 100 + ASSERT_EQ(100, func->Bucket(CreateLongRow(100L), num_buckets)); + // hashLong(4294967296L) = (int)(4294967296 ^ 1) = 1, bucket = 1 + // 4294967296L = 0x100000000, >>> 32 = 1, xor = 0x100000001, (int) = 1 + ASSERT_EQ(1, func->Bucket(CreateLongRow(4294967296L), num_buckets)); + // hashLong(LONG_MAX) = (int)(0x7FFFFFFFFFFFFFFF ^ 0x7FFFFFFF) = (int)0x7FFFFF80000000 + // = (int)(0x7FFFFFFF80000000) => low 32 bits = 0x80000000 = -2147483648 + // Actually: 0x7FFFFFFFFFFFFFFF ^ (0x7FFFFFFFFFFFFFFF >>> 32) + // = 0x7FFFFFFFFFFFFFFF ^ 0x7FFFFFFF + // = 0x7FFFFFFF80000000 + // (int) = 0x80000000 = -2147483648 + // (-2147483648 & INT32_MAX) = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateLongRow(std::numeric_limits::max()), num_buckets)); + // hashLong(-1) = (int)(-1 ^ (0xFFFFFFFFFFFFFFFF >>> 32)) + // = (int)(-1 ^ 0xFFFFFFFF) = (int)(0) = 0 + ASSERT_EQ(0, func->Bucket(CreateLongRow(-1L), num_buckets)); + } + + // Case 4: Single FLOAT field + // Java: hashInt(Float.floatToIntBits(v)), -0.0f => 0 + { + std::vector field_types = {FieldType::FLOAT}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // 0.0f => bits = 0, hashInt(0) = 0 + ASSERT_EQ(0, func->Bucket(CreateFloatRow(0.0f), num_buckets)); + // -0.0f => 
treated as 0, hashInt(0) = 0 + ASSERT_EQ(0, func->Bucket(CreateFloatRow(-0.0f), num_buckets)); + // 1.0f => Float.floatToIntBits(1.0f) = 0x3F800000 = 1065353216 + // 1065353216 & INT32_MAX = 1065353216, % 128 = 0 + ASSERT_EQ(0, func->Bucket(CreateFloatRow(1.0f), num_buckets)); + // -1.0f => Float.floatToIntBits(-1.0f) = 0xBF800000 = -1082130432 + // (-1082130432 & INT32_MAX) = 1065353216, % 128 = 0 + ASSERT_EQ(0, func->Bucket(CreateFloatRow(-1.0f), num_buckets)); + } + + // Case 5: Single DOUBLE field + // Java: hashLong(Double.doubleToLongBits(v)), -0.0 => 0L + { + std::vector field_types = {FieldType::DOUBLE}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // 0.0 => bits = 0L, hashLong(0) = 0 + ASSERT_EQ(0, func->Bucket(CreateDoubleRow(0.0), num_buckets)); + // -0.0 => treated as 0L, hashLong(0) = 0 + ASSERT_EQ(0, func->Bucket(CreateDoubleRow(-0.0), num_buckets)); + // 1.0 => Double.doubleToLongBits(1.0) = 0x3FF0000000000000 = 4607182418800017408 + // hashLong = (int)(4607182418800017408 ^ (4607182418800017408 >>> 32)) + // = (int)(0x3FF0000000000000 ^ 0x3FF00000) + // = (int)(0x3FF000003FF00000) + // = (int)(0x3FF00000) = 1072693248 + // 1072693248 % 128 = 0 + ASSERT_EQ(0, func->Bucket(CreateDoubleRow(1.0), num_buckets)); + } + + // Case 6: Single STRING field + // Java: hashBytes(bytes) + { + std::vector field_types = {FieldType::STRING}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_types)); + + // hashBytes("hello") = 99162322 (verified in TestHiveBucketFunction) + // 99162322 & INT32_MAX = 99162322, % 128 = 82 + ASSERT_EQ(82, func->Bucket(CreateStringRow("hello"), num_buckets)); + // hashBytes("") = 0, bucket = 0 + ASSERT_EQ(0, func->Bucket(CreateStringRow(""), num_buckets)); + // hashBytes("a") = 97, bucket = 97 + ASSERT_EQ(97, func->Bucket(CreateStringRow("a"), num_buckets)); + } + + // Case 7: Single DATE field (same as INT) + // Java: hashInt(daysSinceEpoch) + { + std::vector field_infos = 
{HiveFieldInfo(FieldType::DATE)}; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos)); + + // DATE is stored as int32 days since epoch, hashed same as INT + // date = 2000 (days), hashInt(2000) = 2000, 2000 % 128 = 80 + ASSERT_EQ(80, func->Bucket(CreateIntRow(2000), num_buckets)); + } + + // Case 8: Multi-field row (INT, STRING, BINARY, DECIMAL) + // This is the same as TestHiveBucketFunction but with num_buckets=128 + // Java step-by-step (all arithmetic in int32 with overflow): + // hash = 0 + // hash = 31*0 + hashInt(7) = 7 + // hash = 31*7 + hashBytes("hello") = 217 + 99162322 = 99162539 + // hash = 31*99162539 + hashBytes({1,2,3}) = int32(-1220928587) + 1026 = -1220927561 + // hash = 31*(-1220927561) + hashDecimal(12.3400) = int32(805951273) + 38256 = 805989529 + // bucket = (805989529 & INT32_MAX) % 128 = 25 + { + std::vector field_infos = { + HiveFieldInfo(FieldType::INT), + HiveFieldInfo(FieldType::STRING), + HiveFieldInfo(FieldType::BINARY), + HiveFieldInfo(FieldType::DECIMAL, 10, 4), + }; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos)); + + BinaryRow row = CreateMixedRow(7, "hello", {1, 2, 3}, 123400, 10, 4); + // Already verified: func->Bucket(row, 8) == 1 + ASSERT_EQ(1, func->Bucket(row, 8)); + // With 128 buckets: 805989529 % 128 = 25 + ASSERT_EQ(25, func->Bucket(row, num_buckets)); + } + + // Case 9: All-null row + // Java: all nulls => hash = 0, bucket = 0 + { + std::vector field_infos = { + HiveFieldInfo(FieldType::INT), + HiveFieldInfo(FieldType::STRING), + HiveFieldInfo(FieldType::BIGINT), + }; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos)); + + BinaryRow row = CreateNullRow(3); + ASSERT_EQ(0, func->Bucket(row, num_buckets)); + } + + // Case 10: Multi-field row with BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, STRING + // Java step-by-step: + // field 0: BOOLEAN true => hashInt(1) = 1 + // field 1: INT 42 => hashInt(42) = 42 + // field 2: BIGINT 100 => hashLong(100) = 100 + // 
field 3: FLOAT 0.0f => hashInt(0) = 0 + // field 4: DOUBLE 0.0 => hashLong(0) = 0 + // field 5: STRING "a" => hashBytes("a") = 97 + // + // hash = 0 + // hash = 31*0 + 1 = 1 + // hash = 31*1 + 42 = 73 + // hash = 31*73 + 100 = 2363 + // hash = 31*2363 + 0 = 73253 + // hash = 31*73253 + 0 = 2270843 + // hash = 31*2270843 + 97 = 70396230 + // bucket = (70396230 & INT32_MAX) % 128 = 70396230 % 128 = 70 + { + std::vector field_infos = { + HiveFieldInfo(FieldType::BOOLEAN), HiveFieldInfo(FieldType::INT), + HiveFieldInfo(FieldType::BIGINT), HiveFieldInfo(FieldType::FLOAT), + HiveFieldInfo(FieldType::DOUBLE), HiveFieldInfo(FieldType::STRING), + }; + ASSERT_OK_AND_ASSIGN(auto func, HiveBucketFunction::Create(field_infos)); + + BinaryRow row(6); + BinaryRowWriter writer(&row, 0, pool.get()); + writer.WriteBoolean(0, true); + writer.WriteInt(1, 42); + writer.WriteLong(2, 100L); + writer.WriteFloat(3, 0.0f); + writer.WriteDouble(4, 0.0); + writer.WriteStringView(5, std::string_view("a")); + writer.Complete(); + + ASSERT_EQ(70, func->Bucket(row, num_buckets)); + } +} + } // namespace paimon::test From 1f0c880b17fc27fce020b1640d2b5fd3c888e3d1 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Wed, 15 Apr 2026 13:10:55 +0800 Subject: [PATCH 10/10] address --- src/paimon/core/core_options.cpp | 26 ++----------------- src/paimon/core/core_options.h | 2 +- src/paimon/core/core_options_test.cpp | 2 +- .../core/options/bucket_function_type.h | 20 -------------- 4 files changed, 4 insertions(+), 46 deletions(-) delete mode 100644 src/paimon/core/options/bucket_function_type.h diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 32cc15013..ca29cd855 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -516,6 +516,8 @@ struct CoreOptions::Impl { // Parse data-evolution.enabled - whether to enable data evolution for row tracking PAIMON_RETURN_NOT_OK( parser.Parse(Options::DATA_EVOLUTION_ENABLED, 
&data_evolution_enabled)); + // Parse bucket-function - bucket function type, default "DEFAULT" + PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type)); return Status::OK(); } @@ -841,30 +843,6 @@ Result CoreOptions::FromMap( PAIMON_RETURN_NOT_OK(impl->ParseCompactionOptions(parser)); PAIMON_RETURN_NOT_OK(impl->ParseLookupOptions(parser)); - PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, - &impl->num_sorted_runs_compaction_trigger)); - PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_SORTED_RUNS_STOP_TRIGGER, - &impl->num_sorted_runs_stop_trigger)); - PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_LEVELS, &impl->num_levels)); - - PAIMON_RETURN_NOT_OK(parser.ParseLookupCompactMode(&impl->lookup_compact_mode)); - PAIMON_RETURN_NOT_OK( - parser.Parse(Options::LOOKUP_COMPACT_MAX_INTERVAL, &impl->lookup_compact_max_interval)); - - // parse lookup cache - PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::LOOKUP_CACHE_MAX_MEMORY_SIZE, - &impl->lookup_cache_max_memory)); - PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, - &impl->lookup_cache_high_prio_pool_ratio)); - if (impl->lookup_cache_high_prio_pool_ratio < 0.0 || - impl->lookup_cache_high_prio_pool_ratio >= 1.0) { - return Status::Invalid(fmt::format( - "The high priority pool ratio should in the range [0, 1), while input is {}", - impl->lookup_cache_high_prio_pool_ratio)); - } - - // Parse bucket function type - PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&impl->bucket_function_type)); return options; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 8a839bbf6..3f18fb3eb 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -24,7 +24,6 @@ #include #include -#include "paimon/core/options/bucket_function_type.h" #include "paimon/core/options/changelog_producer.h" #include "paimon/core/options/compress_options.h" #include 
"paimon/core/options/external_path_strategy.h" @@ -37,6 +36,7 @@ #include "paimon/result.h" #include "paimon/table/source/startup_mode.h" #include "paimon/type_fwd.h" +#include "paimon/utils/bucket_function_type.h" #include "paimon/visibility.h" namespace paimon { diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index c45cd8547..5aaa2c2f1 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -21,7 +21,6 @@ #include "gtest/gtest.h" #include "paimon/common/fs/resolving_file_system.h" -#include "paimon/core/options/bucket_function_type.h" #include "paimon/core/options/expire_config.h" #include "paimon/defs.h" #include "paimon/format/file_format.h" @@ -29,6 +28,7 @@ #include "paimon/status.h" #include "paimon/testing/mock/mock_file_system.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/utils/bucket_function_type.h" namespace paimon::test { TEST(CoreOptionsTest, TestDefaultValue) { diff --git a/src/paimon/core/options/bucket_function_type.h b/src/paimon/core/options/bucket_function_type.h deleted file mode 100644 index 4ca5a7dc3..000000000 --- a/src/paimon/core/options/bucket_function_type.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2026-present Alibaba Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -// This header now delegates to the public header to avoid duplicate definitions. 
-#include "paimon/utils/bucket_function_type.h"