From 97274a603bcaad0affd24bee6e7521a6e29b13f5 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 15 May 2026 15:02:01 +0800 Subject: [PATCH] [improvement](cloud) Add enable_recycler config to skip recycler dynamically --- cloud/src/common/config.h | 2 + cloud/src/recycler/recycler.cpp | 45 ++++++++++-------- cloud/test/recycler_test.cpp | 81 +++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 18 deletions(-) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index f54f65f9202815..cdc8b5cd190891 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -443,4 +443,6 @@ CONF_Bool(enable_instance_update_watcher, "true"); CONF_mBool(advance_txn_lazy_commit_during_reads, "true"); CONF_mBool(wait_txn_lazy_commit_during_reads, "true"); +// Whether to enable recycler. If false, the recycler will skip scanning instances to pending queue. +CONF_mBool(enable_recycler, "true"); } // namespace doris::cloud::config diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 401d938a1ae6d9..84e92fa04c1d32 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -249,26 +249,30 @@ void Recycler::instance_scanner_callback() { std::this_thread::sleep_for( std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds)); while (!stopped()) { - std::vector instances; - get_all_instances(txn_kv_.get(), instances); - // TODO(plat1ko): delete job recycle kv of non-existent instances - LOG(INFO) << "Recycler get instances: " << [&instances] { - std::stringstream ss; - for (auto& i : instances) ss << ' ' << i.instance_id(); - return ss.str(); - }(); - if (!instances.empty()) { - // enqueue instances - std::lock_guard lock(mtx_); - for (auto& instance : instances) { - if (instance_filter_.filter_out(instance.instance_id())) continue; - auto [_, success] = pending_instance_set_.insert(instance.instance_id()); - // skip instance already in pending queue - if (success) { - pending_instance_queue_.push_back(std::move(instance)); + if (config::enable_recycler) { + std::vector instances; + get_all_instances(txn_kv_.get(), instances); + // TODO(plat1ko): delete job recycle kv of non-existent instances + LOG(INFO) << "Recycler get instances: " << [&instances] { + std::stringstream ss; + for (auto& i : instances) ss << ' ' << i.instance_id(); + return ss.str(); + }(); + if (!instances.empty()) { + // enqueue instances + std::lock_guard lock(mtx_); + for (auto& instance : instances) { + if (instance_filter_.filter_out(instance.instance_id())) continue; + auto [_, success] = pending_instance_set_.insert(instance.instance_id()); + // skip instance already in pending queue + if (success) { + pending_instance_queue_.push_back(std::move(instance)); + } } + pending_instance_cond_.notify_all(); } - pending_instance_cond_.notify_all(); + } else { + LOG(WARNING) << "Skip recycler since enable_recycler is false"; } { std::unique_lock lock(mtx_); @@ -298,6 +302,11 @@ void Recycler::recycle_callback() { // skip instance in recycling if (recycling_instance_map_.count(instance_id)) continue; } + if (!config::enable_recycler) { + LOG(WARNING) << "Skip recycle instance_id=" << instance_id + << " since enable_recycler is false"; + continue; + } auto instance_recycler = std::make_shared( txn_kv_, instance, _thread_pool_group, txn_lazy_committer_); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index ffe2401862bb07..022a83eb88f50c 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -8693,4 +8693,85 @@ TEST(RecyclerTest, recycle_tablet_with_delete_file_failure) { EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be deleted"; } } + +TEST(RecyclerTest, enable_recycler_default_true) { + EXPECT_TRUE(config::enable_recycler); +} + +TEST(RecyclerTest, enable_recycler_skip_instance_scanner) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + bool old_val = config::enable_recycler; + config::enable_recycler = false; + DORIS_CLOUD_DEFER { + config::enable_recycler = old_val; + }; + + int64_t old_recycle_interval = config::recycle_interval_seconds; + config::recycle_interval_seconds = 0; + DORIS_CLOUD_DEFER { + config::recycle_interval_seconds = old_recycle_interval; + }; + + int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds; + config::recycler_sleep_before_scheduling_seconds = 0; + DORIS_CLOUD_DEFER { + config::recycler_sleep_before_scheduling_seconds = old_sleep; + }; + + Recycler recycler(txn_kv); + std::thread t([&]() { recycler.instance_scanner_callback(); }); + + // Let the callback complete one iteration: + // sleep(0) -> check enable_recycler (false, skip) -> wait_for(0, timeout) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + recycler.stopped_ = true; + recycler.notifier_.notify_all(); + t.join(); + + EXPECT_TRUE(recycler.pending_instance_queue_.empty()); +} + +TEST(RecyclerTest, enable_recycler_skip_recycle_callback) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + bool old_val = config::enable_recycler; + config::enable_recycler = false; + DORIS_CLOUD_DEFER { + config::enable_recycler = old_val; + }; + + Recycler recycler(txn_kv); + + InstanceInfoPB instance; + instance.set_instance_id("test_instance"); + recycler.pending_instance_queue_.push_back(instance); + recycler.pending_instance_set_.insert("test_instance"); + + std::thread t([&]() { recycler.recycle_callback(); }); + + // Wait until the callback has popped the instance from the queue. + // Can not wait on pending_instance_cond_ here because the callback does + // not notify after popping, which may cause a deadlock: both the main + // thread and the callback end up waiting on the same CV with different + // predicates and no one will wake them up. + while (true) { + { + std::lock_guard lock(recycler.mtx_); + if (recycler.pending_instance_queue_.empty()) break; + } + std::this_thread::yield(); + } + + recycler.stopped_ = true; + recycler.pending_instance_cond_.notify_all(); + t.join(); + + EXPECT_TRUE(recycler.pending_instance_queue_.empty()); + EXPECT_TRUE(recycler.pending_instance_set_.empty()); + EXPECT_TRUE(recycler.recycling_instance_map_.empty()); +} } // namespace doris::cloud