diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index e2f587e151a8e8..622bc0288fac94 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -140,12 +140,16 @@ bvar::Status g_bvar_recycler_task_max_concurrency("recycler_task_max_co mBvarIntAdder g_bvar_recycler_instance_recycle_task_status("recycler_instance_recycle_task_status", { "status"}); // recycler's mbvars // cost time of the last whole recycle process -mBvarStatus g_bvar_recycler_instance_last_round_recycle_duration("recycler_instance_last_round_recycle_duration",{"instance_id"}); -mBvarStatus g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"}); +mBvarAdder g_bvar_recycler_instance_last_round_recycle_duration( + "recycler_instance_last_round_recycle_duration", {"instance_id"}); +mBvarAdder g_bvar_recycler_instance_next_ts("recycler_instance_next_ts", {"instance_id"}); // start and end timestamps of the recycle process -mBvarStatus g_bvar_recycler_instance_recycle_start_ts("recycler_instance_recycle_start_ts",{"instance_id"}); -mBvarStatus g_bvar_recycler_instance_recycle_end_ts("recycler_instance_recycle_end_ts",{"instance_id"}); -mBvarStatus g_bvar_recycler_instance_recycle_last_success_ts("recycler_instance_recycle_last_success_ts",{"instance_id"}); +mBvarAdder g_bvar_recycler_instance_recycle_start_ts( + "recycler_instance_recycle_start_ts", {"instance_id"}); +mBvarAdder g_bvar_recycler_instance_recycle_end_ts( + "recycler_instance_recycle_end_ts", {"instance_id"}); +mBvarAdder g_bvar_recycler_instance_recycle_last_success_ts( + "recycler_instance_recycle_last_success_ts", {"instance_id"}); // recycler's mbvars // instance_id: unique identifier for the instance @@ -153,20 +157,27 @@ mBvarStatus g_bvar_recycler_instance_recycle_last_success_ts("recycler_ // status: status of the recycle task (submitted, completed, error) mBvarIntAdder g_bvar_recycler_vault_recycle_task_status("recycler_vault_recycle_task_status", {"instance_id", "resource_id", "status"}); // current concurrency of vault delete task -mBvarStatus g_bvar_recycler_instance_last_round_recycled_num("recycler_instance_last_round_recycled_num", {"instance_id", "resource_type"}); -mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_num("recycler_instance_last_round_to_recycle_num", {"instance_id", "resource_type"}); -mBvarStatus g_bvar_recycler_instance_last_round_recycled_bytes("recycler_instance_last_round_recycled_bytes", {"instance_id", "resource_type"}); -mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_bytes("recycler_instance_last_round_to_recycle_bytes", {"instance_id", "resource_type"}); -mBvarStatus g_bvar_recycler_instance_last_round_recycle_elpased_ts("recycler_instance_last_round_recycle_elpased_ts", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_last_round_recycled_num( + "recycler_instance_last_round_recycled_num", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_last_round_to_recycle_num( + "recycler_instance_last_round_to_recycle_num", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_last_round_recycled_bytes( + "recycler_instance_last_round_recycled_bytes", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_last_round_to_recycle_bytes( + "recycler_instance_last_round_to_recycle_bytes", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_last_round_recycle_elpased_ts( + "recycler_instance_last_round_recycle_elpased_ts", {"instance_id", "resource_type"}); // total recycled num and bytes of resources since recycler started mBvarInt64Adder g_bvar_recycler_instance_recycle_total_num_since_started("recycler_instance_recycle_total_num_since_started", {"instance_id", "resource_type"}); mBvarInt64Adder g_bvar_recycler_instance_recycle_total_bytes_since_started("recycler_instance_recycle_total_bytes_since_started", {"instance_id", "resource_type"}); mBvarIntAdder g_bvar_recycler_instance_recycle_round("recycler_instance_recycle_round", {"instance_id", "resource_type"}); // represents the ms required per resource to be recycled // value of -1 means no resource recycled -mBvarStatus g_bvar_recycler_instance_recycle_time_per_resource("recycler_instance_recycle_time_per_resource", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_recycle_time_per_resource( + "recycler_instance_recycle_time_per_resource", {"instance_id", "resource_type"}); // represents the bytes of resources that can be recycled per ms -mBvarStatus g_bvar_recycler_instance_recycle_bytes_per_ms("recycler_instance_recycle_bytes_per_ms", {"instance_id", "resource_type"}); +mBvarAdder g_bvar_recycler_instance_recycle_bytes_per_ms( + "recycler_instance_recycle_bytes_per_ms", {"instance_id", "resource_type"}); BvarStatusWithTag g_bvar_recycler_packed_file_recycled_kv_num("recycler", "packed_file_recycled_kv_num"); BvarStatusWithTag g_bvar_recycler_packed_file_recycled_kv_bytes( diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 5497cf6a754b35..34ae9cfa91ba1d 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -27,11 +27,13 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -147,6 +149,8 @@ template struct is_valid_bvar_type> : std::true_type {}; template <> struct is_valid_bvar_type : std::true_type {}; +template +struct is_valid_bvar_type> : std::true_type {}; template struct is_bvar_status : std::false_type {}; template @@ -160,7 +164,7 @@ class mBvarWrapper { : counter_(metric_name, std::list(dim_names)) { static_assert(is_valid_bvar_type::value, "BvarType must be one of the supported bvar types (Adder, IntRecorder, " - "LatencyRecorder, Maxer, Status)"); + "LatencyRecorder, Maxer, Status, WindowEx)"); } template @@ -536,6 +540,8 @@ class MBvarLatencyRecorderWithStatus { using mBvarIntAdder = mBvarWrapper>; using mBvarInt64Adder = mBvarWrapper>; using mBvarDoubleAdder = mBvarWrapper>; +template +using mBvarAdder = mBvarWrapper, window_size>>; using mBvarIntRecorder = mBvarWrapper; using mBvarLatencyRecorder = mBvarWrapper; using mBvarIntMaxer = mBvarWrapper>; @@ -647,23 +653,23 @@ extern BvarStatusWithTag g_bvar_recycler_recycle_restore_job_earlest_ts // recycler's mbvars extern bvar::Status g_bvar_recycler_task_max_concurrency; extern mBvarIntAdder g_bvar_recycler_instance_recycle_task_status; -extern mBvarStatus g_bvar_recycler_instance_last_round_recycle_duration; -extern mBvarStatus g_bvar_recycler_instance_next_ts; -extern mBvarStatus g_bvar_recycler_instance_recycle_start_ts; -extern mBvarStatus g_bvar_recycler_instance_recycle_end_ts; -extern mBvarStatus g_bvar_recycler_instance_recycle_last_success_ts; +extern mBvarAdder g_bvar_recycler_instance_last_round_recycle_duration; +extern mBvarAdder g_bvar_recycler_instance_next_ts; +extern mBvarAdder g_bvar_recycler_instance_recycle_start_ts; +extern mBvarAdder g_bvar_recycler_instance_recycle_end_ts; +extern mBvarAdder g_bvar_recycler_instance_recycle_last_success_ts; extern mBvarIntAdder g_bvar_recycler_vault_recycle_task_status; -extern mBvarStatus g_bvar_recycler_instance_last_round_recycled_num; -extern mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_num; -extern mBvarStatus g_bvar_recycler_instance_last_round_recycled_bytes; -extern mBvarStatus g_bvar_recycler_instance_last_round_to_recycle_bytes; -extern mBvarStatus g_bvar_recycler_instance_last_round_recycle_elpased_ts; +extern mBvarAdder g_bvar_recycler_instance_last_round_recycled_num; +extern mBvarAdder g_bvar_recycler_instance_last_round_to_recycle_num; +extern mBvarAdder g_bvar_recycler_instance_last_round_recycled_bytes; +extern mBvarAdder g_bvar_recycler_instance_last_round_to_recycle_bytes; +extern mBvarAdder g_bvar_recycler_instance_last_round_recycle_elpased_ts; extern mBvarInt64Adder g_bvar_recycler_instance_recycle_total_num_since_started; extern mBvarInt64Adder g_bvar_recycler_instance_recycle_total_bytes_since_started; extern mBvarIntAdder g_bvar_recycler_instance_recycle_round; -extern mBvarStatus g_bvar_recycler_instance_recycle_time_per_resource; -extern mBvarStatus g_bvar_recycler_instance_recycle_bytes_per_ms; +extern mBvarAdder g_bvar_recycler_instance_recycle_time_per_resource; +extern mBvarAdder g_bvar_recycler_instance_recycle_bytes_per_ms; extern BvarStatusWithTag g_bvar_recycler_packed_file_recycled_kv_num; extern BvarStatusWithTag g_bvar_recycler_packed_file_recycled_kv_bytes; extern BvarStatusWithTag g_bvar_recycler_packed_file_recycle_cost_ms; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 401d938a1ae6d9..1d45410f5e021a 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -749,12 +749,6 @@ auto task_wrapper(Func... funcs) -> std::function { int InstanceRecycler::do_recycle() { TEST_SYNC_POINT("InstanceRecycler.do_recycle"); - tablet_metrics_context_.reset(); - segment_metrics_context_.reset(); - DORIS_CLOUD_DEFER { - tablet_metrics_context_.finish_report(); - segment_metrics_context_.finish_report(); - }; if (instance_info_.status() == InstanceInfoPB::DELETED) { int res = recycle_cluster_snapshots(); if (res != 0) { @@ -2100,6 +2094,7 @@ int collect_prepare_delete_tasks(TxnKv* txn_kv, const std::string& instance_id, int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) { const std::string task_name = "recycle_ref_rowsets"; *has_unrecycled_rowsets = false; + RecyclerMetricsContext metrics_context(instance_id_, task_name); std::string data_rowset_ref_count_key_start = versioned::data_rowset_ref_count_key({instance_id_, 0, ""}); @@ -2113,6 +2108,7 @@ int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; LOG_WARNING("recycle ref rowsets finished, cost={}s", cost) @@ -2158,7 +2154,6 @@ int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) { break; } - RecyclerMetricsContext metrics_context(instance_id_, task_name); if (recycle_versioned_tablet(tablet_id, metrics_context) != 0) { LOG_WARNING("failed to recycle tablet") .tag("instance_id", instance_id_) @@ -2221,9 +2216,9 @@ int InstanceRecycler::recycle_indexes() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle indexes finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -2320,8 +2315,8 @@ int InstanceRecycler::recycle_indexes() { } } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); index_keys.push_back(k); return 0; @@ -2454,9 +2449,9 @@ int InstanceRecycler::recycle_partitions() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle partitions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -2564,14 +2559,13 @@ int InstanceRecycler::recycle_partitions() { if (ret == 0) { ++num_recycled; + metrics_context.total_recycled_num++; check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); partition_keys.push_back(k); if (part_pb.db_id() > 0) { partition_version_keys.push_back(partition_version_key( {instance_id_, part_pb.db_id(), part_pb.table_id(), partition_id})); } - metrics_context.total_recycled_num = num_recycled; - metrics_context.report(); } return ret; }; @@ -2624,8 +2618,8 @@ int InstanceRecycler::recycle_versions() { auto start_time = steady_clock::now(); DORIS_CLOUD_DEFER { + metrics_context.report_elapsed_time(); auto cost = duration(steady_clock::now() - start_time).count(); - metrics_context.finish_report(); LOG_WARNING("recycle table and partition versions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -2695,8 +2689,8 @@ int InstanceRecycler::recycle_versions() { if (err != TxnErrorCode::TXN_OK) { return -1; } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; is_recycled = true; return 0; }; @@ -2719,8 +2713,8 @@ int InstanceRecycler::recycle_orphan_partitions() { auto start_time = steady_clock::now(); DORIS_CLOUD_DEFER { + metrics_context.report_elapsed_time(); auto cost = duration(steady_clock::now() - start_time).count(); - metrics_context.finish_report(); LOG_WARNING("recycle orphan table and partition versions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -2806,8 +2800,8 @@ int InstanceRecycler::recycle_orphan_partitions() { if (err != TxnErrorCode::TXN_OK) { return -1; } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; return 0; }; @@ -4122,9 +4116,6 @@ int InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t inde return 0; }; int ret = scan_and_recycle(tablet_key_begin, tablet_key_end, std::move(scan_and_statistics)); - metrics_context.report(true); - tablet_metrics_context_.report(true); - segment_metrics_context_.report(true); return ret; } @@ -4458,10 +4449,6 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& recycle_rowsets_data_size + recycle_rowsets_index_size; metrics_context.total_recycled_data_size += recycle_rowsets_data_size + recycle_rowsets_index_size; - tablet_metrics_context_.report(); - segment_metrics_context_.report(); - metrics_context.report(); - txn.reset(); if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { LOG_WARNING("failed to recycle tablet ") @@ -4775,10 +4762,6 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id, recycle_rowsets_data_size + recycle_rowsets_index_size; metrics_context.total_recycled_data_size += recycle_rowsets_data_size + recycle_rowsets_index_size; - tablet_metrics_context_.report(); - segment_metrics_context_.report(); - metrics_context.report(); - txn.reset(); if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { LOG_WARNING("failed to recycle tablet ") @@ -4876,9 +4859,9 @@ int InstanceRecycler::recycle_rowsets() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle rowsets finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -5175,10 +5158,6 @@ int InstanceRecycler::recycle_rowsets() { } } - // Report final metrics after all concurrent tasks completed - segment_metrics_context_.report(); - metrics_context.report(); - return ret; } @@ -5205,9 +5184,9 @@ int InstanceRecycler::recycle_restore_jobs() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_INFO("recycle restore jobs finished, cost={}s", cost) .tag("instance_id", instance_id_) @@ -5324,8 +5303,8 @@ int InstanceRecycler::recycle_restore_jobs() { return -1; } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); restore_job_keys.push_back(k); @@ -5398,9 +5377,9 @@ int InstanceRecycler::recycle_versioned_rowsets() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle rowsets finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -5588,10 +5567,6 @@ int InstanceRecycler::recycle_versioned_rowsets() { } } - // Report final metrics after all concurrent tasks completed - segment_metrics_context_.report(); - metrics_context.report(); - return ret; } @@ -5748,9 +5723,9 @@ int InstanceRecycler::recycle_tmp_rowsets() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle tmp rowsets finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -5952,10 +5927,6 @@ int InstanceRecycler::recycle_tmp_rowsets() { worker_pool->stop(); - // Report final metrics after all concurrent tasks completed - segment_metrics_context_.report(); - metrics_context.report(); - return ret; } @@ -6040,9 +6011,9 @@ int InstanceRecycler::abort_timeout_txn() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("end to abort timeout txn, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -6150,8 +6121,8 @@ int InstanceRecycler::abort_timeout_txn() { .tag("txn_id", txn_id); return -1; } - metrics_context.total_recycled_num = ++num_abort; - metrics_context.report(); + ++num_abort; + metrics_context.total_recycled_num++; } return 0; @@ -6170,6 +6141,7 @@ int InstanceRecycler::recycle_expired_txn_label() { int64_t num_scanned = 0; int64_t num_expired = 0; std::atomic_long num_recycled = 0; + int64_t num_recycled_reported = 0; RecyclerMetricsContext metrics_context(instance_id_, task_name); int ret = 0; @@ -6187,9 +6159,9 @@ int InstanceRecycler::recycle_expired_txn_label() { register_recycle_task(task_name, start_time); DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("end to recycle expired txn, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -6372,9 +6344,10 @@ int InstanceRecycler::recycle_expired_txn_label() { ret = finished ? ret : -1; - // Update metrics after all concurrent tasks completed - metrics_context.total_recycled_num = num_recycled.load(); - metrics_context.report(); + auto current_num_recycled = num_recycled.load(); + DCHECK_GE(current_num_recycled, num_recycled_reported); + metrics_context.total_recycled_num += current_num_recycled - num_recycled_reported; + num_recycled_reported = current_num_recycled; TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_expired_txn_label.failure", &ret); @@ -6526,9 +6499,9 @@ int InstanceRecycler::recycle_copy_jobs() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle copy jobs finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -6652,8 +6625,8 @@ int InstanceRecycler::recycle_copy_jobs() { return -1; } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); return 0; }; @@ -6774,9 +6747,9 @@ int InstanceRecycler::recycle_stage() { DORIS_CLOUD_DEFER { unregister_recycle_task(task_name); + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle stage, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) @@ -6845,8 +6818,8 @@ int InstanceRecycler::recycle_stage() { << ", ret=" << ret; return -1; } - metrics_context.total_recycled_num = ++num_recycled; - metrics_context.report(); + ++num_recycled; + metrics_context.total_recycled_num++; check_recycle_task(instance_id_, "recycle_stage", num_scanned, num_recycled, start_time); stage_keys.push_back(k); return 0; @@ -6877,9 +6850,9 @@ int InstanceRecycler::recycle_expired_stage_objects() { RecyclerMetricsContext metrics_context(instance_id_, "recycle_expired_stage_objects"); DORIS_CLOUD_DEFER { + metrics_context.report_elapsed_time(); int64_t cost = duration_cast(steady_clock::now().time_since_epoch()).count() - start_time; - metrics_context.finish_report(); LOG_WARNING("recycle expired stage objects, cost={}s", cost) .tag("instance_id", instance_id_); }; @@ -6946,7 +6919,6 @@ int InstanceRecycler::recycle_expired_stage_objects() { continue; } metrics_context.total_recycled_num++; - metrics_context.report(); } return ret; } @@ -7037,11 +7009,7 @@ int InstanceRecycler::scan_and_statistics_indexes() { return 0; }; - int ret = scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv)); - metrics_context.report(true); - segment_metrics_context_.report(true); - tablet_metrics_context_.report(true); - return ret; + return scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv)); } // Scan and statistics partitions that need to be recycled @@ -7104,11 +7072,7 @@ int InstanceRecycler::scan_and_statistics_partitions() { return ret; }; - int ret = scan_and_recycle(part_key0, part_key1, std::move(handle_partition_kv)); - metrics_context.report(true); - segment_metrics_context_.report(true); - tablet_metrics_context_.report(true); - return ret; + return scan_and_recycle(part_key0, part_key1, std::move(handle_partition_kv)); } // Scan and statistics rowsets that need to be recycled @@ -7148,7 +7112,8 @@ int InstanceRecycler::scan_and_statistics_rowsets() { return 0; } - if(!rowset_meta->has_is_recycled() || !rowset_meta->is_recycled()) { + if (config::enable_mark_delete_rowset_before_recycle && + (!rowset_meta->has_is_recycled() || !rowset_meta->is_recycled())) { return 0; } @@ -7163,10 +7128,7 @@ int InstanceRecycler::scan_and_statistics_rowsets() { segment_metrics_context_.total_need_recycle_data_size += rowset_meta->total_disk_size(); return 0; }; - int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv)); - metrics_context.report(true); - segment_metrics_context_.report(true); - return ret; + return scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv)); } // Scan and statistics tmp_rowsets that need to be recycled @@ -7195,7 +7157,8 @@ int InstanceRecycler::scan_and_statistics_tmp_rowsets() { DCHECK_GT(rowset.txn_id(), 0) << "txn_id=" << rowset.txn_id() << " rowset=" << rowset.ShortDebugString(); - if(!rowset.has_is_recycled() || !rowset.is_recycled()) { + if (config::enable_mark_delete_rowset_before_recycle && + (!rowset.has_is_recycled() || !rowset.is_recycled())) { return 0; } @@ -7203,6 +7166,7 @@ int InstanceRecycler::scan_and_statistics_tmp_rowsets() { if (rowset.num_segments() > 0) [[unlikely]] { // impossible return 0; } + metrics_context.total_need_recycle_num++; return 0; } @@ -7212,10 +7176,7 @@ int InstanceRecycler::scan_and_statistics_tmp_rowsets() { segment_metrics_context_.total_need_recycle_num += rowset.num_segments(); return 0; }; - int ret = scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv)); - metrics_context.report(true); - segment_metrics_context_.report(true); - return ret; + return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv)); } // Scan and statistics abort_timeout_txn that need to be recycled @@ -7272,9 +7233,8 @@ int InstanceRecycler::scan_and_statistics_abort_timeout_txn() { return 0; }; - int ret = scan_and_recycle(begin_txn_running_key, end_txn_running_key, std::move(handle_abort_timeout_txn_kv)); - metrics_context.report(true); - return ret; + return scan_and_recycle(begin_txn_running_key, end_txn_running_key, + std::move(handle_abort_timeout_txn_kv)); } // Scan and statistics expired_txn_label that need to be recycled @@ -7306,9 +7266,8 @@ int InstanceRecycler::scan_and_statistics_expired_txn_label() { return 0; }; - int ret = scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, std::move(handle_expired_txn_label_kv)); - metrics_context.report(true); - return ret; + return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key, + std::move(handle_expired_txn_label_kv)); } // Scan and statistics copy_jobs that need to be recycled @@ -7358,9 +7317,7 @@ int InstanceRecycler::scan_and_statistics_copy_jobs() { return 0; }; - int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics)); - metrics_context.report(true); - return ret; + return scan_and_recycle(key0, key1, std::move(scan_and_statistics)); } // Scan and statistics stage that need to be recycled @@ -7416,9 +7373,7 @@ int InstanceRecycler::scan_and_statistics_stage() { return 0; }; - int ret = scan_and_recycle(key0, key1, std::move(scan_and_statistics)); - metrics_context.report(true); - return ret; + return scan_and_recycle(key0, key1, std::move(scan_and_statistics)); } // Scan and statistics expired_stage_objects that need to be recycled @@ -7457,7 +7412,6 @@ int InstanceRecycler::scan_and_statistics_expired_stage_objects() { }; scan_and_statistics(); - metrics_context.report(true); return 0; } @@ -7506,9 +7460,7 @@ int InstanceRecycler::scan_and_statistics_versions() { return 0; }; - int ret = scan_and_recycle(version_key_begin, version_key_end, std::move(scan_and_statistics)); - metrics_context.report(true); - return ret; + return scan_and_recycle(version_key_begin, version_key_end, std::move(scan_and_statistics)); } // Scan and statistics restore jobs that need to be recycled @@ -7543,9 +7495,7 @@ int InstanceRecycler::scan_and_statistics_restore_jobs() { return 0; }; - int ret = scan_and_recycle(restore_job_key0, restore_job_key1, std::move(scan_and_statistics)); - metrics_context.report(true); - return ret; + return scan_and_recycle(restore_job_key0, restore_job_key1, std::move(scan_and_statistics)); } void InstanceRecycler::scan_and_statistics_operation_logs() { @@ -7584,8 +7534,6 @@ void InstanceRecycler::scan_and_statistics_operation_logs() { metrics_context.total_need_recycle_data_size += operation_log.ByteSizeLong(); } } - - metrics_context.report(true); } int InstanceRecycler::classify_rowset_task_by_ref_count( diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 84062f4103986a..775ab6e3df4759 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -146,92 +146,146 @@ struct RowsetDeleteTask { class RecyclerMetricsContext { public: + enum class MetricType { + NEED_RECYCLE_BYTES, + NEED_RECYCLE_NUM, + RECYCLED_BYTES, + RECYCLED_NUM, + }; + + class MetricValue { + public: + MetricValue(RecyclerMetricsContext* context, MetricType type) + : context_(context), type_(type) {} + + MetricValue& operator+=(uint64_t delta) { + context_->put(type_, delta); + return *this; + } + + MetricValue& operator++() { + *this += 1; + return *this; + } + + uint64_t operator++(int) { + context_->put(type_, 1); + return 0; + } + + private: + RecyclerMetricsContext* context_; + MetricType type_; + }; + + class RecycledNumMetricValue { + public: + explicit RecycledNumMetricValue(RecyclerMetricsContext* context) : context_(context) {} + + RecycledNumMetricValue& operator+=(uint64_t delta) { + value_.fetch_add(delta); + context_->put(MetricType::RECYCLED_NUM, delta); + return *this; + } + + RecycledNumMetricValue& operator++() { + *this += 1; + return *this; + } + + uint64_t operator++(int) { + auto old_value = value_.fetch_add(1); + context_->put(MetricType::RECYCLED_NUM, 1); + return old_value; + } + + uint64_t load() const { return value_.load(); } + + private: + RecyclerMetricsContext* context_; + std::atomic_ullong value_ = 0; + }; + + class RecycledBytesMetricValue { + public: + explicit RecycledBytesMetricValue(RecyclerMetricsContext* context) : context_(context) {} + + RecycledBytesMetricValue& operator+=(uint64_t delta) { + value_.fetch_add(delta); + context_->put(MetricType::RECYCLED_BYTES, delta); + return *this; + } + + uint64_t load() const { return value_.load(); } + + private: + RecyclerMetricsContext* context_; + std::atomic_ullong value_ = 0; + }; + RecyclerMetricsContext() = default; RecyclerMetricsContext(std::string instance_id, std::string operation_type) - : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) { - start(); - } + : operation_type(std::move(operation_type)), + instance_id(std::move(instance_id)), + start_time_(std::chrono::steady_clock::now()) {} ~RecyclerMetricsContext() = default; - std::atomic_ullong total_need_recycle_data_size = 0; - std::atomic_ullong total_need_recycle_num = 0; + MetricValue total_need_recycle_data_size {this, MetricType::NEED_RECYCLE_BYTES}; + MetricValue total_need_recycle_num {this, MetricType::NEED_RECYCLE_NUM}; - std::atomic_ullong total_recycled_data_size = 0; - std::atomic_ullong total_recycled_num = 0; + RecycledBytesMetricValue total_recycled_data_size {this}; + RecycledNumMetricValue total_recycled_num {this}; std::string operation_type; std::string instance_id; - double start_time = 0; - - void start() { - start_time = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - } - - double duration() const { - return duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count() - - start_time; - } - - void reset() { - total_need_recycle_data_size = 0; - total_need_recycle_num = 0; - total_recycled_data_size = 0; - total_recycled_num = 0; - start_time = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - } - - void finish_report() { - if (!operation_type.empty()) { - double cost = duration(); - g_bvar_recycler_instance_last_round_recycle_elpased_ts.put( - {instance_id, operation_type}, cost); - g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1); - g_bvar_recycler_instance_recycle_total_bytes_since_started.put( - {instance_id, operation_type}, total_recycled_data_size.load()); - g_bvar_recycler_instance_recycle_total_num_since_started.put( - {instance_id, operation_type}, total_recycled_num.load()); - LOG(INFO) << "recycle instance: " << instance_id - << ", operation type: " << operation_type << ", cost: " << cost - << " ms, total recycled num: " << total_recycled_num.load() - << ", total recycled data size: " << total_recycled_data_size.load() - << " bytes"; - if (cost != 0) { - if (total_recycled_num.load() != 0) { - g_bvar_recycler_instance_recycle_time_per_resource.put( - {instance_id, operation_type}, cost / total_recycled_num.load()); - } - g_bvar_recycler_instance_recycle_bytes_per_ms.put( - {instance_id, operation_type}, total_recycled_data_size.load() / cost); + void report_elapsed_time() { + if (operation_type.empty()) { + return; + } + const auto cost = duration_cast( + std::chrono::steady_clock::now() - start_time_) + .count(); + g_bvar_recycler_instance_last_round_recycle_elpased_ts.put({instance_id, operation_type}, + cost); + const auto recycled_num = total_recycled_num.load(); + const auto recycled_data_size = total_recycled_data_size.load(); + if (cost != 0) { + if (recycled_num != 0) { + g_bvar_recycler_instance_recycle_time_per_resource.put( + {instance_id, operation_type}, static_cast(cost) / recycled_num); } + g_bvar_recycler_instance_recycle_bytes_per_ms.put( + {instance_id, operation_type}, static_cast(recycled_data_size) / cost); } } - // `is_begin` is used to initialize total num of items need to be recycled - void report(bool is_begin = false) { - if (!operation_type.empty()) { - // is init - if (is_begin) { - auto value = total_need_recycle_num.load(); - - g_bvar_recycler_instance_last_round_to_recycle_bytes.put( - {instance_id, operation_type}, total_need_recycle_data_size.load()); - g_bvar_recycler_instance_last_round_to_recycle_num.put( - {instance_id, operation_type}, value); - } else { - g_bvar_recycler_instance_last_round_recycled_bytes.put( - {instance_id, operation_type}, total_recycled_data_size.load()); - g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type}, - total_recycled_num.load()); - } +private: + std::chrono::steady_clock::time_point start_time_; + + void put(MetricType type, uint64_t value) { + if (operation_type.empty() || value == 0) { + return; + } + switch (type) { + case MetricType::NEED_RECYCLE_BYTES: + g_bvar_recycler_instance_last_round_to_recycle_bytes.put({instance_id, operation_type}, + value); + break; + case MetricType::NEED_RECYCLE_NUM: + g_bvar_recycler_instance_last_round_to_recycle_num.put({instance_id, operation_type}, + value); + break; + case MetricType::RECYCLED_BYTES: + g_bvar_recycler_instance_last_round_recycled_bytes.put({instance_id, operation_type}, + value); + break; + case MetricType::RECYCLED_NUM: + g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type}, + value); + break; } } }; diff --git a/cloud/src/recycler/recycler_operation_log.cpp b/cloud/src/recycler/recycler_operation_log.cpp index f6743a5dabbc83..ca09285c3bb4d1 100644 --- a/cloud/src/recycler/recycler_operation_log.cpp +++ b/cloud/src/recycler/recycler_operation_log.cpp @@ -731,7 +731,7 @@ int InstanceRecycler::recycle_operation_logs() { size_t recycled_operation_log_data_size = 0; DORIS_CLOUD_DEFER { - metrics_context.finish_report(); + metrics_context.report_elapsed_time(); report_oplog_recycle_stats(instance_id_, oplog_stats); int64_t cost = stop_watch.elapsed_us() / 1000'000; @@ -797,7 +797,6 @@ int InstanceRecycler::recycle_operation_logs() { operation_log_data_size += value_size; max_operation_log_data_size = std::max(max_operation_log_data_size, value_size); oplog_stats.total_num.fetch_add(1, std::memory_order_relaxed); - metrics_context.report(); return 0; }; diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index cc4384b75a504e..559fb058f5f64e 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -65,6 +65,12 @@ bvar::LatencyRecorder s3_get_latency("s3_get"); bvar::LatencyRecorder s3_put_latency("s3_put"); bvar::LatencyRecorder s3_delete_object_latency("s3_delete_object"); bvar::LatencyRecorder s3_delete_objects_latency("s3_delete_objects"); +bvar::Adder s3_delete_objects_success_object_count; +bvar::Adder s3_delete_objects_failed_object_count; +bvar::Window> s3_delete_objects_success_object_count_window( + "s3_delete_objects_success_object_count_1m", &s3_delete_objects_success_object_count, 10); +bvar::Window> s3_delete_objects_failed_object_count_window( + "s3_delete_objects_failed_object_count_1m", &s3_delete_objects_failed_object_count, 10); bvar::LatencyRecorder s3_head_latency("s3_head"); bvar::LatencyRecorder s3_multi_part_upload_latency("s3_multi_part_upload"); bvar::LatencyRecorder s3_list_latency("s3_list"); diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 28d12c7f9808db..b154cd79e48005 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -46,6 +47,10 @@ extern bvar::LatencyRecorder s3_get_latency; extern bvar::LatencyRecorder s3_put_latency; extern bvar::LatencyRecorder s3_delete_object_latency; extern bvar::LatencyRecorder s3_delete_objects_latency; +extern bvar::Adder s3_delete_objects_success_object_count; +extern bvar::Adder s3_delete_objects_failed_object_count; +extern bvar::Window> s3_delete_objects_success_object_count_window; +extern bvar::Window> s3_delete_objects_failed_object_count_window; extern bvar::LatencyRecorder s3_head_latency; extern bvar::LatencyRecorder s3_multi_part_upload_latency; extern bvar::LatencyRecorder s3_list_latency; diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp index c45a6aae168c0c..787890738d31ff 100644 --- a/cloud/src/recycler/s3_obj_client.cpp +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -269,6 +269,7 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, } Aws::S3::Model::Delete del; + auto object_count = objects.size(); del.WithObjects(std::move(objects)).SetQuiet(true); delete_request.SetDelete(std::move(del)); auto delete_outcome = s3_put_rate_limit([&]() { @@ -276,6 +277,7 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, return s3_client_->DeleteObjects(delete_request); }); if (!delete_outcome.IsSuccess()) { + s3_bvar::s3_delete_objects_failed_object_count << object_count; LOG_WARNING("failed to delete objects") .tag("endpoint", endpoint_) .tag("bucket", bucket) @@ -287,6 +289,10 @@ ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, return -1; } + const auto failed_object_count = delete_outcome.GetResult().GetErrors().size(); + s3_bvar::s3_delete_objects_failed_object_count << failed_object_count; + s3_bvar::s3_delete_objects_success_object_count << object_count - failed_object_count; + return 0; };