From 111fe4a33f44205ed9e36f61aa980dbc04c31c41 Mon Sep 17 00:00:00 2001 From: "menglingda.mld" Date: Fri, 10 Apr 2026 15:15:52 +0800 Subject: [PATCH] feat(spill): add SpillWriter, SpillReader and SpillChannelManager for spill-to-disk support --- src/paimon/CMakeLists.txt | 5 + src/paimon/common/utils/arrow/arrow_utils.cpp | 12 + src/paimon/common/utils/arrow/arrow_utils.h | 5 + .../common/utils/arrow/arrow_utils_test.cpp | 39 +++ src/paimon/core/disk/file_channel_manager.h | 89 +++++++ src/paimon/core/disk/file_io_channel.cpp | 73 ++++++ src/paimon/core/disk/file_io_channel.h | 71 +++++ src/paimon/core/disk/io_manager.cpp | 26 +- src/paimon/core/disk/io_manager_impl.h | 81 ++++++ src/paimon/core/disk/io_manager_test.cpp | 47 +++- .../core/mergetree/spill_channel_manager.h | 59 +++++ .../mergetree/spill_channel_manager_test.cpp | 108 ++++++++ src/paimon/core/mergetree/spill_reader.cpp | 160 ++++++++++++ src/paimon/core/mergetree/spill_reader.h | 93 +++++++ .../mergetree/spill_reader_writer_test.cpp | 247 ++++++++++++++++++ src/paimon/core/mergetree/spill_writer.cpp | 116 ++++++++ src/paimon/core/mergetree/spill_writer.h | 75 ++++++ .../format/parquet/parquet_writer_builder.cpp | 11 +- 18 files changed, 1282 insertions(+), 35 deletions(-) create mode 100644 src/paimon/core/disk/file_channel_manager.h create mode 100644 src/paimon/core/disk/file_io_channel.cpp create mode 100644 src/paimon/core/disk/file_io_channel.h create mode 100644 src/paimon/core/disk/io_manager_impl.h create mode 100644 src/paimon/core/mergetree/spill_channel_manager.h create mode 100644 src/paimon/core/mergetree/spill_channel_manager_test.cpp create mode 100644 src/paimon/core/mergetree/spill_reader.cpp create mode 100644 src/paimon/core/mergetree/spill_reader.h create mode 100644 src/paimon/core/mergetree/spill_reader_writer_test.cpp create mode 100644 src/paimon/core/mergetree/spill_writer.cpp create mode 100644 src/paimon/core/mergetree/spill_writer.h diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 08bd8eaf8..e9498f0f6 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -146,7 +146,10 @@ set(PAIMON_COMMON_SRCS common/utils/string_utils.cpp) set(PAIMON_CORE_SRCS + core/disk/file_io_channel.cpp core/disk/io_manager.cpp + core/mergetree/spill_reader.cpp + core/mergetree/spill_writer.cpp core/append/append_only_writer.cpp core/append/bucketed_append_compact_manager.cpp core/casting/binary_to_string_cast_executor.cpp @@ -605,6 +608,8 @@ if(PAIMON_BUILD_TESTS) core/mergetree/merge_tree_writer_test.cpp core/mergetree/write_buffer_test.cpp core/mergetree/sorted_run_test.cpp + core/mergetree/spill_channel_manager_test.cpp + core/mergetree/spill_reader_writer_test.cpp core/migrate/file_meta_utils_test.cpp core/operation/metrics/compaction_metrics_test.cpp core/operation/data_evolution_file_store_scan_test.cpp diff --git a/src/paimon/common/utils/arrow/arrow_utils.cpp b/src/paimon/common/utils/arrow/arrow_utils.cpp index c27b6c4b4..89cf2fdb0 100644 --- a/src/paimon/common/utils/arrow/arrow_utils.cpp +++ b/src/paimon/common/utils/arrow/arrow_utils.cpp @@ -18,7 +18,9 @@ #include "arrow/array/array_base.h" #include "arrow/array/array_nested.h" +#include "arrow/util/compression.h" #include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/string_utils.h" namespace paimon { Result> ArrowUtils::DataTypeToSchema( @@ -161,4 +163,14 @@ Result> ArrowUtils::RemoveFieldFromStructArr return array; } +Result ArrowUtils::GetCompressionType(const std::string& compression) { + std::string normalized = StringUtils::ToLowerCase(compression); + if (normalized.empty() || normalized == "none") { + normalized = "uncompressed"; + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::Compression::type compression_type, + arrow::util::Codec::GetCompressionType(normalized)); + return compression_type; +} + } // namespace paimon diff --git a/src/paimon/common/utils/arrow/arrow_utils.h b/src/paimon/common/utils/arrow/arrow_utils.h index a3638c381..a1f1b56ad 100644 --- a/src/paimon/common/utils/arrow/arrow_utils.h +++ b/src/paimon/common/utils/arrow/arrow_utils.h @@ -19,6 +19,7 @@ #include #include "arrow/api.h" +#include "arrow/util/type_fwd.h" #include "fmt/format.h" #include "paimon/result.h" @@ -49,6 +50,10 @@ class PAIMON_EXPORT ArrowUtils { static bool EqualsIgnoreNullable(const std::shared_ptr& type, const std::shared_ptr& other_type); + /// Normalize and resolve a compression string to an Arrow compression type. + /// Handles "none" and empty string by mapping them to "uncompressed". + static Result GetCompressionType(const std::string& compression); + private: static Status InnerCheckNullabilityMatch(const std::shared_ptr& field, const std::shared_ptr& data); diff --git a/src/paimon/common/utils/arrow/arrow_utils_test.cpp b/src/paimon/common/utils/arrow/arrow_utils_test.cpp index e91cd71d9..0422071f2 100644 --- a/src/paimon/common/utils/arrow/arrow_utils_test.cpp +++ b/src/paimon/common/utils/arrow/arrow_utils_test.cpp @@ -445,4 +445,43 @@ TEST(ArrowUtilsTest, TestEqualsIgnoreNullable) { ASSERT_TRUE(ArrowUtils::EqualsIgnoreNullable(struct_type1, struct_type2)); } } + +TEST(ArrowUtilsTest, TestGetCompressionType) { + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("")); + ASSERT_EQ(type, arrow::Compression::UNCOMPRESSED); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("none")); + ASSERT_EQ(type, arrow::Compression::UNCOMPRESSED); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("uncompressed")); + ASSERT_EQ(type, arrow::Compression::UNCOMPRESSED); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("zstd")); + ASSERT_EQ(type, arrow::Compression::ZSTD); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("ZSTD")); + ASSERT_EQ(type, arrow::Compression::ZSTD); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("lz4")); + ASSERT_EQ(type, arrow::Compression::LZ4_FRAME); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("snappy")); + ASSERT_EQ(type, arrow::Compression::SNAPPY); + } + { + ASSERT_OK_AND_ASSIGN(auto type, ArrowUtils::GetCompressionType("gzip")); + ASSERT_EQ(type, arrow::Compression::GZIP); + } + { + ASSERT_NOK(ArrowUtils::GetCompressionType("invalid_codec")); + } +} + } // namespace paimon::test diff --git a/src/paimon/core/disk/file_channel_manager.h b/src/paimon/core/disk/file_channel_manager.h new file mode 100644 index 000000000..8b90fb0dd --- /dev/null +++ b/src/paimon/core/disk/file_channel_manager.h @@ -0,0 +1,89 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" + +namespace paimon { +class FileChannelManager { + public: + static Result> Create(const std::string& tmp_dir, + const std::string& prefix) { + std::string uuid; + if (!UUID::Generate(&uuid)) { + return Status::Invalid("Failed to generate UUID for FileChannelManager."); + } + std::string spill_dir = PathUtil::JoinPath(tmp_dir, "paimon-" + prefix + "-" + uuid); + + auto fs = std::make_shared(); + PAIMON_RETURN_NOT_OK(fs->Mkdirs(spill_dir)); + + std::random_device rd; + std::mt19937 random(rd()); + + return std::unique_ptr( + new FileChannelManager(spill_dir, std::move(random), fs)); + } + + ~FileChannelManager() { + if (!spill_dir_.empty() && fs_ != nullptr) { + [[maybe_unused]] auto status = fs_->Delete(spill_dir_, /*recursive=*/true); + } + } + + FileChannelManager(const FileChannelManager&) = delete; + FileChannelManager& operator=(const FileChannelManager&) = delete; + + FileIOChannel::ID CreateChannel() { + std::lock_guard lock(mutex_); + return FileIOChannel::ID(spill_dir_, &random_); + } + + FileIOChannel::ID CreateChannel(const std::string& prefix) { + std::lock_guard lock(mutex_); + return FileIOChannel::ID(spill_dir_, prefix, &random_); + } + + std::shared_ptr CreateChannelEnumerator() { + std::lock_guard lock(mutex_); + return std::make_shared(spill_dir_, &random_); + } + + const std::string& GetSpillDir() const { + return spill_dir_; + } + + private: + FileChannelManager(const std::string& spill_dir, std::mt19937&& random, + const std::shared_ptr& fs) + : spill_dir_(spill_dir), random_(std::move(random)), fs_(fs) {} + std::string spill_dir_; + std::mutex mutex_; + std::mt19937 random_; + std::shared_ptr fs_; +}; + +} // namespace paimon diff --git a/src/paimon/core/disk/file_io_channel.cpp b/src/paimon/core/disk/file_io_channel.cpp new file mode 100644 index 000000000..442e6c0c5 --- /dev/null +++ b/src/paimon/core/disk/file_io_channel.cpp @@ -0,0 +1,73 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/disk/file_io_channel.h" + +#include +#include +#include + +#include "paimon/common/utils/path_util.h" + +namespace paimon { +std::string FileIOChannel::GenerateRandomHexString(std::mt19937* random) { + std::uniform_int_distribution dist(0, 255); + std::ostringstream hex_stream; + hex_stream << std::hex << std::setfill('0'); + for (int32_t i = 0; i < kRandomBytesLength; ++i) { + hex_stream << std::setw(2) << dist(*random); + } + return hex_stream.str(); +} + +FileIOChannel::ID::ID(const std::string& path) : path_(path) {} + +FileIOChannel::ID::ID(const std::string& base_path, std::mt19937* random) + : path_(PathUtil::JoinPath(base_path, GenerateRandomHexString(random) + ".channel")) {} + +FileIOChannel::ID::ID(const std::string& base_path, const std::string& prefix, std::mt19937* random) + : path_(PathUtil::JoinPath(base_path, + prefix + "-" + GenerateRandomHexString(random) + ".channel")) {} + +const std::string& FileIOChannel::ID::GetPath() const { + return path_; +} + +bool FileIOChannel::ID::operator==(const ID& other) const { + return path_ == other.path_; +} + +bool FileIOChannel::ID::operator!=(const ID& other) const { + return !(*this == other); +} + +size_t FileIOChannel::ID::Hash::operator()(const ID& id) const { + return std::hash{}(id.path_); +} + +FileIOChannel::Enumerator::Enumerator(const std::string& base_path, std::mt19937* random) + : path_(base_path), name_prefix_(GenerateRandomHexString(random)) {} + +FileIOChannel::ID FileIOChannel::Enumerator::Next() { + std::ostringstream filename; + filename << name_prefix_ << "." << std::setfill('0') << std::setw(6) << (local_counter_++) + << ".channel"; + + std::string full_path = PathUtil::JoinPath(path_, filename.str()); + return ID(full_path); +} + +} // namespace paimon diff --git a/src/paimon/core/disk/file_io_channel.h b/src/paimon/core/disk/file_io_channel.h new file mode 100644 index 000000000..165add20b --- /dev/null +++ b/src/paimon/core/disk/file_io_channel.h @@ -0,0 +1,71 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/visibility.h" + +namespace paimon { +class PAIMON_EXPORT FileIOChannel { + public: + class PAIMON_EXPORT ID { + public: + ID() = default; + + explicit ID(const std::string& path); + + ID(const std::string& base_path, std::mt19937* random); + + ID(const std::string& base_path, const std::string& prefix, std::mt19937* random); + + const std::string& GetPath() const; + + bool operator==(const ID& other) const; + + bool operator!=(const ID& other) const; + + struct Hash { + size_t operator()(const ID& id) const; + }; + + private: + std::string path_; + }; + + private: + static constexpr int32_t kRandomBytesLength = 16; + static std::string GenerateRandomHexString(std::mt19937* random); + + public: + class PAIMON_EXPORT Enumerator { + public: + Enumerator(const std::string& base_path, std::mt19937* random); + + ID Next(); + + private: + std::string path_; + std::string name_prefix_; + uint64_t local_counter_{0}; + }; +}; + +} // namespace paimon diff --git a/src/paimon/core/disk/io_manager.cpp b/src/paimon/core/disk/io_manager.cpp index cc1586f3f..1ae6a82bf 100644 --- a/src/paimon/core/disk/io_manager.cpp +++ b/src/paimon/core/disk/io_manager.cpp @@ -13,33 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "paimon/disk/io_manager.h" - -#include "paimon/common/utils/path_util.h" -#include "paimon/common/utils/uuid.h" +#include "paimon/core/disk/io_manager_impl.h" namespace paimon { -class IOManagerImpl : public IOManager { - public: - explicit IOManagerImpl(const std::string& tmp_dir) : tmp_dir_(tmp_dir) {} - - const std::string& GetTempDir() const override { - return tmp_dir_; - } - - Result GenerateTempFilePath(const std::string& prefix) const override { - std::string uuid; - if (!UUID::Generate(&uuid)) { - return Status::Invalid("generate uuid for io manager tmp path failed."); - } - return PathUtil::JoinPath(tmp_dir_, prefix + "-" + uuid + std::string(kSuffix)); - } - - private: - static constexpr char kSuffix[] = ".channel"; - std::string tmp_dir_; -}; - std::unique_ptr IOManager::Create(const std::string& tmp_dir) { return std::make_unique(tmp_dir); } diff --git a/src/paimon/core/disk/io_manager_impl.h b/src/paimon/core/disk/io_manager_impl.h new file mode 100644 index 000000000..250e1af96 --- /dev/null +++ b/src/paimon/core/disk/io_manager_impl.h @@ -0,0 +1,81 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/core/disk/file_channel_manager.h" +#include "paimon/disk/io_manager.h" + +namespace paimon { +class IOManagerImpl : public IOManager { + public: + explicit IOManagerImpl(const std::string& tmp_dir) : tmp_dir_(tmp_dir) {} + + const std::string& GetTempDir() const override { + return tmp_dir_; + } + + Result GenerateTempFilePath(const std::string& prefix) const override { + std::string uuid; + if (!UUID::Generate(&uuid)) { + return Status::Invalid("generate uuid for io manager tmp path failed."); + } + return PathUtil::JoinPath(tmp_dir_, prefix + "-" + uuid + std::string(kSuffix)); + } + + Result CreateChannel() { + PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager()); + return manager->CreateChannel(); + } + + Result CreateChannel(const std::string& prefix) { + PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager()); + return manager->CreateChannel(prefix); + } + + Result> CreateChannelEnumerator() { + PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager()); + return manager->CreateChannelEnumerator(); + } + + Result GetSpillDir() { + PAIMON_ASSIGN_OR_RAISE(auto* manager, GetFileChannelManager()); + return manager->GetSpillDir(); + } + + private: + Result GetFileChannelManager() { + std::lock_guard lock(mutex_); + if (file_channel_manager_ == nullptr) { + PAIMON_ASSIGN_OR_RAISE(file_channel_manager_, + FileChannelManager::Create(tmp_dir_, kDirNamePrefix)); + } + return file_channel_manager_.get(); + } + + static constexpr char kSuffix[] = ".channel"; + static constexpr char kDirNamePrefix[] = "io"; + std::string tmp_dir_; + std::mutex mutex_; + std::unique_ptr file_channel_manager_; +}; + +} // namespace paimon diff --git a/src/paimon/core/disk/io_manager_test.cpp b/src/paimon/core/disk/io_manager_test.cpp index 9c55ffb2f..68ff0befd 100644 --- a/src/paimon/core/disk/io_manager_test.cpp +++ b/src/paimon/core/disk/io_manager_test.cpp @@ -14,14 +14,13 @@ * limitations under the License. */ -#include "paimon/disk/io_manager.h" - #include #include #include "gtest/gtest.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" +#include "paimon/core/disk/io_manager_impl.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -63,4 +62,48 @@ TEST(IOManagerTest, GenerateTempFilePathShouldBeDifferentAcrossCalls) { ASSERT_NE(path1, path2); } +TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths) { + auto tmp_dir = UniqueTestDirectory::Create(); + std::shared_ptr manager = IOManager::Create(tmp_dir->Str()); + auto manager_impl = std::dynamic_pointer_cast(manager); + const std::string prefix = "spill"; + + ASSERT_OK_AND_ASSIGN(auto channel1, manager_impl->CreateChannel()); + ASSERT_TRUE(StringUtils::StartsWith(channel1.GetPath(), tmp_dir->Str() + "/paimon-io-")); + ASSERT_TRUE(StringUtils::EndsWith(channel1.GetPath(), ".channel")); + ASSERT_EQ(PathUtil::GetName(channel1.GetPath()).size(), 32 + std::string(".channel").size()); + + ASSERT_OK_AND_ASSIGN(auto channel2, manager_impl->CreateChannel(prefix)); + ASSERT_TRUE(StringUtils::StartsWith(PathUtil::GetName(channel2.GetPath()), prefix + "-")); +} + +TEST(IOManagerTest, CreateChannelEnumeratorShouldReturnSequentialAndUniquePaths) { + auto tmp_dir = UniqueTestDirectory::Create(); + std::shared_ptr manager = IOManager::Create(tmp_dir->Str()); + auto manager_impl = std::dynamic_pointer_cast(manager); + + ASSERT_OK_AND_ASSIGN(auto enumerator, manager_impl->CreateChannelEnumerator()); + + for (int i = 0; i < 10; ++i) { + auto channel_id = enumerator->Next(); + ASSERT_TRUE(StringUtils::StartsWith(channel_id.GetPath(), tmp_dir->Str() + "/paimon-io-")); + std::string counter_str = std::to_string(i); + std::string padded_counter = std::string(6 - counter_str.size(), '0') + counter_str; + ASSERT_TRUE(StringUtils::EndsWith(channel_id.GetPath(), "." + padded_counter + ".channel")); + } +} + +TEST(IOManagerTest, GetSpillDirShouldReturnPaimonIoSubdirectory) { + auto tmp_dir = UniqueTestDirectory::Create(); + std::shared_ptr manager = IOManager::Create(tmp_dir->Str()); + auto manager_impl = std::dynamic_pointer_cast(manager); + + ASSERT_OK_AND_ASSIGN(std::string spill_dir, manager_impl->GetSpillDir()); + ASSERT_TRUE(StringUtils::StartsWith(spill_dir, tmp_dir->Str() + "/paimon-io-")); + ASSERT_FALSE(StringUtils::EndsWith(spill_dir, "/")); + + ASSERT_OK_AND_ASSIGN(auto channel, manager_impl->CreateChannel()); + ASSERT_TRUE(StringUtils::StartsWith(channel.GetPath(), spill_dir + "/")); +} + } // namespace paimon::test diff --git a/src/paimon/core/mergetree/spill_channel_manager.h b/src/paimon/core/mergetree/spill_channel_manager.h new file mode 100644 index 000000000..254a35053 --- /dev/null +++ b/src/paimon/core/mergetree/spill_channel_manager.h @@ -0,0 +1,59 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/fs/file_system.h" +#include "paimon/status.h" + +namespace paimon { + +class SpillChannelManager { + public: + SpillChannelManager(const std::shared_ptr& fs, size_t initial_capacity) : fs_(fs) { + channels_.reserve(initial_capacity); + } + + void AddChannel(const FileIOChannel::ID& channel_id) { + channels_.emplace(channel_id); + } + + Status DeleteChannel(const FileIOChannel::ID& channel_id) { + PAIMON_RETURN_NOT_OK(fs_->Delete(channel_id.GetPath())); + channels_.erase(channel_id); + return Status::OK(); + } + + void Reset() { + for (const auto& channel : channels_) { + [[maybe_unused]] auto status = fs_->Delete(channel.GetPath()); + } + channels_.clear(); + } + + const std::unordered_set& GetChannels() const { + return channels_; + } + + private: + std::unordered_set channels_; + std::shared_ptr fs_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_channel_manager_test.cpp b/src/paimon/core/mergetree/spill_channel_manager_test.cpp new file mode 100644 index 000000000..2ca50b98d --- /dev/null +++ b/src/paimon/core/mergetree/spill_channel_manager_test.cpp @@ -0,0 +1,108 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_channel_manager.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/core/disk/io_manager_impl.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SpillChannelManagerTest : public ::testing::Test { + public: + void SetUp() override { + test_dir_ = UniqueTestDirectory::Create(); + file_system_ = test_dir_->GetFileSystem(); + io_manager_ = std::make_unique(test_dir_->Str()); + } + + FileIOChannel::ID CreateTempFile() { + EXPECT_OK_AND_ASSIGN(auto channel_id, io_manager_->CreateChannel()); + // Create the actual file on disk + EXPECT_OK_AND_ASSIGN(auto out, + file_system_->Create(channel_id.GetPath(), /*overwrite=*/false)); + EXPECT_OK(out->Close()); + return channel_id; + } + + protected: + std::shared_ptr file_system_; + std::unique_ptr test_dir_; + std::unique_ptr io_manager_; +}; + +TEST_F(SpillChannelManagerTest, AddAndGetChannels) { + SpillChannelManager manager(file_system_, 128); + + auto channel1 = CreateTempFile(); + auto channel2 = CreateTempFile(); + + manager.AddChannel(channel1); + manager.AddChannel(channel2); + + const auto& channels = manager.GetChannels(); + ASSERT_EQ(channels.size(), 2); + ASSERT_GT(channels.count(channel1), 0); + ASSERT_GT(channels.count(channel2), 0); +} + +TEST_F(SpillChannelManagerTest, DeleteChannelRemovesFileAndEntry) { + SpillChannelManager manager(file_system_, 128); + + auto channel = CreateTempFile(); + manager.AddChannel(channel); + + ASSERT_OK_AND_ASSIGN(bool exists_before, file_system_->Exists(channel.GetPath())); + ASSERT_TRUE(exists_before); + + ASSERT_OK(manager.DeleteChannel(channel)); + ASSERT_EQ(manager.GetChannels().size(), 0); + ASSERT_OK_AND_ASSIGN(bool exists_after, file_system_->Exists(channel.GetPath())); + ASSERT_FALSE(exists_after); +} + +TEST_F(SpillChannelManagerTest, ResetDeletesAllFiles) { + SpillChannelManager manager(file_system_, 128); + + auto channel1 = CreateTempFile(); + auto channel2 = CreateTempFile(); + auto channel3 = CreateTempFile(); + + manager.AddChannel(channel1); + manager.AddChannel(channel2); + manager.AddChannel(channel3); + + ASSERT_OK_AND_ASSIGN(bool e1, file_system_->Exists(channel1.GetPath())); + ASSERT_OK_AND_ASSIGN(bool e2, file_system_->Exists(channel2.GetPath())); + ASSERT_OK_AND_ASSIGN(bool e3, file_system_->Exists(channel3.GetPath())); + ASSERT_TRUE(e1); + ASSERT_TRUE(e2); + ASSERT_TRUE(e3); + + manager.Reset(); + + ASSERT_OK_AND_ASSIGN(bool a1, file_system_->Exists(channel1.GetPath())); + ASSERT_OK_AND_ASSIGN(bool a2, file_system_->Exists(channel2.GetPath())); + ASSERT_OK_AND_ASSIGN(bool a3, file_system_->Exists(channel3.GetPath())); + ASSERT_FALSE(a1); + ASSERT_FALSE(a2); + ASSERT_FALSE(a3); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/spill_reader.cpp b/src/paimon/core/mergetree/spill_reader.cpp new file mode 100644 index 000000000..fbab7ecbd --- /dev/null +++ b/src/paimon/core/mergetree/spill_reader.cpp @@ -0,0 +1,160 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_reader.h" + +#include "paimon/common/data/columnar/columnar_row_ref.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" + +namespace paimon { + +SpillReader::SpillReader(const std::shared_ptr& fs, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& pool) + : fs_(fs), + key_schema_(key_schema), + value_schema_(value_schema), + pool_(pool), + arrow_pool_(GetArrowPool(pool)), + metrics_(std::make_shared()) {} + +Result> SpillReader::Create( + const std::shared_ptr& fs, const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, const std::shared_ptr& pool, + const FileIOChannel::ID& channel_id) { + std::unique_ptr reader(new SpillReader(fs, key_schema, value_schema, pool)); + PAIMON_RETURN_NOT_OK(reader->Open(channel_id)); + return reader; +} + +Status SpillReader::Open(const FileIOChannel::ID& channel_id) { + const std::string& file_path = channel_id.GetPath(); + PAIMON_ASSIGN_OR_RAISE(in_stream_, fs_->Open(file_path)); + PAIMON_ASSIGN_OR_RAISE(auto file_status, fs_->GetFileStatus(file_path)); + uint64_t file_len = file_status->GetLen(); + arrow_input_stream_adapter_ = + std::make_shared(in_stream_, arrow_pool_, file_len); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + arrow_reader_, arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_)); + num_record_batches_ = arrow_reader_->num_record_batches(); + current_batch_index_ = 0; + return Status::OK(); +} + +SpillReader::Iterator::Iterator(SpillReader* reader) : reader_(reader) {} + +bool SpillReader::Iterator::HasNext() const { + return cursor_ < reader_->batch_length_; +} + +Result SpillReader::Iterator::Next() { + PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, + RowKind::FromByteValue(reader_->row_kind_array_->Value(cursor_))); + int64_t sequence_number = reader_->sequence_number_array_->Value(cursor_); + auto key = std::make_unique(reader_->key_ctx_, cursor_); + auto value = std::make_unique(reader_->value_ctx_, cursor_); + cursor_++; + return KeyValue(row_kind, sequence_number, /*level=*/0, std::move(key), std::move(value)); +} + +Result> SpillReader::NextBatch() { + Reset(); + if (current_batch_index_ >= num_record_batches_) { + return std::unique_ptr(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr record_batch, + arrow_reader_->ReadRecordBatch(current_batch_index_)); + current_batch_index_++; + + batch_length_ = record_batch->num_rows(); + + auto sequence_number_col = + record_batch->GetColumnByName(SpecialFields::SequenceNumber().Name()); + if (!sequence_number_col) { + return Status::Invalid("cannot find _SEQUENCE_NUMBER column in spill file"); + } + sequence_number_array_ = + std::dynamic_pointer_cast>(sequence_number_col); + if (!sequence_number_array_) { + return Status::Invalid("cannot cast _SEQUENCE_NUMBER column to int64 arrow array"); + } + + auto value_kind_col = record_batch->GetColumnByName(SpecialFields::ValueKind().Name()); + if (!value_kind_col) { + return Status::Invalid("cannot find _VALUE_KIND column in spill file"); + } + row_kind_array_ = + std::dynamic_pointer_cast>(value_kind_col); + if (!row_kind_array_) { + return Status::Invalid("cannot cast _VALUE_KIND column to int8 arrow array"); + } + + arrow::ArrayVector key_fields; + key_fields.reserve(key_schema_->num_fields()); + for (const auto& key_field : key_schema_->fields()) { + auto col = record_batch->GetColumnByName(key_field->name()); + if (!col) { + return Status::Invalid("cannot find key field " + key_field->name() + " in spill file"); + } + key_fields.emplace_back(col); + } + + arrow::ArrayVector value_fields; + value_fields.reserve(value_schema_->num_fields()); + for (const auto& value_field : value_schema_->fields()) { + auto col = record_batch->GetColumnByName(value_field->name()); + if (!col) { + return Status::Invalid("cannot find value field " + value_field->name() + + " in spill file"); + } + value_fields.emplace_back(col); + } + + key_ctx_ = std::make_shared(key_fields, pool_); + value_ctx_ = std::make_shared(value_fields, pool_); + + return std::make_unique(this); +} + +std::shared_ptr SpillReader::GetReaderMetrics() const { + return metrics_; +} + +void SpillReader::Close() { + Reset(); + arrow_reader_.reset(); + arrow_input_stream_adapter_.reset(); + if (in_stream_) { + [[maybe_unused]] auto status = in_stream_->Close(); + in_stream_.reset(); + } +} + +void SpillReader::Reset() { + key_ctx_.reset(); + value_ctx_.reset(); + sequence_number_array_.reset(); + row_kind_array_.reset(); + batch_length_ = 0; +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_reader.h b/src/paimon/core/mergetree/spill_reader.h new file mode 100644 index 000000000..113312ed6 --- /dev/null +++ b/src/paimon/core/mergetree/spill_reader.h @@ -0,0 +1,93 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "arrow/array/array_primitive.h" +#include "arrow/ipc/api.h" +#include "paimon/common/data/columnar/columnar_batch_context.h" +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/key_value.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" + +namespace arrow { +class MemoryPool; +} // namespace arrow + +namespace paimon { + +class ArrowInputStreamAdapter; +class Metrics; + +class SpillReader : public KeyValueRecordReader { + public: + static Result> Create( + const std::shared_ptr& fs, const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, const std::shared_ptr& pool, + const FileIOChannel::ID& channel_id); + + SpillReader(const SpillReader&) = delete; + SpillReader& operator=(const SpillReader&) = delete; + + class Iterator : public KeyValueRecordReader::Iterator { + public: + explicit Iterator(SpillReader* reader); + bool HasNext() const override; + Result Next() override; + + private: + int64_t cursor_ = 0; + SpillReader* reader_ = nullptr; + }; + + Result> NextBatch() override; + std::shared_ptr GetReaderMetrics() const override; + void Close() override; + + private: + SpillReader(const std::shared_ptr& fs, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& pool); + + Status Open(const FileIOChannel::ID& channel_id); + void Reset(); + + std::shared_ptr fs_; + std::shared_ptr key_schema_; + std::shared_ptr value_schema_; + std::shared_ptr pool_; + std::shared_ptr arrow_pool_; + std::shared_ptr metrics_; + + std::shared_ptr in_stream_; + std::shared_ptr arrow_input_stream_adapter_; + std::shared_ptr arrow_reader_; + int32_t current_batch_index_ = 0; + int32_t num_record_batches_ = 0; + + int64_t batch_length_ = 0; + std::shared_ptr> sequence_number_array_; + std::shared_ptr> row_kind_array_; + std::shared_ptr key_ctx_; + std::shared_ptr value_ctx_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_reader_writer_test.cpp b/src/paimon/core/mergetree/spill_reader_writer_test.cpp new file mode 100644 index 000000000..7c48d696e --- /dev/null +++ b/src/paimon/core/mergetree/spill_reader_writer_test.cpp @@ -0,0 +1,247 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/types/data_field.h" +#include "paimon/core/disk/io_manager_impl.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SpillReaderWriterTest : public ::testing::TestWithParam { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + test_dir_ = UniqueTestDirectory::Create(); + file_system_ = test_dir_->GetFileSystem(); + + io_manager_ = std::make_unique(test_dir_->Str()); + ASSERT_OK_AND_ASSIGN(channel_enumerator_, io_manager_->CreateChannelEnumerator()); + spill_channel_manager_ = std::make_shared(file_system_, 128); + + // Build write schema: [_SEQUENCE_NUMBER, _VALUE_KIND, key fields..., value fields...] + value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())), + DataField(1, arrow::field("f1", arrow::int32()))}; + key_fields_ = {DataField(0, arrow::field("f0", arrow::utf8()))}; + + key_schema_ = DataField::ConvertDataFieldsToArrowSchema(key_fields_); + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields_); + write_schema_ = SpecialFields::CompleteSequenceAndValueKindField(value_schema_); + write_type_ = arrow::struct_(write_schema_->fields()); + } + + std::shared_ptr CreateRecordBatch(const std::string& json_data, + int64_t num_rows) const { + auto array = arrow::ipc::internal::json::ArrayFromJSON(write_type_, json_data).ValueOrDie(); + auto struct_array = std::dynamic_pointer_cast(array); + return arrow::RecordBatch::Make(write_schema_, num_rows, struct_array->fields()); + } + + Result> CreateSpillWriter() const { + return SpillWriter::Create(file_system_, write_schema_, channel_enumerator_, + spill_channel_manager_, GetParam(), /*compression_level=*/1); + } + + FileIOChannel::ID WriteSpillFile( + const std::vector>& batches) { + EXPECT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); + for (const auto& batch : batches) { + EXPECT_OK(writer->WriteBatch(batch)); + } + EXPECT_OK(writer->Close()); + return writer->GetChannelId(); + } + + Result> CreateSpillReader( + const FileIOChannel::ID& channel_id) const { + return SpillReader::Create(file_system_, key_schema_, value_schema_, pool_, channel_id); + } + + protected: + std::shared_ptr pool_; + std::shared_ptr file_system_; + std::unique_ptr test_dir_; + std::unique_ptr io_manager_; + std::shared_ptr channel_enumerator_; + std::shared_ptr spill_channel_manager_; + + std::vector value_fields_; + std::vector key_fields_; + std::shared_ptr write_schema_; + std::shared_ptr write_type_; + std::shared_ptr key_schema_; + std::shared_ptr value_schema_; +}; + +TEST_P(SpillReaderWriterTest, TestWriteBatch) { + FileIOChannel::ID channel_id_1; + FileIOChannel::ID channel_id_2; + + // First writer + { + ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); + + auto batch = CreateRecordBatch(R"([ + [0, 1, "Alice", 10], + [1, 1, "Bob", 20] + ])", + 2); + ASSERT_OK(writer->WriteBatch(batch)); + ASSERT_OK_AND_ASSIGN(int64_t file_size, writer->GetFileSize()); + ASSERT_GT(file_size, 0); + ASSERT_OK(writer->Close()); + channel_id_1 = writer->GetChannelId(); + } + // Second writer + { + ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); + + auto batch_a = CreateRecordBatch(R"([ + [2, 1, "Carol", 30], + [3, 1, "Dave", 40] + ])", + 2); + auto batch_b = CreateRecordBatch(R"([ + [4, 1, "Eve", 50], + [5, 1, "Frank", 60], + [6, 1, "Grace", 70] + ])", + 3); + ASSERT_OK(writer->WriteBatch(batch_a)); + ASSERT_OK_AND_ASSIGN(int64_t size_before, writer->GetFileSize()); + ASSERT_OK(writer->WriteBatch(batch_b)); + ASSERT_OK_AND_ASSIGN(int64_t size_after, writer->GetFileSize()); + ASSERT_GT(size_after, size_before); + ASSERT_OK(writer->Close()); + channel_id_2 = writer->GetChannelId(); + } + // Read back first writer's data + { + ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_1)); + + std::vector expected_keys = {"Alice", "Bob"}; + int total_rows = 0; + int batch_count = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch()); + if (iter == nullptr) { + break; + } + batch_count++; + while (iter->HasNext()) { + ASSERT_OK_AND_ASSIGN(auto kv, iter->Next()); + ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]); + total_rows++; + } + } + ASSERT_EQ(batch_count, 1); + ASSERT_EQ(total_rows, 2); + reader->Close(); + } + // Read back second writer's data + { + ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id_2)); + + std::vector expected_keys = {"Carol", "Dave", "Eve", "Frank", "Grace"}; + int total_rows = 0; + int batch_count = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch()); + if (iter == nullptr) { + break; + } + batch_count++; + while (iter->HasNext()) { + ASSERT_OK_AND_ASSIGN(auto kv, iter->Next()); + ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]); + total_rows++; + } + } + ASSERT_EQ(batch_count, 2); + ASSERT_EQ(total_rows, 5); + reader->Close(); + } +} + +TEST_P(SpillReaderWriterTest, TestReadBatch) { + { + auto batch1 = CreateRecordBatch(R"([[0, 1, "Alice", 10], [1, 1, "Bob", 20]])", 2); + auto batch2 = + CreateRecordBatch(R"([[2, 1, "Carol", 30], [3, 2, "Dave", 40], [4, 3, "Eve", 50]])", 3); + + auto channel_id = WriteSpillFile({batch1, batch2}); + ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id)); + + std::vector expected_keys = {"Alice", "Bob", "Carol", "Dave", "Eve"}; + std::vector expected_values = {10, 20, 30, 40, 50}; + std::vector expected_seqs = {0, 1, 2, 3, 4}; + std::vector expected_kinds = {1, 1, 1, 2, 3}; + + int total_rows = 0; + int batch_count = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch()); + if (iter == nullptr) break; + batch_count++; + while (iter->HasNext()) { + ASSERT_OK_AND_ASSIGN(auto kv, iter->Next()); + ASSERT_EQ(kv.key->GetStringView(0), expected_keys[total_rows]); + ASSERT_EQ(kv.value->GetStringView(0), expected_keys[total_rows]); + ASSERT_EQ(kv.value->GetInt(1), expected_values[total_rows]); + ASSERT_EQ(kv.sequence_number, expected_seqs[total_rows]); + ASSERT_EQ(kv.value_kind->ToByteValue(), expected_kinds[total_rows]); + total_rows++; + } + } + ASSERT_EQ(batch_count, 2); + ASSERT_EQ(total_rows, 5); + reader->Close(); + } + { + auto empty_batch = + arrow::RecordBatch::Make(write_schema_, 0, + {arrow::MakeEmptyArray(arrow::int64()).ValueOrDie(), + arrow::MakeEmptyArray(arrow::int8()).ValueOrDie(), + arrow::MakeEmptyArray(arrow::utf8()).ValueOrDie(), + arrow::MakeEmptyArray(arrow::int32()).ValueOrDie()}); + + auto channel_id = WriteSpillFile({empty_batch}); + ASSERT_OK_AND_ASSIGN(auto reader, CreateSpillReader(channel_id)); + + ASSERT_OK_AND_ASSIGN(auto iter, reader->NextBatch()); + if (iter != nullptr) { + ASSERT_FALSE(iter->HasNext()); + } + reader->Close(); + } +} + +INSTANTIATE_TEST_SUITE_P(CompressionTypes, SpillReaderWriterTest, + ::testing::Values("zstd", "none", "uncompressed", "lz4")); + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/spill_writer.cpp b/src/paimon/core/mergetree/spill_writer.cpp new file mode 100644 index 000000000..7dcfee130 --- /dev/null +++ b/src/paimon/core/mergetree/spill_writer.cpp @@ -0,0 +1,116 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_writer.h" + +#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/mergetree/spill_channel_manager.h" + +namespace paimon { + +SpillWriter::SpillWriter(const std::shared_ptr& fs, + const std::shared_ptr& schema, + const std::shared_ptr& channel_enumerator, + const std::shared_ptr& spill_channel_manager, + const std::string& compression, int32_t compression_level) + : fs_(fs), + schema_(schema), + channel_enumerator_(channel_enumerator), + spill_channel_manager_(spill_channel_manager), + compression_(compression), + compression_level_(compression_level) {} + +Result> SpillWriter::Create( + const std::shared_ptr& fs, const std::shared_ptr& schema, + const std::shared_ptr& channel_enumerator, + const std::shared_ptr& spill_channel_manager, + const std::string& compression, int32_t compression_level) { + std::unique_ptr writer(new SpillWriter( + fs, schema, channel_enumerator, spill_channel_manager, compression, compression_level)); + PAIMON_RETURN_NOT_OK(writer->Open()); + return writer; +} + +Status SpillWriter::Open() { + channel_id_ = channel_enumerator_->Next(); + auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults(); + auto cleanup_guard = ScopeGuard([&]() { + arrow_writer_.reset(); + arrow_output_stream_adapter_.reset(); + if (out_stream_) { + [[maybe_unused]] auto status = out_stream_->Close(); + out_stream_.reset(); + } + if (!channel_id_.GetPath().empty()) { + [[maybe_unused]] auto status = fs_->Delete(channel_id_.GetPath()); + } + }); + PAIMON_ASSIGN_OR_RAISE(auto arrow_compression, ArrowUtils::GetCompressionType(compression_)); + if (!arrow::util::Codec::SupportsCompressionLevel(arrow_compression)) { + compression_level_ = arrow::util::Codec::UseDefaultCompressionLevel(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + ipc_write_options.codec, arrow::util::Codec::Create(arrow_compression, compression_level_)); + PAIMON_ASSIGN_OR_RAISE(out_stream_, fs_->Create(channel_id_.GetPath(), /*overwrite=*/false)); + arrow_output_stream_adapter_ = std::make_shared(out_stream_); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + arrow_writer_, + arrow::ipc::MakeFileWriter(arrow_output_stream_adapter_, schema_, ipc_write_options)); + spill_channel_manager_->AddChannel(channel_id_); + cleanup_guard.Release(); + return Status::OK(); +} + +Status SpillWriter::WriteBatch(const std::shared_ptr& batch) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->WriteRecordBatch(*batch)); + return Status::OK(); +} + +Status SpillWriter::Close() { + if (closed_) { + return Status::OK(); + } + if (arrow_writer_) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow_writer_->Close()); + } + if (out_stream_) { + PAIMON_RETURN_NOT_OK(out_stream_->Close()); + } + closed_ = true; + return Status::OK(); +} + +Result SpillWriter::GetFileSize() const { + if (channel_id_.GetPath().empty()) { + return Status::Invalid("spill writer has no channel id"); + } + if (!closed_ && arrow_output_stream_adapter_ != nullptr) { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t file_size, arrow_output_stream_adapter_->Tell()); + return file_size; + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, + fs_->GetFileStatus(channel_id_.GetPath())); + return static_cast(file_status->GetLen()); +} + +const FileIOChannel::ID& SpillWriter::GetChannelId() const { + return channel_id_; +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_writer.h b/src/paimon/core/mergetree/spill_writer.h new file mode 100644 index 000000000..4f6ec3cf6 --- /dev/null +++ b/src/paimon/core/mergetree/spill_writer.h @@ -0,0 +1,75 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "arrow/ipc/api.h" +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class RecordBatch; +class Schema; +} // namespace arrow + +namespace paimon { + +class ArrowOutputStreamAdapter; +class SpillChannelManager; + +class SpillWriter { + public: + static Result> Create( + const std::shared_ptr& fs, const std::shared_ptr& schema, + const std::shared_ptr& channel_enumerator, + const std::shared_ptr& spill_channel_manager, + const std::string& compression, int32_t compression_level); + + SpillWriter(const SpillWriter&) = delete; + SpillWriter& operator=(const SpillWriter&) = delete; + + Status WriteBatch(const std::shared_ptr& batch); + Status Close(); + Result GetFileSize() const; + const FileIOChannel::ID& GetChannelId() const; + + private: + SpillWriter(const std::shared_ptr& fs, const std::shared_ptr& schema, + const std::shared_ptr& channel_enumerator, + const std::shared_ptr& spill_channel_manager, + const std::string& compression, int32_t compression_level); + + Status Open(); + + std::shared_ptr fs_; + std::shared_ptr schema_; + std::shared_ptr channel_enumerator_; + std::shared_ptr spill_channel_manager_; + std::string compression_; + int32_t compression_level_; + std::shared_ptr out_stream_; + std::shared_ptr arrow_output_stream_adapter_; + std::shared_ptr arrow_writer_; + FileIOChannel::ID channel_id_; + bool closed_ = false; +}; + +} // namespace paimon diff --git a/src/paimon/format/parquet/parquet_writer_builder.cpp b/src/paimon/format/parquet/parquet_writer_builder.cpp index 15a0e1d90..c2d5375c5 100644 --- a/src/paimon/format/parquet/parquet_writer_builder.cpp +++ b/src/paimon/format/parquet/parquet_writer_builder.cpp @@ -22,9 +22,9 @@ #include "arrow/util/compression.h" #include "arrow/util/type_fwd.h" #include "fmt/format.h" +#include "paimon/common/utils/arrow/arrow_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" -#include "paimon/common/utils/string_utils.h" #include "paimon/core/core_options.h" #include "paimon/format/parquet/parquet_format_defs.h" #include "paimon/format/parquet/parquet_format_writer.h" @@ -52,13 +52,8 @@ Result> ParquetWriterBuilder::Build( Result> ParquetWriterBuilder::PrepareWriterProperties( const std::string& compression) { PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options_)); - std::string normalized_compression = StringUtils::ToLowerCase(compression); - if (normalized_compression == "none") { - normalized_compression = "uncompressed"; - } - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( - arrow::Compression::type compression_type, - arrow::util::Codec::GetCompressionType(StringUtils::ToLowerCase(normalized_compression))); + PAIMON_ASSIGN_OR_RAISE(arrow::Compression::type compression_type, + ArrowUtils::GetCompressionType(compression)); ::parquet::WriterProperties::Builder builder; builder.memory_pool(pool_.get()); builder.write_batch_size(batch_size_);