Skip to content

Commit 6cf0103

Browse files
committed
feat(spill): add SpillWriter, SpillReader and SpillChannelManager for spill-to-disk support
1 parent 3b444c7 commit 6cf0103

17 files changed

+1201
-34
lines changed

src/paimon/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ set(PAIMON_COMMON_SRCS
146146
common/utils/string_utils.cpp)
147147

148148
set(PAIMON_CORE_SRCS
149+
core/disk/file_io_channel.cpp
149150
core/disk/io_manager.cpp
151+
core/mergetree/spill_reader.cpp
152+
core/mergetree/spill_writer.cpp
150153
core/append/append_only_writer.cpp
151154
core/append/bucketed_append_compact_manager.cpp
152155
core/casting/binary_to_string_cast_executor.cpp
@@ -605,6 +608,9 @@ if(PAIMON_BUILD_TESTS)
605608
core/mergetree/merge_tree_writer_test.cpp
606609
core/mergetree/write_buffer_test.cpp
607610
core/mergetree/sorted_run_test.cpp
611+
core/mergetree/spill_channel_manager_test.cpp
612+
core/mergetree/spill_reader_test.cpp
613+
core/mergetree/spill_writer_test.cpp
608614
core/migrate/file_meta_utils_test.cpp
609615
core/operation/metrics/compaction_metrics_test.cpp
610616
core/operation/data_evolution_file_store_scan_test.cpp

src/paimon/common/utils/arrow/arrow_utils.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
#include "arrow/array/array_base.h"
2020
#include "arrow/array/array_nested.h"
21+
#include "arrow/util/compression.h"
2122
#include "paimon/common/utils/arrow/status_utils.h"
23+
#include "paimon/common/utils/string_utils.h"
2224

2325
namespace paimon {
2426
Result<std::shared_ptr<arrow::Schema>> ArrowUtils::DataTypeToSchema(
@@ -161,4 +163,14 @@ Result<std::shared_ptr<arrow::StructArray>> ArrowUtils::RemoveFieldFromStructArr
161163
return array;
162164
}
163165

166+
Result<arrow::Compression::type> ArrowUtils::GetCompressionType(const std::string& compression) {
167+
std::string normalized = StringUtils::ToLowerCase(compression);
168+
if (normalized.empty() || normalized == "none") {
169+
normalized = "uncompressed";
170+
}
171+
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::Compression::type compression_type,
172+
arrow::util::Codec::GetCompressionType(normalized));
173+
return compression_type;
174+
}
175+
164176
} // namespace paimon

src/paimon/common/utils/arrow/arrow_utils.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <vector>
2020

2121
#include "arrow/api.h"
22+
#include "arrow/util/type_fwd.h"
2223
#include "fmt/format.h"
2324
#include "paimon/result.h"
2425

