diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index f54f65f9202815..e100045d4fc76a 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -101,9 +101,15 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list CONF_Strings(recycle_blacklist, ""); // Comma seprated list // IO worker thread pool concurrency: object list, delete CONF_mInt32(instance_recycler_worker_pool_size, "32"); +// Max scanned rowsets recycled for one tablet in one recycle_rowsets round. +CONF_mInt32(max_recycle_rowsets_per_tablet_batch, "10000"); // Max number of delete tasks per batch when recycling objects. // Each task deletes up to 1000 files. Controls memory usage during large-scale deletion. CONF_Int32(recycler_max_tasks_per_batch, "1000"); +// Max expired recycle_rowset entries to process for one tablet in one recycle_rowsets scan. +// Remaining entries are left for later scans so deletion can spread across tablet prefixes. +CONF_mInt32(recycle_rowsets_per_tablet_batch_size, "10000"); +CONF_mInt32(recycle_rowsets_delete_batch_size, "300000"); // The worker pool size for http api `statistics_recycle` worker pool CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5"); CONF_Bool(enable_checker, "false"); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 401d938a1ae6d9..d45d87ac0f2bdd 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -97,6 +97,21 @@ void sleep_for_packed_file_retry() { std::this_thread::sleep_for(std::chrono::milliseconds(packed_file_retry_sleep_ms())); } +bool is_packed_slice_path(const doris::RowsetMetaCloudPB& rowset, const std::string& path) { + const auto& locations = rowset.packed_slice_locations(); + auto it = locations.find(path); + return it != locations.end() && it->second.has_packed_file_path() && + !it->second.packed_file_path().empty(); +} + +void add_file_to_delete_if_not_packed(const doris::RowsetMetaCloudPB& rowset, + const std::string& path, + std::vector* file_paths) { + if (!is_packed_slice_path(rowset, path)) { + file_paths->push_back(path); + } +} + } // namespace // return 0 for success get a key, 1 for key not found, negative for error @@ -1529,7 +1544,7 @@ int InstanceRecycler::process_single_packed_file(const std::string& packed_key, return -1; } -int InstanceRecycler::handle_packed_file_kv(std::string_view key, std::string_view /*value*/, +int InstanceRecycler::handle_packed_file_kv(std::string_view key, std::string_view value, PackedFileRecycleStats* stats, int* ret) { if (stats) { ++stats->num_scanned; @@ -1548,6 +1563,31 @@ int InstanceRecycler::handle_packed_file_kv(std::string_view key, std::string_vi return 0; } + cloud::PackedFileInfoPB packed_info; + if (!packed_info.ParseFromArray(value.data(), value.size())) { + LOG_WARNING("failed to parse packed file info from scan") + .tag("instance_id", instance_id_) + .tag("packed_file_path", packed_file_path); + if (stats) { + ++stats->num_failed; + } + if (ret) { + *ret = -1; + } + return 0; + } + + const int64_t now_sec = ::time(nullptr); + const bool due = + config::force_immediate_recycle || + now_sec - packed_info.created_at_sec() >= config::packed_file_correction_delay_seconds; + const bool need_correction = !packed_info.corrected() && due; + const bool need_recycle = + packed_info.state() == cloud::PackedFileInfoPB::RECYCLING && packed_info.ref_cnt() == 0; + if (!need_correction && !need_recycle) { + return 0; + } + std::string packed_key(key); int process_ret = process_single_packed_file(packed_key, packed_file_path, stats); if (process_ret != 0) { @@ -3143,14 +3183,19 @@ int InstanceRecycler::delete_rowset_data(const RowsetMetaCloudPB& rs_meta_pb) { int64_t tablet_id = rs_meta_pb.tablet_id(); const auto& rowset_id = rs_meta_pb.rowset_id_v2(); for (int64_t i = 0; i < num_segments; ++i) { - file_paths.push_back(segment_path(tablet_id, rowset_id, i)); + add_file_to_delete_if_not_packed(rs_meta_pb, segment_path(tablet_id, rowset_id, i), + &file_paths); if (index_format == InvertedIndexStorageFormatPB::V1) { for (const auto& index_id : index_ids) { - file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, index_id.first, - index_id.second)); + add_file_to_delete_if_not_packed( + rs_meta_pb, + inverted_index_path_v1(tablet_id, rowset_id, i, index_id.first, + index_id.second), + &file_paths); } } else if (!index_ids.empty()) { - file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i)); + add_file_to_delete_if_not_packed( + rs_meta_pb, inverted_index_path_v2(tablet_id, rowset_id, i), &file_paths); } } @@ -3894,11 +3939,15 @@ int InstanceRecycler::delete_rowset_data( continue; } for (int64_t i = 0; i < num_segments; ++i) { - file_paths.push_back(segment_path(tablet_id, rowset_id, i)); + add_file_to_delete_if_not_packed(rs, segment_path(tablet_id, rowset_id, i), + &file_paths); if (index_format == InvertedIndexStorageFormatPB::V1) { for (const auto& index_id : index_ids) { - file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, - index_id.first, index_id.second)); + add_file_to_delete_if_not_packed( + rs, + inverted_index_path_v1(tablet_id, rowset_id, i, index_id.first, + index_id.second), + &file_paths); } } else if (!index_ids.empty() || inverted_index_get_ret == 1) { // try to recycle inverted index v2 when get_ret == 1 @@ -3910,7 +3959,8 @@ int InstanceRecycler::delete_rowset_data( .tag("inverted index v2 path", inverted_index_path_v2(tablet_id, rowset_id, i)); } - file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i)); + add_file_to_delete_if_not_packed( + rs, inverted_index_path_v2(tablet_id, rowset_id, i), &file_paths); } } } @@ -4892,17 +4942,59 @@ int InstanceRecycler::recycle_rowsets() { .tag("expired_rowset_meta_size", expired_rowset_size); }; - std::vector rowset_keys; + struct RecycleRowsetEntry { + std::string key; + doris::RowsetMetaCloudPB meta; + }; + // Store the scanned recycle key with rowset meta. The scanned key is the actual KV key to delete. + std::vector rowsets; + int64_t current_tablet_id = -1; + int64_t recycled_rowset_count_for_current_tablet = 0; + bool current_tablet_skip_logged = false; + std::string next_scan_begin; + const int64_t rowset_batch_size_per_tablet = + std::max(1, config::recycle_rowsets_per_tablet_batch_size); + const int64_t delete_rowset_batch_size = + std::min(500000, config::recycle_rowsets_delete_batch_size); + auto try_reserve_tablet_recycle_slot = [&](int64_t tablet_id) -> bool { + if (current_tablet_id != tablet_id) { + current_tablet_id = tablet_id; + recycled_rowset_count_for_current_tablet = 0; + current_tablet_skip_logged = false; + } + if (recycled_rowset_count_for_current_tablet >= rowset_batch_size_per_tablet) { + if (!current_tablet_skip_logged) { + LOG_INFO( + "skip recycle rowsets for tablet because per-tablet batch limit is reached") + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id) + .tag("limit", rowset_batch_size_per_tablet); + current_tablet_skip_logged = true; + } + const int64_t next_tablet_id = tablet_id == INT64_MAX ? INT64_MAX : tablet_id + 1; + recycle_rowset_key({instance_id_, next_tablet_id, ""}, &next_scan_begin); + return false; + } + ++recycled_rowset_count_for_current_tablet; + return true; + }; + auto next_scan_begin_getter = [&](std::string* begin) -> bool { + if (next_scan_begin.empty()) { + return false; + } + *begin = std::move(next_scan_begin); + next_scan_begin.clear(); + return true; + }; + std::vector rowset_keys_to_mark_recycled; std::vector rowset_keys_to_abort; std::vector prepare_rowset_keys_to_delete; - // rowset_id -> rowset_meta - // store rowset id and meta for statistics rs size when delete - std::map rowsets; // Store keys of rowset recycled by background workers std::mutex async_recycled_rowset_keys_mutex; std::vector async_recycled_rowset_keys; + std::vector rowset_keys_without_data; auto worker_pool = std::make_unique( config::instance_recycler_worker_pool_size, "recycle_rowsets"); worker_pool->start(); @@ -4957,7 +5049,7 @@ int InstanceRecycler::recycle_rowsets() { if (delete_versioned_delete_bitmap_kvs(tablet_id, rowset_id) != 0) { return -1; } - rowset_keys.push_back(std::move(key)); + rowset_keys_without_data.push_back(std::move(key)); return 0; }; @@ -4985,6 +5077,12 @@ int InstanceRecycler::recycle_rowsets() { ++num_expired; expired_rowset_size += v.size(); + int64_t tablet_id = + rowset.has_type() ? rowset.rowset_meta().tablet_id() : rowset.tablet_id(); + if (!try_reserve_tablet_recycle_slot(tablet_id)) { + return 0; + } + if (!rowset.has_type()) { // old version `RecycleRowsetPB` if (!rowset.has_resource_id()) [[unlikely]] { // impossible // in old version, keep this key-value pair and it needs to be checked manually @@ -4995,8 +5093,8 @@ int InstanceRecycler::recycle_rowsets() { // old version `RecycleRowsetPB` may has empty resource_id, just remove the kv. LOG(INFO) << "delete the recycle rowset kv that has empty resource_id, key=" << hex(k) << " value=" << proto_to_json(rowset); - rowset_keys.emplace_back(k); - return -1; + rowset_keys_without_data.emplace_back(k); + return 0; } // decode rowset_id auto k1 = k; @@ -5070,75 +5168,22 @@ int InstanceRecycler::recycle_rowsets() { prepare_rowset_keys_to_delete.emplace_back(k); } else { num_compacted += rowset.type() == RecycleRowsetPB::COMPACT; - rowset_keys.emplace_back(k); - rowsets.emplace(rowset_meta->rowset_id_v2(), std::move(*rowset_meta)); - if (rowset_meta->num_segments() <= 0) { // Skip empty rowset + if (rowset_meta->num_segments() > 0) { // Skip empty rowset + rowsets.emplace_back(std::string(k), std::move(*rowset_meta)); + } else { ++num_empty_rowset; + rowset_keys_without_data.emplace_back(k); } } return 0; }; - auto loop_done = [&]() -> int { - std::vector rowset_keys_to_delete; - std::vector mark_keys_to_process; - std::vector abort_keys_to_process; - std::vector prepare_keys_to_process; - // rowset_id -> rowset_meta - // store rowset id and meta for statistics rs size when delete - std::map rowsets_to_delete; - rowset_keys_to_delete.swap(rowset_keys); - mark_keys_to_process.swap(rowset_keys_to_mark_recycled); - abort_keys_to_process.swap(rowset_keys_to_abort); - prepare_keys_to_process.swap(prepare_rowset_keys_to_delete); - rowsets_to_delete.swap(rowsets); - worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete), - rowsets_to_delete = std::move(rowsets_to_delete), - prepare_keys_to_process = std::move(prepare_keys_to_process), - mark_keys_to_process = std::move(mark_keys_to_process), - abort_keys_to_process = std::move(abort_keys_to_process)]() mutable { - if (!mark_keys_to_process.empty() && - batch_mark_rowsets_as_recycled(txn_kv_.get(), instance_id_, - mark_keys_to_process) != 0) { - LOG(WARNING) << "failed to batch mark recycle rowsets as recycled, instance_id=" - << instance_id_; - return; - } - if (!abort_keys_to_process.empty() && - batch_abort_txn_or_job_for_recycle(abort_keys_to_process, true) != - 0) { - return; - } - std::vector prepare_delete_tasks; - if (!prepare_keys_to_process.empty() && - collect_prepare_delete_tasks(txn_kv_.get(), instance_id_, prepare_keys_to_process, - &prepare_delete_tasks) != 0) { - LOG(WARNING) << "failed to collect prepare rowset delete tasks, instance_id=" - << instance_id_; - return; - } - if (!prepare_delete_tasks.empty()) { - std::vector prepare_rowset_keys_to_delete; - prepare_rowset_keys_to_delete.reserve(prepare_delete_tasks.size()); - for (const auto& task : prepare_delete_tasks) { - if (delete_rowset_data(task.resource_id, task.tablet_id, task.rowset_id) != 0) { - LOG(WARNING) << "failed to delete rowset data, key=" << hex(task.key); - return; - } - if (delete_versioned_delete_bitmap_kvs(task.tablet_id, task.rowset_id) != 0) { - return; - } - prepare_rowset_keys_to_delete.emplace_back(task.key); - } - if (txn_remove(txn_kv_.get(), prepare_rowset_keys_to_delete) != 0) { - LOG(WARNING) << "failed to delete recycle rowset kv, instance_id=" - << instance_id_; - return; - } - num_recycled.fetch_add(prepare_rowset_keys_to_delete.size(), - std::memory_order_relaxed); - } - if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET, + auto submit_delete_rowset_data_job = [&](std::vector rowset_keys, + std::map rowsets) { + worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys), + rowsets_to_delete = std::move(rowsets)]() { + if (!rowsets_to_delete.empty() && + delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET, metrics_context) != 0) { LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_; return; @@ -5152,8 +5197,61 @@ int InstanceRecycler::recycle_rowsets() { LOG(WARNING) << "failed to delete recycle rowset kv, instance_id=" << instance_id_; return; } + num_recycled.fetch_add(rowset_keys_to_delete.size(), std::memory_order_relaxed); }); + }; + + bool scan_finished = false; + auto loop_done = [&]() -> int { + if (!scan_finished && rowsets.size() < delete_rowset_batch_size) { + return 0; + } + DORIS_CLOUD_DEFER { + // if return -1 in loop done, rowset info in memory is not cleared, + // it can lead to memory accumulation + rowset_keys_without_data.clear(); + rowsets.clear(); + }; + std::random_device rd; + std::mt19937 g(rd()); + std::ranges::shuffle(rowsets, g); + + std::vector rowset_keys_to_delete; + rowset_keys_to_delete.reserve(rowset_batch_size_per_tablet); + // rowset_id -> rowset_meta + // store rowset id and meta for statistics rs size when delete + std::map rowsets_to_delete; + + size_t rowsets_per_batch_size = 0; + for (auto& rowset : rowsets) { + rowset_keys_to_delete.emplace_back(std::move(rowset.key)); + rowsets_to_delete.emplace(rowset.meta.rowset_id_v2(), std::move(rowset.meta)); + if (++rowsets_per_batch_size < rowset_batch_size_per_tablet) { + continue; + } + + submit_delete_rowset_data_job(std::move(rowset_keys_to_delete), + std::move(rowsets_to_delete)); + rowsets_per_batch_size = 0; + rowset_keys_to_delete.clear(); + rowsets_to_delete.clear(); + } + + if (!rowset_keys_to_delete.empty() || !rowsets_to_delete.empty()) { + submit_delete_rowset_data_job(std::move(rowset_keys_to_delete), + std::move(rowsets_to_delete)); + } + + for (size_t i = 0; i < rowset_keys_without_data.size(); i += rowset_batch_size_per_tablet) { + auto begin = rowset_keys_without_data.begin() + i; + auto end = rowset_keys_without_data.begin() + + std::min(i + rowset_batch_size_per_tablet, rowset_keys_without_data.size()); + std::vector rowset_keys_to_remove(std::make_move_iterator(begin), + std::make_move_iterator(end)); + submit_delete_rowset_data_job(std::move(rowset_keys_to_remove), {}); + } + return 0; }; @@ -5161,8 +5259,16 @@ int InstanceRecycler::recycle_rowsets() { scan_and_statistics_rowsets(); } // recycle_func and loop_done for scan and recycle - int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv), - std::move(loop_done)); + int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv), loop_done, + std::move(next_scan_begin_getter)); + scan_finished = true; + // if the size of rowsets is always less than delete_rowset_batch_size + // it need to submit the task directly + // else if the size of rowsets is greater than delete_rowset_batch_size, + // but there are residual, whether due to failed or unsuccessful cleanup, this behavior is idempotent + if (loop_done() != 0) { + ret = -1; + } worker_pool->stop(); @@ -5182,6 +5288,134 @@ int InstanceRecycler::recycle_rowsets() { return ret; } +static int decode_recycle_rowset_tablet_id(std::string_view key, int64_t* tablet_id) { + DCHECK(tablet_id != nullptr); + std::string_view key_without_prefix = key; + key_without_prefix.remove_prefix(1); + std::vector, int, int>> out; + if (decode_key(&key_without_prefix, &out) != 0) { + return -1; + } + if (out.size() < 5) { + return -1; + } + try { + *tablet_id = std::get(std::get<0>(out[3])); + } catch (const std::bad_variant_access&) { + return -1; + } + return 0; +} + +int InstanceRecycler::next_recycle_rowset_tablet_key(const std::string& instance_id, + int64_t tablet_id, std::string* next_key) { + DCHECK(next_key != nullptr); + if (tablet_id == std::numeric_limits::max()) { + return -1; + } + *next_key = recycle_rowset_key({instance_id, tablet_id + 1, ""}); + return 0; +} + +int InstanceRecycler::scan_recycle_rowsets_by_tablet( + std::string begin, std::string_view end, + std::function recycle_func, + std::function loop_done) { + LOG(INFO) << "begin scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << "," << hex(end) + << ")"; + int ret = 0; + int64_t cnt = 0; + int64_t num_skip_tablets = 0; + int get_range_retried = 0; + std::string err; + DORIS_CLOUD_DEFER_COPY(begin, end) { + LOG(INFO) << "finish scan_recycle_rowsets_by_tablet key_range=[" << hex(begin) << "," + << hex(end) << ") num_scanned=" << cnt << " num_skip_tablets=" << num_skip_tablets + << " get_range_retried=" << get_range_retried << " ret=" << ret << " err=" << err; + }; + + const size_t max_rowsets_per_tablet = + std::max(1, config::max_recycle_rowsets_per_tablet_batch); + std::unique_ptr it; + int64_t tablet_id = -1; + size_t num_tablet_rowsets = 0; + while (it == nullptr /* may be not init */ || (it->more() && !stopped())) { + if (get_range_retried > 1000) { + err = "txn_get exceeds max retry(1000), may not scan all keys"; + ret = -3; + return ret; + } + int get_ret = txn_get(txn_kv_.get(), begin, end, it); + if (get_ret != 0) { + LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," << hex(end) + << ") num_scanned=" << cnt << " txn_get_ret=" << get_ret + << " get_range_retried=" << get_range_retried; + ++get_range_retried; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + continue; + } + if (!it->has_next()) { + LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; + break; + } + + bool begin_updated = false; + while (it->has_next()) { + ++cnt; + auto [k, v] = it->next(); + int64_t key_tablet_id = -1; + if (decode_recycle_rowset_tablet_id(k, &key_tablet_id) != 0) { + LOG_WARNING("failed to decode recycle rowset key").tag("key", hex(k)); + err = "decode recycle rowset key error"; + ret = -1; + } + + if (tablet_id < 0) { + tablet_id = key_tablet_id; + } else if (key_tablet_id != tablet_id) { + tablet_id = key_tablet_id; + num_tablet_rowsets = 0; + } + + if (ret == 0) { + // FIXME(gavin): if we want to continue scanning, the recycle_func should not return non-zero + if (recycle_func(k, v) != 0) { + err = "recycle_func error"; + ret = -1; + } + } + + if (++num_tablet_rowsets >= max_rowsets_per_tablet) { + if (next_recycle_rowset_tablet_key(instance_id_, key_tablet_id, &begin) != 0) { + begin = k; + begin.push_back('\x00'); + } else { + ++num_skip_tablets; + } + begin_updated = true; + break; + } + + if (!it->has_next()) { + begin = k; + VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k); + } + } + if (ret != 0) { + break; + } + if (!begin_updated) { + begin.push_back('\x00'); + } + // FIXME(gavin): if we want to continue scanning, the loop_done should not return non-zero + if (loop_done && loop_done() != 0) { + err = "loop_done error"; + ret = -1; + } + } + return ret; +} + int InstanceRecycler::recycle_restore_jobs() { const std::string task_name = "recycle_restore_jobs"; int64_t num_scanned = 0; @@ -5962,7 +6196,7 @@ int InstanceRecycler::recycle_tmp_rowsets() { int InstanceRecycler::scan_and_recycle( std::string begin, std::string_view end, std::function recycle_func, - std::function loop_done) { + std::function loop_done, std::function next_begin_getter) { LOG(INFO) << "begin scan_and_recycle key_range=[" << hex(begin) << "," << hex(end) << ")"; int ret = 0; int64_t cnt = 0; @@ -5994,6 +6228,9 @@ int InstanceRecycler::scan_and_recycle( LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," << hex(end) << ")"; break; // scan finished } + bool begin_updated = false; + LOG(INFO) << "scan_and_recycle iterator key_range=[" << hex(begin) << "," << hex(end) + << ") iterator->size()=" << it->size(); while (it->has_next()) { ++cnt; // recycle corresponding resources @@ -6007,9 +6244,30 @@ int InstanceRecycler::scan_and_recycle( err = "recycle_func error"; ret = -1; } + if (next_begin_getter) { + std::string next_begin; + if (next_begin_getter(&next_begin)) { + if (next_begin > k) { + begin = std::move(next_begin); + begin_updated = true; + VLOG_DEBUG << "scan_and_recycle updates begin to " << hex(begin) + << " after key=" << hex(k); + break; + } + LOG_WARNING("ignore invalid next begin in scan_and_recycle") + .tag("next_begin", hex(next_begin)) + .tag("current_key", hex(k)); + } + } } - begin.push_back('\x00'); // Update to next smallest key for iteration + if (!begin_updated) { + begin.push_back('\x00'); // Update to next smallest key for iteration + } else { + it.reset(); + } + // FIXME(gavin): if we want to continue scanning, the loop_done should not return non-zero + // if we want to continue scanning, the recycle_func should not return non-zero if (loop_done && loop_done() != 0) { err = "loop_done error"; ret = -1; diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 84062f4103986a..690d46d2e73d5f 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include "common/bvars.h" #include "meta-service/delete_bitmap_lock_white_list.h" @@ -445,7 +447,16 @@ class InstanceRecycler { */ int scan_and_recycle(std::string begin, std::string_view end, std::function recycle_func, - std::function loop_done = nullptr); + std::function loop_done = nullptr, + std::function next_begin_getter = nullptr); + + static int next_recycle_rowset_tablet_key(const std::string& instance_id, int64_t tablet_id, + std::string* next_key); + + int scan_recycle_rowsets_by_tablet( + std::string begin, std::string_view end, + std::function recycle_func, + std::function loop_done = nullptr); // return 0 for success otherwise error int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index ffe2401862bb07..d83fe9f24f7842 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -327,6 +327,43 @@ static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, return 0; } +static int create_recycle_rowset_kv(TxnKv* txn_kv, const doris::RowsetMetaCloudPB& rowset) { + RecycleRowsetPB rowset_pb; + rowset_pb.set_creation_time(current_time); + rowset_pb.set_type(RecycleRowsetPB::COMPACT); + rowset_pb.mutable_rowset_meta()->CopyFrom(rowset); + std::string key = recycle_rowset_key({instance_id, rowset.tablet_id(), rowset.rowset_id_v2()}); + std::string val; + rowset_pb.SerializeToString(&val); + + std::unique_ptr txn; + if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { + return -1; + } + txn->put(key, val); + return txn->commit() == TxnErrorCode::TXN_OK ? 0 : -1; +} + +static size_t count_recycle_rowsets(TxnKv* txn_kv) { + std::unique_ptr txn; + EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr it; + auto begin_key = recycle_key_prefix(instance_id); + auto end_key = recycle_key_prefix(instance_id + '\xff'); + EXPECT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + return it->size(); +} + +static size_t count_recycle_rowsets(TxnKv* txn_kv, int64_t tablet_id) { + std::unique_ptr txn; + EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr it; + auto begin_key = recycle_rowset_key({instance_id, tablet_id, ""}); + auto end_key = recycle_rowset_key({instance_id, tablet_id, "\xff"}); + EXPECT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + return it->size(); +} + static int create_tmp_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, const doris::RowsetMetaCloudPB& rowset, bool write_schema_kv, bool is_inverted_idx_v2 = false, @@ -1376,6 +1413,206 @@ TEST(RecyclerTest, recycle_rowsets) { check_delete_bitmap_file_size(accessor, tablet_id, 0); } +TEST(RecyclerTest, scan_recycle_rowsets_by_tablet_skips_to_next_tablet_on_batch_limit) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("scan_rowsets_by_tablet_limit"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + for (int i = 0; i < 5; ++i) { + auto rowset = + create_rowset("scan_rowsets_by_tablet_limit", first_tablet_id, index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_limit", second_tablet_id, index_id, 0, + schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + + auto old_worker_pool_size = config::instance_recycler_worker_pool_size; + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + config::instance_recycler_worker_pool_size = 10; + config::max_recycle_rowsets_per_tablet_batch = 3; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = old_worker_pool_size; + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + }; + + std::vector scanned_tablets; + auto recycle_func = [&](std::string_view k, std::string_view) -> int { + std::string_view k1 = k; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + scanned_tablets.push_back(std::get(std::get<0>(out[3]))); + return 0; + }; + + ASSERT_EQ(recycler.scan_recycle_rowsets_by_tablet( + recycle_rowset_key({instance_id, 0, ""}), + recycle_rowset_key({instance_id, INT64_MAX, ""}), recycle_func), + 0); + EXPECT_EQ(scanned_tablets, + std::vector({first_tablet_id, first_tablet_id, first_tablet_id, + second_tablet_id, second_tablet_id})); +} + +TEST(RecyclerTest, scan_recycle_rowsets_by_tablet_accumulates_limit_across_range_gets) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("scan_rowsets_by_tablet_cross_range_limit"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + for (int i = 0; i < 5; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_cross_range_limit", first_tablet_id, + index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = create_rowset("scan_rowsets_by_tablet_cross_range_limit", second_tablet_id, + index_id, 0, schema); + ASSERT_EQ(create_recycle_rowset_kv(txn_kv.get(), rowset), 0); + } + + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + config::max_recycle_rowsets_per_tablet_batch = 3; + DORIS_CLOUD_DEFER { + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + SyncPoint::get_instance()->clear_all_call_backs(); + }; + + auto sp = SyncPoint::get_instance(); + sp->set_call_back("memkv::Transaction::get", [](auto&& args) { + auto* limit = try_any_cast(args[0]); + *limit = 2; + }); + sp->enable_processing(); + + std::vector scanned_tablets; + auto recycle_func = [&](std::string_view k, std::string_view) -> int { + std::string_view k1 = k; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + scanned_tablets.push_back(std::get(std::get<0>(out[3]))); + return 0; + }; + + ASSERT_EQ(recycler.scan_recycle_rowsets_by_tablet( + recycle_rowset_key({instance_id, 0, ""}), + recycle_rowset_key({instance_id, INT64_MAX, ""}), recycle_func), + 0); + EXPECT_EQ(scanned_tablets, + std::vector({first_tablet_id, first_tablet_id, first_tablet_id, + second_tablet_id, second_tablet_id})); +} + +TEST(RecyclerTest, next_recycle_rowset_tablet_key_overwrites_existing_buffer) { + std::string next_key = recycle_rowset_key({instance_id, 10002, "rowset"}); + ASSERT_EQ(InstanceRecycler::next_recycle_rowset_tablet_key(instance_id, 10002, &next_key), 0); + + std::string_view k1 = next_key; + k1.remove_prefix(1); + std::vector, int, int>> out; + ASSERT_EQ(decode_key(&k1, &out), 0); + EXPECT_EQ(std::get(std::get<0>(out[3])), 10003); + EXPECT_TRUE(std::get(std::get<0>(out[4])).empty()); +} + +TEST(RecyclerTest, recycle_rowsets_tablet_batch_limit_recycles_remaining_in_next_round) { + config::retention_seconds = 0; + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_batch_limit"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_batch_limit"); + + auto old_worker_pool_size = config::instance_recycler_worker_pool_size; + auto old_max_rowsets_per_tablet = config::max_recycle_rowsets_per_tablet_batch; + auto old_enable_mark = config::enable_mark_delete_rowset_before_recycle; + config::instance_recycler_worker_pool_size = 1; + config::max_recycle_rowsets_per_tablet_batch = 3; + config::enable_mark_delete_rowset_before_recycle = false; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = old_worker_pool_size; + config::max_recycle_rowsets_per_tablet_batch = old_max_rowsets_per_tablet; + config::enable_mark_delete_rowset_before_recycle = old_enable_mark; + }; + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + constexpr int64_t index_id = 10001; + constexpr int64_t first_tablet_id = 10002; + constexpr int64_t second_tablet_id = 10020; + std::vector first_tablet_rowset_ids; + for (int i = 0; i < 5; ++i) { + auto rowset = + create_rowset("recycle_rowsets_batch_limit", first_tablet_id, index_id, 1, schema); + first_tablet_rowset_ids.push_back(rowset.rowset_id_v2()); + ASSERT_EQ(create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true), + 0); + } + for (int i = 0; i < 2; ++i) { + auto rowset = + create_rowset("recycle_rowsets_batch_limit", second_tablet_id, index_id, 1, schema); + ASSERT_EQ(create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true), + 0); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + for (int i = 0; i < 3; ++i) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, first_tablet_rowset_ids[i], 0)), + 1); + } + for (int i = 3; i < 5; ++i) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, first_tablet_rowset_ids[i], 0)), + 0); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + for (const auto& rowset_id : first_tablet_rowset_ids) { + EXPECT_EQ(accessor->exists(segment_path(first_tablet_id, rowset_id, 0)), 1); + } +} + TEST(RecyclerTest, recycle_rowsets_with_data_ref_count) { config::retention_seconds = 0; auto txn_kv = std::make_shared(); @@ -1467,6 +1704,311 @@ TEST(RecyclerTest, recycle_rowsets_with_data_ref_count) { check_delete_bitmap_file_size(accessor, tablet_id, 3); } +TEST(RecyclerTest, recycle_rowsets_limit_per_tablet_batch) { + config::retention_seconds = 0; + auto origin_worker_pool_size = config::instance_recycler_worker_pool_size; + auto origin_batch_size = config::recycle_rowsets_per_tablet_batch_size; + config::instance_recycler_worker_pool_size = 4; + config::recycle_rowsets_per_tablet_batch_size = 2; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = origin_worker_pool_size; + config::recycle_rowsets_per_tablet_batch_size = origin_batch_size; + }; + + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_limit_per_tablet_batch"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_limit_per_tablet_batch"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); + + constexpr int index_id = 10001; + constexpr int64_t tablet_id0 = 100020; + constexpr int64_t tablet_id1 = 100021; + for (int64_t tablet_id : {tablet_id0, tablet_id1}) { + for (int i = 0; i < 5; ++i) { + auto rowset = create_rowset("recycle_rowsets_limit_per_tablet_batch", tablet_id, + index_id, 1, schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true)); + } + } + + auto count_recycle_rowsets = [&](int64_t tablet_id) { + std::unique_ptr txn; + EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr it; + auto begin_key = recycle_rowset_key({instance_id, tablet_id, ""}); + auto end_key = recycle_rowset_key({instance_id, tablet_id, "\xff"}); + EXPECT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + return it->size(); + }; + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + EXPECT_EQ(count_recycle_rowsets(tablet_id0), 3); + EXPECT_EQ(count_recycle_rowsets(tablet_id1), 3); + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + EXPECT_EQ(count_recycle_rowsets(tablet_id0), 1); + EXPECT_EQ(count_recycle_rowsets(tablet_id1), 1); + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + EXPECT_EQ(count_recycle_rowsets(tablet_id0), 0); + EXPECT_EQ(count_recycle_rowsets(tablet_id1), 0); +} + +TEST(RecyclerTest, recycle_rowsets_delete_remaining_rowsets_by_tablet) { + config::retention_seconds = 0; + auto origin_worker_pool_size = config::instance_recycler_worker_pool_size; + auto origin_per_tablet_batch_size = config::recycle_rowsets_per_tablet_batch_size; + auto origin_delete_batch_size = config::recycle_rowsets_delete_batch_size; + config::instance_recycler_worker_pool_size = 4; + config::recycle_rowsets_per_tablet_batch_size = 10; + config::recycle_rowsets_delete_batch_size = 10; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = origin_worker_pool_size; + config::recycle_rowsets_per_tablet_batch_size = origin_per_tablet_batch_size; + config::recycle_rowsets_delete_batch_size = origin_delete_batch_size; + }; + + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_below_batch_threshold"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_below_batch_threshold"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); + + constexpr int64_t index_id = 10001; + constexpr int64_t tablet_id0 = 100030; + constexpr int64_t tablet_id1 = 100031; + for (int64_t tablet_id : {tablet_id0, tablet_id1}) { + for (int i = 0; i < 2; ++i) { + auto rowset = create_rowset("recycle_rowsets_below_batch_threshold", tablet_id, + index_id, 1, schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true)); + } + auto prefix_rowset = + create_rowset("recycle_rowsets_below_batch_threshold", tablet_id, index_id, 1, + schema, RowsetStatePB::BEGIN_PARTIAL_UPDATE); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), prefix_rowset, + RecycleRowsetPB::COMPACT, true)); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + std::unique_ptr list_iter; + ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id0), &list_iter)); + EXPECT_FALSE(list_iter->has_next()); + ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id1), &list_iter)); + EXPECT_FALSE(list_iter->has_next()); + EXPECT_EQ(count_recycle_rowsets(txn_kv.get(), tablet_id0), 0); + EXPECT_EQ(count_recycle_rowsets(txn_kv.get(), tablet_id1), 0); + EXPECT_EQ(count_recycle_rowsets(txn_kv.get()), 0); +} + +TEST(RecyclerTest, recycle_rowsets_delete_full_batches_and_leftover_kvs) { + config::retention_seconds = 0; + auto origin_worker_pool_size = config::instance_recycler_worker_pool_size; + auto origin_per_tablet_batch_size = config::recycle_rowsets_per_tablet_batch_size; + auto origin_delete_batch_size = config::recycle_rowsets_delete_batch_size; + config::instance_recycler_worker_pool_size = 1; + config::recycle_rowsets_per_tablet_batch_size = 3; + config::recycle_rowsets_delete_batch_size = 7; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = origin_worker_pool_size; + config::recycle_rowsets_per_tablet_batch_size = origin_per_tablet_batch_size; + config::recycle_rowsets_delete_batch_size = origin_delete_batch_size; + }; + + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_batched_delete"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_batched_delete"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); + + constexpr int64_t index_id = 10001; + for (int64_t tablet_id : {100040, 100041, 100042}) { + for (int i = 0; i < 2; ++i) { + auto rowset = + create_rowset("recycle_rowsets_batched_delete", tablet_id, index_id, 1, schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true)); + } + auto empty_rowset = + create_rowset("recycle_rowsets_batched_delete", tablet_id, index_id, 0, schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), empty_rowset, + RecycleRowsetPB::COMPACT, true)); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + std::unique_ptr list_iter; + for (int64_t tablet_id : {100040, 100041, 100042}) { + ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter)); + EXPECT_FALSE(list_iter->has_next()); + } + EXPECT_EQ(count_recycle_rowsets(txn_kv.get()), 0); +} + +TEST(RecyclerTest, recycle_rowsets_delete_prefix_rowset_kvs_without_remaining_rowsets) { + config::retention_seconds = 0; + auto origin_worker_pool_size = config::instance_recycler_worker_pool_size; + auto origin_per_tablet_batch_size = config::recycle_rowsets_per_tablet_batch_size; + auto origin_delete_batch_size = config::recycle_rowsets_delete_batch_size; + config::instance_recycler_worker_pool_size = 1; + config::recycle_rowsets_per_tablet_batch_size = 10; + config::recycle_rowsets_delete_batch_size = 10; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = origin_worker_pool_size; + config::recycle_rowsets_per_tablet_batch_size = origin_per_tablet_batch_size; + config::recycle_rowsets_delete_batch_size = origin_delete_batch_size; + }; + + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_prefix_only"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_prefix_only"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); + + constexpr int64_t index_id = 10001; + constexpr int64_t tablet_id = 100050; + for (int i = 0; i < 3; ++i) { + auto rowset = create_rowset("recycle_rowsets_prefix_only", tablet_id, index_id, 1, schema, + RowsetStatePB::BEGIN_PARTIAL_UPDATE); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true)); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + std::unique_ptr list_iter; + ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter)); + EXPECT_FALSE(list_iter->has_next()); + EXPECT_EQ(count_recycle_rowsets(txn_kv.get()), 0); +} + +TEST(RecyclerTest, recycle_rowsets_delete_old_empty_resource_id_kvs_with_normal_rowsets) { + config::retention_seconds = 0; + auto origin_worker_pool_size = config::instance_recycler_worker_pool_size; + auto origin_per_tablet_batch_size = config::recycle_rowsets_per_tablet_batch_size; + auto origin_delete_batch_size = config::recycle_rowsets_delete_batch_size; + config::instance_recycler_worker_pool_size = 4; + config::recycle_rowsets_per_tablet_batch_size = 10; + config::recycle_rowsets_delete_batch_size = 10; + DORIS_CLOUD_DEFER { + config::instance_recycler_worker_pool_size = origin_worker_pool_size; + config::recycle_rowsets_per_tablet_batch_size = origin_per_tablet_batch_size; + config::recycle_rowsets_delete_batch_size = origin_delete_batch_size; + }; + + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_rowsets_old_empty_resource_id"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_rowsets_old_empty_resource_id"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); + + constexpr int64_t index_id = 10001; + constexpr int64_t tablet_id = 100060; + for (int i = 0; i < 3; ++i) { + auto rowset = create_rowset("recycle_rowsets_old_empty_resource_id", tablet_id, index_id, 1, + schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, + RecycleRowsetPB::COMPACT, true)); + } + for (int i = 0; i < 2; ++i) { + auto old_rowset = create_rowset("", tablet_id, index_id, 0, schema); + ASSERT_EQ(0, create_recycle_rowset(txn_kv.get(), accessor.get(), old_rowset, + RecycleRowsetPB::UNKNOWN, false)); + } + + ASSERT_EQ(recycler.recycle_rowsets(), 0); + std::unique_ptr list_iter; + ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter)); + EXPECT_FALSE(list_iter->has_next()); + EXPECT_EQ(count_recycle_rowsets(txn_kv.get()), 0); +} + TEST(RecyclerTest, bench_recycle_rowsets) { config::retention_seconds = 0; auto txn_kv = std::make_shared(); @@ -5700,6 +6242,9 @@ TEST(RecyclerTest, delete_rowset_data_packed_file_single_rowset) { EXPECT_EQ(TxnErrorCode::TXN_KEY_NOT_FOUND, txn->get(merged_key, &updated_val)); EXPECT_EQ(1, accessor->exists(packed_file_path)); + for (int i = 0; i < rowset.num_segments(); ++i) { + EXPECT_EQ(0, accessor->exists(segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i))); + } } TEST(RecyclerTest, delete_rowset_data_packed_file_respects_recycled_tablet) { @@ -5863,6 +6408,7 @@ TEST(RecyclerTest, delete_rowset_data_packed_file_batch_rowsets) { EXPECT_EQ(TxnErrorCode::TXN_KEY_NOT_FOUND, txn->get(merged_key, &updated_val)); EXPECT_EQ(1, accessor->exists(packed_file_path)); + EXPECT_EQ(0, accessor->exists(small_path)); } TEST(RecyclerTest, delete_rowset_data_packed_file_multiple_groups) { @@ -5981,7 +6527,7 @@ TEST(RecyclerTest, delete_rowset_data_packed_file_multiple_groups) { } for (const auto& path : segment_paths) { - EXPECT_EQ(1, accessor->exists(path)); + EXPECT_EQ(0, accessor->exists(path)); } for (const auto& path : index_paths) { EXPECT_EQ(1, accessor->exists(path));