Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,6 @@ CONF_Bool(enable_instance_update_watcher, "true");
CONF_mBool(advance_txn_lazy_commit_during_reads, "true");
CONF_mBool(wait_txn_lazy_commit_during_reads, "true");

// Whether to enable recycler. If false, the recycler will skip scanning instances to pending queue.
CONF_mBool(enable_recycler, "true");
} // namespace doris::cloud::config
45 changes: 27 additions & 18 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,26 +249,30 @@ void Recycler::instance_scanner_callback() {
std::this_thread::sleep_for(
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
while (!stopped()) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
// TODO(plat1ko): delete job recycle kv of non-existent instances
LOG(INFO) << "Recycler get instances: " << [&instances] {
std::stringstream ss;
for (auto& i : instances) ss << ' ' << i.instance_id();
return ss.str();
}();
if (!instances.empty()) {
// enqueue instances
std::lock_guard lock(mtx_);
for (auto& instance : instances) {
if (instance_filter_.filter_out(instance.instance_id())) continue;
auto [_, success] = pending_instance_set_.insert(instance.instance_id());
// skip instance already in pending queue
if (success) {
pending_instance_queue_.push_back(std::move(instance));
if (config::enable_recycler) {
std::vector<InstanceInfoPB> instances;
get_all_instances(txn_kv_.get(), instances);
// TODO(plat1ko): delete job recycle kv of non-existent instances
LOG(INFO) << "Recycler get instances: " << [&instances] {
std::stringstream ss;
for (auto& i : instances) ss << ' ' << i.instance_id();
return ss.str();
}();
if (!instances.empty()) {
// enqueue instances
Comment thread
mymeiyi marked this conversation as resolved.
std::lock_guard lock(mtx_);
for (auto& instance : instances) {
if (instance_filter_.filter_out(instance.instance_id())) continue;
auto [_, success] = pending_instance_set_.insert(instance.instance_id());
// skip instance already in pending queue
if (success) {
pending_instance_queue_.push_back(std::move(instance));
}
}
pending_instance_cond_.notify_all();
}
pending_instance_cond_.notify_all();
} else {
LOG(WARNING) << "Skip recycler since enable_recycler is false";
}
{
std::unique_lock lock(mtx_);
Expand Down Expand Up @@ -298,6 +302,11 @@ void Recycler::recycle_callback() {
// skip instance in recycling
if (recycling_instance_map_.count(instance_id)) continue;
}
Comment thread
mymeiyi marked this conversation as resolved.
if (!config::enable_recycler) {
LOG(WARNING) << "Skip recycle instance_id=" << instance_id
<< " since enable_recycler is false";
continue;
}
auto instance_recycler = std::make_shared<InstanceRecycler>(
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);

Expand Down
81 changes: 81 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8693,4 +8693,85 @@ TEST(RecyclerTest, recycle_tablet_with_delete_file_failure) {
EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be deleted";
}
}

TEST(RecyclerTest, enable_recycler_default_true) {
EXPECT_TRUE(config::enable_recycler);
}

TEST(RecyclerTest, enable_recycler_skip_instance_scanner) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);

bool old_val = config::enable_recycler;
config::enable_recycler = false;
DORIS_CLOUD_DEFER {
config::enable_recycler = old_val;
};

int64_t old_recycle_interval = config::recycle_interval_seconds;
config::recycle_interval_seconds = 0;
DORIS_CLOUD_DEFER {
config::recycle_interval_seconds = old_recycle_interval;
};

int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds;
config::recycler_sleep_before_scheduling_seconds = 0;
DORIS_CLOUD_DEFER {
config::recycler_sleep_before_scheduling_seconds = old_sleep;
};

Recycler recycler(txn_kv);
std::thread t([&]() { recycler.instance_scanner_callback(); });

// Let the callback complete one iteration:
// sleep(0) -> check enable_recycler (false, skip) -> wait_for(0, timeout)
std::this_thread::sleep_for(std::chrono::milliseconds(100));

recycler.stopped_ = true;
recycler.notifier_.notify_all();
t.join();

EXPECT_TRUE(recycler.pending_instance_queue_.empty());
Comment thread
mymeiyi marked this conversation as resolved.
}

TEST(RecyclerTest, enable_recycler_skip_recycle_callback) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);

bool old_val = config::enable_recycler;
config::enable_recycler = false;
DORIS_CLOUD_DEFER {
config::enable_recycler = old_val;
};

Recycler recycler(txn_kv);

InstanceInfoPB instance;
instance.set_instance_id("test_instance");
recycler.pending_instance_queue_.push_back(instance);
recycler.pending_instance_set_.insert("test_instance");

std::thread t([&]() { recycler.recycle_callback(); });

// Wait until the callback has popped the instance from the queue.
// Can not wait on pending_instance_cond_ here because the callback does
// not notify after popping, which may cause a deadlock: both the main
// thread and the callback end up waiting on the same CV with different
// predicates and no one will wake them up.
while (true) {
{
std::lock_guard lock(recycler.mtx_);
if (recycler.pending_instance_queue_.empty()) break;
}
std::this_thread::yield();
}

recycler.stopped_ = true;
recycler.pending_instance_cond_.notify_all();
t.join();

EXPECT_TRUE(recycler.pending_instance_queue_.empty());
Comment thread
mymeiyi marked this conversation as resolved.
EXPECT_TRUE(recycler.pending_instance_set_.empty());
EXPECT_TRUE(recycler.recycling_instance_map_.empty());
}
} // namespace doris::cloud
Loading