From 4910abb015e362f39cc5a8cf5431e705cd8a645f Mon Sep 17 00:00:00 2001 From: yangtao555 Date: Fri, 15 May 2026 16:53:34 +0800 Subject: [PATCH 1/2] [refactor](profile) Decouple profile node reports from display tree --- .../pipeline/pipeline_fragment_context.cpp | 29 ++-- .../exec/pipeline/pipeline_fragment_context.h | 3 +- be/src/runtime/query_context.cpp | 34 ++--- be/src/runtime/query_context.h | 13 +- .../runtime/runtime_query_statistics_mgr.cpp | 77 +++++------ be/src/runtime/runtime_query_statistics_mgr.h | 12 +- .../common/profile/ExecutionProfile.java | 47 ++++--- .../org/apache/doris/qe/QeProcessorImpl.java | 10 +- .../common/profile/ExecutionProfileTest.java | 129 ++++++++++++++++++ gensrc/thrift/FrontendService.thrift | 13 ++ 10 files changed, 263 insertions(+), 104 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/common/profile/ExecutionProfileTest.java diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 8570c771232dab..d44c7ff68b242c 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -2379,9 +2379,8 @@ std::string PipelineFragmentContext::debug_string() { return fmt::to_string(debug_string_buffer); } -std::vector> -PipelineFragmentContext::collect_realtime_profile() const { - std::vector> res; +std::vector PipelineFragmentContext::collect_realtime_profile() const { + std::vector res; // we do not have mutex to protect pipeline_id_to_profile // so we need to make sure this funciton is invoked after fragment context @@ -2394,16 +2393,26 @@ PipelineFragmentContext::collect_realtime_profile() const { return res; } - // Make sure first profile is fragment level profile - auto fragment_profile = std::make_shared(); - _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level()); - res.push_back(fragment_profile); + TProfileNodeReport fragment_report; + TRuntimeProfileTree fragment_profile; + _fragment_level_profile->to_thrift(&fragment_profile, _runtime_state->profile_level()); + fragment_report.__isset.profile = true; + fragment_report.profile = std::move(fragment_profile); + fragment_report.__set_profile_node_type(TProfileNodeType::FRAGMENT_LEVEL); + res.push_back(std::move(fragment_report)); // pipeline_id_to_profile is initialized in prepare stage + int32_t pipeline_id = 0; for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) { - auto profile_ptr = std::make_shared(); - pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level()); - res.push_back(profile_ptr); + TProfileNodeReport pipeline_report; + TRuntimeProfileTree pipeline_profile_tree; + pipeline_profile->to_thrift(&pipeline_profile_tree, _runtime_state->profile_level()); + pipeline_report.__isset.profile = true; + pipeline_report.profile = std::move(pipeline_profile_tree); + pipeline_report.__set_profile_node_type(TProfileNodeType::PIPELINE_LEVEL); + pipeline_report.__set_pipeline_id(pipeline_id); + res.push_back(std::move(pipeline_report)); + pipeline_id++; } return res; diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h b/be/src/exec/pipeline/pipeline_fragment_context.h index c220ea386f68db..fd0d6f18e6e46d 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.h +++ b/be/src/exec/pipeline/pipeline_fragment_context.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -61,7 +62,7 @@ class PipelineFragmentContext : public TaskExecutionContext { void print_profile(const std::string& extra_info); - std::vector> collect_realtime_profile() const; + std::vector collect_realtime_profile() const; std::shared_ptr collect_realtime_load_channel_profile() const; bool is_timeout(timespec now) const; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 4e1fa2ab9ecab8..7b6582809ca9cd 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -402,10 +402,10 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& wg) { return Status::OK(); } -void QueryContext::add_fragment_profile( - int fragment_id, const std::vector>& pipeline_profiles, - std::shared_ptr load_channel_profile) { - if (pipeline_profiles.empty()) { +void QueryContext::add_fragment_profile(int fragment_id, + std::vector profile_node_reports, + std::shared_ptr load_channel_profile) { + if (profile_node_reports.empty()) { std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}", print_id(this->_query_id), fragment_id); LOG_ERROR(msg); @@ -414,18 +414,19 @@ void QueryContext::add_fragment_profile( } #ifndef NDEBUG - for (const auto& p : pipeline_profiles) { - DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}", - print_id(this->_query_id), fragment_id); + for (const auto& report : profile_node_reports) { + DCHECK(report.__isset.profile) + << fmt::format("Add pipeline profile failed, query {}, fragment {}", + print_id(this->_query_id), fragment_id); } #endif std::lock_guard l(_profile_mutex); VLOG_ROW << fmt::format( "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ", - print_id(this->_query_id), fragment_id, pipeline_profiles.size()); + print_id(this->_query_id), fragment_id, profile_node_reports.size()); - _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles)); + _profile_map.insert(std::make_pair(fragment_id, std::move(profile_node_reports))); if (load_channel_profile != nullptr) { _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile)); @@ -443,15 +444,15 @@ void QueryContext::_report_query_profile() { } ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile( - _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile); + _query_id, this->coord_addr, fragment_id, std::move(fragment_profile), + load_channel_profile); } ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_profile_reporting(); } -std::unordered_map>> -QueryContext::_collect_realtime_query_profile() { - std::unordered_map>> res; +void QueryContext::_collect_realtime_query_profile( + std::unordered_map>& profile_node_reports) { std::lock_guard lock(_pipeline_map_write_lock); for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) { if (auto fragment_ctx = fragment_ctx_wptr.lock()) { @@ -475,17 +476,16 @@ QueryContext::_collect_realtime_query_profile() { continue; } - res.insert(std::make_pair(fragment_id, profile)); + profile_node_reports.insert(std::make_pair(fragment_id, std::move(profile))); } } - - return res; } TReportExecStatusParams QueryContext::get_realtime_exec_status() { TReportExecStatusParams exec_status; - auto realtime_query_profile = _collect_realtime_query_profile(); + std::unordered_map> realtime_query_profile; + _collect_realtime_query_profile(realtime_query_profile); std::vector> load_channel_profiles; for (auto load_channel_profile : _load_channel_profile_map) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 3327d48076f5ad..76c82043640470 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -376,15 +377,15 @@ class QueryContext : public std::enable_shared_from_this { // PipelineTask 3 // Operator 3 // fragment_id -> list - std::unordered_map>> _profile_map; + std::unordered_map> _profile_map; std::unordered_map> _load_channel_profile_map; std::shared_ptr> _ai_resources; void _report_query_profile(); - std::unordered_map>> - _collect_realtime_query_profile(); + void _collect_realtime_query_profile( + std::unordered_map>& profile_node_reports); std::mutex _error_url_lock; std::string _load_error_url; @@ -401,10 +402,8 @@ class QueryContext : public std::enable_shared_from_this { public: // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile - void add_fragment_profile( - int fragment_id, - const std::vector>& pipeline_profile, - std::shared_ptr load_channel_profile); + void add_fragment_profile(int fragment_id, std::vector profile_node_reports, + std::shared_ptr load_channel_profile); TReportExecStatusParams get_realtime_exec_status(); diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 0ac505c7f4bc4d..957e24354271eb 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -115,15 +115,13 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, } static void _report_query_profiles_function( - std::unordered_map< - TUniqueId, - std::tuple< - TNetworkAddress, - std::unordered_map>>>> + std::unordered_map>>> profile_copy, std::unordered_map, std::shared_ptr> load_channel_profile_copy) { - // query_id -> {coordinator_addr, {fragment_id -> std::vectpr}} + // query_id -> {coordinator_addr, {fragment_id -> list}} for (auto& entry : profile_copy) { const auto& query_id = entry.first; const auto& coor_addr = std::get<0>(entry.second); @@ -168,46 +166,37 @@ static void _report_query_profiles_function( TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_params( const TUniqueId& query_id, - std::unordered_map>> - fragment_id_to_profile, + std::unordered_map> + fragment_id_to_profile_node_reports, std::vector> load_channel_profiles, bool is_done) { - // This function will clear the data of fragment_id_to_profile and load_channel_profiles. + // This function will clear the data of fragment_id_to_profile_node_reports and + // load_channel_profiles. TQueryProfile profile; profile.__set_query_id(query_id); - std::map> fragment_id_to_profile_req; + std::map> fragment_id_to_profile_node_reports_req; - for (const auto& entry : fragment_id_to_profile) { + for (auto& entry : fragment_id_to_profile_node_reports) { int32_t fragment_id = entry.first; - const std::vector>& fragment_profile = entry.second; - std::vector detailed_params; - bool is_first = true; - for (auto pipeline_profile : fragment_profile) { - if (pipeline_profile == nullptr) { + std::vector& profile_node_reports = entry.second; + for (const auto& profile_node_report : profile_node_reports) { + if (!profile_node_report.__isset.profile || + !profile_node_report.__isset.profile_node_type) { auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", print_id(query_id), fragment_id); DCHECK(false) << msg; LOG_ERROR(msg); continue; } - - TDetailedReportParams tmp; - THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile); - // First profile is fragment level - tmp.__set_is_fragment_level(is_first); - is_first = false; - // tmp.fragment_instance_id is not needed for pipeline x - detailed_params.push_back(std::move(tmp)); } - - fragment_id_to_profile_req[fragment_id] = std::move(detailed_params); + fragment_id_to_profile_node_reports_req[fragment_id] = std::move(profile_node_reports); } - if (fragment_id_to_profile_req.empty()) { + if (fragment_id_to_profile_node_reports_req.empty()) { LOG_WARNING("No fragment profile found for query {}", print_id(query_id)); } - profile.__set_fragment_id_to_profile(fragment_id_to_profile_req); + profile.__set_fragment_id_to_profile_node_reports(fragment_id_to_profile_node_reports_req); std::vector load_channel_profiles_req; for (auto load_channel_profile : load_channel_profiles) { @@ -230,8 +219,7 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par TReportExecStatusParams req; THRIFT_MOVE_VALUES(req, query_profile, profile); req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id); - // invalid query id to avoid API compatibility during upgrade - req.__set_query_id(TUniqueId()); + req.__set_query_id(query_id); req.__set_done(is_done); return req; @@ -265,10 +253,12 @@ void RuntimeQueryStatisticsMgr::trigger_profile_reporting() { _load_channel_profile_map.swap(load_channel_profile_copy); } - // ATTN: Local variables are copied to avoid memory reclamation issues. - auto st = _thread_pool->submit_func([profile_copy, load_channel_profile_copy]() { - _report_query_profiles_function(profile_copy, load_channel_profile_copy); - }); + auto st = _thread_pool->submit_func( + [profile_copy = std::move(profile_copy), + load_channel_profile_copy = std::move(load_channel_profile_copy)]() mutable { + _report_query_profiles_function(std::move(profile_copy), + std::move(load_channel_profile_copy)); + }); if (!st.ok()) { LOG_WARNING("Failed to submit profile reporting task, reason: {}", st.to_string()); @@ -290,10 +280,11 @@ void RuntimeQueryStatisticsMgr::stop_report_thread() { void RuntimeQueryStatisticsMgr::register_fragment_profile( const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id, - std::vector> p_profiles, + std::vector profile_node_reports, std::shared_ptr load_channel_profile) { - for (const auto& p : p_profiles) { - if (p == nullptr) { + for (const auto& profile_node_report : profile_node_reports) { + if (!profile_node_report.__isset.profile || + !profile_node_report.__isset.profile_node_type) { auto msg = fmt::format("Register fragment profile {} {} failed, profile is null", print_id(query_id), fragment_id); DCHECK(false) << msg; @@ -306,20 +297,20 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile( if (!_profile_map.contains(query_id)) { _profile_map[query_id] = std::make_tuple( - coor_addr, - std::unordered_map>>()); + coor_addr, std::unordered_map>()); } - std::unordered_map>>& - fragment_profile_map = std::get<1>(_profile_map[query_id]); - fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles)); + std::unordered_map>& fragment_profile_map = + std::get<1>(_profile_map[query_id]); + auto profile_node_report_size = profile_node_reports.size(); + fragment_profile_map.insert(std::make_pair(fragment_id, std::move(profile_node_reports))); if (load_channel_profile != nullptr) { _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile; } VLOG_CRITICAL << fmt::format("register x profile done {}, fragment {}, profiles {}", - print_id(query_id), fragment_id, p_profiles.size()); + print_id(query_id), fragment_id, profile_node_report_size); } void RuntimeQueryStatisticsMgr::register_resource_context( diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index 84546fb3d2b6ff..dfb878bc86513c 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include @@ -42,8 +43,8 @@ class RuntimeQueryStatisticsMgr { static TReportExecStatusParams create_report_exec_status_params( const TUniqueId& q_id, - std::unordered_map>> - fragment_id_to_profile, + std::unordered_map> + fragment_id_to_profile_node_reports, std::vector> load_channel_profile, bool is_done); void register_resource_context(std::string query_id, @@ -65,7 +66,7 @@ class RuntimeQueryStatisticsMgr { void register_fragment_profile(const TUniqueId& query_id, const TNetworkAddress& const_addr, int32_t fragment_id, - std::vector> p_profiles, + std::vector profile_node_reports, std::shared_ptr load_channel_profile_x); // When query is finished, try to report query profiles to FE. // ATTN: Profile is reported to fe fragment by fragment. @@ -81,11 +82,10 @@ class RuntimeQueryStatisticsMgr { std::atomic_bool started = false; std::mutex _profile_map_lock; - // query_id -> {coordinator_addr, {fragment_id -> std::vector}} + // query_id -> {coordinator_addr, {fragment_id -> std::vector}} std::unordered_map< TUniqueId, - std::tuple>>>> + std::tuple>>> _profile_map; std::unordered_map, std::shared_ptr> diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 2f6b0992ef663d..999c96849a6f6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -22,8 +22,9 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.SafeStringBuilder; import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TProfileNodeReport; +import org.apache.doris.thrift.TProfileNodeType; import org.apache.doris.thrift.TQueryProfile; import org.apache.doris.thrift.TRuntimeProfileTree; import org.apache.doris.thrift.TStatusCode; @@ -224,40 +225,48 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not set"); } - if (!profile.isSetFragmentIdToProfile()) { - LOG.warn("{} FragmentIdToProfile is not set", DebugUtil.printId(profile.getQueryId())); - return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set"); + if (!profile.isSetFragmentIdToProfileNodeReports()) { + LOG.warn("{} FragmentIdToProfileNodeReports is not set", DebugUtil.printId(profile.getQueryId())); + return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfileNodeReports is not set"); } - for (Entry> entry : profile.getFragmentIdToProfile().entrySet()) { + for (Entry> entry + : profile.getFragmentIdToProfileNodeReports().entrySet()) { int fragmentId = entry.getKey(); - List fragmentProfile = entry.getValue(); - int pipelineIdx = 0; + List fragmentProfile = entry.getValue(); List taskProfile = Lists.newArrayList(); String suffix = "(host=" + backendHBAddress + ")"; - for (TDetailedReportParams pipelineProfile : fragmentProfile) { + for (TProfileNodeReport profileNodeReport : fragmentProfile) { String name = ""; - boolean isFragmentLevel = (pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level); - if (isFragmentLevel) { - // Fragment Level profile is also represented by TDetailedReportParams. + if (!profileNodeReport.isSetProfileNodeType()) { + LOG.warn("Profile node type is not set, {}", DebugUtil.printId(profile.getQueryId())); + return new Status(TStatusCode.INVALID_ARGUMENT, "Profile node type is not set"); + } + TProfileNodeType profileNodeType = profileNodeReport.getProfileNodeType(); + if (profileNodeType == TProfileNodeType.FRAGMENT_LEVEL) { name = "FragmentLevelProfile:" + suffix; + } else if (profileNodeType == TProfileNodeType.PIPELINE_LEVEL) { + if (!profileNodeReport.isSetPipelineId()) { + LOG.warn("Pipeline id is not set, {}", DebugUtil.printId(profile.getQueryId())); + return new Status(TStatusCode.INVALID_ARGUMENT, "Pipeline id is not set"); + } + name = "Pipeline " + profileNodeReport.getPipelineId() + suffix; } else { - name = "Pipeline " + pipelineIdx + suffix; - pipelineIdx++; + LOG.warn("Unsupported profile node type {}, query {}", profileNodeType, + DebugUtil.printId(profile.getQueryId())); + return new Status(TStatusCode.INVALID_ARGUMENT, "Unsupported profile node type"); } RuntimeProfile profileNode = new RuntimeProfile(name); - // The taskProfile is used to save the profile of the pipeline, without - // considering the FragmentLevel. - if (!isFragmentLevel) { + if (profileNodeType == TProfileNodeType.PIPELINE_LEVEL) { taskProfile.add(profileNode); } - if (!pipelineProfile.isSetProfile()) { + if (!profileNodeReport.isSetProfile()) { LOG.warn("Profile is not set, {}", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); } - profileNode.update(pipelineProfile.profile); + profileNode.update(profileNodeReport.profile); profileNode.setIsDone(isDone); fragmentProfiles.get(fragmentId).addChild(profileNode, true); } @@ -265,7 +274,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr } LOG.info("Profile update finished query: {} fragments: {} isDone: {}", - DebugUtil.printId(getQueryId()), profile.getFragmentIdToProfile().size(), isDone); + DebugUtil.printId(getQueryId()), profile.getFragmentIdToProfileNodeReports().size(), isDone); if (profile.isSetLoadChannelProfiles()) { for (TRuntimeProfileTree loadChannelProfile : profile.getLoadChannelProfiles()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index ff023aeb9394de..3184c890ea726d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -247,9 +247,12 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, // with profile in a single rpc, this will make FE ignore the exec status and may lead to bug in query // like insert into select. if (params.isSetBackendId() && params.isSetDone()) { + int fragmentNum = params.getQueryProfile().isSetFragmentIdToProfileNodeReports() + ? params.getQueryProfile().getFragmentIdToProfileNodeReports().size() + : 0; LOG.info("Receive profile {} report from {}, isDone {}, fragments {}", DebugUtil.printId(params.getQueryProfile().getQueryId()), beAddr.toString(), - params.isDone(), params.getQueryProfile().fragment_id_to_profile.size()); + params.isDone(), fragmentNum); Backend backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId()); if (backend == null) { @@ -277,6 +280,11 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, } } + if (params.isSetQueryProfile() && !params.isSetStatus()) { + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + final QueryInfo info = coordinatorMap.get(params.query_id); result.setStatus(new TStatus(TStatusCode.OK)); if (info == null) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ExecutionProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ExecutionProfileTest.java new file mode 100644 index 00000000000000..dac8d50c40c1d5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ExecutionProfileTest.java @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.Status; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TProfileNodeReport; +import org.apache.doris.thrift.TProfileNodeType; +import org.apache.doris.thrift.TQueryProfile; +import org.apache.doris.thrift.TRuntimeProfileNode; +import org.apache.doris.thrift.TRuntimeProfileTree; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class ExecutionProfileTest { + + @Test + public void testUpdateProfileBuildsDisplayTreeFromStructuredProfileNodeType() { + ExecutionProfile executionProfile = new ExecutionProfile(new TUniqueId(1, 2), Lists.newArrayList(0)); + TQueryProfile queryProfile = buildQueryProfile(Lists.newArrayList( + buildProfileNodeReport(TProfileNodeType.FRAGMENT_LEVEL, -1, "fragment-level", false), + buildProfileNodeReport(TProfileNodeType.PIPELINE_LEVEL, 7, "pipeline", false))); + + Status status = executionProfile.updateProfile(queryProfile, new TNetworkAddress("be1", 9050), false); + + Assert.assertEquals(TStatusCode.OK, status.getErrorCode()); + RuntimeProfile fragmentProfile = executionProfile.getRoot() + .getChildMap().get("Fragments") + .getChildMap().get("Fragment 0"); + Assert.assertTrue(fragmentProfile.getChildMap() + .containsKey("FragmentLevelProfile:(host=TNetworkAddress(hostname:be1, port:9050))")); + Assert.assertTrue(fragmentProfile.getChildMap() + .containsKey("Pipeline 7(host=TNetworkAddress(hostname:be1, port:9050))")); + } + + @Test + public void testUpdateProfileOnlyAddsPipelineReportsToAggregatedProfile() { + ExecutionProfile executionProfile = new ExecutionProfile(new TUniqueId(1, 2), Lists.newArrayList(0)); + TQueryProfile queryProfile = buildQueryProfile(Lists.newArrayList( + buildProfileNodeReport(TProfileNodeType.FRAGMENT_LEVEL, -1, "fragment-level", true), + buildProfileNodeReport(TProfileNodeType.PIPELINE_LEVEL, 7, "pipeline", true))); + + Status status = executionProfile.updateProfile(queryProfile, new TNetworkAddress("be1", 9050), false); + + Assert.assertEquals(TStatusCode.OK, status.getErrorCode()); + + RuntimeProfile aggregatedProfile = executionProfile.getPipelineAggregatedProfile(Maps.newHashMap()); + RuntimeProfile aggregatedFragmentProfile = aggregatedProfile.getChildMap().get("Fragment 0"); + Assert.assertEquals(1, aggregatedFragmentProfile.getChildList().size()); + Assert.assertTrue(aggregatedFragmentProfile.getChildList().get(0).first.getName() + .contains("Pipeline 0(instance_num=1)")); + } + + private TQueryProfile buildQueryProfile(List profileNodeReports) { + TQueryProfile queryProfile = new TQueryProfile(); + queryProfile.setQueryId(new TUniqueId(1, 2)); + Map> fragmentReports = Maps.newHashMap(); + fragmentReports.put(0, profileNodeReports); + queryProfile.setFragmentIdToProfileNodeReports(fragmentReports); + return queryProfile; + } + + private TProfileNodeReport buildProfileNodeReport(TProfileNodeType type, int pipelineId, String name, + boolean withTaskProfile) { + TProfileNodeReport report = new TProfileNodeReport(); + report.setProfile(buildRuntimeProfileTree(name, withTaskProfile)); + report.setProfileNodeType(type); + if (type == TProfileNodeType.PIPELINE_LEVEL) { + report.setPipelineId(pipelineId); + } + return report; + } + + private TRuntimeProfileTree buildRuntimeProfileTree(String name, boolean withTaskProfile) { + TRuntimeProfileTree profileTree = new TRuntimeProfileTree(); + TRuntimeProfileNode root = new TRuntimeProfileNode(); + root.setName(name); + root.setNumChildren(withTaskProfile ? 1 : 0); + root.setCounters(Lists.newArrayList()); + root.setMetadata(0); + root.setIndent(true); + root.setInfoStrings(Maps.newHashMap()); + root.setInfoStringsDisplayOrder(Lists.newArrayList()); + root.setChildCountersMap(Maps.newHashMap()); + root.setTimestamp(-1); + profileTree.addToNodes(root); + if (withTaskProfile) { + profileTree.addToNodes(buildRuntimeProfileNode("task")); + } + return profileTree; + } + + private TRuntimeProfileNode buildRuntimeProfileNode(String name) { + TRuntimeProfileNode node = new TRuntimeProfileNode(); + node.setName(name); + node.setNumChildren(0); + node.setCounters(Lists.newArrayList()); + node.setMetadata(0); + node.setIndent(true); + node.setInfoStrings(Maps.newHashMap()); + node.setInfoStringsDisplayOrder(Lists.newArrayList()); + node.setChildCountersMap(Maps.newHashMap()); + node.setTimestamp(-1); + return node; + } +} diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 07eb9333637721..221e77c2879a68 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -185,6 +185,17 @@ struct TDetailedReportParams { 4: optional bool is_fragment_level } +enum TProfileNodeType { + FRAGMENT_LEVEL = 1, + PIPELINE_LEVEL = 2 +} + +struct TProfileNodeReport { + 1: optional RuntimeProfile.TRuntimeProfileTree profile + 2: optional TProfileNodeType profile_node_type + 3: optional i32 pipeline_id +} + struct TQueryStatistics { // A thrift structure identical to the PQueryStatistics structure. @@ -230,6 +241,8 @@ struct TQueryProfile { 4: optional list instance_profiles 5: optional list load_channel_profiles + + 6: optional map> fragment_id_to_profile_node_reports } struct TFragmentInstanceReport { From aa3a9619a518a52b3e6fd891d417be6539905d3e Mon Sep 17 00:00:00 2001 From: yangtao555 Date: Sat, 16 May 2026 17:39:04 +0800 Subject: [PATCH 2/2] Keep profile-only reports compatible with old FE --- be/src/runtime/runtime_query_statistics_mgr.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 957e24354271eb..75fc5c7f09d566 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -219,7 +219,10 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par TReportExecStatusParams req; THRIFT_MOVE_VALUES(req, query_profile, profile); req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id); - req.__set_query_id(query_id); + // This RPC is profile-only: the real query id is carried in query_profile.query_id. + // Keep the top-level query id invalid so old FEs do not route this status-less + // report into updateFragmentExecStatus(), which expects params.status. + req.__set_query_id(TUniqueId()); req.__set_done(is_done); return req;