@@ -49,6 +50,10 @@ class PAIMON_EXPORT ArrowUtils {
4950
static bool EqualsIgnoreNullable(const std::shared_ptr<arrow::DataType>& type,
5051
const std::shared_ptr<arrow::DataType>& other_type);
5152

53+
/// Normalize and resolve a compression string to an Arrow compression type.
54+
/// Handles "none" and empty string by mapping them to "uncompressed".
55+
static Result<arrow::Compression::type> GetCompressionType(const std::string& compression);
56+
5257
private:
5358
static Status InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
5459
const std::shared_ptr<arrow::Array>& data);
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/core/disk/file_io_channel.h"
18+
19+
#include <iomanip>
20+
#include <sstream>
21+
#include <utility>
22+
23+
#include "paimon/common/utils/path_util.h"
24+
25+
namespace paimon {
26+
27+
std::string FileIOChannel::GenerateRandomHexString(std::mt19937* random) {
28+
std::uniform_int_distribution<int32_t> dist(0, 255);
29+
std::ostringstream hex_stream;
30+
hex_stream << std::hex << std::setfill('0');
31+
for (int32_t i = 0; i < kRandomBytesLength; ++i) {
32+
hex_stream << std::setw(2) << dist(*random);
33+
}
34+
return hex_stream.str();
35+
}
36+
37+
FileIOChannel::ID::ID(const std::string& path) : path_(path) {}
38+
39+
FileIOChannel::ID::ID(const std::string& base_path, std::mt19937* random)
40+
: path_(PathUtil::JoinPath(base_path, GenerateRandomHexString(random) + ".channel")) {}
41+
42+
FileIOChannel::ID::ID(const std::string& base_path, const std::string& prefix, std::mt19937* random)
43+
: path_(PathUtil::JoinPath(base_path,
44+
prefix + "-" + GenerateRandomHexString(random) + ".channel")) {}
45+
46+
const std::string& FileIOChannel::ID::GetPath() const {
47+
return path_;
48+
}
49+
50+
bool FileIOChannel::ID::operator==(const ID& other) const {
51+
return path_ == other.path_;
52+
}
53+
54+
bool FileIOChannel::ID::operator!=(const ID& other) const {
55+
return !(*this == other);
56+
}
57+
58+
size_t FileIOChannel::ID::Hash::operator()(const ID& id) const {
59+
return std::hash<std::string>{}(id.path_);
60+
}
61+
62+
FileIOChannel::Enumerator::Enumerator(const std::string& base_path, std::mt19937* random)
63+
: path_(base_path), name_prefix_(GenerateRandomHexString(random)) {}
64+
65+
FileIOChannel::ID FileIOChannel::Enumerator::Next() {
66+
std::ostringstream filename;
67+
filename << name_prefix_ << "." << std::setfill('0') << std::setw(6) << (local_counter_++)
68+
<< ".channel";
69+
70+
std::string full_path = PathUtil::JoinPath(path_, filename.str());
71+
return ID(full_path);
72+
}
73+
74+
} // namespace paimon
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <memory>
21+
#include <random>
22+
#include <string>
23+
24+
#include "paimon/visibility.h"
25+
26+
namespace paimon {
27+
28+
/// A FileIOChannel represents a collection of files that belong logically to the same resource.
29+
class PAIMON_EXPORT FileIOChannel {
30+
public:
31+
class PAIMON_EXPORT ID {
32+
public:
33+
ID() = default;
34+
35+
explicit ID(const std::string& path);
36+
37+
ID(const std::string& base_path, std::mt19937* random);
38+
39+
ID(const std::string& base_path, const std::string& prefix, std::mt19937* random);
40+
41+
const std::string& GetPath() const;
42+
43+
bool operator==(const ID& other) const;
44+
45+
bool operator!=(const ID& other) const;
46+
47+
struct Hash {
48+
size_t operator()(const ID& id) const;
49+
};
50+
51+
private:
52+
std::string path_;
53+
};
54+
55+
private:
56+
static constexpr int32_t kRandomBytesLength = 16;
57+
static std::string GenerateRandomHexString(std::mt19937* random);
58+
59+
public:
60+
class PAIMON_EXPORT Enumerator {
61+
public:
62+
Enumerator(const std::string& base_path, std::mt19937* random);
63+
64+
ID Next();
65+
66+
private:
67+
std::string path_;
68+
std::string name_prefix_;
69+
uint64_t local_counter_{0};
70+
};
71+
};
72+
73+
} // namespace paimon

src/paimon/core/disk/io_manager.cpp

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,9 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
#include "paimon/disk/io_manager.h"
17-
18-
#include "paimon/common/utils/path_util.h"
19-
#include "paimon/common/utils/uuid.h"
16+
#include "paimon/core/disk/io_manager_impl.h"
2017

2118
namespace paimon {
22-
class IOManagerImpl : public IOManager {
23-
public:
24-
explicit IOManagerImpl(const std::string& tmp_dir) : tmp_dir_(tmp_dir) {}
25-
26-
const std::string& GetTempDir() const override {
27-
return tmp_dir_;
28-
}
29-
30-
Result<std::string> GenerateTempFilePath(const std::string& prefix) const override {
31-
std::string uuid;
32-
if (!UUID::Generate(&uuid)) {
33-
return Status::Invalid("generate uuid for io manager tmp path failed.");
34-
}
35-
return PathUtil::JoinPath(tmp_dir_, prefix + "-" + uuid + std::string(kSuffix));
36-
}
37-
38-
private:
39-
static constexpr char kSuffix[] = ".channel";
40-
std::string tmp_dir_;
41-
};
4219

4320
std::unique_ptr<IOManager> IOManager::Create(const std::string& tmp_dir) {
4421
return std::make_unique<IOManagerImpl>(tmp_dir);
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <mutex>
20+
#include <random>
21+
#include <string>
22+
23+
#include "paimon/common/utils/path_util.h"
24+
#include "paimon/common/utils/uuid.h"
25+
#include "paimon/core/disk/file_io_channel.h"
26+
#include "paimon/disk/io_manager.h"
27+
28+
namespace paimon {
29+
30+
class IOManagerImpl : public IOManager {
31+
public:
32+
explicit IOManagerImpl(const std::string& tmp_dir) : tmp_dir_(tmp_dir) {
33+
std::random_device rd;
34+
random_.seed(rd());
35+
}
36+
37+
const std::string& GetTempDir() const override {
38+
return tmp_dir_;
39+
}
40+
41+
Result<std::string> GenerateTempFilePath(const std::string& prefix) const override {
42+
std::string uuid;
43+
if (!UUID::Generate(&uuid)) {
44+
return Status::Invalid("generate uuid for io manager tmp path failed.");
45+
}
46+
return PathUtil::JoinPath(tmp_dir_, prefix + "-" + uuid + std::string(kSuffix));
47+
}
48+
49+
Result<FileIOChannel::ID> CreateChannel() {
50+
std::lock_guard<std::mutex> lock(mutex_);
51+
return FileIOChannel::ID(tmp_dir_, &random_);
52+
}
53+
54+
Result<FileIOChannel::ID> CreateChannel(const std::string& prefix) {
55+
std::lock_guard<std::mutex> lock(mutex_);
56+
return FileIOChannel::ID(tmp_dir_, prefix, &random_);
57+
}
58+
59+
Result<std::shared_ptr<FileIOChannel::Enumerator>> CreateChannelEnumerator() {
60+
std::lock_guard<std::mutex> lock(mutex_);
61+
return std::make_shared<FileIOChannel::Enumerator>(tmp_dir_, &random_);
62+
}
63+
64+
private:
65+
static constexpr char kSuffix[] = ".channel";
66+
std::string tmp_dir_;
67+
std::mutex mutex_;
68+
std::mt19937 random_;
69+
};
70+
71+
} // namespace paimon

src/paimon/core/disk/io_manager_test.cpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17-
#include "paimon/disk/io_manager.h"
18-
1917
#include <memory>
2018
#include <string>
2119

2220
#include "gtest/gtest.h"
2321
#include "paimon/common/utils/path_util.h"
2422
#include "paimon/common/utils/string_utils.h"
23+
#include "paimon/core/disk/io_manager_impl.h"
2524
#include "paimon/testing/utils/testharness.h"
2625

2726
namespace paimon::test {
@@ -63,4 +62,33 @@ TEST(IOManagerTest, GenerateTempFilePathShouldBeDifferentAcrossCalls) {
6362
ASSERT_NE(path1, path2);
6463
}
6564

65+
TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths) {
66+
auto tmp_dir = UniqueTestDirectory::Create();
67+
auto manager = std::make_unique<IOManagerImpl>(tmp_dir->Str());
68+
const std::string prefix = "spill";
69+
70+
ASSERT_OK_AND_ASSIGN(auto channel1, manager->CreateChannel());
71+
ASSERT_TRUE(StringUtils::StartsWith(channel1.GetPath(), tmp_dir->Str() + "/"));
72+
ASSERT_TRUE(StringUtils::EndsWith(channel1.GetPath(), ".channel"));
73+
ASSERT_EQ(PathUtil::GetName(channel1.GetPath()).size(), 32 + std::string(".channel").size());
74+
75+
ASSERT_OK_AND_ASSIGN(auto channel2, manager->CreateChannel(prefix));
76+
ASSERT_TRUE(StringUtils::StartsWith(PathUtil::GetName(channel2.GetPath()), prefix + "-"));
77+
}
78+
79+
TEST(IOManagerTest, CreateChannelEnumeratorShouldReturnSequentialAndUniquePaths) {
80+
auto tmp_dir = UniqueTestDirectory::Create();
81+
auto manager = std::make_unique<IOManagerImpl>(tmp_dir->Str());
82+
83+
ASSERT_OK_AND_ASSIGN(auto enumerator, manager->CreateChannelEnumerator());
84+
85+
for (int i = 0; i < 10; ++i) {
86+
auto channel_id = enumerator->Next();
87+
ASSERT_TRUE(StringUtils::StartsWith(channel_id.GetPath(), tmp_dir->Str() + "/"));
88+
std::string counter_str = std::to_string(i);
89+
std::string padded_counter = std::string(6 - counter_str.size(), '0') + counter_str;
90+
ASSERT_TRUE(StringUtils::EndsWith(channel_id.GetPath(), "." + padded_counter + ".channel"));
91+
}
92+
}
93+
6694
} // namespace paimon::test

0 commit comments

Comments
 (0)